成都市建网站公司,网站备案认领,网站没内容,数据库网站开发价格文章目录2. 消息队列2.1 MQ有什么用#xff1f;2.2 说一说生产者与消费者模式2.3 消息队列如何保证顺序消费#xff1f;2.4 消息队列如何保证消息不丢#xff1f;2.5 消息队列如何保证不重复消费#xff1f;2.6 MQ处理消息失败了怎么办#xff1f;2.7 请介绍消息队列推和拉…
文章目录2. 消息队列2.1 MQ有什么用2.2 说一说生产者与消费者模式2.3 消息队列如何保证顺序消费2.4 消息队列如何保证消息不丢2.5 消息队列如何保证不重复消费2.6 MQ处理消息失败了怎么办2.7 请介绍消息队列推和拉的使用场景2.8 RabbitMQ和Kafka有什么区别2.9 Kafka为什么速度快2.10 RabbitMQ如何保证消息已达2. 消息队列
2.1 MQ有什么用
参考答案
消息队列有很多使用场景比较常见的有3个解耦、异步、削峰。
解耦传统的软件开发模式各个模块之间相互调用数据共享每个模块都要时刻关注其他模块的是否更改或者是否挂掉等等使用消息队列可以避免模块之间直接调用将所需共享的数据放在消息队列中对于新增业务模块只要对该类消息感兴趣即可订阅该类消息对原有系统和业务没有任何影响降低了系统各个模块的耦合度提高了系统的可扩展性。异步消息队列提供了异步处理机制在很多时候应用不想也不需要立即处理消息允许应用把一些消息放入消息中间件中并不立即处理它在之后需要的时候再慢慢处理。削峰在访问量骤增的场景下需要保证应用系统的平稳性但是这样突发流量并不常见如果以这类峰值的标准而投放资源的话那无疑是巨大的浪费。使用消息队列能够使关键组件支撑突发访问压力不会因为突发的超负荷请求而完全崩溃。消息队列的容量可以配置的很大如果采用磁盘存储消息则几乎等于“无限”容量这样一来高峰期的消息可以被积压起来在随后的时间内进行平滑的处理完成而不至于让系统短时间内无法承载而导致崩溃。在电商网站的秒杀抢购这种突发性流量很强的业务场景中消息队列的强大缓冲能力可以很好的起到削峰作用。
2.2 说一说生产者与消费者模式
参考答案
所谓生产者-消费者问题实际上主要是包含了两类线程。一种是生产者线程用于生产数据另一种是消费者线程用于消费数据为了解耦生产者和消费者的关系通常会采用共享的数据区域就像是一个仓库。生产者生产数据之后直接放置在共享数据区中并不需要关心消费者的行为。而消费者只需要从共享数据区中去获取数据就不再需要关心生产者的行为。但是这个共享数据区域中应该具备这样的线程间并发协作的功能
如果共享数据区已满的话阻塞生产者继续生产数据放置入内如果共享数据区为空的话阻塞消费者继续消费数据。
在Java语言中实现生产者消费者问题时可以采用三种方式
使用 Object 的 wait/notify 的消息通知机制使用 Lock 的 Condition 的 await/signal 的消息通知机制使用 BlockingQueue 实现。
2.3 消息队列如何保证顺序消费
参考答案
在生产中经常会有一些类似报表系统这样的系统需要做 MySQL 的 binlog 同步。比如订单系统要同步订单表的数据到大数据部门的 MySQL 库中用于报表统计分析通常的做法是基于 Canal 这样的中间件去监听订单数据库的 binlog然后把这些 binlog 发送到 MQ 中再由消费者从 MQ 中获取 binlog 落地到大数据部门的 MySQL 中。
在这个过程中可能会有对某个订单的增删改操作比如有三条 binlog 执行顺序是增加、修改、删除。消费者愣是换了顺序给执行成删除、修改、增加这样能行吗肯定是不行的。不同的消息队列产品产生消息错乱的原因以及解决方案是不同的。下面我们以RabbitMQ、Kafka、RocketMQ为例来说明保证顺序消费的办法。
RabbitMQ
对于 RabbitMQ 来说导致上面顺序错乱的原因通常是消费者是集群部署不同的消费者消费到了同一订单的不同的消息。如消费者A执行了增加消费者B执行了修改消费者C执行了删除但是消费者C执行比消费者B快消费者B又比消费者A快就会导致消费 binlog 执行到数据库的时候顺序错乱本该顺序是增加、修改、删除变成了删除、修改、增加。如下图 RabbitMQ 的问题是由于不同的消息都发送到了同一个 queue 中多个消费者都消费同一个 queue 的消息。解决这个问题我们可以给 RabbitMQ 创建多个 queue每个消费者固定消费一个 queue 的消息生产者发送消息的时候同一个订单号的消息发送到同一个 queue 中由于同一个 queue 的消息是一定会保证有序的那么同一个订单号的消息就只会被一个消费者顺序消费从而保证了消息的顺序性。如下图 Kafka
对于 Kafka 来说一个 topic 下同一个 partition 中的消息肯定是有序的生产者在写的时候可以指定一个 key通过我们会用订单号作为 key这个 key 对应的消息都会发送到同一个 partition 中所以消费者消费到的消息也一定是有序的。
那么为什么 Kafka 还会存在消息错乱的问题呢问题就出在消费者身上。通常我们消费到同一个 key 的多条消息后会使用多线程技术去并发处理来提高消息处理速度否则一条消息的处理需要耗时几十 毫秒1 秒也就只能处理几十条消息吞吐量就太低了。而多线程并发处理的话binlog 执行到数据库的时候就不一定还是原来的顺序了。如下图 Kafka 从生产者到消费者消费消息这一整个过程其实都是可以保证有序的导致最终乱序是由于消费者端需要使用多线程并发处理消息来提高吞吐量比如消费者消费到了消息以后开启 32 个线程处理消息每个线程线程处理消息的快慢是不一致的所以才会导致最终消息有可能不一致。
所以对于 Kafka 的消息顺序性保证其实我们只需要保证同一个订单号的消息只被同一个线程处理的就可以了。由此我们可以在线程处理前增加个内存队列每个线程只负责处理其中一个内存队列的消息同一个订单号的消息发送到同一个内存队列中即可。如下图 RocketMQ
对于 RocketMQ 来说每个 Topic 可以指定多个 MessageQueue当我们写入消息的时候会把消息均匀地分发到不同的 MessageQueue 中比如同一个订单号的消息增加 binlog 写入到 MessageQueue1 中修改 binlog 写入到 MessageQueue2 中删除 binlog 写入到 MessageQueue3 中。
但是当消费者有多台机器的时候会组成一个 Consumer GroupConsumer Group 中的每台机器都会负责消费一部分 MessageQueue 的消息所以可能消费者A消费了 MessageQueue1 的消息执行增加操作消费者B消费了 MessageQueue2 的消息执行修改操作消费者C消费了 MessageQueue3 的消息执行删除操作但是此时消费 binlog 执行到数据库的时候就不一定是消费者A先执行了有可能消费者C先执行删除操作因为几台消费者是并行执行是不能够保证他们之间的执行顺序的。如下图 RocketMQ 的消息乱序是由于同一个订单号的 binlog 进入了不同的 MessageQueue进而导致一个订单的 binlog 被不同机器上的 Consumer 处理。
要解决 RocketMQ 的乱序问题我们只需要想办法让同一个订单的 binlog 进入到同一个 MessageQueue 中就可以了。因为同一个 MessageQueue 内的消息是一定有序的一个 MessageQueue 中的消息只能交给一个 Consumer 来进行处理所以 Consumer 消费的时候就一定会是有序的。 2.4 消息队列如何保证消息不丢
参考答案
丢数据一般分为两种一种是mq把消息丢了一种就是消费时将消息丢了。下面从rabbitmq和kafka分别说一下丢失数据的场景。
RabbitMQ
RabbitMQ丢失消息分为如下几种情况 生产者丢消息 生产者将数据发送到RabbitMQ的时候可能在传输过程中因为网络等问题而将数据弄丢了。 RabbitMQ自己丢消息 如果没有开启RabbitMQ的持久化那么RabbitMQ一旦重启数据就丢了。所以必须开启持久化将消息持久化到磁盘这样就算RabbitMQ挂了恢复之后会自动读取之前存储的数据一般数据不会丢失。除非极其罕见的情况RabbitMQ还没来得及持久化自己就挂了这样可能导致一部分数据丢失。 消费端丢消息 主要是因为消费者消费时刚消费到还没有处理结果消费者就挂了这样你重启之后RabbitMQ就认为你已经消费过了然后就丢了数据。
针对上述三种情况RabbitMQ可以采用如下方式避免消息丢失 生产者丢消息 可以选择使用RabbitMQ提供是事务功能就是生产者在发送数据之前开启事务然后发送消息如果消息没有成功被RabbitMQ接收到那么生产者会受到异常报错这时就可以回滚事务然后尝试重新发送。如果收到了消息那么就可以提交事务。这种方式有明显的缺点即RabbitMQ事务开启后就会变为同步阻塞操作生产者会阻塞等待是否发送成功太耗性能会造成吞吐量的下降。可以开启confirm模式。在生产者那里设置开启了confirm模式之后每次写的消息都会分配一个唯一的id然后如何写入了RabbitMQ之中RabbitMQ会给你回传一个ack消息告诉你这个消息发送OK了。如果RabbitMQ没能处理这个消息会回调你一个nack接口告诉你这个消息失败了你可以进行重试。而且你可以结合这个机制知道自己在内存里维护每个消息的id如果超过一定时间还没接收到这个消息的回调那么你可以进行重发。 事务机制是同步的你提交了一个事物之后会阻塞住但是confirm机制是异步的发送消息之后可以接着发送下一个消息然后RabbitMQ会回调告知成功与否。 一般在生产者这块避免丢失都是用confirm机制。 RabbitMQ自己丢消息 设置消息持久化到磁盘设置持久化有两个步骤 创建queue的时候将其设置为持久化的这样就可以保证RabbitMQ持久化queue的元数据但是不会持久化queue里面的数据。发送消息的时候讲消息的deliveryMode设置为2这样消息就会被设为持久化方式此时RabbitMQ就会将消息持久化到磁盘上。 必须要同时开启这两个才可以。 而且持久化可以跟生产的confirm机制配合起来只有消息持久化到了磁盘之后才会通知生产者ack这样就算是在持久化之前RabbitMQ挂了数据丢了生产者收不到ack回调也会进行消息重发。 消费端丢消息 使用RabbitMQ提供的ack机制首先关闭RabbitMQ的自动ack然后每次在确保处理完这个消息之后在代码里手动调用ack。这样就可以避免消息还没有处理完就ack。
Kafka
Kafka丢失消息分为如下几种情况 生产者丢消息 生产者没有设置相应的策略发送过程中丢失数据。 Kafka自己丢消息 比较常见的一个场景就是Kafka的某个broker宕机了然后重新选举partition的leader时。如果此时follower还没来得及同步数据leader就挂了然后某个follower成为了leader它就少了一部分数据。 消费端丢消息 消费者消费到了这个数据然后消费之自动提交了offset让Kafka知道你已经消费了这个消息当你准备处理这个消息时自己挂掉了那么这条消息就丢了。
针对上述三种情况Kafka可以采用如下方式避免消息丢失 生产者丢消息 关闭自动提交offset在自己处理完毕之后手动提交offset这样就不会丢失数据。 Kafka自己丢消息 一般要求设置4个参数来保证消息不丢失 给topic设置 replication.factor 参数这个值必须大于1表示要求每个partition必须至少有2个副本。在kafka服务端设置 min.isync.replicas 参数这个值必须大于1表示 要求一个leader至少感知到有至少一个follower在跟自己保持联系正常同步数据这样才能保证leader挂了之后还有一个follower。在生产者端设置 acksall 表示 要求每条每条数据必须是写入所有replica副本之后才能认为是写入成功了。在生产者端设置 retriesMAX (很大的一个值)表示这个是要求一旦写入事变就无限重试。 消费端丢消息 如果按照上面设置了ackall则一定不会丢失数据要求是你的leader接收到消息所有的follower都同步到了消息之后才认为本次写成功了。如果没满足这个条件生产者会自动不断的重试重试无限次。 数据大数据量的数据处理上。 架构模型方面 RabbitMQ以broker为中心有消息的确认机制。 Kafka以consumer为中心没有消息的确认机制。 吞吐量方面 RabbitMQ支持消息的可靠的传递支持事务不支持批量操作基于存储的可靠性的要求存储可以采用内存或硬盘吞吐量小。 Kafka内部采用消息的批量处理数据的存储和获取是本地磁盘顺序批量操作消息处理的效率高吞吐量高。 集群负载均衡方面 RabbitMQ本身不支持负载均衡需要loadbalancer的支持。 Kafka采用zookeeper对集群中的brokerconsumer进行管理可以注册topic到zookeeper上通过zookeeper的协调机制producer保存对应的topic的broker信息可以随机或者轮询发送到broker上producer可以基于语义指定分片消息发送到broker的某个分片上。
2.5 消息队列如何保证不重复消费
参考答案
先大概说一说可能会有哪些重复消费的问题。首先就是比如rabbitmq、rocketmq、kafka都有可能会出现消费重复消费的问题正常。因为这问题通常不是mq自己保证的是给你保证的。然后我们挑一个kafka来举个例子说说怎么重复消费吧。
kafka实际上有个offset的概念就是每个消息写进去都有一个offset代表他的序号然后consumer消费了数据之后每隔一段时间会把自己消费过的消息的offset提交一下代表我已经消费过了下次我要是重启啥的你就让我继续从上次消费到的offset来继续消费吧。
但是凡事总有意外比如我们之前生产经常遇到的就是你有时候重启系统看你怎么重启了如果碰到点着急的直接kill进程了再重启。这会导致consumer有些消息处理了但是没来得及提交offset尴尬了。重启之后少数消息会再次消费一次。
其实重复消费不可怕可怕的是你没考虑到重复消费之后怎么保证幂等性。举个例子,假设你有个系统消费一条往数据库里插入一条要是你一个消息重复两次你不就插入了两条这数据不就错了但是你要是消费到第二次的时候自己判断一下已经消费过了直接扔了不就保留了一条数据
一条数据重复出现两次数据库里就只有一条数据这就保证了系统的幂等性幂等性。通俗点说就一个数据或者一个请求给你重复来多次你得确保对应的数据是不会改变的不能出错。
想要保证不重复消费其实还要结合业务来思考这里给几个思路
比如你拿个数据要写库你先根据主键查一下如果这数据都有了你就别插入了update一下。比如你是写redis那没问题了反正每次都是set天然幂等性。比如你不是上面两个场景那做的稍微复杂一点你需要让生产者发送每条数据的时候里面加一个全局唯一的id类似订单id之类的东西然后你这里消费到了之后先根据这个id去比如redis里查一下之前消费过吗如果没有消费过你就处理然后这个id写redis。如果消费过了那你就别处理了保证别重复处理相同的消息即可。
还有比如基于数据库的唯一键来保证重复数据不会重复插入多条我们之前线上系统就有这个问题就是拿到数据的时候每次重启可能会有重复因为kafka消费者还没来得及提交offset重复数据拿到了以后我们插入的时候因为有唯一键约束了所以重复数据只会插入报错不会导致数据库中出现脏数据。
2.6 MQ处理消息失败了怎么办
参考答案
一般生产环境中都会在使用MQ的时候设计两个队列一个是核心业务队列一个是死信队列。核心业务队列就是比如专门用来让订单系统发送订单消息的然后另外一个死信队列就是用来处理异常情况的。
比如说要是第三方物流系统故障了此时无法请求那么仓储系统每次消费到一条订单消息尝试通知发货和配送都会遇到对方的接口报错。此时仓储系统就可以把这条消息拒绝访问或者标志位处理失败注意这个步骤很重要。
一旦标志这条消息处理失败了之后MQ就会把这条消息转入提前设置好的一个死信队列中。然后你会看到的就是在第三方物流系统故障期间所有订单消息全部处理失败全部会转入死信队列。然后你的仓储系统得专门有一个后台线程监控第三方物流系统是否正常能否请求的不停的监视。一旦发现对方恢复正常这个后台线程就从死信队列消费出来处理失败的订单重新执行发货和配送的通知逻辑。死信队列的使用其实就是MQ在生产实践中非常重要的一环也就是架构设计必须要考虑的。
整个过程如下图所示 2.7 请介绍消息队列推和拉的使用场景
参考答案
推模式
推模式是服务器端根据用户需要由目的、按时将用户感兴趣的信息主动发送到用户的客户端。
优点
对用户要求低方便用户获取需要的信息及时性好服务器端及时地向客户端推送更新动态信息吞吐量大。
缺点
不能确保发送成功推模式采用广播方式只有服务器端和客户端在同一个频道上推模式才有效用户才能接收到信息没有信息状态跟踪推模式采用开环控制技术一个信息推送后的状态比如客户端是否接收等无从得知针对性较差。推送的信息可能并不能满足客户端的个性化需求。
拉模式
拉模式是客户端主动从服务器端获取信息。
优点
针对性强能满足客户端的个性化需求信息传输量较小网络中传输的只是客户端的请求和服务器端对该请求的响应服务器端的任务轻。服务器端只是被动接收查询对客户端的查询请求做出响应。
缺点
实时性较差针对于服务器端实时更新的信息客户端难以获取实时信息对于客户端用户的要求较高需要对服务器端具有一定的了解。
2.8 RabbitMQ和Kafka有什么区别
参考答案
在实际生产应用中通常会使用Kafka作为消息传输的数据管道RabbitMQ作为交易数据作为数据传输管道主要的取舍因素则是是否存在丢数据的可能。RabbitMQ在金融场景中经常使用具有较高的严谨性数据丢失的可能性更小同事具备更高的实时性。而Kafka优势主要体现在吞吐量上虽然可以通过策略实现数据不丢失但从严谨性角度来讲大不如RabbitMQ。而且由于Kafka保证每条消息最少送达一次有较小的概率会出现数据重复发送的情况。详细来说它们之间主要有如下的区别 应用场景方面 RabbitMQ用于实时的对可靠性要求较高的消息传递上。 Kafka用于处于活跃的流式数据大数据量的数据处理上。 架构模型方面 RabbitMQ以broker为中心有消息的确认机制。 Kafka以consumer为中心没有消息的确认机制。 吞吐量方面 RabbitMQ支持消息的可靠的传递支持事务不支持批量操作基于存储的可靠性的要求存储可以采用内存或硬盘吞吐量小。 Kafka内部采用消息的批量处理数据的存储和获取是本地磁盘顺序批量操作消息处理的效率高吞吐量高。 集群负载均衡方面 RabbitMQ本身不支持负载均衡需要loadbalancer的支持。 Kafka采用zookeeper对集群中的brokerconsumer进行管理可以注册topic到zookeeper上通过zookeeper的协调机制producer保存对应的topic的broker信息可以随机或者轮询发送到broker上producer可以基于语义指定分片消息发送到broker的某个分片上。
2.9 Kafka为什么速度快
参考答案
Kafka的消息是保存或缓存在磁盘上的一般认为在磁盘上读写数据是会降低性能的因为寻址会比较消耗时间但是实际上Kafka的特性之一就是高吞吐率。即使是普通的服务器Kafka也可以轻松支持每秒百万级的写入请求超过了大部分的消息中间件这种特性也使得Kafka在日志处理等海量数据场景广泛应用。
下面从数据写入和读取两方面分析为什么Kafka速度这么快
写入数据
Kafka会把收到的消息都写入到硬盘中它绝对不会丢失数据。为了优化写入速度Kafka采用了两个技术顺序写入和MMFile 。
一、顺序写入
磁盘读写的快慢取决于你怎么使用它也就是顺序读写或者随机读写。在顺序读写的情况下磁盘的顺序读写速度和内存持平。因为硬盘是机械结构每次读写都会寻址-写入其中寻址是一个“机械动作”它是最耗时的。所以硬盘最讨厌随机I/O最喜欢顺序I/O。为了提高读写硬盘的速度Kafka就是使用顺序I/O。
而且Linux对于磁盘的读写优化也比较多包括read-ahead和write-behind磁盘缓存等。如果在内存做这些操作的时候一个是JAVA对象的内存开销很大另一个是随着堆内存数据的增多JAVA的GC时间会变得很长使用磁盘操作有以下几个好处
磁盘顺序读写速度超过内存随机读写JVM的GC效率低内存占用大。使用磁盘可以避免这一问题系统冷启动后磁盘缓存依然可用。
下图就展示了Kafka是如何写入数据的 每一个Partition其实都是一个文件 收到消息后Kafka会把数据插入到文件末尾虚框部分 这种方法有一个缺陷——没有办法删除数据 所以Kafka是不会删除数据的它会把所有的数据都保留下来每个消费者Consumer对每个Topic都有一个offset用来表示读取到了第几条数据 。 二、Memory Mapped Files
即便是顺序写入硬盘硬盘的访问速度还是不可能追上内存。所以Kafka的数据并不是实时的写入硬盘 它充分利用了现代操作系统分页存储来利用内存提高I/O效率。Memory Mapped Files(后面简称mmap)也被翻译成 内存映射文件在64位操作系统中一般可以表示20G的数据文件它的工作原理是直接利用操作系统的Page来实现文件到物理内存的直接映射。完成映射之后你对物理内存的操作会被同步到硬盘上操作系统在适当的时候。
通过mmap进程像读写硬盘一样读写内存当然是虚拟机内存也不必关心内存的大小有虚拟内存为我们兜底。使用这种方式可以获取很大的I/O提升省去了用户空间到内核空间复制的开销调用文件的read会把数据先放到内核空间的内存中然后再复制到用户空间的内存中。
但也有一个很明显的缺陷——不可靠写到mmap中的数据并没有被真正的写到硬盘操作系统会在程序主动调用flush的时候才把数据真正的写到硬盘。Kafka提供了一个参数——producer.type来控制是不是主动flush如果Kafka写入到mmap之后就立即flush然后再返回Producer叫 同步 (sync)写入mmap之后立即返回Producer不调用flush叫异步 (async)。
读取数据
一、基于sendfile实现Zero Copy
传统模式下当需要对一个文件进行传输的时候其具体流程细节如下
调用read函数文件数据被copy到内核缓冲区read函数返回文件数据从内核缓冲区copy到用户缓冲区write函数调用将文件数据从用户缓冲区copy到内核与socket相关的缓冲区数据从socket缓冲区copy到相关协议引擎。
以上细节是传统read/write方式进行网络文件传输的方式我们可以看到在这个过程当中文件数据实际上是经过了四次copy操作硬盘-内核buf-用户buf-socket相关缓冲区-协议引擎。而sendfile系统调用则提供了一种减少以上多次copy提升文件传输性能的方法。
在内核版本2.1中引入了sendfile系统调用以简化网络上和两个本地文件之间的数据传输。sendfile的引入不仅减少了数据复制还减少了上下文切换。运行流程如下
sendfile系统调用文件数据被copy至内核缓冲区再从内核缓冲区copy至内核中socket相关的缓冲区最后再socket相关的缓冲区copy到协议引擎。
相较传统read/write方式2.1版本内核引进的sendfile已经减少了内核缓冲区到user缓冲区再由user缓冲区到socket相关缓冲区的文件copy而在内核版本2.4之后文件描述符结果被改变sendfile实现了更简单的方式再次减少了一次copy操作。
在Apache、Nginx、lighttpd等web服务器当中都有一项sendfile相关的配置使用sendfile可以大幅提升文件传输性能。Kafka把所有的消息都存放在一个一个的文件中当消费者需要数据的时候Kafka直接把文件发送给消费者配合mmap作为文件读写方式直接把它传给sendfile。
二、批量压缩
在很多情况下系统的瓶颈不是CPU或磁盘而是网络IO对于需要在广域网上的数据中心之间发送消息的数据流水线尤其如此。进行数据压缩会消耗少量的CPU资源,不过对于kafka而言,网络IO更应该需要考虑。
如果每个消息都压缩但是压缩率相对很低所以Kafka使用了批量压缩即将多个消息一起压缩而不是单个消息压缩Kafka允许使用递归的消息集合批量的消息可以通过压缩的形式传输并且在日志中也可以保持压缩格式直到被消费者解压缩Kafka支持多种压缩协议包括Gzip和Snappy压缩协议。
总结
Kafka速度的秘诀在于它把所有的消息都变成一个批量的文件并且进行合理的批量压缩减少网络IO损耗通过mmap提高I/O速度写入数据的时候由于单个Partion是末尾添加所以速度最优。读取数据的时候配合sendfile直接暴力输出。
2.10 RabbitMQ如何保证消息已达
参考答案
RabbitMQ可能丢失消息分为如下几种情况 生产者丢消息 生产者将数据发送到RabbitMQ的时候可能在传输过程中因为网络等问题而将数据弄丢了。 RabbitMQ自己丢消息 如果没有开启RabbitMQ的持久化那么RabbitMQ一旦重启数据就丢了。所以必须开启持久化将消息持久化到磁盘这样就算RabbitMQ挂了恢复之后会自动读取之前存储的数据一般数据不会丢失。除非极其罕见的情况RabbitMQ还没来得及持久化自己就挂了这样可能导致一部分数据丢失。 消费端丢消息 主要是因为消费者消费时刚消费到还没有处理结果消费者就挂了这样你重启之后RabbitMQ就认为你已经消费过了然后就丢了数据。
针对上述三种情况RabbitMQ可以采用如下方式避免消息丢失 生产者丢消息 可以选择使用RabbitMQ提供是事务功能就是生产者在发送数据之前开启事务然后发送消息如果消息没有成功被RabbitMQ接收到那么生产者会受到异常报错这时就可以回滚事务然后尝试重新发送。如果收到了消息那么就可以提交事务。这种方式有明显的缺点即RabbitMQ事务开启后就会变为同步阻塞操作生产者会阻塞等待是否发送成功太耗性能会造成吞吐量的下降。可以开启confirm模式。在生产者那里设置开启了confirm模式之后每次写的消息都会分配一个唯一的id然后如何写入了RabbitMQ之中RabbitMQ会给你回传一个ack消息告诉你这个消息发送OK了。如果RabbitMQ没能处理这个消息会回调你一个nack接口告诉你这个消息失败了你可以进行重试。而且你可以结合这个机制知道自己在内存里维护每个消息的id如果超过一定时间还没接收到这个消息的回调那么你可以进行重发。 事务机制是同步的你提交了一个事物之后会阻塞住但是confirm机制是异步的发送消息之后可以接着发送下一个消息然后RabbitMQ会回调告知成功与否。 一般在生产者这块避免丢失都是用confirm机制。 RabbitMQ自己丢消息 设置消息持久化到磁盘设置持久化有两个步骤 创建queue的时候将其设置为持久化的这样就可以保证RabbitMQ持久化queue的元数据但是不会持久化queue里面的数据。发送消息的时候讲消息的deliveryMode设置为2这样消息就会被设为持久化方式此时RabbitMQ就会将消息持久化到磁盘上。 必须要同时开启这两个才可以。 而且持久化可以跟生产的confirm机制配合起来只有消息持久化到了磁盘之后才会通知生产者ack这样就算是在持久化之前RabbitMQ挂了数据丢了生产者收不到ack回调也会进行消息重发。 消费端丢消息
息然后RabbitMQ会回调告知成功与否。 一般在生产者这块避免丢失都是用confirm机制。 RabbitMQ自己丢消息 设置消息持久化到磁盘设置持久化有两个步骤 创建queue的时候将其设置为持久化的这样就可以保证RabbitMQ持久化queue的元数据但是不会持久化queue里面的数据。发送消息的时候讲消息的deliveryMode设置为2这样消息就会被设为持久化方式此时RabbitMQ就会将消息持久化到磁盘上。 必须要同时开启这两个才可以。 而且持久化可以跟生产的confirm机制配合起来只有消息持久化到了磁盘之后才会通知生产者ack这样就算是在持久化之前RabbitMQ挂了数据丢了生产者收不到ack回调也会进行消息重发。 消费端丢消息 使用RabbitMQ提供的ack机制首先关闭RabbitMQ的自动ack然后每次在确保处理完这个消息之后在代码里手动调用ack。这样就可以避免消息还没有处理完就ack。