crm网站,常熟外贸网站建设,安陆市网站,网上能注册公司吗怎么注册一、简介 Reactor 是运行在 JVM 上的编程框架#xff0c;最大特点是完全非阻塞#xff0c;能高效控制 “背压”#xff0c;简单来说就是处理数据传输时速度不匹配的问题 。它能和 Java 8 里的一些功能直接搭配使用#xff0c;像处理异步结果的 CompletableFuture、处理数据…一、简介 Reactor 是运行在 JVM 上的编程框架最大特点是完全非阻塞能高效控制 “背压”简单来说就是处理数据传输时速度不匹配的问题 。它能和 Java 8 里的一些功能直接搭配使用像处理异步结果的 CompletableFuture、处理数据序列的 Stream 以及表示时间的 Duration 等。 在 Reactor 里Flux 用来处理多个数据元素Mono 处理零个或一个数据元素并且它严格按照 “响应式扩展规范” 来设计。 此外reactor-ipc 这个组件可以让不同进程间在不互相等待的情况下通信。它为 HTTP含 Websockets、TCP 和 UDP 这些网络协议提供了支持背压的网络引擎很适合用在微服务架构里还能很好地处理响应式的编解码。 响应式编程模型 二、依赖
dependencyManagement dependenciesdependencygroupIdio.projectreactor/groupIdartifactIdreactor-bom/artifactIdversion2023.0.0/versiontypepom/typescopeimport/scope/dependency/dependencies
/dependencyManagement
dependenciesdependencygroupIdio.projectreactor/groupIdartifactIdreactor-core/artifactId /dependencydependencygroupIdio.projectreactor/groupIdartifactIdreactor-test/artifactId scopetest/scope/dependency
/dependencies 三、响应式编程 响应式编程是一种关注于数据流data streams和变化传递propagation of change的异步编程方式。 这意味着它可以用既有的编程语言表达静态如数组或动态如事件源的数据流。
了解历史 在响应式编程的发展进程中微软率先行动在.NET生态系统里创建了响应式扩展库Rx。紧接着RxJava在JVM上实现了响应式编程。后来JVM平台诞生了一套标准的响应式编程规范这套规范定义了一系列标准接口和交互规则还被整合进了Java 9借助Flow类。 响应式编程常被视作面向对象编程中“观察者模式”的拓展。响应式流和“迭代子模式”也有相似之处比如都存在类似Iterable - Iterator这样的对应关系。不过它们的核心区别在于Iterator采用的是“拉取”方式也就是开发者决定何时调用next()方法获取元素属于“命令式”编程范式而响应式流基于“推送”方式当有新数据产生时由发布者Publisher主动通知订阅者Subscriber这种“推送”模式是响应式编程的关键。并且对推送数据的处理是通过声明式的方式即开发者只需描述“控制流程”就能定义对数据流的处理逻辑而不是像命令式编程那样一步步明确指令。 在响应式流里除了数据推送机制错误处理和完成信号的定义也很完善。发布者Publisher不仅能向订阅者Subscriber推送新数据调用onNext方法还能推送错误信号调用onError方法和完成信号调用onComplete方法。一旦出现错误或完成信号响应式流就会终止 。可以用下边的表达式描述 onNext x 0..N [onError | onComplete] 3.1 阻塞是对资源的浪费 现代应用面临着大量并发用户的挑战尽管现代硬件处理能力发展迅速但软件性能依旧是至关重要的因素。
从宏观角度看提升程序性能主要有两种思路 并行化利用更多线程与硬件资源以异步方式处理任务借此提升整体处理效率。 优化执行效率在现有资源的基础上通过优化代码逻辑、算法等手段提高单位时间内的任务处理量。 在 Java 开发中开发者常常采用阻塞式编程方式编写代码。这种方式本身并无不妥当程序出现性能瓶颈时一般会通过增加处理线程来缓解而新增线程中的代码依旧是阻塞式的。然而这种资源使用方式极易引发资源竞争与并发问题。 更为严重的是阻塞式编程会造成资源浪费。举例来讲当程序遭遇延迟常见于 I/O 操作如数据库读写请求或网络调用时对应的线程只能进入空闲状态等待数据返回在此期间线程资源被白白浪费。 由此可见并行化并非解决性能问题的万能良方。它虽然能够挖掘硬件潜力但同时也带来了复杂性并且容易造成资源浪费 。 3.2 异步可以解决问题吗
第二种思路——提高执行效率——可以解决资源浪费问题。通过编写 异步非阻塞 的代码 任务发起异步调用后执行过程会切换到另一个 使用同样底层资源 的活跃任务然后等 异步调用返回结果再去处理。
但是在 JVM 上如何编写异步代码呢Java 提供了两种异步编程方式 回调Callbacks 异步方法没有返回值而是采用一个 callback 作为参数lambda 或匿名类当结果出来后回调这个 callback。常见的例子比如 Swings 的 EventListener。 Futures 异步方法 立即 返回一个 FutureT该异步方法要返回结果的是 T 类型通过 Future封装。这个结果并不是 立刻 可以拿到而是等实际处理结束才可用。比如 ExecutorService 执行 CallableT 任务时会返回 Future 对象。
这些技术够用吗并非对于每个用例都是如此两种方式都有局限性。
回调很难组合起来因为很快就会导致代码难以理解和维护即所谓的“回调地狱callback hell”。
考虑这样一种情景 在用户界面上显示用户的5个收藏或者如果没有任何收藏提供5个建议。 这需要3个 服务一个提供收藏的ID列表第二个服务获取收藏内容第三个提供建议内容
回调地狱Callback Hell的例子
userService.getFavorites(userId, new CallbackListString() { public void onSuccess(ListString list) { if (list.isEmpty()) { suggestionService.getSuggestions(new CallbackListFavorite() {public void onSuccess(ListFavorite list) { UiUtils.submitOnUiThread(() - { list.stream().limit(5).forEach(uiList::show); });}public void onError(Throwable error) { UiUtils.errorPopup(error);}});} else {list.stream() .limit(5).forEach(favId - favoriteService.getDetails(favId, new CallbackFavorite() {public void onSuccess(Favorite details) {UiUtils.submitOnUiThread(() - uiList.show(details));}public void onError(Throwable error) {UiUtils.errorPopup(error);}}));}}public void onError(Throwable error) {UiUtils.errorPopup(error);}
});Reactor改造后
userService.getFavorites(userId) .flatMap(favoriteService::getDetails) .switchIfEmpty(suggestionService.getSuggestions()) .take(5) .publishOn(UiUtils.uiThreadScheduler()) .subscribe(uiList::show, UiUtils::errorPopup); 如果你想确保“收藏的ID”的数据在800ms内获得如果超时从缓存中获取呢在基于回调的代码中 会比较复杂。但 Reactor 中就很简单在处理链中增加一个 timeout 的操作符即可。
userService.getFavorites(userId).timeout(Duration.ofMillis(800)) .onErrorResume(cacheService.cachedFavoritesFor(userId)) .flatMap(favoriteService::getDetails) .switchIfEmpty(suggestionService.getSuggestions()).take(5).publishOn(UiUtils.uiThreadScheduler()).subscribe(uiList::show, UiUtils::errorPopup); 额外扩展 Futures 比回调要好一点但即使在 Java 8 引入了 CompletableFuture它对于多个处理的组合仍不够好用。 编排多个 Futures 是可行的但却不易。此外Future 还有一个问题当对 Future 对象最终调用 get() 方法时仍然会导致阻塞并且缺乏对多个值以及更进一步对错误的处理。 考虑另外一个例子我们首先得到 ID 的列表然后通过它进一步获取到“对应的 name 和 statistics” 为元素的列表整个过程用异步方式来实现。 CompletableFuture 处理组合的例子
CompletableFutureListString ids ifhIds(); CompletableFutureListString result ids.thenComposeAsync(l - { StreamCompletableFutureString zip l.stream().map(i - { CompletableFutureString nameTask ifhName(i); CompletableFutureInteger statTask ifhStat(i); return nameTask.thenCombineAsync(statTask, (name, stat) - Name name has stats stat); });ListCompletableFutureString combinationList zip.collect(Collectors.toList()); CompletableFutureString[] combinationArray combinationList.toArray(new CompletableFuture[combinationList.size()]);CompletableFutureVoid allDone CompletableFuture.allOf(combinationArray); return allDone.thenApply(v - combinationList.stream().map(CompletableFuture::join) .collect(Collectors.toList()));
});ListString results result.join();
assertThat(results).contains(Name NameJoe has stats 103,Name NameBart has stats 104,Name NameHenry has stats 105,Name NameNicole has stats 106,Name NameABSLAJNFOAJNFOANFANSF has stats 121);3.3 从命令式编程到响应式编程
类似 Reactor 这样的响应式库的目标就是要弥补上述“经典”的 JVM 异步方式所带来的不足 此外还会关注一下几个方面 可编排性Composability 以及 可读性Readability 使用丰富的 操作符 来处理形如 流 的数据 在 订阅subscribe 之前什么都不会发生 背压backpressure 具体来说即 消费者能够反向告知生产者生产内容的速度的能力 高层次 同时也是有高价值的的抽象从而达到 并发无关 的效果
3.3.1 可编排性与可读性 可编排性简单来说就是能够有条不紊地组织多个异步任务。比如把前一个任务的成果精准传递给下一个任务作为输入或者像 “分而治之再汇总” 的 fork - join 模式那样同时开展多个任务并最终整合结果又或者把异步任务当作可复用的离散组件在整个系统中灵活调用。 这种任务编排能力和代码的可读性、可维护性紧密相连。当异步处理的任务数量增多逻辑愈发复杂编写代码就像在荆棘丛中摸索阅读代码更是难上加难。就拿常见的回调模式来说它看似简单可一旦处理逻辑复杂起来回调里嵌套回调一层又一层就会陷入 “回调地狱”。经历过这种痛苦的开发者都知道这样的代码简直是一团乱麻想要读懂、分析清楚实在是太费劲了。 而 Reactor 在这方面表现出色它提供了多种多样的编排操作。借助这些操作代码能够清晰直观地展现处理流程所有操作基本都维持在同一层级尽可能避免了令人头疼的嵌套结构 。
3.3.2 就像装配流水线 想象一下在响应式应用里处理数据的过程就如同产品在装配流水线上作业。Reactor就好比这条流水线的传送带同时还充当着装配工人和机器人的角色。数据就像原材料从源头也就是最初的Publisher开始流动经过一道道工序加工最终变成成品等待被传送给消费者即Subscriber。 在这个过程中原材料会经历各种各样的中间处理环节可能会和其他半成品进行组装。要是流水线某个地方出现问题比如齿轮卡住了或者某个产品的包装耗时太长那么这个工位就可以给上游发送信号让它们少送点原材料过来甚至暂停供应。
3.3.3 操作符Operators 在 Reactor 中操作符operator就像装配线中的工位操作员或装配机器人。每一个操作符 对 Publisher 进行相应的处理然后将 Publisher 包装为一个新的 Publisher。就像一个链条 数据源自第一个 Publisher然后顺链条而下在每个环节进行相应的处理。最终一个订阅者 (Subscriber终结这个过程。请记住在订阅者Subscriber订阅subscribe到一个 发布者Publisher之前什么都不会发生。
理解了操作符会创建新的 Publisher 实例这一点能够帮助你避免一个常见的问题 这种问题会让你觉得处理链上的某个操作符没有起作用。
虽然响应式流规范Reactive Streams specification没有规定任何操作符 类似 Reactor 这样的响应式库所带来的最大附加价值之一就是提供丰富的操作符。包括基础的转换操作 到过滤操作甚至复杂的编排和错误处理操作。
3.3.4 subscribe() 之前什么都不会发生 在 Reactor 中当你创建了一条 Publisher 处理链数据还不会开始生成。事实上你是创建了 一种抽象的对于异步处理流程的描述从而方便重用和组装。 当真正“订阅subscrib”的时候你需要将 Publisher 关联到一个 Subscriber 上然后才会触发整个链的流动。这时候Subscriber 会向上游发送一个 request 信号一直到达源头的 Publisher。
3.3.5 背压 在响应式编程里向上游传递信号这一机制在实现背压方面发挥着关键作用。这就好比在真实的装配线上要是某个工位处理产品的速度跟不上流水线的整体速度它就会向上游发送反馈信号提醒调整生产节奏。 响应式流规范中定义的相关机制和上述装配线的例子十分相似。订阅者在接收数据时有两种方式一种是毫无限制地接收任由数据源头 “开足马力”推送所有数据另一种则是通过 request 机制明确告知数据源头自己一次最多能够处理 n 个元素。 不仅如此中间环节的操作同样会对 request 产生影响。例如有一种缓存buffer操作它能把每 10 个元素打包成一批。这时如果订阅者只请求 1 个元素对于数据源头而言其实需要生成 10 个元素因为缓存操作是按批处理的。此外预取策略也可以在此发挥作用比如在订阅开始前提前生成一些元素。 如此一来原本单一的 “推送” 模式就演变成了 “推送 拉取” 的混合模式。下游如果已经准备就绪便可以主动从上游拉取 n 个元素但要是上游的元素尚未准备好下游也只能乖乖等待上游推送 。
3.3.6 热Hot vs 冷Cold
在 Rx 家族的响应式库中响应式流分为“热”和“冷”两种类型区别主要在于响应式流如何 对订阅者进行响应 一个“冷”的序列指对于每一个 Subscriber都会收到从头开始所有的数据。如果源头 生成了一个 HTTP 请求对于每一个订阅都会创建一个新的 HTTP 请求。 一个“热”的序列指对于一个 Subscriber只能获取从它开始 订阅 之后 发出的数据。不过注意有些“热”的响应式流可以缓存部分或全部历史数据。 通常意义上来说一个“热”的响应式流甚至在即使没有订阅者接收数据的情况下也可以发出数据这一点同 “Subscribe() 之前什么都不会发生”的规则有冲突。
什么问题都可以评论区留言看见都会回复的
如果你觉得本篇文章对你有所帮助的把“文章有帮助的”打在评论区
多多支持吧
点赞加藏评论是对小编莫大的肯定。抱拳了