网站建设数据处理,宁波seo入门教程,wordpress 同步插件,怎么做 社区网站转载自 滴滴出行基于RocketMQ构建企业级消息队列服务的实践
本文整理自滴滴出行消息队列负责人 江海挺 在Apache RocketMQ开发者沙龙北京站的分享。通过本文#xff0c;您将了解到滴滴出行#xff1a; 1. 在消息队列技术选型方面的思考#xff1b; 2. 为什么选择 RocketMQ…转载自 滴滴出行基于RocketMQ构建企业级消息队列服务的实践
本文整理自滴滴出行消息队列负责人 江海挺 在Apache RocketMQ开发者沙龙北京站的分享。通过本文您将了解到滴滴出行 1. 在消息队列技术选型方面的思考 2. 为什么选择 RocketMQ 作为出行业务的消息队列解决方案 3. 如何构建自己的消息队列服务 4. 在 RocketMQ 上的扩展改造实践 5. 在 RocketMQ 上的实践经验。 江海挺 滴滴出行消息队列负责人Apache RocketMQ Contributor大学毕业后一直在做消息队列领域相关的技术、产品和服务积累了丰富的实践经验沉淀了不少关于消息队列的思考。 滴滴出行的消息技术选型
1.1 历史
初期公司内部没有专门的团队维护消息队列服务所以消息队列使用方式较多主要以Kafka为主有业务直连的也有通过独立的服务转发消息的。另外有一些团队也会用RocketMQ、Redis的list甚至会用比较非主流的beanstalkkd。导致的结果就是比较混乱无法维护资源使用也很浪费。 1.2 为什么弃用 Kafka
一个核心业务在使用Kafka的时候出现了集群数据写入抖动非常严重的情况经常会有数据写失败。
主要有两点原因 随着业务增长Topic的数据增多集群负载增大性能下降 我们用的是Kafka0.8.2那个版本有个bug会导致副本重新复制复制的时候有大量的读我们存储盘用的是机械盘导致磁盘IO过大影响写入。
所以我们决定做自己的消息队列服务。 首先需要解决业务方消息生产失败的问题。因为这个Kafka用的是发布/订阅模式一个topic的订阅方会有很多涉及到的下游业务也就非常多没办法一口气直接替换Kafka迁移到新的一个消息队列服务上。所以我们当时的方案是加了一层代理然后利用codis作为缓存解决了Kafka不定期写入失败的问题如上图。当后面的Kafka出现不可写入的时候我们就会先把数据写入到codis中然后延时进行重试直到写成功为止。
1.3 为什么选择 RocketMQ
经过一系列的调研和测试之后我们决定采用RocketMQ具体原因在后面会介绍。 为了支持多语言环境、解决一些迁移和某些业务的特殊需求我们又在消费侧加上了一个代理服务。然后形成了这么一个核心框架。业务端只跟代理层交互。中间的消息引擎负责消息的核心存储。在之前的基本框架之后我们后面就主要围绕三个方向做。 迁移把之前提到的所有五花八门的队列环境全部迁移到我们上面。这里面的迁移方案后面会跟大家介绍一下。 功能迭代和成本性能上的优化。 服务化业务直接通过平台界面来申请资源申请到之后直接使用。
1.4 演进中的架构 这张图是我们消息队列服务的一个比较新的现状。先纵向看上面是生产的客户端包括了7种语言。然后是我们的生产代理服务。在中间的是我们的消息存储层。目前主要的消息存储引擎是RocketMQ。然后还有一些在迁移过程中的Kafka。另一个是Chronos它是我们延迟消息的一个存储引擎。
再下面就是消费代理。消费代理同样提供了多种语言的客户端还支持多种协议的消息主动推送功能包括HTTP 协议 RESTful方式。结合我们的groovy脚本功能还能实现将消息直接转存到Redis、Hbase和HDFS上。此外我们还在陆续接入更多的下游存储。
除了存储系统之外我们也对接了实时计算平台例如FlinkSparkStorm左边是我们的用户控制台和运维控制台。这个是我们服务化的重点。用户在需要使用队列的时候就通过界面申请Topic填写各种信息包括身份信息消息的峰值流量消息大小消息格式等等。然后消费方通过我们的界面就可以申请消费。
运维控制台主要负责我们集群的管理自动化部署流量调度状态显示之类的功能。最后所有运维和用户操作会影响线上的配置都会通过ZooKeeper进行同步。 为什么选择RocketMQ
我们围绕以下两个纬度进行了对比测试结果显示RocketMQ的效果更好。
2.1 测试-topic数量的支持
测试环境Kafka 0.8.2RocketMQ 3.4.61.0 Gbps Network16 threads
测试结果如下 这张图是Kafka和RocketMQ在不同topic数量下的吞吐测试。横坐标是每秒消息数纵坐标是测试case。同时覆盖了有无消费和不同消息体的场景。一共8组测试数据每组数据分别在Topic个数为16、32、64、128、256时获得的每个topic包括8个Partition。下面四组数据是发送消息大小为128字节的情况上面四种是发送2k消息大小的情况。on 表示消息发送的时候同时进行消息消费off表示仅进行消息发送。
先看最上面一组数据用的是Kafka开启消费每条消息大小为2048字节可以看到随着Topic数量增加到256 Topic之后吞吐极具下降。第二组是是RocketMQ。可以看到Topic增大之后影响非常小。第三组和第四组是上面两组关闭了消费的情况。结论基本类似整体吞吐量会高那么一点点。
下面的四组跟上面的区别是使用了128字节的小消息体。可以看到Kafka吞吐受Topic数量的影响特别明显。对比来看虽然topic比较小的时候RocketMQ吞吐较小但是基本非常稳定对于我们这种共享集群来说比较友好。
2.2 测试-延迟 Kafka
测试环境Kafka 0.8.2.2topic1/8/32Ack1/allreplica3
测试结果 横坐标对应吞吐纵坐标对应延迟时间
上面的一组的3条线对应Ack3需要3个备份都确认后才完成数据的写入。下面的一组的3条线对应Ack1有1个备份收到数据后就可以完成写入。可以看到下面一组只需要主备份确认的写入延迟明显较低。每组的三条线之间主要是Topic数量的区别Topic数量增加延迟也增大了。 RocketMQ
测试环境
RocketMQ 3.4.6brokerRoleASYNC/SYNC_MASTER, 2 Slave
flushDiskTypeSYNC_FLUSH/ASYNC_FLUSH
测试结果 上面两条是同步刷盘的情况延迟相对比较高。下面的是异步刷盘。橙色的线是同步主从蓝色的线是异步主从。然后可以看到在副本同步复制的情况下即橙色的线4w的TPS之内都不超过1ms。用这条橙色的线和上面Kafka的图中的上面三条线横向比较来看Kafka超过1w TPS 就超过1ms了。Kafka的延迟明显更高。 如何构建自己的消息队列
3.1 问题与挑战 面临的挑战顺时针看 客户端语言需要支持PHP、Go、Java、C 只有3个开发人员 决定用RocketMQ但是没看过源码 上线时间紧线上的Kafka还有问题 可用性要求高。
使用RocketMQ时的两个问题 客户端语言支持不全以Java为主而我们还需要支持PHP、Go、C 功能特别多如tag、property、消费过滤、RETRYtopic、死信队列、延迟消费之类的功能但这对我们稳定性维护来说挑战非常大。
针对以上两个问题的解决办法如下图所示 使用ThriftRPC框架来解决跨语言的问题 简化调用接口。可以认为只有两个接口send用来生产pull用来消费。
主要策略就是坚持KISS原则Keep it simple, stupid保持简单先解决最主要的问题让消息能够流转起来。然后我们把其他主要逻辑都放在了proxy这一层来做比如限流、权限认证、消息过滤、格式转化之类的。这样我们就能尽可能地简化客户端的实现逻辑不需要把很多功能用各种语言都写一遍。
3.2 迁移方案
架构确定后接下来是我们的一个迁移过程。 迁移这个事情在pub-sub的消息模型下会比较复杂。因为下游的数据消费方可能很多上游的数据没法做到一刀切流量这就会导致整个迁移的周期特别长。然后我们为了尽可能地减少业务迁移的负担加快迁移的效率我们在Proxy层提供了双写和双读的功能。 双写ProcucerProxy同时写RocketMQ和Kafka 双读ConsumerProxy同时从RocketMQ和Kafka消费数据。
有了这两个功能之后我们就能提供以下两种迁移方案了。
3.2.1 双写
生产端双写同时往Kafka和RocketMQ写同样的数据保证两边在整个迁移过程中都有同样的全量数据。Kafka和RocketMQ有相同的数据这样下游的业务也就可以开始迁移。如果消费端不关心丢数据那么可以直接切换切完直接更新消费进度。如果需要保证消费必达可以先在ConsumerProxy设置消费进度消费客户端保证没有数据堆积后再去迁移这样会有一些重复消息一般客户端会保证消费处理的幂等。
生产端的双写其实也有两种方案 客户端双写如下图 业务那边不停原来的kafka 客户端。只是加上我们的客户端往RocketMQ里追加写。这种方案在整个迁移完成之后业务还需要把老的写入停掉。相当于两次上线。 Producer Proxy双写如下图 业务方直接切换生产的客户端只往我们的proxy上写数据。然后我们的proxy负责把数据复制同时写到两个存储引擎中。这样在迁移完成之后我们只需要在Proxy上关掉双写功能就可以了。对生产的业务方来说是无感知的生产方全程只需要改造一次上一下线就可以了。
所以表面看起来应该还是第二种方案更加简单。但是从整体可靠性的角度来看一般还是认为第一种相对高一点。因为客户端到Kafka这一条链路业务之前都已经跑稳定了。一般不会出问题。但是写我们Proxy就不一定了在接入过程中是有可能出现一些使用上的问题导致数据写入失败这就对业务方测试质量的要求会高一点。然后消费的迁移过程其实风险是相对比较低的。出问题的时候可以立即回滚。因为它在老的Kafka上消费进度是一直保留的而且在迁移过程中可以认为是全量双消费。
以上就是数据双写的迁移方案这种方案的特点就是两个存储引擎都有相同的全量数据。
3.2.2 双读
特点保证不会重复消费。对于P2P 或者消费下游不太多或者对重复消费数据比较敏感的场景比较适用。 这个方案的过程是这样的消费先切换。全部迁移到到我们的Proxy上消费Proxy从Kafka上获取。这个时候RocketMQ上没有流量。但是我们的消费Proxy保证了双消费一旦RocketMQ有流量了客户端同样也能收到。然后生产方改造客户端直接切流到RocketMQ中这样就完成了整个流量迁移过程。运行一段时间比如Kafka里的数据都过期之后就可以把消费Proxy上的双消费关了下掉Kafka集群。
整个过程中生产直接切流所以数据不会重复存储。然后在消费迁移的过程中我们消费Proxy上的group和业务原有的group可以用一个名字这样就能实现迁移过程中自动rebalance这样就能实现没有大量重复数据的效果。所以这个方案对重复消费比较敏感的业务会比较适合的。这个方案的整个过程中消费方和生产方都只需要改造一遍客户端上一次线就可以完成。 RocketMQ扩展改造
说完迁移方案这里再简单介绍一下我们在自己的RocketMQ分支上做的一些比较重要的事情。
首先一个非常重要的一点是主从的自动切换。
熟悉RocketMQ的同学应该知道目前开源版本的RocketMQ broker 是没有主从自动切换的。如果你的Master挂了那你就写不进去了。然后slave只能提供只读的功能。当然如果你的topic在多个主节点上都创建了虽然不会完全写不进去但是对单分片顺序消费的场景还是会产生影响。所以呢我们就自己加了一套主从自动切换的功能。
第二个是批量生产的功能。
RocketMQ4.0之后的版本是支持批量生产功能的。但是限制了只能是同一个ConsumerQueue的。这个对于我们的Proxy服务来说不太友好因为我们的proxy是有多个不同的topic的所以我们就扩展了一下让它能够支持不同Topic、不同Consume Queue。原理上其实差不多只是在传输的时候把Topic和Consumer Queue的信息都编码进去。
第三个元信息管理的改造。
目前RocketMQ单机能够支持的Topic数量基本在几万这么一个量级在增加上去之后元信息的管理就会非常耗时对整个吞吐的性能影响相对来说就会非常大。然后我们有个场景又需要支持单机百万左右的Topic数量所以我们就改造了一下元信息管理部分让RocketMQ单机能够支撑的Topic数量达到了百万。
后面一些就不太重要了比如集成了我们公司内部的一些监控和部署工具修了几个bug也给提了PR。最新版都已经fix掉了。 RocketMQ使用经验
接下来再简单介绍一下我们在RocketMQ在使用和运维上的一些经验。主要是涉及在磁盘IO性能不够的时候一些参数的调整。
5.1 读老数据的问题
我们都知道RocketMQ的数据是要落盘的一般只有最新写入的数据才会在PageCache中。比如下游消费数据因为一些原因停了一天之后又突然起来消费数据。这个时候就需要读磁盘上的数据。然后RocketMQ的消息体是全部存储在一个append only的 commitlog 中的。如果这个集群中混杂了很多不同topic的数据的话要读的两条消息就很有可能间隔很远。最坏情况就是一次磁盘IO读一条消息。这就基本等价于随机读取了。如果磁盘的IOPSInput/Output Operations Per Second扛不住还会影响数据的写入这个问题就严重了。
值得庆幸的是RocketMQ提供了自动从Slave读取老数据的功能。这个功能主要由slaveReadEnable这个参数控制。默认是关的slaveReadEnable false bydefault。推荐把它打开主从都要开。这个参数打开之后在客户端消费数据时会判断当前读取消息的物理偏移量跟最新的位置的差值是不是超过了内存容量的一个百分比accessMessageInMemoryMaxRatio 40 by default。如果超过了就会告诉客户端去备机上消费数据。如果采用异步主从也就是brokerRole等于ASYNC_AMSTER的时候你的备机IO打爆其实影响不太大。但是如果你采用同步主从那还是有影响。所以这个时候最好挂两个备机。因为RocketMQ的主从同步复制只要一个备机响应了确认写入就可以了一台IO打爆问题不大。
5.2 过期数据删除
RocketMQ默认数据保留72个小时fileReservedTime72。然后它默认在凌晨4点开始删过期数据deleteWhen04。你可以设置多个值用分号隔开。因为数据都是定时删除的所以在磁盘充足的情况数据的最长保留会比你设置的还多一天。又由于默认都是同一时间删除一整天的数据如果用了机械硬盘一般磁盘容量会比较大需要删除的数据会特别多这个就会导致在删除数据的时候磁盘IO被打满。这个时候又要影响写入了。
为了解决这个问题可以尝试多个方法一个是设置文件删除的间隔有两个参数可以设置 deleteCommitLogFilesInterval 100毫秒。每删除10个commitLog文件的时间间隔 deleteConsumeQueueFilesInterval100毫秒。每删除一个ConsumeQueue文件的时间间隔。
另外一个就是增加删除频率把00-23都写到deleteWhen就可以实现每个小时都删数据。
5.3 索引
默认情况下所有的broker都会建立索引messageIndexEnabletrue。这个索引功能可以支持按照消息的uniqId消息的key来查询消息体。索引文件实现的时候本质上也就是基于磁盘的个一个hashmap。如果broker上消息数量比较多查询的频率比较高这也会造成一定的IO负载。所以我们的推荐方案是在Master上关掉了index功能只在slave上打开。然后所有的index查询全部在slave上进行。当然这个需要简单修改一下MQAdminImpl里的实现。因为默认情况下它会向Master发出请求。