网站开发 自学,莱芜论坛招工,中国互联网站建设,重庆住房和城乡建设部网站的打印准考证RocketMQ系列文章
RocketMQ(一)#xff1a;基本概念和环境搭建
RocketMQ(二)#xff1a;原生API快速入门
RocketMQ(三)#xff1a;集成SpringBoot 目录 一、搭建环境二、不同类型消息1、同步消息2、异步消息3、单向消息4、延迟消息5、顺序消息6、带tag消息7、带key消息 一…RocketMQ系列文章
RocketMQ(一)基本概念和环境搭建
RocketMQ(二)原生API快速入门
RocketMQ(三)集成SpringBoot 目录 一、搭建环境二、不同类型消息1、同步消息2、异步消息3、单向消息4、延迟消息5、顺序消息6、带tag消息7、带key消息 一、搭建环境
需要创建两个服务消息生产服务和消息消费者服务生产消息存在多个服务消费则统一由一个服务处理这样可以做到解耦 pom.xml 生产者和消费者都需要
dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-spring-boot-starter/artifactIdversion2.2.2/version
/dependency生产者配置文件 设置统一的生产者组这样发送消息时就不用指定了
rocketmq:name-server: 127.0.0.1:9876 # rocketMq的nameServer地址producer:group: boot-producer-group # 生产者组别send-message-timeout: 3000 # 消息发送的超时时间retry-times-when-send-async-failed: 2 # 异步消息发送失败重试次数max-message-size: 4194304 # 消息的最大长度生产者配置文件 不能设置统一的消费者组因为不同的消费者订阅关系不一致需要设置不同的消费者组
rocketmq:name-server: localhost:9876二、不同类型消息
直接引入即可
Autowired
private RocketMQTemplate rocketMQTemplate;1、同步消息 生产消息 消息由消费者发送到broker后会得到一个确认是具有可靠性的比如重要的消息通知短信通知等
rocketMQTemplate.syncSend(bootTestTopic, 我是boot的一个消息);消费消息 RocketMQListener的泛型类型即消息类型 MessageExt类型是消息的所有内容其他类型则就只是消息体内容没有消息头内容keys、msgId、延迟时间、重试次数、主题名称... onMessage方法内没有报错就是签收了报错就是拒收会重试
Component
RocketMQMessageListener(topic bootTestTopic, consumerGroup boot-test-consumer-group)
public class ABootSimpleMsgListener implements RocketMQListenerMessageExt {Overridepublic void onMessage(MessageExt message) {System.out.println(new String(message.getBody()));}
}2、异步消息
发送异步消息发送完以后会有一个异步通知不影响程序往下执行
rocketMQTemplate.asyncSend(bootAsyncTestTopic, 我是boot的一个异步消息, new SendCallback() {Overridepublic void onSuccess(SendResult sendResult) {System.out.println(成功);}Overridepublic void onException(Throwable throwable) {System.out.println(失败 throwable.getMessage());}
});3、单向消息
不关心发送结果的场景这种方式吞吐量很大但是存在消息丢失的风险例如日志信息的发送
rocketMQTemplate.sendOneWay(bootOnewayTopic, 单向消息);4、延迟消息
RocketMQ不支持任意时间的延时只支持以下18个固定的延时等级等级1就对应1s以此类推最高支持2h延迟private String messageDelayLevel “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;发送一个延时消息延迟等级为4级也就是30s后被监听消费
MessageString msg MessageBuilder.withPayload(我是一个延迟消息).build();
rocketMQTemplate.syncSend(bootMsTopic, msg, 3000, 4);5、顺序消息 生产消息 根据syncSendOrderly方法的第三个参数计算hash值决定消息放入哪个队列
// 顺序消息 发送者放 需要将一组消息 都发在同一个队列中去 消费者 需要单线程消费
ListMsgModel msgModels Arrays.asList(new MsgModel(qwer, 1, 下单),new MsgModel(qwer, 1, 短信),new MsgModel(qwer, 1, 物流),new MsgModel(zxcv, 2, 下单),new MsgModel(zxcv, 2, 短信),new MsgModel(zxcv, 2, 物流)
);
msgModels.forEach(msgModel - {// 发送 一般都是以json的方式进行处理// 根据第三个参数计算hash值决定消息放入哪个队列rocketMQTemplate.syncSendOrderly(bootOrderlyTopic, JSON.toJSONString(msgModel), msgModel.getOrderSn());
});消费消息 默认是并发消费模式可以设置为单线程顺序模式设置消费重试次数
Component
RocketMQMessageListener(topic bootOrderlyTopic,consumerGroup boot-orderly-consumer-group,consumeMode ConsumeMode.ORDERLY, // 顺序消费模式 单线程maxReconsumeTimes 5 // 消费重试的次数
)
public class BOrderlyMsgListener implements RocketMQListenerMessageExt {Overridepublic void onMessage(MessageExt message) {MsgModel msgModel JSON.parseObject(new String(message.getBody()), MsgModel.class);System.out.println(msgModel);}
}6、带tag消息
tag带在主题后面用:来携带
rocketMQTemplate.syncSend(bootTagTopic:tagA, 我是一个带tag的消息);7、带key消息
MessageString message MessageBuilder.withPayload(我是一个带key的消息).setHeader(RocketMQHeaders.KEYS, 10086).build();
rocketMQTemplate.syncSend(bootKeyTopic, message);获取带key和tag的消费者 过滤模式有两种正则表达式和sql92方式keys从MessageExt对象中获取
Component
RocketMQMessageListener(topic bootTagTopic,consumerGroup boot-tag-consumer-group,selectorType SelectorType.TAG,// tag过滤模式selectorExpression tagA || tagB
// selectorType SelectorType.SQL92,// sql92过滤模式
// selectorExpression a in (3,5,7) // broker.conf中开启enbalePropertyFiltertrue
)
public class CTagMsgListener implements RocketMQListenerMessageExt {Overridepublic void onMessage(MessageExt message) {System.out.println(获取keys: message.getKeys());System.out.println(消息内容: new String(message.getBody()));}
}查看源码 destination目标 主题 : 标签keys从消息头里面获取