ping站长工具,海安县住房和城乡建设局网站,网站建设备案计划书,网站pv uv统计如何确保RabbitMQ消息的可靠性#xff1f;
开启生产者确认机制#xff0c;确保生产者的消息能到达队列开启持久化功能#xff0c;确保消息未消费前在队列中不会丢失开启消费者确认机制为auto#xff0c;由spring确认消息处理成功后完成ack开启消费者失败重试机制#xff…如何确保RabbitMQ消息的可靠性
开启生产者确认机制确保生产者的消息能到达队列开启持久化功能确保消息未消费前在队列中不会丢失开启消费者确认机制为auto由spring确认消息处理成功后完成ack开启消费者失败重试机制并设置MessageRecoverer多次重试失败后将消息投递到异常交换机交由人工处理
1.生产者确认机制 对应配置:
logging:pattern:dateformat: HH:mm:ss:SSSlevel:cn.itcast: debug # Debug Info Warn Error Fatal
spring:rabbitmq:host: 192.168.23.130 # rabbitMQ的ip地址port: 5672 # 端口username: itcastpassword: 123321virtual-host: /publisher-confirm-type: correlated #ConfirmCallback 生产者消费确认到交换机publisher-returns: true #ConfirmCallback ReturnCallback 到队列template:mandatory: true启动配置类 每个RabbitTemplate只能配置一个ReturnCallback因此需要在项目启动过程中配置
ApplicationContextAware -bean工厂通知-拿到rabbitTemplate
Slf4j
Configuration
//生产者消息确认确认信心到达队列
public class CommonConfig implements ApplicationContextAware {Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {//获取RabbitTemplate对象RabbitTemplate rabbitTemplate applicationContext.getBean(RabbitTemplate.class);//配置ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) - {//判断是否是延迟消息if(message.getMessageProperties().getReceivedDelay()0){return;}//失败时才会回调//处理记录日志log.error(消息发送到队列失败响应码{}失败原因{}交换机{}路由key{}消息{},replyCode,replyText,exchange,routingKey,message);//可以得到所有的错误信息有需要的话可以选择重发信息});}
}消息发送 Test//生产者消息确认确认信息到达交换机public void testSendMessage2SimpleQueue1() throws InterruptedException {String routingKey red;// 1.消息体String message hello, spring amqp!;// 2.全局唯一的消息ID需要封装到CorrelationData中CorrelationData correlationData new CorrelationData(UUID.randomUUID().toString());// 3.添加callbackcorrelationData.getFuture().addCallback(confirm - {if (confirm.isAck()){//ASClog.debug(消息发送到交换机成功ID:{},correlationData.getId());}else {//nASClog.debug(消息发送到交换机失败ID:{}原因{},correlationData.getId(),confirm.getReason());}}, throwable - {log.error(消息发送异常, ID:{}, 原因{},correlationData.getId(),throwable.getMessage());});// 4.发送消息rabbitTemplate.convertAndSend(exchange.direct, routingKey, message,correlationData);// 休眠一会儿等待ack回执Thread.sleep(2000);}2.消息持久化
交换机持久化 RabbitMQ中交换机默认是非持久化的mq重启后就丢失 默认情况下由SpringAMQP声明的交换机都是持久化的
Bean
public DirectExchange simpleExchange(){// 三个参数交换机名称、是否持久化、当没有queue与其绑定时是否自动删除return new DirectExchange(simple.direct, true, false);
}RabbitListener
value Queue(name dl.ttl.queue, durable true), 持久化exchange Exchange(name dl.ttl.direct,durable true), //死信交换机队列持久化 RabbitMQ中队列默认是非持久化的mq重启后就丢失 默认情况下由SpringAMQP声明的队列都是持久化的
Bean
public Queue simpleQueue(){// 使用QueueBuilder构建队列durable就是持久化的return QueueBuilder.durable(simple.queue).build();
}消息持久化 利用SpringAMQP发送消息时可以设置消息的属性MessageProperties指定delivery-mode 默认情况下SpringAMQP发出的任何消息都是持久化的不用特意指定
3.1消费者确认机制
RabbitMQ是阅后即焚机制RabbitMQ确认消息被消费者消费后会立刻删除。
而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的消费者获取消息后应该向RabbitMQ发送ACK回执表明自己已经处理消息。
设想这样的场景
RabbitMQ投递消息给消费者消费者获取消息后返回ACK给RabbitMQRabbitMQ删除消息消费者宕机消息尚未处理
这样消息就丢失了。因此消费者返回ACK的时机非常重要。
而SpringAMQP则允许配置三种确认模式
manual手动ack需要在业务代码结束后调用api发送ack。auto自动ack由spring监测listener代码是否出现异常没有异常则返回ack抛出异常则返回nacknone关闭ackMQ假定消费者获取消息后会成功处理因此消息投递后立即被删除
由此可知
none模式下消息投递是不可靠的可能丢失auto模式类似事务机制出现异常时返回nack消息回滚到mq没有异常返回ackmanual自己根据业务情况判断什么时候该ack
一般我们都是使用默认的auto即可
3.2消费失败重试机制
重试接收的交换机及队列配置类
Configuration
public class ExchangeErrorQueueConfig {private final String ExchangeName error.direct;private final String QueueName error.queue;private final String RoutingKey error;Bean//定义错误交换机public DirectExchange errorMessageExchange(){return new DirectExchange(ExchangeName);}//定义错误处理队列Beanpublic Queue errorQueue(){return new Queue(QueueName);}//将交换机和队列绑定Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with(RoutingKey);}//定义一个RepublishMessageRecoverer关联队列和交换机Beanpublic RepublishMessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate,ExchangeName,RoutingKey);}
}消费者两种模式配置
logging:pattern:dateformat: HH:mm:ss:SSSlevel:cn.itcast: debug
spring:rabbitmq:host: 192.168.23.130 # rabbitMQ的ip地址port: 5672 # 端口username: itcastpassword: 123321virtual-host: /listener:simple:prefetch: 1#acknowledge-mode: none # 关闭ack 消息处理抛异常时消息依然被RabbitMQ删除acknowledge-mode: auto # ack 自动返回结果retry:enabled: true # 开启消费者失败重试 在消费者本地重试不会返回队列initial-interval: 1000 # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数下次等待时长 multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态false有状态。如果业务中包含事务这里改为false