新沂网站建设公司,网站建设大作业有代码,山东营销型网站,深圳公司网站建设服务文章目录 1、源码环境搭建1.1、主要功能模块1.2、源码启动服务1.2.1、 启动nameServer1.2.2、 启动Broker1.2.3、 发送消息1.2.4、 消费消息 2、源码剖析2.1、NameServer的启动过程2.2、Broker服务启动过程2.3、Netty服务注册框架2.3.1、关注重点2.3.2、源码重点 1、源码环境搭… 文章目录 1、源码环境搭建1.1、主要功能模块1.2、源码启动服务1.2.1、 启动nameServer1.2.2、 启动Broker1.2.3、 发送消息1.2.4、 消费消息 2、源码剖析2.1、NameServer的启动过程2.2、Broker服务启动过程2.3、Netty服务注册框架2.3.1、关注重点2.3.2、源码重点 1、源码环境搭建
1.1、主要功能模块
RocketMQ的官方Git仓库地址https://github.com/apache/rocketmq 可以用git把项目clone下来或者直接下载代码包。 也可以到RocketMQ的官方网站上下载指定版本的源码 http://rocketmq.apache.org/dowloading/releases/ 源码下很多的功能模块很容易让人迷失方向我们只关注下几个最为重要的模块
broker: Broker 模块broke 启动进程client 消息客户端包含消息生产者、消息消费者相关类example: RocketMQ 例代码namesrvNameServer模块store消息存储模块remoting远程访问模块
1.2、源码启动服务
将源码导入IDEA后需要先对源码进行编译。编译指令 clean install -Dmaven.test.skiptrue 编译完成后就可以开始调试代码了。调试时需要按照以下步骤 调试时先在项目目录下创建一个conf目录并从distribution拷贝broker.conf和logback_broker.xml和logback_namesrv.xml
1.2.1、 启动nameServer
展开namesrv模块运行NamesrvStartup类即可启动NameServer 启动时会报错提示需要配置一个ROCKETMQ_HOME环境变量。 这个环境变量我们可以在机器上配置跟配置JAVA_HOME环境变量一样。也可以在IDEA的运行环境中配置。目录指向源码目录即可。
配置完成后再次执行看到以下日志内容表示NameServer启动成功 1.2.2、 启动Broker
启动Broker之前我们需要先修改之前复制的broker.conf文件
brokerClusterName DefaultCluster
brokerName broker-a
brokerId 0
deleteWhen 04
fileReservedTime 48
brokerRole ASYNC_MASTER
flushDiskType ASYNC_FLUSH# 自动创建Topic
autoCreateTopicEnabletrue
# nameServ地址
namesrvAddr127.0.0.1:9876
# 存储路径
storePathRootDirE:\\RocketMQ\\data\\rocketmq\\dataDir
# commitLog路径
storePathCommitLogE:\\RocketMQ\\data\\rocketmq\\dataDir\\commitlog
# 消息队列存储路径
storePathConsumeQueueE:\\RocketMQ\\data\\rocketmq\\dataDir\\consumequeue
# 消息索引存储路径
storePathIndexE:\\RocketMQ\\data\\rocketmq\\dataDir\\index
# checkpoint文件路径
storeCheckpointE:\\RocketMQ\\data\\rocketmq\\dataDir\\checkpoint
# abort文件存储路径
abortFileE:\\RocketMQ\\data\\rocketmq\\dataDir\\abort然后Broker的启动类是broker模块下的BrokerStartup。
启动Broker时同样需要ROCETMQ_HOME环境变量并且还需要配置一个-c 参数指向broker.conf配置文件。
然后重新启动即可启动Broker。
1.2.3、 发送消息
启动example模块下的org.apache.rocketmq.example.quickstart.Producer类即可发送消息。
但是在测试源码中需要指定NameServer地址。这个NameServer地址有两种指定方式一种是配置一个NAMESRV_ADDR的环境变量。另一种是在源码中指定。我们可以在源码中加一行代码指定NameServer
producer.setNamesrvAddr(127.0.0.1:9876);然后就可以发送消息了。
1.2.4、 消费消息
可以使用同一模块下的org.apache.rocketmq.example.quickstart.Consumer类来消费消息。运行时同样需要指定NameServer地址
consumer.setNamesrvAddr(192.168.232.128:9876);这样整个调试环境就搭建好了。
2、源码剖析
2.1、NameServer的启动过程
关注重点 在RocketMQ集群中实际记性消息存储、推送等核心功能点额是Broker。而NameServer的作用其实和微服务中的注册中心非常类似他只是提供了Broker端的服务注册与发现功能。 源码重点 NameServer的启动入口类是org.apache.rocketmq.namesrv.NamesrvStartup。其中的核心是构建并启动一个NamesrvController。这个Cotroller对象就跟MVC中的Controller是很类似的都是响应客户端的请求。只不过他响应的是基于Netty的客户端请求。
另外他的实际启动过程其实可以配合NameServer的启动脚本进行更深入的理解。
从NameServer启动和关闭这两个关键步骤我们可以总结出NameServer的组件其实并不是很多整个NameServer的结构是这样的 从这里也能看出 RocketMQ的整体源码风格就是典型的MVC思想。Controller响应请求Service处理业务各种Table保存消息。
2.2、Broker服务启动过程
关注重点
Broker是整个RocketMQ的业务核心。所有消息存储、转发这些重要的业务都是Broker进行处理。
这里重点梳理Broker有哪些内部服务。这些内部服务将是整理Broker核心业务流程的起点。
源码重点
Broker启动的入口在BrokerStartup这个类可以从他的main方法开始调试。
启动过程关键点重点也是围绕一个BrokerController对象先创建然后再启动。
首先 在BrokerStartup.createBrokerController方法中可以看到Broker的几个核心配置
BrokerConfig Broker服务配置MessageStoreConfig 消息存储配置。 这两个配置参数都可以在broker.conf文件中进行配置NettyServerConfig Netty服务端占用了10911端口。同样也可以在配置文件中覆盖。NettyClientConfig Broker既要作为Netty服务端向客户端提供核心业务能力又要作为Netty客户端向NameServer注册心跳。
这些配置是我们了解如何优化 RocketMQ 使用的关键。
然后 在BrokerController.start方法可以看到启动了一大堆Broker的核心服务我们挑一些重要的
this.messageStore.start();//启动核心的消息存储组件this.remotingServer.start();
this.fastRemotingServer.start(); //启动两个Netty服务this.brokerOuterAPI.start();//启动客户端往外发请求BrokerController.this.registerBrokerAll //向NameServer注册心跳。this.brokerStatsManager.start();
this.brokerFastFailure.start();//这也是一些负责具体业务的功能组件我们现在不需要了解这些核心组件的具体功能只要有个大概Broker中有一大堆的功能组件负责具体的业务。后面等到分析具体业务时再去深入每个服务的细节。
我们需要抽象出Broker的一个整体结构
可以看到Broker启动了两个Netty服务他们的功能基本差不多。实际上在应用中可以通过producer.setSendMessageWithVIPChannel(true)让少量比较重要的producer走VIP的通道。而在消费者端也可以通过consumer.setVipChannelEnabled(true)让消费者支持VIP通道的数据。
2.3、Netty服务注册框架
2.3.1、关注重点
网络通信服务是构建分布式应用的基础也是我们去理解RocketMQ底层业务的基础。这里就重点梳理RocketMQ的这个服务注册框架理解各个业务进程之间是如何进行RPC远程通信的。
Netty的所有远程通信功能都由remoting模块实现。RemotingServer模块里包含了RPC的服务端RemotingServer以及客户端RemotingClient。在RocketMQ中涉及到的远程服务非常多在RocketMQ中NameServer主要是RPC的服务端RemotingServerBroker对于客户端来说是RPC的服务端RemotingServer而对于NameServer来说又是RPC的客户端。各种Client是RPC的客户端RemotingClient。
需要理解的是RocketMQ基于Netty保持客户端与服务端的长连接Channel。只要Channel是稳定的那么即可以从客户端发请求到服务端同样服务端也可以发请求到客户端。例如在事务消息场景中就需要Broker多次主动向Producer发送请求确认事务的状态。所以RemotingServer和RemotingClient都需要注册自己的服务。
2.3.2、源码重点
1、哪些组件需要Netty服务端哪些组件需要Netty客户端 比较好理解的NameServer需要NettyServer。客户端Producer和Consuer需要NettyClient。Broker需要NettyServer响应客户端请求需要NettyClient向NameServer注册心跳。但是有个问题 事务消息的Producer也需要响应Broker的事务状态回查他需要NettyServer吗
NameServer不需要NettyClient这也验证了之前介绍的NameServer之间不需要进行数据同步的说法。
2、所有的RPC请求数据都封账成RemotingCommand对象。而每个处理消息的服务逻辑都会封装成一个NettyRequestProcessor对象。
3、服务端和客户端都维护一个processorTable这是个HashMap。key是服务码requestCodevalue是对应的运行单元 PairNettyRequestProcessor,ExecutorService类型包含了处理Processor和执行线程的线程池。具体的Processor由业务系统自行注册。Broker服务注册见BrokerController.registerProcessor()客户端的服务注册见MQClientAPIImpl。NameServer则会注册一个大的DefaultRequestProcessor统一处理所有服务。
4、请求类型分为REQUEST和RESPONSE。这是为了支持异步的RPC调用。NettyServer处理完请求后可以先缓存到responseTable中等NettyClient下次来获取这样就不用阻塞Channel了可以提升请求吞吐量。猜一猜Producer的同步请求的流程是什么样的
5、重点理解remoting包中是如何实现全流程异步化。