一个网站的制作过程,开创云网站建设支持,哪里有网站开发服务,WordPress破解分享目录 一、需求分析
1.1、对 Message Queue 的认识
1.2、消息队列核心概念
1.3、Broker Server 内部关键概念
1.4、Broker Server 核心 API #xff08;重点实现#xff09;
1.5、交换机类型
Direct 直接交换机
Fanout 扇出交换机
Topic 主题交换机
1.6、持久化
1.7…目录 一、需求分析
1.1、对 Message Queue 的认识
1.2、消息队列核心概念
1.3、Broker Server 内部关键概念
1.4、Broker Server 核心 API 重点实现
1.5、交换机类型
Direct 直接交换机
Fanout 扇出交换机
Topic 主题交换机
1.6、持久化
1.7、网络通信
通信流程
远程调用设计思想
1.8、模块设计图
二、实现核心类
2.1、交换机和队列的属性及绑定关系
2.2、Message 消息 一、需求分析 1.1、对 Message Queue 的认识
消息队列就是把 阻塞队列 这样的数据结构单独提取成了一个独立的程序进行部署实现 “进程和进程之间 / 服务和服务之间” 的生产者消费者模型分布式系统中则是一组服务器构成的集群.
生产者消费者模型的好处如下
解耦合在一个分布式系统中 服务器A 给 服务器B 发送请求B 给 A 返回响应这样 A 和 B 的耦合是比较大的B 一旦挂了A 这边也无法正常接收响应引入消息队列后A 把请求发送给消息队列B 再从消息队列中获取请求就降低了耦合度哪怕 B 挂了A 也不用管继续发送请求给消息队列队列反馈响应.削峰填谷假设这样一个常见 A 是入口服务器 A 再调用 B 完成一些具体的业务如果是 A 和 B 直接通信突然有一天 A 收到用户请求的峰值此时 B 也会随之感受到峰值引入消息队列之后 A 把请求发给队列这时候 B 就可以按照自己原有的节奏从队列中取请求不至于一下子收到太多的并发量. 1.2、消息队列核心概念 生产者Producer发布消息的客户端应用程序.
消费者Consumer订阅消息的客户端应用程序用于处理生产者产生的消息.
中间人Broker消费者要拿到生产者的消息需要经过中间人用来削峰填谷.
发布Publish生产者向中间人这里投递消息的过程.
订阅Subscribe类似于订阅报纸消费者要从中间人这里取到对应消息的前提是先订阅消息.
消费Consume消费者从中间人这里取数据的操作. 1.3、Broker Server 内部关键概念 1.虚拟主机Virtual Host类似于 MySQL 中的 database是一个 “逻辑” 上的数据集合.
实际的开发中一个 Broker Server 也可以有多种不同类型的数据可能会同时用来管理多组 业务线 上的数据可以使用 Virtual Host 进行逻辑上的区分~ 2.交换机Exchange实际上生产者是先把消息给了 Broker Server 上的某一个交换机再由交换机把消息转发给对应的队列.
交换机就类似于公司的前台小姐姐有一天你来面试你就告诉前台小姐姐然后她就会把你带到对应的楼层队列. 3.队列Queue在一个大的消息队列中可以又很多哥具体的小队列他们用来存储消息实体后续消费者也是从对应的队列中取数据. 4.绑定Binding把交换机和队列之间建立起关联关系.
可以把 交换机 和 队列 看作 数据库 中的 “多对多” 这样的关系. 5.消息Message服务器 A 给 B 发的请求通过 MQ 转发就是一个消息服务器 B 给 A 返回的响应通过 MQ 转发也是一个消息.
一个消息可以视为是一个 字符串二进制数据。消息中具体包含啥样的数据都是程序员自定义的. Ps这些概念既需要在内存中存储方便快也需要在硬盘中存储持久化. 1.4、Broker Server 核心 API 重点实现
1. 创建队列queueDeclare
这里不使用 Create 创建 这样的术语而使用 Declare 是因为这里以 RabbitMQ 为蓝本 Declare 表示 不存在则创建存在就什么也不干. 2.销毁队列queueDelete
3.创建交换机exchangeDeclare
4.销毁交换机exchangeDelete
5.创建绑定queueBind
6.解除绑定queueUnbind
7.发布消息basicPublish
8.订阅消息basicConsume
9.确定消息basicAck
类似于 TCP 的确认应答机制确认消息这个 api 就是让消费者显式告诉 broker server 这个消息我已经处理完毕了提高整个系统的可靠性. 客户端除了提供以上 9 中方法还需要提供 4 个方法
1.创建 Connection
2.关闭 Connection
3.创建 Channel
4.关闭 Channel 一个 Connection 对象就代表一个 TCP 连接里面可以包含多个 Channel 每隔 Channel 上面传输的数据都是互不相干的. 有了 Connection 了为什么还要搞一个 Channel TCP 中建立 / 断开一个连接成本还是挺高的因此很多时候不希望频繁的建立断开 TCP 连接因此引入 Channel 相比于 TCP就要轻量很多. Connection 和 Channel 之间的关系就类似于 “网线” 一样. 1.5、交换机类型
交换机在转发消息的时候按照转发规则提供了几种不同的 交换机类型ExchangeType来描述这里不同的转发规则.
RabbitMQ 主要实现了 四种 交换机类型也是 AMQP 协议定义的
Direct 直接交换机Fanout 扇出交换机Topic 主题交换机Header 消息头交换机 Ps这里主要实现前三种交换机类型因为第四种交换机规则复杂应用场景少前三种就已经够用了. Direct 直接交换机 有两个关键概念
bindingKey把队列和交换机绑定的时候指定一个单词类似于暗号.
routingKey生产者发送消息的时候也指定一个单词.
当 routingKey 和 BindingKey 对上暗号了就可以把这个消息转发到对应的队列中了.
生产者发送消息的时候会指定一个 “目标队列的名字”routingKey交换机收到之后就看看绑定的队列里有没有匹配的队列BindingKey如果有就转发过去把消息塞进对应的队列中如果没有消息直接丢弃. Fanout 扇出交换机 会将接收到的消息广播到每一个跟其绑定的queue最后交给对应的消费者值得注意的是exchange负责消息路由而不是存储路由失败则消息丢失。 Topic 主题交换机 有两个关键概念
bindingKey把队列和交换机绑定的时候指定一个单词类似于暗号.
routingKey生产者发送消息的时候也指定一个单词.
当 routingKey 和 BindingKey 对上暗号了就可以把这个消息转发到对应的队列中了. PsTopicExchange 与 DirectExchange 十分类似区别在于routingKey必须是多个单词的列表并且以 . 分割。 1.6、持久化
上述这些概念交换机、队列、绑定、消息....对应的数据都需要存储和管理起来此时内存和硬盘都会各自存储一份内存为主硬盘为辅.
在内存中存储的原因对于 MQ 来说能够高效的转发处理数据式非常关键的因此在内存上组织数据比硬盘上要快的多.
在硬盘上存储的原因防止内存中的数据随着 进程重启/主机重启 而丢失. Ps硬盘存储这个持久化是相对于 内存 的对于一个硬盘来说存储的消息寿命一般为 几年~几十年一直不通电的情况下. 1.7、网络通信
通信流程
⽣产者和消费者都是客⼾端程序, broker server 则是作为服务器. 通过⽹络进⾏通信.
例如如下过程
客户端发送请求生产者的代码中需要有一个方法 queueDeclare 来创建队列这个方法内部要做的事情就是给服务器发送一个请求告诉服务器咱们要创建一个队列以及队列长啥样子......服务器处理请求并返回响应broker server 收到这个请求之后再执行服务器这边的 queueDeclare 方法真正取给 内存/硬盘 上写一些数据把这个队列给真正创建出来再把创建 成功/失败 结果包装成响应返回给客户端.客户端接收响应当响应回来了客户端的 queueDeclare 就会获取到这个响应看到 创建队列成功此时 queueDeclare 就算执行完毕了. 远程调用设计思想
此处客户端调用了一个本地方法结果这个方法的背后又给服务器发送了一系列消息由服务器完成了一系列工作站在调用者的角度只是看到了这个功能完成了并不知道背后的实现细节.
虽然调用的是一个本地方法但实际上好像调用另一个远端服务器的方法一样~
这里可以认为是编写客户端服务器程序通信过程中的一种设计思想——远程调用RPC 举个例子 以前有个美国的老哥再大厂干活但是他特别想摸鱼于是他就找了一个中国人帮他干有偿. 同样级别的程序员在美国工作的薪水是国内的 3-4 倍. 1.8、模块设计图 二、实现核心类 PsSpring boot、MyBatis 2.1、交换机和队列的属性及绑定关系
我们可以使用 一个枚举类 表示三种交换机.
public enum ExchangeType {DIRECT(0),FANOUT(1),TOPIC(2);private final int type;private ExchangeType(int type) {this.type type;}public int getType() {return type;}}交换机属性如下
public class Exchange {//模仿 rabbitmq 使用 name 作为唯一身份标识private String name;//交换价类型DIRECT,FANOUT,TOPICprivate ExchangeType type ExchangeType.DIRECT;//当前交换机是否要持久化存储true 标识持久化.private boolean durable false;//TODO: 若当前交换机没人使用了就会自动删除private boolean autoDelete false;//TODO: 表示创建交换机时可以指定一些额外的选项private MapString, Object arguments new HashMap();public String getName() {return name;}public void setName(String name) {this.name name;}public ExchangeType getType() {return type;}public void setType(ExchangeType type) {this.type type;}public boolean isDurable() {return durable;}public void setDurable(boolean durable) {this.durable durable;}public boolean isAutoDelete() {return autoDelete;}public void setAutoDelete(boolean autoDelete) {this.autoDelete autoDelete;}public MapString, Object getArguments() {return arguments;}public void setArguments(MapString, Object arguments) {this.arguments arguments;}
}队列属性如下
public class MSGQueue {//表示队列的身份标识private String name;//是否持久化true 表示支持private boolean durable;//TODO: 这个属性为 true表示队列只能被一个消费者使用, false表示大家都能用private boolean exclusive false;//TODO: 自动删除为 true 表示没有人使用以后自动删除private boolean autoDelete false;//TODO: 扩展参数自定义选项private MapString, Object arguments new HashMap();public String getName() {return name;}public void setName(String name) {this.name name;}public boolean isDurable() {return durable;}public void setDurable(boolean durable) {this.durable durable;}public boolean isExclusive() {return exclusive;}public void setExclusive(boolean exclusive) {this.exclusive exclusive;}public boolean isAutoDelete() {return autoDelete;}public void setAutoDelete(boolean autoDelete) {this.autoDelete autoDelete;}public MapString, Object getArguments() {return arguments;}public void setArguments(MapString, Object arguments) {this.arguments arguments;}
}交换机和队列绑定关系如下
public class Binding {//交换机身份标识private String exchangeName;//队列身份标识private String queueName;private String bindingKey;//绑定没必要设置持久化因为没有意义他存在的意义前提是 交换机和队列存在持久化public String getExchangeName() {return exchangeName;}public void setExchangeName(String exchangeName) {this.exchangeName exchangeName;}public String getQueueName() {return queueName;}public void setQueueName(String queueName) {this.queueName queueName;}public String getBindingKey() {return bindingKey;}public void setBindingKey(String bindingKey) {this.bindingKey bindingKey;}
}2.2、Message 消息
Message 消息主要分为 属性部分 BasicProperties网络这是一个自定义类里面承载着 Message 核心属性如 消息的唯一身份标识、routingKey、是否需要持久化....... 正文部分 byte[]承载着具体的消息内容.辅助属性通过 offsetBeg 和 offsetEnd 快速找到文件中某一个消息的具体位置一个文件中会存储很多消息这里约定的规则为 “前闭后开”以字节为单位isValid 标识消息要删除如果删除采用逻辑删除并不是真的删除而是标记为无效数据这里约定 0x1 为有效数据、0x0 表示无效数据. Ps前两个信息需要在网络上传输并且写入文件因此就需要通过 Serializable 序列化和反序列化直接继承接口即可不需要具体实现~~ 而第三条不需要被序列化保存到文件中这个属性主要是为了内存中的 Message 对象快速找到 文件中的 Message 对象因此使用 trasient 修饰即可防止序列化 public class Message implements Serializable {//这两个属性是 Message 最核心的属性private BasicProperties basicProperties new BasicProperties();private byte[] body;//以下是辅助类型的属性private transient long offsetBeg 0; //消息数据的开头距离文件开头的位置偏移量(字节)private transient long offsetEnd 0; //消息数据的结尾距离文件开头的位置偏移量(字节)//表示文件中的消息是否是有效消息(对文件中的消息如果删除使用逻辑删除)// 0x1 表示有效 0x0 表示无效private byte isValid 0x1;//创建一个工厂方法让工厂方法创建 Message 对象//此方法中创建的 Message 对象会自动生成唯一的 MessageId//万一 routingKey 和 basicProperties 里的 routingKey 冲突以外面为主public static Message createMessageWithId(String routingKey, BasicProperties basicProperties, byte[] body) {Message message new Message();if(basicProperties ! null) {message.setBasicProperties(basicProperties);}//生成的 MessageId 以 M- 作为前缀message.setMessageId(M- UUID.randomUUID());message.setRoutingKey(routingKey);message.body body;// offsetBeg, offsetEnd, isValid 是消息持久化的时候才会用到消息写入文件之前在进行设定return message;}public String getMessageId() {return basicProperties.getMessageId();}public void setMessageId(String messageId) {basicProperties.setMessageId(messageId);}public String getRoutingKey() {return basicProperties.getRoutingKey();}public void setRoutingKey(String routingKey) {basicProperties.setRoutingKey(routingKey);}public int getDeliverMode() {return basicProperties.getDeliverMode();}public void setDeliverMode(int mode) {basicProperties.setDeliverMode(mode);}public BasicProperties getBasicProperties() {return basicProperties;}public void setBasicProperties(BasicProperties basicProperties) {this.basicProperties basicProperties;}public byte[] getBody() {return body;}public void setBody(byte[] body) {this.body body;}public long getOffsetBeg() {return offsetBeg;}public void setOffsetBeg(long offsetBeg) {this.offsetBeg offsetBeg;}public long getOffsetEnd() {return offsetEnd;}public void setOffsetEnd(long offsetEnd) {this.offsetEnd offsetEnd;}public byte getIsValid() {return isValid;}public void setIsValid(byte isValid) {this.isValid isValid;}
}Message 中的核心属性如下
public class BasicProperties implements Serializable {//消息的唯一标识(使用 UUID 保证唯一性)private String messageId;//和 bindingKey 做匹配//如果当前交换机类型是 DIRECT 此时 routingKey 就表示要转发的队列名//如果当前交换机类型是 FANOUT 此时 routingKey 无意义(不使用)//如果当前交换机类型是 TOPIC 此时 routingKey 就是要和 bindingKey 做匹配匹配才进行转发private String routingKey;//标识消息是否要持久化1 表示不持久化 2 表示持久化(rabbitmq 是这么搞得)private int deliverMode 1;//RabbitMQ 还有其他属性这里就不考虑了public String getMessageId() {return messageId;}public void setMessageId(String messageId) {this.messageId messageId;}public String getRoutingKey() {return routingKey;}public void setRoutingKey(String routingKey) {this.routingKey routingKey;}public int getDeliverMode() {return deliverMode;}public void setDeliverMode(int deliverMode) {this.deliverMode deliverMode;}
}