想开个网站做外贸怎么做,产品设计方案格式模板,女教师网课入06654侵录屏,济南网站建设铭盛信息文章目录 1. ☃️概述2. ☃️生产者消息确认2.1 ❄️❄️概述2.2 ❄️❄️实战⛷️⛷️⛷️2.2.1 修改配置⛷️⛷️⛷️2.2.2 定义 Return 回调⛷️⛷️⛷️2.2.3 定义ConfirmCallback 3. ☃️消息持久化3.1 ❄️❄️交换机持久化3.2 ❄️❄️队列持久化3.3 ❄️❄️消息持久化… 文章目录 1. ☃️概述2. ☃️生产者消息确认2.1 ❄️❄️概述2.2 ❄️❄️实战⛷️⛷️⛷️2.2.1 修改配置⛷️⛷️⛷️2.2.2 定义 Return 回调⛷️⛷️⛷️2.2.3 定义ConfirmCallback 3. ☃️消息持久化3.1 ❄️❄️交换机持久化3.2 ❄️❄️队列持久化3.3 ❄️❄️消息持久化 4. ☃️消费者消息确认4.1 ❄️❄️三种确认模式4.2 ❄️❄️消息失败重试机制⛷️⛷️⛷️4.2.1 本地重试机制4.2.2 ⛷️⛷️⛷️失败策略 5. ☃️总结 1. ☃️概述
消息从发送到消费者接收 会经历的过程如下
丢失消息的可能性
发送时丢失 生产者发送的消息未送达exchange消息到达exchange后未到达queue MQ宕机queue将消息丢失consumer接收到消息后未消费就宕机
针对这些问题RabbitMQ分别给出了解决方案
生产者确认机制mq持久化消费者确认机制失败重试机制 2. ☃️生产者消息确认 2.1 ❄️❄️概述
RabbitMQ 提供了 publisher confirm 机制来避免消息发送到 MQ 过程中丢失。这种机制必须给每个消息指定一个唯一ID。消息发送到MQ以后会返回一个结果给发送者表示消息是否处理成功。
返回结果有两种方式
publisher-confirm发送者确认 消息成功投递到交换机返回ack消息未投递到交换机返回nack publisher-return发送者回执 消息投递到交换机了但是没有路由到队列。返回ACK及路由失败原因。 2.2 ❄️❄️实战 ⛷️⛷️⛷️2.2.1 修改配置
spring:rabbitmq:publisher-confirm-type: correlatedpublisher-returns: truetemplate:mandatory: true配置说明 publish-confirm-type开启publisher-confirm这里支持两种类型 simple同步等待confirm结果直到超时correlated异步回调定义ConfirmCallbackMQ返回结果时会回调这个ConfirmCallback publish-returns开启publish-return功能同样是基于callback机制不过是定义ReturnCallbacktemplate.mandatory定义消息路由失败时的策略。true则调用ReturnCallbackfalse则直接丢弃消息 ⛷️⛷️⛷️2.2.2 定义 Return 回调
每个RabbitTemplate只能配置一个ReturnCallback因此需要在项目加载时配置修改publisher服务添加一个
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;Slf4j
Configuration
public class CommonConfig implements ApplicationContextAware {Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取RabbitTemplateRabbitTemplate rabbitTemplate applicationContext.getBean(RabbitTemplate.class);// 设置ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) - {// 投递失败记录日志log.info(消息发送失败应答码{}原因{}交换机{}路由键{},消息{},replyCode, replyText, exchange, routingKey, message.toString());// 如果有业务需要可以重发消息});}
}⛷️⛷️⛷️2.2.3 定义ConfirmCallback
ConfirmCallback 可以在发送消息时指定因为每个业务处理 confirm 成功或失败的逻辑不一定相同。
public void testSendMessage2SimpleQueue() throws InterruptedException {// 1.消息体String message hello, spring amqp!;// 2.全局唯一的消息ID需要封装到 CorrelationData 中CorrelationData correlationData new CorrelationData(UUID.randomUUID().toString());// 3.添加callbackcorrelationData.getFuture().addCallback(result - {if(result.isAck()){// 3.1.ack消息成功log.debug(消息发送成功, ID:{}, correlationData.getId());}else{// 3.2.nack消息失败log.error(消息发送失败, ID:{}, 原因{},correlationData.getId(), result.getReason());}},ex - log.error(消息发送异常, ID:{}, 原因{},correlationData.getId(),ex.getMessage()));// 4.发送消息rabbitTemplate.convertAndSend(task.direct, task, message, correlationData);// 休眠一会儿等待ack回执//Thread.sleep(20);
}3. ☃️消息持久化
生产者确认可以确保消息投递到 RabbitMQ 的队列中但是消息发送到 RabbitMQ 以后如果突然宕机也可能导致消息丢失。要想确保消息在RabbitMQ中安全保存必须开启消息持久化机制。
交换机持久化队列持久化消息持久化 3.1 ❄️❄️交换机持久化
Bean
public DirectExchange simpleExchange(){// 三个参数①交换机名称、②是否持久化、③当没有queue与其绑定时是否自动删除return new DirectExchange(simple.direct, true, false);
}事实上默认情况下由SpringAMQP声明的交换机都是持久化的。
3.2 ❄️❄️队列持久化
Bean
public Queue simpleQueue(){// 使用QueueBuilder构建队列durable就是持久化的return QueueBuilder.durable(simple.queue).build();
}事实上默认情况下由SpringAMQP声明的队列都是持久化的。 3.3 ❄️❄️消息持久化
默认情况下SpringAMQP 交换机 队列 以及发出的任何消息都是持久化的不用特意指定。 4. ☃️消费者消息确认
RabbitMQ 是 阅后即焚 机制RabbitMQ 确认消息被消费者消费后会立刻删除。
而 RabbitMQ 是通过 消费者回执 来确认消费者是否成功处理消息的消费者获取消息后应该向 RabbitMQ 发送 ACK 回执表明自己已经处理消息。
设想这样的场景
1RabbitMQ投递消息给消费者2消费者获取消息后返回ACK给RabbitMQ3RabbitMQ删除消息4消费者宕机消息尚未处理
这样消息就丢失了。因此消费者返回ACK的时机非常重要。 4.1 ❄️❄️三种确认模式
而 SpringAMQP 则允许配置三种确认模式•manual手动ack需要在业务代码结束后调用api发送ack。•auto自动ack由spring监测listener代码是否出现异常没有异常则返回ack抛出异常则返回nack。•none关闭ackMQ假定消费者获取消息后会成功处理因此消息投递后立即被删除存在丢失消息的风险。
由此可知
none 模式下消息投递是不可靠的可能丢失auto 模式类似事务机制出现异常时返回nack消息回滚到mq没有异常返回ackmanual自己根据业务情况判断什么时候该ack
一般我们都是使用默认的 auto 即可。
相关配置
spring:rabbitmq:listener:simple:#acknowledge-mode: none # 关闭ack#acknowledge-mode: manual # 手动ackacknowledge-mode: auto # 自动ack4.2 ❄️❄️消息失败重试机制
当消费者出现异常后消息会不断 requeue重入队到队列再重新发送给消费者然后再次异常再次 requeue无限循环导致mq的消息处理飙升带来不必要的压力怎么办呢 ⛷️⛷️⛷️4.2.1 本地重试机制
我们可以利用 Spring 的 retry 机制在消费者出现异常时利用 本地重试而不是无限制的 requeue 到 mq 队列。修改 consumer 服务的 application.yml 文件添加内容
spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000 # 初始的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数下次等待时长 multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态false有状态。如果业务中包含事务这里改为false重启consumer服务重复之前的测试。可以发现
在重试3次后SpringAMQP 会抛出异常 AmqpRejectAndDontRequeueException说明本地重试触发了。查看 RabbitMQ 控制台发现消息被删除了说明最后 SpringAMQP 返回的是ackmq删除消息了。
结论
开启本地重试时消息处理过程中抛出异常不会 requeue 到队列而是在消费者本地重试重试达到最大次数后Spring 会返回 ack消息会被丢弃。 4.2.2 ⛷️⛷️⛷️失败策略
在之前的测试中达到最大重试次数后消息会被丢弃这是由 Spring 内部机制决定的。
在开启重试模式后重试次数耗尽如果消息依然失败则需要有 MessageRecovery 接口来处理它包含三种不同的实现
RejectAndDontRequeueRecoverer重试耗尽后直接 reject丢弃消息。默认就是这种方式ImmediateRequeueMessageRecoverer重试耗尽后返回 nack消息重新入队RepublishMessageRecoverer重试耗尽后将失败消息投递到指定的交换机
比较优雅的一种处理方案是RepublishMessageRecoverer失败后将消息投递到一个指定的专门存放异常消息的队列后续由人工集中处理。1在consumer服务中定义处理失败消息的交换机和队列2定义一个RepublishMessageRecoverer关联队列和交换机代码如下
Configuration
public class ErrorMessageConfig {Bean // 处理失败消息的交换机public DirectExchange errorMessageExchange(){return new DirectExchange(error.direct);}Bean // 处理失败消息的队列public Queue errorQueue(){return new Queue(error.queue, true);}Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with(error);}Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, error.direct, error);}
}其实 我们在生产中会指定死信交换机来处理失败的消息 5. ☃️总结
如何确保RabbitMQ消息的可靠性
开启生产者确认机制确保生产者的消息能到达队列开启持久化功能确保消息未消费前在队列中不会丢失开启消费者确认机制为auto由spring确认消息处理成功后完成ack开启消费者失败重试机制并设置MessageRecoverer多次重试失败后将消息投递到异常交换机交由人工处理