网站建设工具品牌有,云南高端网站建设公司,17网站一起做网店广,wordpress外链图本地化消息队列就是把阻塞队列这样的数据结构单独提取成一个程序独立进行部署。——实现生产者消费者模型。 但是阻塞队列是在一个进程内部进行的#xff1b; 消息队列是在进程与进程之间进行实现的#xff0c; 解耦合#xff1a;就是在分布式系统中#xff0c;A服务器调用B… 消息队列就是把阻塞队列这样的数据结构单独提取成一个程序独立进行部署。——实现生产者消费者模型。 但是阻塞队列是在一个进程内部进行的 消息队列是在进程与进程之间进行实现的 解耦合就是在分布式系统中A服务器调用B服务器A给B发送请求B给A返回响应A和B之间耦合性很大。引入消息队列后A把请求发给消息队列B再从消息队列获取请求。 削峰填谷B接收A传送过多B会感受到峰值而B的硬盘之类资源有限有消息队列B仍然可以按照原本速度进行接收数据。不至于太多的并发量。 broker中间人负责转发和存储mq和消费者的工作模式主要有两种 推pushBroker把收到的消息主动发出。 核⼼概念 • ⽣产者 (Producer) • 消费者 (Consumer) • 中间⼈ (Broker) • 发布 (Publish) • 订阅 (Subscribe) package com.example.mq.mqserver.core;import javax.websocket.RemoteEndpoint;
import java.io.Serializable;
import java.util.UUID;/**Message包含正文和属性两个核心部分*其中basicproperties基本特性包含routingkey,与Binding类中的bindingkey相对应*in the end,创建工厂方法协助我们去创建Message对象* *//**对于message需要支持序列化将对象转变为字节流方便网络传输或者存储。* 可能会进行网络传输或者持久化存储** json中本质存储文本了理性的数据* 而这里的Message存储二进制数据* 标准库自带的方式进行序列化可以*需要被序列化的类需要实现特定的接口之前我们实现一个接口就是为了重写里面的某个或者某些方法。* 但是offsetEnd,offsetBeg是不需要序列化到保存在文件中的因为消息一旦写入文件中位置就已经固定了* so,transient暂住的临时的保证不背序列化* **//**硬盘存储数据库 或者内存* mysql是客户端服务器结构的程序SQLite轻量级是一个本地数据库这个数据库相当于直接操作本地的硬盘文件*在java中想要使用SQLite就直接使用maven,将SQLite的依赖引入·如果想额外安装SQLite也可以下载就是一个dll/exe** 轻量级SQLLite* */
public class Message implements Serializable {private BasicPropertities basicPropertitiesnew BasicPropertities();private byte[] body;private transient long offsetBeg0;//begin消息的开头距文件的偏移【transient暂住的临时的保证不背序列化private transient long offsetEnd0;//消息结尾距离文件开头的文件偏移【/**消息内容的存储需要持久化硬盘对这个的处理就是存储在文件中。* 后续就是一个文件中存储很多消息而如何在一个文件中找到我们想要的消息* 就是使用偏移量offsetBeg和offsetEnd;* */private byte isValid0x1;/**是否是有效消息** 删除数据通常是逻辑删除只是把它标记成无效而不是真正进行物理上的删除*0x1有效0x0无效* 在硬盘操作使用byte更好一点* */public String getMessageId(){return basicPropertities.getMessageId();}public void setMessageId(String MessageId){basicPropertities.setMessageId(MessageId);}public String getRoutingKey(){return basicPropertities.getRoutingkey();}public void setRoutingKey(String routingKey){basicPropertities.setRoutingkey(routingKey);}public int getDeliverMode(){return basicPropertities.getDeliverMode();}public void setDeliverMode(int mode){basicPropertities.setDeliverMode(mode);}public BasicPropertities getBasicPropertities() {return basicPropertities;}public void setBasicPropertities(BasicPropertities basicPropertities) {this.basicPropertities basicPropertities;}public byte[] getBody() {return body;}public void setBody(byte[] body) {this.body body;}public long getOffsetBeg() {return offsetBeg;}public void setOffsetBeg(long offsetBeg) {this.offsetBeg offsetBeg;}public long getOffsetEnd() {return offsetEnd;}public void setOffsetEnd(long offsetEnd) {this.offsetEnd offsetEnd;}public byte getIsValid() {return isValid;}public void setIsValid(byte isValid) {this.isValid isValid;}/*** 创建工厂方法协助我们去创建Message对象*创建的message会帮助我们创建一个唯一的message id;* 这也是使用工厂方法而不是构造方法的原因之一。** 函数里面放message的核心内容basicproperties和body* 这里是在内存中设置这些属性而offsetEnd,offsetBeg是在持久化才。** */public static Message createMessageWithId(String routingKey, BasicPropertities basicPropertities,byte[] body){Message messagenew Message();if(basicPropertities!null){message.setBasicPropertities(basicPropertities);}message.setMessageId(M-UUID.randomUUID());//前缀message.basicPropertities.setRoutingkey(routingKey);message.bodybody;return message;}
}package com.example.mq.mqserver.core;import java.util.HashMap;
import java.util.Map;public class MSGQueue {private String name;//唯一的身份标识private boolean durablefalse;private boolean exclusivefalse;//如果为true表示这个队列只能被一个消费者使用//如果是false,表示这个队列可以被大家都使用private boolean autodeletefalse;private MapString,Object argumentsnew HashMap();//其它的选项可以有。public String getName() {return name;}public void setName(String name) {this.name name;}public boolean isDurable() {return durable;}public void setDurable(boolean durable) {this.durable durable;}public boolean isExclusive() {return exclusive;}public void setExclusive(boolean exclusive) {this.exclusive exclusive;}public boolean isAutodelete() {return autodelete;}public void setAutodelete(boolean autodelete) {this.autodelete autodelete;}public MapString, Object getArguments() {return arguments;}public void setArguments(MapString, Object arguments) {this.arguments arguments;}
}package com.example.mq.mqserver.core;public enum ExchangeType {DIRECT(0),FANOUT(1),TOPIC(2);private final int type;private ExchangeType(int type){this.typetype;}public int getType(){return type;}}package com.example.mq.mqserver.core;/*** 描述队列与交换机之间的关联关系* Binding依附于交换机和队列* 所以如果交换机和队列都没有持久化对Binding进行持久化是没有任何意义的。* */
public class Binding {private String exchangName;private String queueName;private String bindingKey;//routingkey口令红包public String getExchangName() {return exchangName;}public void setExchangName(String exchangName) {this.exchangName exchangName;}public String getQueueName() {return queueName;}public void setQueueName(String queueName) {this.queueName queueName;}public String getBindingKey() {return bindingKey;}public void setBindingKey(String bindingKey) {this.bindingKey bindingKey;}
}package com.example.mq.mqserver.core;import java.util.HashMap;
import java.util.Map;public class Exchange {private String name;//唯一标识//交换机的三种类型directfanout,topicprivate ExchangeType typeExchangeType.DIRECT;private boolean durablefalse;//是否需要持久化shifalse不用/*** 内存存储快速* 硬盘存储持久* 对于交换机队列绑定有些需要持久化有些不需要* so,持久化的开关** */private boolean autoDeletefalse;//没人使用就自动删除private MapString,Object argumentsnew HashMap();//arguments表示创建交换机的时候指定的一些其它参数选项//可以有也可以没有用于开启不同的功能public String getName() {return name;}public void setName(String name) {this.name name;}public ExchangeType getType() {return type;}public void setType(ExchangeType type) {this.type type;}public boolean isDurable() {return durable;}public void setDurable(boolean durable) {this.durable durable;}public boolean isAutoDelete() {return autoDelete;}public void setAutoDelete(boolean autoDelete) {this.autoDelete autoDelete;}public MapString, Object getArguments() {return arguments;}public void setArguments(MapString, Object arguments) {this.arguments arguments;}
}