建设厅网站打不开,wordpress 追格时光轴购物主题,麻章网站开发公司,莆田网站建设哪里便宜RocketMQ系列文章
RocketMQ(一)#xff1a;基本概念和环境搭建
RocketMQ(二)#xff1a;原生API快速入门
RocketMQ(三)#xff1a;集成SpringBoot
RocketMQ(四)#xff1a;重复消费、消息重试、死信消息的解决方案 目录 一、重复消费1、消息重复的情况2、MySql唯一索引…RocketMQ系列文章
RocketMQ(一)基本概念和环境搭建
RocketMQ(二)原生API快速入门
RocketMQ(三)集成SpringBoot
RocketMQ(四)重复消费、消息重试、死信消息的解决方案 目录 一、重复消费1、消息重复的情况2、MySql唯一索引3、redis分布式锁 二、消息重试1、生产者重试2、消费者重试 三、死信消息四、消费堆积 一、重复消费
1、消息重复的情况
发送时消息重复 当一条消息已被成功发送到服务端并完成持久化此时出现了网络闪断或者客户端宕机导致服务端对客户端应答失败如果此时生产者意识到消息发送失败并尝试再次发送消息消费者后续会收到两条内容相同并且 Message ID 也相同的消息 投递时消息重复 消息消费的场景下消息已投递到消费者并完成业务处理当客户端给服务端反馈应答的时候网络闪断为了保证消息至少被消费一次消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息消费者后续会收到两条内容相同并且 Message ID 也相同的消息 负载均衡时消息重复包括但不限于网络抖动、Broker 重启以及订阅方应用重启 当消息队列RocketMQ的Broker 或客户端重启、扩容或缩容时会触发 Rebalance此时消费者可能会收到重复消息
2、MySql唯一索引
因为 Message ID 有可能出现冲突重复的情况所以用业务唯一标识作为幂等处理的关键依据 生产者 相同的唯一业务编号发送两次
Test
void test1() {// 业务唯一编号String key 1300;MessageString message MessageBuilder.withPayload(我是一个带key的消息).setHeader(RocketMQHeaders.KEYS, key).build();// 相同的key发送两次rocketMQTemplate.syncSend(repeatedTopic, message);rocketMQTemplate.syncSend(repeatedTopic, message);System.out.println(发送完成);
}消费者 创建user表结构num_no字段设置为唯一索引当唯一的业务id插入唯一索引的num_no字段只能插入一次第二次会报唯一索引重复当获取到重复数据直接返回即可就不在执行业务代码
Component
RocketMQMessageListener(topic repeatedTopic, consumerGroup repeated-consumer-group)
public class RepeatMysqlListener implements RocketMQListenerMessageExt {Autowiredprivate JdbcTemplate jdbcTemplate;Overridepublic void onMessage(MessageExt message) {// 唯一的业务id如果是相同的两次请求则keys值一定相同String messageKey message.getKeys();try {jdbcTemplate.execute(INSERT INTO user (num_no,name) VALUES( messageKey ,名称));} catch (DataAccessException e) {// 该message可能是重复的if (e instanceof DuplicateKeyException) {System.out.println(messageKey的业务编号数据重复了,直接return,就算消费了此重复数据);return;}}// 获取消息执行业务System.out.println(获取消息内容:【 new String(message.getBody()) 】执行业务);}
}执行结果
发送完成
获取消息内容:【我是一个带key的消息】执行业务
1300的业务编号数据重复了,直接return,就算消费了此重复数据3、redis分布式锁 Redisson分布式锁配置 Configuration
public class RedissonConfig {Beanpublic Redisson redisson() {Config config new Config();config.useSingleServer().setAddress(redis://localhost:6390).setPassword(xc1234).setDatabase(0);return (Redisson) Redisson.create(config);}
}生产者 Test
void test1() {// 业务唯一编号String key 1400;MessageString message MessageBuilder.withPayload(我是一个带key的消息).setHeader(RocketMQHeaders.KEYS, key).build();// 相同的key发送两次rocketMQTemplate.syncSend(repeatedTopic, message);rocketMQTemplate.syncSend(repeatedTopic, message);System.out.println(发送完成);
}消费者 因为消费者是多线程并发消费如果遇到相同的唯一业务id则上锁依次执行将执行过的唯一业务id放入redis下次相同业务id进入与redis集合对比存在则证明已经执行过了
Component
RocketMQMessageListener(topic repeatedTopic, consumerGroup repeated-consumer-group)
public class RepeatRedisListener implements RocketMQListenerMessageExt {Autowiredprivate Redisson redisson;Autowiredprivate StringRedisTemplate stringRedisTemplate;Overridepublic void onMessage(MessageExt message) {// 唯一的业务id如果是相同的两次请求则keys值一定相同String messageKey message.getKeys();RLock redissonLock redisson.getLock(messageKey);try {// 添加redisson锁并实现锁续命功能// 默认过期时间是30s每10s触发一次锁续命功能redissonLock.lock();ListString topicBusinessKeyList stringRedisTemplate.opsForList().range(topicBusinessKey,0,-1);if ( ObjectUtils.isNotEmpty(topicBusinessKeyList) topicBusinessKeyList.contains(messageKey)) {System.out.println(messageKey 的业务编号数据重复了,直接return,就算消费了此重复数据);return;}// 获取消息执行业务System.out.println(获取消息内容:【 new String(message.getBody()) 】执行业务);// 讲businessKey存入redisstringRedisTemplate.opsForList().rightPush(topicBusinessKey, messageKey);} finally {redissonLock.unlock();}}
}执行结果
发送完成
获取消息内容:【我是一个带key的消息】执行业务
1400的业务编号数据重复了,直接return,就算消费了此重复数据二、消息重试
1、生产者重试
可以分别设置同步消息和异步消息发送的重试次数广播方式不提供失败重试特性即消费失败后失败消息不再重试继续消费新的消息默认重试间隔时间为 1 秒次数为2次发送消息超时时间默认3000毫秒如果因为超时那么便不再尝试重试 application.yml配置文件设置 2、消费者重试
默认的重试间隔10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h默认多线程模式下重试16次设置超过 16 次的重试时间间隔均为每次 2 小时某条消息在一直消费失败的前提下将会在接下来的 4 小时 46 分钟之内进行 16 次重试超过这个时间范围消息将不再重试投递在单线程的顺序模式下重试Integer.MAX_VALUE次间隔1秒 消费者配置 实现RocketMQPushConsumerLifecycleListener接口从prepareStart方法中获取消费者并设置它消息最大重试次数的设置对相同GroupID下的所有Consumer实例有效
Component
RocketMQMessageListener(topic retryTopic,consumerGroup retry-consumer-group
)
public class RetryListener implements RocketMQListenerMessageExt, RocketMQPushConsumerLifecycleListener {Overridepublic void onMessage(MessageExt message) {//获取消息的重试次数System.out.println(message.getReconsumeTimes());System.out.println(消息内容:new String(message.getBody()));}Overridepublic void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {// 设置消费者重试次数defaultMQPushConsumer.setMaxReconsumeTimes(2);// 实例名称-控制面板可以看到defaultMQPushConsumer.setInstanceName(消费者1号);}
}设置重试二次的执行结果 三、死信消息
当消费重试到达阈值以后消息不会被投递给消费者了而是进入了死信队列死信队列是死信Topic下分区数唯一的单独队列死信Topic名称为%DLQ%原消费者组名死信队列的消息将不会再被消费 上一节的消费者重试两次后就会将消息放入死信队列 处理死信消息方式一 监听死信队列处理消息
Component
RocketMQMessageListener(topic %DLQ%retry-consumer-group,consumerGroup retry-dead-consumer-group
)
public class RetryDeadConsumer implements RocketMQListenerString {Overridepublic void onMessage(String message) {// 处理消息 签收了System.out.println(记录到特别的位置 文件 mysql 通知人工处理);}
}处理死信消息方式二 控制重试次数重试几次后直接记录到数据库等等
Component
RocketMQMessageListener(topic %DLQ%retry-consumer-group,consumerGroup retry-dead-consumer-group
)
public class RetryDeadConsumer2 implements RocketMQListenerMessageExt {Overridepublic void onMessage(MessageExt messageExt) {// 业务处理try {int i 1 / 0;} catch (Exception e) {// 重试int reconsumeTimes messageExt.getReconsumeTimes();if (reconsumeTimes 3) {// 不要重试了System.out.println(记录到特别的位置 文件 mysql 通知人工处理);}else {throw new RuntimeException(异常);}}}
}四、消费堆积
一般认为单条队列消息差值10w时 算堆积问题 什么情况下会出现堆积 生产太快 生产方可以做业务限流增加消费者数量,但是消费者数量队列数量,适当的设置最大的消费线程数量(根据IO(2n)/CPU(n1)) 消费者消费出现问题