当前位置: 首页 > news >正文

佛山小学网站建设安装网站模版视频教程

佛山小学网站建设,安装网站模版视频教程,上海专业做网站的公司,西安大型网站建设公司排名目录 一#xff1a;初始RabbitMQ 1. 同步和异步通讯 1.1 同步调用 1.2 异步调用 2. MQ常见框架 二#xff1a;RabbitMQ快速入门 1. RabbitMQ概述和安装 2. 常见消息队列模型 3. 快速入门案例 三#xff1a;SpringAMQP 1. Basic Queue 简单队列模型 2. Work Queu…目录 一初始RabbitMQ 1. 同步和异步通讯 1.1 同步调用 1.2 异步调用 2. MQ常见框架 二RabbitMQ快速入门 1. RabbitMQ概述和安装 2. 常见消息队列模型 3. 快速入门案例 三SpringAMQP 1. Basic Queue 简单队列模型 2. Work Queue 工作队列模型 3. 发布订阅模型-Fanout 发布 4. 发布订阅模型-Direct 发布 5. 发布订阅模型-Topic 发布 6. 消息转换器 前些天突然发现了一个巨牛的人工智能学习网站通俗易懂风趣幽默感兴趣的同学可以进行学习人工智能学习 一初始RabbitMQ 1. 同步和异步通讯 同步通讯和异步通讯理解 生活中就有很多同步和异步的案例例1假如你现在与一个妹子聊天采用同步通信更像是打视频电话就像直播一样所得到的信息都能立刻同步过去具有一定的优势而异步通信更像是微信聊天别人不想理你也不知道时效性不是那么好但也有自己的优点。例2假如你现在在和三个妹子聊天同步通信只能一个妹子聊就会错失很多良机异步通信可以多个妹子一块聊还不会被发现所以那么牛的技术我们当然要好好学习 1.1 同步调用 案例前面学习的微服务间基于Feign的调用就属于同步方式就存在以下问题 耦合度高每次加入新的需求都要修改原来的代码 对于一个订单业务我们支付成功后就需要更改订单服务修改订单状态然后进行发货支付服务调用订单服务还是存储服务都需要等待对方的响应是实时的调用。此时一个完整的系统开发好了如果产品经理需要增加一个短信通知服务等功能此时就需要在支付服务里增加代码每次增加一个业务代码就需要更改具有很强的耦合性 性能下降吞吐量调用者需要等待服务提供者响应如果调用链过长则响应时间等于每次调用的时间之和 假如现在调用支付服务需要50ms支付服务调用其它服务都需要150ms支付服务调用每个服务都是同步调用所以只能进行等待当前调用完成才可以调用其它的服务所以一个完整的服务调用下来就需要500ms这相当于1s中只能处理请求。数以十万百万的请求过来根本顶不住性能下降、吞吐量也下降了 资源浪费调用链中的每个服务在等待响应过程中不能释放请求占用的资源高并发场景下会极度浪费系统资源 在支付服务等待订单服务的过程CPU和内存都在占用着啥都不干只有某个服务调用完成才会执行下一个在等待的过程中浪费大量资源资源利用的不够充分 级联失败如果服务提供者出现问题所有调用方都会跟着出问题如同多米诺骨牌一样迅速导致整个微服务群故障 假如现在存储服务挂了此时支付服务进行访问就会一直进入阻塞状态这个请求就不会被释放后面阻塞的越来越多等待资源耗尽支付服务就进不去了相当于支付服务也挂了所以造成整个服务就瘫痪了 总结同步调用 优点时效性强可以立即得到结果。 缺点耦合度高、性能和吞吐能力下降、有额外的资源消耗、有级联失联问题。 1.2 异步调用 异步调用常见实现就是事件驱动模式 在支付服务与其它服务之间引入一个Broker事件代理者。一旦有人支付成功就是一个事件这个事件交给Broker来管理而订单、仓储等服务就会找Broker这个老大哥一旦有人支付成功你要通知我们订阅事件所以一旦有人支付成功Broker就会发布支付成功事件这里通知完就会返回给用户不会等待其它服务响应完去通知其它服务有人支付成功了此时其它服务就会去修改订单状态 优势一服务解耦 原来增加业务需要更改业务的代码现在就不用了因为现在支付服务不负责调用只负责发送一个事件到Broker至于是谁接收什么时间接收有没有完成完全不用管。所以一旦有新的业务只需要订阅新的Broker事件即可到时候直接大喇嘛一喊就能通知到你注这样将来增加或删除业务就不需要更改代码只需要订阅或取消订阅事件即可。 优势二性能提升吞吐量提升 以前的耗时是总耗时加在一起50150*3500ms现在只要支付成功支付服务就向Broker发布事件立刻就能返还给用户支付成功501060ms。而Broker通知其它服务什么时候去完成多久去完成完全不用管。 优势三服务没有强依赖不担心级联失败问题没有资源浪费 支付服务相当于借用Broker去通知而不是调用此时仓储服务挂了也和我没关系只需要重启仓储服务即可。既然没有强的依赖关系我不调用你也不需要等待你所以就没有了资源浪费。 优势四流量削峰 假设现在有多个用户发出请求此时Broker就起到一个缓冲的作用把请求都放到让订单服务、仓储等服务按照自己的能力去处理业务处理完再去Broker取现在此时的压力是Broker扛着。 总结异步调用 优点耦合度低性能和吞吐量提升故障隔离没有资源消耗没有级联失联问题流量消峰。 缺点依赖于Broker的可靠性、安全性、吞吐能力架构复杂了业务没有明显的流程线不好追踪管理。 2. MQ常见框架 MQ MessageQueue中文是消息队列字面来看就是存放消息的队列。也就是事件驱动架构中的Brokeri MQ常见的四种实现RabbitMQ、ActiveMQ、RocketMQ、Kafka  RabbitMQActiveMQRocketMQKafka公司/社区RabbitApache阿里Apache开发语言ErlangJavaJavaScalaJava协议支持AMQPXMPPSMTPSTOMPOpenWire,STOMPREST,XMPP,AMQP自定义协议自定义协议可用性高一般高高单机吞吐量一般差高非常高消息延迟微秒级毫秒级毫秒级毫秒以内消息可靠性高一般高一般 追求可用性Kafka、 RocketMQ 、RabbitMQ 追求可靠性RabbitMQ、RocketMQ 追求吞吐能力RocketMQ、Kafka 追求消息低延迟RabbitMQ、Kafka 二RabbitMQ快速入门 1. RabbitMQ概述和安装 RabbitMQ概述 RabbitMQ是基于Erlang语言开发的开源消息通信中间件官网地址RabbitMQ: easy to use, flexible messaging and streaming — RabbitMQ RabbitMQ安装 单机部署基于Centos7虚拟机中使用Docker来安装 第一步下载镜像 ①在线拉取 docker pull rabbitmq:3-management ②从本地加载使用本地已经安装的镜像包  上传到虚拟机目录后例如tmp目录使用命令加载镜像即可 docker load -i mq.tar 第二步安装MQ 执行下面的命令来运行MQ容器 docker run \-e RABBITMQ_DEFAULT_USERitcast \ # -e设置环境变量用户名和密码-e RABBITMQ_DEFAULT_PASS123321 \--name mq \--hostname mq1 \ # --hostname配置主机名集群部署需要配置这个-p 15672:15672 \ # 管理平台的端口-p 5672:5672 \ # 消息通信的端口-d \rabbitmq:3-management 第三步查看状态 docker ps 成功启动 第四步登录管理品台页面 注如果出现第二天登录不上的情况请重启dockerservice docker restart 192.168.#.#:15672 # 前面是虚拟机IP后面是端口 输入设置的账户密码 需要注意的是每个用户都需要有自己独享的虚拟主机 RabbitMQ的结构和概念 Publisher是消息的发送者Consumer是消息的消费者。发送者将来会把消息发送到exchange交换机交换机会把消息路由到queue队列队列负责暂存消息而后消费者从队列中获取消息然后处理消息 注每创建一个用户都对应一个VirtualHost虚拟主机各个虚拟主机之间是相互隔离的看不到这样可以避免干扰。 总结RabbitMQ中的几个概念 ①channel操作MQ的工具  ②exchange路由消息到队列中 ③queue缓存消息 ④virtual host虚拟主机是对queue、exchange等资源的逻辑分组、隔离 2. 常见消息队列模型 MQ的官方文档中给出了7个MQ的Demo示例其中与消息发送和接收有关系的就是前5个 ①其中前2个命名为基本消息队列BasicQueue和工作消息队列WorkQueue这两种有一个共同的特征消息的发送和接收都是基于队列来完成的没有通过交换机其中P代表发送者、C代表消费者、中间的红色部分代表消息队列。 ②后3个都属于发布订阅Publish、Subscribe只是交换机类型不同分为三种Fanout Exchange广播、Direct Exchange路由、Topic Exchange主题其中紫色的部分就代表交换机。 3. 快速入门案例 HelloWorld案例---》基本消息队列入门 注mq-demo是父工程用来做依赖管理consumer和publisher是两个子工程 官方的HelloWorld是基于最基础的消息队列模型来实现的只包括三个角色 publisher消息发布者将消息发送到队列 queue消息队列负责接受并缓存消息 consumer订阅队列处理队列中的消息 注其中queue是由MQ进行管理的所以我们只需要写publisher和consumer这两部分代码 mq-demo父工程 pom.xml ?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcn.itcast.demo/groupIdartifactIdmq-demo/artifactIdversion1.0-SNAPSHOT/versionmodulesmodulepublisher/modulemoduleconsumer/module/modulespackagingpom/packagingparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.3.9.RELEASE/versionrelativePath//parentpropertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.target/propertiesdependenciesdependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/dependency!--AMQP依赖包含RabbitMQ--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependency!--单元测试--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactId/dependency/dependencies /project publisher消息的发送者 其中启动类和yml文件是SpringBoot工程必备的没什么好说的最主要的是测试类 ①首先要先创建连接需要连接工厂ConnectioFactory ②根据连接工厂去设置连接的信息连接的地址、端口号、虚拟主机、用户名、密码 ③前面连接工厂和参数都准备好了然后就是调用连接工厂ConnectionFactory的newConnection方法正式建立连接connection ④正式建立连接后就需要调用connection的createChannel建立通道channnel这样生产者和消费者才能完成消息的发送和接收 ⑤通道有了就可以基于通道向队列queue中发送消息了首先是声明了队列的名称然后调用通道的queueDeclare方法向队列中发送消息 ⑥有了队列生产者就可以向队列中发送消息了把准备的消息发送到队列当中以字节的形式发送出去。 ⑦最后在关闭通道和连接。 注无论是声明队列还是向队列中发送消息实际上使用的都是通道channel package cn.itcast.mq.helloworld;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import org.junit.Test;import java.io.IOException; import java.util.concurrent.TimeoutException;public class PublisherTest {Testpublic void testSendMessage() throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory new ConnectionFactory();// 1.1.设置连接参数分别是主机名、端口号、vhost、用户名、密码factory.setHost(192.168.150.101);factory.setPort(5672); // 5672是通信的端口15672是管理的接口factory.setVirtualHost(/);factory.setUsername(itcast);factory.setPassword(123321);// 1.2.建立连接Connection connection factory.newConnection();// 2.创建通道ChannelChannel channel connection.createChannel();// 3.创建队列String queueName simple.queue;channel.queueDeclare(queueName, false, false, false, null);// 4.发送消息String message hello, rabbitmq!;channel.basicPublish(, queueName, null, message.getBytes());System.out.println(发送消息成功【 message 】);// 5.关闭通道和连接channel.close();connection.close();} }1. 正式建立连接connection管理界面就会有连接的信息  2. 连接正式建立后就会创建通道channel供消息的发送和接收使用 3. 根据通道向队列queue发送消息 4. 消息发送到队列后就关闭通道和连接发完就不管了解除了耦合 ①控制台 ② 管理的页面queue都表示消息已经成功发出去 Consumer消息的接收者 ①消费者就需要从队列中接收消息所以也会有创建连接工厂、准备参数、创建通道等操作这些代码不变 ②值得注意的是在这里我们又创建了队列这是为什么呢这是因为我们生产者和消费者的启动顺序是不同的万一消费者先启动找队列找不到怎办为了避免这种情况的发生都声明了对列。并且如果这个对列已经创建过了不会再次创建 ③下面实际上就相当于回调函数调用basicConsume方法表示消费一条消息那么去干什么呢什么行为这里就采用了匿名内部类对象DefaultConsumer默认的消费者重写了handleDelivery方法处理投递的消息把处理的行为挂载到队列queueName当中一旦消息队列中有了消息这个回调函数就会执行。 package cn.itcast.mq.helloworld;import com.rabbitmq.client.*;import java.io.IOException; import java.util.concurrent.TimeoutException;public class ConsumerTest {public static void main(String[] args) throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory new ConnectionFactory();// 1.1.设置连接参数分别是主机名、端口号、vhost、用户名、密码factory.setHost(192.168.2.129);factory.setPort(5672);factory.setVirtualHost(/);factory.setUsername(itcast);factory.setPassword(123321);// 1.2.建立连接Connection connection factory.newConnection();// 2.创建通道ChannelChannel channel connection.createChannel();// 3.创建队列String queueName simple.queue;channel.queueDeclare(queueName, false, false, false, null);// 4.订阅消息channel.basicConsume(queueName, true, new DefaultConsumer(channel){Override public void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {// 5.处理消息String message new String(body); // 发的时候是字节接的时候也必须是字节这里在转换为字符串System.out.println(接收到消息【 message 】);}});System.out.println(等待接收消息。。。。);} }此时的执行结果 此时先打印的是 ”等待接收消息。。。。” 实际上这就是回调机制前面的代码只是让回调函数和队列进行绑定此时的消息还没过来代码会继续执行一直到MQ把消息投递过来才会打印。这也再次证明了是异步的 一旦消息被消费队列中的就会被删除 三SpringAMQP 前面我们使用官方的API实现了简单的MQ程序但是发现程序非常的麻烦接下来就学习一下SpringAMQP大大简化了消息的发送和接收。 什么是SpringAMQP SpringAmqp的官方地址Spring AMQP是应用间消息通信的一种协议与语言平台无关。 AMQP在学习SpringAMQP之前需要先了解一下AMQPAdvanced Message Queuing Protocol高级消息队列协议是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关更符合微服务中独立性的要求。 Spring AMQP是基于AMQP协议定义的一套API规范提供了模板来发送和接收消息。包含两部分其中spring-amqp是基础抽象spring-rabbit是底层的默认实现。 ①用于异步处理入站消息的监听器容器 ②用于发送和接收消息的 RabbitTemplate ③RabbitAdmin 实现自动化的声明队列、交换和绑定自动创建队列 接下来就是用SpringAMQP实现消息队列的五种类型 1. Basic Queue 简单队列模型 案例利用SpringAMQP实现HelloWorld中的基础消息队列功能 第一步在父工程中引入spring-amqp的依赖 !--AMQP依赖包含RabbitMQ-- dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId /dependency !--SpringBoot的单元测试依赖-- dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactId /dependency 第二步在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列 ①在publisher服务中编写application.yml添加mq连接信息 以配置的方式制定建立连接的一些信息。 spring:rabbitmq:host: 192.168.2.129 # IP地址port: 5672 # 端口virtual-host: / # 虚拟主机username: itcast # 用户名password: 123321 # 密码 ②在publisher服务中新建一个测试类编写测试方法 直接使用RabbitTemplate工具类发送信息即可。 注springamqd不会帮你创建队列只能存在已有的队列中所以要自己提前在浏览器的控制页面上创建这个对列 package cn.itcast.mq.spring;import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;RunWith(SpringRunner.class) SpringBootTest public class SpringAmqpTest {// 注入RabbitTemplateAutowiredprivate RabbitTemplate rabbitTemplate;// 调用工具类的方法Testpublic void testSendMessageSimpleQueue(){// 第一个参数队列的名称String queueName simple2.queue;// 第二个参数消息String message hello SpringAMQP!;rabbitTemplate.convertAndSend(queueName,message);} }成功发送 第三步在consumer服务中编写消费逻辑绑定simple.queue这个队列进行监听 ①在consumer服务中编写application.yml添加mq连接信息 spring:rabbitmq:host: 192.168.2.129port: 5672virtual-host: /username: itcastpassword: 123321 ②在consumer服务中新建一个类添加Component注解类中声明方法添加RabbitListener注解编写消费逻辑 package cn.itcast.mq.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;Component // 纳入Spring管理 public class SpringRabbitListener {// 声明监听那个队列RabbitListener(queues simple2.queue)// 行为封装成方法public void listenSimpleQueueMessage(String msg){ // Spring会把消息传递过来给msg参数System.out.println(消费者接收到的消息是msg);} }运行主函数启动上面的Bean 2. Work Queue 工作队列模型 前面已经学习了简单队列的发送和接收一旦有人拿到消息就会从队列中删除其它消费者根本拿不到。那如果有多个消息怎么办呢就可以基于上述的特性让多个消费者合作处理。接下来就学习一下Work queue(工作队列)可以提高消息处理速度避免队列消息堆积。 案例模拟WorkQueue实现一个队列绑定多个消费者 第一步在publisher服务中定义测试方法每秒产生50条消息发送到simple.queue package cn.itcast.mq.spring;import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;RunWith(SpringRunner.class) SpringBootTest public class SpringAmqpTest {// 注入RabbitTemplateAutowiredprivate RabbitTemplate rabbitTemplate;// 发送消息Testpublic void testSendMessageWorkQueue(){String queueName simple2.queue;String message hello---;// 利用for循环发送50条消息for (int i 1; i 50; i) {rabbitTemplate.convertAndSend(queueName,messagei);// 休眠20毫秒try {Thread.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}}} }第二步在consumer服务中定义两个消息监听者都监听simple.queue队列 注消费者1每秒处理50条消息消费者2每秒处理10条消息。 package cn.itcast.mq.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;import java.time.LocalDateTime;Component // 纳入Spring管理 public class SpringRabbitListener {// 消费者1RabbitListener(queues simple2.queue)// 行为封装成方法public void listenWorkQueue1Message(String msg){System.out.println(消费者1接收到的消息是msg LocalDateTime.now());// 每秒处理50条消息try {Thread.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}}// 消费者2RabbitListener(queues simple2.queue)// 行为封装成方法public void listenWorkQueue2Message(String msg){System.out.println(消费者2接收到的消息是---》msgLocalDateTime.now());// 每秒处理10条消息try {Thread.sleep(100);} catch (InterruptedException e) {throw new RuntimeException(e);}} }执行结果 理论上1秒处理完实际上却是2秒才处理完并没有做到能者多劳消费者1实际上在1秒内很快就处理完消息而消费者2因为能力不够却需要2秒。实际上这是因为MQ的预取机制才开始就优先从队列中拿过来并没有考虑到消费能力如何 第三步消费预取限制 修改application.yml文件设置preFetch这个值可以控制预取消息的上限 spring:rabbitmq:host: 192.168.150.101 # 主机名port: 5672 # 端口virtual-host: / # 虚拟主机 username: itcast # 用户名password: 123321 # 密码listener:simple:prefetch: 1 # 每次只能获取一条消息处理完成才能获取下一个消息执行结果能者多劳可以在1秒内完成 3. 发布订阅模型-Fanout 发布 发布订阅模式 发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者实现方式是加入了exchange交换机。到底是发给谁这是由交换机的类型决定的 ①Fanout广播 ②Direct路由  ③Topic话题 注意exchange负责消息路由而不是存储路由失败则消息丢失消息的存储是由队列完成的 发布订阅-Fanout Exchange Fanout Exchange 会将接收到的消息广播到每一个跟其绑定的queue 案例利用SpringAMQP演示FanoutExchange的使用 第一步在consumer服务中声明队列(Queue)、交换机(Exchange)并将两者绑定(Binding) ①SpringAMQP提供了声明交换机、队列、绑定关系的API例如 ②在consumer服务创建一个类添加Configuration注解并声明FanoutExchange、Queue和绑定关系对象Binding代码如下 package cn.itcast.mq.config;import com.rabbitmq.client.impl.AMQImpl; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;Configuration public class FanoutConfig {// 声明交换机fanout.exchangeBeanpublic FanoutExchange fanoutExchange(){return new FanoutExchange(fanout.exchange);}// 声明队列1 fanout.queue1Beanpublic Queue queue1(){return new Queue(fanout.queue1);}Bean// 声明队列2 fanout.queue2public Queue queue2(){return new Queue(fanout.queue2);}// 进行绑定Beanpublic Binding bindingQueue1(FanoutExchange fanoutExchange,Queue queue1){return BindingBuilder.bind(queue1).to(fanoutExchange);}Beanpublic Binding bindingQueue2(FanoutExchange fanoutExchange,Queue queue2){return BindingBuilder.bind(queue2).to(fanoutExchange);}}成功声明交换机 成功声明队列 绑定成功 第二步在consumer服务中编写两个消费者方法分别监听fanout.queue1和fanout.queue2 package cn.itcast.mq.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;import java.time.LocalDateTime; import java.time.LocalTime;Component // 纳入Spring管理 public class SpringRabbitListener {// 声明监听那个队列RabbitListener(queues fanout.queue1)// 行为封装成方法public void listenFanoutQueueMessage1(String msg){ // Spring会把消息传递过来给msg参数System.out.println(消费者1接收到的消息是msg);}RabbitListener(queues fanout.queue2)// 行为封装成方法public void listenFanoutQueueMessage2(String msg){ // Spring会把消息传递过来给msg参数System.out.println(消费者2接收到的消息是msg);}}第三步在publisher中编写测试方法向交换机itcast.fanout发送消息 注以前是发送到queue现在是发送到exchange注意区别 package cn.itcast.mq.spring;import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;RunWith(SpringRunner.class) SpringBootTest public class SpringAmqpTest {// 注入RabbitTemplateAutowiredprivate RabbitTemplate rabbitTemplate;// 向交换机发送消息Testpublic void testSendFanoutExchange(){// 交换机String exchangeName fanout.exchange;// 信息String message Hello eyeryone;// 发送消息rabbitTemplate.convertAndSend(exchangeName,,message); // 中间参数routingKey后面会讲这里先设置为空} }执行结果 4. 发布订阅模型-Direct 发布 发布订阅-DirectExchange Direct Exchange 会将接收到的消息根据规则路由到指定的Queue因此称为路由模式。 ①每一个Queue都与Exchange设置一个BindingKey相当于暗号密码 ②发布者发送消息到Exchange时也要指定一个消息的RoutingKey与上面的BindingKey对上就发给谁 ③Exchange将消息路由到BindingKey与消息RoutingKey一致的队列并且一个队列能绑定多个key如果两个队列的BindingKey都能与RountingKey对上就都会发送就相当于广播 声明单个key 声明多个key  案例利用SpringAMQP演示DirectExchange的使用 第一步在consumer服务中编写两个消费者方法分别监听direct.queue1和direct.queue2 注前面使用Bean方式声明一个个类发现太麻烦了所以这里就学习一下使用利用RabbitListener注解声明Exchange、Queue、RoutingKey。 package cn.itcast.mq.listener;import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;import java.time.LocalDateTime; import java.time.LocalTime;Component // 纳入Spring管理 public class SpringRabbitListener {// DirectExchange使用注解的形式绑定RabbitListener(bindings QueueBinding(value Queue(direct.queue1),exchange Exchange(name itcast.direct,type ExchangeTypes.DIRECT),key {red,blue}))public void LitenDirectQueue1(String msg){System.out.println(消费者接收到direct.queue1的消息:[msg]);}RabbitListener(bindings QueueBinding(value Queue(name direct.queue2),exchange Exchange(name itcast.direct,type ExchangeTypes.DIRECT),key {red,yellow}))public void ListenDirectQueue2(String msg){System.out.println(消费者接收到direct.queue2的消息:[msg]);}}声明后启动查看控制页面成功绑定 第二步在publisher中编写测试方法向itcast. direct发送消息 package cn.itcast.mq.spring;import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;RunWith(SpringRunner.class) SpringBootTest public class SpringAmqpTest {// 注入RabbitTemplateAutowiredprivate RabbitTemplate rabbitTemplate;Testpublic void testSendDirectExchange(){String exchangeName itcast.direct;String message hello blue;rabbitTemplate.convertAndSend(exchangeName,blue,message);} }此时RoutingKey为blue只有direct.quque1能接收到 如果此时RoutingKey为red Test public void testSendDirectExchange(){String exchangeName itcast.direct;String message hello red;rabbitTemplate.convertAndSend(exchangeName,red,message); } 则direct.quque1和direct.queue2都能接收到 总结所以相对于Fanout ExchangeDirect Exchange更加的灵活可以通过key这个标记把消息传递给某一个或者所有Fanout Exchange可以看做是Direct Exchange的一种特殊存在。 5. 发布订阅模型-Topic 发布 发布订阅-TopicExchange TopicExchange与DirectExchange类似区别在于routingKey必须是多个单词的列表并且以点 “ .” 分割。 例如china.news 代表中国的新闻消息 japan.news 则代表日本新闻。 Queue与Exchange指定BindingKey时可以使用通配符 ①#代指0个或多个单词 ②*代指一个单词 案例利用SpringAMQP演示TopicExchange的使用 第一步利用RabbitListener声明Exchange、Queue、RoutingKey 在consumer服务中编写两个消费者方法分别监听topic.queue1和topic.queue2 // topic ExchangeRabbitListener(bindings QueueBinding(value Queue(topic.queue1),exchange Exchange(name itcast.topic,type ExchangeTypes.TOPIC),key china.#))public void ListenTopicQueue1(String msg){System.out.println(消费者接收到topic.queue1的消息:[msg]);}RabbitListener(bindings QueueBinding(value Queue(topic.queue2),exchange Exchange(name itcast.topic,type ExchangeTypes.TOPIC),key #.news))public void ListenTopicQueue2(String msg){System.out.println(消费者接收到topic.queue2的消息:[msg]);} 成功声明与绑定  第二步在publisher中编写测试方法向itcast. topic发送消息 Testpublic void testSendTopicExchange(){String exchangeName itcast.topic;String message Its a nice day ;rabbitTemplate.convertAndSend(exchangeName,china.weath,message);} 此时是topic.queue1接收到消息 总结Topic Exchange和Direct Exchange的本质相同Topic Exchange可以指定通配符的方式来表达BindingKey相对于Direct Exchange灵活度又变高了。 6. 消息转换器 说明在SpringAMQP的发送方法中接收消息的类型实际上是Object也就是说我们可以发送任意对象类型的消息SpringAMQP会帮我们序列化为字节后发送。 案例测试发送Object类型消息 在consumer中利用Bean声明一个队列 package cn.itcast.mq.config;import com.rabbitmq.client.impl.AMQImpl; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;Configuration public class FanoutConfig {// 声明一个队列Beanpublic Queue objectQueue(){return new Queue(object.queue);} }发送一个Map集合到object.queue队列 package cn.itcast.mq.spring;import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;import java.util.HashMap; import java.util.Map;RunWith(SpringRunner.class) SpringBootTest public class SpringAmqpTest {// 注入RabbitTemplateAutowiredprivate RabbitTemplate rabbitTemplate;Testpublic void testSendObjectQueue(){// 准备一个Map集合MapString,Object msg new HashMap();msg.put(name, Jack);msg.put(age, 21);// 发送rabbitTemplate.convertAndSend(object.queue,msg);} }执行结果最终的结果是通过JDK序列化转换成字节发送的 注JDK序列化性能比较差、安全性比较差容易出现注入的情况、数据长度太长了占用额外的内存空间。 消息转换器 ①Spring的对消息对象的处理是由org.springframework.amqp.support.converter. MessageConverter来处理的。而默认实现是SimpleMessageConverter基于JDK的ObjectOutputStream完成序列化 ②如果要修改只需要定义一个MessageConverter 类型的Bean即可推荐用JSON方式序列化步骤如下 第一步在父工程中引入jackson依赖 !--jackson依赖-- dependencygroupIdcom.fasterxml.jackson.core/groupIdartifactIdjackson-databind/artifactId /dependency 第二步在publisher启动类声明MessageConverter覆盖掉原来的配置 package cn.itcast.mq;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean;SpringBootApplication public class PublisherApplication {public static void main(String[] args) {SpringApplication.run(PublisherApplication.class);}// 覆盖原理的序列化方式Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();} }执行结果成功转换成Json格式 第三步在consumer服务中MessageConverter并监听object.queue队列并消费消息 启动类声明MessageConverter覆盖掉原来的配置 package cn.itcast.mq;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean;SpringBootApplication public class ConsumerApplication {public static void main(String[] args) {SpringApplication.run(ConsumerApplication.class, args);}// 覆盖原理的序列化方式Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();} }消费object.queue队列的消息 RabbitListener(queues object.queue)public void ListenOjectQueue(MapString,Object msg){System.out.println(消费者接收到object.queue的消息:[msg]);} 成功消费
http://www.pierceye.com/news/211625/

