长春鸿祥建设有限公司网站,php学多久可以做网站,设计专业的网址,微信朋友圈广告30元 1000次目录
一、RocketMQ
0、RocketMQ的产品发展
1、RocketMQ安装
1.1、windows下的安装
注意事项
1.2、Linux下的安装
1.3、源码的安装
1.4、控制台
2、消息发送方式
2.1、发送同步消息
2.2、发送异步消息
2.3、单向发送
3、消息消费方式
3.1、负载均衡模式#xff0…目录
一、RocketMQ
0、RocketMQ的产品发展
1、RocketMQ安装
1.1、windows下的安装
注意事项
1.2、Linux下的安装
1.3、源码的安装
1.4、控制台
2、消息发送方式
2.1、发送同步消息
2.2、发送异步消息
2.3、单向发送
3、消息消费方式
3.1、负载均衡模式集群消费
3.2、广播消费 一、RocketMQ
0、RocketMQ的产品发展 MetaQ2011年阿里基于Kafka的设计使用Java完全重写并推出了MetaQ 1.0版本 。 2012年阿里对MetaQ的存储进行了改进推出MetaQ 2.0同年阿里把Meta2.0从阿里内部开源出来取名RocketMQ为了命名上的规范以及版本上的延续对外称为RocketMQ3.0。 2016年阿里宣布将开源分布式消息中间件RocketMQ捐赠给Apache同时RocketMQ3也升级为RocketMQ4现在RocketMQ主要维护的是4.x的版本也是大家使用得最多的版本。 2021年RocketMQ在github上发布5.0预览版。RocketMQ 5.0定义为云原生的消息、事件、流的超融合平台。
RocketMQ源码链接
RocketMQ官网下载地址
1、RocketMQ安装
1.1、windows下的安装
1.确保安装好了JDK1.864位系统
2.解压运行版本(Binary)
3.配置环境变量
变量名ROCKETMQ_HOME 变量值MQ解压路径\MQ文件夹名 4.启动
在RocketMQ的架构中都是需要先启动NameServer再启动Broker的。所以先启动NameServer。启动NameServer 使用cmd命令框执行进入至‘MQ文件夹\bin’下然后执行‘start mqnamesrv.cmd’启动NameServer。成功后会弹出提示框此框勿关闭。 启动Broker 使用cmd命令框执行进入至‘MQ文件夹\bin’下然后执行‘start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnabletrue’启动Broker。成功后会弹出提示框此框勿关闭。
注意事项 弹出提示框‘错误: 找不到或无法加载主类 Files\Java\jdk1.8.0_202\lib\tools.jar;C:\Program’的处理 打开‘MQ文件夹\bin’下的runbroker.cmd然后将‘%CLASSPATH%’加上英文双引号。保存并重新执行start语句。 再次启动 1.2、Linux下的安装
...
1.3、源码的安装
1.解压源码版本(Source)
2.导入idea中maven编译通过
3.创建目录D盘创建文件夹RocketMQ 1.把源码下的distribution下的conf文件夹拷贝到RocketMQ下面 2.再创建logs和store文件夹 4、启动RocketMQ源码
4.1、启动NameServer
namesrv工程下NamesrvStartup启动类启动前需要配置环境变量
ROCKETMQ_HOMED:\RocketMQ 4.2、启动Broker 在broker模块找到broker模块同时找到启动类BrokerStartup.java
需要修改配置文件broker.conf #配置如下
#nameServer
namesrvAddr127.0.0.1:9876
autoCreateTopicEnable true
storePathRootDir D:\\RocketMQ\\store
#commitLog存储路径
storePathCommitLog D:\\RocketMQ\\store\\commitlog
#消费队列存储路径
storePathConsumeQueue D:\\RocketMQ\\store\\consumequeue
#消息索引存储路径
storePathindex D:\\RocketMQ\\store\\index
#checkpoint文件存储路径
storeCheckpoint D:\\RocketMQ\\store\\checkpoint
#abort文件存储路径
abortFile D:\\RocketMQ\\store\\abort
配置环境变量
ROCKETMQ_HOMED:\RocketMQ
配置参数
-c D:\RocketMQ\conf\broker.conf 启动成功检查下数据文件 1.4、控制台
Rocketmq老版本下载
Rocketmq新版本下载
这里下载的新版本rocketmq-dashboard启动项目下面如下
浏览器中输入‘http://localhost:8080’成功后即可进行管理端查看。
运维页面 你可以修改这个服务使用的namesrv的地址 你可以修改这个服务是否使用VIPChannel(如果你的mq server版本小于3.5.8请设置不使用) 驾驶舱 查看broker的消息量总量/5分钟图 查看单一主题的消息量总量/趋势图 集群 查看集群的分布情况cluster与broker关系、broker 查看broker具体信息/运行信息 查看broker配置信息 主题页面 展示所有的主题可以通过搜索框进行过滤 筛选 普通/重试/死信 主题 添加/更新主题 clusterName 创建在哪几个cluster上 brokerName 创建在哪几个broker上 topicName 主题名 writeQueueNums 写队列数量 readQueueNums 读队列数量 perm //2是写 4是读 6是读写 状态 查询消息投递状态投递到哪些broker/哪些queue/多少量等 路由 查看消息的路由现在你发这个主题的消息会发往哪些broker对应broker的queue信息 CONSUMER管理这个topic都被哪些group消费了消费情况何如 topic配置查看变更当前的配置 发送消息向这个主题发送一个测试消息 重置消费位点(分为在线和不在线两种情况不过都需要检查重置是否成功) 删除主题 会删除掉所有broker以及namesrv上的主题配置和路由信息 消费者页面 展示所有的消费组可以通过搜索框进行过滤 刷新页面/每隔五秒定时刷新页面 按照订阅组/数量/TPS/延迟 进行排序 添加/更新消费组 clusterName 创建在哪几个集群上 brokerName 创建在哪几个broker上 groupName 消费组名字 consumeEnable //是否可以消费 FALSE的话将无法进行消费 consumeBroadcastEnable //是否可以广播消费 retryQueueNums //重试队列的大小 brokerId //正常情况从哪消费 whichBrokerWhenConsumeSlowly//出问题了从哪消费 终端 在线的消费客户端查看包括版本订阅信息和消费模式 消费详情 对应消费组的消费明细查看这个消费组订阅的所有Topic的消费情况每个queue对应的消费client查看包括Retry消息 配置 查看变更消费组的配置 删除 在指定的broker上删除消费组 生产者页面 通过Topic和Group查询在线的消息生产者客户端 信息包含客户端主机 版本 消息查询页面 根据Topic和时间区间查询由于数据量大 最多只会展示2000条多的会被忽略 根据Topic和Key进行查询 最多只会展示64条 根据消息主题和消息Id进行消息的查询 消息详情可以展示这条消息的详细信息查看消息对应到具体消费组的消费情况如果异常可以查看具体的异常信息。可以向指定的消费组重发消息 2、消息发送方式
2.1、发送同步消息
同步发送是指消息发送方发出数据后同步等待直到收到接收方发回响应之后才发下一个请求。这种可靠性同步地发送方式使用的比较广泛比如重要的消息通知短信通知。
RocketMQ源码中的example模块的org.apache.rocketmq.example.quickstart.Producer
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;public class Producer {public static final int MESSAGE_COUNT 1000;public static final String PRODUCER_GROUP please_rename_unique_group_name;public static final String DEFAULT_NAMESRVADDR 127.0.0.1:9876;public static final String TOPIC TopicTest;public static final String TAG TagA;public static void main(String[] args) throws MQClientException, InterruptedException {DefaultMQProducer producer new DefaultMQProducer(PRODUCER_GROUP);producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);producer.start();for (int i 0; i MESSAGE_COUNT; i) {try {Message msg new Message(TOPIC /* Topic */,TAG /* Tag */,(Hello RocketMQ i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);SendResult sendResult producer.send(msg);System.out.printf(%s%n, sendResult);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}producer.shutdown();}
}
发送结果分析
SendResult [sendStatusSEND_OK, msgIdC0A800AB34BC18B4AAC228E6CFB90000, offsetMsgIdC0A8380100002A9F0000000000046F14, messageQueueMessageQueue [topicTopicTest, brokerNameMS-TGOOFNKABBOB, queueId3], queueOffset275]
SendResult [sendStatusSEND_OK, msgIdC0A800AB34BC18B4AAC228E6CFCD0001, offsetMsgIdC0A8380100002A9F0000000000047003, messageQueueMessageQueue [topicTopicTest, brokerNameMS-TGOOFNKABBOB, queueId0], queueOffset275]
SendResult [sendStatusSEND_OK, msgIdC0A800AB34BC18B4AAC228E6CFD10002, offsetMsgIdC0A8380100002A9F00000000000470F2, messageQueueMessageQueue [topicTopicTest, brokerNameMS-TGOOFNKABBOB, queueId1], queueOffset275]
SendResult [sendStatusSEND_OK, msgIdC0A800AB34BC18B4AAC228E6CFD30003, offsetMsgIdC0A8380100002A9F00000000000471E1, messageQueueMessageQueue [topicTopicTest, brokerNameMS-TGOOFNKABBOB, queueId2], queueOffset275]
SendResult [sendStatusSEND_OK, msgIdC0A800AB34BC18B4AAC228E6CFD50004, offsetMsgIdC0A8380100002A9F00000000000472D0, messageQueueMessageQueue [topicTopicTest, brokerNameMS-TGOOFNKABBOB, queueId3], queueOffset276]
SendResult [sendStatusSEND_OK, msgIdC0A800AB34BC18B4AAC228E6CFD60005, offsetMsgIdC0A8380100002A9F00000000000473BF, messageQueueMessageQueue [topicTopicTest, brokerNameMS-TGOOFNKABBOB, queueId0], queueOffset276]
SendResult [sendStatusSEND_OK, msgIdC0A800AB34BC18B4AAC228E6CFD90006, offsetMsgIdC0A8380100002A9F00000000000474AE, messageQueueMessageQueue [topicTopicTest, brokerNameMS-TGOOFNKABBOB, queueId1], queueOffset276
....
msgId 消息的全局唯一标识RocketMQ的ID生成是使用机器IP和消息偏移量的组成由消息队列 MQ 系统自动生成唯一标识某条消息。sendStatus 发送的标识成功失败等queueId queueId是Topic的分区Producer发送具体一条消息的时对应选择的该Topic下的某一个Queue的标识ID。queueOffset Message queue是无限长的数组。一条消息进来下标就会涨1,而这个数组的下标就是queueOffsetqueueOffset是从0开始递增。
2.2、发送异步消息 异步消息通常用在对响应时间敏感的业务场景即发送端不能容忍长时间地等待Broker的响应。消息发送方在发送了一条消息后不等接收方发回响应接着进行第二条消息发送。发送方通过回调接口的方式接收服务器响应并对响应结果进行处理。
RocketMQ源码中的example模块的org.apache.rocketmq.example.quickstart.Producer
package org.apache.rocketmq.example.simple;import java.io.UnsupportedEncodingException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;public class AsyncProducer {public static void main(String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {DefaultMQProducer producer new DefaultMQProducer(Jodie_Daily_test);producer.setNamesrvAddr(127.0.0.1:9876);producer.start();// suggest to on enableBackpressureForAsyncMode in heavy traffic, default is falseproducer.setEnableBackpressureForAsyncMode(true);producer.setRetryTimesWhenSendAsyncFailed(0);int messageCount 100;final CountDownLatch countDownLatch new CountDownLatch(messageCount);for (int i 0; i messageCount; i) {try {final int index i;Message msg new Message(Jodie_topic_1023,TagA,OrderID188,Hello world.getBytes(RemotingHelper.DEFAULT_CHARSET));producer.send(msg, new SendCallback() {Overridepublic void onSuccess(SendResult sendResult) {countDownLatch.countDown();System.out.printf(%-10d OK %s %n, index, sendResult.getMsgId());}Overridepublic void onException(Throwable e) {countDownLatch.countDown();System.out.printf(%-10d Exception %s %n, index, e);e.printStackTrace();}});} catch (Exception e) {e.printStackTrace();}}countDownLatch.await(5, TimeUnit.SECONDS);producer.shutdown();}
}
2.3、单向发送 这种方式主要用在不特别关心发送结果的场景例如日志发送。单向Oneway发送特点为发送方只负责发送消息不等待服务器回应且没有回调函数触发即只发送请求不等待应答。此方式发送消息的过程耗时非常短一般在微秒级别。
package org.apache.rocketmq.example.simple;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;import java.nio.charset.StandardCharsets;public class OnewayProducer {public static void main(String[] args) throws Exception {//Instantiate with a producer group name.DefaultMQProducer producer new DefaultMQProducer(please_rename_unique_group_name);// Specify name server addresses.producer.setNamesrvAddr(localhost:9876);//Launch the instance.producer.start();for (int i 0; i 100; i) {//Create a message instance, specifying topic, tag and message body.Message msg new Message(TopicTest /* Topic */,TagA /* Tag */,(Hello RocketMQ i).getBytes(StandardCharsets.UTF_8) /* Message body */);//Call send message to deliver message to one of brokers.producer.sendOneway(msg);}//Wait for sending to completeThread.sleep(5000);producer.shutdown();}
}
3、消息消费方式
3.1、负载均衡模式集群消费 消费者采用负载均衡方式消费消息一个分组(Group)下的多个消费者共同消费队列消息每个消费者处理的消息不同。一个Consumer Group中的各个Consumer实例分摊去消费消息即一条消息只会投递到一个Consumer Group下面的一个实例。例如某个Topic有3个队列其中一个Consumer Group 有 3 个实例那么每个实例只消费其中的1个队列。集群消费模式是消费者默认的消费方式。 package org.apache.rocketmq.example.quickstart;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;public class Consumer {public static final String CONSUMER_GROUP please_rename_unique_group_name_4;public static final String DEFAULT_NAMESRVADDR 127.0.0.1:9876;public static final String TOPIC TopicTest;public static void main(String[] args) throws MQClientException {//实例化消息生产者指定组名DefaultMQPushConsumer consumer new DefaultMQPushConsumer(CONSUMER_GROUP);//指定Namesrv地址信息consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//订阅Topicconsumer.subscribe(TOPIC, *);//负载均衡模式消费(可以不设置默认就是负载均衡模式)consumer.setMessageModel(MessageModel.CLUSTERING);//注册回调函数处理消息consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) - {System.out.printf(%s Receive New Messages: %s %n, Thread.currentThread().getName(), msg);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();//启动消费者System.out.printf(Consumer Started.%n);}
}
3.2、广播消费 广播消费模式中消息将对一个Consumer Group下的各个Consumer实例都投递一遍。即使这些 Consumer属于同一个Consumer Group消息也会被Consumer Group 中的每个Consumer都消费一次。实际上是一个消费组下的每个消费者实例都获取到了topic下面的每个Message Queue去拉取消费。所以消息会投递到每个消费者实例。 package org.apache.rocketmq.example.quickstart;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;public class Consumer {public static final String CONSUMER_GROUP please_rename_unique_group_name_4;public static final String DEFAULT_NAMESRVADDR 127.0.0.1:9876;public static final String TOPIC TopicTest;public static void main(String[] args) throws MQClientException {//实例化消息生产者指定组名DefaultMQPushConsumer consumer new DefaultMQPushConsumer(CONSUMER_GROUP);//指定Namesrv地址信息consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//订阅Topicconsumer.subscribe(TOPIC, *);//广播消费模式consumer.setMessageModel(MessageModel.BROADCASTING);//注册回调函数处理消息consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) - {System.out.printf(%s Receive New Messages: %s %n, Thread.currentThread().getName(), msg);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();//启动消费者System.out.printf(Consumer Started.%n);}
}
消息消费时的权衡负载均衡模式适用场景注意事项 消费端集群化部署每条消息只需要被处理一次。 由于消费进度在服务端维护可靠性更高。 集群消费模式下每一条消息都只会被分发到一台机器上处理。如果需要被集群下的每一台机器都处理请使用广播模式。 集群消费模式下不保证每一次失败重投的消息路由到同一台机器上因此处理消息时不应该做任何确定性假设。广播模式适用场景注意事项 每条消息都需要被相同逻辑的多台机器处理。 消费进度在客户端维护出现重复的概率稍大于集群模式。 广播模式下消息队列 RocketMQ 保证每条消息至少被每台客户端消费一次但是并不会对消费失败的消息进行失败重投因此业务方需要关注消费失败的情况。 广播模式下客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过请谨慎选择。 广播模式下每条消息都会被大量的客户端重复处理因此推荐尽可能使用集群模式。 目前仅 Java 客户端支持广播模式。 广播消费模式下不支持顺序消息。 广播消费模式下不支持重置消费位点。 广播模式下服务端不维护消费进度所以消息队列 RocketMQ 控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。 不是你觉的悟到的东西给了你你也接不住
干我们这行啥时候懈怠就意味着长进的停止长进的停止就意味着被淘汰只能往前冲直到凤凰涅槃的一天