网站设计的基本过程,做网站公司流程,枣庄网站制作费用,网站开发中的渲染是什么意思本文是我们名为“ Spring Integration for EAI ”的学院课程的一部分。 在本课程中#xff0c;向您介绍了企业应用程序集成模式以及Spring Integration如何解决它们。 接下来#xff0c;您将深入研究Spring Integration的基础知识#xff0c;例如通道#xff0c;转换器和适… 本文是我们名为“ Spring Integration for EAI ”的学院课程的一部分。 在本课程中向您介绍了企业应用程序集成模式以及Spring Integration如何解决它们。 接下来您将深入研究Spring Integration的基础知识例如通道转换器和适配器。 在这里查看 目录 1.简介 2.准备环境 3. JMS适配器接收 3.1。 入站通道适配器活动接收 3.2。 入站通道适配器无源接收 4. JMS适配器发送 5.使用网关 6.消息转换 7. JMS支持的消息通道 8.动态目标解析 9. AMQP集成 9.1。 安装 9.2。 演示应用 1.简介 本教程重点介绍如何将应用程序与Spring Integration和JMS消息传递集成。 为此我将首先向您展示如何安装Active MQ它将是本教程中的代理。 下一部分将显示使用Spring Integration JMS通道适配器发送和接收JMS消息的示例。 在这些示例之后我们将看到一些通过配置消息转换和目标解析来自定义这些调用的方法。 本教程的最后一部分简要介绍了如何将Spring Integration与AMQP协议一起使用。 它将完成RabbitMQ的安装最后给出一个基本的消息传递示例。 本教程由以下部分组成 介绍 准备环境 JMS适配器接收 JMS适配器发送 使用网关 讯息转换 JMS支持的消息通道 动态目的地解析 AMQP集成 2.准备环境 如果要通过JMS发送消息则首先需要一个代理。 本教程中包含的示例是通过Active MQ一种开源消息传递代理执行的。 在本节中我将帮助您安装服务器并实现一个简单的Spring应用程序以测试它是否已正确设置。 该说明基于Windows系统。 如果您已经安装了服务器则跳过此部分。 第一步是从Apache.org下载Apache MQ服务器。 下载完成后只需将其解压缩到您选择的文件夹中即可。 要启动服务器你只需要执行其位于Apache的ActiveMQ的-5.9.0 \ bin文件夹中文件的ActiveMQ。 图1 好的服务器正在运行。 现在我们只需要实现该应用程序。 我们将创建一个生产者一个使用者一个spring配置文件和一个测试。 制片人 您可以使用任何Java类代替我的TicketOrder对象。 public class JmsProducer {AutowiredQualifier(jmsTemplate)private JmsTemplate jmsTemplate;public void convertAndSendMessage(TicketOrder order) {jmsTemplate.convertAndSend(order);}public void convertAndSendMessage(String destination, TicketOrder order) {jmsTemplate.convertAndSend(destination, order);}
} 消费者 public class SyncConsumer {Autowiredprivate JmsTemplate jmsTemplate;public TicketOrder receive() {return (TicketOrder) jmsTemplate.receiveAndConvert(test.sync.queue);}
} Spring配置文件 bean idconsumer classxpadro.spring.integration.consumer.SyncConsumer/
bean idproducer classxpadro.spring.integration.producer.JmsProducer/!-- Infrastructure --
bean idconnectionFactory classorg.apache.activemq.ActiveMQConnectionFactoryproperty namebrokerURL valuetcp://localhost:61616 /
/beanbean idcachingConnectionFactory classorg.springframework.jms.connection.CachingConnectionFactoryproperty nametargetConnectionFactory refconnectionFactory/
/beanbean idjmsTemplate classorg.springframework.jms.core.JmsTemplateproperty nameconnectionFactory refcachingConnectionFactory/property namedefaultDestination refsyncTestQueue/
/bean!-- Destinations --
bean idsyncTestQueue classorg.apache.activemq.command.ActiveMQQueueconstructor-arg valuetest.sync.queue/
/bean 考试 ContextConfiguration(locations {/xpadro/spring/integration/test/jms-config.xml})
RunWith(SpringJUnit4ClassRunner.class)
public class TestJmsConfig {Autowiredprivate JmsProducer producer;Autowiredprivate SyncConsumer consumer;Testpublic void testReceiving() throws InterruptedException, RemoteException {TicketOrder order new TicketOrder(1, 5, new Date());//Sends the message to the jmsTemplates default destinationproducer.convertAndSendMessage(order);Thread.sleep(2000);TicketOrder receivedOrder consumer.receive();assertNotNull(receivedOrder);assertEquals(1, receivedOrder.getFilmId());assertEquals(5, receivedOrder.getQuantity());}
} 如果测试通过则说明所有设置正确。 现在我们可以转到下一部分。 3. JMS适配器接收 Spring Integration提供了多个适配器和网关来接收来自JMS队列或主题的消息。 下面简要讨论这些适配器 入站通道适配器 它在内部使用JmsTemplate主动从JMS队列或主题接收消息。 消息驱动通道适配器 内部使用Spring MessageListener容器被动接收消息。 入站通道适配器活动接收 本节说明如何使用上一节中介绍的第一个适配器。 JMS入站通道适配器主动轮询队列以从中检索消息。 由于它使用轮询器因此您必须在Spring配置文件中对其进行定义。 适配器检索到消息后它将通过指定的消息通道发送到消息传递系统中。 然后我们可以使用端点如转换器过滤器等来处理消息也可以将其发送给服务激活器。 本示例从JMS队列检索票单消息并将其发送到服务激活器服务激活器将对其进行处理并确认订单。 通过将订单发送到某种存储库来确认该订单该存储库具有包含所有已注册订单的简单列表。 我们使用与“ 2准备环境”部分中相同的生产者 bean idproducer classxpadro.spring.integration.producer.JmsProducer/!-- Infrastructure --
!-- Connection factory and jmsTemplate configuration --
!-- as seen in the second section --!-- Destinations --
bean idtoIntQueue classorg.apache.activemq.command.ActiveMQQueueconstructor-arg valueint.sync.queue/
/bean 测试将使用生产者将消息发送到“ toIntQueue”。 现在我们将设置Spring Integration配置 Integration-jms.xml context:component-scan base-packagexpadro.spring.integration/int-jms:inbound-channel-adapter idjmsAdapter destinationtoIntQueue channeljmsChannel/int:channel idjmsChannel/int:service-activator methodprocessOrder input-channeljmsChannel refticketProcessor/int:poller idpoller defaulttrue fixed-delay1000/ JMS入站通道适配器将使用定义的轮询器从“ toIntQueue”中检索消息。 您必须为适配器配置轮询器否则它将抛出运行时异常。 在这种情况下我们定义了一个默认的轮询器。 这意味着任何需要轮询的端点都将使用此轮询器。 如果未配置默认轮询器则需要为每个主动检索消息的端点定义一个特定的轮询器。 消费者 服务激活器只是一个bean通过组件扫描自动检测到 Component(ticketProcessor)
public class TicketProcessor {private static final Logger logger LoggerFactory.getLogger(TicketProcessor.class);private static final String ERROR_INVALID_ID Order ID is invalid;Autowiredprivate OrderRepository repository;public void processOrder(TicketOrder order) {logger.info(Processing order {}, order.getFilmId());if (isInvalidOrder(order)) {logger.info(Error while processing order [{}], ERROR_INVALID_ID);throw new InvalidOrderException(ERROR_INVALID_ID);}float amount 5.95f * order.getQuantity();TicketConfirmation confirmation new TicketConfirmation(123, order.getFilmId(), order.getOrderDate(), order.getQuantity(), amount);repository.confirmOrder(confirmation);}private boolean isInvalidOrder(TicketOrder order) {if (order.getFilmId() -1) {return true;}return false;}
} 在前面的代码片段中 processOrder方法接收一个TicketOrder对象并直接对其进行处理。 但是您可以改为定义消息 或Message TicketOrder以便接收消息。 这样您将可以访问消息的有效负载及其标题。 还要注意该方法返回void。 由于消息流在此处结束因此我们不需要返回任何内容。 如果需要您还可以定义服务适配器的回复通道并返回确认。 此外例如我们随后将向该回复通道订阅端点或网关以便将确认发送到另一个JMS队列将其发送到Web服务或将其存储到数据库。 最后让我们看一下测试以了解如何执行所有测试 ContextConfiguration(locations {/xpadro/spring/integration/test/jms-config.xml,/xpadro/spring/integration/test/int-jms-config.xml})
RunWith(SpringJUnit4ClassRunner.class)
public class TestIntegrationJmsConfig {Autowiredprivate JmsProducer producer;Autowiredprivate OrderRepository repository;Testpublic void testSendToIntegration() throws InterruptedException, RemoteException {TicketOrder order new TicketOrder(1, 5, new Date());//Sends the message to the jmsTemplates default destinationproducer.convertAndSendMessage(int.sync.queue, order);Thread.sleep(4000);assertEquals(1, repository.getConfirmations().size());assertNotNull(repository.getConfirmations().get(0));TicketConfirmation conf repository.getConfirmations().get(0);assertEquals(123, conf.getId());}
} 我已将Thread.sleep为四秒钟以等待消息发送。 我们本可以使用while循环来检查是否已收到消息直到达到超时为止。 入站通道适配器无源接收 JMS接收部分的第二部分使用消息驱动的通道适配器。 这样消息一旦发送到队列便会立即传递到适配器而无需使用轮询器。 这是我们向其订阅者传递消息的消息通道。 该示例与上一节中看到的示例非常相似。 我将仅显示在配置中所做的更改。 我从上一个示例更改的唯一内容是spring集成配置 context:component-scan base-packagexpadro.spring.integration/int-jms:message-driven-channel-adapter idjmsAdapter destinationtoIntQueue channeljmsChannel /int:channel idjmsChannel/int:service-activator methodprocessOrder input-channeljmsChannel refticketProcessor/ 我删除了轮询器并更改了消息驱动通道适配器的JMS入站适配器。 而已; 适配器将被动地接收消息并将其传递到jmsChannel 。 请考虑到消息侦听器适配器至少需要以下组合之一 消息侦听器容器。 连接工厂和目的地。 在我们的示例中我们使用了第二个选项。 目标在适配器配置中指定连接工厂在jms-config文件中定义该文件也由测试导入。 4. JMS适配器发送 在上一节中我们已经了解了如何接收外部系统发送到JMS队列的消息。 本节向您展示出站通道适配器使您可以在系统之外发送JMS消息。 与入站适配器相比出站适配器只有一种类型。 该适配器在内部使用JmsTemplate发送消息并且为了配置此适配器您将需要指定以下至少一项 一个JmsTemplate。 连接工厂和目的地。 与入站示例一样我们使用第二个选项将消息发送到JMS队列。 配置如下 对于此示例我们将为jms配置jms-config.xml创建一个新队列。 这是我们的Spring Integration应用程序将消息发送到的位置 bean idtoJmsQueue classorg.apache.activemq.command.ActiveMQQueueconstructor-arg valueto.jms.queue/
/bean 好的现在我们使用JMS出站适配器配置集成配置 context:component-scan base-packagexpadro.spring.integration/int:gateway default-request-channelrequestChannel service-interfacexpadro.spring.integration.service.TicketService/int:channel idrequestChannel/int-jms:outbound-channel-adapter idjmsAdapter channelrequestChannel destinationtoJmsQueue/ 我们正在使用网关作为邮件系统的入口。 测试将使用此接口发送新的TicketOrder对象。 网关将接收消息并将其放入requestChannel通道。 由于它是直接通道 它将被发送到JMS出站通道适配器。 适配器收到一个Spring Integration消息。 然后它可以通过两种方式发送消息 将消息转换为JMS消息。 这是通过将适配器的属性“ extract-payload”设置为true默认值来完成的。 这是我们在示例中使用的选项。 按原样发送消息即Spring Integration消息。 您可以通过将“ extract-payload”属性设置为false来完成此操作。 该决定取决于期望您的消息的系统类型。 如果另一个应用程序是Spring Integration应用程序则可以使用第二种方法。 否则请使用默认值。 在我们的示例中另一端有一个简单的Spring JMS应用程序。 因此我们必须选择第一个选项。 继续我们的示例现在我们看一下测试该测试使用网关接口发送消息并使用自定义使用者接收消息。 在此测试中使用者将扮演一个JMS应用程序的角色该应用程序使用jmsTemplate从JMS队列中检索它 ContextConfiguration(locations {/xpadro/spring/integration/test/jms-config.xml,/xpadro/spring/integration/test/int-jms-out-config.xml})
RunWith(SpringJUnit4ClassRunner.class)
public class TestIntegrationJmsOutboundConfig {Autowiredprivate SyncConsumer consumer;Autowiredprivate TicketService service;Testpublic void testSendToJms() throws InterruptedException, RemoteException {TicketOrder order new TicketOrder(1, 5, new Date());service.sendOrder(order);TicketOrder receivedOrder consumer.receive(to.jms.queue);assertNotNull(receivedOrder);assertEquals(1, receivedOrder.getFilmId());assertEquals(5, receivedOrder.getQuantity());}
}5.使用网关 除了通道适配器之外Spring Integration还提供了入站和出站网关。 您可能还记得以前的教程网关提供了与外部系统的双向通信这意味着发送和接收或接收和回复操作。 在这种情况下它允许请求或重试操作。 在本节中我们将看到一个使用JMS出站网关的示例。 网关将向队列发送JMS消息并等待答复。 如果未发送回任何答复则网关将抛出MessageTimeoutException 。 Spring Integration配置 context:component-scan base-packagexpadro.spring.integration/int:gateway idinGateway default-request-channelrequestChannel service-interfacexpadro.spring.integration.service.TicketService/int:channel idrequestChannel/int-jms:outbound-gateway idoutGateway request-destinationtoAsyncJmsQueue request-channelrequestChannel reply-channeljmsReplyChannel/int:channel idjmsReplyChannel/int:service-activator methodregisterOrderConfirmation input-channeljmsReplyChannel refticketProcessor/ 流程如下 包装在Spring Integration Message中的TicketOrder将通过“ inGateway”网关进入消息传递系统。 网关会将消息放入“ requestChannel”通道。 通道将消息发送到其订阅的端点JMS出站网关。 JMS出站网关提取消息的有效负载并将其包装为JMS消息。 网关发送消息并等待答复。 当答复到来时网关以包装在JMS消息中的TicketConfirmation形式将获得有效负载并将其包装到Spring Integration消息中。 该消息将发送到“ jmsReplyChannel”通道服务激活器TicketProcessor将在该通道中处理该消息并将其注册到我们的OrderRepository。 订单处理器非常简单。 它收到TicketConfirmation并将其添加到票证存储库 Component(ticketProcessor)
public class TicketProcessor {Autowiredprivate OrderRepository repository;public void registerOrderConfirmation(TicketConfirmation confirmation) {repository.confirmOrder(confirmation);}
} 考试 RunWith(SpringJUnit4ClassRunner.class)
public class TestIntegrationJmsOutGatewayConfig {Autowiredprivate OrderRepository repository;Autowiredprivate TicketService service;Testpublic void testSendToJms() throws InterruptedException, RemoteException {TicketOrder order new TicketOrder(1, 5, new Date());service.sendOrder(order);Thread.sleep(4000);assertEquals(1, repository.getConfirmations().size());assertNotNull(repository.getConfirmations().get(0));TicketConfirmation conf repository.getConfirmations().get(0);assertEquals(321, conf.getId());}
} 外部系统 为了完全理解该示例我将向您展示将消息传递到JMS队列时发生的情况。 侦听Spring Integration发送消息的队列有一个侦听器asyncConsumer bean idtoAsyncJmsQueue classorg.apache.activemq.command.ActiveMQQueueconstructor-arg valueto.async.jms.queue/
/bean!-- Listeners --
jms:listener-container connection-factoryconnectionFactoryjms:listener destinationto.async.jms.queue refasyncConsumer/
/jms:listener-container 侦听器接收到该消息并使用票证确认创建新消息并进行回复。 注意我们必须将回复消息的相关性ID设置为与请求消息相同的值。 这将使客户知道我们正在响应哪个消息。 另外我们将目标设置为请求消息中配置的回复通道。 Component(asyncConsumer)
public class AsyncConsumer implements MessageListener {Autowiredprivate JmsTemplate template;Overridepublic void onMessage(Message order) {final Message msgOrder order;TicketOrder orderObject;try {orderObject (TicketOrder) ((ObjectMessage) order).getObject();} catch (JMSException e) {throw JmsUtils.convertJmsAccessException(e);}float amount 5.95f * orderObject.getQuantity();TicketConfirmation confirmation new TicketConfirmation(321, orderObject.getFilmId(), orderObject.getOrderDate(), orderObject.getQuantity(), amount);try {template.convertAndSend(msgOrder.getJMSReplyTo(), confirmation, new MessagePostProcessor() {public Message postProcessMessage(Message message) throws JMSException {message.setJMSCorrelationID(msgOrder.getJMSCorrelationID());return message;}});} catch (JmsException | JMSException e) {throw JmsUtils.convertJmsAccessException((JMSException) e);}}
}6.消息转换 消息通道适配器和网关都使用消息转换器将传入消息转换为Java类型或者采用相反的方式。 转换器必须实现MessageConverter接口 public interface MessageConverter {P MessageP toMessage(Object object);P Object fromMessage(MessageP message);} Spring Integration带有MessageConverter接口的两种实现 MapMessageConverter 它的fromMessage方法使用两个键创建一个新的HashMap 有效负载值为消息的有效负载 message.getPayload 。 标头该值是另一个HashMap具有来自原始消息的所有标头。 “ toMessage”方法期望一个具有相同结构有效负载和标头键的Map实例并构造一个Spring Integration消息。 SimpleMessageConverter 这是适配器和网关使用的默认转换器。 您可以从源代码中看到它与对象之间的转换 public Message? toMessage(Object object) throws Exception {if (object null) {return null;}if (object instanceof Message?) {return (Message?) object;}return MessageBuilder.withPayload(object).build();
}public Object fromMessage(Message? message) throws Exception {return (message ! null) ? message.getPayload() : null;
} 无论如何如果需要自己的实现则可以在通道适配器或网关配置中指定自定义转换器。 例如使用网关 int-jms:outbound-gateway idoutGateway request-destinationtoAsyncJmsQueue request-channelrequestChannel reply-channeljmsReplyChannel message-convertermyConverter/ 只要记住您的转换器应该实现MessageConverter Component(myConverter)
public class MyConverter implements MessageConverter {7. JMS支持的消息通道 通道适配器和网关用于与外部系统进行通信。 JMS支持的消息通道用于在同一应用程序内的使用者和生产者之间发送和接收JMS消息。 尽管在这种情况下我们仍然可以使用通道适配器但是使用JMS通道要简单得多。 与集成消息通道的区别在于JMS通道将使用JMS代理发送消息。 这意味着消息将不仅仅存储在内存通道中。 相反它将被发送到JMS提供程序从而也可以使用事务。 如果使用事务它将按以下方式工作 如果回滚事务则将消息发送到JMS支持的通道的生产者将不会编写该消息。 如果事务回滚订阅JMS支持的通道的使用者将不会从该通道中删除消息。 对于此功能Spring Integration提供了两个渠道点对点和发布/订阅渠道。 它们配置如下 点对点直接渠道 int-jms:channel idjmsChannel queuemyQueue/ 发布/订阅频道 int-jms:publish-subscribe-channel idjmsChannel topicmyTopic/ 在下面的示例中我们可以看到一个简单的应用程序其中有两个端点使用JMS支持的通道相互通信。 组态 发送到消息传递系统 TicketOrder对象的消息到达服务激活器票证处理器。 然后该处理器将订单 sendJMS 发送到JMS支持的消息。 订阅此通道有一个相同的处理器将接收消息 receiveJms 对其进行处理以创建TicketConfirmation并将其注册到票证存储库 context:component-scan base-packagexpadro.spring.integration/int:gateway default-request-channelrequestChannel service-interfacexpadro.spring.integration.service.TicketService/int:channel idrequestChannel/int:service-activator methodsendJms input-channelrequestChannel output-channeljmsChannel refticketJmsProcessor/int-jms:channel idjmsChannel queuesyncTestQueue/int:service-activator methodreceiveJms input-channeljmsChannel refticketJmsProcessor/ 处理器 实现两种方法 sendJms和receiveJms Component(ticketJmsProcessor)
public class TicketJmsProcessor {private static final Logger logger LoggerFactory.getLogger(TicketJmsProcessor.class);Autowiredprivate OrderRepository repository;public TicketOrder sendJms(TicketOrder order) {logger.info(Sending order {}, order.getFilmId());return order;}public void receiveJms(TicketOrder order) {logger.info(Processing order {}, order.getFilmId());float amount 5.95f * order.getQuantity();TicketConfirmation confirmation new TicketConfirmation(123, order.getFilmId(), order.getOrderDate(), order.getQuantity(), amount);repository.confirmOrder(confirmation);}
} 考试 ContextConfiguration(locations {/xpadro/spring/integration/test/jms-config.xml,/xpadro/spring/integration/test/int-jms-jms-config.xml})
RunWith(SpringJUnit4ClassRunner.class)
public class TestIntegrationJmsToJmsConfig {Autowiredprivate OrderRepository repository;Autowiredprivate TicketService service;Testpublic void testSendToJms() throws InterruptedException, RemoteException {TicketOrder order new TicketOrder(1, 5, new Date());service.sendOrder(order);Thread.sleep(4000);assertEquals(1, repository.getConfirmations().size());assertNotNull(repository.getConfirmations().get(0));TicketConfirmation conf repository.getConfirmations().get(0);assertEquals(123, conf.getId());}
} JMS支持的通道提供了不同的可能性例如配置队列名称而不是队列引用或使用目标解析器 int-jms:channel idjmsChannel queue-namemyQueuedestination-resolvermyDestinationResolver/8.动态目标解析 目标解析器是一个类它允许我们将目标名称解析为JMS目标。 任何目标解析器都必须实现以下接口 public interface DestinationResolver {Destination resolveDestinationName(Session session, String destinationName, boolean pubSubDomain)throws JMSException;
} 可以在JMS通道适配器JMS网关和JMS支持的通道上指定目标解析器。 如果您未明确配置目标解析器Spring将使用默认实现即DynamicDestinationResolver 。 下面将解释该解析器作为Spring提供的其他实现 DynamicDestinationResolver 通过使用标准JMS Session.createTopic和Session.createQueue方法将目标名称解析为动态目标。 BeanFactoryDestinationResolver 它将在Spring上下文中查找名称类似于提供的目标名称的bean并期望其类型为javax.jms.Destination 。 如果找不到它将抛出DestinationResolutionException 。 JndiDestinationResolver 它将假定目标名称是JNDI位置。 如果我们不想使用默认的动态解析器则可以实现自定义解析器并在所需的端点中对其进行配置。 例如以下JMS支持的通道使用不同的实现 int-jms:channel idjmsChannel queue-namemyQueuedestination-resolvermyDestinationResolver/9. AMQP集成 安装 要安装并启动RabbitMQ服务器您只需要遵循以下步骤即可。 如果您已经安装了服务器则跳过此部分。 第一步是安装RabbitMQ服务器所需的erlang。 转到以下URL下载系统版本并安装它 http://www.erlang.org/download.html 下一步是下载并安装RabbitMQ。 如果要使用与本教程相同的版本请下载版本3.2.4。 http://www.rabbitmq.com/ 现在打开命令提示符。 如果您是Windows用户则可以通过单击开始菜单并在RabbitMQ文件夹中选择RabbitMQ命令提示符直接进入。 激活管理插件 rabbitmq-plugins enable rabbitmq_management 启动服务器 rabbitmq-server.bat 好的现在我们将测试RabbitMQ是否已正确安装。 转到http// localhost15672并使用“ guest”作为用户名和密码登录。 如果使用的是3.0之前的版本则端口为55672。 如果您看到网络用户界面则一切就绪。 演示应用 为了将AMQP与Spring Integration结合使用我们需要在pom.xml文件中添加以下依赖项 SpringAMQP适用于RabbitMQ dependencygroupIdorg.springframework.amqp/groupIdartifactIdspring-rabbit/artifactIdversion1.3.1.RELEASE/version
/dependency Spring Integration AMQP端点 dependencygroupIdorg.springframework.integration/groupIdartifactIdspring-integration-amqp/artifactIdversion3.0.2.RELEASE/version
/dependency 现在我们将创建一个新的配置文件amqp-config.xml其中将包含RabbitMQ配置例如我们在本教程先前使用的JMS的jms-config。 beans xmlnshttp://www.springframework.org/schema/beansxmlns:xsihttp://www.w3.org/2001/XMLSchema-instance xmlns:rabbithttp://www.springframework.org/schema/rabbitxsi:schemaLocationhttp://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsdrabbit:connection-factory idconnectionFactory /rabbit:template idamqpTemplate connection-factoryconnectionFactory /rabbit:admin connection-factoryconnectionFactory /rabbit:queue namerabbit.queue /rabbit:direct-exchange namerabbit.exchangerabbit:bindingsrabbit:binding queuerabbit.queue keyrabbit.key.binding //rabbit:bindings/rabbit:direct-exchange
/beans 下一个文件是Spring Integration文件其中包含通道和通道适配器 beans xmlnshttp://www.springframework.org/schema/beansxmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexmlns:contexthttp://www.springframework.org/schema/contextxmlns:inthttp://www.springframework.org/schema/integrationxmlns:int-amqphttp://www.springframework.org/schema/integration/amqpxsi:schemaLocationhttp://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsdhttp://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsdhttp://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsdhttp://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsdcontext:component-scan base-packagexpadro.spring.integration.amqp/int:gateway default-request-channelrequestChannel service-interfacexpadro.spring.integration.amqp.service.AMQPService/int:channel idrequestChannel/int-amqp:outbound-channel-adapterchannelrequestChannel amqp-templateamqpTemplate exchange-namerabbit.exchangerouting-keyrabbit.key.binding/int-amqp:inbound-channel-adapter channelresponseChannelqueue-namesrabbit.queue connection-factoryconnectionFactory /int:channel idresponseChannel/int:service-activator refamqpProcessor methodprocess input-channelresponseChannel//beans 流程如下 测试应用程序向网关发送一条消息该消息将是一个简单的String。 从网关它将通过“ requestChannel”通道到达出站通道适配器。 出站通道适配器将消息发送到“ rabbit.queue”队列。 订阅此“ rabbit.queue”队列我们已经配置了入站通道适配器。 它将接收发送到队列的消息。 该消息通过“ responseChannel”通道发送到服务激活器。 服务激活器仅打印消息。 用作消息传递系统入口的网关包含一个方法 public interface AMQPService {Gatewaypublic void sendMessage(String message);
} 服务激活器amqpProcessor非常简单。 它收到一条消息并打印其有效负载 Component(amqpProcessor)
public class AmqpProcessor {public void process(MessageString msg) {System.out.println(Message received: msg.getPayload());}
} 为了完成该示例以下是通过调用网关包装的服务来启动流的应用程序 ContextConfiguration(locations {/xpadro/spring/integration/test/amqp-config.xml,/xpadro/spring/integration/test/int-amqp-config.xml})
RunWith(SpringJUnit4ClassRunner.class)
public class TestIntegrationAMQPConfig {Autowiredprivate AMQPService service;Testpublic void testSendToJms() throws InterruptedException, RemoteException {String msg hello;service.sendMessage(msg);Thread.sleep(2000);}
}翻译自: https://www.javacodegeeks.com/2015/09/enterprise-messaging.html