免费行情网站在线,带有互动的网站开发,logo商标设计,内蒙古建设网官网查询中心目录 一、简介1.1 定义1.2 何时进入死信队列#xff1f;1.3 死信消息的变化1.4 死信队列的应用场景1.5 死信消息的生命周期 二、代码实现2.1 死信队列的配置步骤2.2 配置类2.3 配置文件2.4 生产者2.5 业务消费者2.6 死信消费者2.7 测试结果 三、总结 RabbitMQ 是流行的开源消息… 目录 一、简介1.1 定义1.2 何时进入死信队列1.3 死信消息的变化1.4 死信队列的应用场景1.5 死信消息的生命周期 二、代码实现2.1 死信队列的配置步骤2.2 配置类2.3 配置文件2.4 生产者2.5 业务消费者2.6 死信消费者2.7 测试结果 三、总结 RabbitMQ 是流行的开源消息队列中间件使用 erlang 语言开发由于其社区活跃度高维护更新较快深得很多企业的喜爱。
一、简介
1.1 定义
死信队列Dead Letter Queue简称 DLX是 RabbitMQ 中一种特殊的队列用于处理无法正常被消费者消费的消息。当消息在原始队列中因为 达到最大重试次数、过期、或者 满足特定条件 时可以 将这些消息重新路由到一个预定义的死信队列中 进行进一步处理或记录。
1.2 何时进入死信队列
当发生以下情况业务队列中的消息会进入死信队列
消息被否定确认使用 channel.basicNack 或 channel.basicReject并且此时 requeue 属性被设置为 false。消息过期消息在队列的存活时间超过设置的 TTL 时间。消息溢出队列中的消息数量已经超过最大队列长度。
当发生以上三种情况后该消息将成为 死信。死信消息会被 RabbitMQ 进行特殊处理
如果配置了死信队列那么该消息将会被丢进死信队列中如果没有配置则该消息将会被丢弃。
1.3 死信消息的变化
那么 死信 被丢到死信队列后会发生什么变化呢
如果队列配置了 x-dead-letter-routing-key 的话“死信” 的路由键会被替换成该参数对应的值。如果没有配置则保留该消息原有的路由键。
举个例子
原有队列的路由键是 RoutingKey1有以下两种情况
如果配置队列的 x-dead-letter-routing-key 参数值为 RoutingKey2则该消息成为 “死信” 后会将路由键更改为 RoutingKey2从而进入死信交换机中的死信队列。如果没有配置 x-dead-letter-routing-key 参数则该消息成为 “死信” 后路由键不会更改也不会进入死信队列。 当配置了 x-dead-letter-routing-key 参数后消息成为 “死信” 后会在消息的 Header 中添加很多奇奇怪怪的字段我们可以在死信队列的消费端通过以下方式进行打印
log.info(死信消息properties: {}, message.getMessageProperties());日志内容如下
2024-01-07 21:16:19.745 INFO 11776 --- [ntContainer#3-1] c.d.receiver.DeadLetterMessageReceiver :消息properties: MessageProperties [headers{x-first-death-exchangedemo.simple.business.exchange, x-death[{reasonrejected, count1, exchangedemo.simple.business.exchange, timeSun Jan 07 21:16:19 CST 2024, routing-keys[], queuedemo.simple.business.queuea}], x-first-death-reasonrejected, x-first-death-queuedemo.simple.business.queuea}, contentTypetext/plain, contentEncodingUTF-8, contentLength0, receivedDeliveryModePERSISTENT, priority0, redeliveredfalse, receivedExchangedemo.simple.deadletter.exchange, receivedRoutingKeydemo.simple.deadletter.queuea.routingkey, deliveryTag1, consumerTagamq.ctag-RPfmKjM8Lau9X7Fl0CtbEA, consumerQueuedemo.simple.deadletter.queuea]格式化后 Header 中看起来有很多信息实际上并不多只是值比较长而已。下面就简单说明一下 Header 中的值
字段名含义x-first-death-exchange第一次成为死信时的交换机名称。x-first-death-reason第一次成为死信的原因rejected消息在进入队列时被队列拒绝。expired消息过期。maxlen队列内消息数量超过队列最大容量。x-first-death-queue第一次成为死信时的队列名称。x-death历史被投入死信交换机的信息列表同一个消息每进入一次死信交换机这个数组的信息就会被更新。
1.4 死信队列的应用场景
通过上面的信息我们已经知道如何使用死信队列了那么死信队列一般在什么场景下使用呢
死信队列 一般用在较为重要的业务队列中确保未被正确消费的消息不被丢弃一般发生消费异常可能原因主要是消息信息本身存在错误导致处理异常处理过程中参数校验异常或者因网络波动导致的查询异常等等。当发生异常时当然 不能每次通过日志来获取原消息然后让运维帮忙重新投递消息 没错以前很多人这么干的 。通过配置死信队列可以让未正确处理的消息暂存到另一个队列中待后续排查清楚问题后编写相应的处理代码来处理死信消息这样比手工恢复数据要好得多。
1.5 死信消息的生命周期
死信消息的生命周期如下
业务消息被 投入业务队列消费者 消费业务队列的消息由于处理过程中 发生异常于是 进行了 Nack 或 Reject 操作被 Nack 或 Reject 的消息由 RabbitMQ 投递到死信交换机中死信交换机将消息 投入相应的死信队列死信队列的消费者 消费死信消息。 二、代码实现
2.1 死信队列的配置步骤
死信队列的配置可以分为以下三步
配置业务队列绑定到业务交换机上为业务队列 配置死信交换机、路由键为死信交换机 配置死信队列。 注意 并不是直接声明一个公共的死信队列然后所有死信消息就会自己进入死信队列中了。而是为每个需要使用死信的业务队列配置一个死信交换机这里同一个项目的死信交换机可以共用一个然后每个业务队列分配一个单独的路由键。 有了死信交换机和路由键后接下来就像配置业务队列一样配置死信队列并绑定在死信交换机上。看到这里大家应该可以明白
死信队列 并不是什么特殊的队列只不过是绑定在死信交换机上的队列。死信交换机 也不是什么特殊的交换机只不过是用来接收死信队列的交换机。
所以死信交换机可以为任何类型【Direct、Fanout、Topic】。一般来说因为开发过程中会为每个业务队列分配一个独有的路由 key并对应的配置一个死信队列进行监听。
有了前面的这些描述后我们接下来实战操作一下。
2.2 配置类
配置类中声明了两个交换机
业务交换机广播绑定了两个业务队列 业务队列A业务队列B。 死信交换机直连绑定了两个死信队列并配置了相应的路由键 死信队列A死信队列B。
RabbitMQConfig.java
package com.demo.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** p Title RabbitMQOrderConfig* p Description RabbitMQ配置** author ACGkaka* date 2023/12/22 14:05*/
Configuration
public class RabbitMQConfig {/** 业务队列 */public static final String BUSINESS_EXCHANGE_NAME demo.simple.business.exchange;public static final String BUSINESS_QUEUEA_NAME demo.simple.business.queuea;public static final String BUSINESS_QUEUEB_NAME demo.simple.business.queueb;/** 死信队列 */public static final String DEAD_LETTER_EXCHANGE demo.simple.deadletter.exchange;public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY demo.simple.deadletter.queuea.routingkey;public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY demo.simple.deadletter.queueb.routingkey;public static final String DEAD_LETTER_QUEUEA_NAME demo.simple.deadletter.queuea;public static final String DEAD_LETTER_QUEUEB_NAME demo.simple.deadletter.queueb;// 声明业务交换机广播Beanpublic FanoutExchange businessExchange() {return new FanoutExchange(BUSINESS_EXCHANGE_NAME);}// 声明死信交换机直连Beanpublic DirectExchange deadLetterExchange() {return new DirectExchange(DEAD_LETTER_EXCHANGE);}// 声明业务队列ABeanpublic Queue businessQueueA() {MapString, Object args new HashMap(2);// 声明当前队列绑定的死信交换机args.put(x-dead-letter-exchange, DEAD_LETTER_EXCHANGE);// 声明当前队列绑定的死信路由键args.put(x-dead-letter-routing-key, DEAD_LETTER_QUEUEA_ROUTING_KEY);return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(args).build();}// 声明业务队列BBeanpublic Queue businessQueueB() {MapString, Object args new HashMap(2);// 声明当前队列绑定的死信交换机args.put(x-dead-letter-exchange, DEAD_LETTER_EXCHANGE);// 声明当前队列绑定的死信路由键args.put(x-dead-letter-routing-key, DEAD_LETTER_QUEUEB_ROUTING_KEY);return QueueBuilder.durable(BUSINESS_QUEUEB_NAME).withArguments(args).build();}// 声明死信队列ABeanpublic Queue deadLetterQueueA() {return new Queue(DEAD_LETTER_QUEUEA_NAME);}// 声明死信队列BBeanpublic Queue deadLetterQueueB() {return new Queue(DEAD_LETTER_QUEUEB_NAME);}// 声明业务队列A绑定关系Beanpublic Binding businessBindingA(Queue businessQueueA, FanoutExchange businessExchange) {return BindingBuilder.bind(businessQueueA).to(businessExchange);}// 声明业务队列B绑定关系Beanpublic Binding businessBindingB(Queue businessQueueB, FanoutExchange businessExchange) {return BindingBuilder.bind(businessQueueB).to(businessExchange);}// 声明死信队列A绑定关系Beanpublic Binding deadLetterBindingA(Queue deadLetterQueueA, DirectExchange deadLetterExchange) {return BindingBuilder.bind(deadLetterQueueA).to(deadLetterExchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);}// 声明死信队列B绑定关系Beanpublic Binding deadLetterBindingB(Queue deadLetterQueueB, DirectExchange deadLetterExchange) {return BindingBuilder.bind(deadLetterQueueB).to(deadLetterExchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);}
}2.3 配置文件
application.yml
server:port: 8081spring:application:name: springboot-rabbitmq-dead-letterrabbitmq:# 此处不建议单独配置host和port单独配置不支持连接RabbitMQ集群addresses: 127.0.0.1:5672username: guestpassword: guest# 虚拟host 可以不设置使用server默认hostvirtual-host: /# 是否开启发送端消息抵达队列的确认publisher-returns: true# 发送方确认机制默认为NONE即不进行确认SIMPLE同步等待消息确认CORRELATED异步确认publisher-confirm-type: correlated# 消费者监听相关配置listener:simple:acknowledge-mode: manual # 确认模式默认auto自动确认manual手动确认default-requeue-rejected: false # 消费端抛出异常后消息是否返回队列默认值为trueprefetch: 1 # 限制每次发送一条数据concurrency: 1 # 同一个队列启动几个消费者max-concurrency: 1 # 启动消费者最大数量# 重试机制retry:# 开启消费者(程序出现异常)重试机制默认开启并一直重试enabled: true# 最大重试次数max-attempts: 3# 重试间隔时间(毫秒)initial-interval: 30002.4 生产者
为了方便测试写一个简单的消息生产者通过controller层来生产消息。
SendMessageController.java
import com.demo.config.RabbitMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;/*** p Title SendMessageController* p Description 推送消息接口** author ACGkaka* date 2023/1/12 15:23*/
Slf4j
RestController
public class SendMessageController {/*** 使用 RabbitTemplate这提供了接收/发送等方法。*/Autowiredprivate RabbitTemplate rabbitTemplate;GetMapping(/sendMessage)public String sendMessage(String message) {rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE_NAME, , message);return OK;}
}2.5 业务消费者
接下来是业务队列的消费端代码
BusinessMessageReceiver.java
import com.demo.config.RabbitMQConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;/*** p Title BusinessMessageReceiver* p Description RabbitMQ业务队列消费端** author ACGkaka* date 2024/1/7 17:43*/
Slf4j
Component
public class BusinessMessageReceiver {RabbitListener(queues RabbitMQConfig.BUSINESS_QUEUEA_NAME)public void receiveA(String body, Message message, Channel channel) throws IOException {log.info(业务队列A收到消息: {}, body);try {if (body.contains(deadletter)) {throw new RuntimeException(dead letter exception);}channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {log.error(业务队列A消息消费发生异常error msg: {}, e.getMessage(), e);channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);}}RabbitListener(queues RabbitMQConfig.BUSINESS_QUEUEB_NAME)public void receiveB(String body, Message message, Channel channel) throws IOException {log.info(业务队列B收到消息: {}, body);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}
}2.6 死信消费者
接下来是死信队列的消费端代码
DeadLetterMessageReceiver.java
import com.demo.config.RabbitMQConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;/*** p Title DeadLetterMessageReceiver* p Description RabbitMQ死信队列消费端** author ACGkaka* date 2024/1/7 18:14*/
Slf4j
Component
public class DeadLetterMessageReceiver {RabbitListener(queues RabbitMQConfig.DEAD_LETTER_QUEUEA_NAME)public void receiveA(String body, Message message, Channel channel) throws IOException {log.info(死信队列A收到消息: {}, body);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}RabbitListener(queues RabbitMQConfig.DEAD_LETTER_QUEUEB_NAME)public void receiveB(String body, Message message, Channel channel) throws IOException {log.info(死信队列B收到消息: {}, body);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}
}2.7 测试结果
消费正常消息请求结果
请求地址http://localhost:8081/sendMessage?messageHello 从日志可以看到两个业务队列成功消费。 消费错误消息请求结果
请求地址http://localhost:8081/sendMessage?messagedeadletter 从日志可以看到业务队列A和B都收到了消息但是 业务队列A消费发生异常然后消息就被 转到了死信队列死信队列消费端成功消费。 三、总结
死信队列其实并没有什么神秘的地方不过是绑定在死信交换机上的普通队列而死信交换机也只是一个普通的交换机不过是用来专门处理死信的交换机。
死信消息 时 RabbitMQ 为我们做的一层保障其实我们 也可以不使用死信队列而是 在消息消费异常的时候将消息主动投递到另一个交换机中当你明白了这些之后这些 Exchange 和 Queue 想怎样配合就可以怎样配合。比如
从死信队列拉取消息然后发送邮件、短信、钉钉通知来通知开发人员关注。或者将消息重新投递到一个队列然后设置过期时间来进行延时消费等等。
整理完毕完结撒花~ 参考地址
1.【RabbitMQ】一文带你搞定RabbitMQ死信队列https://cloud.tencent.com/developer/article/1463065