天津智能网站建设费用,网站域名登录不了,深圳市建设行业主管部门官方网站,wordpress调用相关文章加速本文是我们名为“ EAI的Spring集成 ”的学院课程的一部分。 在本课程中#xff0c;向您介绍了企业应用程序集成模式以及Spring Integration如何解决它们。 接下来#xff0c;您将深入研究Spring Integration的基础知识#xff0c;例如通道#xff0c;转换器和适配器。 在这… 本文是我们名为“ EAI的Spring集成 ”的学院课程的一部分。 在本课程中向您介绍了企业应用程序集成模式以及Spring Integration如何解决它们。 接下来您将深入研究Spring Integration的基础知识例如通道转换器和适配器。 在这里查看 目录 1.简介 2.什么是Spring Integration 3. Spring Integration消息传递系统的核心概念 3.1讯息 3.2消息通道 3.3消息端点 4.组成 4.1通道适配器 4.2变压器 4.3过滤器 4.4路由器 4.5拆分器和聚合器 4.6轮询器 4.7消息桥 4.8消息处理程序链 5.同步和异步通信 5.1信息渠道 5.2网关 6.错误处理 1.简介 在第二篇教程中您将学习构成Spring Integration核心的基本概念。 在解释了这些概念之后我们将审查项目随附的不同组件。 此修订版基于3.0.1版本。 考虑到4.0.0版本即将发布您可能会发现一些本教程中未介绍的新组件。 无论如何您将获得足够的框架知识以了解未来组件的行为。 总结本教程您将学习Spring Integration如何支持不同类型的通信异步和同步以及该决定如何影响您的设计。 错误处理是一种特殊情况上一节对此进行了说明。 本教程由以下部分组成 介绍 什么是Spring Integration Spring Integration消息传递系统的核心概念 组件 同步和异步通讯 错误处理 2.什么是Spring Integration 如上一节中所述Spring Integration基于Enterprise Integration Patterns一书中解释的概念。 这是一个轻量级的消息传递解决方案它将为您的Spring应用程序添加集成功能。 作为消息传递策略它提供了一种快速共享信息的方式并且所涉及的组件或应用程序之间具有高度的去耦性。 您将学习如何在Spring处理任何底层基础架构问题的同时完成此任务。 这将使您可以专注于业务逻辑。 当前Spring Integration配置主要基于xml尽管开始包含一些注释。 本教程中显示的示例也将基于xml尽管我将尽可能显示其各自的注释。 在解释了这一点之后出现了一个问题Spring Integration可以做什么 该框架基本上允许您执行以下操作 它允许基于内存中的消息传递在应用程序中的组件之间进行通信。 这允许这些应用程序组件彼此松散耦合并通过消息通道共享数据。 图1 它允许与外部系统通信。 您只需要发送信息 Spring Integration将处理将其发送到指定的外部系统并在必要时带回响应。 当然这是相反的。 Spring Integration将处理从外部系统到您的应用程序的传入调用。 本教程稍后将对此进行解释。 图2 Spring Integration面向Spring框架的最佳实践例如使用接口进行编程或基于继承技术的组合。 它的主要优点是 组件之间的松耦合。 面向事件的体系结构。 集成逻辑由框架处理与业务逻辑分离。 在下一部分中您将学习此消息传递系统所基于的三个基本概念。 3. Spring Integration消息传递系统的核心概念 消息驱动的体系结构的基本概念是 消息 消息通道和消息端点 。 该API非常简单 消息发送到端点 端点通过MessageChannel连接在端点之间 端点可以从MessageChannel接收消息 3.1讯息 一条消息包含将在应用程序的不同组件之间共享或发送到外部系统的信息。 但是这是什么信息 消息的结构如下 图3 如下面的代码片段所示消息是一个接口以GenericMessage作为其主要实现也由框架提供 图4 标头 包含有关消息的元信息。 如果检查MessageHeaders类您将看到它只是Map的包装但是其插入操作标记为不支持。 框架这样标记它们因为消息被认为是不可变的。 创建消息后您将无法对其进行修改。 您可以以键值对的形式添加自己的标头但它们主要用于传递传输信息。 例如如果您要发送电子邮件它将包含标题例如tosubjectfrom。 有效负载 这只是一个普通的Java类其中包含您要共享的信息。 它可以是任何Java类型。 如果要创建消息则有两种选择。 第一个涉及使用构建器类 MessageBuilder 。 MessageString message MessageBuilder.withPayload(my message payload).setHeader(key1, value1).setHeader(key2, value2).build(); 构建消息之前必须先设置有效负载和必需的标头因为一旦创建了消息除非创建新消息否则将无法执行该操作。 另一个选择是使用框架提供的实现 MapString, Object headers new HashMap();
headers.put(key1, value1);
headers.put(key2, value2);MessageString message new GenericMessageString(my message payload, headers);3.2消息通道 消息通道是连接端点和消息通过的管道。 生产者将消息发送到渠道而消费者则从渠道接收消息。 通过这种机制您不需要任何类型的经纪人。 消息通道也可以用作拦截点或用于消息监视。 图5 根据消息的使用方式消息通道分类如下 3.2.1点对点 消息通道上只有一个接收器。 好吧这并非完全是100正确。 如果是可订阅的频道则可以有多个接收者但只有一个可以处理该消息。 现在请忘记这一点因为这是一个高级主题将在本课程的后面部分介绍调度程序配置。 这种类型的渠道有几种实现方式 图6 DirectChannel 实现SubscribableChannel 。 该消息通过同一接收者的线程发送给订户。 此通信是同步的并且生产方将阻塞直到收到响应为止。 这个怎么运作 生产者将消息发送到通道。 QueueChannel 实现PollableChannel 。 有一个端点连接到通道没有订阅者。 这种通信是异步的。 接收者将通过其他线程检索消息。 这个怎么运作 生产者将消息发送到通道。 ExecutorChannel 实现SubscribableChannel 。 发送被委托给TaskExecutor。 这意味着send方法将不会阻塞。 PriorityChannel 实现PollableChannel 。 与QueueChannel相似但是消息按优先级而不是FIFO排序。 RendezvousChannel 实现PollableChannel 。 与QueueChannel类似但容量为零。 生产者将阻塞直到接收者调用其receive方法。 3.2.2发布-订阅 该通道可以有多个端点订阅。 因此该消息将由不同的接收者处理。 图7 PublishSubscribeChannel 实现SubscribableChannel 。 订阅的接收者可以通过生产者线程连续调用。 如果我们指定TaskExecutor则接收器将通过不同的线程并行调用。 3.2.3临时频道 这是一种特殊的通道由没有明确定义输出通道的端点自动创建。 创建的通道是点对点匿名通道。 您可以在消息头中的replyChannel名称下看到它的定义。 发送响应后会自动删除这些类型的通道。 建议您不要显式定义输出通道如果不需要。 该框架将为您处理。 图8 3.3消息端点 它的目标是以非侵入方式将应用程序与消息传递框架连接。 如果您熟悉Spring MVC则端点将以与MVC控制器处理HTTP请求相同的方式处理消息。 端点将以MVC控制器映射到URL模式的相同方式映射到消息通道。 图9 以下是带有可用消息端点的简短描述的列表 通道适配器 将应用程序连接到外部系统单向。 网关 将应用程序连接到外部系统双向。 服务激活器 可以调用服务对象上的操作。 转换程序 转换消息的内容。 过滤器 确定消息是否可以继续发送到输出通道。 路由器 决定将消息发送到哪个通道。 拆分器 将邮件拆分为几个部分。 聚合器 将多个消息合并为一个消息。 本教程的下一部分将说明这些端点中的每个端点。 4.组成 在本节中您将学习什么是不同的端点以及如何在Spring Integration中使用它们。 4.1通道适配器 通道适配器是允许您的应用程序与外部系统连接的端点。 如果您查看参考您将看到所提供的类型例如连接到JMS队列MongoDB数据库RMIWeb服务等。 适配器有四种类型 入站通道适配器 单向。 它从外部系统接收消息。 然后它通过消息通道进入我们的消息传递系统我们将在其中进行处理。 出站通道适配器 单向。 我们的消息系统创建一条消息并将其发送到外部系统。 入站网关 双向。 一条消息进入应用程序并期望得到响应。 响应将被发送回外部系统。 出站网关 双向。 该应用程序创建一条消息并将其发送到外部系统。 然后网关将等待响应。 4.2变压器 该端点用于有效负载转换。 它将有效载荷的类型转换为另一种类型。 例如从String到XML文档。 只要考虑到转换有效负载会产生一条新消息请记住该消息是不可变的。 这种类型的端点增加了生产者与消费者之间的松散耦合因为消费者不需要知道生产者是什么类型的。 转换器将负责处理并交付用户正在等待的内容类型。 Spring Integration提供了Transformer的几种实现 。 这里有些例子 HeaderEnricher允许在消息中添加标题值。 ObjectToMapTransformer将对象转换为地图将其属性转换为地图值。 ObjectToStringTransformer将对象转换为字符串。 它通过调用其toString操作对其进行转换。 PayloadSerializingTransformer / PayloadDeserializingTransformer从Object转换为字节数组 反之亦然 。 让我们看几个例子 假设我们有以下模型 public class Order implements Serializable {private static final long serialVersionUID 1L;private int id;private String description;public Order() {}public Order(int id, String description) {this.id id;this.description description;}Overridepublic String toString() {return String.valueOf(this.getId());}//Setters Getters
} 当此消息发送到名为“ requestChannel”的消息通道时以下代码段将通过调用Order实例的toString方法将其自动转换为String int:object-to-string-transformer input-channelrequestChannel output-channeltransformedChannel/ 结果字符串将被发送到名为transformedChannel的输出通道。 如果需要更定制的转换则可以实现自己的转换器这是一个普通的bean。 您将需要在transformer元素中指定引用的bean如下所示 int:transformer refmyTransformer methodtransforminput-channelrequestChannel output-channeltransformedChannel/ 转换器将调用名为“ myTransformer”的bean的“ transform”方法。 该bean如下所示 Component(myTransformer)
public class MyTransformer {public Order transform(Order requestOrder) {return new Order(requestOrder.getId(), requestOrder.getDescription()_modified);}
} 在此示例中变压器元素的method属性不是必需的因为变压器只有一种方法。 如果它有几种方法则需要设置“ method”属性以告知框架要调用的方法。 或者如果您更喜欢注释则可以在方法级别使用Transformer注释指定方法 Component(myTransformer)
public class MyTransformer {Transformerpublic Order transform(Order requestOrder) {return new Order(requestOrder.getId(), requestOrder.getDescription()_modified);}public Order doOtherThings(Order requestOrder) {//do other things}
}4.3过滤器 过滤器用于确定消息是否应继续其发送方式或者相反是否已丢弃。 要决定要做什么它基于一些标准。 以下过滤器实现将从输入通道接收Order实例并丢弃带有无效描述的实例。 有效订单将发送到输出通道 int:filter refmyFilter methodfilterInvalidOrders input-channelrequestChannel output-channelfilteredChannel/ 过滤器方法返回布尔类型。 如果返回false则该消息将被丢弃 Component(myFilter)
public class MyFilter {public boolean filterInvalidOrders(Order order) {if (order null || invalid order.equals(order.getDescription())) {return false;}return true;}
} 与转换器一样仅当在filter bean中定义了多个method method属性才是必需的。 要指定您要调用的方法请使用Filter批注 Filter
public boolean filterInvalidOrders(Order order) { Spring表达语言 如果您的过滤器非常简单则可以跳过任何Java类来实现过滤器。 您可以使用SpEL定义过滤器。 例如以下代码片段将实现与上述相同的过滤器但没有Java代码 int:filter expression!payload.description.equals(invalid order) input-channelrequestChannel output-channelfilteredChannel/ 丢弃消息 使用默认配置丢弃的消息只是被静默丢弃。 我们可以更改它如果我们决定这样做我们有两个选择 1.我们可能不想丢失任何消息。 在这种情况下我们可以抛出一个异常 int:filter expression!payload.description.equals(invalid order) input-channelrequestChannel output-channelfilteredChannelthrow-exception-on-rejectiontrue/ 2.我们要注册所有丢弃的消息。 我们可以配置一个丢弃通道 int:filter expression!payload.description.equals(invalid order) input-channelrequestChannel output-channelfilteredChanneldiscard-channeldiscardedOrders/4.4路由器 路由器允许您根据条件将消息重定向到特定的消息通道。 与往常一样该框架提供了一些最基本的实现。 以下示例使用有效负载类型路由器。 它将从请求通道接收消息并且根据有效负载的类型它将把它发送到另一个输出通道 int:payload-type-router input-channelrequestChannelint:mapping typeString channelstringChannel/int:mapping typeInteger channelintegerChannel/
/int:payload-type-router 您可以在此处查看完整列表。 现在让我们回到订单示例我们将实现一个路由器该路由器将根据订单说明重定向消息。 int:router refmyRouter input-channelrequestChannel default-output-channelgenericOrders/ 路由器实现包含一个方法该方法返回将消息重定向到的消息通道的名称 Component(myRouter)
public class MyRouter {public String routeOrder(Order order) {String returnChannel genericOrders;if (order.getDescription().startsWith(US-)) {returnChannel usOrders;}else if (order.getDescription().startsWith(EU-)) {returnChannel europeOrders;}return returnChannel;}
} 如果有几种方法可以使用Router批注 Router
public String routeOrder(Order order) { 与过滤器相同您可以基于Spring表达式语言路由消息。 4.5拆分器和聚合器 拆分器的目标是接收消息并将其划分为几个部分。 这些零件然后分别发送以便可以独立处理。 该端点通常与聚合器组合。 聚合器获取消息列表并将它们组合为一条消息。 这与拆分器相反。 您将通过一个示例更好地看到这一点 我们将修改订单示例以便拆分器接收订单包。 该软件包包含拆分器将分离的几个相关订单。 拆分器获取订单包并返回订单列表 int:splitter input-channelrequestChannel refmySplitter output-channelsplitChannel/ 拆分器的实现非常简单 Component(mySplitter)
public class MySplitter {public ListOrder splitOrderPackage(OrderPackage orderPackage) {return orderPackage.getOrders();}
} 拆分器返回订单列表但它可以返回以下任意值 消息的集合或数组。 Java对象的集合或数组。 每个列表元素将作为消息有效内容包含在内。 一个消息。 一个Java对象将包含在消息有效负载中。 在此示例之后有一个聚合器端点该端点连接到“ splitChannel”通道。 该聚合器获取列表并合并其订单以形成订单确认并添加每个订单的数量 int:channel idsplitChannel/int:aggregator refmyAggregator input-channelsplitChannel output-channeloutputChannel/ 聚合器实现 Component(myAggregator)
public class MyAggregator {public OrderConfirmation confirmOrders(ListOrder orders) {int total 0;for (Order order:orders) {total order.getQuantity();}OrderConfirmation confirmation new OrderConfirmation(3);confirmation.setQuantity(total);return confirmation;}
} 4.5.1相关和发布策略 当消息由拆分器端点拆分时将设置两个标头 MessageHeaders.CORRELATION_ID MessageHeaders.SEQUENCE_SIZE 聚合器端点使用这些标头能够正确组合消息。 它将保留消息直到准备好一组具有相同相关性ID的消息为止。 何时准备就绪 达到序列大小后即可准备就绪。 相关策略 允许对消息进行分组。 默认情况下它将在CORRELATION_ID标头中将所有具有相同值的消息分组。 有几种策略可供选择。 发布策略 默认情况下当一组消息的大小达到消息头SEQUENCE_SIZE指定的值时它将被视为完整。 4.6轮询器 在Spring Integration中有两种类型的使用者 活跃的消费者 被动消费者 被动组件是那些订阅了可订阅频道的组件。 这样当消息发送到这种类型的通道时该通道将调用其订户。 消费者的方法将被被动调用。 活动组件是连接到可轮询通道的组件。 这样消息将排队进入通道等待用户主动从通道中检索消息。 轮询器用于指定活动使用者如何检索这些消息。 以下是几个示例 基本轮询器配置 它将在一秒钟的间隔内轮询消息通道 int:service-activator methodprocessOrder input-channelpollableChannel reforderProcessorint:poller fixed-rate1000/
/int:service-activator 使用Cron表达式配置的轮询器 它将每30分钟轮询一次消息通道 int:service-activator methodprocessOrder input-channelpollableChannel reforderProcessorint:poller cron0 0/30 * * * ?/
/int:service-activator 要考虑的一件事是如果使用者连接到可轮询的频道则将需要一个轮询器。 如果没有将引发异常。 如果不想为每个活动的使用者配置轮询器则可以定义一个默认轮询器 int:poller iddefaultPoller fixed-rate1000 defaulttrue/ 不要忘记设置default和id属性。 4.7消息桥 这种类型的端点连接两个消息通道或两个通道适配器。 例如您可以将SubscribableChannel通道连接到PollableChannel通道。 这是一个示例 int:channel idrequestChannel/int:bridge input-channelrequestChannel output-channelpollableChannel/int:channel idpollableChannelint:queue capacity5/
/int:channelint:service-activator methodprocessOrder input-channelpollableChannel reforderProcessor/int:poller iddefaultPoller fixed-rate1000 defaulttrue/ 在此示例中消息传递桥从输入通道接收消息并将其发布到输出通道。 在这种情况下我们将服务激活器连接到输出通道。 订单处理器服务激活器将每隔一秒钟轮询一次消息通道。 4.8消息处理程序链 当您有多个以线性方式连接的消息处理程序时消息处理程序链可用于简化配置。 下面的示例向您展示一个消息传递配置该配置将通过处理程序链进行简化 int:channel idrequestChannel/
int:channel idresponseChannel/int:filter refmyFilter methodfilterInvalidOrders input-channelrequestChannel output-channelfilteredChannel/int:channel idfilteredChannel/int:transformer refmyTransformer methodtransforminput-channelfilteredChannel output-channeltransformedChannel/int:channel idtransformedChannel/int:service-activator methodprocessOrder input-channeltransformedChannel reforderProcessor output-channelresponseChannel/ 消息通过过滤器然后到达转换器最后消息将由服务激活器处理。 完成后消息将发送到输出通道“ responseChannel”。 使用消息过滤器链配置将简化如下 int:channel idrequestChannel/
int:channel idresponseChannel/int:chain input-channelrequestChannel output-channelresponseChannelint:filter refmyFilter methodfilterInvalidOrders/int:transformer refmyTransformer methodtransform/int:service-activator reforderProcessor methodprocessOrder/
/int:chain5.同步和异步通信 如本课程的第一篇教程中所述通信可以同步或异步执行。 本节说明如何更改此通信。 5.1信息渠道 根据您配置消息通道的方式将同步或异步检索消息。 无需更改很多东西只需更改配置即可。 例如假设我们有一个类似下面的点对点直接渠道 int:channel idrequestChannel/ 发送到该通道的消息将立即传递给被动使用者订户。 如果期望得到响应则发件人将等待直到将其发送给他。 为了改变这一点我们只需要添加一个队列 int:channel idrequestChannelint:queue capacity5/
/int:channel 而已。 现在该通道最多可以将五个消息排队。 使用者将从与发件人不同的线程中主动检索在此通道中排队的消息。 现在发布-订阅频道如何 让我们以配置同步通道为例 int:publish-subscribe-channel idmySubscribableChannel/ 在这种情况下我们将使用任务执行程序来更改其行为 int:publish-subscribe-channel idmySubscribableChannel task-executormyTaskExecutor/task:executor idmyTaskExecutor pool-size5/5.2网关 网关是一种通道适配器可用于 提供消息传递系统的进入/退出机制。 这样应用程序可以将消息发送到消息传递系统消息传递系统将通过其消息端点对其进行处理。 向外部系统发送消息并等待响应输出网关 接收来自外部系统的消息并在处理后发送响应入站网关。 本示例使用第一种情况。 该应用程序将通过网关发送消息并等待消息传递系统对其进行处理。 在这里我们将使用同步网关。 因此测试应用程序将发送消息并阻止等待响应。 介面 网关将捕获对其sendOrder方法的所有调用。 看到没有该接口的实现。 网关将包装它以拦截那些呼叫。 public interface OrderService {Gatewaypublic OrderConfirmation sendOrder(Order order);
} 配置 网关链接到接口以便拦截其呼叫并将消息发送到消息传递系统。 int:gateway default-request-channelrequestChannel service-interfacexpadro.spring.integration.service.OrderService/int:channel idrequestChannel/ 考试 服务接口网关被注入到应用程序中。 调用sendOrder方法会将Order对象发送到消息传递系统并包装在消息中。 Autowired
private OrderService service;Test
public void testSendOrder() {OrderConfirmation confirmation service.sendOrder(new Order(3, a correct order));Assert.assertNotNull(confirmation);Assert.assertEquals(confirmed, confirmation.getId());
} 在另一个示例中测试类将阻塞直到将订单确认发送回为止。 现在我们将对其进行配置以使其异步 介面 唯一的变化是返回未来 public interface OrderService {Gatewaypublic FutureOrderConfirmation sendFutureOrder(Order order);
} 考试 现在测试必须处理将从网关返回的Future对象。 Autowired
private OrderService service;Test
public void testSendCorrectOrder() throws ExecutionException {FutureOrderConfirmation confirmation service.sendFutureOrder(new Order(3, a correct order));OrderConfirmation orderConfirmation confirmation.get();Assert.assertNotNull(orderConfirmation);Assert.assertEquals(confirmed, orderConfirmation.getId());
}6.错误处理 本教程的最后一部分将说明错误处理的差异具体取决于我们配置的通信类型是同步还是异步。 在同步通信中发送者在使用同一线程将消息发送到消息传递系统时阻塞。 显然如果引发了异常它将到达应用程序上一节示例中的测试。 但是在异步通信中使用者从另一个线程检索消息。 如果引发异常它将无法到达应用程序。 Spring Integration如何处理它 这是错误通道进入的地方。 引发异常时它将包装到MessagingException中 成为新消息的有效内容。 该消息发送至 错误通道此通道在原始消息头中定义为名为“ errorChannel”的头。 全局错误通道如果在消息头中未定义任何错误通道则将其发送到全局错误通道。 这个通道是Spring Integration默认定义的。 全局错误通道 该频道是发布-订阅频道。 这意味着我们可以将自己的端点订阅到此通道并接收引发的任何错误。 实际上Spring Integration已经预订了一个端点一个日志处理程序。 该处理程序记录发送到全局错误通道的所有消息的有效负载。 要订阅另一个端点以处理异常我们只需要按如下方式配置它 int:service-activator input-channelerrorChannel refmyExceptionHandler methodhandleInvalidOrder/bean idmyExceptionHandler classxpadro.spring.integration.activator.MyExceptionHandler/ 我们的服务激活器端点的handleInvalidOrder方法将收到消息传递异常 public class MyExceptionHandler {ServiceActivatorpublic void handleInvalidOrder(MessageMessageHandlingException message) {//Retrieve the failed order (payload of the source message)Order requestedOrder (Order) message.getPayload().getFailedMessage().getPayload();//Handle exception...}
}翻译自: https://www.javacodegeeks.com/2015/09/spring-integration-fundamentals.html