怎么做公司网站需要什么,阿里云服务器责任怎么做网站,建立 wiki 网站,明港网站建设背景
某某会员小程序后台提供开放平台能力#xff0c;为三方油站提供会员积分、优惠劵等api。当用户在油站加油#xff0c;油站收银会调用我们系统为用户发放积分、优惠劵等。用户反馈慢#xff0c;三方调用发放积分接口性能极低#xff0c;耗时30s#xff1b;
接口情况…
背景
某某会员小程序后台提供开放平台能力为三方油站提供会员积分、优惠劵等api。当用户在油站加油油站收银会调用我们系统为用户发放积分、优惠劵等。用户反馈慢三方调用发放积分接口性能极低耗时30s
接口情况
发放积分接口业务负责且用存储过程写的业务改动风险极大
数据库情况
优惠卷等表数据量800w甚至存在单表3000w
优化方案 数据库数据归档
归档交易、用户优惠劵等表历史数据比如归档三个月前的数据根据实际情况补充归档条件如用户优惠劵没使用或没过期的数据不能归档优化效果存储过程耗时从30s降低到7s但是作为Toc用途接口性能远远不达标优化数据库sql或许能进一步降低响应时间但是存储过程复杂优化费时费力风险大
方案描述风险工作量难度是否能解决性能问题是否解决并发冲突影响使用技术方案1java重写存储过程业务大大大一定程度能解决yes改动点多业务影响大java orm方案2保证存储过程全局串行执行小小大noyes接口性能会降低分布式锁方案3异步下存储过程全局串行执行中中中yesyesrabbitmq分布式锁自旋锁 线程池异步化分析
接口中存储串行调用改为异步调用 使用线程池异步化存在问题 开始简单使用线程池异步化但是出现锁表的情况原因存储过程没有保证原子性并且其中大量使用临时表并发下出现竞争锁表而SqlServer自带的死锁检查机制杀死事务导致发放积分失败 线程池分布式锁
异步线程【不能保证分布式环境的全局顺序执行】使用分布式锁能保证同一个时间只有一个存储过程执行问题但是并发情况会将存储过程执行堆积在线程池并发过大存在OOM风险或者处理丢失风险 rabbitmq异步改造 可行性验证报告结论
验证通过点如下
测试rabbitmq发送/接收消息【通过】测试并发下分布式锁自旋锁保证业务串行执行【通过】测试并发下分布式锁自旋锁mq保证业务串行执行【通过】测试业务幂等性保证不重复消费【通过】测试手动ack兼容原来配置保证可靠性【通过】 当前项目rabbitmq使用方式问题分析
配置发下
spring.rabbitmq.host172.18.229.23
spring.rabbitmq.port5672
spring.rabbitmq.usernametotaltest
spring.rabbitmq.passwordtotaltest
spring.rabbitmq.virtual-host/totaltest/
spring.rabbitmq.publisher-confirmsfalse该配置没有
spring.rabbitmq.listener.direct.acknowledge-modemanual
spring.rabbitmq.listener.simple.acknowledge-modemanual若是不配默认为
spring.rabbitmq.listener.direct.acknowledge-modeauto
spring.rabbitmq.listener.simple.acknowledge-modeautorabbitmq消费者ack机制问题分析
spring.rabbitmq.listener.direct.acknowledge-mode是用于配置Spring Boot应用中RabbitMQ消息监听器的确认模式。确认模式决定了在消费者处理消息后如何通知RabbitMQ服务器来确认消息的接收情况。该配置有以下几种可选的值
AUTO: 在这种模式下消费者处理消息后RabbitMQ会自动确认消息。这意味着消息一旦被消费者接收就会立即从队列中删除。这是默认的确认模式。MANUAL: 在这种模式下消费者需要显式地发送确认消息来告知RabbitMQ服务器消息已经被成功处理。这意味着消费者可以在处理消息后决定是否要确认消息。通常在需要进行消息处理的事务性操作时使用这种模式。NONE: 在这种模式下消费者不会发送任何确认消息也不会被要求发送确认消息。这意味着消息会在被传递给消费者之后立即被视为已经被确认。
问题项目中该配置使用的模式配置以为着没有手动ack即消费者接收到消息消息就会从mq中删除若是消费者消费异常则消息丢失不可追溯复原 rabbitmq生产者ack机制问题分析
项目中配置如下
spring.rabbitmq.publisher-confirmsfalsespring.rabbitmq.publisher-confirms是Spring Boot中用于配置RabbitMQ生产者消息确认的属性。它用于控制是否启用RabbitMQ的发布确认机制以确保消息成功发送到Exchange。当spring.rabbitmq.publisher-confirms属性设置为true时表示启用了RabbitMQ的发布确认机制。在这种情况下当生产者发送消息到Exchange后RabbitMQ会发送一个确认消息给生产者告知消息是否成功到达Exchange。生产者可以根据收到的确认消息来判断消息是否成功发送从而进行相应的处理。当spring.rabbitmq.publisher-confirms属性设置为false时表示禁用了RabbitMQ的发布确认机制。在这种情况下生产者发送消息到Exchange后不会收到确认消息也无法得知消息是否成功到达Exchange。通常情况下建议将spring.rabbitmq.publisher-confirms属性设置为true以确保消息的可靠发送。当然具体是否启用发布确认机制还取决于业务场景和对消息可靠性的要求。
rabbitmq消息可靠性问题分析
通过上诉【rabbitmq生产者ack机制问题分析】和【rabbitmq消费者ack机制问题分析】可知当前项目中消息没有保证消息可靠性rabbitmq宕机恢复、消费者消费异常都会导致消息丢失导致业务完整性缺失 rabbitmq配置最小改动方案
上诉问题若想得到解决需项目中rabbitmq配置会影响到原来所有使用mq的地方避免影响范围较大 解决方案新增消费者类似通过设置不同的消费者来实现接收指定的消息需要手动 ack
测试rabbitmq配置发送接收消息【通过】
rabbitmq和springboot对应版本3. Reference创建虚拟host创建测试 交换机和queues
Exchangeexchange-1
Queuequeue-1
keyspringboot.*spring.rabbitmq.listener.order.queue.namequeue-2
spring.rabbitmq.listener.order.queue.durabletrue
spring.rabbitmq.listener.order.exchange.nameexchange-2
spring.rabbitmq.listener.order.exchange.durabletrue
spring.rabbitmq.listener.order.exchange.typetopic
spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptionstrue
spring.rabbitmq.listener.order.keyspringboot.*发送消息
package com.bfxy.springboot.producer;import java.util.Map;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import com.bfxy.springboot.entity.Order;Component
public class RabbitSender {private static final Logger LOGGER LoggerFactory.getLogger(RabbitSender.class);//自动注入RabbitTemplate模板类Autowiredprivate RabbitTemplate rabbitTemplate; //回调函数: confirm确认final ConfirmCallback confirmCallback new RabbitTemplate.ConfirmCallback() {Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.err.println(correlationData: correlationData);System.err.println(ack: ack);if(!ack){System.err.println(异常处理....);}}};//回调函数: return返回final ReturnCallback returnCallback new RabbitTemplate.ReturnCallback() {Overridepublic void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,String exchange, String routingKey) {System.err.println(return exchange: exchange , routingKey: routingKey , replyCode: replyCode , replyText: replyText);}};//发送消息方法调用: 构建Message消息public void send(Object message, MapString, Object properties) {LOGGER.info(消息内容:{},message);LOGGER.info(properties:{},properties);try {MessageHeaders mhs new MessageHeaders(properties);Message msg MessageBuilder.createMessage(message, mhs);rabbitTemplate.setConfirmCallback(confirmCallback);rabbitTemplate.setReturnCallback(returnCallback);//id 时间戳 全局唯一CorrelationData correlationData new CorrelationData(1234567890);rabbitTemplate.convertAndSend(exchange-1, springboot.abc, msg, correlationData);}catch (Exception e){LOGGER.error(发送消息异常message:{},message);}}//发送消息方法调用: 构建自定义对象消息public void sendOrder(Order order) {LOGGER.info(订单消息内容:{},order);try {rabbitTemplate.setConfirmCallback(confirmCallback);rabbitTemplate.setReturnCallback(returnCallback);//id 时间戳 全局唯一CorrelationData correlationData new CorrelationData(0987654321);rabbitTemplate.convertAndSend(exchange-2, springboot.def, order, correlationData);}catch (Exception e){LOGGER.error(订单发送消息异常message:{},order);}}}
测试代码
package com.bfxy.springboot;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import com.bfxy.springboot.entity.Order;
import com.bfxy.springboot.producer.RabbitSender;RunWith(SpringRunner.class)
SpringBootTest
public class ApplicationTests {Testpublic void contextLoads() {}Autowiredprivate RabbitSender rabbitSender;private static SimpleDateFormat simpleDateFormat new SimpleDateFormat(yyyy-MM-dd HH:mm:ss.SSS);Testpublic void testSender1() throws Exception {MapString, Object properties new HashMap();properties.put(number, 12345);properties.put(send_time, simpleDateFormat.format(new Date()));rabbitSender.send(Hello RabbitMQ For Spring Boot!, properties);}Testpublic void testSender2() throws Exception {Order order new Order(001, 第一个订单);rabbitSender.sendOrder(order);}} 接收消息
package com.bfxy.springboot.conusmer;import java.util.Map;import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;import com.rabbitmq.client.Channel;Component
public class RabbitReceiver {/*** 用于标识方法是一个RabbitMQ消息的监听方法用于监听指定的队列并在接收到消息时调用该方法进行处理。* 可以指定队列、交换机、路由键等属性用于配置消息监听的相关信息。* 通常与RabbitHandler一起使用将消息监听和消息处理方法关联起来。*/RabbitListener(bindings QueueBinding(value Queue(value queue-1, durabletrue),exchange Exchange(value exchange-1, durabletrue, type topic, ignoreDeclarationExceptions true),key springboot.* ))/*** 用于标识方法是一个RabbitMQ消息的处理方法。* 通常与RabbitListener一起使用用于指定具体的消息处理方法。* 通过RabbitHandler注解标识的方法可以处理多个不同类型的消息通过方法参数的类型来区分不同的消息类型。*/RabbitHandlerpublic void onMessage(Message message, Channel channel) throws Exception {System.err.println(--------------------------------------);System.err.println(消费端Payload: message.getPayload());Long deliveryTag (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);//手工ACKchannel.basicAck(deliveryTag, false);}/*** * spring.rabbitmq.listener.order.queue.namequeue-2spring.rabbitmq.listener.order.queue.durabletruespring.rabbitmq.listener.order.exchange.nameexchange-1spring.rabbitmq.listener.order.exchange.durabletruespring.rabbitmq.listener.order.exchange.typetopicspring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptionstruespring.rabbitmq.listener.order.keyspringboot.** param order* param channel* param headers* throws Exception*/RabbitListener(bindings QueueBinding(value Queue(value ${spring.rabbitmq.listener.order.queue.name}, durable${spring.rabbitmq.listener.order.queue.durable}),exchange Exchange(value ${spring.rabbitmq.listener.order.exchange.name}, durable${spring.rabbitmq.listener.order.exchange.durable}, type ${spring.rabbitmq.listener.order.exchange.type}, ignoreDeclarationExceptions ${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}),key ${spring.rabbitmq.listener.order.key}))RabbitHandlerpublic void onOrderMessage(Payload com.bfxy.springboot.entity.Order order, Channel channel,Headers MapString, Object headers) throws Exception {System.err.println(--------------------------------------);System.err.println(消费端order: order.getId());Long deliveryTag (Long)headers.get(AmqpHeaders.DELIVERY_TAG);//手工ACKchannel.basicAck(deliveryTag, false);}}
断点测试 测试分布式锁自旋锁测试串行执行【通过】
测试并发分布式锁顺序执行业务代码
package com.bfxy.springboot;import org.junit.Test;
import org.junit.runner.RunWith;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;RunWith(SpringRunner.class)
SpringBootTest
public class ApplicationTests {private static final Logger LOGGER LoggerFactory.getLogger(ApplicationTests.class);AutowiredRedissonClient redissonClient;Testpublic void contextLoads() {long startTime System.currentTimeMillis();for (int i 1;i5;i){int finalI i;CompletableFuture.runAsync(()-{bizLock(String.valueOf(finalI));});}while (true){}}private void bizLock(String taskName) {RLock lock redissonClient.getLock(my-lock);boolean locked false;try {while (!locked) {locked lock.tryLock();if (locked) {try {biz(3000, taskName);System.out.println(----------------);} finally {lock.unlock();}} else {// 未获取到锁可以进行一些等待操作比如休眠一段时间后再尝试获取锁Thread.sleep(100);}}} catch (Exception e) {e.printStackTrace();}}private void biz(Integer time,String taskName) throws Exception{long startTime System.currentTimeMillis();LOGGER.info(任务序号{},任务执行开始时间{},taskName,startTime);Thread.sleep(time);long endtime System.currentTimeMillis();LOGGER.info(任务序号{},任务执行结束时间{},taskName,startTime);LOGGER.info(任务序号{},任务执行消耗时间{},taskName,(endtime-startTime));}
}
执行日志如下【测试结果并发串行执行同一时间只有一个任务执行】
2024-07-10 13:46:49.587 INFO 36076 --- [onPool-worker-9] com.bfxy.springboot.ApplicationTests : 任务序号1,任务执行开始时间1720590409587
2024-07-10 13:46:52.601 INFO 36076 --- [onPool-worker-9] com.bfxy.springboot.ApplicationTests : 任务序号1,任务执行结束时间1720590409587
2024-07-10 13:46:52.601 INFO 36076 --- [onPool-worker-9] com.bfxy.springboot.ApplicationTests : 任务序号1,任务执行消耗时间3014
----------------
2024-07-10 13:46:52.665 INFO 36076 --- [onPool-worker-4] com.bfxy.springboot.ApplicationTests : 任务序号4,任务执行开始时间1720590412665
2024-07-10 13:46:55.678 INFO 36076 --- [onPool-worker-4] com.bfxy.springboot.ApplicationTests : 任务序号4,任务执行结束时间1720590412665
2024-07-10 13:46:55.678 INFO 36076 --- [onPool-worker-4] com.bfxy.springboot.ApplicationTests : 任务序号4,任务执行消耗时间3013
----------------
2024-07-10 13:46:55.759 INFO 36076 --- [onPool-worker-2] com.bfxy.springboot.ApplicationTests : 任务序号2,任务执行开始时间1720590415759
2024-07-10 13:46:58.761 INFO 36076 --- [onPool-worker-2] com.bfxy.springboot.ApplicationTests : 任务序号2,任务执行结束时间1720590415759
2024-07-10 13:46:58.761 INFO 36076 --- [onPool-worker-2] com.bfxy.springboot.ApplicationTests : 任务序号2,任务执行消耗时间3002压力测试对比资源使用情况
结论使用线程池比较消耗资源特别是内存一点并发上来可能oom
压测前 200并发 1000并发 2000并发 测试分布式锁自旋锁mq全局串行执行【通过】
使用线程池控制会导致请求积压到线程池消耗cpu和内存资源使用mq能有效削峰限流减小服务器资源消耗线上部署了两个节点即并发为2 消费者代码 /*** * spring.rabbitmq.listener.order.queue.namequeue-2spring.rabbitmq.listener.order.queue.durabletruespring.rabbitmq.listener.order.exchange.nameexchange-1spring.rabbitmq.listener.order.exchange.durabletruespring.rabbitmq.listener.order.exchange.typetopicspring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptionstruespring.rabbitmq.listener.order.keyspringboot.** param order* param channel* param headers* throws Exception*/RabbitListener(bindings QueueBinding(value Queue(value ${spring.rabbitmq.listener.order.queue.name}, durable${spring.rabbitmq.listener.order.queue.durable}),exchange Exchange(value ${spring.rabbitmq.listener.order.exchange.name}, durable${spring.rabbitmq.listener.order.exchange.durable}, type ${spring.rabbitmq.listener.order.exchange.type}, ignoreDeclarationExceptions ${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}),key ${spring.rabbitmq.listener.order.key}))RabbitHandlerpublic void onOrderMessage(Payload com.bfxy.springboot.entity.Order order, Channel channel,Headers MapString, Object headers) throws Exception {//System.err.println(--------------------------------------);//System.err.println(消费端order: order.getId());Long deliveryTag (Long)headers.get(AmqpHeaders.DELIVERY_TAG);onLockOrderMessage(order);//手工ACKchannel.basicAck(deliveryTag, false);}AutowiredRedissonClient redissonClient;private void onLockOrderMessage(com.bfxy.springboot.entity.Order order) {RLock lock redissonClient.getLock(my-lock);boolean locked false;try {while (!locked) {locked lock.tryLock();if (locked) {try {long startTime System.currentTimeMillis();String id order.getId();LOGGER.info(订单序号{},订单执行开始时间{},id,startTime);Thread.sleep(7000);long endtime System.currentTimeMillis();LOGGER.info(订单序号{},订单执行结束时间{},id,startTime);LOGGER.info(订单序号{},订单执行消耗时间{},id,(endtime-startTime));System.out.println(----------------);} finally {lock.unlock();}} else {// 未获取到锁可以进行一些等待操作比如休眠一段时间后再尝试获取锁Thread.sleep(100);}}} catch (Exception e) {e.printStackTrace();}}生产者代码 Testpublic void testSender3() throws Exception {for (int i 1;i50;i){int finalI i;CompletableFuture.runAsync(()-{Order order new Order(String.valueOf(finalI), 第finalI个订单);rabbitSender.sendOrder(order);});System.err.println(发送消息订单:finalI);if (i%50){Thread.sleep(1000);}}}启动两个消费者【验证全局串行同一时间只有一个业务执行】
记录消费日志验证是否串行
通过日志可知单个消费者消费顺序执行验证消费者1和2直接业务串行
消费者215:18:22 到 15:18:50 之间没有接收到消息【串行执行】验证消费115:18:22 到 15:18:50时间段消息情况【串行执行】
业务幂等性保障测试【通过】
mq接收到消息会将消息中的uid放入redis当重复消费时会进行判断保障业务幂等性 Testpublic void time() {// 获取字符串对象String key myKey;String value Hello, Redis!;RBucketString bucket redissonClient.getBucket(key);bucket.set(value, 30, TimeUnit.SECONDS); // 设置失效时间为10秒}幂等逻辑
// 判断key是否存在
if(bucket.isExists()){LOGGER.error(重复消费id{},id);// 重复消息不执行业务逻辑跳出直接ackbreak;
}else {marker(id);
}重复消费情况进入断点表示重复执行break会跳过业务代码 rabbitmq配置生效测试 原项目配置【自动ack测试】-【通过】
测试自动ack是否生效
package com.bfxy.springboot.config;import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;Configuration
public class RabbitMQConfig {Value(${spring.rabbitmq.host})private String addresses;Value(${spring.rabbitmq.port})private String port;Value(${spring.rabbitmq.username})private String username;Value(${spring.rabbitmq.password})private String password;Value(${spring.rabbitmq.virtual-host})private String virtualHost;Value(${spring.rabbitmq.publisher-confirms})private boolean publisherConfirms;Bean/** 因为要设置回调类所以应是prototype类型如果是singleton类型则回调类为最后一次设置 */// Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)public RabbitTemplate rabbitTemplate() {RabbitTemplate rabbitTemplate new RabbitTemplate(connectionFactory());return rabbitTemplate;}Beanpublic RabbitTemplate manualAckRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate new RabbitTemplate(connectionFactory);// 配置手动ACKrabbitTemplate.setChannelTransacted(true);rabbitTemplate.setChannelTransacted(true);rabbitTemplate.setChannelTransacted(true);rabbitTemplate.setChannelTransacted(true);rabbitTemplate.setAcknowledgeMode(AcknowledgeMode.MANUAL);return rabbitTemplate;}Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory new CachingConnectionFactory();connectionFactory.setAddresses(addresses : port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setVirtualHost(virtualHost);/** 如果要进行消息回调则这里必须要设置为true */connectionFactory.setPublisherConfirms(publisherConfirms);return connectionFactory;}Bean(mqContainerFactory)Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)public SimpleRabbitListenerContainerFactory containerFactory(ConnectionFactory connectionFactory, MessageConverter messageConverter) {SimpleRabbitListenerContainerFactory factory new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(messageConverter);factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);return factory;}}注释手动ack
spring.rabbitmq.addresses127.0.0.1:5672
spring.rabbitmq.usernameadmin
spring.rabbitmq.passwordadmin
spring.rabbitmq.virtual-hosttotal-api
spring.rabbitmq.connection-timeout15000发送消息没ack前控制台信息等待一会自动ack的消息从rabbitmq中删除了 新增配置【手动ack测试】-【通过】
rabbitmq如何实现接受指定的消息要手动ack其他消息自动ack
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class RabbitMQConfig {Beanpublic RabbitListenerContainerFactorySimpleMessageListenerContainer manualAckListenerContainerFactory() {SimpleRabbitListenerContainerFactory factory new SimpleRabbitListenerContainerFactory();factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);return factory;}Beanpublic RabbitListenerContainerFactorySimpleMessageListenerContainer autoAckListenerContainerFactory() {SimpleRabbitListenerContainerFactory factory new SimpleRabbitListenerContainerFactory();factory.setAcknowledgeMode(AcknowledgeMode.AUTO);return factory;}
}
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class RabbitMQConfig {Beanpublic RabbitListenerContainerFactorySimpleMessageListenerContainer manualAckListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);return factory;}Beanpublic RabbitListenerContainerFactorySimpleMessageListenerContainer autoAckListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setAcknowledgeMode(AcknowledgeMode.AUTO);return factory;}
}
消费者代码指定手动ack并注释手动ack查看rabbitmq中消息是否被删除预期消息不会删除放开手动ack注释再次测试
兜底保证方案
消息处理可能失败处理失败的消息记录到broker_message_log表中
-- 表 broker_message_log 消息记录结构
CREATE TABLE IF NOT EXISTS broker_message_log (message_id varchar(128) NOT NULL, -- 消息唯一IDmessage varchar(4000) DEFAULT NULL, -- 消息内容try_count int(4) DEFAULT 0, -- 重试次数status varchar(10) DEFAULT , -- 消息投递状态 0 投递中 1 投递成功 2 投递失败next_retry timestamp NOT NULL DEFAULT 0000-00-00 00:00:00, --下一次重试时间 或 超时时间create_time timestamp NOT NULL DEFAULT 0000-00-00 00:00:00, --创建时间update_time timestamp NOT NULL DEFAULT 0000-00-00 00:00:00, --更新时间PRIMARY KEY (message_id)
) ENGINEInnoDB DEFAULT CHARSETutf8;通过定时任务重新执行失败的消息执行点设计
定时任务重目标业务方法该方式要将业务封装某个class的某个方法中失败时会录入表中发送mq在消费中执行