辽阳企业网站建设,广州市网站建设报价,登别的网站应怎么做,昆明网站排名优化公司哪家好RabbitMQ消息队列快速入门
初始MQ
MQ全称为Message Queue#xff0c;即消息队列#xff0c;是在消息的传输过程中保存消息的容器。它是典型的生产者-消费者模型。 生产者不断向消息队列中生产消息#xff0c;消费者不断的从队列中获取消息。消息的生产和消费都是异步的即消息队列是在消息的传输过程中保存消息的容器。它是典型的生产者-消费者模型。 生产者不断向消息队列中生产消息消费者不断的从队列中获取消息。消息的生产和消费都是异步的可以解耦发送者和接收者之间的通信提高系统的可扩展性和可靠性。
技术选型
目比较常见的MQ实现有
ActiveMQRabbitMQRocketMQKafka
RabbitMQActiveMQRocketMQKafka公司/社区RabbitApache阿里Apache开发语言ErlangJavaJavaScalaJava协议支持AMQPXMPPSMTPSTOMPOpenWire,STOMPREST,XMPP,AMQP自定义协议自定义协议可用性高一般高高单机吞吐量一般差高非常高消息延迟微秒级毫秒级毫秒级毫秒以内消息可靠性高一般高一般
RabbitMQ
RabbitMQ是基于Erlang语言开发的开源消息通信中间件官网地址 Messaging that just works — RabbitMQ
安装
基于Docker来安装RabbitMQ docker pull rabbitmq 运行
docker run \-e RABBITMQ_DEFAULT_USERdaybreak \-e RABBITMQ_DEFAULT_PASS123456 \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq在安装命令中有两个映射的端口
15672RabbitMQ提供的管理控制台的端口5672RabbitMQ的消息发送处理接口
运行成功后访问http://ip:15672输入username和password即可进入管理控制台。
RabbitMQ架构 publisher生产者也就是发送消息的一方consumer消费者也就是消费消息的一方queue队列存储消息。生产者投递的消息会暂存在消息队列中等待消费者处理exchange交换机负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。virtual host虚拟主机起到数据隔离的作用。每个虚拟主机相互独立有各自的exchange、queue
Spring AMQP
由于RabbitMQ采用了AMQP协议因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息都可以与RabbitMQ交互。并且RabbitMQ官方也提供了各种不同语言的客户端。 但是RabbitMQ官方提供的Java客户端编码相对复杂一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具Spring AMQP。
SpringAmqp的官方地址Spring AMQP
SpringAMQP提供了三个功能
自动声明队列、交换机及其绑定关系基于注解的监听器模式异步接收消息封装了RabbitTemplate工具用于发送消息
交换机 Exchange交换机只负责转发消息不具备存储消息的能力因此如果没有任何队列与Exchange绑定或者没有符合路由规则的队列那么消息会丢失
交换机的类型有四种
Fanout广播将消息交给所有绑定到交换机的队列。Direct订阅基于RoutingKey路由key发送给订阅了消息的队列。Topic通配符订阅与Direct类似只不过RoutingKey可以使用通配符。Headers头匹配基于MQ的消息头匹配用的较少。
声明队列和交换机
基于Bean方式声明
package com.itheima.consumer.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class FanoutConfiguration {/*** 声明交换机* return*/Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange(daybreak.fanout);}/*** 声明队列* return*/Beanpublic Queue fanoutQueue(){return new Queue(fanout.queue);}/*** 绑定队列和交换机* param fanoutQueue3* param fanoutExchange* return*/Beanpublic Binding FanoutBinding3(Queue fanoutQueue, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);}
}基于注解声明
声明Direct模式的交换机和队列
RabbitListener(bindings QueueBinding(value Queue(name direct.queue, durable true),exchange Exchange(name daybreak.direct, type ExchangeTypes.DIRECT),key {red, yellow}
))
public void listenDirectQueue(String msg){System.out.println(消费者收到了direct.queue的消息 msg);
}声明Topic模式的交换机和队列
RabbitListener(bindings QueueBinding(value Queue(name topic.queue),exchange Exchange(name daybreak.topic, type ExchangeTypes.TOPIC),key #.news
))
public void listenTopicQueue(String msg){System.out.println(消费者接收到topic.queue的消息【 msg 】);
}快速入门
导入依赖
!--AMQP依赖包含RabbitMQ--
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId
/dependency添加配置
在application.yml中添加配置
spring:rabbitmq:host: 192.168.200.130 # 你的虚拟机IPport: 5672 # 端口virtual-host: / # 虚拟主机username: daybreak # 用户名password: 123456 # 密码配置JSON转换器
Spring的消息发送代码接收的消息体是一个Object 在数据传输时它会把发送的消息序列化为字节发送给MQ接收消息的时候还会把字节反序列化为Java对象。 然而默认情况下Spring采用的序列化方式是JDK序列化。
JDK序列化存在下列问题
数据体积过大有安全漏洞可读性差
显然JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高因此可以使用JSON方式来做序列化和反序列化。
使用JSON方式序列化需要引入以下依赖
dependencygroupIdcom.fasterxml.jackson.dataformat/groupIdartifactIdjackson-dataformat-xml/artifactIdversion2.9.10/version
/dependency注意如果项目中引入了spring-boot-starter-web依赖则无需再次引入Jackson依赖。
配置消息转换器在服务的启动类中添加一个Bean即可
Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter new Jackson2JsonMessageConverter();// 2.配置自动创建消息id用于识别不同消息也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;
}接收端
package com.itheima.consumer.listeners;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;Component
public class MyListener {RabbitListener(bindings QueueBinding(value Queue(name daybreak.queue, durable true),exchange Exchange(name daybreak.direct, type ExchangeTypes.DIRECT),key demo))public void listenDirectQueue(String msg){System.out.println(消费者收到了direct.queue的消息 msg);}
}
发送端
package com.itheima.publisher;import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;SpringBootTest
public class MyPublisher {Autowiredprivate RabbitTemplate rabbitTemplate;Testpublic void myTest(){String exchangeName daybreak.direct;String routingKey demo;String msg Hello,RabbitMQ;rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);}
}启动发送端和接收端后运行结果如下 消费者收到了direct.queue的消息Hello,RabbitMQ