相关文章:

  • 国外免费外贸网站dw网页制作教程个人网站
  • 西安建设局网站地址室内设计效果图一套方案
  • php 建网站电子商务网站建设项目规划书
  • 常熟建设局网站代理办营业执照的公司
  • 济南网站关键词优化公司如何制作网站赚钱
  • 长春旅游网站开发360投放广告怎么收费
  • 微信公众号做网站卖东西静态化网站的缺点
  • 网站空间购买今天的新闻头条最新消息
  • 网站制作教程图解怎么解压wordpress
  • 唐山市城市建设规划局网站腾讯云建设一个网站要多少钱
  • 邢台集团网站建设费用聚牛建设网站
  • 如何创建电子商务网站学校网站设计首页
  • 扬州建设投资集团网站世界总人口实时数据
  • 沧州制作网站食品商务网-网站建设
  • 0592 网站建设模板网站建设+百度
  • 请人做个网站多少钱免费商城app
  • 网站建设包括哪些方面?手游源码网站
  • 机关门户网站建设管理情况软件开发工具都有哪些
  • 官方网站建设专家磐石网络wordpress对应的id
  • 学生自做网站优秀作品徐州企业建站模板
  • 网络电子商务购物网站idc机房建设
  • 网站单页seo个人服务器网站备案
  • 装修队伍做网站做机票在线预订网站
  • 手机版企业网站php山西建设执业注册中心网站
  • 南充网站建设略奥科技凡科建站电话
  • 个人网站可以做自媒体吗手机网站建设需要多少钱
  • 网站 模板网站什么英文字体
  • 北京市朝阳区住房建设网站图片在线编辑网站
  • 柳州市诚信体系建设网站网站数据库网络错误
  • 微站网站vps lnmp wordpress