有帮忙做儿童房设计的网站吗,怎么制作自己的网址,如何创建网站小程序,互联网培训学校哪个好RocketMQ 入门
视频地址#xff1a;
千锋教育RocketMQ全套视频教程#xff0c;快速掌握MQ消息中间件
什么是 MQ #xff1f;
Message Queue#xff08;消息 队列#xff09;#xff0c;从字面上理解#xff1a;首先它是一个队列。FIFO 先进先出的数据结构 —— 队列…RocketMQ 入门
视频地址
千锋教育RocketMQ全套视频教程快速掌握MQ消息中间件
什么是 MQ
Message Queue消息 队列从字面上理解首先它是一个队列。FIFO 先进先出的数据结构 —— 队列。
消息队列就是所谓的存放消息的队列。
消息队列解决的不是存放消息的队列的目的解决的是通信问题。 1、同步通信情况下 比如以电商订单系统为例如果各服务之间使用同步通信不仅耗时较久且过程中受到网络波动的影响不能保证高成功率。因此使用异步的通信方式对架构进行改造。 2、使用消息队列后。异步通信的情况下 使⽤异步的通信方式对模块间的调⽤进行解耦可以快速的提升系统的吞吐量。上游执行完消息的发送业务后立即获得结果下游多个服务订阅到消息后各⾃消费。通过消息队列屏蔽底层的通信协议使得解藕和并行消费得以实现。 RocketMQ 的基本概念
技术架构 RocketMQ 架构上主要分为四部分如上图所示 Producer消息发布的⻆⾊⽀持分布式集群⽅式部署。Producer 通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递投递的过程⽀持快速失败并且低延迟。 Consumer消息消费的角色支持分布式集群方式部署。⽀持以 push 推pull 拉两种模式对消息进行消费。同时也⽀持集群方式和广播方式的消费它提供实时消息订阅机制可以满足大多数用户的需求。 NameServerNameServer 是⼀个非常简单的 Topic 路由注册中心其⻆⾊类似 Dubbo 中的 zookeeper支持 Broker 的动态注册与发现。 主要包括两个功能Broker 管理NameServer 接收 Broker 集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制检查 Broker 是否还存活路由信息管理每个 NameServer 将保存关于 Broker 集群的整个路由信息和⽤于客户端查询的队列信息。 然后 Producer 和 Conumser 通过 NameServer 就可以知道整个 Broker 集群的路由信息从⽽进⾏消息的投递和消费。 NameServer 通常也是集群的⽅式部署各实例间相互不进⾏信息通讯。因为 Broker 是向每⼀台 NameServer 注册⾃⼰的路由信息所以每⼀个 NameServer 实例上⾯都保存⼀份完整的路由信息。 当某个 NameServer 因某种原因下线了Broker 仍然可以向其它 NameServer 同步其路由信息Producer,Consumer 仍然可以动态感知 Broker 的路由的信息。 BrokerServerBroker 主要负责消息的存储、投递和查询以及服务高可用保证为了实现这些功能Broker 包含了以下几个重要⼦模块。 Remoting Module负责处理请求整个 Broker 的实体负责处理来自 clients 端的请求。Client Manager负责管理订阅信息负责管理客户端(Producer/Consumer)和维护 Consumer 的 Topic 订阅信息。Store Service提供⽅便简单的 API 接⼝处理消息存储到物理硬盘和查询功能。HA Service提供数据同步功能⾼可⽤服务提供 Master Broker 和 Slave Broker 之间的数据同步功能。Index Service索引服务根据特定的 Message key 对投递到 Broker 的消息进⾏索引服务以提供消息的快速查询。 简而言之 NameServer 服务器在消息发送时充当了路由信息的提供者帮助生产者找到正确的 Broker 来发送消息。它起到了注册和发现 Broker 的作用以确保消息能够准确地路由到目标位置。
当【生产者】发送消息时它会根据 Topic 选择合适的 Broker 来发送消息。Broker 负责存储和管理消息每个 Broker 上可以管理多个 Topic。因此Topic 是在 Broker 上创建和管理的而不是在 NameServer 服务器上。
部署架构 RocketMQ 网络部署特点 NameServer 是⼀个⼏乎⽆状态节点可集群部署节点之间⽆任何信息同步。 Broker 部署相对复杂Broker 分为 Master 与 Slave⼀个 Master 可以对应多个 Slave但是⼀个 Slave 只能对应⼀个 Master。 Master 与 Slave 的对应关系通过指定相同的 BrokerName不同的 BrokerId 来定义BrokerId 为 0 表示 Master⾮ 0 表示 Slave。Master 也可以部署多个。 每个 Broker 与 NameServer 集群中的所有节点建立长连接定时注册 Topic 信息到所有 NameServer。 注意当前 RocketMQ 版本在部署架构上⽀持⼀ Master 多 Slave但只有 BrokerId1 的从服务器才会参与消息的读负载。 Producer 与 NameServer 集群中的其中⼀个节点随机选择建⽴⻓连接定期从 NameServer 获取 Topic 路由信息并向提供 Topic 服务的 Master 建⽴⻓连接且定时向 Master 发送⼼跳。Producer 完全⽆状态可集群部署。 Consumer 与 NameServer 集群中的其中⼀个节点随机选择建⽴⻓连接定期从 NameServer 获取 Topic 路由信息并向提供 Topic 服务的 Master、Slave 建⽴⻓连接且定时向 Master、Slave 发送⼼跳。Consumer 既可以从 Master 订阅消息也可以从 Slave 订阅消息消费者在向 Master 拉取消息时Master 服务器会根据拉取偏移量与最⼤偏移量的距离判断是否读⽼消息产⽣读 I/O以及从服务器是否可读等因素建议下⼀次是从 Master 还是 Slave 拉取。 结合部署架构图描述集群⼯作流程 启动 NameServerNameServer 起来后监听端⼝等待 Broker、Producer、Consumer 连上来相当于⼀个路由控制中心。Broker 启动跟所有的 NameServer 保持⻓连接定时发送⼼跳包。 心跳包中包含当前 Broker 信息(IP端⼝等)以及存储所有 Topic 信息。注册成功后NameServer 集群中就有 Topic 跟 Broker 的映射关系。收发消息前先创建 Topic创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上也可以在发送消息时⾃动创建 Topic。 Producer 发送消息启动时先跟 NameServer 集群中的其中⼀台建⽴⻓连接并从 NameServer 中获取当前发送的 Topic 存在哪些 Broker 上轮询从队列列表中选择⼀个队列然后与队列所在的 Broker 建⽴⻓连接从⽽向 Broker 发消息。Consumer 跟 Producer 类似跟其中⼀台 NameServer 建⽴⻓连接获取当前订阅的 Topic 存在哪些 Broker 上然后直接跟 Broker 建⽴连接通道开始消费消息。
快速开始
下载 RocketMQ
官网下载地址https://rocketmq.apache.org/zh/download/
1、运行版【二进制压缩包】 2、另一个是【源码】 – source 下载
安装 RocketMQ 准备⼀台装有Linux系统的虚拟机。需要关闭防火墙。 安装 jdk上传 jdk 安装包并解压缩在 /usr/local/java ⽬录下。 安装 rocketmq上传 rocketmq 安装包并使⽤ unzip 命令解压缩在 /usr/local/rocketmq ⽬录下。 # 进入指定目录
cd /usr/local
# 新建文件夹
mkdir rocketmq
# 解压压缩包
unzip [压缩包名]
# 删除压缩包
rm -rf [压缩包名]# -fforce强制删除否则会每个文件都询问是否删除
# -r表示递归移除文件夹的时候需要使用配置 jdk 和 rocketmq 的环境变量 1、进入配置文件 vim /etc/profile2、i 插入插入后ESC :wq 退出 export JAVA_HOME/usr/local/java/jdk1.8.0_171
export JRE_HOME/usr/local/java/jdk1.8.0_171/jre
export ROCKETMQ_HOME/usr/local/rocketmq/rocketmq-all-5.1.2-bin-release
export CLASSPATH$CLASSPATH:$JAVA_HOME/lib:$JAVA_HOME/jre/lib
export PATH$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$ROCKETMQ_HOME/bin:$PATH:$HOME/bin注意RocketMQ 的环境变量⽤来加载 ROCKETMQ_HOME/conf 下的配置⽂件 如果不配置则⽆法启动 NameServer 和 Broker。 3、使环境变量生效 source /etc/profile4、验证一下 javac5、修改 bin/runserver.sh ⽂件由于 RocketMQ 默认设置的 JVM 内存为 4G但虚拟机⼀般是 2G 内存因此调整为 512mb。 cd ./binvim runserver.sh修改前 修改后
启动 NameServer
1、在 bin ⽬录下使⽤静默⽅式启动
nohup ./mqnamesrv # 建议使用带名称空间的形式 -n 服务器地址:端口
nohup ./mqnamesrv -n 192.168.194.132:9876 2、查看是否启动成功
cat nohup.out启动 Broker
1、修改 broker 的 JVM 参数配置将默认 8G 内存修改为 512m。修改 bin/runbroker.sh ⽂件
vim runbroker.sh修改
JAVA_OPT${JAVA_OPT} -server -Xms8g -Xmx8g
# 改为
JAVA_OPT${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m2、conf/broker.conf ⽂件中加⼊如下配置开启⾃动创建 Topic 功能
vim broker.conf
# 开启⾃动创建 Topic 功能
autoCreateTopicEnabletrue3、以静默⽅式启动 broker
nohup ./mqbroker -n localhost:9876
# 可以使用 localhost但不建议
nohup ./mqbroker -n 192.168.194.132:9876
# 查看 bin/nohup.out ⽇志看是否开启成功
cat nohup.out使用发送和接收信息验证 MQ
1、配置 nameserver 的环境变量
在发送/接收消息之前需要告诉客户端 nameserver 的位置。配置环境变量 NAMESRV_ADDR
export NAMESRV_ADDRlocalhost:98762、使⽤ bin/tools.sh ⼯具验证消息的发送默认会发 1000 条消息发送的消息⽇志
./tools.sh org.apache.rocketmq.example.quickstart.Producer发送的消息日志
...
SendResult [sendStatusSEND_OK, msgIdAC110001E73F0133314B8998AB2503E7, offsetMsgIdC0A8C28400002A9F000000000003AC09, messageQueueMessageQueue [topicTopicTest, brokerNamelocalhost.localdomain, queueId2], queueOffset249]3、使⽤ bin/tools.sh ⼯具验证消息的接收
./tools.sh org.apache.rocketmq.example.quickstart.Consumer看到接收到的消息
...
ConsumeMessageThread_please_rename_unique_group_name_4_20 Receive New Messages: [MessageExt [brokerNamelocalhost.localdomain, queueId2, storeSize241, queueOffset232, sysFlag0, bornTimestamp1687857283710, bornHost/192.168.194.132:59242, storeTimestamp1687857283711, storeHost/192.168.194.132:10911, msgIdC0A8C28400002A9F0000000000036C05, commitLogOffset224261, bodyCRC1379786659, reconsumeTimes0, preparedTransactionOffset0, toString()Message{topicTopicTest, flag0, properties{CONSUME_START_TIME1687857375912, MSG_REGIONDefaultRegion, UNIQ_KEYAC110001E73F0133314B8998AA7E03A3, CLUSTERDefaultCluster, MIN_OFFSET0, TAGSTagA, WAITtrue, TRACE_ONtrue, MAX_OFFSET250}, body[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 51, 49], transactionIdnull}]]
关闭服务器
1、关闭 broker
./mqshutdown broker2、关闭 nameserver
./mqshutdown namesrv关闭前 关闭后 搭建 RocketMQ 集群
保存服务的 高性能、高可用。
RocketMQ 集群模式
为了追求更好的性能RocketMQ 的最佳实践方式都是在集群模式下完成。
RocketMQ 官⽅提供了三种集群搭建⽅式。
2 主 2 从异步通信方式
使用异步方式进行主从之间的数据复制吞吐量大但可能会丢消息因为异步的情况下从服务器 slave 可能会复制失败。
使用 conf/2m-2s-async 文件夹内的配置文件做集群配置。 2 主 2 从同步通信方式
使⽤同步方式进行主从之间的数据复制保证消息安全投递不会丢失但影响吞吐量因为此时相对于生产者来说进行了阻塞。
使用 conf/2m-2s-sync 文件夹内的配置文件做集群配置。
2 主 无 从方式
会存在单点故障且读的性能没有前两种⽅式好。
使⽤ conf/2m-noslave ⽂件夹内的配置⽂件做集群配置。
Dledger 高可用集群
上述三种官方提供的集群没办法实现⾼可用即在 master 节点挂掉后slave 节点没办法⾃动被选举为新的 master需要人工实现。
RocketMQ 在 4.5 版本之后引⼊了第三方的 Dleger 高可用集群。
搭建主从异步集群
旨在提供高可用性、数据冗余和负载均衡以确保消息系统的可靠性和性能。
1、准备三台 Linux 服务器
三台 Linux 服务器中 nameserver 和 broker 之间的关系如下
# 查看 ip 地址
ifconfig服务器服务器IPNameServerBroker节点部署服务器1192.168.194.133192.168.194.133:9876服务器2192.168.194.134192.168.194.134:9876broker-amaster,broker-b-sslave服务器3192.168.194.135192.168.194.135:9876broker-bmaster,broker-a-sslave 注意这是交叉部署 服务器 2 上的是broker-a 的主节点和 broker-b 的从节点服务器 3 上的是broker-b 的主节点和 broker-a 的从节点 三台服务器都需要安装 jdk 和 rocketmq安装步骤参考上⼀章节笔者用的是 CentOS 不需要安装配置不知道是不是这个原因 克隆步骤
第一步点击克隆 第二步创建链接克隆 2、启动三台 NameServer
nameserver 是一个轻量级的注册中心broker 把自己的信息注册到 nameserver 上。因为 nameserver 是无状态的所以直接启动即可。
三台 nameserver 之间不需要通信而是被请求方来关联三台 nameserver 的地址。 修改三台服务器的的 runserver.sh 文件 修改 JVM 内存默认的 4g 为 512m。
JAVA_OPT${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -
XX:MetaspaceSize128m -XX:MaxMetaspaceSize320m在每台服务器的 bin 目录下执行如下命令
服务器 1
nohup ./mqnamesrv -n 192.168.194.133:9876 服务器 2
nohup ./mqnamesrv -n 192.168.194.134:9876 服务器 3
nohup ./mqnamesrv -n 192.168.194.135:9876 3、配置 Broker
两对主从节点在不同的服务器上服务器 1 上没有部署 broker。
需要修改每台 broker 的配置⽂件。
注意同⼀台服务器上的两个 broker 保存路径不能⼀样。 broker-a 的 master 节点 在服务器 2 上进⼊到 conf/2m-2s-async ⽂件夹内修改 broker-a.properties ⽂件 # 所属集群名称
brokerClusterNameDefaultCluster
# broker名字
brokerNamebroker-a
# broker所在服务器的ip注意
brokerIP1192.168.194.134
# broker的id0表示master0表示slave
brokerId0
# 删除⽂件时间点默认在凌晨4点
deleteWhen04
# ⽂件保留时间为48⼩时
fileReservedTime48
# broker的⻆⾊为master
brokerRoleASYNC_MASTER
# 使⽤异步刷盘的⽅式
flushDiskTypeASYNC_FLUSH
# 名称服务器的地址列表注意
namesrvAddr192.168.194.133:9876;192.168.194.134:9876;192.168.194.135:9876
# 在发送消息⾃动创建不存在的topic时默认创建的队列数为4个
defaultTopicQueueNums4
# 是否允许 Broker ⾃动创建Topic建议线下开启线上关闭
autoCreateTopicEnabletrue
# 是否允许 Broker ⾃动创建订阅组建议线下开启线上关闭
autoCreateSubscriptionGrouptrue
# broker对外服务的监听端⼝
listenPort10911
# abort⽂件存储路径
abortFile/usr/local/rocketmq/store/abort
# 消息存储路径
storePathRootDir/usr/local/rocketmq/store
# commitLog存储路径
storePathCommitLog/usr/local/rocketmq/store/commitlog
# 消费队列存储路径
storePathConsumeQueue/usr/local/rocketmq/store/consumequeue
# 消息索引存储路径
storePathIndex/usr/local/rocketmq/store/index
# checkpoint⽂件存储路径
storeCheckpoint/usr/local/rocketmq/store/checkpoint
# 限制的消息⼤⼩
maxMessageSize65536
# commitLog每个⽂件的⼤⼩默认1G
mapedFileSizeCommitLog1073741824
# ConsumeQueue每个⽂件默认存30W条根据业务情况调整
mapedFileSizeConsumeQueue300000broker-a 的 slave 节点 在服务器 3 上进⼊到 conf/2m-2s-async ⽂件夹内修改 broker-a-s.properties ⽂件。 brokerClusterNameDefaultCluster
brokerNamebroker-a
# broker所在服务器的ip注意
brokerIP1192.168.194.135
# broker的id0表示master0表示slave
brokerId1
deleteWhen04
fileReservedTime48
brokerRoleSLAVE
flushDiskTypeASYNC_FLUSH
# 名称服务器的地址列表注意
namesrvAddr192.168.194.133:9876;192.168.194.134:9876;192.168.194.135:9876
defaultTopicQueueNums4
autoCreateTopicEnabletrue
autoCreateSubscriptionGrouptrue
listenPort11011
abortFile/usr/local/rocketmq/store-slave/abort
storePathRootDir/usr/local/rocketmq/store-slave
storePathCommitLog/usr/local/rocketmq/store-slave/commitlog
storePathConsumeQueue/usr/local/rocketmq/store-slave/consumequeue
storePathIndex/usr/local/rocketmq/store-slave/index
storeCheckpoint/usr/local/rocketmq/store-slave/checkpoint
maxMessageSize65536broker-b 的 master 节点 在服务器 3 上进⼊到 conf/2m-2s-async ⽂件夹内修改 broker-b.properties ⽂件。 brokerClusterNameDefaultCluster
brokerNamebroker-b
# broker所在服务器的ip注意
brokerIP1192.168.194.135
brokerId0
deleteWhen04
fileReservedTime48
brokerRoleASYNC_MASTER
flushDiskTypeASYNC_FLUSH
# 名称服务器的地址列表注意
namesrvAddr192.168.194.133:9876;192.168.194.134:9876;192.168.194.135:9876
defaultTopicQueueNums4
autoCreateTopicEnabletrue
autoCreateSubscriptionGrouptrue
listenPort10911
abortFile/usr/local/rocketmq/store/abort
storePathRootDir/usr/local/rocketmq/store
storePathCommitLog/usr/local/rocketmq/store/commitlog
storePathConsumeQueue/usr/local/rocketmq/store/consumequeue
storePathIndex/usr/local/rocketmq/store/index
storeCheckpoint/usr/local/rocketmq/store/checkpoint
maxMessageSize65536broker-b 的 slave 节点 在服务器 2 上进⼊到 conf/2m-2s-async ⽂件夹内修改 broker-b-s.properties ⽂件。 brokerClusterNameDefaultCluster
#
brokerNamebroker-b
#
brokerIP1192.168.194.134
#
brokerId1
deleteWhen04
fileReservedTime48
brokerRoleSLAVE
flushDiskTypeASYNC_FLUSH
# 名称服务器的地址列表注意
namesrvAddr192.168.194.133:9876;192.168.194.134:9876;192.168.194.135:9876
defaultTopicQueueNums4
autoCreateTopicEnabletrue
autoCreateSubscriptionGrouptrue
listenPort11011
abortFile/usr/local/rocketmq/store-slave/abort
storePathRootDir/usr/local/rocketmq/store-slave
storePathCommitLog/usr/local/rocketmq/store-slave/commitlog
storePathConsumeQueue/usr/local/rocketmq/store-slave/consumequeue
storePathIndex/usr/local/rocketmq/store-slave/index
storeCheckpoint/usr/local/rocketmq/store-slave/checkpoint
maxMessageSize65536修改服务器 2 和服务器 3 的 bin/runbroker.sh ⽂件 修改 JVM 内存默认的 8g 为 512m。(CentOS 的不需要) JAVA_OPT${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m4、启动 broker 在服务器 2 中启动 broker-amaster和 broker-b-sslave -c 表示指定哪个配置文件需要注意的是路径
nohup ./mqbroker -c ../conf/2m-2s-async/broker-a.properties
nohup ./mqbroker -c ../conf/2m-2s-async/broker-b-s.properties 在服务器 3 中启动 broker-bmaster和 broker-a-sslave
nohup ./mqbroker -c ../conf/2m-2s-async/broker-b.properties
nohup ./mqbroker -c ../conf/2m-2s-async/broker-a-s.properties 可以用 jps 命令查看一下服务 验证集群
使用 RocketMQ 提供的 tools 工具验证集群是否正常工作。
在服务器 2 上配置环境变量
用于被 tools 中的生产者和消费者程序读取该变量。
vim /etc/profileexport NAMESRV_ADDR192.168.194.133:9876;192.168.194.134:9876;192.168.194.135:9876source /etc/profile启动生产者
./tools.sh org.apache.rocketmq.example.quickstart.Producer报错
[rootlocalhost bin]# ./tools.sh org.apache.rocketmq.example.quickstart.Producer
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c0000000, 268435456, 0) failed; errorCannot allocate memory (errno12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 268435456 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /usr/local/rocketmq/rocketmq-all-5.1.2-bin-release/bin/hs_err_pid9766.log原因内存不够
启动消费者
./tools.sh org.apache.rocketmq.example.quickstart.Consumer可视化管理控制平台安装
RocketMQ 没有提供可视化管理控制平台可以使⽤第三方管理控制平台https://github.com/apache/rocketmq-externals/tree/rocketmq-console-1.0.0/rocketmq-console 下载管理控制平台 解压缩在 linux 服务器上放在 /rocketmq/rocketmq-externals 目录下
1、修改 rocketmq-externals/rocketmq-externals-master/rocketmqconsole/src/main/resources/application.properties 配置⽂件中的 nameserver 地址
rocketmq.config.namesrvAddr192.168.194.133:9876;192.168.194.134:9876;192.168.194.135:98762、回到 rocketmq-externals/rocketmq-externals-master/rocketmqconsole 路径下执⾏ maven 命令进⾏打包
安装 maven 环境可以先输入 mvn 看是否有环境
apt install maven打包
mvn clean package -Dmaven.test.skiptrue报错
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:2.6:resources (default-resources) on project rocketmq-console-ng: Execution default-resources of goal org.apache.maven.plugins:maven-resources-plugin:2.6:resources failed: A required class was missing while executing org.apache.maven.plugins:maven-resources-plugin:2.6:resources: org/apache/maven/shared/filtering/MavenFilteringException
[ERROR] -----------------------------------------------------
[ERROR] realm pluginorg.apache.maven.plugins:maven-resources-plugin:2.6
[ERROR] strategy org.codehaus.plexus.classworlds.strategy.SelfFirstStrategy
[ERROR] urls[0] file:/usr/local/maven/repository/org/apache/maven/plugins/maven-resources-plugin/2.6/maven-resources-plugin-2.6.jar
[ERROR] urls[1] file:/usr/local/maven/repository/org/codehaus/plexus/plexus-utils/2.0.5/plexus-utils-2.0.5.jar
[ERROR] urls[2] file:/usr/local/maven/repository/org/apache/maven/shared/maven-filtering/1.1/maven-filtering-1.1.jar
[ERROR] urls[3] file:/usr/local/maven/repository/org/codehaus/plexus/plexus-interpolation/1.13/plexus-interpolation-1.13.jar
[ERROR] Number of foreign imports: 1
[ERROR] import: Entry[import from realm ClassRealm[maven.api, parent: null]]
[ERROR]
[ERROR] -----------------------------------------------------
[ERROR] : org.apache.maven.shared.filtering.MavenFilteringException
[ERROR] - [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/PluginContainerException解决方法在 pom.xml 文件里添加两个插件然后重新打包 jar 包 plugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-resources-plugin/artifactIdversion3.1.0/version/pluginplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-war-plugin/artifactIdversion3.1.0/version/plugin参考博客https://blog.csdn.net/csdn565973850/article/details/125785792
也可以看一下这篇文章可视化控制台安装
运行完成之后进入到 /target 目录下就可以看到生成的 jar 包了。
在 /target 目录下运行 jar 包jar包版本注意看target目录下的内容
java -jar rocketmq-console-ng-1.0.0.jar报错显示端口被占用进入 application.properties 文件修改端口号笔者修改的是 8081
访问所在服务器的 8081 端口查看集群界面可以看到之前部署的集群。 消息示例
1、构建基础 Java 基础环境
在 maven 项⽬中构建出 RocketMQ 消息示例的基础环境即创建生产者程序和消费者程序。
通过⽣产者和消费者了解 RocketMQ 操作消息的原生 API。 引入依赖 dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-client/artifactIdversion4.7.1/version/dependency编写生产者程序
public class BaseProducer {public static void main(String[] args) throws Exception {//1.创建生产者DefaultMQProducer producer new DefaultMQProducer(my-producer-group1);//2.指定nameserver的地址producer.setNamesrvAddr(192.168.194.133:9876);//3.启动生产者producer.start();//4.创建消息for (int i 0; i 10; i) {Message message new Message(MyTopic1,TagA,(hello rocketmqi).getBytes(StandardCharsets.UTF_8));//5.发送消息SendResult sendResult producer.send(message);System.out.println(sendResult);}//6.关闭生产者producer.shutdown();}}创建消息 new Message() 的参数含义
topic主题指定往哪个 topic 去发消息tags过滤消息keys消息的 keyflag消息的标记body消息具体的内容waitStoreMsgOK指定是否等待消息存储操作完成默认值为 true此时发送方法会等待消息在 Broker 存储操作完成后才返回。
编写消费者程序
public class BaseConsumer {public static void main(String[] args) throws MQClientException {//1.创建消费者对象DefaultMQPushConsumer consumer new DefaultMQPushConsumer(my-consumer-group1);//2.指明nameserver的地址consumer.setNamesrvAddr(192.168.194.133:9876);//3.订阅主题:topic 和 过滤消息用的tag表达式consumer.subscribe(MyTopic1,*);//4.创建一个监听器当broker把消息推过来时调用consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {
// System.out.println(收到的消息new String(msg.getBody()));System.out.println(收到的消息msg);}// 返回一个消费成功状态// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;return null;}});//5.启动消费者consumer.start();System.out.println(消费者已启动);}}启动消费者和生产者验证消息的收发 发送消息 消费消息 具体流程图 消息队列中的消费者通过订阅或者轮询的方式来获取消息队列中的消息。
在一些消息队列系统中消费者可以根据消息的偏移量Offset来获取 Broker 中的消息。这通常用于支持消息的顺序消费或者消息的重放机制。对于消息的重放机制消费者可以根据消息的偏移量重新消费一条或多条消息。通过指定特定的偏移量消费者可以重新消费该偏移量之后的消息以实现消息的重放或者回溯消费。不同的消息队列系统可能有不同的机制和方式来处理消息的偏移量。一些消息队列系统使用偏移量来定位消息在日志文件中的位置而另一些系统可能使用消息的序号或者其他标识来标识消息的顺序或位置。 每个 broker 有 2 个主节点和 2 个从节点副本。当消息进入系统后它会在这 4 个服务器节点之间进行轮询以实现消息的分发和冗余备份。
在消息发送时每个 Topic 通常会与一个 broker 绑定。然后在该 broker包含主从上创建的队列会被分配【四个队列ID】。这四个队列 ID 用于标识该 Topic 在 broker 上的不同分区或分片。每个队列 ID 对应一个特定的消息队列用于存储和处理消息。通过使用多个队列 ID可以实现消息的并行处理和负载均衡。
2、简单消息示例
简单消息分成三种: 同步消息、异步消息、单向消息。 同步消息 生产者发送消息后必须等待 broker 返回信息后才继续之后的业务逻辑在 broker 返回信息之前生产者阻塞等待。
同步消息的应用场景如重要通知消息、短信通知、短信营销系统等。注重安全性不希望消息丢失
/*** 同步生产者*/
public class SyncProducer {public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {// Instantiate with a producer group name. -- 使用生产者组名进行123实例化DefaultMQProducer producer newDefaultMQProducer(producerGroup1);// Specify name server addresses. -- 指定名称服务器地址producer.setNamesrvAddr(192.168.194.133:9876);// Launch the instance. -- 启动实例producer.start();for (int i 0; i 100; i) {// Create a message instance, specifying topic, tag and message body.// 创建消息实例指定主题、标记和消息主体Message msg new Message(TopicTest /* Topic */,TagA /* Tag */,(Hello RocketMQ i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);// Call send message to deliver message to one of brokers.// 调用发送消息以将消息传递到 broker集群 之一SendResult sendResult producer.send(msg);System.out.printf(%s%n, sendResult);}// Shut down once the producer instance is not longer in use. -- 关闭生产者producer.shutdown();}}异步消息 生产者发完消息后不需要等待 broker 的回信可以直接执行之后的业务逻辑。生产者提供一个回调函数SendCallback()供 broker 调用体现了异步的方式。
当消费者处理消息完成后通过回调函数通知生产者生产者可以根据需要进行后续的处理。
具体的流程如下
生产者发送消息到消息队列中同时提供一个回调函数作为参数。消息队列将消息保存并将消息与回调函数进行关联。消费者从消息队列中获取消息进行处理。消费者处理完消息后消息队列会调用与该消息关联的回调函数。生产者在回调函数中执行相应的操作例如更新状态、记录日志、发送通知等。
异步消息的应用场景一般用于响应时间敏感的业务场景注重时间。
/*** 异步消息生产者*/
public class AsyncProducer {public static void main(String[] args) throws Exception {// Instantiate with a producer group name.DefaultMQProducer producer new DefaultMQProducer(please_rename_unique_group_name);// Specify name server addresses.producer.setNamesrvAddr(192.168.194.133:9876);// Launch the instance.producer.start();producer.setRetryTimesWhenSendAsyncFailed(0); // 设置异步发送失败时的重试次数int messageCount 100;final CountDownLatch countDownLatch new CountDownLatch(messageCount);for (int i 0; i messageCount; i) {try {final int index i;Message msg new Message(Jodie_topic_1023,TagA,OrderID188,Hello world.getBytes(RemotingHelper.DEFAULT_CHARSET));producer.send(msg, new SendCallback() {// 定义回调函数Overridepublic void onSuccess(SendResult sendResult) {countDownLatch.countDown();System.out.printf(%-10d OK %s %n, index, sendResult.getMsgId());}Overridepublic void onException(Throwable e) {countDownLatch.countDown();System.out.printf(%-10d Exception %s %n, index, e);e.printStackTrace();}});} catch (Exception e) {e.printStackTrace();}}System.out.println(); // 会在消息反馈前打印countDownLatch.await(5, TimeUnit.SECONDS); // 等待全部消息返回完成producer.shutdown();}}单向消息 生产者发送完消息后不需要等待任何回复直接进行之后的业务逻辑。
应用场景单向传输用于需要中等可靠性的情况例如日志收集.
/*** 单向消息生产者*/
public class OnewayProducer {public static void main(String[] args) throws Exception {// Instantiate with a producer group name.DefaultMQProducer producer new DefaultMQProducer(please_rename_unique_group_name);// Specify name server addresses.producer.setNamesrvAddr(192.168.194.133:9876);// Launch the instance.producer.start();for (int i 0; i 100; i) {// Create a message instance, specifying topic, tag and message body.Message msg new Message(TopicTest /* Topic */,TagA /* Tag */,(Hello RocketMQ i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);// Call send message to deliver message to one of brokers.producer.sendOneway(msg);}// Wait for sending to complete -- 等待发送完成Thread.sleep(5000);producer.shutdown();}}3、顺序消息
顺序消息指的是消费者消费消息的顺序按照发送者发送消息的顺序执行。
顺序消息分成两种: 局部顺序和全局顺序。
局部顺序
局部消息指的是消费者消费某个 topic 的某个队列中的消息是顺序的。
消费者使用 MessageListenerOrderly 类做消息监听实现局部顺序。
/*** 顺序消息*/
public class OrderProducer {public static void main(String[] args) throws Exception {// Instantiate with a producer group name.MQProducer producer new DefaultMQProducer(example_group_name);//名字服务器的地址已经在环境变量中配置好了NAMESRV_ADDR192.168.194.133:9876//Launch the instance.producer.start();for (int i 0; i 10; i) {int orderId i;for(int j 0 ; j 5 ; j ){Message msg new Message(OrderTopicTest, order_orderId, KEY orderId,(order_orderId step j).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult producer.send(msg, new MessageQueueSelector() { // 消息队列选择器Overridepublic MessageQueue select(ListMessageQueue mqs, Message msg, Object arg) {Integer id (Integer) arg;int index id % mqs.size();return mqs.get(index);}}, orderId);System.out.printf(%s%n, sendResult);}}// server shutdownproducer.shutdown();}}环境变量配置 运行结果 从纵向来看是乱序的从横向来看是有序的 全局顺序
消费者消费全部消息都是顺序的(消费者按顺序消费全部消息)通常需要满足特定主题只有一个队列并确保消费者按顺序从队列中获取消息这种应用场景相对较少且性能较差。因为只有一个 broker也就意味着不能实现高可用
某个特定主题topic只有一个队列这意味着所有相关的消息都被发送到同一个队列中确保消息按照顺序存储在队列中。消费者从该队列中顺序消费消息消费者按照先后顺序从队列中获取消息并且保证每个消息都被处理完毕后再获取下一个消息以确保顺序性。 乱序消费
消费者消费消息不需要关注消息的顺序。消费者使用 MessageListenerConcurrently 类做消息监听。
public class OrderConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(example_group_name);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe(OrderTopicTest, *);consumer.registerMessageListener(new MessageListenerConcurrently() { // 一个普通的监听器Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) {for(MessageExt msg:msgs){System.out.println(消息内容new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf(Consumer Started.%n);}
}如下图所示 4、广播消息
广播是向主题 (topic) 的所有订阅者发送消息。订阅同一个 topic 的多个消费者能全量收到生产者发送的所有消息。 消费者 public class BroadcastConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(example_group_name);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// set to broadcast mode -- 设置为广播模式consumer.setMessageModel(MessageModel.BROADCASTING);consumer.subscribe(TopicTest, *);consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs,ConsumeConcurrentlyContext context) {for(MessageExt msg:msgs){System.out.println(消息内容new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf(Broadcast Consumer Started.%n);}
}生产者 public class BroadcastProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer new DefaultMQProducer(ProducerGroupName);producer.start();for (int i 0; i 100; i){Message msg new Message(TopicTest,TagA,OrderID188,(Hello worldi).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult producer.send(msg);System.out.printf(%s%n, sendResult);}producer.shutdown();}
}5、延迟消息
延迟消息与普通消息的不同之处在于它们要等到指定的时间之后才会被传递。 延迟消息生产者 public class ScheduledProducer {public static void main(String[] args) throws Exception {// Instantiate a producer to send scheduled messagesDefaultMQProducer producer new DefaultMQProducer(ExampleProducerGroup);// Launch producerproducer.start();int totalMessagesToSend 100;for (int i 0; i totalMessagesToSend; i) {Message message new Message(TestTopic, (Hello scheduled message i).getBytes());// 设置延迟等级为 3 级此消息将在 10 秒后传递给消费者。message.setDelayTimeLevel(3);// Send the messageproducer.send(message);}// Shutdown producer after use.producer.shutdown();}
}延迟等级如下所示 系统为这 18 个等级配置了 18 个 topic⽤于实现延迟队列的效果
在商业版 RocketMQ 中不仅可以设置延迟等级还可以设置具体的延迟时间但是在社区版 RocketMQ 中只能设置延迟等级。 延迟消息消费者 public class ScheduledConsumer {public static void main(String[] args) throws MQClientException {// Instantiate message consumerDefaultMQPushConsumer consumer new DefaultMQPushConsumer(ExampleConsumer);// Subscribe topicsconsumer.subscribe(TestTopic, *);// Register message listenerconsumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt messages, ConsumeConcurrentlyContext context) {for (MessageExt message : messages) {// Print approximate delay time period -- 打印大约延迟时间周期System.out.println(Receive message[msgId message.getMsgId() ] (System.currentTimeMillis() - message.getStoreTimestamp()) ms later);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// Launch consumerconsumer.start();}
}6、批量消息
批量发送消息提高了传递小消息的性能 使用批量消息 public class BatchProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer new DefaultMQProducer(ProducerGroupName);producer.start();String topic BatchTest;ListMessage messages new ArrayList();messages.add(new Message(topic, TagA, OrderID001, Hello world 0.getBytes()));messages.add(new Message(topic, TagA, OrderID002, Hello world 1.getBytes()));messages.add(new Message(topic, TagA, OrderID003, Hello world 2.getBytes()));producer.send(messages);producer.shutdown();}
}超出限制的批量消息 官方建议批量消息的总大小不应超过 1m实际不应超过 4m。
如果超过 4m 的批量消息需要进行分批处理同时设置 broker 的配置参数为 4m (在 broker 的配置文件中修改: maxMessageSize4194394
上面默认配置的是
# 限制的消息⼤⼩
maxMessageSize65536具体代码如下
public class MaxBatchProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer new DefaultMQProducer(BatchProducerGroupName);producer.start();//large batchString topic BatchTest;// 发送 一万 条信息ListMessage messages new ArrayList(100*1000);for (int i 0; i 100*1000; i) {messages.add(new Message(topic, Tag, OrderID i, (Hello world i).getBytes()));}
// producer.send(messages);//split the large batch into small ones: 将大批分成小批ListSplitter splitter new ListSplitter(messages);while (splitter.hasNext()) {ListMessage listItem splitter.next();producer.send(listItem);}producer.shutdown();}}在使用 ListSplitter 进行消息拆分时需要注意以下限制
同一批次的消息应具有相同的主题即拆分的消息列表应属于同一个主题。同一批次的消息应具有相同的 waitStoreMsgOK 参数即等待消息存储成功的设置应相同。ListSplitter 不支持拆分延迟消息即消息的延迟级别应为 0。ListSplitter 不支持拆分事务消息即不支持对事务消息使用该工具类进行拆分。
7、过滤消息
在大多数情况下标签是一种简单而有用的设计可以用来选择您想要的消息。 tag 过滤的生产者 public class TagProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer new DefaultMQProducer(please_rename_unique_group_name);producer.start();String[] tags new String[] {TagA, TagB, TagC};for (int i 0; i 15; i) {Message msg new Message(TagFilterTest,tags[i % tags.length],Hello world.getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult producer.send(msg);// 将 sendResult 的值格式化为字符串并以新的一行123打印到控制台System.out.printf(%s%n, sendResult);}producer.shutdown();}}%s%n 的含义
%s 是格式化字符串中的占位符表示将要替换为字符串类型的值。而 %n 则是换行符表示在输出结果中添加一个新的空行。 tag 过滤的消费者 public class TagConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(please_rename_unique_group_name);consumer.subscribe(TagFilterTest, TagA || TagC);consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs,ConsumeConcurrentlyContext context) {System.out.printf(%s Receive New Messages: %s %n, Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf(Consumer Started.%n);}
}使用 SQL 过滤 SQL 功能可以通过您在发送消息时输入的属性进行一些计算。
在 RocketMQ 定义的语法下可以实现一些有趣的逻辑。这是一个例子:
------------
| message |
|----------| a 5 AND b abc
| a 10 | -------------------- Gotten
| b abc|
| c true |
------------
------------
| message |
|----------| a 5 AND b abc
| a 1 | -------------------- Missed
| b abc|
| c true |
------------语法
RocketMQ 只定义了一些基本的语法来支持这个特性也可以轻松扩展它。
数值比较如 , , , , BETWEEN, 字符比较如 , , INIS NULL 或 IS NOT NULL逻辑 AND, OR, NOT
常量类型有:
数字如 123、3.1415字符如 abc必须用单引号NULL特殊常数布尔值TRUE 或 FALSE
使用注意: 只有推模式的消费者可以使用 SQL 过滤。拉模式是用不了的。 SQL 过滤的生产者示例 public class SQLProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer new DefaultMQProducer(please_rename_unique_group_name);producer.start();String[] tags new String[] {TagA, TagB, TagC};for (int i 0; i 15; i) {Message msg new Message(SqlFilterTest,tags[i % tags.length],(Hello RocketMQ i).getBytes(RemotingHelper.DEFAULT_CHARSET));// Set some properties. -- 设置一些属性msg.putUserProperty(a, String.valueOf(i));SendResult sendResult producer.send(msg);System.out.printf(%s%n, sendResult);}producer.shutdown();}
}SQL 过滤的消费者示例 使用前提
在 broker 的配置文件中加上 enablePropertyFiltertrue 属性。
public class SQLConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(please_rename_unique_group_name);// Dont forget to set enablePropertyFiltertrue in brokerconsumer.subscribe(SqlFilterTest,MessageSelector.bySql((TAGS is not null and TAGS in (TagA, TagB)) and (a is not null and a between 0 and 3)));consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs,ConsumeConcurrentlyContext context) {System.out.printf(%s Receive New Messages: %s %n, Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf(Consumer Started.%n);}
}8、事务消息 事务消息的定义 它可以被认为是一个两阶段的提交消息实现以确保分布式系统的最终一致性。
事务性消息确保本地事务的执行和消息的发送可以原子地执行。 事务消息有三种状态 TransactionStatus.CommitTransaction: 提交事务表示允许消费者消费该消息。TransactionStatus.RollbackTransaction: 回滚事务表示该消息将被删除不允许消费。TransactionStatus.Unknown: 中间状态表示需要 MQ 回查才能确定状态。 事务消息的实现流程 场景联想
下订单 – 支付这两个阶段。
half 消息在哪里 生产者 public class TransactionProducer {public static void main(String[] args) throws Exception {// 事务监听器TransactionListener transactionListener new TransactionListenerImpl();TransactionMQProducer producer new TransactionMQProducer(please_rename_unique_group_name);producer.setNamesrvAddr(192.168.194.133:9876);// 线程池ExecutorService executorService new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueueRunnable(2000), new ThreadFactory() {Overridepublic Thread newThread(Runnable r) {Thread thread new Thread(r);thread.setName(client-transaction-msg-check-thread);return thread;}});producer.setExecutorService(executorService);producer.setTransactionListener(transactionListener);producer.start();String[] tags new String[] {TagA, TagB, TagC, TagD, TagE};for (int i 0; i 10; i) {try {Message msg new Message(TopicTest, tags[i % tags.length], KEY i,(Hello RocketMQ i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult producer.sendMessageInTransaction(msg, null);System.out.printf(%s%n, sendResult);Thread.sleep(10);} catch (MQClientException | UnsupportedEncodingException e) {e.printStackTrace();}}for (int i 0; i 100000; i) {Thread.sleep(1000);}producer.shutdown();}
}本地事务处理 – 事务监听器实现类 TransactionListenerImpl public class TransactionListenerImpl implements TransactionListener {/*** When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.* 当发送事务准备(半)消息成功时将调用此方123法 执行123本地事务。** param msg Half(prepare) message* param arg Custom business parameter 自定义业务参数* return 事务状态*/Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {String tags msg.getTags();if(StringUtils.contains(tags,TagA)){return LocalTransactionState.COMMIT_MESSAGE;}else if(StringUtils.contains(tags,TagB)){return LocalTransactionState.ROLLBACK_MESSAGE;}else{return LocalTransactionState.UNKNOW;}}/*** 回查本地事务** param msg Check message* return Transaction state*/Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {String tags msg.getTags();if(StringUtils.contains(tags,TagC)){return LocalTransactionState.COMMIT_MESSAGE;}else if(StringUtils.contains(tags,TagD)){return LocalTransactionState.ROLLBACK_MESSAGE;}else{return LocalTransactionState.UNKNOW;}}
}消费者 public class TransactionConsumer {public static void main(String[] args) throws MQClientException {//1.创建消费者对象DefaultMQPushConsumer consumer new DefaultMQPushConsumer(my-consumer-group1);//2.指明nameserver的地址consumer.setNamesrvAddr(192.168.194.133:9876);//3.订阅主题:topic 和过滤消息用的tag表达式consumer.subscribe(TopicTest,*);//4.创建一个监听器当broker把消息推过来时调用consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {
// System.out.println(收到的消息new String(msg.getBody()));System.out.println(收到的消息msg);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//5.启动消费者consumer.start();System.out.println(消费者已启动);}
}使用限制
事务性消息没有调度和批处理⽀持。为避免单条消息被检查次数过多导致半队列消息堆积我们默认将单条消息的检查次数限制为 15 次但⽤户可以通过更改 “transactionCheckMax” 来更改此限制。参数在 broker 的配置中如果⼀条消息的检查次数超过 “transactionCheckMax” 次broker 默认会丢弃这条消息同时打印错误日志。用户可以通过重写 “AbstractTransactionCheckListener” 类来改变这种行为。事务消息将在⼀定时间后检查该时间由代理配置中的参数 “transactionTimeout” 确定。并且用户也可以在发送事务消息时通过设置用户属性 “CHECK_IMMUNITY_TIME_IN_SECONDS” 来改变这个限制这个参数优先于 “transactionMsgTimeout” 参数。⼀个事务性消息可能会被检查或消费不止⼀次。提交给用户目标主题的消息 reput 可能会失败。目前它取决于日志记录。高可用是由 RocketMQ 本身的高可用机制来保证的。如果要保证事务消息不丢失保证事务完整性推荐使用同步双写机制。事务性消息的⽣产者 ID 不能与其他类型消息的⽣产者 ID 共享。与其他类型的消息不同事务性消息允许向后查询。MQ 服务器通过其⽣产者 ID 查询客户端。