江门建设企业网站,网络营销策略包括哪几大策略,怎么设计公司的网站,普洱市网站建设制作1. 概念
死信队列#xff08;Dead Letter Queue#xff09;是在消息队列系统中的一种特殊队列#xff0c;用于存储无法被消费的消息。消息可能会因为多种原因变成“死信”#xff0c;例如消息过期、消息被拒绝、消息队列长度超过限制等。当消息变成“死信”时#xff0c;…1. 概念
死信队列Dead Letter Queue是在消息队列系统中的一种特殊队列用于存储无法被消费的消息。消息可能会因为多种原因变成“死信”例如消息过期、消息被拒绝、消息队列长度超过限制等。当消息变成“死信”时它们会被路由到死信队列中以便进行进一步处理或分析。 死信队列能够帮助系统进行消息跟踪、监控和处理异常情况是消息队列系统中的重要组成部分。 2. 应用场景
死信队列在消息队列系统中有多种应用场景包括但不限于以下几个方面 延迟消息处理实现延迟消息投递例如实现消息的定时投递、消息重试机制等。 任务调度用于实现任务调度系统例如延迟执行任务、失败重试任务等。 异常处理处理消息消费失败或超时的情况对异常消息进行统一处理。 业务流程控制实现业务流程中的状态控制和超时处理例如订单超时取消、支付超时处理等。 监控和统计对异常消息进行统计和分析用于系统性能监控和问题排查。
这些应用场景展示了死信队列的灵活性和实用性在实际系统开发中具有广泛的应用价值。 3. 造成消息进入死信队列的原因
消息成为死信的原因有以下几种 消息被拒绝basic.reject或basic.nack并且requeue标志被设置为false。若参数requeue为true则表示还可以将此跳消息重新塞回普通队列若为false则消息被拒绝后直接进入死信队列。 消息过期。在生产者设置生产时设置若消费者未在过期时间内消费消息则消息被转发到死信队列中。x-message-ttl 队列达到最大长度。当普通队列中消息堆积数量长度达到了maxLength则会将新接收的消息转发到死信队列中去从而避免消息丢失。 4. 死信队列工作流程图 5. 代码示例
5.1 引入依赖
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactIdversion2.7.15/version
/dependency
5.2 RabbitMQ配置
Configuration
public class RabbitConfig {/*** 死信队列消息模型构建----------------------------------------------------------------------------------**/// 创建普通队列Beanpublic Queue basicQueue() {MapString, Object params new HashMap(8);// x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称params.put(x-dead-letter-exchange, Exchange.DEMO_DEAD_LETTER_EXCHANGE);// x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。params.put(x-dead-letter-routing-key, RoutingKey.DEMO_DEAD_ROUTING_KEY);// 注意这里是毫秒单位这里我们给10秒params.put(x-message-ttl, 10*1000);return new Queue(MyQueue.DEMO_CONSUMER_QUEUE, true, false, false, params);}//创建“基本消息模型”的基本交换机面向生产者Beanpublic TopicExchange basicExchange() {//创建并返回基本交换机实例return new TopicExchange(Exchange.DEMO_BASIC_NORMAL_EXCHANGE, true, false);}//创建“基本消息模型”的基本绑定基本交换机基本路由面向生产者Beanpublic Binding basicBinding() {//创建并返回基本消息模型中的基本绑定(注意这里是正常交换机跟死信队列绑定在一定不叫死信路由)return BindingBuilder.bind(basicQueue()).to(basicExchange()).with(RoutingKey.DEMO_ROUTING_KEY);}// 创建死信交换机Beanpublic TopicExchange deadLetterExchange() {//创建并返回死信交换机实例return new TopicExchange(Exchange.DEMO_DEAD_LETTER_EXCHANGE, true, false);}// 创建第二个中转站// 创建死信队列Beanpublic Queue deadLetterQueue() {return new Queue(MyQueue.DEMO_DEAD_LETTER_QUEUE, true);}// 创建死信路由及其绑定Beanpublic Binding deadLetterBinding() {return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(RoutingKey.DEMO_DEAD_ROUTING_KEY);}public static class Exchange {public static final String DEMO_BASIC_NORMAL_EXCHANGE demo.basic.exchange;public static final String DEMO_DEAD_LETTER_EXCHANGE demo.dead.letter.exchange;}public static class RoutingKey {//交换机与报表队列绑定的RoutingKeypublic static final String DEMO_ROUTING_KEY demo.basic.routing.key;public static final String DEMO_DEAD_ROUTING_KEY demo.dead.routing.key;}/*** 队列名称* author peng.zhang* date 2024/01/30*/public static class MyQueue {//报表队列名称public static final String DEMO_CONSUMER_QUEUE demo.basic.queue;//死信队列名称public static final String DEMO_DEAD_LETTER_QUEUE demo.dead.letter.queue;}
}
5.3 消息生产者
RestController
RequestMapping(/test)
Slf4j
public class TestController {Resourceprivate RabbitTemplate rabbitTemplate;/*** 发送消息到死信队列*/PostMapping(/testDeadQueue)public String testDeadQueue() {// 设置生产者到交换机的确认回调rabbitTemplate.setConfirmCallback((correlationData, ack, cause) - {log.info(correlationData:{}, ack:{}, cause:{}, JSON.toJSONString(correlationData), ack, cause);});// 设置消息未被队列接收时的返回回调rabbitTemplate.setReturnCallback((message, replyCode, replyText, ex, routing) - {log.info(message:{}, replyCode:{}, replyText:{}, exchange:{}, routingKey:{}, JSON.toJSONString(message),replyCode, replyText, ex, routing);});// 生成关联数据并发送消息到交换机CorrelationData correlationData new CorrelationData(UUID.randomUUID().toString());// 消息内容String messageBody StrUtil.format(this message send at {}, DateUtil.format(LocalDateTime.now(), yyyy-MM-dd HH:mm:ss));rabbitTemplate.convertAndSend(RabbitConfig.Exchange.DEMO_BASIC_NORMAL_EXCHANGE, RabbitConfig.RoutingKey.DEMO_ROUTING_KEY, messageBody, correlationData);log.info({}, 发送消息:{}, DateUtil.format(LocalDateTime.now(), yyyy-MM-dd HH:mm:ss), messageBody);return OK;}}
5.4 消息消费者
Component
Slf4j
public class DeadLetterConsumer {/*** 监听 DEMO_CONSUMER_QUEUE 并处理传入的消息。* 为测试目的抛出 IOException 以模拟异常。** param messageBody 消息负载* param headers 消息头* param channel 用于消息确认的通道* throws IOException 如果抛出异常*/RabbitListener(queues RabbitConfig.MyQueue.DEMO_CONSUMER_QUEUE)RabbitHandlerpublic void testBasicQueueAndThrowsException(Payload String messageBody, Headers MapString, Object headers, Channel channel) throws IOException {/*** Delivery Tag 用来标识信道中投递的消息。RabbitMQ 推送消息给 Consumer 时会附带一个 Delivery Tag* 以便 Consumer 可以在消息确认时告诉 RabbitMQ 到底是哪条消息被确认了。* RabbitMQ 保证在每个信道中每条消息的 Delivery Tag 从 1 开始递增。*/Long tag (Long) headers.get(AmqpHeaders.DELIVERY_TAG);log.info({} 普通队列消费, tag {}, 消息内容:{}, DateUtil.format(LocalDateTime.now(), yyyy-MM-dd HH:mm:ss), tag, messageBody);/*** multiple 取值为 false 时表示通知 RabbitMQ 当前消息被确认* 如果为 true则额外将比第一个参数指定的 delivery tag 小的消息一并确认*/// ACK,确认一条消息已经被消费
// channel.basicAck(deliveryTag, false);// 对应的业务操作。。。。。// doBusiness();// 模拟消息拒绝channel.basicNack(tag, false, false);}/*** 处理业务逻辑*/private void doBusiness() {System.out.println(here do some business code);}/*** 监听死信队列并处理消息。** param data 消息内容* param tag 消息标签* param channel 通道*/RabbitListener(queues RabbitConfig.MyQueue.DEMO_DEAD_LETTER_QUEUE)RabbitHandlerpublic void fromDeadLetter(Payload String data, Header(AmqpHeaders.DELIVERY_TAG) long tag, Channel channel) {log.info({} 死信队列消费, tag {}, 消息内容:{}, DateUtil.format(LocalDateTime.now(), yyyy-MM-dd HH:mm:ss), tag, data);// 对应的业务操作。。。。。}
}
5.5 YML配置
spring:rabbitmq:username: rabbitmqpassword: rabbitmqport: 5672host: 127.0.0.1#publisher-confirm-type参数有三个可选值#SIMPLE会触发回调方法相当于单个确认发一条确认一条。#CORRELATED消息从生产者发送到交换机后触发回调方法。#NONE默认关闭发布确认模式。publisher-confirm-type: correlatedtemplate:receive-timeout: 1800000reply-timeout: 1800000retry:enabled: falselistener:direct:retry:enabled: truedefault-requeue-rejected: falsesimple:retry:# 是否开启消费者重试为false时关闭消费者重试这时消费端代码异常会一直重复收到消息enabled: true# 最大重试次数max-attempts: 1# 重试间隔时间单位毫秒initial-interval: 10000# 重试最大时间间隔单位毫秒max-interval: 300000# 应用于前一重试间隔的乘法器multiplier: 5default-requeue-rejected: false
5.6 控制台输出
从控制台可以看出消息被拒绝后大概10秒后死信队列消息被消费。