k网站建设,东莞做网站公司排名,wordpress插图文章排版,肇庆市端州发布一、基本概念、常见工作模式以及简单使用
MQ全称Message Queue (消息队列)#xff0c;是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。
小结
MQ消息队列#xff0c;存储消息的中间件分布式系统通信两种方式:直接远程调用和借助第三方完成间接通信发…一、基本概念、常见工作模式以及简单使用
MQ全称Message Queue (消息队列)是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。
小结
MQ消息队列存储消息的中间件分布式系统通信两种方式:直接远程调用和借助第三方完成间接通信发送方称为生产者接收方称为消费者
1.Mq的优势和劣势
1.1优势
1.1.1 应用解耦
用户从订单系统向库存系统、支付系统等这一链路发送消息如果库存系统出现错误可能也会导致订单系统出错从而给用户带来不好的体验。 并且如果要在该下单链路上添加新的服务为了完成服务间的通信只能修改订单系统的代码。 如果在订单系统和后面的系统中间加一个消息中间件情况就会好很多。 假设库存系统出错了这并不会影响到订单系统因为中间有一个Mq隔离有时库存系统出错时暂时的等一会儿库存系统被维护好了可以继续从Mq中重新取出消息进行操作。 并且如果需要新增一个服务也不需要更改订单系统的代码。 Mq使得应用间解耦提升容错性和可维护性
1.1.2 异步提速
在黑马点评中就用到了异步提速不过没有使用Mq而是使用的java阻塞队列后面改成了redis的消息中间件 原始得订单系统是同步进行的需要各个服务都完成之后才向用户返回信息 使用Mq后可以在订单系统将消息发送到Mq后直接向用户返回信息后面的库存系统和支付系统可以慢慢的从Mq中取出消息去执行 提升用户体验和系统吞吐量(单位时间内处理请求的数目)。
1.1.3 削峰填谷 小结
应用解耦: 提高系统容错性和可维护性异步提速: 提升用户体验和系统吞吐量削峰填谷: 提高系统稳定性
1.2 劣势
①系统的可用性降低 系统引入的外部依赖越多系统稳定性越差。一旦 MQ宕机就会对业务造成影响。如何保证MQ的高可用?②系统的复杂度提高 MQ的加入大大增加了系统的复杂度以前系统间是同步的远程调用,现在是通过MQ进行异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?③一致性问题 A系统处理完业务,通过MQ给B、C、D三个系统发消息数据如果B系统、C系统处理成功D系统处理失败。如何保证消息数据处理的一致性?
1.3 小结
既然MQ有优势也有劣势那么使用MQ需要满足什么条件呢?
①产者不需要从消费者处获得反馈。引入消息队列之前的直接调用其接口的返回值应该为空,这才让明明下层的动作还没做.上层却当成动作做完了继续往后走即所谓异步成为了可能。 ②容许短暂的不一致性。 ③确实是用了有效果。即解耦、提速、削峰这些方面的收益,超过加入MQ,管理MQ这些成本。
2.常见的MQ产品
目前业界有很多的MQ产品,例如RabbitMQ、RocketMQ、ActiveMQ、Kafka、 ZeroMQ、MetaMq等,也有直接使用Redis充当消息队列的案例 黑马点评就是 而这些消息队列产品各有侧重,在实际选型时,需要结合自身需求及MQ产品特征,综合考虑。
3.RabbitMq简介
3.1 AMQP协议
AMQP即Advanced Message Queuing Protosol (高级消息队列协议) , 是一个网络协议,是应用层协议的一个开放标准为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年 AMQP规范发布。类比HTTP。 AMQP协议流程图
3.2 RabbitMQ
2007年Rabbit 技术公司基于AMQP标准开发的RabbitMQ 1.0发布。RabbitMQ 采用Erlang语言开发。Erlang语言由Ericson设计专为开发高并发和分布式系统的一种语言,在电信领域使用广泛。
3.3 RabbitMQ架构
①中间这一块是服务端生产者和消费者会与服务段建立connect连接频繁的建立connect连接会影响性能消耗资源所以在一个connect连接中有很多channel可以通过channel建立连接。 ②服务端的内部有很多的虚拟机virtual host ③虚拟机中有交换机和队列交换机通过一些规则和队列绑定
3.4 RabbitMQ的相关概念 Broker: 接收和分发消息的应用RabbitMQ Server就是Message Broker Virtual host: 出于多租户和安全因素设计的把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时可以划分出多个vhost,每个用户在自己的vhost创建exchange / queue等 Connection: publisher / consumer和broker之间的TCP连接 Channel: 如果每一次访问 RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的效率也较低。Channel 是在connection内部建立的逻辑连接,如果应用程序支持多线程通常每个thread创建单独的channel进行通讯AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel 作为轻量级的Connection极大减少了操作系统建立TCP connection的开销 Exchange: message 到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。常用的类型有: direct (point-to point), topic (publish-subscribe) and fanout (multicast) Queue: 消息最终被送到这里等待consumer取走 Binding: exchange 和queue之间的虚拟连接, binding 中可以包含routing key。 Binding 信息被保存到exchange中的查询表中用于message的分发依据 RabbitMQ提供了6种工作模式: ①简单模式、②work queues、③Routing路由模式、④Topics 主题模式、⑤Publish/Subscribe 发布与订阅模式、⑥RPC远程调用模式(远程调用,不太算MQ;暂不作介绍)。 官网对应模式介绍: https://www.rabbitmq.com/getstarted.html
3.5 JMS
JMS即Java消息服务(JavaMessage Service)应用程序接口是一个Java平台中关于面向消息中间件的APIJMS是JavaEE规范中的一种,类比JDBC很多消息中间件都实现了JMS规范,例如: ActiveMQ。 RabbitMQ 官方没有提供JMS的实现包但是开源社区有
3.6 小结 4.RabbitMQ的安装与配置
自行在网上找教程
5.RabbitMQ的快速入门
5.1 生产者-消费者模型
简单模式 使用一个入门程序了解生产者和消费者的代码编写 步骤 ①创建工程生产者消费者 ②分别添加依赖 ③编写生产者发送消息 ④编写消费者接收消息 生产者代码 工厂参数要和rabbitmq部署时严格一致以及和rabbitmq management网页中设置的参数一致
package ratbbitMq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** author Watching* * date 2023/7/12* * Describe:消息队列生产者*/
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory connectionFactory new ConnectionFactory();//2.设置工厂参数connectionFactory.setHost(1.12.244.105);//ip地址默认localhostconnectionFactory.setVirtualHost(itcast);//虚拟机名称 默认 /connectionFactory.setPort(5673);//端口号 默认值 5672connectionFactory.setUsername(heima);//用户名 默认 guestconnectionFactory.setPassword(heima);//密码 默认 guest//3.创建connection连接Connection connection connectionFactory.newConnection();//4.创建channel连接Channel channel connection.createChannel();//5.创建队列/*queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, MapString, Object arguments)queue:队列名称durable是否持久化重启后是否存在exclusive是否独占只能有一个交换机与其绑定当connection关闭时是否删除队列autoDelete是否自动删除当没有consumer的时候是否删除队列arguments参数*/channel.queueDeclare(hello_world, true, false, false, null);//6.发送消息/*basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)exchange:交换机名称简单模式使用默认的交换机routingKey路由名称,简单模式下为queue名称props配置信息body消息*/channel.basicPublish(,hello_world,null,你好.getBytes());channel.close();connection.close();}
}消费者代码 消费者代码和生产者代码创建连接工厂、连接、channel的部分相同需要注意的是使用channel消费消息时需要实现一个回调函数。
package rabbitMq;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** author Watching* * date 2023/7/12* * Describe:* 消费者*/
public class consumer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory new ConnectionFactory();connectionFactory.setHost(1.12.244.105);connectionFactory.setVirtualHost(itcast);//虚拟机名称 默认 /connectionFactory.setPort(5673);//端口号connectionFactory.setUsername(heima);//用户名 默认 guestconnectionFactory.setPassword(heima);//密码 默认 guestConnection connection connectionFactory.newConnection();Channel channel connection.createChannel();/*basicConsume(String queue, boolean autoAck, Consumer callback)queue:队列名称autoAck是否自动确认消息callback回调函数*/channel.basicConsume(hello_world, true, new Consumer() {Overridepublic void handleConsumeOk(String s) {}Overridepublic void handleCancelOk(String s) {}Overridepublic void handleCancel(String s) throws IOException {}Overridepublic void handleShutdownSignal(String s, ShutdownSignalException e) {}Overridepublic void handleRecoverOk(String s) {}/*s:标识envelope获取一些信息交换机路由Key等basicProperties参数bytes消息*/Overridepublic void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {System.out.println(consumerTag: s);System.out.println(envelope_exchange: envelope.getExchange());System.out.println(envelope_RoutingKey: envelope.getRoutingKey());System.out.println(properties: basicProperties);System.out.println(body: new String(bytes));}});}
}小结
5.2工作队列模式 Work Queues:与入门程序的简单模式相比多了一个或一些消费端 ,多个消费端共同消费同一一个队列中的消息。同一条消息只能被一个消费者获取。 应用场景对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
生产者代码 工作队列的生产者代码与简单模式的生产者代码区别不大为了展示多条消息被多个消费者消费这里使用循环发送了10条消息
package ratbbitMq.produce_work_queues;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** author Watching* * date 2023/7/13* * Describe:工作队列模式 生产者*/
public class producer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory new ConnectionFactory();connectionFactory.setHost(1.12.244.105);connectionFactory.setVirtualHost(itcast);connectionFactory.setUsername(heima);connectionFactory.setPassword(heima);connectionFactory.setPort(5673);Connection connection connectionFactory.newConnection();Channel channel connection.createChannel();channel.queueDeclare(work_queue, true, false, false, null);for (int i 0; i 10; i) {String s i 测试工作队列;channel.basicPublish(, work_queue, null, s.getBytes());}channel.close();connection.close();}
}消费者代码
package rabbitMq.simple;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** author Watching* * date 2023/7/12* * Describe:* 消费者*/
public class consumer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory new ConnectionFactory();connectionFactory.setHost(1.12.244.105);connectionFactory.setVirtualHost(itcast);//虚拟机名称 默认 /connectionFactory.setPort(5673);//端口号connectionFactory.setUsername(heima);//用户名 默认 guestconnectionFactory.setPassword(heima);//密码 默认 guestConnection connection connectionFactory.newConnection();Channel channel connection.createChannel();/*basicConsume(String queue, boolean autoAck, Consumer callback)queue:队列名称autoAck是否自动确认消息如果autoAck参数设置为false即消费者不自动确认消息那么在消费者接收到消息后消息将保留在队列中直到消费者显式地确认消息为止。如果消费者不确认消息并且没有进行任何处理那么该消息将一直留在队列中不会被移除callback回调函数*/channel.basicConsume(hello_world, true, new Consumer() {Overridepublic void handleConsumeOk(String s) {}Overridepublic void handleCancelOk(String s) {}Overridepublic void handleCancel(String s) throws IOException {}Overridepublic void handleShutdownSignal(String s, ShutdownSignalException e) {}Overridepublic void handleRecoverOk(String s) {}/*s:标识envelope获取一些信息交换机路由Key等basicProperties参数bytes消息*/Overridepublic void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {System.out.println(consumerTag: s);System.out.println(envelope_exchange: envelope.getExchange());System.out.println(envelope_RoutingKey: envelope.getRoutingKey());System.out.println(properties: basicProperties);System.out.println(body: new String(bytes));}});}
}
小结 ①在一个队列中如果有多个消费者那么消费者之间对于同一个消息的关系是竞争的关系。 ②Work Queues对于任务过重或(任务较多情况使用工作队列可以提高任务处理的速度。例如:短信服务部署多个,只需要有一个节点成功发送即可。只要有一个服务器获取到短信并发送就行
5.3 Pub/Sub订阅模式 在订阅模型中多了一个Exchange角色而且过程略有变化:
P:生产者也就是要发送消息的程序,但是不再发送到队列中而是发给X (交换机)C:消费者,消息的接收者,会- -直等待消息到来Queue:消息队列接收消息、缓存消息Exchange: 交换机(X) 。一方面接收生产者发送的消息。另一方面知道如何处理消息例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作取决于Exchange的类型。Exchange有常见以下3种类型: – Fanout广播将消息发送到所有绑定了交换机的队列 – Direct定向把消息交给符合指定routing key的队列 – Topic通配符把消息交给符合routing pattern路由模式的队列
生产者代码 Pubsub模式的生产者代码和工作队列的区别 Pubsub模式多了一个交换机我们需要创建交换机并且将交换机与创建的队列进行绑定。代码细节请看注释
package ratbbitMq.produce_Pubsub;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** author Watching* * date 2023/7/17* * Describe:*/
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory connectionFactory new ConnectionFactory();//2.设置连接参数connectionFactory.setVirtualHost(itcast);connectionFactory.setHost(1.12.244.105);connectionFactory.setUsername(heima);connectionFactory.setPassword(heima);connectionFactory.setPort(5673);//3.创建连接Connection connection connectionFactory.newConnection();//4.创建ChannelChannel channel connection.createChannel();//5.创建队列String queueName1 test_fanout_queue1;String queueName2 test_fanout_queue2;channel.queueDeclare(queueName1,true,false,false,null);channel.queueDeclare(queueName2,true,false,false,null);//6.创建交换机/*exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, MapString, Object arguments)exchange:交换机名称type交换机类型fanout广播direct定向topic通配符headers参数匹配durable是否持久化aotuDelete没有任何与之关联的队列或交换器时会被自动删除internal指定交换器是否是内部的。如果设置为true则表示该交换器仅可用于内部使用不能被客户端用来发布消息。一般用于实现一些特殊的交换器交互模式。arguments一组可选的附加参数用于设置交换器的特定属性。这些参数是键值对的形式允许用户自定义交换器的行为。*/String exchangeName test_fanout;channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true);//7.绑定队列和交换机/*queueBind(String queue, String exchange, String routingKey)queue:队列名称exchange交换机名称routingKey路由键绑定规则如果交换机类型为fanout则路由键设置为*/channel.queueBind(queueName1,exchangeName,);channel.queueBind(queueName2,exchangeName,);//8.发送消息channel.basicPublish(exchangeName,,null,测试Pubsub——fanout.getBytes());//9.释放资源channel.close();connection.close();}
}消费者代码 消费者代码和工作队列模式的代码没有区别
package rabbitMq.produce_Pubsub;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** author Watching* * date 2023/7/17* * Describe:*/
public class Consumer_Pubsub1 {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory new ConnectionFactory();connectionFactory.setHost(1.12.244.105);connectionFactory.setVirtualHost(itcast);connectionFactory.setUsername(heima);connectionFactory.setPassword(heima);connectionFactory.setPort(5673);Connection connection connectionFactory.newConnection();Channel channel connection.createChannel();channel.basicConsume(test_fanout_queue1,true, new com.rabbitmq.client.Consumer() {Overridepublic void handleConsumeOk(String s) {}Overridepublic void handleCancelOk(String s) {}Overridepublic void handleCancel(String s) throws IOException {}Overridepublic void handleShutdownSignal(String s, ShutdownSignalException e) {}Overridepublic void handleRecoverOk(String s) {}Overridepublic void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
// System.out.println(consumerTag: s);
// System.out.println(envelope_exchange: envelope.getExchange());
// System.out.println(envelope_RoutingKey: envelope.getRoutingKey());
// System.out.println(properties: basicProperties);System.out.println(body: new String(bytes));}});}
}
Pubsub工作模式和工作队列的工作模式的区别在于 Pubsub多了一个交换机非默认交换机交换机将消息发送到与之绑定的每个队列每个队列都会有同样的消息不同的消费者再分别从不同的队列中获取相同的消息。
工作队列只有一个消息队列没有交换机非默认交换机消息会被发送到一个队列中不同的消费者从同一个队列中获取不同的消息。
5.4 Routing工作模式
模式说明
队列与交换机的绑定不能是任意绑定了,而是要指定一个RoutingKey (路由key)消息的发送方在向Exchange发送消息时也必须指定消息的RoutingKeyExchange不再把消息交给每一个绑定的队列而是根据消息的Routing Key进行判断只有队列的Routingkey与消息的Routing key完全一致 才会接收到消息 我们通过一个例子来了解Routing路由模式 在项目中我们需要把一些普通日志信息infowarning打印到控制台不需要存在数据库中还有一些重要的日志信息error在打印到控制台的同时也需要存在数据库中。 这里我们就需要用到Routing路由工作模式。 在Pubsub订阅模式中交换机并没有指定RoutingKey而在Routing工作模式中我们需要指定Routingkey以将交换机与指定的队列进行绑定。 生产者代码 Routing工作模式时生产者发送消息需要指定RoutingKey在交换机和队列绑定时也需要指定RoutingKey之前介绍的三种工作模式都是不需要指定RoutingKey的。
package ratbbitMq.produce_routing;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** author Watching* * date 2023/7/17* * Describe:*/
public class producer {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory connectionFactory new ConnectionFactory();//2.设置工厂参数connectionFactory.setPort(5673);connectionFactory.setUsername(heima);connectionFactory.setPassword(heima);connectionFactory.setHost(1.12.244.105);connectionFactory.setVirtualHost(itcast);//3.创建连接对象Connection connection connectionFactory.newConnection();//4.创建channelChannel channel connection.createChannel();//5.创建队列channel.queueDeclare(test_routing1, true, false, false, null);channel.queueDeclare(test_routing2, true, false, false, null);//6.创建交换机channel.exchangeDeclare(routing, BuiltinExchangeType.DIRECT);//7.绑定交换机和队列并设置RoutingKeychannel.queueBind(test_routing1,routing,error);channel.queueBind(test_routing2,routing,warning);channel.queueBind(test_routing2,routing,info);//8.发送消息并在发送消息的时候指定RoutingKeychannel.basicPublish(routing,warning,null,xxx访问了xxx方法 【warning】.getBytes());channel.basicPublish(routing,error,null,xxx访问了xxx方法 【error】.getBytes());//9.释放资源channel.close();connection.close();}
}消费者代码 消费者代码与前面三种模式无区别
package rabbitMq.consumer_routing;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** author Watching* * date 2023/7/17* * Describe:*/
public class Consumer_routing1 {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory new ConnectionFactory();connectionFactory.setHost(1.12.244.105);connectionFactory.setVirtualHost(itcast);connectionFactory.setUsername(heima);connectionFactory.setPassword(heima);connectionFactory.setPort(5673);Connection connection connectionFactory.newConnection();Channel channel connection.createChannel();channel.basicConsume(test_routing1,true, new Consumer() {Overridepublic void handleConsumeOk(String s) {}Overridepublic void handleCancelOk(String s) {}Overridepublic void handleCancel(String s) throws IOException {}Overridepublic void handleShutdownSignal(String s, ShutdownSignalException e) {}Overridepublic void handleRecoverOk(String s) {}Overridepublic void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
// System.out.println(consumerTag: s);
// System.out.println(envelope_exchange: envelope.getExchange());
// System.out.println(envelope_RoutingKey: envelope.getRoutingKey());
// System.out.println(properties: basicProperties);System.out.println(body: new String(bytes));}});}
} 5.5 Topic工作模式
Topic工作模式是更灵活的工作模式
在 RabbitMQ 的 Topic 模式中通配符用于匹配特定的 Routing Key。Topic 模式支持两种通配符星号*和井号#。
星号*通配符
星号可以匹配一个单词单词是由点号.分隔的字符串。 例如“*.apple” 可以匹配 “red.apple”、“green.apple”但不能匹配 “apple” 或 “red.green.apple”。 井号#通配符
#井号可以匹配零个或多个单词单词是由点号.分隔的字符串。 例如“fruit.#” 可以匹配 “fruit.apple”、“fruit.orange”、“fruit.red.apple”以及类似的任何其他组合。 注意事项
通配符只匹配单词不匹配点号.本身。Topic 模式中的 Routing Key 可以包含任意数量的单词。匹配时较长的通配符含有更多单词将会优先匹配。 举例说明 假设有一个 Topic Exchange 名为 “fruits_exchange”有两个队列绑定到该 Exchange 上队列 A 绑定键为 “*.apple”队列 B 绑定键为 “fruit.#”。当消息被发布到 “fruits_exchange” 时根据消息的 Routing KeyRabbitMQ 将会将消息分发到匹配的队列。
发布消息到 “fruits_exchange”Routing Key 为 “green.apple”此消息将被发送到队列 A 。 发布消息到 “fruits_exchange”Routing Key 为 “fruit.orange”此消息将仅被发送到队列 B因为它只匹配了队列 B 的绑定键。 通过使用不同的通配符规则Topic 模式允许更灵活和精确地路由消息到不同的队列以满足复杂的消息路由需求。 举例 下面我们将使用Topic工作模式实现下面的要求 生产者代码 Topic工作模式的生产者代码与Routing路由模式的生产者代码的区别是Topic的代码在绑定队列和交换机时需要指定带有通配符的Routingkey ,并且在定义交换机时不要忘了将交换机的类型设置为topic.
package ratbbitMq.produce_Topic;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** author Watching* * date 2023/7/18* * Describe:*/
public class producer {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory connectionFactory new ConnectionFactory();//2.设置工厂参数connectionFactory.setPort(5673);connectionFactory.setUsername(heima);connectionFactory.setPassword(heima);connectionFactory.setHost(1.12.244.105);connectionFactory.setVirtualHost(itcast);//3.创建连接对象Connection connection connectionFactory.newConnection();//4.创建channelChannel channel connection.createChannel();//5.创建队列channel.queueDeclare(test_Queue_Topic1, true, false, false, null);channel.queueDeclare(test_Queue_Topic2, true, false, false, null);channel.queueDeclare(test_Queue_Topic3, true, false, false, null);channel.queueDeclare(test_Queue_Topic4, true, false, false, null);//6.创建交换机channel.exchangeDeclare(Topic_exchange, BuiltinExchangeType.TOPIC);//7.绑定交换机和队列并设置RoutingKeychannel.queueBind(test_Queue_Topic1,Topic_exchange,usa.#);channel.queueBind(test_Queue_Topic2,Topic_exchange,#.news);channel.queueBind(test_Queue_Topic3,Topic_exchange,#.weather);channel.queueBind(test_Queue_Topic4,Topic_exchange,europe.#);//8.发送消息并在发送消息的时候指定RoutingKeychannel.basicPublish(Topic_exchange,usa.news,null,美国新闻【usa.news】.getBytes());channel.basicPublish(Topic_exchange,usa.weather,null,美国天气【usa.weather】.getBytes());channel.basicPublish(Topic_exchange,europe.news,null,欧洲新闻【europe.news】.getBytes());channel.basicPublish(Topic_exchange,europe.weather,null,欧洲天气【europe.weather】.getBytes());//9.释放资源channel.close();connection.close();}
}消费者代码 在这个例子中我创建了四个消费者消费者的代码都相同只是在绑定队列时要指定获取消息的队列名称。
package rabbitMq.consumer_Topic;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** author Watching* * date 2023/7/17* * Describe:*/
public class Consumer_topic1 {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory new ConnectionFactory();connectionFactory.setHost(1.12.244.105);connectionFactory.setVirtualHost(itcast);connectionFactory.setUsername(heima);connectionFactory.setPassword(heima);connectionFactory.setPort(5673);Connection connection connectionFactory.newConnection();Channel channel connection.createChannel();channel.basicConsume(test_Queue_Topic1,true, new Consumer() {Overridepublic void handleConsumeOk(String s) {}Overridepublic void handleCancelOk(String s) {}Overridepublic void handleCancel(String s) throws IOException {}Overridepublic void handleShutdownSignal(String s, ShutdownSignalException e) {}Overridepublic void handleRecoverOk(String s) {}Overridepublic void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
// System.out.println(consumerTag: s);
// System.out.println(envelope_exchange: envelope.getExchange());
// System.out.println(envelope_RoutingKey: envelope.getRoutingKey());
// System.out.println(properties: basicProperties);System.out.println(body: new String(bytes));}});}
}
运行结果 小结 Topic主题模式可以实现Pub/Sub发布与订阅模式和Routing路由模式的功能只是Topic 在配置routing key的时候可以使用通配符显得更加灵活。
6.Springboot整合RabbitMQ
在实际的项目中我们是不可能直接使用原生的代码进行开发的springboot的整合会帮助我们简化开发。 下面是springboot整合rabbitMQ的流程步骤 生产者
①导入依赖 dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependency②编写yml配置文件
spring:rabbitmq:password: heimausername: heimaport: 5673virtual-host: itcasthost: 1.12.244.105③创建配置类定义交换机、队列、绑定关系。 创建配置类和交换机和绑定关系的代码是将原生的代码封装了不指定的属性会是默认属性。 这里定义的是Topic工作模式其它类型的模式代码类似。
package com.rabbitmq.springboot_mqproducer.rabbitMQconfig;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** author Watching* * date 2023/7/18* * Describe:*/
Configuration
public class MQConfig {public static final String EXCHANGE_NAME boot_topic_exchange;public static final String QUEUE_NAME boot_topic_queue;/*创建交换机*/Bean(topic_exchange_bean)public Exchange bootExchange() {return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();}/*创建队列*/Bean(topic_queue_bean)public Queue bootQueue() {return QueueBuilder.durable(QUEUE_NAME).build();}/*绑定交换机*/Beanpublic Binding bootBinding(Qualifier(topic_exchange_bean) Exchange exchange,Qualifier(topic_queue_bean) Queue queue){return BindingBuilder.bind(queue).to(exchange).with(test.*).noargs();}
}
④注入RabbitTemplate对象调用方法完成操作。 这里是在测试类中执行生产者操作。
package com.rabbitmq.springboot_mqproducer;import com.rabbitmq.springboot_mqproducer.rabbitMQconfig.MQConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;SpringBootTest
class SpringbootMqProducerApplicationTests {ResourceRabbitTemplate rabbitTemplate;Testvoid contextLoads() {rabbitTemplate.convertAndSend(MQConfig.EXCHANGE_NAME,test.hello,测试springboot整合交换机);}
}消费者 消费者前两步代码和生产者相同不需要再编写定义交换机、队列、绑定关系的配置类最后一步需要使用 RabbitListener 注解进行监听。 代码
package com.rabbit.springboot_mqconsumer;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** author Watching* * date 2023/7/19* * Describe:*/
Component
public class RabbitMQListener {RabbitListener(queues {boot_topic_queue})//填写队列名称,可以以字符串数组的方式监听多个队列public void listener(Message message){System.out.println(message);}
}
小结 ①SpringBoot提供了快速整合RabbitMQ的方式 ②基本信息再ym|中配置队列交互机以及绑定关系在配置类中使用Bean的方式配置 ③生产端直接注入 RabbitTemplate 完成消息发送 ④消费端直接使用 RabbitListener 完成消息接收