软装设计师常用网站,wordpress文章点赞量,网站掉排名,东营招聘信息网官网一、什么是消息队列#xff1f;
提到消息队列是否唤醒了你脑海深处的记忆#xff1f;回看前面的这篇文章#xff1a;《Java 多线程系列Ⅳ#xff08;单例模式阻塞式队列定时器线程池#xff09;》#xff0c;其中我们在介绍阻塞队列时说过#xff0c;阻塞队列最大的用途…一、什么是消息队列
提到消息队列是否唤醒了你脑海深处的记忆回看前面的这篇文章《Java 多线程系列Ⅳ单例模式阻塞式队列定时器线程池》其中我们在介绍阻塞队列时说过阻塞队列最大的用途就是实现 生产者消费者模型。
我们知道对于生产者消费者模型来说它具有两个十分亮眼的特点 解耦合.削峰填谷. 1解耦合 在引入生产者消费者模型之前两台服务器之间通常是直接交互这种交互模式使得服务器之间的耦合是非常大的。而引入生产者消费者模型之后两台服务器之间不再进行直接通信而是借助阻塞队列进行业务处理起到了解耦的效果。 2削峰填谷 在引入生产者消费者模型之前同样是两台服务器进行直接通信如果在一个时间点服务器 A 突然发送一组请求峰值此刻服务器 B 也会随之感受到峰值这种情况下很可能造成服务器故障。如果此时使用阻塞队列A 将收到的请求发给队列虽然队列中有很多请求但是服务器 B 仍然和以按照原有的节奏读取请求。 其实正是因为生产者消费者模型具有以上诸多好处在实际的后端开发中特别是分布式系统里跨主机使用生产者消费者模型是非常普遍的需求。因此通常会把阻塞队列单独分离出来赋予更加丰富的功能封装成一个独立的服务器程序这个程序就称为 消息队列。
二、需求整理
1、生产者消费者模型核心概念 生产者 (Producer) 负责将消息发送到消息队列中。 消费者 (Consumer) 从消息队列中接收和处理消息。。 中间人 (Broker) 它负责接收发布者发送的消息并将这些消息存储在队列中然后将这些消息传递给订阅者。 发布 (Publish) 生产者将消息投递到中间人的过程。 订阅 (Subscribe) 消费者在中间人这里注册的过程。只有消费者注册之后当一个消息发布到消息队列时消息才会被发送给相应的订阅者。 根据以上概念我们可以大致画出生产者消费者模型概念图PS下面的每个模块均表示服务器
一个生产者一个消费者 N 个生产者N 个消费者 2、Broker 设计概要
我们当前的目的是为了实现一个消息队列其中 Broker 是最核心的部分它主要负责消息的 存储 和 转发其中涉及到的核心概念如下 虚拟主机 (VirtualHost) 类似于 MySQL 的 “database”是⼀个逻辑上的集合。在实际的开发中一个 BrokerServer 可能会同时管理多组业务线上的数据此时可以使用不同的 VirtualHost 进行区分。 交换机 (Exchange) 生产者把消息先发送到 Broker 的 Exchange 上再根据不同的规则把消息转发给不同的 Queue。 队列 (Queue) 真正用来存储消息的部分每个消费者决定自己从哪个 Queue 上读取消息根据订阅的队列。⼀个 Exchange 可以绑定多个 Queue (可以向多个 Queue 中转发消息)一个 Queue 也可以被多个 Exchange 绑定 (一个 Queue 中的消息可以来自于多个 Exchange)。 绑定 (Binding) Exchange 和 Queue 之间的关联关系Exchange 和 Queue 可以理解成 “多对多” 关系。使用一个关联表就可以把这两个概念联系起来。 消息 (Message) 具体来说是服务器之间的请求和响应。一个消息可以视为一个字符串二进制数据具体由程序员自定义。 上述概念在 Broker 中的体现如图所示 补充说明1数据存储
以上这些概念对应的数据既需要在内存中存储也需要在硬盘上存储以内存为主硬盘为辅 内存存储对于 MQ 来说能够高效的处理转发数据时非常关键的指标因此使用内存组织上述数据能够得到较高的效率。硬盘存储主要是为了防止内存中的数据随着进程/主机重启而丢失。 补充说明:2 交换机类型与转发规则
上面我们提到在生产者发送消息时首先会将消息发送到 Broker 的交换机上再由交换机根据不同的规则转发到相应的队列中。在 MQ 中支持四种类型的交换机它们分别是 Direct直接交换机、Fanout扇出交换机、Topic主题交换机、Header头部交换机。其中 Header 这种方式比较复杂也比较少见当前项目中主要实现了前三种下面分别对他们进行详细介绍
前要说明 以下 bindingKey绑定键是在创建队列和交换机绑定关系时指定的关键字。以下 routingKey路由键是生产者发送消息时指定的关键字。 1Direct直接交换机 生产者发送消息时会指定一个目标队列的名字此时的 routingKey 就是 队列的名字bindingKey 无效交换机收到消息后查看当前交换机对应的绑定里面是否存在队列名字为routingKey的队列如果有就转发过去把消息塞进对应的“目标队列”中如果没有消息直接丢弃 2Fanout扇出交换机 生产者无需指定routingKey直接发送消息到指定交换机交换机收到消息后直接将消息转发给当前交换机已绑定的所有队列中。此时的 bindingKey 和 routingKey 对扇出交换机无效。 3Topic主题交换机 生产者发送消息时指定一个 routingKey交换机收到消息后查看当前交换机对应的绑定中是否存在一个 bindingKey 通过一定的规则和 routingKey 相匹配如果有就将消息转发到对应的绑定队列中。如果没有则将消息丢弃。 PS以上所有概念出自 AMQP 协议一个提供统一消息服务的应用层标准高级消息队列协议是应用层协议的一个开放标准为面向消息的中间件设计。
3、Broker 核心 API
Broker 基于以上概念和功能需要实现的核心 API 如下
创建队列 (queueDeclare)销毁队列 (queueDelete)创建交换机 (exchangeDeclare)销毁交换机 (exchangeDelete)创建绑定 (queueBind)解除绑定 (queueUnbind)发布消息 (basicPublish)订阅消息 (basicConsume)确认消息 (basicAck)
补充说明 从上面可以看出来在进行创建操作时并没有使用 create而是使用 declare这是从语义上说明这里的创建起到的效果是不存在则创建存在则啥也不做。上述并没有创建一个“消费消息”的API这是因为当前我们使用的工作模式是 push(推)Broker 会将消息主动推送给订阅的消费者。当然也有 pull(拉) 工作模式需要消费者主动调用 Broker 的 API 获取消息当前项目不涉及这种模式。在MQ中有两种应答方式一种是自动应答这种方式下 Broker 将消息推送给订阅的消费者后就算应答完毕。另一种应答方式是手动应答上述确认消息basicAck起到的效果是可以让消费者 显式 的告诉 Broker这个消息我处理完毕了提高整个系统的可靠性。 4、客户端设计概要
生产者、消费者都是客户端程序broker 则是作为服务器通过网络进行通信。此处设定使用 TCP 自定义的应用层协议 实现 生产者/消费者 和 BrokerServer 之间的交互工作这里需要给客户端提供一组 API让客户端的业务代码来调用从而通过网络通信的方式远程调用 brokerserver 上的方法。
客户端核心API
创建 Connection关闭 Connection创建 Channel关闭 Channel创建队列 (queueDeclare)销毁队列 (queueDelete)创建交换机 (exchangeDeclare)销毁交换机 (exchangeDelete)创建绑定 (queueBind)解除绑定 (queueUnbind)发布消息 (basicPublish)订阅消息 (basicConsume)确认消息 (basicAck)
补充说明 和 Broker 服务器 API 相比客户端程序还提供了如下 4 个 API创建 Connection、关闭 Connection、创建 Channel、关闭 Channel。这里的一个Connection对象代表一个TCP连接。Channel 是 Connection 内部逻辑上的链接多个Channel复用同一个TCP连接一个Connection 中可以包含多个Channel每个Channel负责客户端中不同的模块其中传输的数据是互不相干的。这样的设定主要是为了能够更好的复用 TCP 连接, 达到长连接的效果, 避免频繁的创建关闭 TCP 连接。上述客户端提供的 API 只是给业务代码进行调用真正的方法执行是交给了BrokerServer。这个过程称为 远程过程调用Remote Procedure Call简称RPC是一种计算机通信协议它允许程序调用另一个地址空间通常是不同的计算机中的过程或函数而无需程序员显式编写网络代码。通过使用RPC应用程序可以像调用本地进程一样调用远程服务器上的进程。 5、小结
最后简单总结一下我们大致需要做的工作其中涉及到的细节问题我们后面在进行补充 实现生产者、BrokerServer、消费者三个部分针对生产者、消费者来说主要编写客户端和服务器的网络通信部分。重点实现BrokerServer包括内部的基本概念和核心 API。数据的持久化存储。 三、具体实现
附上连接
1、消息队列详细设计与实现思维导图 2、Gitee 完整代码地址