青岛低价网站建设,达内it教育官网,企业网站模板下载哪家公司强,温州企业建站系统RocketMQ 面试FAQ
说说你们公司线上生产环境用的是什么消息中间件?
为什么要使用MQ#xff1f;
因为项目比较大#xff0c;做了分布式系统#xff0c;所有远程服务调用请求都是同步执行经常出问题#xff0c;所以引入了mq
解耦
系统耦合度降低#xff0c;没有强依赖…RocketMQ 面试FAQ
说说你们公司线上生产环境用的是什么消息中间件?
为什么要使用MQ
因为项目比较大做了分布式系统所有远程服务调用请求都是同步执行经常出问题所以引入了mq
解耦
系统耦合度降低没有强依赖关系
异步
不需要同步执行的远程调用可以有效提高响应时间
削峰
请求达到峰值后后端service还可以保持固定消费速率消费不会被压垮
多个mq如何选型
RabbitMQ
erlang开发延迟比较低
RocketMQ
java开发面向互联网集群化功能丰富
kafka
Scala开发面向日志功能丰富
ActiveMQ
java开发简单稳定
小项目ActiveMQ
大项目RocketMQ或kafka、RabbitMq
RocketMQ由哪些角色组成每个角色作用和特点是什么
nameserver 无状态 动态列表
producer
broker
consumer
RocketMQ中的Topic和ActiveMQ有什么区别
ActiveMQ
有destination的概念即消息目的地
destination分为两类
topic 广播消息 queue 队列消息
RocketMQ
RocketMQ的Topic是一组Message Queue的集合 ConsumeQueue
一条消息是广播消息还是队列消息由客户端消费决定
RocketMQ Broker中的消息被消费后会立即删除吗
不会每条消息都会持久化到CommitLog中每个consumer连接到broker后会维持消费进度信息当有消息消费后只是当前consumer的消费进度CommitLog的offset更新了。
那么消息会堆积吗什么时候清理过期消息
4.6版本默认48小时后会删除不再使用的CommitLog文件
检查这个文件最后访问时间判断是否大于过期时间指定时间删除默认凌晨4点
RocketMQ消费模式有几种
消费模型由consumer决定消费维度为Topic
集群消费
一组consumer同时消费一个topic可以分配消费负载均衡策略分配consumer对应消费topic下的哪些queue
多个group同时消费一个topic时每个group都会消费到数据
一条消息只会被一个group中的consumer消费
广播消费
消息将对一 个Consumer Group 下的各个 Consumer 实例都消费一遍。即即使这些 Consumer 属于同一个Consumer Group 消息也会被 Consumer Group 中的每个 Consumer 都消费一次。
消费消息时使用的是push还是pull
在刚开始的时候就要决定使用哪种方式消费
两种
DefaultLitePullConsumerImpl 拉
DefaultMQPushConsumerImpl推
两个实现 DefaultLitePullConsumerImpl DefaultMQPushConsumerImpl都实现了MQConsumerInner接口接口
名称上看起来是一个推一个拉但实际底层实现都是采用的长轮询机制即拉取方式
broker端属性 longPollingEnable 标记是否开启长轮询。默认开启
为什么要主动拉取消息而不使用事件监听方式
事件驱动方式是建立好长连接由事件发送数据的方式来实时推送。
如果broker主动推送消息的话有可能push速度快消费速度慢的情况那么就会造成消息在consumer端堆积过多同时又不能被其他consumer消费的情况
说一说几种常见的消息同步机制
push
如果broker主动推送消息的话有可能push速度快消费速度慢的情况那么就会造成消息在consumer端堆积过多同时又不能被其他consumer消费的情况
pull
轮训时间间隔固定值的话会造成资源浪费
长轮询
上连接 短连接每秒 长轮询
broker如何处理拉取请求的
consumer首次请求broker
broker中是否有符合条件的消息有 - 响应consumer等待下次consumer的请求 没有 挂起consumer的请求即不断开连接也不返回数据挂起时间长短写死在代码里的吗长轮询写死短轮询可以配使用consumer的offset DefaultMessageStore#ReputMessageService#run方法 每隔1ms检查commitLog中是否有新消息有的话写入到pullRequestTable当有新消息的时候返回请求 PullRequestHoldService 来Hold连接每个5s执行一次检查pullRequestTable有没有消息有的话立即推送
RocketMQ如何做负载均衡
通过Topic在多broker种分布式存储实现
producer端
发送端指定Target message queue发送消息到相应的broker来达到写入时的负载均衡
提升写入吞吐量当多个producer同时向一个broker写入数据的时候性能会下降消息分布在多broker种为负载消费做准备
每 30 秒从 nameserver获取 Topic 跟 Broker 的映射关系近实时获取最新数据存储单元queue落地在哪个broker中
在使用api中send方法的时候可以指定Target message queue写入或者使用MessageQueueSelector
默认策略是随机选择
producer维护一个index每次取节点会自增index向所有broker个数取余自带容错策略
其他实现
SelectMessageQueueByHash hash的是传入的args SelectMessageQueueByRandomSelectMessageQueueByMachineRoom 没有实现
也可以自定义实现MessageQueueSelector接口中的select方法
MessageQueue select(final ListMessageQueue mqs, final Message msg, final Object arg);可以自定义规则来选择mqs
如何知道mqs的mqs的数据从哪儿来
producer.start()方法
参考源码
启动producer的时候会向nameserver发送心跳包获取nameserver中的topic列表使用topic向nameserver获取topicRouteData
TopicRouteData对象表示与某一个topic有关系的broker节点信息内部包含多个QueueData对象可以有多个broker集群支持该topic和多个BrokerData信息多个集群的多个节点信息都在该列表中
producer加工TopicRouteData对应的多节点信息后返回mqs。
consumer端
客户端完成负载均衡
获取集群其他节点当前节点消费哪些queue负载粒度直到Message Queueconsumer的数量最好和Message Queue的数量对等或者是倍数不然可能会有消费倾斜每个consumer通过balanced维护processQueueTable processQueueTable为当前consumer的消费queueprocessQueueTable中有 ProcessQueue 维护消费进度从broker中拉取回来的消息缓冲MessageQueue 用来定位查找queue
DefaultMQPushConsumer默认 使用AllocateMessageQueueAveragely平均分配
当消费负载均衡consumer和queue不对等的时候会发生什么
平均分配 环形分配 负载均衡算法
平均分配策略(默认)(AllocateMessageQueueAveragely) 环形分配策略(AllocateMessageQueueAveragelyByCircle) 手动配置分配策略(AllocateMessageQueueByConfig) 机房分配策略(AllocateMessageQueueByMachineRoom) 一致性哈希分配策略(AllocateMessageQueueConsistentHash) 靠近机房策略(AllocateMachineRoomNearby)
consumer启动流程参考源码
消息丢失
SendResult
producer在发送同步/异步可靠消息后会接收到SendResult表示消息发送成功
SendResult其中属性sendStatus表示了broker是否真正完成了消息存储
当sendStatus!ok的时候应该重新发送消息避免丢失
当producer.setRetryAnotherBrokerWhenNotStoreOK
消息重复消费
影响消息正常发送和消费的重要原因是网络的不确定性。
可能是因为consumer首次启动引起重复消费
需要设置consumer.setConsumeFromWhere
只对一个新的consumeGroup第一次启动时有效,设置从头消费还是从维护开始消费
你们怎么保证投递出去的消息只有一条且仅仅一条不会出现重复的数据?
绑定业务key
如果消费了重复的消息怎么保证数据的准确性?
引起重复消费的原因
ACK
正常情况下在consumer真正消费完消息后应该发送ack通知broker该消息已正常消费从queue中剔除
当ack因为网络原因无法发送到brokerbroker会认为词条消息没有被消费此后会开启消息重投机制把消息再次投递到consumer
group
在CLUSTERING模式下消息在broker中会保证相同group的consumer消费一次但是针对不同group的consumer会推送多次
解决方案
数据库表
处理消息前使用消息主键在表中带有约束的字段中insert
Map
单机时可以使用map ConcurrentHashMap - putIfAbsent guava cache
Redis
使用主键或set操作
如何让RocketMQ保证消息的顺序消费
你们线上业务用消息中间件的时候是否需要保证消息的顺序性?
如果不需要保证消息顺序为什么不需要?假如我有一个场景要保证消息的顺序你们应该如何保证? 同一topic 同一个QUEUE 发消息的时候一个线程去发送消息 消费的时候 一个线程 消费一个queue里的消息或者使用MessageListenerOrderly 多个queue 只能保证单个queue里的顺序
应用场景是啥
应用系统和现实的生产业务绑定避免在分布式系统中多端消费业务消息造成顺序混乱
比如需要严格按照顺序处理的数据或业务
数据包装/清洗
数据
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;去掉import统计某个字符出现次数
业务流程处理
返修过程
收件录入信息信息核对送入检修系统处理
电商订单
创建订单检查库存预扣库存支付真扣库存
binlog同步
RocketMQ如何保证消息不丢失
生产端如何保证投递出去的消息不丢失消息在半路丢失或者在MQ内存中宕机导致丢失此时你如何基于MQ的功能保证消息不要丢失MQ自身如何保证消息不丢失消费端如何保证消费到的消息不丢失如果你处理到一半消费端宕机导致消息丢失此时怎么办
解耦的思路
发送方
发送消息时做消息备份记日志或同步到数据库判断sendResult是否正常返回
broker
节点保证
master接受到消息后同步刷盘保证了数据持久化到了本机磁盘中同步写入slave写入完成后返回SendResult
consumer
记日志同步执行业务逻辑最后返回ack异常控制
磁盘保证
使用Raid磁盘阵列保证数据磁盘安全
网络数据篡改
内置TLS可以开启默认使用crc32校验数据
消息刷盘机制底层实现
每间隔10ms执行一次数据持久化操作
两种 同步刷、异步刷 public void run() {CommitLog.log.info(this.getServiceName() service started);while (!this.isStopped()) {try {this.waitForRunning(10);this.doCommit();} catch (Exception e) {CommitLog.log.warn(this.getServiceName() service has exception. , e);}}
rocketMq的消息堆积如何处理
下游消费系统如果宕机了导致几百万条消息在消息中间件里积压此时怎么处理?
你们线上是否遇到过消息积压的生产故障?如果没遇到过你考虑一下如何应对?
具体表现为 ui中转圈圈
对于大规模消息发送接收可以使用pull模式手动处理消息拉取速度,消费的时候统计消费时间以供参考
保证消息消费速度固定即可通过上线更多consumer临时解决消息堆积问题
如果consumer和queue不对等上线了多台也在短时间内无法消费完堆积的消息怎么办 准备一个临时的topic queue的数量是堆积的几倍 queue分不到多broker种 上线一台consumer做消息的搬运工把原来topic中的消息挪到新的topic里不做业务逻辑处理只是挪过去 上线N台consumer同时消费临时topic中的数据 改bug 恢复原来的consumer继续消费之前的topic
堆积时间过长消息超时了
RocketMQ中的消息只会在commitLog被删除的时候才会消失不会超时
堆积的消息会不会进死信队列
不会消息在消费失败后会进入重试队列%RETRY%consumergroup多次默认16才会进入死信队列%DLQ%consumergroup
你们用的是RocketMQ?那你说说RocketMQ的底层架构原理磁盘上数据如何存储的整体分布式架构是如何实现的?
零拷贝等技术是如何运用的?
使用nio的MappedByteBuffer调起数据输出
你们用的是RocketMQ?RocketMQ很大的一个特点是对分布式事务的支持你说说他在分布式事务支持这块机制的底层原理?
分布式系统中的事务可以使用TCCTry、Confirm、Cancel、2pc来解决分布式系统中的消息原子性
RocketMQ 4.3提供分布事务功能通过 RocketMQ 事务消息能达到分布式事务的最终一致
RocketMQ实现方式
**Half Message**预处理消息当broker收到此类消息后会存储到RMQ_SYS_TRANS_HALF_TOPIC的消息消费队列中
**检查事务状态**Broker会开启一个定时任务消费RMQ_SYS_TRANS_HALF_TOPIC队列中的消息每次执行任务会向消息发送者确认事务执行状态提交、回滚、未知如果是未知等待下一次回调。
**超时**如果超过回查次数默认回滚消息
TransactionListener的两个方法
executeLocalTransaction
半消息发送成功触发此方法来执行本地事务
checkLocalTransaction
broker将发送检查消息来检查事务状态并将调用此方法来获取本地事务状态
本地事务执行状态
LocalTransactionState.COMMIT_MESSAGE
执行事务成功确认提交
LocalTransactionState.ROLLBACK_MESSAGE
回滚消息broker端会删除半消息
LocalTransactionState.UNKNOW
暂时为未知状态等待broker回查
如果让你来动手实现一个分布式消息中间件整体架构你会如何设计实现?
看过RocketMQ 的源码没有。如果看过说说你对RocketMQ 源码的理解?
高吞吐量下如何优化生产者和消费者的性能?
消费 同一group下多机部署并行消费 单个consumer提高消费线程个数 批量消费 消息批量拉取业务逻辑批量处理
运维
网卡调优jvm调优多线程与cpu调优Page Cache
再说说RocketMQ 是如何保证数据的高容错性的?
在不开启容错的情况下轮询队列进行发送如果失败了重试的时候过滤失败的Broker如果开启了容错策略会通过RocketMQ的预测机制来预测一个Broker是否可用如果上次失败的Broker可用那么还是会选择该Broker的队列如果上述情况失败则随机选择一个进行发送在发送消息的时候会记录一下调用的时间与是否报错根据该时间去预测broker的可用时间