网站信息抽查评估 短信,东营市城乡建设局网站,西安看个号网络科技有限公司,seo引擎优化平台培训Active MQ 02
常用API
事务
session.commit();
session.rollback();用来提交/回滚事务
Purge
清理消息
签收模式
签收代表接收端的session已收到消息的一次确认#xff0c;反馈给broker
ActiveMQ支持自动签收与手动签收
Session.AUTO_ACKNOWLEDGE
当客户端从receive…Active MQ 02
常用API
事务
session.commit();
session.rollback();用来提交/回滚事务
Purge
清理消息
签收模式
签收代表接收端的session已收到消息的一次确认反馈给broker
ActiveMQ支持自动签收与手动签收
Session.AUTO_ACKNOWLEDGE
当客户端从receiver或onMessage成功返回时Session自动签收客户端的这条消息的收条。
Session.CLIENT_ACKNOWLEDGE
客户端通过调用消息(Message)的acknowledge方法签收消息。在这种情况下签收发生在Session层面签收一个已经消费的消息会自动地签收这个Session所有已消费的收条。
Session.DUPS_OK_ACKNOWLEDGE
Session不必确保对传送消息的签收这个模式可能会引起消息的重复但是降低了Session的开销所以只有客户端能容忍重复的消息才可使用。
持久化
默认持久化是开启的
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)优先级
可以打乱消费顺序
producer.setPriority配置文件需要指定使用优先级的目的地
policyEntry queuequeue1 prioritizedMessagestrue /消息超时/过期
producer.setTimeToLive设置了消息超时的消息消费端在超时后无法在消费到此消息。
给消息设置一个超时时间 - 死信队列 - 拿出来 - 重发
死信
此类消息会进入到ActiveMQ.DLQ队列且不会自动清除称为死信
此处有消息堆积的风险
修改死信队列名称 policyEntry queuef prioritizedMessagestrue deadLetterStrategy individualDeadLetterStrategy queuePrefixDLxxQ. useQueueForQueueMessagestrue / /deadLetterStrategy /policyEntryuseQueueForQueueMessages: 设置使用队列保存死信还可以设置useQueueForTopicMessages使用Topic来保存死信
让非持久化的消息也进入死信队列 individualDeadLetterStrategy queuePrefixDLxxQ. useQueueForQueueMessagestrue processNonPersistenttrue / processNonPersistent“true”
过期消息不进死信队列
individualDeadLetterStrategy processExpiredfalse / 独占消费者 Queue queue session.createQueue(xxoo?consumer.exclusivetrue);还可以设置优先级
Queue queue session.createQueue(xxoo?consumer.exclusivetrueconsumer.priority10);消息类型
object
发送端 Girl girl new Girl(qiqi,25,398.0);Message message session.createObjectMessage(girl);接受端 if(message instanceof ActiveMQObjectMessage) {Girl girl (Girl)((ActiveMQObjectMessage)message).getObject();System.out.println(girl);System.out.println(girl.getName());}如果遇到此类报错
Exception in thread main javax.jms.JMSException: Failed to build body from content. Serializable class not available to broker. Reason: java.lang.ClassNotFoundException: Forbidden class com.mashibing.mq.Girl! This class is not trusted to be serialized as ObjectMessage payload. Please take a look at http://activemq.apache.org/objectmessage.html for more information on how to configure trusted classes.at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:36)at org.apache.activemq.command.ActiveMQObjectMessage.getObject(ActiveMQObjectMessage.java:213)at com.mashibing.mq.Receiver.main(Receiver.java:65)
Caused by: java.lang.ClassNotFoundException: Forbidden class com.mashibing.mq.Girl! This class is not trusted to be serialized as ObjectMessage payload. Please take a look at http://activemq.apache.org/objectmessage.html for more information on how to configure trusted classes.at org.apache.activemq.util.ClassLoadingAwareObjectInputStream.checkSecurity(ClassLoadingAwareObjectInputStream.java:112)at org.apache.activemq.util.ClassLoadingAwareObjectInputStream.resolveClass(ClassLoadingAwareObjectInputStream.java:57)at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)at org.apache.activemq.command.ActiveMQObjectMessage.getObject(ActiveMQObjectMessage.java:211)... 1 more需要添加信任 connectionFactory.setTrustedPackages(new ArrayListString(Arrays.asList(new String[]{Girl.class.getPackage().getName()})));bytesMessage
发送端 BytesMessage bytesMessage session.createBytesMessage();bytesMessage.writeBytes(str.getBytes());bytesMessage.writeUTF(哈哈);接受端 if(message instanceof BytesMessage) {BytesMessage bm (BytesMessage)message;byte[] b new byte[1024];int len -1;while ((len bm.readBytes(b)) ! -1) {System.out.println(new String(b, 0, len));}}还可以使用ActiveMQ给提供的便捷方法,但要注意读取和写入的顺序
bm.readBoolean()
bm.readUTF()写入文件 FileOutputStream out null;try {out new FileOutputStream(d:/aa.txt);} catch (FileNotFoundException e2) {e2.printStackTrace();}byte[] by new byte[1024];int len 0 ;try {while((len bm.readBytes(by))! -1){out.write(by,0,len);}} catch (Exception e1) {e1.printStackTrace();}MapMessage
发送端 MapMessage mapMessage session.createMapMessage();mapMessage.setString(name,lucy);mapMessage.setBoolean(yihun,false);mapMessage.setInt(age, 17);producer.send(mapMessage);接收端 Message message consumer.receive();MapMessage mes (MapMessage) message;System.out.println(mes);System.out.println(mes.getString(name));消息发送原理
同步与异步
开启事务关闭事务持久化异步同步非持久化异步异步
我们可以通过以下几种方式来设置异步发送 ActiveMQConnectionFactory connectionFactory new ActiveMQConnectionFactory(admin,admin,tcp://localhost:61616);// 2.获取一个向ActiveMQ的连接connectionFactory.setUseAsyncSend(true);ActiveMQConnection connection (ActiveMQConnection)connectionFactory.createConnection();connection.setUseAsyncSend(true);消息堆积
producer每发送一个消息统计一下发送的字节数当字节数达到ProducerWindowSize值时需要等待broker的确认才能继续发送。
brokerUrl中设置: tcp://localhost:61616?jms.producerWindowSize1048576
destinationUri中设置: myQueue?producer.windowSize1048576
延迟消息投递
首先在配置文件中开启延迟和调度
schedulerSupport“true” broker xmlnshttp://activemq.apache.org/schema/core brokerNamelocalhost dataDirectory${activemq.data} schedulerSupporttrue延迟发送
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 10*1000);带间隔的重复发送 long delay 10 * 1000;long period 2 * 1000;int repeat 9;message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);createProducer.send(message);Cron表达式定时发送
Cron表达式是一个字符串字符串以5或6个空格隔开分为6或7个域每一个域代表一个含义Cron有如下两种语法格式
Seconds Minutes Hours DayofMonth Month DayofWeek Year或
Seconds Minutes Hours DayofMonth Month DayofWeek
每一个域可出现的字符如下
Seconds:可出现, - * /四个字符有效范围为0-59的整数
Minutes:可出现, - * /四个字符有效范围为0-59的整数
Hours:可出现, - * /四个字符有效范围为0-23的整数
DayofMonth:可出现, - * / ? L W C八个字符有效范围为0-31的整数
Month:可出现, - * /四个字符有效范围为1-12的整数或JAN-DEc
DayofWeek:可出现, - * / ? L C #四个字符有效范围为1-7的整数或SUN-SAT两个范围。1表示星期天2表示星期一 依次类推
Year:可出现, - * /四个字符有效范围为1970-2099年
每一个域都使用数字但还可以出现如下特殊字符它们的含义是
(1)表示匹配该域的任意值假如在Minutes域使用, 即表示每分钟都会触发事件。
(2)?:只能用在DayofMonth和DayofWeek两个域。它也匹配域的任意值但实际不会。因为DayofMonth和 DayofWeek会相互影响。例如想在每月的20日触发调度不管20日到底是星期几则只能使用如下写法 13 13 15 20 * ?, 其中最后一位只能用而不能使用*如果使用*表示不管星期几都会触发实际上并不是这样。
(3)-:表示范围例如在Minutes域使用5-20表示从5分到20分钟每分钟触发一次
(4)/表示起始时间开始触发然后每隔固定时间触发一次例如在Minutes域使用5/20,则意味着5分钟触发一次而2545等分别触发一次.
(5),:表示列出枚举值值。例如在Minutes域使用5,20则意味着在5和20分每分钟触发一次。
(6)L:表示最后只能出现在DayofWeek和DayofMonth域如果在DayofWeek域使用5L,意味着在最后的一个星期四触发。
(7)W: 表示有效工作日(周一到周五),只能出现在DayofMonth域系统将在离指定日期的最近的有效工作日触发事件。例如在 DayofMonth使用5W如果5日是星期六则将在最近的工作日星期五即4日触发。如果5日是星期天则在6日(周一)触发如果5日在星期一 到星期五中的一天则就在5日触发。另外一点W的最近寻找不会跨过月份
(8)LW:这两个字符可以连用表示在某个月最后一个工作日即最后一个星期五。
(9)#:用于确定每个月第几个星期几只能出现在DayofMonth域。例如在4#2表示某月的第二个星期三。
举几个例子:
0 0 2 1 * ? * 表示在每月的1日的凌晨2点调度任务
0 15 10 ? * MON-FRI 表示周一到周五每天上午1015执行作业
0 15 10 ? 6L 2002-2006 表示2002-2006年的每个月的最后一个星期五上午10:15执行作
一个cron表达式有至少6个也可能7个有空格分隔的时间元素。
按顺序依次为
秒0~59
分钟0~59
小时0~23
天月0~31但是你需要考虑你月的天数
月0~11
天星期1~7 1SUN 或 SUNMONTUEWEDTHUFRISAT
年份19702099
其中每个元素可以是一个值(如6),一个连续区间(9-12),一个间隔时间(8-18/4)(/表示每隔4小时),一个列表(1,3,5),通配符。由于月份中的日期和星期中的日期这两个元素互斥的,必须要对其中一个设置?
0 0 10,14,16 * * ? 每天上午10点下午2点4点
0 0/30 9-17 * * ? 朝九晚五工作时间内每半小时
0 0 12 ? * WED 表示每个星期三中午12点
“0 0 12 * * ?” 每天中午12点触发
“0 15 10 ? * *” 每天上午10:15触发
“0 15 10 * * ?” 每天上午10:15触发
“0 15 10 * * ? *” 每天上午10:15触发
“0 15 10 * * ? 2005” 2005年的每天上午10:15触发
“0 * 14 * * ?” 在每天下午2点到下午2:59期间的每1分钟触发
“0 0/5 14 * * ?” 在每天下午2点到下午2:55期间的每5分钟触发
“0 0/5 14,18 * * ?” 在每天下午2点到2:55期间和下午6点到6:55期间的每5分钟触发
“0 0-5 14 * * ?” 在每天下午2点到下午2:05期间的每1分钟触发
“0 10,44 14 ? 3 WED” 每年三月的星期三的下午2:10和2:44触发
“0 15 10 ? * MON-FRI” 周一至周五的上午10:15触发
“0 15 10 15 * ?” 每月15日上午10:15触发
“0 15 10 L * ?” 每月最后一日的上午10:15触发
“0 15 10 ? * 6L” 每月的最后一个星期五上午10:15触发
“0 15 10 ? * 6L 2002-2005” 2002年至2005年的每月的最后一个星期五上午10:15触发
“0 15 10 ? * 6#3” 每月的第三个星期五上午10:15触发
监听器
可以使用监听器来处理消息接收
consumer.setMessageListener(new MyListener());需要实现接口MessageListener
public class MyListener implements MessageListener {public void onMessage(Message message) {// TODO Auto-generated method stubTextMessage textMessage (TextMessage)message;try {System.out.println(xxoo textMessage.getText());} catch (JMSException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}
当收到消息后会调起onMessage方法
消息过滤
消息发送 MapMessage msg1 session.createMapMessage();msg1.setString(name, qiqi);msg1.setString(age, 18);msg1.setStringProperty(name, qiqi);msg1.setIntProperty(age, 18);MapMessage msg2 session.createMapMessage();msg2.setString(name, lucy);msg2.setString(age, 18);msg2.setStringProperty(name, lucy);msg2.setIntProperty(age, 18);MapMessage msg3 session.createMapMessage();msg3.setString(name, qianqian);msg3.setString(age, 17);msg3.setStringProperty(name, qianqian);msg3.setIntProperty(age, 17);消息接收 String selector1 age 17;String selector2 name lucy;MessageConsumer consumer session.createConsumer(queue,selector2);