做甜品网站,课件app制作教程,中国建设银行报网站,最新wordpress免费主题目录 一、客户端设计
#x1f345; 1、设计三个核心类 #x1f345; 2、完善Connection类 #x1f384; 读取请求和响应、创建channel #x1f384; 添加扫描线程 #x1f384; 处理不同的响应 #x1f384; 关闭连接
#x1f345; 3、完善Channel类
#x1f384; 编…目录 一、客户端设计 1、设计三个核心类 2、完善Connection类 读取请求和响应、创建channel 添加扫描线程 处理不同的响应 关闭连接 3、完善Channel类 编写createChannel() 编写waitResult(和putRetuens()方法 编写其他核心API 交换机 队列 绑定 发布消息 订阅消息 确认消息
二、客户端测试 1、准备工作和收尾工作 2、测试connection 3、测试channnel的创建 4、测试交换机 5、测试队列 6、测试绑定 一、客户端设计 1、设计三个核心类 三个核心类 1ConnectoonFactory连接工厂这个类持有服务器的地址主要功能是创建出连接Connectiond对象 2Connection表示一个TCP连接持有Socket对象写入请求/读取响应管理多个Channel对象 3Channel表示一个逻辑上的连接。当前设定的交互模型一个TCP连接是可以进行复用的一个客户端可以有多个模块每个模块都可以和brokerServer之间建立“逻辑上的连接”channel,但是这几个模块的channel之间是互相不影响的。同时还需要提供一系列的方法与服务器提供的核心API进行对应。 先创建这三个核心的类
在包mqclient中创建这三个类。
Data
public class ConnectionFactory {
// BrokerServer的ip地址private String host;
// BrokerServer端口号private int port;public Connection newConnection(){Connection connection new Connection(host,port);return connection;}
}
Data
public class Connection {private Socket socket null;// 需要管理多个channel使用哈希表把若干个channel组织起来private ConcurrentHashMapString ,Channel channelMap new ConcurrentHashMap();private InputStream inputStream;private OutputStream outputStream;private DataInputStream dataInputStream;private DataOutputStream dataOutputStream;private ExecutorService callbackPool null;public Connection(String host,int port) throws IOException {socket new Socket(host,port);inputStream socket.getInputStream();outputStream socket.getOutputStream();dataInputStream new DataInputStream(inputStream);dataOutputStream new DataOutputStream(outputStream);}// 使用该方法分别处理当前响应是一个针对控制请求的响应还是服务器推送的响应private void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException {//TODO}// 发送请求public void writeRequest(Request request) throws IOException {
// TODO}// 读取响应public Response readResponse() throws IOException {
// TODO}// 通过这个方法,再connection中创建出一个channelpublic Channel createChannel(){// TODOreturn channel;}
// 关闭connectionpublic void close() {//TODO}
}
Data
public class Channel {private String channelId;
// 当前channel属于哪个连接private Connection connection;
// 用来存储后续客户端收到的服务器的响应private ConcurrentHashMapString, BasicReturns basicReturnsMap new ConcurrentHashMap();
// 如果当前的Channel订阅了某个队列此处就需要记录对应的回调是什么
// 当该队列的消息返回回来的时候就调用回调
// 约定一个channel值能有一个回调private Consumer consumer null;public Channel(String channelId, Connection connection) {this.channelId channelId;this.connection connection;}// 这个方法主要和服务器进行交互
// 目的是为了告知服务器此处客户端创建了新的channelpublic boolean createChannel() {
// TODOreturn true;}// 使用该方法阻塞等待服务器的响应private BasicReturns waitResult(String rid) {return null;}// 关闭channel,给服务器发送一个type 0x2的请求public boolean close() throws IOException {//TODOreturn null;}//创建核心的API方法
} 2、完善Connection类
这里完成发送请求、读取响应、创建channel、处理响应、关闭连接 读取请求和响应、创建channel
// 发送请求public void writeRequest(Request request) throws IOException {dataOutputStream.writeInt(request.getType());dataOutputStream.writeInt(request.getLength());dataOutputStream.write(request.getPayload());dataOutputStream.flush();System.out.println([Connection] 发送请求! type request.getType() , length request.getLength());}// 读取响应public Response readResponse() throws IOException {Response response new Response();response.setType(dataInputStream.readInt());response.setLength(dataInputStream.readInt());byte[] payload new byte[response.getLength()];int n dataInputStream.read(payload);if (n ! response.getLength()) {throw new IOException(读取的响应数据不完整!);}response.setPayload(payload);System.out.println([Connection] 收到响应! type response.getType() , length response.getLength());return response;}// 通过这个方法,再connection中创建出一个channel
// 此处的createChannel()方法在后面channel类中编写以后会抛异常这里大家写完channel之后回过来手动抛一下public Channel createChannel() { //throws IOExceptionString channelId C- UUID.randomUUID();Channel channel new Channel(channelId,this);
// 把channel对象放到Connection管理的channel的哈希表中channelMap.put(channelId,channel);
// 同时也需要把“创建channel”的这个消息也告诉服务器boolean ok channel.createChannel();if (!ok){
// 创建channel失败
// 删除hash表中的键值对channelMap.remove(channelId);return null;}return channel;} 添加扫描线程
在构造方法中添加一个扫描线程使用该线程不停的从socket中读取响应再将这个响应交给对应的channnel。 public Connection(String host,int port) throws IOException {socket new Socket(host,port);inputStream socket.getInputStream();outputStream socket.getOutputStream();dataInputStream new DataInputStream(inputStream);dataOutputStream new DataOutputStream(outputStream);callbackPool Executors.newFixedThreadPool(4);// 创建一个扫描线程
// 该线程负责不停的从socket中读取响应数据然后把这个响应数据再交给对应的channel负责处理Thread t new Thread(() - {try {while (!socket.isClosed()) {Response response readResponse();dispatchResponse(response);}} catch (SocketException e) {// 连接正常断开的. 此时这个异常直接忽略.System.out.println([Connection] 连接正常断开!);} catch (IOException | ClassNotFoundException | MqException e) {System.out.println([Connection] 连接异常断开!);e.printStackTrace();}});t.start();} 处理不同的响应 使用该方法分别处理两种不同的响应当前响应是一个针对控制请求的响应还是服务器推送的响应。
private void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException {if (response.getType() 0xc) {
// 服务器推送来的消息数据SubScribeReturns subScribeReturns (SubScribeReturns) BinaryTool.fromBytes(response.getPayload());// 根据channelId找到对用的channel对象Channel channel channelMap.get(subScribeReturns.getChannelId());if (channel null) {throw new MqException([Connection] 该消息对应的 channel 在客户端中不存在! channelId channel.getChannelId());}callbackPool.submit(() - {try {channel.getConsumer().handleDelivery(subScribeReturns.getConsumerTag(), subScribeReturns.getBasicProperties(),subScribeReturns.getBody());} catch (MqException | IOException e) {e.printStackTrace();}});} else {
// 当前响应是针对控制请求的响应BasicReturns basicReturns (BasicReturns) BinaryTool.fromBytes(response.getPayload());
// 把这个结果放到对应的channel的hash表中Channel channel channelMap.get(basicReturns.getChannelId());if (channel null) {throw new MqException([Connection] 该消息对应的 channel 在客户端中不存在! channelId channel.getChannelId());}channel.putReturns(basicReturns);}} 关闭连接
public void close() {// 关闭connection释放持有的资源try {callbackPool.shutdownNow();channelMap.clear();inputStream.close();outputStream.close();socket.close();} catch (IOException e) {e.printStackTrace();}} 3、完善Channel类 编写createChannel()
// 这个方法主要和服务器进行交互 // 目的是为了告知服务器此处客户端创建了新的channel
// 这个方法主要和服务器进行交互
// 目的是为了告知服务器此处客户端创建了新的channelpublic boolean createChannel() throws IOException {
// 对于创建channel来说payload就是一个basicArgumentsBasicArguments basicArguments new BasicArguments();basicArguments.setChannelId(channelId);
// rid表示这次请求的idbasicArguments.setRid(generateRid());byte[] payload BinaryTool.toBytes(basicArguments);Request request new Request();request.setType(0x1);request.setLength(payload.length);request.setPayload(payload);// 发送构造出的请求connection.writeRequest(request);
// 等待服务器的响应BasicReturns basicReturns waitResult(basicArguments.getRid());return basicReturns.isOk();}private String generateRid(){return R- UUID.randomUUID().toString();} 编写waitResult(和putRetuens()方法
putRetuents()是为了将返回的响应放到对用的哈希表中 public void putReturns(BasicReturns basicReturns) {basicReturnsMap.put(basicReturns.getRid(),basicReturns);synchronized (this){
// 当前不知道有多少个线程在等待上面的响应
// 把多有的等待的线程都唤醒notifyAll();}}
waitResult()方法的作用是为了阻塞等待服务器的响应。以下举例说明 如下假如有3个channel按照123的顺序发送了请求所以应该是请求1先等待响应1然后再是2和3。 但是现在有一个情况服务器这边是多线程并发处理请求服务器处理每个请求的时间不一样返回响应的顺序也就不一样。 如下图channel1等待的是响应1但是先返回的响应却是2和3。响应1还没来请求1就一直等。请求1没等到后面的2和3也就拿不到响应 所以为了解决这个问题就创建了basicReturnsMap将socket中收到的所有响应数据放到这个在前面创建的basicReturnsMap哈希表中。客户端的的请求就可以不断的从这个哈希表中寻找是否存在和自己匹配的响应。 如果存在就把相应取走不存在就继续等待。 这个waitResult()方法就是当请求对应的响应不再哈希表中时就阻塞等待。 // 使用该方法阻塞等待服务器的响应private BasicReturns waitResult(String rid) {BasicReturns basicReturns null;while ((basicReturns basicReturnsMap.get(rid)) null){
// 如果查询结果为null说明响应没来
// 此时就需要阻塞等待
// 此处加锁是为了保证wait/notify的是同一个对象synchronized (this){try {wait();}catch (InterruptedException e){e.printStackTrace();}}}
// 读取成功之后,把这个响应从哈希表中删除掉basicReturnsMap.remove(rid);return basicReturns;}编写其他核心API 交换机
// 创建交换机public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable) throws IOException {ExchangeDeclareArguments exchangeDeclareArguments new ExchangeDeclareArguments();exchangeDeclareArguments.setRid(generateRid());exchangeDeclareArguments.setChannelId(channelId);exchangeDeclareArguments.setExchangeName(exchangeName);exchangeDeclareArguments.setExchangeType(exchangeType);exchangeDeclareArguments.setDurable(durable);byte[] payload BinaryTool.toBytes(exchangeDeclareArguments);Request request new Request();request.setType(0x3);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns waitResult(exchangeDeclareArguments.getRid());return basicReturns.isOk();}// 删除交换机public boolean exchangeDelete(String exchangeName) throws IOException {ExchangeDeleteArguments arguments new ExchangeDeleteArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setExchangeName(exchangeName);byte[] payload BinaryTool.toBytes(arguments);Request request new Request();request.setType(0x4);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns waitResult(arguments.getRid());return basicReturns.isOk();} 队列 public boolean queueDeclare(String queueName,boolean durable) throws IOException {QueueDeclareArguments queueDeclareArguments new QueueDeclareArguments();queueDeclareArguments.setRid(generateRid());queueDeclareArguments.setChannelId(channelId);queueDeclareArguments.setQueueName(queueName);queueDeclareArguments.setDurable(durable);byte[] payload BinaryTool.toBytes(queueDeclareArguments);Request request new Request();request.setType(0x5);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns waitResult(queueDeclareArguments.getRid());return basicReturns.isOk();}public boolean queueDelete(String queueName) throws IOException {QueueDeleteArguments queueDeleteArguments new QueueDeleteArguments();queueDeleteArguments.setRid(generateRid());queueDeleteArguments.setChannelId(channelId);queueDeleteArguments.setQueueName(queueName);byte[] payload BinaryTool.toBytes(queueDeleteArguments);Request request new Request();request.setType(0x6);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns waitResult(queueDeleteArguments.getRid());return basicReturns.isOk();} 绑定
// 创建绑定public boolean queueBind(String queueName,String exchangeName,String bindingKey) throws IOException {QueueBindArguments queueBindArguments new QueueBindArguments();queueBindArguments.setRid(generateRid());queueBindArguments.setChannelId(channelId);queueBindArguments.setExchangeName(exchangeName);queueBindArguments.setQueueName(queueName);queueBindArguments.setBindingKey(bindingKey);byte[] payload BinaryTool.toBytes(queueBindArguments);Request request new Request();request.setType(0x7);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns waitResult(queueBindArguments.getRid());return basicReturns.isOk();}// 删除绑定public boolean queueUnbind(String queueName,String exchangeName) throws IOException {QueueUnbindArguments queueUnbindArguments new QueueUnbindArguments();queueUnbindArguments.setRid(generateRid());queueUnbindArguments.setChannelId(channelId);queueUnbindArguments.setQueueName(queueName);queueUnbindArguments.setExchangeName(exchangeName);byte[] payload BinaryTool.toBytes(queueUnbindArguments);Request request new Request();request.setType(0x8);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns waitResult(queueUnbindArguments.getRid());return basicReturns.isOk();} 发布消息
// 发布消息public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) throws IOException {BasicPublishArguments arguments new BasicPublishArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setExchangeName(exchangeName);arguments.setRoutingKey(routingKey);arguments.setBasicProperties(basicProperties);arguments.setBody(body);byte[] payload BinaryTool.toBytes(arguments);Request request new Request();request.setType(0x9);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns waitResult(arguments.getRid());return basicReturns.isOk();} 订阅消息
// 订阅消息public boolean basicConsume(String queueName, boolean autoAck, Consumer consumer) throws MqException, IOException {
// 先设置回调.if (this.consumer ! null) {throw new MqException(该 channel 已经设置过消费消息的回调了, 不能重复设置!);}this.consumer consumer;BasicConsumeArguments arguments new BasicConsumeArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);
// 此处 consumerTag 使用 channelId 来表示arguments.setConsumerTag(channelId); arguments.setQueueName(queueName);arguments.setAutoAck(autoAck);byte[] payload BinaryTool.toBytes(arguments);Request request new Request();request.setType(0xa);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns waitResult(arguments.getRid());return basicReturns.isOk();} 确认消息
// 确认消息public boolean basicAck(String queueName, String messageId) throws IOException {BasicAckArguments arguments new BasicAckArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setQueueName(queueName);arguments.setMessageId(messageId);byte[] payload BinaryTool.toBytes(arguments);Request request new Request();request.setType(0xb);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns waitResult(arguments.getRid());return basicReturns.isOk();} 二、客户端测试 1、准备工作和收尾工作 private BrokerServer brokerServer null;private ConnectionFactory factory null;private Thread t null;BeforeEachpublic void setUp() throws IOException {// 1. 先启动服务器TigerMqApplication.context SpringApplication.run(TigerMqApplication.class);brokerServer new BrokerServer(9090);t new Thread(() - {// 这个 start 方法会进入一个死循环. 使用一个新的线程来运行 start 即可!try {brokerServer.start();} catch (IOException e) {e.printStackTrace();}});t.start();// 2. 配置 ConnectionFactoryfactory new ConnectionFactory();factory.setHost(127.0.0.1);factory.setPort(9090);}AfterEachpublic void tearDown() throws IOException {// 停止服务器brokerServer.stop();// t.join();TigerMqApplication.context.close();// 删除必要的文件File file new File(./data);FileUtils.deleteDirectory(file);factory null;}
} 2、测试connection
Testpublic void testConnection() throws IOException {Connection connection factory.newConnection();Assertions.assertNotNull(connection);}
打印日志
[DataBaseManger]创建表完成
[DataBaseManger]创建初始数据已经完成
[DataBaseManger]数据库初始化完成
[BrokerServer] 启动!
[BrokerServer] 服务器停止运行!3、测试channnel的创建 Testpublic void testChannel() throws IOException{Connection connection connectionFactory.newConnection();Assertions.assertNotNull(connection);Channel channel connection.createChannel();Assertions.assertNotNull(channel);} 4、测试交换机
Testpublic void testExchange() throws IOException {Connection connection factory.newConnection();Assertions.assertNotNull(connection);Channel channel connection.createChannel();Assertions.assertNotNull(channel);boolean ok channel.exchangeDeclare(testExchange, ExchangeType.DIRECT, true);Assertions.assertTrue(ok);ok channel.exchangeDelete(testExchange);Assertions.assertTrue(ok);channel.close();connection.close();}
[DataBaseManger]创建表完成
[DataBaseManger]创建初始数据已经完成
[DataBaseManger]数据库初始化完成
[BrokerServer] 启动!
[connection]发送请求type 1,length 188
[Request] ridR-ff8a7be0-8138-4334-b496-647b472349fa, channelIdC-4cc5b5a4-6b73-435e-b7cf-8d9e2a2ec72f, type1, length188
[BrokerServer] 创建 channel 完成! channelIdC-4cc5b5a4-6b73-435e-b7cf-8d9e2a2ec72f
[Response] ridR-ff8a7be0-8138-4334-b496-647b472349fa, channelIdC-4cc5b5a4-6b73-435e-b7cf-8d9e2a2ec72f, type1, length192
[Connection]收到响应!type 1,length 192
[connection]发送请求type 3,length 412
[Request] ridR-9c7ddbe5-e0c1-42b2-a7b3-e456c5c8e457, channelIdC-4cc5b5a4-6b73-435e-b7cf-8d9e2a2ec72f, type3, length412
[MemoryDataCenter]新交换机添加成功exchangeName defaulttestExchange
[VirtualHost] 交换机创建完成exchangeName defaulttestExchange
[Response] ridR-9c7ddbe5-e0c1-42b2-a7b3-e456c5c8e457, channelIdC-4cc5b5a4-6b73-435e-b7cf-8d9e2a2ec72f, type3, length192
[Connection]收到响应!type 3,length 192
[connection]发送请求type 4,length 288
[Request] ridR-76865d42-f11f-4a57-9e99-d66f5bb6d228, channelIdC-4cc5b5a4-6b73-435e-b7cf-8d9e2a2ec72f, type4, length288
[MemoryDataCenter]交换机删除成功 exchangeName defaulttestExchange
[VirtualHost] 交换机删除成功exchangeName defaulttestExchange
[Response] ridR-76865d42-f11f-4a57-9e99-d66f5bb6d228, channelIdC-4cc5b5a4-6b73-435e-b7cf-8d9e2a2ec72f, type4, length192
[Connection]收到响应!type 4,length 192
[connection]发送请求type 2,length 188
[Request] ridR-49132d69-cc39-4030-9293-c9c4d3c3f0d1, channelIdC-4cc5b5a4-6b73-435e-b7cf-8d9e2a2ec72f, type2, length188
[BrokerServer] 销毁 channel 完成! channelIdC-4cc5b5a4-6b73-435e-b7cf-8d9e2a2ec72f
[Response] ridR-49132d69-cc39-4030-9293-c9c4d3c3f0d1, channelIdC-4cc5b5a4-6b73-435e-b7cf-8d9e2a2ec72f, type2, length192
[Connection]收到响应!type 2,length 192
[Connection] 连接正常断开!
[BrokerServer] connection 关闭! 客户端的地址: /127.0.0.1:54675
[BrokerServer]清理session完成~ 被清理的channeId []
[BrokerServer] 服务器停止运行!
2023-08-13 17:10:28.619 INFO 38940 --- [ main] o.apache.catalina.core.StandardService : Stopping service [Tomcat]
2023-08-13 17:10:28.704 INFO 38940 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown initiated...
2023-08-13 17:10:28.727 INFO 38940 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown completed.Process finished with exit code 05、测试队列
Testpublic void testQueue() throws IOException {Connection connection factory.newConnection();Assertions.assertNotNull(connection);Channel channel connection.createChannel();Assertions.assertNotNull(channel);boolean ok channel.queueDeclare(testQueue, true);Assertions.assertTrue(ok);ok channel.queueDelete(testQueue);Assertions.assertTrue(ok);channel.close();connection.close();}
[DataBaseManger]创建表完成
[DataBaseManger]创建初始数据已经完成
[DataBaseManger]数据库初始化完成
[BrokerServer] 启动!
[connection]发送请求type 1,length 188
[Request] ridR-86546d10-c911-4cbc-be4c-ac2c9d04ff38, channelIdC-91c1e861-6a7b-49de-a102-27c5437e0a40, type1, length188
[BrokerServer] 创建 channel 完成! channelIdC-91c1e861-6a7b-49de-a102-27c5437e0a40
[Response] ridR-86546d10-c911-4cbc-be4c-ac2c9d04ff38, channelIdC-91c1e861-6a7b-49de-a102-27c5437e0a40, type1, length192
[Connection]收到响应!type 1,length 192
[connection]发送请求type 5,length 349
[Request] ridR-2b3df748-be8a-4aab-9dc7-8c37abf93b91, channelIdC-91c1e861-6a7b-49de-a102-27c5437e0a40, type5, length349
[MemoryDataCenter]队列删除成功queueName defaulttestQueue
[VirtualHost]队列创建成功queueName defaulttestQueue
[Response] ridR-2b3df748-be8a-4aab-9dc7-8c37abf93b91, channelIdC-91c1e861-6a7b-49de-a102-27c5437e0a40, type5, length192
[Connection]收到响应!type 5,length 192
[connection]发送请求type 6,length 279
[Request] ridR-057a79a8-20db-408f-82db-eb9e9145a48b, channelIdC-91c1e861-6a7b-49de-a102-27c5437e0a40, type6, length279
[MemoryDataCenter]删除队列成功queueName defaulttestQueue
[VirtualHost]删除队列成功queueName defaulttestQueue
[Response] ridR-057a79a8-20db-408f-82db-eb9e9145a48b, channelIdC-91c1e861-6a7b-49de-a102-27c5437e0a40, type6, length192
[Connection]收到响应!type 6,length 192
[connection]发送请求type 2,length 188
[Request] ridR-73a48380-78e9-448f-9566-4cdc7da17bda, channelIdC-91c1e861-6a7b-49de-a102-27c5437e0a40, type2, length188
[BrokerServer] 销毁 channel 完成! channelIdC-91c1e861-6a7b-49de-a102-27c5437e0a40
[Response] ridR-73a48380-78e9-448f-9566-4cdc7da17bda, channelIdC-91c1e861-6a7b-49de-a102-27c5437e0a40, type2, length192
[Connection]收到响应!type 2,length 192
[Connection] 连接正常断开!
[BrokerServer] connection 关闭! 客户端的地址: /127.0.0.1:54945
[BrokerServer]清理session完成~ 被清理的channeId []
[BrokerServer] 服务器停止运行!
2023-08-13 17:16:54.304 INFO 72124 --- [ main] o.apache.catalina.core.StandardService : Stopping service [Tomcat]
2023-08-13 17:16:54.333 INFO 72124 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown initiated...
2023-08-13 17:16:54.363 INFO 72124 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown completed.Process finished with exit code 06、测试绑定 Testpublic void testBinding() throws IOException {Connection connection factory.newConnection();Assertions.assertNotNull(connection);Channel channel connection.createChannel();Assertions.assertNotNull(channel);boolean ok channel.exchangeDeclare(testExchange, ExchangeType.DIRECT, true);Assertions.assertTrue(ok);ok channel.queueDeclare(testQueue, true);Assertions.assertTrue(ok);ok channel.queueBind(testQueue, testExchange, testBindingKey);Assertions.assertTrue(ok);ok channel.queueUnbind(testQueue, testExchange);Assertions.assertTrue(ok);channel.close();connection.close();}
[DataBaseManger]创建表完成
[DataBaseManger]创建初始数据已经完成
[DataBaseManger]数据库初始化完成
[BrokerServer] 启动!
[connection]发送请求type 1,length 188
[Request] ridR-071477e3-7115-42e7-9370-7995fa36daab, channelIdC-fa94e733-1642-425a-9da2-a1174b52ab52, type1, length188
[BrokerServer] 创建 channel 完成! channelIdC-fa94e733-1642-425a-9da2-a1174b52ab52
[Response] ridR-071477e3-7115-42e7-9370-7995fa36daab, channelIdC-fa94e733-1642-425a-9da2-a1174b52ab52, type1, length192
[Connection]收到响应!type 1,length 192
[connection]发送请求type 3,length 412
[Request] ridR-cc632d4d-f06f-4b70-88dd-63dd94c666f2, channelIdC-fa94e733-1642-425a-9da2-a1174b52ab52, type3, length412
[MemoryDataCenter]新交换机添加成功exchangeName defaulttestExchange
[VirtualHost] 交换机创建完成exchangeName defaulttestExchange
[Response] ridR-cc632d4d-f06f-4b70-88dd-63dd94c666f2, channelIdC-fa94e733-1642-425a-9da2-a1174b52ab52, type3, length192
[Connection]收到响应!type 3,length 192
[connection]发送请求type 5,length 349
[Request] ridR-478d13e5-b999-4ac4-96cf-e0c8df829152, channelIdC-fa94e733-1642-425a-9da2-a1174b52ab52, type5, length349
[MemoryDataCenter]队列删除成功queueName defaulttestQueue
[VirtualHost]队列创建成功queueName defaulttestQueue
[Response] ridR-478d13e5-b999-4ac4-96cf-e0c8df829152, channelIdC-fa94e733-1642-425a-9da2-a1174b52ab52, type5, length192
[Connection]收到响应!type 5,length 192
[connection]发送请求type 7,length 347
[Request] ridR-77076a0e-faa7-4ecb-867b-48c7e33d720d, channelIdC-fa94e733-1642-425a-9da2-a1174b52ab52, type7, length347
[MemoryDataCenter]新绑定添加成功exchangeName defaulttestQueue,queueName defaulttestQueue
[VirtualHost]绑定创建成功 exchangeName defaulttestExchangequeueName defaulttestQueue
[Response] ridR-77076a0e-faa7-4ecb-867b-48c7e33d720d, channelIdC-fa94e733-1642-425a-9da2-a1174b52ab52, type7, length192
[Connection]收到响应!type 7,length 192
[connection]发送请求type 8,length 314
[Request] ridR-f5eeb7c0-0e4c-4339-8e3b-057354e27380, channelIdC-fa94e733-1642-425a-9da2-a1174b52ab52, type8, length314
[MemoryDataCenter]绑定删除成功exchangeName defaulttestQueue,queueName defaulttestQueue
[VirtualHost]删除绑定成功
[Response] ridR-f5eeb7c0-0e4c-4339-8e3b-057354e27380, channelIdC-fa94e733-1642-425a-9da2-a1174b52ab52, type8, length192
[Connection]收到响应!type 8,length 192
[Request] ridR-c3335cd6-d02e-440e-9aea-5275bf445412, channelIdC-fa94e733-1642-425a-9da2-a1174b52ab52, type2, length188
[BrokerServer] 销毁 channel 完成! channelIdC-fa94e733-1642-425a-9da2-a1174b52ab52
[Response] ridR-c3335cd6-d02e-440e-9aea-5275bf445412, channelIdC-fa94e733-1642-425a-9da2-a1174b52ab52, type2, length192
[Connection]收到响应!type 2,length 192
[connection]发送请求type 2,length 188
[BrokerServer] connection 关闭! 客户端的地址: /127.0.0.1:55256
[BrokerServer]清理session完成~ 被清理的channeId []
[Connection] 连接正常断开!
[BrokerServer] 服务器停止运行!
2023-08-13 17:22:34.611 INFO 74604 --- [ main] o.apache.catalina.core.StandardService : Stopping service [Tomcat]
2023-08-13 17:22:34.646 INFO 74604 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown initiated...
2023-08-13 17:22:34.691 INFO 74604 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown completed.Process finished with exit code 07、测试消息的相关操作 Testpublic void testMessage() throws IOException, MqException, InterruptedException {Connection connection factory.newConnection();Assertions.assertNotNull(connection);Channel channel connection.createChannel();Assertions.assertNotNull(channel);boolean ok channel.exchangeDeclare(testExchange, ExchangeType.DIRECT, true);Assertions.assertTrue(ok);ok channel.queueDeclare(testQueue, true, false);Assertions.assertTrue(ok);byte[] requestBody hello.getBytes();ok channel.basicPublish(testExchange, testQueue, null, requestBody);Assertions.assertTrue(ok);ok channel.basicConsume(testQueue, true, new Consumer() {Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println([消费数据] 开始!);System.out.println(consumerTag consumerTag);System.out.println(basicProperties basicProperties);Assertions.assertArrayEquals(requestBody, body);System.out.println([消费数据] 结束!);}});Assertions.assertTrue(ok);Thread.sleep(500);channel.close();connection.close();}
[DataBaseManger]创建表完成
[DataBaseManger]创建初始数据已经完成
[DataBaseManger]数据库初始化完成
[BrokerServer] 启动!
[Connection] 发送请求! type1, length188
[Request] ridR-143ae2ea-f258-4874-bff4-be3f719d44ed, channelIdC-0977501d-4608-4428-ae5c-db2738c02068, type1, length188
[BrokerServer] 创建 channel 完成! channelIdC-0977501d-4608-4428-ae5c-db2738c02068
[Response] ridR-143ae2ea-f258-4874-bff4-be3f719d44ed, channelIdC-0977501d-4608-4428-ae5c-db2738c02068, type1, length192
[Connection] 收到响应! type1, length192
[Connection] 发送请求! type3, length512
[Request] ridR-f4acb000-ae7f-44b5-829d-aef69d8a7394, channelIdC-0977501d-4608-4428-ae5c-db2738c02068, type3, length512
[MemoryDataCenter]新交换机添加成功exchangeName defaulttestExchange
[VirtualHost] 交换机创建完成exchangeName defaulttestExchange
[Response] ridR-f4acb000-ae7f-44b5-829d-aef69d8a7394, channelIdC-0977501d-4608-4428-ae5c-db2738c02068, type3, length192
[Connection] 收到响应! type3, length192
[Connection] 发送请求! type5, length349
[Request] ridR-311dd85c-c95b-436e-8d4d-85a3c38b445e, channelIdC-0977501d-4608-4428-ae5c-db2738c02068, type5, length349
[MemoryDataCenter]队列删除成功queueName defaulttestQueue
[VirtualHost]队列创建成功queueName defaulttestQueue
[Response] ridR-311dd85c-c95b-436e-8d4d-85a3c38b445e, channelIdC-0977501d-4608-4428-ae5c-db2738c02068, type5, length192
[Connection] 收到响应! type5, length192
[Connection] 发送请求! type9, length429
[Request] ridR-b1088753-1ee7-43ff-ae1f-85679ad4e48d, channelIdC-0977501d-4608-4428-ae5c-db2738c02068, type9, length429
[MemoryDataCenter]新消息添加成功messageId M-b688741f-808b-4cfa-9d7a-1a4f51d28c0b
[MemoryDataCenter]消息被投递到到队列中! messageId M-b688741f-808b-4cfa-9d7a-1a4f51d28c0b
[Response] ridR-b1088753-1ee7-43ff-ae1f-85679ad4e48d, channelIdC-0977501d-4608-4428-ae5c-db2738c02068, type9, length192
[Connection] 收到响应! type9, length192
[Connection] 发送请求! type10, length315
[Request] ridR-bf16b934-229a-4b03-8d8c-50416fec0aa6, channelIdC-0977501d-4608-4428-ae5c-db2738c02068, type10, length315
[MemoryDataCenter]消息从队列中取出!messageId M-b688741f-808b-4cfa-9d7a-1a4f51d28c0b
[VirtualHost]basicConsume成功 queueName defaulttestQueue
[Response] ridR-bf16b934-229a-4b03-8d8c-50416fec0aa6, channelIdC-0977501d-4608-4428-ae5c-db2738c02068, type10, length192
[Connection] 收到响应! type10, length192
[MemoryDataCenter]消息进入待确认队列!messageId M-b688741f-808b-4cfa-9d7a-1a4f51d28c0b
[Connection] 收到响应! type12, length520
[MemoryDataCenter]消息从待确认队列删除!messageId M-b688741f-808b-4cfa-9d7a-1a4f51d28c0b
[MemoryDataCenter]消息被移除messageId M-b688741f-808b-4cfa-9d7a-1a4f51d28c0b
[ConsumerManager]消费被成功消费queueName defaulttestQueue
[消费数据] 开始!
consumerTagC-0977501d-4608-4428-ae5c-db2738c02068
basicPropertiesBasicProperties(messageIdM-b688741f-808b-4cfa-9d7a-1a4f51d28c0b, routingKeytestQueue, deliverMode1)
[消费数据] 结束!
[Connection] 发送请求! type2, length188
[Request] ridR-024d2c5b-dfd0-4944-983b-2cfab56350d4, channelIdC-0977501d-4608-4428-ae5c-db2738c02068, type2, length188
[BrokerServer] 销毁 channel 完成! channelIdC-0977501d-4608-4428-ae5c-db2738c02068
[Response] ridR-024d2c5b-dfd0-4944-983b-2cfab56350d4, channelIdC-0977501d-4608-4428-ae5c-db2738c02068, type2, length192
[Connection] 收到响应! type2, length192
[Connection] 连接正常断开!
[BrokerServer] connection 关闭! 客户端的地址: /127.0.0.1:58925
[BrokerServer]清理session完成~ 被清理的channeId []
[BrokerServer] 服务器停止运行!
2023-08-13 18:19:24.906 INFO 75700 --- [ main] o.apache.catalina.core.StandardService : Stopping service [Tomcat]
2023-08-13 18:19:24.929 INFO 75700 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown initiated...
2023-08-13 18:19:24.935 INFO 75700 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown completed.Process finished with exit code 0