网站首页设计说明,代做网站和说明书,十大农业网站,网站做友链1.MQ概述 1.1 RocketMQ简介 RocketMQ 是阿里开源的分布式消息中间件#xff0c;跟其它中间件相比#xff0c;RocketMQ 的特点是纯JAVA实现#xff0c;是一套提供了消息生产#xff0c;存储#xff0c;消费全过程API的软件系统。 1.2 MQ用途
限流削峰 MQ可以将系统的超量请…1.MQ概述 1.1 RocketMQ简介 RocketMQ 是阿里开源的分布式消息中间件跟其它中间件相比RocketMQ 的特点是纯JAVA实现是一套提供了消息生产存储消费全过程API的软件系统。 1.2 MQ用途
限流削峰 MQ可以将系统的超量请求暂存其中以便系统后期可以慢慢进行处理从而避免了请求的丢失或系统被压垮。 异步解耦 上游系统对下游系统的调用若为同步调用则会大大降低系统的吞吐量与并发度且系统耦合度太高、而异步调用则会解决这些问题。所以两层之间若要实现由同步到异步的转化一般性做法就是在这两层间添加一个MQ层。 数据收集 分布式系统会产生海量级数据流如:业务日志、监控数据、用户行为等。针对这些数据流进行实时或批量采集汇总然后对这些数据流进行大数据分析这是当前互联网平台的必备技术。通过MQ完成此类数据收集是最好的选择。 1.3 常见MQ产品
RabbitMQ RabbitMQ是使用ErLang语言开发的一款MQ产品。其吞吐量较Kafka与RocketMQ要低且由于其不是Java语言开发所以公司内部对其实现定制化开发难度较大。 Kafka Kafka是使用Scala/Java语言开发的一款MQ产品。其最大的特点就是高吞吐量常用于大数据领域的实时计算、日志采集等场景。其没有遵循任何常见的MQ协议而是使用自研协议。 RocketMQ RocketMQ是使用Java语言开发的一款MQ产品。经过数年阿里双11的考验性能与稳定性非常高。其没有遵循任何常见的MQ协议而是使用自研协议。 对比 2.RocketMQ 基本概念 2.1 消息 消息是指消息系统所传输信息的物理载体生产和消费数据的最小单位每条消息必须属于一个主题。单个消息所占空间不会很大。
RocketMQ中每个消息拥有唯一的MessageId且可以携带具有业务标识的Key以方便对消息的查询。不过需要注意的是MessageId有两个在生产者send()消息时会自动生成一个MessageIdmsgId)当消息到达Broker后Broker也会自动生成一个MessageId(offsetMsgId)。msgId、offsetMsgId与key都称为消息标识。
msgId由producer端生成其生成规则为 producerIp 进程pid MessageClientIDSetter类的ClassLoader的hashCode 当前时间 AutomicInteger自增计数器 offsetMsgId由broker端生成其生成规则为brokerIp 物理分区的offsetQueue中的偏移量 key由用户指定的业务相关的唯一标识
2.2 主题 Topic表示一类消息的集合每个主题包含若干条消息每条消息只能属于一个主题是RocketMQ进行消息订阅的基本单位。 一个生产者可以同时发送多种Topic的消息而一个消费者只对某种特定的Topic感兴趣即只可以订阅和消费一种Topic的消息。
2.3 标签 标签为消息设置的标签用于同一主题下区分不同类型的消息。来自同一业务单元的消息可以根据不同业务目的在同一主题下设置不同标签。 标签能够有效地保持代码的清晰度和连贯性并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑实现更好的扩展性。 Topic是消息的一级分类Tag是消息的二级分类。Topic相当于货物Tag相当于上海山东等地区。
2.4 队列 存储消息的物理实体。 一个Topic中可以包含多个Queue每个Queue中存放的就是该Topic的消息。 一个Topic的Queue也被称为一个Topic中消息的分区Partition。 一个Topic的Queue中的消息只能被一个消费者组中的一个消费者消费。 一个Queue中的消息不允许同一个消费者组中的多个消费者同时消费。
分片不同于分区。在RocketMQ中分片指的是存放相应Topic的Broker。每个分片中会创建出相应数量的分区即Queue每个Queue的大小都是相同的。 2.5 Producer 消息生产者负责生产消息。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递投递的过程支持快速失败并且低延迟。 例如用户提交的请求写入到MQ的过程就是消息生产的过程在这里用户就是生产者 。 RocketMQ中的消息生产者都是以生产者组Producer Group的形式出现的。生产者组是同一类生产者的集合这类Producer发送相同Topic类型的消息。一个生产者组可以同时发送多个主题的消息。如果主题中有多个队列生产者组只有一个生产者生产者会采取轮询的方式进行发送消息。
生产者代码如下导入依赖 dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-spring-boot-starter/artifactIdversion2.0.2/version/dependency
生产者代码 public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {DefaultMQProducer order new DefaultMQProducer(order);order.setNamesrvAddr(localhost:9876);order.start();Message message new Message(myTopic, myTag, (test).getBytes());SendResult result order.send(message);System.out.println(result);order.shutdown();} 2.6 Consumer 消息消费者负责消费消息。一个消息消费者会从Broker服务器中获取到消息并对消息进行相关业务处理。 例如系统从MQ中读取到请求并对请求进行处理的过程就是消息消费的过程在这里系统就是消费者。 RocketMQ中的消息消费者都是以消费者组Consumer Group的形式出现的。消费者组是同一类消费者的集合这类Consumer消费的是同一个Topic类型的消息。 消费者组使得在消息消费方面实现负载均衡将一个Topic中的不同的Queue平均分配给同一个Consumer Group的不同的Consumer注意并不是将消息负载均衡和容错一个Consmer挂了该Consumer Group中的其它Consumer可以接着消费原Consumer消费的Queue的目标变得非常容易。
消费者代码
public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(order);consumer.setNamesrvAddr(localhost:9876);consumer.subscribe(myTopic,*);consumer.registerMessageListener(new MessageListenerConcurrently() {public ConsumeConcurrentlyStatus consumeMessage(ListMessageExt list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {System.out.println(收到的消息list);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();} 负载均衡策略
queue 个数大于 Consumer个数 那么 Consumer 会平均分配 queue不够平均会根据clientId排序来拿取余数 queue个数小于Consumer个数那么会有Consumer闲置就是浪费掉了其余Consumer平均分配到queue
消费者组中Consumer的数量应该小于等于订阅Topic的Queue数量。如果超出Queue数量则多出的Consumer将不能消费消息。
2.7 NameServer NameServer是一个Broker与Topic路由的注册中心支持Broker的动态注册与发现。 主要包括两个功能 Broker管理接受Broker集群的注册信息并且保存下来作为路由信息的基本数据提供心跳检测机制检查Broker是否还存活。
路由信息管理每个NameServer中都保存着Broker集群的整个路由信息和用于客户端查询的队列信息。Producer和Conumser通过NameServer可以获取整个Broker集群的路由信息从而进行消息的投递和消费。NameServer可以获取整个Broker集群的路由信息从而进行消息的投递和消费。 路由注册 Name Server既然是注册中心那么是如何完成注册的呢 NameServer通常也是以集群的方式部署不过NameServer是无状态的即NameServer集群中的各个节点间是无差异的各节点间相互不进行信息通讯。 那各节点中的数据是如何进行数据同步的呢在Broker节点启动时轮询NameServer列表与每个NameServer节点建立长连接发起注册请求。在NameServer内部维护着⼀个Broker列表用来动态存储Broker的信息。 Broker节点为了证明自己是活着的为了维护与NameServer间的长连接会将最新的信息以心跳包的方式上报给NameServer每30秒发送一次心跳。心跳包中包含 BrokerId、Broker地址(IPPort)、Broker名称、Broker所属集群名称等等。NameServer在接收到心跳包后会更新心跳时间戳记录这个Broker的最新存活时间。 路由剔除 由于Broker关机、宕机或网络抖动等原因NameServer没有收到Broker的心跳NameServer可能会将其从Broker列表中剔除。 NameServer中有⼀个定时任务每隔10秒就会扫描⼀次Broker表查看每一个Broker的最新心跳时间戳距离当前时间是否超过120秒如果超过则会判定Broker失效然后将其从Broker列表中剔除。 路由发现 RocketMQ的路由发现采用的是Pull模型。当Topic路由信息出现变化时NameServer不会主动推送给客户端而是客户端定时拉取Topic最新的路由。 默认客户端每30秒会拉取一次最新的路由。
2.8 Broker Broker充当着消息中转角色负责存储消息、转发消息。 Broker在RocketMQ系统中负责接收并存储从生产者发送来的消息同时为消费者的拉取请求作准备。Broker同时也存储着消息相关的元数据包括消费者组消费进度偏移offset、主题、队列等。
模块如下图
Remoting Module整个Broker的实体负责处理来自clients端的请求。而这个Broker实体则由以下模块构成。
Client Manager客户端管理器。负责接收、解析客户端(Producer/Consumer)请求管理客户端。例如维护Consumer的Topic订阅信息
Store Service存储服务。提供方便简单的API接口处理消息存储到物理硬盘和消息查询功能。
HA Service高可用服务提供Master Broker 和 Slave Broker之间的数据同步功能。
Index Service索引服务。根据特定的Message key对投递到Broker的消息进行索引服务同时也提供根据Message Key对消息进行快速查询的功能。 2.9 RocketMQ 工作流程 工作流程如下图 1启动NameServerNameServer启动后开始监听端口等待Broker、Producer、Consumer连接。 2启动Broker时Broker会与所有的NameServer建立并保持长连接然后每50秒向NameServer定时发送心跳包。 3发送消息前可以先创建Topic创建Topic时需要指定该Topic要存储在哪些Broker上当然在创建Topic时也会将Topic与Broker的关系写入到NameServer中。不过这步是可选的也可以在发送消息时自动创建Topic。 4) Producer发送消息启动时先跟NameServer集群中的其中一台建立长连接并从NameServer中获取路由信息即当前发送的Topic消息的Queue与Broker的地址IPPort)的映射关系。然后根据算法策略从队选择一个Queue与队列所在的Broker建立长连接从而向Broker发消息。当然在获取到路由信息后Producer会首先将路由信息缓存到本地再每30秒从NameServer更新一次路由信息。 5)Consumer跟Producer类似跟其中一台NameServer建立长连接获取其所订阅Topic的路由信息然后根据算法策略从路由信息中获取到其所要消费的Queue然后直接跟Broker建立长连接开始消费其中的消息。Consumer在获取到路由信息后同样也会每30秒从NameServer更新一次路由信息。不过不同于Producer的是Consumer还会向Broker发送心跳以确保Broker的存活状态。