销售网站建设的会计分录,南京华佑千家装饰工程有限公司,网上购物软件哪个好,快速搭建网站wordpress保证MQ消息的可靠性#xff0c;主要从三个方面#xff1a;发送者确认可靠性#xff0c;MQ确认可靠性#xff0c;消费者确认可靠性。
1.发送者可靠性#xff1a;主要依赖于发送者重试机制#xff0c;发送者确认机制#xff1b;
发送者重试机制#xff0c;其实就是配置…保证MQ消息的可靠性主要从三个方面发送者确认可靠性MQ确认可靠性消费者确认可靠性。
1.发送者可靠性主要依赖于发送者重试机制发送者确认机制
发送者重试机制其实就是配置文件配置重试规则当消息发送失败后会根据配置的重试次数进行多次发送重试如代码
spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled: true # 开启超时重试机制initial-interval: 1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下次的等待时长倍数下次等待时长 initial-interval * multipliermax-attempts: 3 # 最大重试次数
发送者确认机制则是依赖于消息的回执这其中包括发送者回执和消费者回执两种但是这种回执都比较耗性能会导致消息消费的很慢。并且这也是需要在配置文件中做配置的
spring:rabbitmq:publisher-confirm-type: correlated # 开启publisher confirm机制并设置confirm类型publisher-returns: true # 开启publisher return机制
并且还要有代码的实现这种方式极大的影响了性能
Slf4j
AllArgsConstructor
Configuration
public class MqConfig {private final RabbitTemplate rabbitTemplate;PostConstructpublic void init(){rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {Overridepublic void returnedMessage(ReturnedMessage returned) {log.error(触发return callback,);log.debug(exchange: {}, returned.getExchange());log.debug(routingKey: {}, returned.getRoutingKey());log.debug(message: {}, returned.getMessage());log.debug(replyCode: {}, returned.getReplyCode());log.debug(replyText: {}, returned.getReplyText());}});}
}
Test
void testPublisherConfirm() {// 1.创建CorrelationDataCorrelationData cd new CorrelationData();// 2.给Future添加ConfirmCallbackcd.getFuture().addCallback(new ListenableFutureCallbackCorrelationData.Confirm() {Overridepublic void onFailure(Throwable ex) {// 2.1.Future发生异常时的处理逻辑基本不会触发log.error(send message fail, ex);}Overridepublic void onSuccess(CorrelationData.Confirm result) {// 2.2.Future接收到回执的处理逻辑参数中的result就是回执内容if(result.isAck()){ // result.isAck()boolean类型true代表ack回执false 代表 nack回执log.debug(发送消息成功收到 ack!);}else{ // result.getReason()String类型返回nack时的异常描述log.error(发送消息失败收到 nack, reason : {}, result.getReason());}}});// 3.发送消息rabbitTemplate.convertAndSend(hmall.direct, q, hello, cd);
}
2.MQ自身的可靠性交换机/队列/消息都实现持久化消息不会丢失如果是在项目中通过代码创建的交换机/队列/消息spring默认就是持久化的如果在mq的客户端手工配置那就要选定各个参数了。持久化后的消息会直接进入磁盘不在经过内存了正常来讲有IO的操作会慢才对但是在实际的操作中却是非常快。
MQ队列最怕的就是消息积压导致内存溢出。在3.12版本以后MQ直接默认就是Laz懒惰队列的模式了这个模式会直接加载到磁盘当用到消息的时候会从磁盘加载到内存磁盘空间很大支持数百万级别的存储所以内存溢出的可能性就会大大降低。我们可以在mq客户端手动设置为lazy队列也可以在代码中直接实现代码如下
RabbitListener(queuesToDeclare Queue(name lazy.queue,durable true,arguments Argument(name x-queue-mode, value lazy)
))
public void listenLazyQueue(String msg){log.info(接收到 lazy.queue的消息{}, msg);
}
3.消费者的可靠性
3.1消费者消费消息后向MQ发送回执让MQ知道消息是否正常被消费了目前回执有三种
ack成功处理了消息MQ从队列中就会删除消息正常。
nack失败处理了消息MQ需要再次投递消息这会出现一直重试的问题。
reject消息失败并拒绝了消息并且从队列中删除了消息。这个消息被删除了岂不是数据就丢失了。
对于以上三种回执基本回执都是固定的AMQP提供了消息确认的方式不用写代码配置就可以,配置有三种none-配置它失败了消息会被删除auto-失败了消息会回到MQ重新投递不会丢失不会被删除manual-太麻烦算了。
不过对于auto的配置对于返回的异常会有两种判断1如果是业务异常会自动返回nack
如果是消息处理或者校验异常会直接进行reject
spring:rabbitmq:listener:simple:acknowledge-mode: auto
3.2 生产者有重试机制消费者也有重试机制但是对于消费者的重试如果一直失败那就要有一定的策略可以把这个失败的消息放到另一个交换机上后续人工进行干预这样可以保证消息不丢失。
对于消费者的重试配置
spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数下次等待时长 multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态false有状态。如果业务中包含事务这里改为false
如何把消息发送到另一个交换机上呢
在消费者服务定义一个处理失败消息队列的交换机这样就可以把消息存储过去了
Configuration
ConditionalOnProperty(name spring.rabbitmq.listener.simple.retry.enabled, havingValue true)
public class ErrorMessageConfig {Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange(error.direct);}Beanpublic Queue errorQueue(){return new Queue(error.queue, true);}Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with(error);}Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, error.direct, error);}
}