网站制作费会计分录怎么做,记事本做网站怎么不行啦,电商设计师和美工有什么区别,网站经营性备案并发查询parallel简单#xff0c;有效和安全的并发是RxJava的设计原则之一。 然而#xff0c;具有讽刺意味的是#xff0c;它可能是该库中最容易被误解的方面之一。 让我们举一个简单的例子#xff1a;假设我们有一堆UUID并且对于每个UUID #xff0c;我们必须执行一组任务… 并发查询parallel 简单有效和安全的并发是RxJava的设计原则之一。 然而具有讽刺意味的是它可能是该库中最容易被误解的方面之一。 让我们举一个简单的例子假设我们有一堆UUID并且对于每个UUID 我们必须执行一组任务。 第一个问题是每个UUID都要执行I / O密集型操作例如从数据库加载对象 FlowableUUID ids Flowable.fromCallable(UUID::randomUUID).repeat().take(100);ids.subscribe(id - slowLoadBy(id)); 首先为了测试我将生成100个随机UUID。 然后对于每个UUID我想使用以下方法加载记录 Person slowLoadBy(UUID id) {//...
} slowLoadBy()的实现是无关紧要的请记住它是缓慢且阻塞的。 使用subscribe()调用slowLoadBy()有许多缺点 subscribe()根据设计是单线程的无法解决。 每个UUID顺序加载 当您调用subscribe() 无法进一步转换Person对象。 这是终端操作 一种更健壮甚至更残破的方法是map()每个UUID FlowablePerson people ids.map(id - slowLoadBy(id)); //BROKEN 这是非常可读的但不幸的是损坏了。 就像订阅者一样运算符也是单线程的。 这意味着在任何给定时间只能映射一个UUID 此处也不允许并发。 更糟糕的是我们从上游继承线程/工作者。 这有几个缺点。 如果上游使用某些专用的调度程序产生事件我们将劫持该调度程序中的线程。 例如许多操作符例如interval() Schedulers.computation()透明地使用Schedulers.computation()线程池。 我们突然开始在完全不适合该功能的池上执行I / O密集型操作。 此外我们通过这一阻塞性顺序步骤降低了整个管道的速度。 非常非常糟糕。 您可能已经听说过这个subscribeOn()运算符以及它如何启用并发。 确实但是在应用它时必须非常小心。 以下示例再次是错误的 import io.reactivex.schedulers.Schedulers;FlowablePerson people ids.subscribeOn(Schedulers.io()).map(id - slowLoadBy(id)); //BROKEN 上面的代码段仍然损坏。 observeOn() subscribeOn() 以及该事件的observeOn() 几乎不会将执行切换到其他工作程序线程而不会引入任何并发性。 流仍然按顺序处理所有事件但是在不同的线程上。 换句话说我们现在不是在从上游继承的线程上顺序使用事件而是在io()线程上顺序使用事件。 那么这个神话般的flatMap()运算符呢 flatMap()运算符可以进行救援 flatMap()运算符通过将事件流分成子流来启用并发。 但首先还有一个破碎的示例 FlowablePerson asyncLoadBy(UUID id) {return Flowable.fromCallable(() - slowLoadBy(id));
}FlowablePerson people ids.subscribeOn(Schedulers.io()).flatMap(id - asyncLoadBy(id)); //BROKEN 哦天哪这还是坏了 flatMap()运算符在逻辑上做两件事 在每个上游事件上应用转换 id - asyncLoadBy(id) –这将产生FlowableFlowablePerson 。 这是有道理的对于每个上游UUID我们都有一个FlowablePerson因此最终得到的是Person对象流 然后flatMap()尝试一次订阅所有这些内部子流。 每当任何子流发出Person事件时它都会作为外部Flowable的结果透明传递。 从技术上讲 flatMap()仅创建并预订前128个默认情况下可选的maxConcurrency参数子流。 同样当最后一个子流完成时 Person外部流也将完成。 现在这到底为什么被打破 除非明确要求否则RxJava不会引入任何线程池。 例如这段代码仍在阻塞 log.info(Setup);
FlowableString blocking Flowable.fromCallable(() - {log.info(Starting);TimeUnit.SECONDS.sleep(1);log.info(Done);return Hello, world!;});
log.info(Created);
blocking.subscribe(s - log.info(Received {}, s));
log.info(Done); 仔细查看输出特别是涉及的事件和线程的顺序 19:57:28.847 | INFO | main | Setup
19:57:28.943 | INFO | main | Created
19:57:28.949 | INFO | main | Starting
19:57:29.954 | INFO | main | Done
19:57:29.955 | INFO | main | Received Hello, world!
19:57:29.957 | INFO | main | Done 没有任何并发没有额外的线程。 仅将阻塞代码包装在Flowable中不会神奇地增加并发性。 您必须显式使用… subscribeOn() log.info(Setup);
FlowableString blocking Flowable.fromCallable(() - {log.info(Starting);TimeUnit.SECONDS.sleep(1);log.info(Done);return Hello, world!;}).subscribeOn(Schedulers.io());
log.info(Created);
blocking.subscribe(s - log.info(Received {}, s));
log.info(Done); 这次的输出更有希望 19:59:10.547 | INFO | main | Setup
19:59:10.653 | INFO | main | Created
19:59:10.662 | INFO | main | Done
19:59:10.664 | INFO | RxCachedThreadScheduler-1 | Starting
19:59:11.668 | INFO | RxCachedThreadScheduler-1 | Done
19:59:11.669 | INFO | RxCachedThreadScheduler-1 | Received Hello, world! 但是我们上次确实使用了subscribeOn() 这是怎么回事 嗯外部流级别的subscribeOn()基本上说所有事件都应在此流中的不同线程上顺序处理。 我们并没有说应该同时运行许多子流。 并且由于所有子流都处于阻塞状态因此当RxJava尝试订阅所有子流时它会有效地依次依次订阅。 asyncLoadBy()并不是真正的async 因此当flatMap()运算符尝试对其进行订阅时它会阻塞。 修复很容易。 通常您会将subscribeOn()放在asyncLoadBy()但出于教育目的我将其直接放置在asyncLoadBy()道中 FlowablePerson people ids.flatMap(id - asyncLoadBy(id).subscribeOn(Schedulers.io())); 现在它就像一个魅力 默认情况下RxJava将接收前128个上游事件 UUID 将它们转换为子流并订阅所有这些事件。 如果子流是异步且高度可并行化的例如网络调用 asyncLoadBy()获得128个并发调用asyncLoadBy() 。 并发级别128可通过maxConcurrency参数配置 FlowablePerson people ids.flatMap(id -asyncLoadBy(id).subscribeOn(Schedulers.io()),10 //maxConcurrency); 那是很多工作您不觉得吗 并发不应该更具声明性吗 我们不再处理Executor和期货但似乎这种方法太容易出错。 它不能像Java 8流中的parallel()一样简单吗 输入ParallelFlowable 让我们首先来看一下我们的示例并通过添加filter()使它更加复杂 FlowablePerson people ids.map(this::slowLoadBy) //BROKEN.filter(this::hasLowRisk); //BROKEN hasLowRisk()是慢速谓词 boolean hasLowRisk(Person p) {//slow...
} 我们已经知道针对此问题的惯用方法是使用flatMap()两次 FlowablePerson people ids.flatMap(id - asyncLoadBy(id).subscribeOn(io())).flatMap(p - asyncHasLowRisk(p).subscribeOn(io())); asyncHasLowRisk()相当模糊-谓词通过时返回单元素流失败则返回空流。 这是使用flatMap()模拟filter() flatMap() 。 我们可以做得更好吗 从RxJava 2.0.5开始有一个新的运算符叫做… parallel() 令人惊讶的是由于许多误解和滥用在RxJava成为1.0之前已删除了同名的运算符。 2.x中的parallel()似乎最终以一种安全且声明性的方式解决了惯用并发问题。 首先让我们看一些漂亮的代码 FlowablePerson people ids.parallel(10).runOn(Schedulers.io()).map(this::slowLoadBy).filter(this::hasLowRisk).sequential(); 就这样 parallel()和sequential()之间的代码块parallel()运行。 我们有什么在这里 首先新的parallel()运算符将FlowableUUID转换为ParallelFlowableUUID 该API的API比Flowable小得多。 您将在第二秒看到原因。 可选的int参数在我们的例子中为10 定义并发性或者如文档所述定义创建并发“ rails”的数量。 因此对于我们来说我们将单个FlowablePerson分成10个并发的独立轨道认为是thread 。 来自UUID原始流的事件被拆分 modulo 10 为彼此独立的不同轨子流。 将它们视为将上游事件发送到10个单独的线程中。 但是首先我们必须使用方便的runOn()运算符定义这些线程的来源。 这比Java 8流上的parallel()好得多在Java 8流上您无法控制并发级别。 至此我们有了一个ParallelFlowable 。 当事件出现在上游 UUID 中时它将委派给10个“轨道”并发独立的管道之一。 管道提供了可以安全地同时运行的运算符的有限子集例如map()和filter() 还包括reduce() 。 没有buffer() take()等因为一次在多个子流上调用它们的语义尚不清楚。 我们的阻塞slowLoadBy()和hasLowRisk()仍按顺序调用但仅在单个“ rail”内部。 因为我们现在有10个并发的“ rails”所以我们无需花费太多精力就可以有效地并行化它们。 当事件到达子流“轨道”的末尾时它们会遇到sequential()运算符。 该运算符将ParallelFlowable回Flowable 。 只要我们的映射器和过滤器是线程安全的 parallel() / sequential()对就提供了非常简单的并行化流的方法。 一个小警告-您将不可避免地使邮件重新排序。 顺序map()和filter()始终保留顺序就像大多数运算符一样。 但是一旦在parallel()块中运行它们顺序就会丢失。 这允许更大的并发性但是您必须牢记这一点。 是否应该使用parallel()而不是嵌套的flatMap()来并行化代码 这取决于您但是parallel()似乎更容易阅读和掌握。 翻译自: https://www.javacodegeeks.com/2017/09/idiomatic-concurrency-flatmap-vs-parallel-rxjava-faq.html并发查询parallel