高端建站是什么,网站开发程序介绍,邢台做网站建设优化制作公司金信一条龙,wordpress 做表格概览#xff1a; MQ基本概念 RabbitMQ入门 基本工作模 1.MQ是什么#xff1f;
MQ:Message Queue, 存储消息的中间件#xff0c;是消息发送过程中的暂存容器#xff0c;主要用于解决分布式系统进程间的通信。
分布式系统通信的两种方式#xff1a;直接远程调用、借助第三… 概览 MQ基本概念 RabbitMQ入门 基本工作模 1.MQ是什么
MQ:Message Queue, 存储消息的中间件是消息发送过程中的暂存容器主要用于解决分布式系统进程间的通信。
分布式系统通信的两种方式直接远程调用、借助第三方间接通信
为什么要使用消息中间件 如有一个电商交易的场景用户下单之后调用库存系统减库存然后调用物流系统进行发货如果刚开始交易库存物流都是属于一个系统那么他们之间就是接口调用。但是随着系统的发展各个模块业务越来越庞大、业务逻辑越来越复杂这个时候就必然要做服务化和业务拆分。这个时候就需要考虑这些系统之间是如何交互的。首先想到的就是RPCRemote Procedure Call但是随着系统的发展可能一笔交易后序需要调用几十个接口位于不同系统的接口比如短信服务、邮件服务等等这个时候就需要消息中间件来解决问题了。消息中间件最突出的特点就是提供数据传输的可靠性和高效性主要解决分布式的系统数据传输需求 摘自为什么使用消息中间件_为什么用消息中间件-CSDN博客 使用MQ的优势
1应用解耦提高系统容错性和可维护性 整个系统耦合会导致系统容错性低、可扩展性低、可维护性低。解耦之后一个系统挂了其它系统不会有问题。容错性高、可扩展性高、可维护性高。
2异步提速提升用户体验和系统吞吐量. 如果一个订单系统同步执行则有 订单进数据库20ms调用子系统一300ms调用子系统二300ms调用子系统三300ms920ms,用户等待920ms 如果采用异步模式则有 订单进数据库20ms消息发到MQ5ms25ms用户等待25ms
3削峰填谷提高系统稳定性 请求瞬间增多每秒5000个但是子系统每秒只能处理1000请求。 削峰指的是提高子系统的稳定性。加入中间件后所有请求进入消息队列。帮子系统处理高并发的请求量。 填谷指的是大量的消息积压在MQ里。子系统每秒从MQ拉取1000个进行处理直到处理完所有积压的消息。
使用MQ的劣势
(1)系统可用性降低 引入的第三方插件|依赖越多系统稳定性越差。 如果MQ宕机业务功能就会收到影响。需要额外的工作来确保MQ高可用性。
(2)系统复杂性提高 没有MQ时系统间同步远程调用 引入MQ时通过MQ异步调用 引发的问题如何保证消息不被重复消费如何处理消息丢失如何保证消息传递的顺序性。
(3)一致性问题 消息A——MQ——B|C|D 如果BC处理成功D失败了如何保证数据一致性。
使用MQ应满足什么条件
生产者不需要消费者的反馈。消费者消费消息后返回值为空这才能使异步调用成为可能容许短暂的不一致用了有效果解耦、提速、削峰等超过引入MQ的管理成本
常见的MQ产品 2.RabbitMQ概述
RabbitMQ简介
2007年Rabbit技术公司 基于AMQP标准开发 Rabbit MQ1.0
使用Erlang语言一种专门为高并发、分布式系统开发的语言|电信领域使用广泛 AMQP协议
Advanced Message Queuing Protocol高级消息队列协议网络协议应用层协议的一个开放标准为面向消息的中间件设计。2006年AMQP规范发布类比HTTP基于此协议的客户端与消息中间件之间传递消息不受客户端/中间件产品、开发语言等限制。
RabbitMQ的组成
Broker中间件用于接收发信息的用用如RabbitMQ ServerVirtual host 虚拟机处于多租户和安全因素设计的把AMQP基本组件划分到一个虚拟的分组中类似于网络中的namespace 。 应用场景用户隔离 多个不同用户使用同一个RabbitMQ时可以划分出多个vhost每个用户在自己的vhost创建exchange/queue等。 Connection链接publisher/consumer和broker之间建立TCP链接。Channel connection内部的逻辑链接。作为一个轻量级Connection,极大减少了操作系统建立TCP链接的开销。 存在原因 如果每次访问MQ都建立链接消息量大的时候建立TCP链接开销非常大效率也低。 所以使用channel内部逻辑链接如果应用支持多线程通常每个thread独占一个channel通信。 AMQP method中 包含channel id帮助客户端和message broker识别channel 所以channel之间是完全隔离的。 Exchange 交换机message到达broker的第一站根据分发规则匹配查询表中的routing key, 分发消息到queue。 常见的类型 direct: point to point topic: publisher-subscribe fanout: multicast Queue消息队列消息最终被送到这里等待被消费。Binding 绑定exchange 和 queue之间的虚拟连接binding中包含routing key. Binding信息被保存在exchange的查询表中是message分发的依据。
RabbitMQ的工作模式 6种包含简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing 路由模式、Topics主题模式、RPC远程调用模式。 RabbitMQ官网RabbitMQ Tutorials — RabbitMQ
补充 JMS:java消息服务Java Message Service应用程序接口是java平台关于面向消息的中间件API JMS是Java EE 规范的一种类比JDBC. 很多消息中间件都实现了JMS规范如ActiveMQ。RabbitMQ没有提供JMS的实现但开源社区有。
3.RabbitMQ的简单实现
RabbitMQ的安装
1.安装RabbitMQ软件
windows环境下安装RabbitMQ超详细_windows安装rabbitmq-CSDN博客
java依赖 dependencygroupIdcom.rabbitmq/groupIdartifactIdamqp-client/artifactIdversion5.20.0/version
/dependency 2.开启管理界面及配置
默认端口号5672图形化界面地址 http://127.0.0.1:15672登陆名 guest 密码guest 配置文件 3.启动服务及基础配置 RabbitMQ简单模式
RabbitMQ生产者
package org.example.producer;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class SimpleProducer {public void publishMessage() throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory factorynew ConnectionFactory();//2.设置参数factory.setHost(127.0.0.1);//ip 默认值localhostfactory.setPort(5672);//端口 默认值5672factory.setVirtualHost(/itcast);//虚拟机 默认值/factory.setUsername(heima);factory.setPassword(heima);//3.创建链接 ConnectionConnection connectionfactory.newConnection();//4.创建ChannelChannel channelconnection.createChannel();//5.创建队列Queue/*queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, MapString, Object arguments)queue: 队列名称durable: 是否持久化是的话mq重启后数据还在。exclusive:是否独占只能有一个消费者监听队列当connection关闭时是否删除队列autoDelete: 是否自动删除当没有consumer时自动删除。arguments: 参数*/channel.queueDeclare(hello_world,true,false,false,null);//6.发送消息/*basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body) texchange:交换机名称简单模式下交换机会使用默认的“”routingKey: 路由名称props: 配置消息body: 发送消息数据*/String bodyhello rabbitMQ!;channel.basicPublish(,hello_world,null,body.getBytes());//7.释放资源channel.close();connection.close();}
}生产者生产一个消息 RabbitMQ消费者
package org.example.consumer;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class SimpleConsumer {public static void main(String[] args) throws IOException, TimeoutException {SimpleConsumer producernew SimpleConsumer();producer.consumerMessage();}public void consumerMessage() throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory factorynew ConnectionFactory();//2.设置参数factory.setHost(127.0.0.1);//ip 默认值localhostfactory.setPort(5672);//端口 默认值5672factory.setVirtualHost(/);//虚拟机 默认值/factory.setUsername(guest);factory.setPassword(guest);//3.创建链接 ConnectionConnection connectionfactory.newConnection();//4.创建ChannelChannel channelconnection.createChannel();//5.创建队列Queue/*queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, MapString, Object arguments)queue: 队列名称durable: 是否持久化是的话mq重启后数据还在。exclusive:是否独占只能有一个消费者监听队列当connection关闭时是否删除队列autoDelete: 是否自动删除当没有consumer时自动删除。arguments: 参数*/channel.queueDeclare(hello_world,true,false,false,null);//6.接收消息/*basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body) texchange:交换机名称简单模式下交换机会使用默认的“”routingKey: 路由名称props: 配置消息body: 发送消息数据*/Consumer consumer new DefaultConsumer(channel){/*此处重写该方法是为了打印回调结果回调方法,收到回调方法后,会自动执行该方法consumerTag: 标识envelope: 获取一些信息交换机路由keyproperties: 配置信息body: 数据*/Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(consumerTag:consumerTag);System.out.println(Exchange:envelope.getExchange());System.out.println(RoutingKey:envelope.getRoutingKey());System.out.println(properties:properties);System.out.println(body:new String(body));}};channel.basicConsume(hello_world,true,consumer);//7.不需要释放资源因为要监听队列}
}总结 P生产者也就是要发送消息的程序。
C: 消费者消息的接收者监听队列等待消息到来。
Queue: 消息队列类似邮箱可以缓存消息生产者向其中投递消息消费者从中取出消息。 4.5种基本工作模式
(1)简单模式 Hello World 一个生产者一个队列一个消费者不需要设置交换机使用默认交换机
(2)工作队列模式Work Queue 特点 一个生产者一个队列多个消费者竞争关系不需要设置交换机使用默认交换机同一个消息对消费者来说是竞争关系只有一个消费者能消费。
应用场景对于任务过重或任务较多的情况使用工作队列可以提高任务处理速度。
例如短信服务部署多个只需要有一个节点发送成功即可。
发送端发布多条消息
//创建新的队列channel.queueDeclare(work_queues,true,false,false,null);
//发多条消息String bodyhello rabbitMQ!;for(int i0;i10;i){channel.basicPublish(,hello_world,null,(i-----body).getBytes());}
接收端多个消费者竞争消费
创建两个消费者consumer1和consumer2先启动两个消费者再启动生产者生产消息观察到两个消费者的消费过程。
消费者需要修改的地方监听新的队列
channel.basicConsume(work_queues,true,consumer);
consumer1: consumer2: (3)发布Publish/subscribe 需要设置类型为fanout的交换机并且交换机和队列进行绑定当消息发送到交换机后交换机将消息发送到绑定队列。 订阅模式多了一个交换机概念Exchange,且过程略有变化
P: 消息生产者消息发送给交换机
C消息的接收者监听消息队列等待消息到来
Queue: 消息队列接收消息缓存消息。
Exchange: 交换机只负责转发消息不具备存储消息的能力因此如果没有任何队列与Exchange绑定或者没有符合路由规则的队列那么消息会丢失。
发送端引入交换机并对交换机和队列进行绑定
创建交换机、两个队列、绑定交换机和队列发送消息并释放资源
其中交换机类型BuiltinExchangeType枚举
关键代码
//4.创建ChannelChannel channelconnection.createChannel();//定义交换机/*exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, MapString, Object arguments) throws IOException {exchange: 交换机名称type: 交换机类型DIRECT(direct),:定向FANOUT(fanout),:扇形广播发送消息到每一个消费者TOPIC(topic),:通配符的方式HEADERS(headers);:参数匹配durable: 是否持久化autoDelete: 自动删除internal: 内部使用一般falsearguments: 参数*/String exchangeNametest_fanout;channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);//5.创建队列QueueString queueName1test_fanout_queue1;String queueName2test_fanout_queue2;channel.queueDeclare(queueName1,true,false,false,null);channel.queueDeclare(queueName2,true,false,false,null);//绑定交换机/*queueBind(String queue, String exchange, String routingKey, MapString, Object arguments)queue:队列名称exchange:交换机名称routingKey:路由键绑定规则如果交换机的类型为fanout,routingKey设置为“”arguments:参数*/channel.queueBind(queueName1,exchangeName,);channel.queueBind(queueName2,exchangeName,);//6.发送消息String body日志信息 张三调用了FindAll方法...日志级别info...;channel.basicPublish(exchangeName,,null,body.getBytes());//7.释放资源channel.close();connection.close(); 接收端创建两个消费者分别监听两个队列
consumer1监听queue1,consumer2监听queue2
//Consumer1:
channel.basicConsume(test_fanout_queue1,true,consumer);//Consumer2:
channel.basicConsume(test_fanout_queue2,true,consumer);启动生产者消息生产正常启动消费者1启动消费者2两个消费者都收到了消息。
consumer1: consumer2: (4)路由模式Routing 需要设置类型为direct的交换机交换机和队列进行绑定并且指定routing key, 当发送消息到交换机后交换机会根据routing key 将消息发送到对应的队列。 队列与交换机的绑定不能是任意绑定而是根据routing key绑定队列消息根据绑定来决定分发到哪个队列中。
发送端交换机绑定队列时指定路由模式
注意交换者的类型为BuiltinExchangeType.TOPIC
关键代码 //4.创建ChannelChannel channelconnection.createChannel();String exchangeNametest_topic;channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);//5.创建队列QueueString queueName1test_routing_queue1;String queueName2test_routing_queue2;channel.queueDeclare(queueName1,true,false,false,null);channel.queueDeclare(queueName2,true,false,false,null);//绑定交换机/*queueBind(String queue, String exchange, String routingKey, MapString, Object arguments)queue:队列名称exchange:交换机名称routingKey:路由键绑定规则如果交换机的类型为fanout,routingKey设置为“”arguments:参数*/channel.queueBind(queueName1,exchangeName,error);channel.queueBind(queueName2,exchangeName,info);channel.queueBind(queueName2,exchangeName,error);channel.queueBind(queueName2,exchangeName,warning);//6.发送消息String body日志信息 张三调用了FindAll方法...日志级别info...;channel.basicPublish(exchangeName,info,null,(info:\tbody).getBytes());channel.basicPublish(exchangeName,error,null,(error:\tbody).getBytes());channel.basicPublish(exchangeName,warning,null,(warning:\tbody).getBytes());
接收端创建Consumer1类Consumer2类。
consumer1 监听队列1 consumer2监听队列2
//consumer1
channel.basicConsume(test_routing_queue1,true,consumer);//consumer2
channel.basicConsume(test_routing_queue2,true,consumer);
启动生产者生产正常启动消费者1启动消费者2
consumer1: consumer2: (5)通配符模式Topic 需要设置类型为topic的交换机交换机和队列进行绑定并且指定通配符方式的routing key, 当发送消息到交换机后交换机会根据routing key将消息发送到对应的队列。
应用场景如像根据日志级别监听某个子系统 系统名.error 消息并入库
发送者
指定路由方式但是路由方式以通配符匹配的形式存在: #匹配一个 *匹配多个 channel.queueBind(queueName1,exchangeName,#.error);channel.queueBind(queueName2,exchangeName,order.*);channel.queueBind(queueName2,exchangeName,*.*);//6.发送消息String body日志信息 张三调用了FindAll方法...日志级别info...;channel.basicPublish(exchangeName,order.info,null,(order.info:\tbody).getBytes());channel.basicPublish(exchangeName,order.error,null,(order.error:\tbody).getBytes());channel.basicPublish(exchangeName,A.error,null,(A.error:\tbody).getBytes());
接收端创建Consumer1类Consumer2类。
consumer1监听队列1,consumer2监听队列2
//Consumer1
channel.basicConsume(test_topic_queue1,true,consumer);//Consumer2
channel.basicConsume(test_topic_queue2,true,consumer);
consumer1: consumer2: