百度指数查询官方下载,厦门seo排名收费,互联网行业都有哪些工作赚钱,毕设做系统与网站这里主要对比#xff1a;Kafka、RocketMQ、RabbitMQ 介绍一下消息生产、存储、消费三者的架构形式。
消息丢失可能存在的场景#xff1a;
情况一#xff1a; 生产者发送给MQ的过程消息丢失 在写消息的过程中因为网络的原因#xff0c;还没到mq消息就丢失了#xff1b;或…这里主要对比Kafka、RocketMQ、RabbitMQ 介绍一下消息生产、存储、消费三者的架构形式。
消息丢失可能存在的场景
情况一 生产者发送给MQ的过程消息丢失 在写消息的过程中因为网络的原因还没到mq消息就丢失了或者是到了mq还没返回确认收到消息消息丢失了怎么处理 情况二 Broker存储消息丢失 mq的master节点在接受到消息后还没来得及同步给slave上面的follower副本master节点直接宕机了消息还在内存中丢失消息了怎么处理 情况三 消费者拉取消息后消息丢 消费者拉取到了消息但是还没来得及消费消费者实例挂掉了mq以为这条消息已经消费了怎么处理 针对“情况一”的解决思路
方法1 AMQP很多都支持事务机制那是不是说我们可以生产者采用同步模式给mq发送消息的过程打开事务机制发送一条mq接受到后返回确认信息后再发送下一条消息这样肯定行不通的会使生产者的吞吐量直线下降。 方法2 对于 RabbitMQ 它提供了一个异步confirm机制发完了生产者就不用管了rabbitmq接收到消息会回调生产者的本地一个接口告诉你消息接收成功了如果rabbitmq在接收消息是报错了就会回调这个接口告诉你这个消息接收失败了可以重试发送了不会阻塞下一条消息的发送。 对于RocketMQ它提供了一个异步接收模式 Async APIProducer只需要把消息发送给mq就可以。mq处理完成之后调用producer定义好的回调函数就可以确认是否收到对应messageID的回调确认消息没有收到就触发retry机制。 对于Kafka它也提供了异步async处理模式send()方法会返回Futrue对象通过调用Futrue对象的get()方法等待直到结果返回。 针对“情况二”MQ把消息弄丢了的解决思路 1、必须开启持久化就是消息写入mq后就会刷盘就算mq自己挂掉了恢复之后也会读取存储的数据一般不会丢失除非还没来得及持久化就挂掉了这种情况很少概率极低。 2、配合producer的confirm机制只有消息被持久化或者大多数副本同步完成后才返回ack 3、kafka消费者丢失消息跟rabbitmq处理机制是一样的如kafka broker自己把消息给丢失了生产者将消息发送给partion leader但是接收完leader就宕机了还没同步给follower而马上另外的partion follower被选举成了leader这一块数据丢失了怎么处理必须将kafka的副本参数设置大于1就是说至少有2个副本kafka还需要设置副本leader至少感知不到一个follower挂掉了才算leader挂掉了才能重新选举broker副本的follower全部同步成功时才能算生产者写入消息队列写入成功【这样可以保证一条不丢】。 针对“情况三”消费者拉取消息后消息丢失的解决思路 1、消费者丢失消息一般是消费者打开了autoAck的机制自动提交但还在处理中消费者系统宕机了mq以为消费完了造成了消费者丢失消息怎么处理 解决办法关闭autoAck每次处理完了再提交ack/offset如果还没处理完就宕机了长时间收不到ackmq也会重新将这条消息分配给其他的消费者去处理。但是此时你需要自己保证消息的幂等性因为如果消费者将消息已经消费了但是返回给mq的确认信息失败了mq以为你没消费它会把消息发送给给另外的消费者或者等该实例恢复后再次发送引起消息的重复消费此时你可以将messageID做唯一键处理。 如果保证消息一条不丢必须关闭消息过期清理的机制否则消息积压会触发消息清理的机制也会造成消息丢失。