模板式自助建站,网站建设宁夏凤凰云,西安到北京火车时刻表查询,磁力蜘蛛种子搜索一. 重复消息为什么会出现消息重复#xff1f;消息重复的原因有两个#xff1a;1.生产时消息重复#xff0c;2.消费时消息重复。1.1 生产时消息重复由于生产者发送消息给MQ#xff0c;在MQ确认的时候出现了网络波动#xff0c;生产者没有收到确认#xff0c;实际上MQ已经…一. 重复消息为什么会出现消息重复消息重复的原因有两个1.生产时消息重复2.消费时消息重复。1.1 生产时消息重复由于生产者发送消息给MQ在MQ确认的时候出现了网络波动生产者没有收到确认实际上MQ已经接收到了消息。这时候生产者就会重新发送一遍这条消息。生产者中如果消息未被确认或确认失败我们可以使用定时任务(redis/db)来进行消息重试。ComponentSlf4Jpublic class SendMessage { Autowired private MessageService messageService; Autowired private RabbitTemplate rabbitTemplate; // 最大投递次数 private static final int MAX_TRY_COUNT 3; /** * 每30s拉取投递失败的消息, 重新投递 */ Scheduled(cron 0/30 * * * * ?) public void resend() { log.info(开始执行定时任务(重新投递消息)); List msgLogs messageService.selectTimeoutMsg(); msgLogs.forEach(msgLog - { String msgId msgLog.getMsgId(); if (msgLog.getTryCount() MAX_TRY_COUNT) { messageService.updateStatus(msgId, Constant.MsgLogStatus.DELIVER_FAIL); log.info(超过最大重试次数, 消息投递失败, msgId: {}, msgId); } else { messageService.updateTryCount(msgId, msgLog.getNextTryTime());// 投递次数1 CorrelationData correlationData new CorrelationData(msgId); rabbitTemplate.convertAndSend(msgLog.getExchange(), msgLog.getRoutingKey(), MessageHelper.objToMsg(msgLog.getMsg()), correlationData);// 重新投递 log.info(第 (msgLog.getTryCount() 1) 次重新投递消息); } }); log.info(定时任务执行结束(重新投递消息)); }}1.2 消费时消息重复消费者消费成功后再给MQ确认的时候出现了网络波动MQ没有接收到确认为了保证消息被消费MQ就会继续给消费者投递之前的消息。这时候消费者就接收到了两条一样的消息。修改消费者模拟异常RabbitListener(queuesToDeclare Queue(value javatrip, durable true))public void receive(String message, Headers Map headers, Channel channel) throws Exception{ System.out.println(重试System.currentTimeMillis()); System.out.println(message); int i 1 / 0;}配置yml重试策略spring: rabbitmq: listener: simple: retry: enabled: true # 开启消费者进行重试 max-attempts: 5 # 最大重试次数 initial-interval: 3000 # 重试时间间隔由于重复消息是由于网络原因造成的因此不可避免重复消息。但是我们需要保证消息的幂等性。二. 如何保证消息幂等性让每个消息携带一个全局的唯一ID即可保证消息的幂等性具体消费过程为消费者获取到消息后先根据id去查询redis/db是否存在该消息如果不存在则正常消费消费完毕后写入redis/db如果存在则证明消息被消费过直接丢弃。生产者PostMapping(/send)public void sendMessage(){ JSONObject jsonObject new JSONObject(); jsonObject.put(message,Java旅途); String json jsonObject.toJSONString(); Message message MessageBuilder.withBody(json.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding(UTF-8).setMessageId(UUID.randomUUID()).build(); amqpTemplate.convertAndSend(javatrip,message);}消费者ComponentRabbitListener(queuesToDeclare Queue(value javatrip, durable true))public class Consumer { RabbitHandler public void receiveMessage(Message message) throws Exception { Jedis jedis new Jedis(localhost, 6379); String messageId message.getMessageProperties().getMessageId(); String msg new String(message.getBody(),UTF-8); System.out.println(接收到的消息为msg消息id为messageId); String messageIdRedis jedis.get(messageId); if(messageId messageIdRedis){ return; } JSONObject jsonObject JSONObject.parseObject(msg); String email jsonObject.getString(message); jedis.set(messageId,messageId); }}如果需要存入db的话可以直接将这个ID设为消息的主键下次如果获取到重复消息进行消费时由于数据库主键的唯一性则会直接抛出异常。