长沙网站优化电话,阆中做网站,什么网站做装修公司广告比较好,乐清网站推广公司使用 JAVA 语言自己动手来写一个MQ (类似ActiveMQ,RabbitMQ) 主要角色
首先我们必须需要搞明白 MQ (消息队列) 中的三个基本角色
ProducerBrokerConsumer
整体架构如下所示 自定义协议
首先从上一篇中介绍了协议的相关信息,具体厂商的 MQ(消息队列) 需要遵循某种协议或者…使用 JAVA 语言自己动手来写一个MQ (类似ActiveMQ,RabbitMQ) 主要角色
首先我们必须需要搞明白 MQ (消息队列) 中的三个基本角色
ProducerBrokerConsumer
整体架构如下所示 自定义协议
首先从上一篇中介绍了协议的相关信息,具体厂商的 MQ(消息队列) 需要遵循某种协议或者自定义协议 , 消息的 生产者和消费者需要遵循其协议(约定)才能后成功地生产消息和生产消息 ,所以在这里我们自定义一个协议如下
消息处理中心 : 如果接收到的信息包含SEND字符串,即视为生产者发送的消息,消息处理中心需要将此信息存储等待消费者消费
消息处理中心 : 如果接受到的信息为CONSUME既视为消费者发送消费请求需要将存储的消息队列头部的信息转发给消费者然后将此消息从队列中移除
消息处理中心 : 如果消息处理中心存储的消息满3条仍然没有消费者进行消费,则不再接受生产者的生产请求
消息生产者需要遵循协议将生产的消息头部增加SEND: 表示生产消息
消息消费者需要遵循协议向消息处理中心发送CONSUME字符串表示消费消息
流程顺序
项目构建流程
下面将整个的构建流程过一遍
新建一个 Broker 类内部维护一个 ArrayBlockingQueue 队列提供生产消息和消费消息的方法 仅仅具备存储服务功能
新建一个 BrokerServer 类,将 Broker 发布为服务到本地9999端口监听本地9999端口的 Socket 链接在接受的信息中进行我们的协议校验, 这里 仅仅具备接受消息,校验协议,转发消息功能;
新建一个 MqClient 类,此类提供与本地端口9999的Socket链接 , 仅仅具备生产消息和消费消息的方法
测试新建两个 MyClient 类对象分别执行其生产方法和消费方法
具体使用流程
生产消息客户端执行生产消息方法传入需要生产的信息该信息需要遵循我们自定义的协议消息处理中心服务在接受到消息会根据自定义的协议校验该消息是否合法如果合法如果合法就会将该消息存储到Broker内部维护的 ArrayBlockingQueue 队列中如果 ArrayBlockingQueue 队列没有达到我们协议中的最大长度将将消息添加到队列中否则输出生产消息失败
消息消息客户端执行消费消息方法 Broker服务 会校验请求的信息的信息是否等于 CONSUME 如果验证成功则从Broker内部维护的 ArrayBlockingQueue 队列的 Poll 出一个消息返回给客户端
代码演示
消息处理中心 Broker
/*** * 消息处理中心* */
public class Broker {// 队列存储消息的最大数量private final static int MAX_SIZE 3;// 保存消息数据的容器private static ArrayBlockingQueue messageQueue new ArrayBlockingQueue(MAX_SIZE);// 生产消息public static void produce(String msg) {if (messageQueue.offer(msg)) {System.out.println(成功向消息处理中心投递消息 msg 当前暂存的消息数量是 messageQueue.size());} else {System.out.println(消息处理中心内暂存的消息达到最大负荷不能继续放入消息);}System.out.println();}// 消费消息public static String consume() { String msg messageQueue.poll();if(msg !null) {// 消费条件满足情况从消息容器中取出一条消息System.out.println(已经消费消息 msg 当前暂存的消息数量是 messageQueue.size()); }else{ System.out.println(消息处理中心内没有消息可供消费); } System.out.println();returnmsg; }
}}
消息处理中心服务 BrokerServer
客户端 MqClient /*** * 用于启动消息处理中心* */
public class BrokerServer implements Runnable {public static int SERVICE_PORT 9999;private final Socket socket;public BrokerServer(Socket socket) {this.socket socket;}Overridepublic void run() {try (BufferedReader in new BufferedReader(new InputStreamReader(socket.getInputStream()));PrintWriter out new PrintWriter(socket.getOutputStream())) {while (true) {String str in.readLine();if (str null) {continue;}System.out.println(接收到原始数据 str);if (str.equals(CONSUME)) {// CONSUME 表示要消费一条消息//从消息队列中消费一条消息String message Broker.consume();out.println(message);out.flush();} else if (str.contains(SEND:)) {// 接受到的请求包含SEND:字符串 表示生产消息放到消息队列中Broker.produce(str);} else {System.out.println(原始数据: str 没有遵循协议,不提供相关服务);}}} catch (Exception e) {e.printStackTrace();}}public static void main(String[] args) throws Exception {ServerSocket server new ServerSocket(SERVICE_PORT);while (true) {BrokerServer brokerServer new BrokerServer(server.accept());new Thread(brokerServer).start();}}
}
测试MQ
public class ProduceClient {public static void main(String[] args) throws Exception {MqClient client newMqClient();client.produce(SEND:Hello World);}
}public class ConsumeClient {public static void main(String[] args) throws Exception {MqClient client newMqClient();String message client.consume();System.out.println(获取的消息为 message);}
}我们多执行几次客户端的生产方法和消费方法就可以看到一个完整的MQ的通讯过程,下面是我执行了几次的一些日志
接收到原始数据SEND:Hello World成功向消息处理中心投递消息SEND:Hello World当前暂存的消息数量是
1接收到原始数据SEND:Hello World成功向消息处理中心投递消息SEND:Hello World当前暂存的消息数量是
2接收到原始数据SEND:Hello World成功向消息处理中心投递消息SEND:Hello World当前暂存的消息数量是
3接收到原始数据SEND:Hello World消息处理中心内暂存的消息达到最大负荷不能继续放入消息
接收到原始数据Hello World原始数据:Hello World没有遵循协议,不提供相关服务接收到原始数据CONSUME已经消费消息SEND:Hello World当前暂存的消息数量是
2接收到原始数据CONSUME已经消费消息SEND:Hello World当前暂存的消息数量是
1接收到原始数据CONSUME已经消费消息SEND:Hello World当前暂存的消息数量是
0接收到原始数据CONSUME消息处理中心内没有消息可供消费
小结
本章示例代码主要源自分布式消息中间件实践一书 , 这里我们自己使用Java语言写了一个MQ消息队列 , 通过这个消息队列我们对MQ中的几个角色 “生产者,消费者,消费处理中心,协议” 有了更深的理解 ; 那么下一章节我们就来一块学习具体厂商的MQ RabbitMQ