三网合一网站建设费用,wordpress图片投票插件,郑州网站托管助企,黑马程序员线上课程概述
介绍
JMS#xff08;Java Message Service#xff09;即 Java 消息服务应用程序接口#xff0c;是一个 Java 平台中关于面向消息中间件#xff08;MOM#xff09;的 API#xff0c;用于在两个应用程序之间#xff0c;或分布式系统中发送消息#xff0c;进行异步…概述
介绍
JMSJava Message Service即 Java 消息服务应用程序接口是一个 Java 平台中关于面向消息中间件MOM的 API用于在两个应用程序之间或分布式系统中发送消息进行异步通信。Java 消息服务是一个与具体平台无关的 API绝大多数 MOM 提供商都对JMS 提供支持。
简短来说JMS 是一种与厂商无关的 API是 sun 公司为了统一厂商的接口规范而定义出的一组api接口用来访问消息收发系统消息。它类似于 JDBCJava Database Connectivity提供了应用程序之间异步通信的功能。 JMS 体系结构
JMS 提供者JMS 的实现者比如 activemq、jbossmq、tonglinkmq 等JMS 客户使用提供者发送消息的程序或对象例如在 12306 中负责发送一条购票消息到处理队列中用来解决购票高峰问题那么发送消息到队列的程序和从队列获取消息的程序都叫做客户JMS 生产者producer、sender负责创建并发送消息的客户JMS 消费者customer、listener负责接收并处理消息的客户JMS 消息message在 JMS 客户之间传递数据的对象JMS 队列queue一个容纳那些被发送的等待阅读的消息的区域JMS 主题topic一种支持发送消息给多个订阅者的机制 JMS 对象模型
连接工厂connectionFactory客户端使用 JNDI 查找连接工厂然后利用连接工厂创建一个 JMS 连接JMS 连接表示 JMS 客户端和服务器端之间的一个活动的连接是由客户端通过调用连接工厂的方法建立的JMS 会话session 标识 JMS 客户端和服务端的会话状态。会话建立在 JMS 连接上标识客户与服务器之间的一个会话进程。JMS 目的Destination 又称为消息队列是实际的消息源生产者和消费者消息类型分为队列类型优先先进先出以及订阅类型 消息监听器
MessageListener
MessageListener 是最原始的消息监听器它是 JMS 规范中定义的一个接口。其中定义了一个用于处理接收到的消息的 onMessage() 方法该方法只接收一个 Message 参数。
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;public class ConsumerMessageListener implements MessageListener {public void onMessage(Message message) {// 若生产者发送的是一个纯文本消息可以直接进行强制转换或者直接把onMessage方法的参数改成Message的子类TextMessageTextMessage textMsg (TextMessage) message;System.out.println(接收到一个纯文本消息。);try {System.out.println(消息内容是 textMsg.getText());} catch (JMSException e) {e.printStackTrace();}}
}SessionAwareMessageListener
SessionAwareMessageListener 是 Spring 提供的它不是标准的 JMS MessageListener。
MessageListener 的设计只是纯粹用来接收消息的假如在使用 MessageListener 处理接收到的消息时需要发送一个消息通知对方已经收到这个消息了那么这个时候就需要在代码里面去重新获取一个 Connection 或 Session。而 SessionAwareMessageListener 的设计就是为了方便在接收到消息后发送一个回复的消息它同样提供了一个处理接收到的消息的 onMessage() 方法但是这个方法可以同时接收两个参数一个是表示当前接收到的消息Message另一个就是可以用来发送消息的 Session 对象。
使用 SessionAwareMessageListener 监听器可以在监听并消费了消息后不用重新获取一个 Connection 或 Session而是直接向原 Connection 或 Session 的某一个队列发送消息。
代码示例
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.springframework.jms.listener.SessionAwareMessageListener;public class ConsumerSessionAwareMessageListener implements SessionAwareMessageListener {private Destination destination;public void onMessage(TextMessage message, Session session) throws JMSException {System.out.println(收到一条消息);System.out.println(消息内容是 message.getText());MessageProducer producer session.createProducer(destination);Message textMessage session.createTextMessage(ConsumerSessionAwareMessageListener。。。);producer.send(textMessage);}public Destination getDestination() {returndestination;}public void setDestination(Destination destination) {this.destination destination;}
}说明定义了一个 SessionAwareMessageListener在这个 Listener 中在接收到了一个消息之后利用对应的 Session 创建了一个到 destination 的生产者和对应的消息然后利用创建好的生产者发送对应的消息。 MessageListenerAdapter
MessageListenerAdapter 类实现了 MessageListener 接口和 SessionAwareMessageListener 接口它的主要作用是将接收到的消息进行类型转换然后通过反射的形式把它交给一个普通的 Java 类进行处理。 MessageListenerAdapter 会把接收到的消息做如下转换 TextMessage 转换为 String 对象BytesMessage 转换为 byte 数组MapMessage 转换为 Map 对象ObjectMessage 转换为对应的 Serializable 对象 代码示例 // 目标处理器类
public class ConsumerListener { public void handleMessage(String message) { System.out.println(ConsumerListener通过handleMessage接收到一个纯文本消息消息内容是 message); } public void receiveMessage(String message) { System.out.println(ConsumerListener通过receiveMessage接收到一个纯文本消息消息内容是 message); }
} !-- 消息监听适配器 --
bean idmessageListenerAdapter classorg.springframework.jms.listener.adapter.MessageListenerAdapter property namedelegate bean classcom.tiantian.springintejms.listener.ConsumerListener/ /property property namedefaultListenerMethod valuereceiveMessage/
/bean !-- 消息监听适配器对应的监听容器 --
bean idmessageListenerAdapterContainer classorg.springframework.jms.listener.DefaultMessageListenerContainer property nameconnectionFactory refconnectionFactory/ property namedestination refadapterQueue/ !-- 使用MessageListenerAdapter来作为消息监听器 -- property namemessageListener refmessageListenerAdapter/
/bean 注意 MessageListenerAdapter 会把接收到的消息做一个类型转换然后利用反射把它交给真正的目标处理器一个普通的 Java 类ConsumerListener进行处理。 如果真正的目标处理器是一个 MessageListener 或者是一个 SessionAwareMessageListener那么 Spring 将直接使用接收到的Message 对象作为参数调用它们的 onMessage 方法而不会再利用反射去进行调用。 故在定义一个 MessageListenerAdapter 的时候就需要为它指定这样一个目标类。这个目标类可以通过 MessageListenerAdapter 的构造方法参数指定也可以通过它的 delegate 属性来指定。 MessageListenerAdapter 另外一个主要的功能是可以通过 MessageListenerAdapter 注入的 handleMessage 方法自动的发送返回消息。 当用于处理接收到的消息的方法默认是 handleMessage的返回值不为空null或者void的时候Spring 会自动将它封装为一个 JMS Message然后自动进行回复。这个回复消息将发送到的地址主要有两种方式可以指定 可以通过发送的 Message 的 setJMSReplyTo 方法指定该消息对应的回复消息的目的地通过 MessageListenerAdapter 的 defaultResponseDestination 属性来指定 基本使用
依赖
!-- jms --
dependencygroupIdjavax.jms/groupIdartifactIdjavax.jms-api/artifactId
/dependency
!-- spring jms --
dependencygroupIdorg.springframework/groupIdartifactIdspring-jms/artifactId
/dependency!-- tonglinkMq jms api --
dependencygroupIdcom.tongtech.tlq/groupIdartifactIdTongJMS-without-atomikos/artifactIdversion8.1.0-SNAPSHOT/version
/dependencySpringBoot 集成 jms
jms 配置类
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
import org.springframework.jms.core.JmsOperations;
import org.springframework.jms.core.JmsTemplate;
import org.tongtech.tmqi.ConnectionFactory;EnableJms // 声明对 JMS 注解的支持
Configuration
public class TestCreator {private String host;private Integer port;private String queueManager;private String channel;private String username;private String password;private int ccsid;private String queueName;private long receiveTimeout;// 配置连接工厂(tonglinkMq)Beanpublic ConnectionFactory connectionFactory() throws JMSException {ConnectionFactory connectionFactory new ConnectionFactory();connectionFactory.setProperty(tmqiAddressList, tlq://127.0.0.1:10024);connectionFactory.setProperty(tmqiDefaultUsername, admin);connectionFactory.setProperty(tmqiDefaultPassword, 123456);return connectionFactory;}// 配置缓存连接工厂 不配置该类则每次与MQ交互都需要重新创建连接大幅降低速度。BeanPrimarypublic CachingConnectionFactory cachingConnectionFactory(ConnectionFactory connectionFactory) {CachingConnectionFactory cachingConnectionFactory new CachingConnectionFactory();cachingConnectionFactory.setTargetConnectionFactory(connectionFactory);cachingConnectionFactory.setSessionCacheSize(500);cachingConnectionFactory.setReconnectOnException(true);return cachingConnectionFactory;}// 配置DefaultJmsListenerContainerFactory, 用JmsListener注解来监听队列消息时尤其存在多个监听的时候通过实例化配置DefaultJmsListenerContainerFactory来控制消息分发Bean(name jmsQueueListenerCF)public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(CachingConnectionFactory cachingConnectionFactory) {DefaultJmsListenerContainerFactory factory new DefaultJmsListenerContainerFactory();factory.setConnectionFactory(cachingConnectionFactory);// 设置连接数。如果对消息消费有顺序要求这里建议设置为1-1// 注使用同一个监听工厂类监听多个队列时连接数需大于等于监听队列数factory.setConcurrency(3-10); // 下限-上限// 重连间隔时间factory.setRecoveryInterval(1000L);// factory.setPubSubDomain(true); // 支持发布订阅功能topic// factory.setConcurrency(1); // topic 模式并发必须设置为1不然一条消息可能会被消费多次return factory;}// 配置JMS模板实例化jmsTemplate后可以在方法中通过autowired的方式注入模板用方法调用发送/接收消息// 注如果只是接收消息可以不配置此步Beanpublic JmsTemplate jmsQueueTemplate(CachingConnectionFactory cachingConnectionFactory) {JmsTemplate jmsTemplate new JmsTemplate(cachingConnectionFactory);jmsTemplate.setReceiveTimeout(receiveTimeout); // 设置超时时间// jmsTemplate.setPubSubDomain(true); // 开启发布订阅功能topicreturn jmsTemplate;}
}发送消息
public class jmsUtil {Autowiredprivate JmsTemplate jmsQueueTemplate;/*** 发送原始消息 Message*/public void send(){jmsQueueTemplate.send(queue1, new MessageCreator() {Overridepublic Message createMessage(Session session) throws JMSException {return session.createTextMessage(我是原始消息);}});}/*** 发送消息自动转换成原始消息* 注关于消息转换还可以通过实现MessageConverter接口来自定义转换内容*/public void convertAndSend(){jmsQueueTemplate.convertAndSend(queue1, 我是自动转换的消息);}
}监听接收消息
采用注解 JmsListener 来设置监听方法
Slf4j
Component
// 此处继承MessageListenerAdapter非必需。但若只使用JmsListener注解监听可能会出现监听消息获取不及时或者获取不到消息的情况加上继承MessageListenerAdapter后便不会出现
public class MdxpMessageListener extends MessageListenerAdapter {/*** 消息队列监听器* destination 队列地址此处使用静态变量支持配置化详见下文* containerFactory 监听器容器工厂包含配置源, 若存在2个以上的监听容器工厂,需进行指定*/OverrideJmsListener(destination TEST_QUEUE,containerFactory jmsQueueListenerCF)public void onMessage(Message message) {// JmsListener收到消息后会自动封装成自己特有的数据格式需要自行根据消息类型解析原始消息String msgText ; double d 0; try { if (msg instanceof TextMessage) { msgText ((TextMessage) msg).getText(); } else if (msg instanceof StreamMessage) { msgText ((StreamMessage) msg).readString(); d ((StreamMessage) msg).readDouble(); } else if (msg instanceof BytesMessage) { byte[] block new byte[1024]; ((BytesMessage) msg).readBytes(block); msgText String.valueOf(block); } else if (msg instanceof MapMessage) { msgText ((MapMessage) msg).getString(name); }log.info(接收消息{}, msgText);} catch (JMSException e) { log.error(消息接收异常, e);}}JmsListener(destination TEST_QUEUE2,containerFactory jmsQueueListenerCF)// Payload是消费者接受生产者发送的队列消息将队列中的json字符串变成对象的注解注意填充类需要实现序列化接口public void messageListener2(payload User user){log.info(message{}, user)}
}JmsListener 注解 destination 支持配置化
注入配置读取类
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;/*** 队列名称配置* 这里切记要Data或手动set和get*/
Component
Data
public class QueueNameConfig {Value(${ibmmq.queue-test})private String testQueue;}队列监听类
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;
import javax.jms.Message;
import javax.jms.TextMessage;/*** MQ消费者*/
Component
Slf4j
public class ReceiveMessage extends MessageListenerAdapter {/*** destination监听的队列名称使用SpEL表达式写入* containerFactory监听的工厂类为配置类中所配置的名字*/OverrideJmsListener(destination #{queueNameConfig.testQueue}, containerFactory jmsListenerContainerFactory)public void onMessage(Message message) {TextMessage textMessage (TextMessage) message; //转换成文本消息try {String text textMessage.getText();log.info(接收信息{}, text);} catch (Exception e) {e.printStackTrace();}}
}javax 原生 jms
public class jmstest {public static void main(String[] args) throws Exception { // 配置工厂ConnectionFactory connectionFactory new ConnectionFactory();connectionFactory.setProperty(tmqiAddressList, tlq://127.0.0.1:10024);connectionFactory.setProperty(tmqiDefaultUsername, admin);connectionFactory.setProperty(tmqiDefaultPassword, 123456);// 获取连接和会话Connection mqConn connectionFactory.createConnection(); // 创建会话。CLIENT_ACKNOWLEDGE手动应答AUTO_ACKNOWLEDGE自动应答Session mqSession mqConn.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE); // 创建队列Queue queuemq Session.createQueue(queueName);// 获取消费者MessageConsumer consumer mqSession.createConsumer(mqSession.createQueue(queueName)); // 设置监听器consumer.setMessageListener(new MessageListener() { public void onMessage(Message msg) { // JmsListener收到消息后会自动封装成自己特有的数据格式需要自行根据消息类型解析原始消息String msgText ; double d 0; try { if (msg instanceof TextMessage) {msgText ((TextMessage) msg).getText(); } else if (msg instanceof StreamMessage) { msgText ((StreamMessage) msg).readString(); d ((StreamMessage) msg).readDouble(); } else if (msg instanceof BytesMessage) { byte[] block new byte[1024]; ((BytesMessage) msg).readBytes(block); msgText String.valueOf(block); } else if (msg instanceof MapMessage) { msgText ((MapMessage) msg).getString(name); }log.info(接收消息{}, msgText);// 手动应答textMessage.acknowledge();} catch (JMSException e) { log.error(消息接收异常, e);}}}); // 启动连接mqConn.start(); }// 获取生产者MessageProducer producer mqSession.createProducer(mqSession.createQueue(queueName)); // topic广播模式// Topic topic Session.createTopic(queueName);// MessageProducer producer mqSession.createProducer(topic); producer.setDeliveryMode(DeliveryMOde.NON_PERSISTENT);producer.send(mqSession.createTexttMessage(这是一条消息));// 关闭资源producer.close();// 断开连接connection.close();
}