唐山网站推广优化,广州市企业网站建设怎么样,制作网页页面,建设音乐网站的目的目录 概述1 声朋一个简单的集成流1.1 使用XML定义集成流1.2 使用Java配置集成流1.3 使用Spring lntegration 的 DSL 配置 2 Spring integration 功能概览2.1 消息通道2.2 过滤器2.3 转换器2.4 路由器2.5 切分器2.6 服务激活器2.7 网关2.8 通道适配器2.9 端点模块 概述
就像我们… 目录 概述1 声朋一个简单的集成流1.1 使用XML定义集成流1.2 使用Java配置集成流1.3 使用Spring lntegration 的 DSL 配置 2 Spring integration 功能概览2.1 消息通道2.2 过滤器2.3 转换器2.4 路由器2.5 切分器2.6 服务激活器2.7 网关2.8 通道适配器2.9 端点模块 概述
就像我们需要连接互联网才能提高生产效率一样很多应用都需要连接外部系统才能完成它们的功能。应用程序可能需要读取或发送电子邮件、与外部 API 交或者对写人数据库的数据做出反应。而且由于数据是在外部系统读取或写人的应用可能需要以某种方式处理这些数据将其转换为应用程序自己的领域类。 因此在本文中我们会看到如何使用 Spring Integration 实现通用的集成模式。Spring Integration 是众多集成模式的现成实现这些模式在Gregor Hohpe 和 Bobby Woolf编写的Enterprise Integration Patterns (Addison-Wesley2003 年)中进行了归类。每个模式都实现为一个组件消息会通过该组件在管道中传递数据。借助 Spring 配置可以将这些组件组装成一个管道数据可以通过这个管道来流动。我们从定义一个简单的集成流开始这个流包含了Spring Integration 的众多特性和特点。
1 声朋一个简单的集成流
通常来讲 Spring Integration 可以创建集成流通过集成流应用程序能够接收向应用程序之外的资源发送数据。应用程序可能集成的资源之一就是文件系统。因此Spring integration 的很多组件都有读入和写入文件的通道适配器(channel adapter)。为了熟悉 Spring Integration我们会创建一个集成流这个流会写人数据到文件综中。首先需要添加Spring Integration 到项目的构建文件中。对于Maven 构建来讲必要的依赖如下所示: dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-integration/artifactIdversionX.X.X.RELEASE/version/dependencydependencygroupIdorg.springframework.integration/groupIdartifactIdspring-integration-file/artifactIdversionX.X.X.RELEASE/version/dependency第一项依赖是 Spring Integration的 Spring Boot starter。不管我们与哪种流进行交互对于Spring Integration 流的开发来讲,这个依赖都是必需的。与所有的Spring Boot starter 一样在 Initializr表单中这个依赖也可以通过复选框选择。 第二项依赖是Spring Integration的文件端点模块。这个模块是与外部系统集成的20余模块之一。我们会在 2.9 小节中更加详细地讨论端点模块。但是目前我们只需要知道文件端点模块提供了将文件从文件系统导人集成流和将流中的数据写人文件系统的能力。 接下来我们需要为应用创建一种方法让它能够发送数据到集成流中这样它才能写人文件。为了实现这一点我们需要创建一个网关接口这样的网关接口程序如下
package sia6;import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.file.FileHeaders;
import org.springframework.messaging.handler.annotation.Header;MessagingGateway(defaultRequestChannel textInChannel) //声明消息网关
public interface FileWriterGateway {void writeToFile(Header(FileHeaders.FILENAME) String filename, String data);
}尽管这只是一个很简单的Java 接口但是关于 FileWriterGateway,有很多东西需要介绍。我们首先看到它使用了MessagingGateway 注解。这个注解会告诉 Spring Integration 要在运行时生成该接口的实现这与 Spring Data 在运行时生成存储库接口的实现非常类似。其他地方的代码在希望写人文件时将会调用它。 MessagingGateway 的 defaultRequestChannel属性表明接口方法调用时所返回的消息要发送至给定的消息通道(message channel)。在本例中我们声明调用 writeToFile() 所形成的消息应该发送至名为 textInChannel的通道中。 对于writeToFile()方法来说,它以 String类型的形式接受一个文件名,另外一个String包含了要写人文件的文本。关于这个方法的签名还需要注意 filename 参数上带有Header。在本例中Header 注解表明传递给 filename 的值应该包含在消息头信息中(通过 FileHeaders.FILENAME 声明它将会被解析成file_name)而不是放到消息载荷(payload)中。 现在我们已经有了消息网关接下来就需要配置集成流了。尽管我们往构建文件中添加的 Spring Integration starter 依赖能够启用 Spring Integration 的自动配置功能但是满足应用需求的流定义则需要我们自行编写额外的配置。在声明集成流方面我们有3种配置方案可供选择:
XML 配置;Java 配置;使用DSL的Java配置。
我们会依次了解Spring Integration 的这3种配置风格,从较为老式的XML配置开始。
1.1 使用XML定义集成流
尽管在开发中我们应尽量避免使用XML配置但是Spring Integration 有使用XML定义集成流的漫长历史。所以展现一个 XML 定义集成流的样例还是很有价值的。下面的程序展现了如何使用XML配置示例集成流。
?xml version1.0 encodingUTF-8?
beans xmlnshttp://www.springframework.org/schema/beansxmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexmlns:inthttp://www.springframework.org/schema/integrationxmlns:int-filehttp://www.springframework.org/schema/integration/filexsi:schemaLocationhttp://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/integrationhttp://www.springframework.org/schema/integration/spring-integration.xsdhttp://www.springframework.org/schema/integration/filehttp://www.springframework.org/schema/integration/file/spring-integration-file.xsd!-- 声明textInChannel--int:channel idtextInChannel /!-- 转换文本--int:transformer idupperCaseinput-channeltextInChanneloutput-channelfileWriterChannelexpressionpayload.toUpperCase() /!-- 声明WriterChanncl--int:channel idfileWriterChannel/!-- 将文本写人文件 低版本可能不支持特性 append-new-line--int-file:outbound-channel-adapter idwriterchannelfileWriterChanneldirectory/tmp/sia6/filesmodeAPPENDappend-new-linetrue auto-create-directorytrue/
/beans讲解
我们首先配置了一个名为 textInChannel 的通道。可以发现,它就是 FileWriterGateway的请求通道。当FileWriterGateway 的 writeToFile()方法被调用的时候结果形成的消息会发布到这个通道上。我们还配置了一个转换器(transformer )它会从 textInChannel 接收消息。它使用Spring 表达式语言(Spring Expression LanguageSpEL)为消息载荷调用toUpperCase()方法。进行大写操作之后的结果会发布到 fileWriterChannel 上。随后我们配置了名为 fileWriterChannel 的通道。这个通道会作为一根导线将转换器与出站通道适配器 (outbound channel adapter) 连接在一起。 最后我们使用 int-file 命名空间配置了出站通道适配器。这个 XML 命名空间是由Spring Integration 的文件模块提供的实现文件写入的功能。按照我们的配置它从fileWriterChannel接收消息并将消息的载荷写入一个文件这个文件的名称是由消息头信息中的 file_name属性指定的,而存入的目录则是由这里的 directory 属性指定的。如果文件已经存在会以新行的方式进行追加文件内容而不会覆盖原文件。
图,使用Enterprise Integration PatternsEIP 中的图形元素样式阐述了这个流。 这个流包含了 5 个组件:一个网关、两个通道、一个转换器和一个通道适配器。能够组装到集成流中的组件有很多这只是其中很少的一部分。我们会在第 2 节讨论这些组件以及 Spring Integration 支持的其他组件。
如果想要在 Spring Boot 应用中使用XML 配置需要将XML作为源导人Spring应川最简单的实现方式就是在应用的某个Java 配置类上用Spring的ImportRerource 注解
Confiquration
ImportResource(classpath:/filowrltor-config.xml)
public class FileWriterIntegrationConfig {....
} 尽管基于XML的配置能够很好地用于 Spring integration,但是大多数的开发人员对千XML 的使用越来越谨慎。(尽量避免使用 XML 配置。现在我们抛开尖括号看一下 Spring Integration 的 Java 配置风格。
1.2 使用Java配置集成流
大多数的现代 Spring 应用程序都会避免使用XML 配置而更加青睐于 Java 配置。实际上在 Spring Boot 应用中Java 配置是自动化配置功能更自然的补充形式。因此如果要为 Spring Boot 应用添加集成流最好使用Java 来定义流程。 下列程序展示了使用 Java 配置编写集成流的一个样例。这里的代码依然是功能相同的文件写人集成流但是这次我们使用 Java 来实现。
package sia6;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.file.FileWritingMessageHandler;
import org.springframework.integration.file.support.FileExistsMode;
import org.springframework.integration.transformer.GenericTransformer;import java.io.File;Configuration
public class FileWriterIntegrationConfig {Bean// 声明转换器Transformer(inputChannel textInChannel, outputChannel fileWriterChannel)public GenericTransformerString, String upperCaseTransformer() {return text - text.toUpperCase();}Bean// 声明文件写人器ServiceActivator(inputChannel fileWriterChannel)public FileWritingMessageHandler fileWriter() {FileWritingMessageHandler handler new FileWritingMessageHandler(new File(/tmp/ala6/tilea));handler.setExpectReply(false);handler.setFileExistsMode(FileExistsMode.APPEND);handler.setAppendNewLine(true);return handler;}
}在Java 配中我们声明了两个 bean: 一个转换器和一个文件写入消息处理器。这里的转换器是 GenericTransformer。因为 GenericTransformer 是函数式接口所以我们可以使用 lambda 表达式为其提供实现这里调用了消息文本的 toUpperCase() 方法。我们为转换器 bean 使用了Transformer 注解这样会将其声明成集成流中的一个转换器。他接受来自 textInChannel 通道的消息然后将消息写人名为 fileWriterChannel 的通道。
而负则文件写人的 bean 则使用了ServicActivator 注解表明它会接受来fileWriterChannel 的消息并且会将消息传递给 FileWritingMessageHandler 实例所定义的服务。FileWritingMessageHandler 是一个消息处理器可以将消息的载荷写入特定目录下的文件而文件的名称是通过消息的 file_name 头信息指定的。与XML 样例类似FileWritingMessageHandler 也配置为以新行的方式为文件追加内容。
FileWritingMessageHandler bean 的一个独特之处在于它调用了 setExpectReply(false)方法,能够通过这个方法告知服务激活器(service activator)不要期望存在答复通道(reply channel通过这样的通道我们可以将某个值返回到流中的上游组件 )。如果我们不调用setExpectReply(false),那么文件写入 bean 的默认值是 true尽管管道的功能和预期一样但是在日志中会看到一些错误信息提示我们没有设置答复通道。
你会发现我们在这里没有必要显式声明通道。如果名为 textInChannel 和fileWriterChannel的 bean 不存在这两个通道将会自动创建。但是如果想要更加精确地控制通道如何配置可以按照如下的方式显式构建这些 bean: Beanpublic MessageChannel textInChannel() {return new DirectChannel();}Beanpublic MessageChannel fileWriterChannel() {return new DirectChannel();}基于 Java 的配置方案可能更易于阅读、更简洁也符合倡导的纯 Java配置风格。但是如果使用 Spring Integration 的 Java DSL配置风格配置过程可以更加流畅·。
1.3 使用Spring lntegration 的 DSL 配置
我们再次尝试文件写人集成流的定义。这一次我们依然使用 Java 进行定义但是会使用Spring Integration的 Java DSL。我们不再将流中的每个组件都声明为单独的bean,而是使用一个 bean 来定义整个流程序如下所示为集成流的设计提供一个流畅的API。
package sia6;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.file.dsl.Files;
import org.springframework.integration.file.support.FileExistsMode;import java.io.File;Configuration
public class FileWriterIntegrationConfig {Beanpublic IntegrationFlow fileWriterFlow() {return IntegrationFlows.from(MessageChannels.direct(textInChannel)).String, Stringtransform(t - t.toUpperCase())//声明转换器.handle(Files.//处理文件写人outboundAdapter(new File(/tmp/sia6/files)).fileExistsMode(FileExistsMode.APPEND).appendNewLine(true)).get();}
}这种新的配置方式在一个bean 方法中定义了整个流做到了尽可能简洁。IntegrationFlows类初始化构建器 API我们可以通过这个API来定义流。 在上面程序中我们首先从名为 textInchannel的通道接收消息然后消息进人一个转换器这个转换器会将消息载荷转换成大写形式。在转换器之后消息会交由出站通道适配器处理这个适配器是由 Spring Integration file模块的 Files类型创建的。最后通过对 get()的调用返回要构建的IntegrationFlow。简言之这个 bean 方法定义了与XML和Java配置样例相同的集成流。 你可能已经发现与 Java 配置样例类似我们不需要显式声明通道 bean。我们引用了textInChannel如果该名字对应的通道不存在Spring Integration 会自动创建它。不过我们也可以显式声明 bean。 对于连接转换器和出站通道适配器的通道我们甚至没有通过名字引用它。如果需要显式配置通道可以在流定义的时候通过调用 channel()来引用它的名称: Beanpublic IntegrationFlow fileWriterFlow() {return IntegrationFlows.from(MessageChannels.direct(textInChannel)).String, Stringtransform(t - t.toUpperCase())//声明转换器.channel(MessageChannels.direct(FileWriterChannel)).handle(Files//处理文件写人.outboundAdapter(new File(/tmp/sia6/files)).fileExistsMode(FileExistsMode.APPEND).appendNewLine(true)).get();}使用 Spring Integration 的Java DSL (与其他的 fluent API类似)时必须要巧妙地使用空格来保持可读性。在这里的样例中我小心翼翼地使用缩进来保证代码块的可谈性。对于更长、更复杂的流我们甚至可以考虑将流的一部分抽取到单独的方法或子流中以实现更好的可读性。 现在我们已经看到了如何使用3 种不同的方式来定义一个简单的流接下来我们回过头来看一下Spring Integration 的全景。
2 Spring integration 功能概览
Spring Integration 涵盖了大量的集成场景。如果想将所有的内容放到一章中就像把一头大象装进信封一样不现实。在这里只会向你展示 Spring Integration 这头大象的照片而不是对 Spring Integration 进行面面俱到的讲解目的就是让你能够了解它是如何运行的。随后我们会再创建一个集成流为 Taco Cloud 应用添加新的功能。 集成流是由一个或多个如下介绍的组件组成的。在继续编写代码之前我们先看一下这些组件在集成流中所扮演的角色。 通道 (channel): 将消息从一个元素传递到另一个元素。 过滤器 (filter): 基于某些断言条件化地允许某些消息通过流。 转换器(transformer): 改变消息的值、将消息载荷从一种类型转换成另一种类型。 路由器(router): 将消息路由至一个或多个通道通常会基于消息的头信息进行路由。 切分器(splitter): 将传入的消息切分成两份或更多份然后发送至不同的通道。 聚合器(aggregator): 与切分器的操作相反将来自不同通道的多个消息合并成一个消息。 服务激活器(service activator): 将消息传递给某个 Java 方法处理并将返回值发布到输出通道上。 通道适配器 (channel adapter): 将通道连接到某些外部系统或传输方式。可以接受输入也可以写出到外部系统。 网关 (gateway): 通过接口将数据传递到集成流中。
在定义文件写人集成流时我们已经看过其中的一些组件了。FileWriterGateway 是个网关通过它应用可以提交要写人文件的文本。我们还定义了一个转换器将给定的文本转换成大写的形式随后我们定义了一个出站通道适配器它执行将文本写人文件的任务。这个流有两个通道:textInChannel 和 fileWriterChannel它们将应用中的其他组件连接在一起。现在我们按照承诺快速看一下这些集成流组件。
2.1 消息通道
消息通道是消息穿行集成通道的一种方式如下图。它们是连接 Spring Integration其他组成部分的管道。 Spring Integration 提供了多种通道实现。
PublishSubscribeChannel: 发送到 PublishSubscribeChannel的消息会传递到一个或多个消费者中。如果有多个消费者则它们都会接收到消息。QueueChannel: 发送到 QueueChannel 的消息会存储到一个队列中按照 FIFO的方式被拉取。如果有多个消费者只有其中的一个消费者会接收到消息。PriorityChannel:与QueueChannel类似但它不是FIFO的方式而是会基于消息的 priority 头信息被消费者拉取。RendezvousChannel:与 QueueChannel 类似但是发送者会一直阻塞通道直到消费者接收到消息。它实际上会同步发送者和消费者。DirectChannel:与 PublishSubscribeChannel 类似但是消息只会发送至一个消费者。它会在与发送者相同的线程中调用消费者。这种方式允许跨通道的事务.。ExecutorChannel:与 DirectChannel类似但消息分发是通过 TaskExecutor 实现的这样会在与发送者独立的线程中执行。这种通道类型不支持跨通道的事务。FluxMessageChannel: 反应式流的发布者消息通道基于 Reactor 项目的 Flux。
在 Java 和JavaDSL 中输人通道都自动创建的默认使用 DirectChannel 但是如果想要使用不同的通道实现就需要将通道声例为 bean 并在集应流中引用它。可例如要声明 PublishSubscribeChannel需要明如下的Bean 方法 Beanpublic MessageChannel orderChannel(){return new PublishSubscribeChannel();}随后可以在集成流定义中根据通道名称引用它。例如如果这个通道要被一个服务激活器 bean 所消费我们可以在ServiceActivator 注解的 inputChannel 属性中用它: ServicenActivator(inputChannel orderChannel)或者使用Java DSL配置风格可以调用 channel() 来引用它: Beanpublic IntegrationFlow orderFlow() {return IntegrationFlows.....channel(orderChannel)....get();}很重要的一点是如果使用 QueueChannel消费者必须配置一个 poller。例如假设我们声明了一个这样的QueueChannel bean: Beanpublic MessageChannel orderChannel() {return new QueueChannel();}那么我们需要确保消费者配置成轮询该通道的消息。如果是消息激活器ServiceActivator 注解可能会如下所示 ServiceActivator(inputChannel orderChannel,poller Poller(fixedRate 1000))在本例中服务激活器每秒(或者说每 1000 毫秒)都会轮询名为 orderChannel 的通道。
2.2 过滤器
过滤器放置于集成管道的中间它能够根据断言允许或拒绝消息进入流程的下一步。 例如假设消息包含了整型的值要通过名为 numberChannel 进行发布但是我们只想让偶数进人名为 evenNumberChannel 的通道。在这种情况下可以使用 Filter 注解定义一个过滤器: Filter(inputChannel numberChannel,outputChannel evenNumberChannel)public boolean evenNumberFilter(Integer number) {return number % 2 0;}作为替代方案如果使用 Java DSL 配置风格来定义集成流可以按照如下的方式来调用 filter(): Beanpublic IntegrationFlow evenNumberFlow(AtomicInteger integerSource) {return IntegrationFlows....Integer filter((p) - p % 2 0)....get();}在本例中我们使用 lambda 表达式来实现过滤器。但实际上filter()方法会接受GenericSelector 作为参数。这意味着如果我们的过滤器过于复杂不适合放到一个简单的lambda 表达式中那么我们可以实现 GenericSelector 接口作为替代方案。
2.3 转换器
转换器会对消息执行一些操作一般会导致不同的消息形成还有可能会产生不同的载荷类型(如图所示)。转换过程可以非常简单比如执行数字的数学运算或者操作String值。转换过程也可以比较复杂比如根据代表 ISBN 的 String 值查询并返回对应图书的详细信息。 例如假设整型值会通过名为 numberChannel 的通道进行发布我们希望将这些宇转换成它们的罗马数字形式以 String 类型来表示。在这种情况下可以声明一个GenericTransforer类型的 bean 并为其添加Transfommer 注解: BeanTransformer(inputChannel numberChannel, outputChannel romanNumberChannel)public GenericTransformerInteger, String romanNumTransformer() {return RomanNumbers::toRoman;}Transformer注解可以将这个 bean 声明为转换器 bean,它会从名为 numberChannel的通道接收 Integer 值然后使用静态方法 toRoman()进行转换(toRoman()是静态方法定义在名为 RomanNumbers 的类中这里使用方法引用来使用它)。转换后的结果会发布到名为romanNumberChannel的通道中。 在Java DSL配置风格中调用 transform()会更加简单我们只需将对toRoman()的方法引用传递进来: Beanpublic IntegrationFlow transformerFlow() {return IntegrationFlows....transform(RomanNumbers::toRoman)....get();}尽管这两个转换器代码中都使用了方法引用但是转换器也可以使用 lambda 表达式声明。或者如果转换器足够复杂需要使用一个单独的类那么可以将其作为一个bean注人流定义并将引用传递给 transform()方法: Bean public RomanNumberTransformer romanNumberTransformer() {return new RomanNumberTransformer();}Beanpublic IntegrationFlow transformerFlow(RomanNumberTransformer romanNumberTransformex) {return IntegrationFlows....transform(romanNumberTransformer)....get();}在这里,我们声明了RomanNumberTransformer类型的bean,它本身是Spring Integration Transfomer 或 GenericTransfomer 接口的实现。这个bean注人了 tansformerFlow()方法并且在定义集成流的时候传递给了 transform()方法。
2.4 路由器
路由器能够基于其个路由断言实现集成流的分支从而将消息发送至不同的通道上如图所示。 例如很设我们有一个名为 numberChannel 的通道它会传输整型值。我们想要将带有偶数的消息定向到名为 evenChannnel 的通道将带有奇数的消息定向到名为 oddChannel的通道。要在集成流中创建这样一个路由器我们可以声明一个 AbstractMessageRouter类型的 bean并为其添加Router 注解: BeanRouter(inputChannel numberChannel)public AbstractMessageRouter evenOddRouter() {return new AbstractMessageRouter() {Overrideprotected CollectionMessageChanneldetermineTargetChannels(Message? message) {Integer number (Integer) message.getPayload();if (number % 2 0) {return Collections.singleton(evenChannel());}return Collections.singleton(oddChannel());}};}Beanpublic MessageChannel evenChannel() {return new DirectChannel();}Beanpublic MessageChannel oddChannel() {return new DirectChannel();}这里定义的AbstractMessageRouter 接收名为 numberChannel 的输人通道的消息。它的实现以匿名内部类的形式检查消息的载荷,如果是偶数,返回名为 evenChannel 的通道(在路由器 bean 之后同样以 bean 的方式进行了声明)。否则通道载荷中的数字必然是奇数在这种情况下返回名为 oddChannel 的通道 (同样以 bean 方法的形式进行了声明)。
在Java DSL 风格中路由器是通过在流定义中调用 route()方法来声明的如下所式 Beanpublic IntegrationFlow numberRoutingFlow(AtomicInteger source) {return IntegrationFlows....Integer, String route(n - n % 2 0 ? EVEN : ODD, mapping - mapping.subFlowMapping(EVEN, sf - sf.Integer, Integertransform(n - n * 10).handle((i, h) - {...})).subFlowMapping(ODD, sf - sf.transform(RomanNumbers::toRoman).handle((i, h) - ( ... )))).get();}尽管我们依然可以定义 AbstractMessageRouter 并将其传递到 route()但是在这个样例中使用了 lambda 表达式来确定消息载荷是偶数还是奇数:对于偶数返回 EVEN:对于奇数返回 ODD。然后这些值会用来确定该使用哪个子映射处理消息。
2.5 切分器
在集成流中有时候将一个消息切分为多个消息独立处理可能会非常有用。切分器将会负责切分并处理这些消息如图所示。 在很多场景中切分器都非常有用尤其是以下两种特殊的场景。
消息载荷中包含了相同类型条目的一个列表。我们希望将它们作为单独的消息载荷来进行处理。例如消息中携带了一个商品列表可以切分为多个消息,每个消息的载荷分别对应一件商品。消息载荷所携带的信息尽管有所关联但是可以拆分为两个或更多个不同类型的消息。例如一个购买订单可能会包含投递信息、账单、商品项的信息。可以将投递细节交由某个子流来处理账单交由另一个子流来处理而商品项再交由其他的子流来处理。在这种情况下切分器后面通常会紧跟一个路由器根据消息的载荷类型进行路由确保数据都由正确的子流处理。
在我们将消息我荷切分为两个成更多个不同类型的消息时通常定义一个POJO 就足够了。它提取传人消息不同的组成部分并将其以元素集合的形式返回。 例如假设我们想要将带有购买订单的消息切分为两个消息其中一个会携带账单信息另一个携带商品项的信息。如下的 OrderSplitter 就可以完成该任务; public class OrderSplltter {public CollectionObject splitOrderIntoParts(PurchaseOrder po) {ArrayLlstObject parts new ArrayList();parts.add(po.getBillIngInfo()) ;parts.add(po.getLineItema());return Parts;}}接下来我们声明一个 OrderSplitter bean并通过Splitter 注解将其作为集成流的一部分: BeanSplitter(inputChannel poChannel,outputChannelsplitOrderChannel)public OrderSplitter orderSplitter() {return new OrderSplitter();}在这里购买订单会到达名为 poChannel 的通道它们会被 OrderSplitter 切分。然后所返回集合中的每个条目都会作为集成流中独立的消息发布到名为 splitOrderChannel 的通道中。此时我们可以在流中声明一个 PayloadTypeRouter将账单信息和商品项分别路由至它们自己的子流: BeanRouter(inputChannel splitOrderChannel)public MessageRouter splitOrderRouter() {PayloadTypeRouter router new PayloadTypeRouter();router.setChannelMapping(BillingInfo.class.getName(), billingInfoChannel);router.setChannelMapping(List.class.getName(), lineItemsChannel);return router;}顾名思义PayloadTypeRouter 会根据消息的载荷将它们路由至不同的通道。按照这里的配置载荷为 BillingInfo 类型的消息将会被路由至名为 billingInfoChannel 的通道供后续进行处理。至于商品项它们会放到一个java.util.List 集合中因此我们将 List类型的载荷映射到名为lineItemsChannel的通道中。 按照目前的状况流将会被切分成两个子流一个是 BillingInfo 对象的流另一个则是 ListLineItem的流。假设我们想要进一步进行拆分例如不想处理 LineItems 的列表而是想要分别处理每个 LineItem又该怎么办呢? 要将商品列表拆分为多个消息其中每个消息包含一个条目只需要编写一个方法(而不是一个bean)这个方法带有Splitter 注解并且返回 LineItem 的集合如下所示: Splitter(inputChannellineItemsChannel, outputChannellineItemChannel)public ListLineItem lineItemSplitter(ListLineItem lineItems) {return lineItems;}当带有 ListLineItem载荷的消息抵达名为 lineItemsChannel 通道时消息会进入lineItemSplitter()。按照切分器的规则这个方法必须要返回切分后条目的集合。在本例中我们已经有了 LineItem 的集合所以我们直接返回这个集合就可以了。这样做的结果是集合中的每个 LineItem 都会发布到一个消息中这些消息会被发送到名为 lineItemChannel的通道中。如果想要使用 Java DSL 声明相同的 splitter/router 配置则可以通过调用 split()和route()来实现: return IntegrationFlows.from(MessageChannels.direct(textInChannel)).split(orderSplitter()).Object, Stringroute(p - {if (p.getClass().isAssignableFrom(BillingInfo.class)) {return BILLING INFO;} else {return LINE ITEMS;}}, mapping - mapping.subFlowMapping(BILLING INFO, sf - sf.BillingInfohandle((billingInfo, h) - {...})).subFlowMapping(LINE ITEMS, sf - sf.split().LineItemhandle((lineItem, h) - {...}))).get();DSL 所组成的流定义相当简洁但是可能有点难以理解。它使用与 Java 配置样例相同的 OrderSplitter 来切分订单。我们可以将 lambda 表达式抽取到方法中使其更为整洁例如使用如下所示的 3 个方法来取代流定义中的 lambda 表达式:
private String route(Object p) {return p.getClass().isAssignableFrom(BillingInfo.class)?BILLING INFO:LINE ITEMS;
}private BillingInfo handleBillingInfo(BillingInfo billingInfo, MessageHeaders h){// ...
}private LineItem handleLineItems(LineItem lineItem, MessageHeaders h) {// ...
}
然后使用方法引用重写集成流:
return IntegrationFlows....split().route(this::route,mapping - mapping.subFlowMapping(BILLING INFO,sf - sf.BillingInfo handle(this::handleBillingInfo)).subFlowMapping(LINE ITEMS,sf - sf.split().LineItem handle(this::handleLineItems)));不管采用哪种方式都会像 Java 配置样例那样使用相同的 OrderSplitter 的切分订单。在订单切分之后根据类型路由至两个独立的子流。
2.6 服务激活器
服务激活器接收来自输人通道的消息并将这些消息发送至一个 MessageHandler 的实现如图所示。 Spring Integration 提供了多个 “开箱即用” 的MessageHandler (PayloadTypeRouter甚至就是 MessageHandler 的一个实现)但是我们通常会需要为其提供一些自定义的实现作为服务激活器。作为样例如下的代码展现了如何声明 MessageHandler bean 并将其配置为服务激活器 BeanServiceActivator(inputChannel someChannel)public MessageHandler sysoutHandler() {return message - {System.out.println(Message payload: message.getPayload());};}这个 bean 使用了ServiceActivator 注解表明它会作为一个服务活器处来自someChannel 通道的消息。对于 MessageHandler 本身它是通过一个 lambda 表达式现的。这是一个简单的 MessageHandler当得到消息之后它会将消息的载荷打印至标准输出流。 我们还可以声明一个服务激活器让它在返回新载荷之前处理输入消息中的数据在这种情况下bean 应该是 GenericHandler而不是 MessageHandler: BeanServiceActivator(inputChannel orderChannel,outputChannel completeChannel)public GenericHandlerEmailOrder orderHandler(OrderRepository orderRepo) {return (payload, headers) - {return orderRepo.save(payload);};}在本例中服务激活器是一个GenericHandler它会接收载荷类型为 EmailOrder 的消息。订单抵达时我们会通过一个存储库将它保存起来并返回保存之后的EmailOrder这个 EmailOrder 随后被发送至名为 completeChannel的输出通道。 你可能已经注意到了GenericHandler 不仅能够得到载荷还能得到消息头(虽然我们这个样例根本没有用到这些头信息 )。我们还可以在Java DSL配置风格中使用服激活器只需将 MessageHandler 或 GenericHandler 传递到流定义的 handle()方法中: public IntegrationFlow someFlow() {return IntegrationFlows....handle(msg - {System.out.println(Message payload: msg.getPayload());}).get();}在本例中MessageHandler 会得到一个 lambda 表达式但是我们也可以为其提供个方法引用甚至实现 MessageHandler 接口的类实例。如果想要为其提供 lambda 表达式或方法引用需要记住它们均接受消息作为其参数。 类似地如果不想将服务激活器作为流的终点handle()还可以接受 GenericHandler如果要将前面提到的订单保存服务激活器添加进来,可以按照如下的形式使用 Java DSL配置流: public IntegrationFlow orderFlow(OrderRepository orderRepo) {return IntegrationFlows....EmailOrder handle((payload, headers) - {return orderRepo.save(payload);})....get();}使用 GenericHandler 时lambda 表达式或方法引用会接受消息载荷和头信息作为参数。如果选择使用 GenericHandler 作为流的终点就需要其返回 null否则就会出现错误提示没有指定输出通道。
2.7 网关
通过网关,应用可以提交数据到集成流中,并且能够可选地接收流的结果作为响应,网关会声明为接口借助 Spring Integration 的实现应用可以调用它来向集成流发送消息(如图所示)。 我们已经看过消息网关的样例也就是 FileWriterGateway。FileWriterGateway 是一个单向的网关有一个接受 String 类型的方法该方法会将文本写入到文件中并返回void。编写双向的网关同样简单。在编写网关接口时需要确保方法要返回某个值以便推送到集成流中。 作为样例假设网关面对的是一个简单的集成流这个流会接受一个 String 并将给定的 String 转换成全大写的形式。这个网关接口大致如下所示:
package sia6;import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.stereotype.Component;Component
MessagingGateway(defaultRequestChannel inChannel,defaultReplyChannel outChannel)
public interface UpperCaseGateway {String uppercase(String in);
}
让人开心的是这个接口不需要实现。Spring lntegration 会在运行时自动提供一个通过特定通道发送和接收消息的实现。
当uppercase()被调用时给定的 String 会发布到集成流中进人名为inChannel通道。不管流是如何定义的、千了些什么当数据进入名为 outChannel 通道时都会从uppercase()方法返回。 我们这个用以转换大写格式的集成流是一个非常简单的流只需要将一个 String 转换成大写格式的步骤。它可以通过Java DSL配置声明如下: Beanpublic IntegrationFlow uppercaseFlow() {return IntegrationFlows.from(inChannel).String, Stringtransform(s - s.toUpperCase()).channel(outChannel).get();}按照这里的定义这个流随着进入 inChannel 通道的数据开始。消息荷会由转换器处理执行大写操作(在这里是通过lambda 表达式定义的)。形成的结果消息会到名为outChannel的通道也就是我们在UpperCaseGateway中声明的答复通道。
2.8 通道适配器
通道适配器代表了集成流的入口和出口。数据通过入站通道适配器(inbound channel adapter)进人一个集成流通过出站通道适配器离开一个集成流。如图所示。 根据要引人集成流的数据源入站通道适配器可以有很多形式。例如我们可以声明一个入站通道适配器将来自 AtomicInteger 的、不断递增的数引入流。使用Java配置则如下所示: BeanInboundChannelAdapter(poller Poller(fixedRate 1000), channel numberChannel)public MessageSourceInteger numberSource(AtomicInteger source) {return () - {return new GenericMessage(source.getAndIncrement());};}这个Bean方法通过InboundChannelAdapter 注解声明了一个入站通道适配器它根据注人的 AtomicInteger 每隔一秒(也就是 1000 毫秒提交一个数字给名为 numberChannel 的通道。 使用 Java 配置时我们可以通过InboundChannelAdapter 注解声明入站通道适配器而使用Java DSL 定义集成流时我们需要使用 form()方法完成同样的事情。如下的流定义展现了类似的入站通道适配器它是使用 Java DSL 定义的: Beanpublic IntegrationFlow someFlow (AtomicInteger integerSource) {return IntegrationFlows.from(integerSource,getAndIncrement,c - c.poller(Pollers.fixedRate(1000)))....get();}通常通道适配器是由 Spring Integration 的众多端点模块提供的。假设我们需要一个人站通道适配器监控一个特定的目录并将写入该目录的文件以消息的形式提交到 file-channel通道中。如下的 Java 配置使用来自 Spring Integration file 端点模块的 FileReadingMessageSource 实现该功能: BeanInboundChannelAdapter(channelfile-channel,pollerPoller(fixedDelay1000))public MessageSourceFile fileReadingMessageSource(){FileReadingMessageSource sourceReader new FileReadingMessageSource();sourceReader.setDirectory(new File(INPUT_DIR));sourceReader.setFilter(new SimplePatternFileListFilter(FILE_PATTERN));return sourceReader;}如果要使用 Java DSL 编写同等功能的人站通道适配器可以使用 Files 类的inboundAdapter()方法。出站通道适配器是集成流的终点会将最终的消息传递给应用或其他外部系统: Beanpublic IntegrationFlow fileReaderFlow() {return IntegrationFlows.from(Files.inboundAdapter(new File(INPUT_DIR)).patternFilter(FILE_PATTERN)).get();}我们通常会将消息激活器实现为消息处理器让它作为出站通道适配器对数据需要传递给应用本身的情况更是如此。我们已经讨论过消息激活器这里就没有必要重复讨论了。 但是要注意Spring Integration 端点模块为多个通用场景提供了消息处理器。在1.2小节使用Java配置来定义集成流的程序中我们已经见过这种出站通道适配器的样例 FileWritingMessageHandler。提到Spring Integration 端点模块不妨看一下都有哪些直接可用的集成端点模块。
2.9 端点模块
Spring Integration 允许我们创建自己的通道适配器这一点非常好但更棒的是Spring Integration 提供了 20 余个包含通道适配器(同时包括入站和出站的适配器)的点模块用于和各种常见的外部系统实现集成如下表。
模块依赖的 artifact ID ( Group ID: org.springframework.integration )AMQPspring-integration-amqp应用事件spring-integration-eventAtom和RSSspring-integration-feed电子邮件spring-integration-mail文件系统spring-integration-fileFTP/FTPSspring-integration-ftpGemFirespring-integration-gemfireHTTPspring-integration-httpJDBCspring-integration-jdbcJMSspring-integration-jmsJMXspring-integration-jmxJPAspring-integration-jpaKafkaspring-integration-kafkaMongoDBspring-integration-mongodbMQTTspring-integration-mqttR2DBCspring-integration-r2dbcRedisspring-integration-redisRMIspring-integration-rmiRSocketspring-integration-rsocketSFTPspring-integration-sftpSTOMPspring-integration-stompStreamspring-integration-streamSyslogspring-integration-syslogTCP/UDPspning-integration-ipWebFluxspring-integration-webfluxWeb Servicesspring-integration-wsWebSocketspring-integration-websocketXMPPspring-integration-xmppZeroMQspring-integration-zeromqZooKeeperspring-integration-zookeeper
从表中可以清楚地看到Spring Integration 提供了用途广泛的一组组件能够满足非常多的集成需求。虽然大多数应用程序使用的功能只是 Spring Integration 所提供功能的九牛一毛但我们最好知道 Spring Integration能够提供哪些功能。 另外我们不可能在一篇文章中介绍表中的所有的通道适配器。我们已经看到了如何使用文件系统模块写人文件的样例。其他的如果需要请自行查阅相关文档。 对于每个端点模块的通道适配器我们可以在 Java 配置中将其声明为 bean也可以在 Java DSL 配置中以静态方法的方式引用它们。我建议你探索一下自己最感兴趣的其他端点模块。你会发现它们在使用方式上是非常一致的。 最后本篇文章到此结束。