坑梓网站建设方案,网络编程技术及应用,做网站 广告费 步骤,郑州建设银行官网站MQ简介
MQ#xff1a;MessageQueue#xff0c;消息队列。是在互联网中使用非常广泛的一系列服务中间件。
Message#xff1a;消息。消息是在不同进程之间传递的数据。这些进程可以部署在同一台机器上#xff0c;也可以分布在不同机器上。#xff08;数据形式#xff1a…MQ简介
MQMessageQueue消息队列。是在互联网中使用非常广泛的一系列服务中间件。
Message消息。消息是在不同进程之间传递的数据。这些进程可以部署在同一台机器上也可以分布在不同机器上。数据形式二进制压缩数据、RPC、http都属于进程间通讯的机制
Queue队列。队列原意是指一种具有FIFO(先进先出)特性的数据结构是用来缓存数据的。对于消息中间件产品来说能不能保证FIFO特性尚值得考量。但是所有消息队列都是需要具备存储消息让消息排队的能力。
作用 异步提高系统的响应速度、吞吐量。 解耦减少服务之间的影响。提高系统整体的稳定性以及可扩展性。另外解耦后可以实现数据分发。生产者发送一个消息后可以由一个或者多个消费者进行消费并且消费者的增加或者减少对生产者没有影响。 消峰以稳定的系统资源应对突发的流量冲击。 RocketMQ产品特点
RocketMQ介绍
RocketMQ是阿里巴巴开源的一个消息中间件在阿里内部历经了双十一等很多高并发场景的考验能够处理亿万级别的消息。2016年开源后捐赠给Apache现在是Apache的一个顶级项目。
早期阿里使用ActiveMQ但是当消息开始逐渐增多后ActiveMQ的IO性能很快达到了瓶颈。于是阿里开始关注Kafka。但是Kafka是针对日志收集场景设计的他的高级功能并不是很贴合阿里的业务场景。尤其当他的Topic过多时由于Partition文件也会过多这就会加大文件索引的耗时会严重影响IO性能。于是阿里才决定自研中间件最早叫做MetaQ后来改名成为RocketMQ。最早他所希望解决的最大问题就是多Topic下的IO性能压力。但是产品在阿里内部的不断改进RocketMQ开始体现出一些不一样的优势。 RocketMQ特点
当今互联网MQ产品众多其中影响力和使用范围最大的当数Apache Kafka、RabbitMQ、Apache RocketMQ以及Apache Plusar。这几大产品虽然都是典型的MQ产品但是由于设计和实现上的一些差异造成他们适合于不同的细分场景。
优点缺点适合场景Apache Kafka吞吐量非常大性能非常好集群高可用。会有丢数据的可能功能比较单一日志分析、大数据采集RabbitMQ消息可靠性高功能全面。erlang语言不好定制。吞吐量比较低。企业内部小规模服务调用Apache Pulsar基于Bookeeper构建消息可靠性非常高。周边生态还有差距目前使用的公司比较少。企业内部大规模服务调用Apache RocketMQ高吞吐、高性能、高可用。功能全面。客户端协议丰富。使用java语言开发方便定制。服务加载比较慢。几乎全场景特别适合金融场景
其中RocketMQ孵化自阿里巴巴。历经阿里多年双十一的严格考验RocketMQ可以说是从全世界最严苛的高并发场景中摸爬滚打出来的过硬产品也是少数几个在金融场景比较适用的MQ产品。从横向对比来看RocketMQ与Kafka和RabbitMQ相比。RocketMQ的消息吞吐量虽然和Kafka相比还是稍有差距但是却比RabbitMQ高很多。在阿里内部RocketMQ集群每天处理的请求数超过5万亿次支持的核心应用超过3000个。而RocketMQ最大的优势就是他天生就为金融互联网而生。他的消息可靠性相比Kafka也有了很大的提升而消息吞吐量相比RabbitMQ也有很大的提升。另外RocketMQ的高级功能也越来越全面广播消费、延迟队列、死信队列等等高级功能一应俱全甚至某些业务功能比如事务消息已经呈现出领先潮流的趋势。 RocketMQ快速实战
快速搭建RocketMQ服务
RocketMQ的官网地址 RocketMQ · 官方网站 | RocketMQ
下载页面地址下载 | RocketMQ
当前最新的版本是5.x这是一个着眼于云原生的新版本给 RocketMQ 带来了非常多很亮眼的新特性。但是目前来看企业中用得还比较少。因此我们这里采用的还是更为稳定的4.9.5版本。 注在2020年下半年RocketMQ新推出了5.0的大版本这对于RocketMQ来说是一个里程碑式的大版本。在这个大版本中RocketMQ对整体功能做了一次大的升级。增加了很多非常有用的新特性也对已有功能重新做了升级。 比如在具体功能方面在4.x版本中对于定时消息只能设定几个固定的延迟级别而5.0版本中已经可以指定具体的发送时间了。在客户端语言方面4.x版本RocketMQ原生只支持基于Netty框架的Java客户端。而在5.0版本中增加了对Grpc协议的支持这基本上就解除了对客户端语言的限制。在服务端架构方面4.x版本只支持固定角色的普通集群和可以动态切换角色的Dledger集群而在5.0版本中增加了Dledger Controller混合集群模式即可以混合使用Dledger的集群机制以及 Broker 本地的文件管理机制。 但是功能强大同时也意味着问题会很多。所以目前来看企业中直接用新版本的还比较少。小部分使用新版本的企业也大都是使用内部的改造优化版本。 这里下载的是这个版本 上传到服务器并解压unzip rocketmq-all-4.9.5-bin-release.zip RocketMQ建议的运行环境需要至少12G的内存这是生产环境比较理想的资源配置。但是我买的云服务器是2核4g所以需要修改启动配置:set number临时显示行号 注意生产环境不建议修改上面两个配置。
RocketMQ是基于Java开发的所以依赖Java开发环境安装JDK步骤省略建议采用1.8版本
RocketMQ的后端服务分为nameserver和broker两个服务
# 第一步启动nameserver服务进入安装目录执行命令
nohup bin/mqnamesrv
# 是否启动成功可以通过jps检查启动成功或失败可以查看nohup.out文件# 为了方便测试在conf/broker.conf文件添加配置
autoCreateTopicEnabletrue
# 注意如果是云服务器还需要额外添加一行配置
brokerIP1 你的公网IP# 第二步启动broker服务进入安装目录执行命令
nohup bin/mqbroker
注意
1、在实际服务部署时通常会将RocketMQ的部署地址添加到环境变量当中。例如使用vi ~/.bash_profile指令添加以下内容
export ROCKETMQ_HOME/home/rocket/rocketmq-all-4.9.5-bin-release // 修改为你的安装目录
PATH$ROCKETMQ_HOME/bin:$PATH
export PATH
2、停止RocketMQ服务可以通过mqshutdown指令进行停止服务有短暂延迟不建议kill杀进程
mqshutdown namesrv # 关闭nameserver服务
mqshutdown broker # 关闭broker服务 快速实现消息收发
1、命令行快速实现消息收发
第一步需要配置一个环境变量NAMESRV_ADDR指向之前启动的nameserver服务。
通过vi ~/.bash_profile添加以下配置。然后使用source ~/.bash_profile让配置生效。
export NAMESRV_ADDRlocalhost:9876
修改后文件 第二步通过指令启动RocketMQ的消息生产者发送消息。默认往RocketMQ中发送1000条消息
tools.sh org.apache.rocketmq.example.quickstart.Producer ...消息发送日志
SendResult [sendStatusSEND_OK, msgId7F0000018FBA1B6D358697CBE7FB03E7, offsetMsgIdC0A800DA00002A9F000000000005DA64, messageQueueMessageQueue [topicTopicTest, brokerNamehcss-ecs-3744, queueId3], queueOffset499]
11:25:22.820 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
11:25:22.825 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
11:25:22.825 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.0.218:10911] result: true
第三步可以启动消息消费者接收之前发送的消息
tools.sh org.apache.rocketmq.example.quickstart.Consumer...消息消费日志
ConsumeMessageThread_please_rename_unique_group_name_4_15 Receive New Messages: [MessageExt [brokerNamehcss-ecs-3744, queueId2, storeSize192, queueOffset199, sysFlag0, bornTimestamp1701312827986, bornHost/192.168.0.218:32850, storeTimestamp1701312827987, storeHost/192.168.0.218:10911, msgIdC0A800DA00002A9F00000000000256D2, commitLogOffset153298, bodyCRC748130833, reconsumeTimes0, preparedTransactionOffset0, toString()Message{topicTopicTest, flag0, properties{MIN_OFFSET0, MAX_OFFSET250, CONSUME_START_TIME1701314617997, UNIQ_KEY7F0000018E561B6D358697AEFE52031F, CLUSTERDefaultCluster, TAGSTagA}, body[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 55, 57, 57], transactionIdnull}]]
注意这个Consumer消费者的指令并不会主动结束他会继续挂起等待消费新的消息。可以使用CTRLC停止该进程。 2、搭建Maven客户端项目
第一步创建一个标准的maven项目在pom.xml中引入以下核心依赖
dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-client/artifactIdversion4.9.5/version
/dependency
第二步就可以直接创建一个简单的消息生产者
public class Producer
{public static void main(String[] args)throws MQClientException, InterruptedException{// 初始化一个消息生产者DefaultMQProducer producer new DefaultMQProducer(please_rename_unique_group_name);// 指定nameserver地址producer.setNamesrvAddr(192.168.232.128:9876);// 启动消息生产者服务producer.start();for (int i 0; i 2; i){try{// 创建消息。消息由Topic,Tag和body三个属性组成其中Body就是消息内容Message msg new Message(TopicTest, TagA, (Hello RocketMQ i).getBytes(RemotingHelper.DEFAULT_CHARSET));// 发送消息获取发送结果SendResult sendResult producer.send(msg);System.out.printf(%s%n, sendResult);}catch (Exception e){e.printStackTrace();Thread.sleep(1000);}}// 消息发送完后停止消息生产者服务。producer.shutdown();}
}
注意对于生产者需要指定对应的nameserver服务的地址这个地址需要指向你自己的服务器。
第三步创建一个消息消费者接收RocketMQ中的消息。
public class Consumer
{public static void main(String[] args)throws InterruptedException, MQClientException{// 构建一个消息消费者DefaultMQPushConsumer consumer new DefaultMQPushConsumer(please_rename_unique_group_name_4);// 指定nameserver地址consumer.setNamesrvAddr(192.168.232.128:9876);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);// 订阅一个感兴趣的话题这个话题需要与消息的topic一致consumer.subscribe(TopicTest, *);// 注册一个消息回调函数消费到消息后就会触发回调。consumer.registerMessageListener(new MessageListenerConcurrently(){Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context){msgs.forEach(messageExt - {try{System.out.println(收到消息: new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET));}catch (UnsupportedEncodingException e){}});return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者服务consumer.start();System.out.print(Consumer Started);}
}
注意对于消费者同样需要指定nameserver的地址另外消费者需要在RocketMQ中订阅具体的Topic只有发送到这个Topic上的消息才会被这个消费者接收到 生产消费报错RemotingTooMuchRequestException: sendDefaultImpl call timeout
解决方法
1、在conf/broker.conf 中加入配置
namesrvAddr 你的公网IP:9876
brokerIP1 你的公网IP
2、重启broker启动命令指定配置文件
nohup mqbroker -n localhost:9876 -c conf/broker.conf
重启完成上面的生产者消费者测试代码通过