建设项目环保验收网站,福永镇网站建设,株洲市住房和城乡建设局网站,html5网站演示send message源码解析
引入 send message方法作为我们经常使用的方法#xff0c;平时我们很难去关注他底层到底做了什么。大部分人只知道通过send message方法可以将消息发送到broker#xff0c;然后供消费者进行消费。其实不然#xff0c;消息从客户端发送到broker#x…send message源码解析
引入 send message方法作为我们经常使用的方法平时我们很难去关注他底层到底做了什么。大部分人只知道通过send message方法可以将消息发送到broker然后供消费者进行消费。其实不然消息从客户端发送到broker需要中间需要经过很多步骤比如首先客户端需要向nameserver拿路由拿到路由后才能将消息发送到对应的broker。消息到了broker需要先进行校验校验无误后再写到commitLog写完commitLog后再根据具体的策略判断是否需要同步到slave节点同步完slave节点完后才response给客户端。 源码阅读入口
// 客户端入口
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl// NameServer入口
org.apache.rocketmq.namesrv.processor.ClientRequestProcessor#getRouteInfoByTopic// Broker端入口
org.apache.rocketmq.broker.processor.SendMessageProcessor#processRequest源码解析 RocketMQ中的客户端send方法提供了单条发送发送也批量发送的API不管是单条发送还是批量发送本质都是一样的批量发送会把消息集合包装一下了具体可以看batch里面的实现将消息集合封装了MessageBatch对象当然MessageBatch继承Message。然后再尝试去topicPublishInfoTable中拿路由如果没有就请求NameServer忽略经过Proxy层需要注意的是请求NameServer获取路由的这个过程是同步的同一时间只有一个线程可以请求NameServer需要等到NameServer返回之后才会执行后续的操作。拿到路由后再根据轮询策略选中其中一个broker进行发送。这就是发送消息客户端大致的逻辑总体来说是还是比较简单的。CONSUMER_SEND_MSG_BACK是消费者发过来的RETRY消息本次重点不在这里后续单独讲下这里。当消息到达Broker’端先根据请求头构建出一个MappingContext对象再把request对象封装成sendMessageContext执行注册到sendMessageProcessor里面的钩子方法sendMessageBefore之后根据是否是batch消息如果是batch消息执行sendBatchMessage不是执行sendMessage方法其实本质上还是一样的只是sendBatchMessage中间构建的是messageExtBatch对象而sendMessage构建的是messageExtBrokerInner对象。MessageExtBatch是MessageExtBrokerInner的子类所以两者后续还是共用一套逻辑然后根据是否开启异步写入执行asyncPutMessage或者putMessage同步的putMessage实际上还是调用的asynPutMessage只是要等到asyncPutMessage有返回值之后才执行后续的逻辑。我们这里以asyncPutMessage为主还是先执行注册到SendMessageProcessor里面的钩子方法SendMessageAfter然后再先判断时候是否是HA高可用高可用是需要等到消息写入slave节点成功之后才说明消息发送成功一般使用在一些金融场景对消息可靠性要求较高。然后再然后分配offset这个offset是由consumeQueue分配的分配完offset之后分配完了之后再将消息体append到commitLog的分配的buf中返回的状态码PUT_OK执行handleDiskFlush方法如果是配置的是同步刷盘就等到刷盘成功后返回如果是异步刷盘wakeup对应的FlushManager就算写入完成。上述执行成功后执行handleHA方法如果是不是HA模式执行response PUT_OK否则构建一个GroupCommitRequest对象put到haService里面对应slave节点写完最终才算发送成功。 参考 · https://rocketmq.apache.org/ · 基于Apache Rocket 5.1.0 · https://github.com/apache/rocketmq