ppt模版模板免费网站,网站 建设 外包,公司简介模板300字,网站导航条代码【RabbitMQ】RabbitMQ配置与交换机学习 文章目录 【RabbitMQ】RabbitMQ配置与交换机学习简介安装和部署1. 安装RabbitMQ2.创建virtual-host3. 添加依赖4.修改配置文件 WorkQueues模型1.编写消息发送测试类2.编写消息接收#xff08;监听#xff09;类3. 实现能者多劳 交换机F…【RabbitMQ】RabbitMQ配置与交换机学习 文章目录 【RabbitMQ】RabbitMQ配置与交换机学习简介安装和部署1. 安装RabbitMQ2.创建virtual-host3. 添加依赖4.修改配置文件 WorkQueues模型1.编写消息发送测试类2.编写消息接收监听类3. 实现能者多劳 交换机Fanout交换机1.消息发送2.消息监听 Direct交换机1.消息发送2.消息接收 Topic交换机1.消息发送2.消息接收 声明队列和交换机声明队列声明交换机绑定队列和交换机1.fanout示例2. direct示例3.基于注解的方式声明队列和交换机 消息转换器 总结 简介
RabbitMQ是一个开源的消息代理软件它实现了高级消息队列协议AMQP。RabbitMQ支持多种消息传递协议具有高可靠性、高可用性和高性能等特点。它允许应用程序通过消息队列进行异步通信从而实现解耦和负载均衡。RabbitMQ的核心概念包括交换机Exchange、队列Queue和绑定Binding它们共同构成了消息的路由和传递机制。
RabbitMQ的架构如图 其中包含几个概念
publisher生产者也就是发送消息的一方consumer消费者也就是消费消息的一方queue队列存储消息。生产者投递的消息会暂存在消息队列中等待消费者处理exchange交换机负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。virtual host虚拟主机起到数据隔离的作用。每个虚拟主机相互独立有各自的exchange、queue
安装和部署
这里以Centos7为例
1. 安装RabbitMQ
docker run \-e RABBITMQ_DEFAULT_USERshijun \-e RABBITMQ_DEFAULT_PASS123321 \-v mq-plugins:/plugins \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \--network hm-net\-d \rabbitmq:3.8-management-e RABBITMQ_DEFAULT_USERshijun -e参数用于设置环境变量。这行代码用来设置RabbitMQ的默认用户名为shijun。 -e RABBITMQ_DEFAULT_PASS123321 这行代码用来设置默认密码为123321。 -p 15672:15672 这行代码用来将宿主机的15672端口映射到容器的15672端口15672端口是RabbitMQ管理控制台的默认端口。 -p 5672:5672 这行代码用来将宿主机的5672端口映射到容器的5672端口5672端口是RabbitMQ的默认通信端口。 --network hm-net 这行代码将容器连接到名为hm-net的网络。 -d -d参数表示以后台模式运行容器。 rabbitmq:3.8-management 这里是我们要运行的rabbitmq的Docker镜像这里选择的是RabbitMQ 3.8版本版本需要根据自己的SpringCloud版本来选择。 安装完成后访问http://虚拟机IP地址:15672
输入刚才的账号密码shijun 123321就能进入控制台界面。 2.创建virtual-host 由于RabbitMQ 每秒并发能力为几万一般项目都不会达到这个规模因此我们可以让多个项目使用同一个RabbitMQ 。要实现项目直接的隔离需要创建virtual-host每个项目对应一个virtual-host。 按顺序点击填入“Name”和“Descrption”然后点击“Add virtual host”按钮 然后在右上角切换到创建的virtual-host 3. 添加依赖 dependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/dependency!--AMQP依赖包含RabbitMQ--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependency!--单元测试--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactId/dependency4.修改配置文件
logging:pattern:dateformat: MM-dd HH:mm:ss:SSS
spring:rabbitmq:host: 192.168.56.101 # 你的虚拟机IPport: 5672 # 端口virtual-host: /mq-demo # 虚拟主机username: shijun # 用户名password: 123321 # 密码WorkQueues模型 Work queues任务模型。简单来说就是让多个消费者绑定到一个队列共同消费队列中的消息。 当消息处理比较耗时的时候可能生产消息的速度会远远大于消息的消费速度。长此以往消息就会堆积越来越多无法及时处理。 此时就可以使用work 模型多个消费者共同处理消息处理消息处理的速度就能大大提高了。 在控制台创建一个work.queue队列 1.编写消息发送测试类
import static org.junit.jupiter.api.Assertions.*;
SpringBootTest
class SpringAmqpTest {Autowiredprivate RabbitTemplate rabbitTemplate;Testpublic void testWorkQueue() throws InterruptedException {// 队列名称String queueName work.queue;// 消息String message hello, message_;for (int i 0; i 50; i) {// 发送消息每20毫秒发送一次相当于每秒发送50条消息rabbitTemplate.convertAndSend(queueName, message i);Thread.sleep(20);}}
}RabbitTemplateSpring AMQP提供的模板类用于发送和接收消息。rabbitTemplate.convertAndSend(queueName, message i);使用RabbitTemplate发送消息到指定的队列。 2.编写消息接收监听类
Component
public class SpringRabbitListener {/*** 监听名为work.queue的RabbitMQ队列接收并处理来自队列的消息。* 通过延迟执行模拟消息处理时间* * param msg 从队列中接收到的消息内容以字符串形式提供* throws InterruptedException 如果线程在睡眠期间被中断则抛出此异常*/RabbitListener(queues work.queue)public void listenWorkQueue1(String msg) throws InterruptedException {// 输出接收到消息的时间以便跟踪消息处理的时间点System.out.println(消费者1接收到消息【 msg 】 LocalTime.now());// 模拟消息处理时间让线程睡眠20毫秒Thread.sleep(20);}RabbitListener(queues work.queue)public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println(消费者2........接收到消息【 msg 】 LocalTime.now());Thread.sleep(200);}
}RabbitListener此注解用来设置该方法应该作为RabbitMQ消息队列的监听器。这意味着当队列中有消息发布时Spring框架会自动调用此方法来处理这些消息。queues work.queue指定要监听的RabbitMQ队列的名称在这个例子中是work.queue。注意到这两消费者都设置了Thead.sleep模拟任务耗时 消费者1 sleep了20毫秒相当于每秒钟处理50个消息消费者2 sleep了200毫秒相当于每秒处理5个消息 运行后查看结果 观察可以发现 一个消息只会被一个消费者处理当有很多消息时会平均分配一个消费者负责奇数的消息一个消费者负责偶数的消息 但问题是消费者之间的消费能力可能不一样有的消费能力强有消费的弱会出现部分消费者一直空闲而其他消费者一直繁忙的状况没有充分利用每一个消费者的能力 3. 实现能者多劳
修改配置文件
spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息处理完成才能获取下一个消息再次运行查看结果 观察可以发现 消费者2处理了大概5条消息而消费者1处理了40多条消息成功实现能者多劳。 交换机 在之前的RabbitMQ的架构图中我们可以看到里面还有一项Exchange没有提到那就是RabbitMQ中的交换机。 交换机是RabbitMQ中用于接收生产者发送的消息并将这些消息路由到一个或多个队列的组件。 Exchange交换机只负责转发消息不具备存储消息的能力因此如果没有任何队列与Exchange绑定或者没有符合路由规则的队列那么消息会丢失 交换机有不同的类型常见的有以下几种
Fanout Exchange扇出交换机将消息广播到所有绑定的队列不考虑路由键。Direct Exchange直连交换机根据消息的路由键精确匹配队列。Topic Exchange主题交换机根据路由键的模式匹配队列。Headers Exchange头交换机根据消息的头部信息匹配队列。
Fanout交换机 扇出交换机将消息广播到所有绑定的队列而不考虑路由键。这种交换机非常适合需要将消息广播到多个消费者的场景。 假如说当订单服务有了一笔新订单之后就要去通知短信服务、商品服务等等有了交换机之后就只需要将消息发给交换机然后为每一个微服务创建一个队列并绑定之后当有消息时交换机就会把消息发送到所有队列就能实现一个消息被多个消费者处理了。 可以有多个队列每个队列都要绑定到Exchange交换机生产者发送的消息只能发送到交换机交换机把消息发送给绑定过的所有队列订阅队列的消费者都能拿到消息 创建Fanout交换机 创建两个队列fanout.queue1、fanout.queue2 点击刚刚创建的交换机进入 将刚才创建的两个队列绑定到交换机 1.消息发送
在SpringAmqpTest类中添加测试方法
Test
public void testFanoutExchange() {// 交换机名称String exchangeName demo.fanout;// 消息String message hello, everyone!;rabbitTemplate.convertAndSend(exchangeName, , message);
}abbitTemplate.convertAndSend(exchangeName, , message);中的第二个参数用来指定路由键。对于Fanout交换机路由键没有实际意义因此可以传递一个空字符串。 2.消息监听
在SpringRabbitListener中添加两个方法
RabbitListener(queues fanout.queue1)
public void listenFanoutQueue1(String msg) {System.out.println(消费者1接收到Fanout消息【 msg 】);
}RabbitListener(queues fanout.queue2)
public void listenFanoutQueue2(String msg) {System.out.println(消费者2接收到Fanout消息【 msg 】);
}运行代码查看结果 交换机的作用是什么
接收publisher发送的消息将消息按照规则路由到与之绑定的队列不能缓存消息路由失败消息丢失FanoutExchange的会将消息路由到每个绑定的队列
Direct交换机 在Fanout模式中一条消息会被所有订阅的队列都消费。但是在某些场景下我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。 直连交换机根据消息的路由键精确匹配队列。只有当消息的路由键与绑定的路由键完全匹配时消息才会被路由到相应的队列。 在Direct模型下
队列与交换机的绑定不能是任意绑定了而是要指定一个RoutingKey路由key消息的发送方在 向 Exchange发送消息时也必须指定消息的 RoutingKey。Exchange不再把消息交给每一个绑定的队列而是根据消息的Routing Key进行判断只有队列的Routingkey与消息的 Routing key完全一致才会接收到消息 创建direct.queue1和direct.queue2两个队列之后创建一个direct类型的交换机 绑定队列到交换机最终结果如图所示
1.消息发送
在SpringAmqpTest类中添加测试方法
Test
public void testSendDirectExchange1() {// 交换机名称String exchangeName demo.direct;// 消息String message 红色警报日本乱排核废水导致海洋生物变异惊现哥斯拉;// 发送消息rabbitTemplate.convertAndSend(exchangeName, red, message);
}Test
public void testSendDirectExchange2() {// 交换机名称String exchangeName demo.direct;// 消息String message 最新报道哥斯拉是居民自治巨型气球虚惊一场;// 发送消息rabbitTemplate.convertAndSend(exchangeName, blue, message);
}2.消息接收
在SpringRabbitListener中添加方法
RabbitListener(queues direct.queue1)
public void listenDirectQueue1(String msg) {System.out.println(消费者1接收到direct.queue1的消息【 msg 】);
}RabbitListener(queues direct.queue2)
public void listenDirectQueue2(String msg) {System.out.println(消费者2接收到direct.queue2的消息【 msg 】);
}运行测试类中的testSendDirectExchange1查看结果 运行测试类中的testSendDirectExchange2查看结果 观察可以发现 当发送的消息的Routing key为red时两个消息队列都能收到 当发送的消息的Routing key为red时只有消息队列1才能收到 Topic交换机 Topic交换机Topic Exchange是RabbitMQ中一种功能强大的交换机类型它通过路由键的模式匹配将消息路由到一个或多个队列。Topic交换机允许使用通配符来匹配路由键从而实现灵活的消息路由。 通配符 在绑定键中可以使用两个特殊字符来实现模式匹配 *匹配一个单词。#匹配零个或多个单词。 如图所示假如此时publisher发送的消息使用的RoutingKey共有四种
china.news 代表有中国的新闻消息china.weather 代表中国的天气消息japan.news 则代表日本新闻japan.weather 代表日本的天气消息
解释
topic.queue1绑定的是china.# 凡是以 china.开头的routing key 都会被匹配到包括 china.newschina.weather topic.queue2绑定的是#.news 凡是以 .news结尾的 routing key 都会被匹配。包括: china.newsjapan.news 更多范例 假设我们有以下绑定键 *.orange.**.*.rabbitlazy.# 我们可以通过以下路由键进行消息路由 路由键quick.orange.rabbit将匹配第一个和第二个绑定键。路由键lazy.orange.elephant将匹配第一个和第三个绑定键。路由键lazy.brown.fox将匹配第三个绑定键。路由键lazy将匹配第三个绑定键。 按照之前的流程创建Topic交换机和队列并进行绑定最终结果如下
1.消息发送
在SpringAmqpTest类中添加测试方法
Test
public void testSendTopicExchange1() {// 交换机名称String exchangeName demo.topic;// 消息String message 喜报孙悟空大战哥斯拉胜!;// 发送消息rabbitTemplate.convertAndSend(exchangeName, china.news, message);
}Test
public void testSendTopicExchange1() {// 交换机名称String exchangeName demo.topic;// 消息String message 喜报孙悟空大战哥斯拉胜!;// 发送消息rabbitTemplate.convertAndSend(exchangeName, china.weather, message);
}2.消息接收
在SpringRabbitListener中添加方法
RabbitListener(queues topic.queue1)
public void listenTopicQueue1(String msg){System.out.println(消费者1接收到topic.queue1的消息【 msg 】);
}RabbitListener(queues topic.queue2)
public void listenTopicQueue2(String msg){System.out.println(消费者2接收到topic.queue2的消息【 msg 】);
}运行测试类中的testSendTopicExchange1后观察结果 观察发现两个消息队列都收到了说明china.# 和#.news都匹配成功了。 运行测试类中的testSendTopicExchange2后观察结果 观察可以发现只有消息队列1匹配成功说明china.#匹配成功。 声明队列和交换机 之前我们创建交换机是通过控制台创建的然而实际开发中是不推荐使用这种方式因为可能会出现一些问题更推荐让程序员通过代码来判断交换机和队列是否存在如果没有再进行创建。 在实际开发中RabbitMQ的配置类一般放到消费者包下生产者一般会关心消息是否发送成功。 声明队列
队列是RabbitMQ中用于存储消息的组件。Spring AMQP通过Queue类来声明队列。队列有以下几个重要属性
name队列名称。durable是否持久化。持久化队列在RabbitMQ重启后仍然存在信息持久到磁盘。exclusive是否排他。排他队列只能被创建它的连接使用并且在连接断开时自动删除。autoDelete是否自动删除。当最后一个消费者断开连接后自动删除队列。
比如
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class RabbitMQQueueConfig {Beanpublic Queue myQueue() {return new Queue(simple.queue);}
}声明了一个名为simple.queue的队列默认为持久化、非排他、非自动删除。 声明交换机
使用ExchangeBuilder声明交换机ExchangeBuilder类提供了多种方法来配置交换机的属性。以下是一些常用的方法
durable()声明持久化交换机。autoDelete()声明自动删除交换机。withArgument()添加交换机的自定义参数。
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class RabbitMQExchangeConfig {Beanpublic DirectExchange directExchange() {return ExchangeBuilder.directExchange(direct.exchange).durable(true).build();}Beanpublic FanoutExchange fanoutExchange() {return ExchangeBuilder.fanoutExchange(fanout.exchange).durable(true).build();}Beanpublic TopicExchange topicExchange() {return ExchangeBuilder.topicExchange(topic.exchange).durable(true).build();}}绑定队列和交换机 在RabbitMQ中绑定关系Binding是交换机和队列之间的连接。绑定关系告诉交换机如何将消息路由到队列。在Spring AMQP中我们可以使用BindingBuilder类来声明和配置绑定关系。 BindingBuilder类提供了一些静态方法来创建绑定关系。常用的方法包括
bind()绑定队列到交换机。to()指定交换机。with()指定路由键用于直连交换机和主题交换机。where()指定头部信息用于头交换机。
1.fanout示例
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class FanoutConfig {/*** 声明交换机* return Fanout类型交换机*/Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange(demo.fanout);}/*** 第1个队列*/Beanpublic Queue fanoutQueue1(){return new Queue(fanout.queue1);}/*** 绑定队列和交换机*/Beanpublic Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}/*** 第2个队列*/Beanpublic Queue fanoutQueue2(){return new Queue(fanout.queue2);}/*** 绑定队列和交换机*/Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}在控制台删除demo.fanout交换机和fanout.queue2 、fanout.queue2这两个队列再次运行代码会发现删除的又重新出现了 2. direct示例 direct交换机 import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class DirectConfig {/*** 声明交换机* return Direct类型交换机*/Beanpublic DirectExchange directExchange(){return ExchangeBuilder.directExchange(direct.exchange).build();}/*** 第1个队列*/Beanpublic Queue directQueue1(){return new Queue(direct.queue1);}/*** 绑定队列和交换机*/Beanpublic Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with(red);}/*** 绑定队列和交换机*/Beanpublic Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with(blue);}/*** 第2个队列*/Beanpublic Queue directQueue2(){return new Queue(direct.queue2);}/*** 绑定队列和交换机*/Beanpublic Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with(red);}/*** 绑定队列和交换机*/Beanpublic Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with(yellow);}
}3.基于注解的方式声明队列和交换机 基于Bean的方式声明队列和交换机比较麻烦Spring还提供了基于注解方式来声明。 修改SpringRabbitListener类 Component
public class SpringRabbitListener {// .......RabbitListener(bindings QueueBinding(value Queue(name direct.queue1),exchange Exchange(name demo.direct, type ExchangeTypes.DIRECT),key {red, blue}))public void listenDirectQueue1(String msg){System.out.println(消费者1接收到direct.queue1的消息【 msg 】);}RabbitListener(bindings QueueBinding(value Queue(name direct.queue2),exchange Exchange(name demo.direct, type ExchangeTypes.DIRECT),key {red, yellow}))public void listenDirectQueue2(String msg){System.out.println(消费者2接收到direct.queue2的消息【 msg 】);}RabbitListener(bindings QueueBinding(value Queue(name topic.queue1),exchange Exchange(name demo.topic, type ExchangeTypes.TOPIC),key china.#))public void listenTopicQueue1(String msg){System.out.println(消费者1接收到topic.queue1的消息【 msg 】);}RabbitListener(bindings QueueBinding(value Queue(name topic.queue2),exchange Exchange(name demo.topic, type ExchangeTypes.TOPIC),key #.news))public void listenTopicQueue2(String msg){System.out.println(消费者2接收到topic.queue2的消息【 msg 】);}// .......}QueueBinding注解包含以下几个主要部分 value定义队列使用Queue注解。exchange定义交换机使用Exchange注解。key定义路由键使用字符串数组。 删除交换机和队列后再次运行会发现又重新出现 消息转换器 Spring的消息发送代码接收的消息体是一个Object 在数据传输时它会把你发送的消息序列化为字节发送给MQ接收消息的时候还会把字节反序列化为Java对象。 只不过默认情况下Spring采用的序列化方式是JDK序列化。众所周知JDK序列化存在下列问题 数据体积过大有安全漏洞可读性差 因此我们可以配置JSON转换器来解决这个问题。 引入Jackson依赖 dependencygroupIdcom.fasterxml.jackson.dataformat/groupIdartifactIdjackson-dataformat-xml/artifactId/dependency配置消息转换器 在publisher和consumer两个服务的启动类中添加一个Bean即可
Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter new Jackson2JsonMessageConverter();// 2.配置自动创建消息id用于识别不同消息也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;
}测试
在FanoutConfig类中声明队列 Beanpublic Queue objectQueue() {return new Queue(object.queue);}
在SpringAmqpTest类中添加
Test
public void testSendMap() throws InterruptedException {// 准备消息MapString,Object msg new HashMap();msg.put(name, 柳岩);msg.put(age, 21);// 发送消息rabbitTemplate.convertAndSend(object.queue, msg);
}在SpringRabbitListener类中添加
RabbitListener(queues object.queue)
public void listenSimpleQueueMessage(MapString, Object msg) throws InterruptedException {System.out.println(消费者接收到object.queue消息【 msg 】);
}运行测试类查看结果 总结
本文较为详细的记录了RabbitMQ的安装配置以及交换机学习希望本文对大家学习RabbitMQ有所帮助。