当前位置: 首页 > news >正文

商丘网站建设做企业网站步骤

商丘网站建设,做企业网站步骤,瑞丽住建局网站,网站 优化 教程目录 基本操作 启动 测试 双主双从集群搭建 总体架构 工作流程 服务器环境 Host添加信息 防火墙配置 环境变量配置 创建消息存储路径 broker配置文件 修改启动脚本文件 服务启动 查看进程状态 查看日志 mqadmin管理工具 使用方式 命令介绍 集群监控平台搭…目录 基本操作 启动 测试 双主双从集群搭建 总体架构 工作流程 服务器环境 Host添加信息 防火墙配置 环境变量配置 创建消息存储路径 broker配置文件 修改启动脚本文件 服务启动 查看进程状态 查看日志  mqadmin管理工具 使用方式 命令介绍 集群监控平台搭建 消息样例 基本样例 顺序消息 延时消息 批量消息 过滤消息 事务消息 基本操作 启动 启动RocketMQ # 1.启动NameServer nohup sh bin/mqnamesrv # 2.查看启动日志 tail -f ~/logs/rocketmqlogs/namesrv.log 启动Broker # 1.启动Broker nohup sh bin/mqbroker -n localhost:9876 # 2.查看启动日志 tail -f ~/logs/rocketmqlogs/broker.log RocketMQ默认的虚拟机内存较大启动Broker如果因为内存不足失败需要编辑如下两个配置文件修改JVM内存大小 # 编辑runbroker.sh和runserver.sh修改默认JVM大小 vi runbroker.sh vi runserver.sh 参考设置JAVA_OPT${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize128m -XX:MaxMetaspaceSize320m 测试 发送消息 # 1.设置环境变量 export NAMESRV_ADDRlocalhost:9876 # 2.使用安装包的Demo发送消息 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer 接收消息 # 1.设置环境变量 export NAMESRV_ADDRlocalhost:9876 # 2.接收消息 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer 关闭RocketMQ # 1.关闭NameServer sh bin/mqshutdown namesrv # 2.关闭Broker sh bin/mqshutdown broker 双主双从集群搭建 总体架构 消息高可用采用2m-2s同步双写方式 工作流程 启动NameServerNameServer起来后监听端口等待Broker、Producer、Consumer连上来相当于一个路由控制中心。 Broker启动跟所有的NameServer保持长连接定时发送心跳包。心跳包中包含当前Broker信息(IP端口等)以及存储所有Topic信息。注册成功后NameServer集群中就有Topic跟Broker的映射关系。 收发消息前先创建Topic创建Topic时需要指定该Topic要存储在哪些Broker上也可以在发送消息时自动创建Topic。 Producer发送消息启动时先跟NameServer集群中的其中一台建立长连接并从NameServer中获取当前发送的Topic存在哪些Broker上轮询从队列列表中选择一个队列然后与队列所在的Broker建立长连接从而向Broker发消息。 Consumer跟Producer类似跟其中一台NameServer建立长连接获取当前订阅Topic存在哪些Broker上然后直接跟Broker建立连接通道开始消费消息。 服务器环境 序号IP角色架构模式1192.168.25.135nameserver、brokerserverMaster1、Slave22192.168.25.138nameserver、brokerserverMaster2、Slave1 Host添加信息 vim /etc/hosts 配置如下: # nameserver 192.168.25.135 rocketmq-nameserver1 192.168.25.138 rocketmq-nameserver2 # broker 192.168.25.135 rocketmq-master1 192.168.25.135 rocketmq-slave2 192.168.25.138 rocketmq-master2 192.168.25.138 rocketmq-slave1 配置完成后, 重启网卡 systemctl restart network 防火墙配置 宿主机需要远程访问虚拟机的rocketmq服务和web服务需要开放相关的端口号简单粗暴的方式是直接关闭防火墙 # 关闭防火墙 systemctl stop firewalld.service # 查看防火墙的状态 firewall-cmd --state # 禁止firewall开机启动 systemctl disable firewalld.service 或者为了安全只开放特定的端口号RocketMQ默认使用3个端口9876 、10911 、11011 。如果防火墙没有关闭的话那么防火墙就必须开放这些端口 nameserver 默认使用 9876 端口master 默认使用 10911 端口slave 默认使用11011 端口 执行以下命令 # 开放name server默认端口 firewall-cmd --remove-port9876/tcp --permanent # 开放master默认端口 firewall-cmd --remove-port10911/tcp --permanent # 开放slave默认端口 (当前集群模式可不开启) firewall-cmd --remove-port11011/tcp --permanent # 重启防火墙 firewall-cmd --reload 环境变量配置 vim /etc/profile 在profile文件的末尾加入如下命令 #set rocketmq ROCKETMQ_HOME/usr/local/rocketmq/rocketmq-all-4.4.0-bin-release PATH$PATH:$ROCKETMQ_HOME/bin export ROCKETMQ_HOME PATH 输入:wq! 保存并退出 并使得配置立刻生效 source /etc/profile 创建消息存储路径 mkdir /usr/local/rocketmq/store mkdir /usr/local/rocketmq/store/commitlog mkdir /usr/local/rocketmq/store/consumequeue mkdir /usr/local/rocketmq/store/index broker配置文件 master1 服务器192.168.25.135 vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-a.properties 修改配置如下 #所属集群名字 brokerClusterNamerocketmq-cluster #broker名字注意此处不同的配置文件填写的不一样 brokerNamebroker-a #0 表示 Master0 表示 Slave brokerId0 #nameServer地址分号分割 namesrvAddrrocketmq-nameserver1:9876;rocketmq-nameserver2:9876 #在发送消息时自动创建服务器不存在的topic默认创建的队列数 defaultTopicQueueNums4 #是否允许 Broker 自动创建Topic建议线下开启线上关闭 autoCreateTopicEnabletrue #是否允许 Broker 自动创建订阅组建议线下开启线上关闭 autoCreateSubscriptionGrouptrue #Broker 对外服务的监听端口 listenPort10911 #删除文件时间点默认凌晨 4点 deleteWhen04 #文件保留时间默认 48 小时 fileReservedTime120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog1073741824 #ConsumeQueue每个文件默认存30W条根据业务情况调整 mapedFileSizeConsumeQueue300000 #destroyMapedFileIntervalForcibly120000 #redeleteHangedFileInterval120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio88 #存储路径 storePathRootDir/usr/local/rocketmq/store #commitLog 存储路径 storePathCommitLog/usr/local/rocketmq/store/commitlog #消费队列存储路径存储路径 storePathConsumeQueue/usr/local/rocketmq/store/consumequeue #消息索引存储路径 storePathIndex/usr/local/rocketmq/store/index #checkpoint 文件存储路径 storeCheckpoint/usr/local/rocketmq/store/checkpoint #abort 文件存储路径 abortFile/usr/local/rocketmq/store/abort #限制的消息大小 maxMessageSize65536 #flushCommitLogLeastPages4 #flushConsumeQueueLeastPages2 #flushCommitLogThoroughInterval10000 #flushConsumeQueueThoroughInterval60000 #Broker 的角色 #- ASYNC_MASTER 异步复制Master #- SYNC_MASTER 同步双写Master #- SLAVE brokerRoleSYNC_MASTER #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 flushDiskTypeSYNC_FLUSH #checkTransactionMessageEnablefalse #发消息线程池数量 #sendMessageThreadPoolNums128 #拉消息线程池数量 #pullMessageThreadPoolNums128 slave2 服务器192.168.25.135 vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-b-s.properties 修改配置如下 #所属集群名字 brokerClusterNamerocketmq-cluster #broker名字注意此处不同的配置文件填写的不一样 brokerNamebroker-b #0 表示 Master0 表示 Slave brokerId1 #nameServer地址分号分割 namesrvAddrrocketmq-nameserver1:9876;rocketmq-nameserver2:9876 #在发送消息时自动创建服务器不存在的topic默认创建的队列数 defaultTopicQueueNums4 #是否允许 Broker 自动创建Topic建议线下开启线上关闭 autoCreateTopicEnabletrue #是否允许 Broker 自动创建订阅组建议线下开启线上关闭 autoCreateSubscriptionGrouptrue #Broker 对外服务的监听端口 listenPort11011 #删除文件时间点默认凌晨 4点 deleteWhen04 #文件保留时间默认 48 小时 fileReservedTime120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog1073741824 #ConsumeQueue每个文件默认存30W条根据业务情况调整 mapedFileSizeConsumeQueue300000 #destroyMapedFileIntervalForcibly120000 #redeleteHangedFileInterval120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio88 #存储路径 storePathRootDir/usr/local/rocketmq/store #commitLog 存储路径 storePathCommitLog/usr/local/rocketmq/store/commitlog #消费队列存储路径存储路径 storePathConsumeQueue/usr/local/rocketmq/store/consumequeue #消息索引存储路径 storePathIndex/usr/local/rocketmq/store/index #checkpoint 文件存储路径 storeCheckpoint/usr/local/rocketmq/store/checkpoint #abort 文件存储路径 abortFile/usr/local/rocketmq/store/abort #限制的消息大小 maxMessageSize65536 #flushCommitLogLeastPages4 #flushConsumeQueueLeastPages2 #flushCommitLogThoroughInterval10000 #flushConsumeQueueThoroughInterval60000 #Broker 的角色 #- ASYNC_MASTER 异步复制Master #- SYNC_MASTER 同步双写Master #- SLAVE brokerRoleSLAVE #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 flushDiskTypeASYNC_FLUSH #checkTransactionMessageEnablefalse #发消息线程池数量 #sendMessageThreadPoolNums128 #拉消息线程池数量 #pullMessageThreadPoolNums128 master2 服务器192.168.25.138 vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-b.properties 修改配置如下 #所属集群名字 brokerClusterNamerocketmq-cluster #broker名字注意此处不同的配置文件填写的不一样 brokerNamebroker-b #0 表示 Master0 表示 Slave brokerId0 #nameServer地址分号分割 namesrvAddrrocketmq-nameserver1:9876;rocketmq-nameserver2:9876 #在发送消息时自动创建服务器不存在的topic默认创建的队列数 defaultTopicQueueNums4 #是否允许 Broker 自动创建Topic建议线下开启线上关闭 autoCreateTopicEnabletrue #是否允许 Broker 自动创建订阅组建议线下开启线上关闭 autoCreateSubscriptionGrouptrue #Broker 对外服务的监听端口 listenPort10911 #删除文件时间点默认凌晨 4点 deleteWhen04 #文件保留时间默认 48 小时 fileReservedTime120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog1073741824 #ConsumeQueue每个文件默认存30W条根据业务情况调整 mapedFileSizeConsumeQueue300000 #destroyMapedFileIntervalForcibly120000 #redeleteHangedFileInterval120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio88 #存储路径 storePathRootDir/usr/local/rocketmq/store #commitLog 存储路径 storePathCommitLog/usr/local/rocketmq/store/commitlog #消费队列存储路径存储路径 storePathConsumeQueue/usr/local/rocketmq/store/consumequeue #消息索引存储路径 storePathIndex/usr/local/rocketmq/store/index #checkpoint 文件存储路径 storeCheckpoint/usr/local/rocketmq/store/checkpoint #abort 文件存储路径 abortFile/usr/local/rocketmq/store/abort #限制的消息大小 maxMessageSize65536 #flushCommitLogLeastPages4 #flushConsumeQueueLeastPages2 #flushCommitLogThoroughInterval10000 #flushConsumeQueueThoroughInterval60000 #Broker 的角色 #- ASYNC_MASTER 异步复制Master #- SYNC_MASTER 同步双写Master #- SLAVE brokerRoleSYNC_MASTER #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 flushDiskTypeSYNC_FLUSH #checkTransactionMessageEnablefalse #发消息线程池数量 #sendMessageThreadPoolNums128 #拉消息线程池数量 #pullMessageThreadPoolNums128 slave1 服务器192.168.25.138 vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-a-s.properties 修改配置如下 #所属集群名字 brokerClusterNamerocketmq-cluster #broker名字注意此处不同的配置文件填写的不一样 brokerNamebroker-a #0 表示 Master0 表示 Slave brokerId1 #nameServer地址分号分割 namesrvAddrrocketmq-nameserver1:9876;rocketmq-nameserver2:9876 #在发送消息时自动创建服务器不存在的topic默认创建的队列数 defaultTopicQueueNums4 #是否允许 Broker 自动创建Topic建议线下开启线上关闭 autoCreateTopicEnabletrue #是否允许 Broker 自动创建订阅组建议线下开启线上关闭 autoCreateSubscriptionGrouptrue #Broker 对外服务的监听端口 listenPort11011 #删除文件时间点默认凌晨 4点 deleteWhen04 #文件保留时间默认 48 小时 fileReservedTime120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog1073741824 #ConsumeQueue每个文件默认存30W条根据业务情况调整 mapedFileSizeConsumeQueue300000 #destroyMapedFileIntervalForcibly120000 #redeleteHangedFileInterval120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio88 #存储路径 storePathRootDir/usr/local/rocketmq/store #commitLog 存储路径 storePathCommitLog/usr/local/rocketmq/store/commitlog #消费队列存储路径存储路径 storePathConsumeQueue/usr/local/rocketmq/store/consumequeue #消息索引存储路径 storePathIndex/usr/local/rocketmq/store/index #checkpoint 文件存储路径 storeCheckpoint/usr/local/rocketmq/store/checkpoint #abort 文件存储路径 abortFile/usr/local/rocketmq/store/abort #限制的消息大小 maxMessageSize65536 #flushCommitLogLeastPages4 #flushConsumeQueueLeastPages2 #flushCommitLogThoroughInterval10000 #flushConsumeQueueThoroughInterval60000 #Broker 的角色 #- ASYNC_MASTER 异步复制Master #- SYNC_MASTER 同步双写Master #- SLAVE brokerRoleSLAVE #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 flushDiskTypeASYNC_FLUSH #checkTransactionMessageEnablefalse #发消息线程池数量 #sendMessageThreadPoolNums128 #拉消息线程池数量 #pullMessageThreadPoolNums128 修改启动脚本文件 runbroker.sh vi /usr/local/rocketmq/bin/runbroker.sh 需要根据内存大小进行适当的对JVM参数进行调整 # # 开发环境配置 JVM Configuration JAVA_OPT${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m runserver.sh vim /usr/local/rocketmq/bin/runserver.sh JAVA_OPT${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize128m -XX:MaxMetaspaceSize320m 服务启动 启动NameServe集群 分别在192.168.25.135和192.168.25.138启动NameServer cd /usr/local/rocketmq/bin nohup sh mqnamesrv 启动Broker集群 在192.168.25.135上启动master1和slave2 ## master1cd /usr/local/rocketmq/bin nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-syncbroker-a.properties ## slave2cd /usr/local/rocketmq/bin nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-b-s.properties 在192.168.25.138上启动master2和slave2 # master2cd /usr/local/rocketmq/bin nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-b.properties # slave1cd /usr/local/rocketmq/bin nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-a-s.properties 查看进程状态 启动后通过JPS查看启动进程 查看日志  # 查看nameServer日志 tail -500f ~/logs/rocketmqlogs/namesrv.log # 查看broker日志 tail -500f ~/logs/rocketmqlogs/broker.log mqadmin管理工具 使用方式 进入RocketMQ安装位置在bin目录下执行./mqadmin {command} {args} 命令介绍 Topic相关 名称含义命令选项说明updateTopic创建更新Topic配置-bBroker 地址表示 topic 所在 Broker只支持单台Broker地址为ip:port-ccluster 名称表示 topic 所在集群集群可通过 clusterList 查询-h-打印帮助-nNameServer服务地址格式 ip:port-p指定新topic的读写权限( W2|R4|WR6 )-r可读队列数默认为 8-w可写队列数默认为 8-ttopic 名称名称只能使用字符 ^[a-zA-Z0-9_-]$ deleteTopic删除Topic-ccluster 名称表示删除某集群下的某个 topic 集群 可通过 clusterList 查询-h打印帮助-nNameServer 服务地址格式 ip:port-ttopic 名称名称只能使用字符 ^[a-zA-Z0-9_-]$ topicList查看 Topic 列表信息-h打印帮助-c不配置-c只返回topic列表增加-c返回clusterName, topic, consumerGroup信息即topic的所属集群和订阅关系没有参数-nNameServer 服务地址格式 ip:porttopicRoute查看 Topic 路由信息-ttopic 名称-h打印帮助-nNameServer 服务地址格式 ip:porttopicStatus查看 Topic 消息队列offset-ttopic 名称-h打印帮助-nNameServer 服务地址格式 ip:porttopicClusterList查看 Topic 所在集群列表-ttopic 名称-h打印帮助-nNameServer 服务地址格式 ip:portupdateTopicPerm更新 Topic 读写权限-ttopic 名称-h打印帮助-nNameServer 服务地址格式 ip:port-bBroker 地址表示 topic 所在 Broker只支持单台Broker地址为ip:port-p指定新 topic 的读写权限( W2|R4|WR6 )-ccluster 名称表示 topic 所在集群集群可通过 clusterList 查询-b优先如果没有-b则对集群中所有Broker执行命令updateOrderConf从NameServer上创建、删除、获取特定命名空间的kv配置目前还未启用-h打印帮助-nNameServer 服务地址格式 ip:port-ttopic键-vorderConf值-mmethod可选get、put、deleteallocateMQ以平均负载算法计算消费者列表负载消息队列的负载结果-ttopic 名称-h打印帮助-nNameServer 服务地址格式 ip:port-iipList用逗号分隔计算这些ip去负载Topic的消息队列statsAll打印Topic订阅关系、TPS、积累量、24h读写总量等信息-h打印帮助-nNameServer 服务地址格式 ip:port-a是否只打印活跃topic-t指定topic ​集群相关 名称含义命令选项说明clusterList查看集群信息集群、BrokerName、BrokerId、TPS等信息-m打印更多信息 (增加打印出如下信息 #InTotalYest, #OutTotalYest, #InTotalToday ,#OutTotalToday)-h打印帮助-nNameServer 服务地址格式 ip:port-i打印间隔单位秒clusterRT发送消息检测集群各Broker RT。消息发往${BrokerName} Topic。-aamount每次探测的总数RT 总时间 / amount-s消息大小单位B-c探测哪个集群-p是否打印格式化日志以|分割默认不打印-h打印帮助-m所属机房打印使用-i发送间隔单位秒-nNameServer 服务地址格式 ip:port ​Broker相关 名称含义命令选项说明updateBrokerConfig更新 Broker 配置文件会修改Broker.conf-bBroker 地址格式为ip:port-ccluster 名称-kkey 值-vvalue 值-h打印帮助-nNameServer 服务地址格式 ip:portbrokerStatus查看 Broker 统计信息、运行状态你想要的信息几乎都在里面-bBroker 地址地址为ip:port-h打印帮助-nNameServer 服务地址格式 ip:portbrokerConsumeStatsBroker中各个消费者的消费情况按Message Queue维度返回Consume OffsetBroker OffsetDiffTImestamp等信息-bBroker 地址地址为ip:port-t请求超时时间-ldiff阈值超过阈值才打印-o是否为顺序topic一般为false-h打印帮助-nNameServer 服务地址格式 ip:portgetBrokerConfig获取Broker配置-bBroker 地址地址为ip:port-nNameServer 服务地址格式 ip:portwipeWritePerm从NameServer上清除 Broker写权限-bBroker 地址地址为ip:port-nNameServer 服务地址格式 ip:port-h打印帮助cleanExpiredCQ清理Broker上过期的Consume Queue如果手动减少对列数可能产生过期队列-nNameServer 服务地址格式 ip:port-h打印帮助-bBroker 地址地址为ip:port-c集群名称cleanUnusedTopic清理Broker上不使用的Topic从内存中释放Topic的Consume Queue如果手动删除Topic会产生不使用的Topic-nNameServer 服务地址格式 ip:port-h打印帮助-bBroker 地址地址为ip:port-c集群名称sendMsgStatus向Broker发消息返回发送状态和RT-nNameServer 服务地址格式 ip:port-h打印帮助-bBrokerName注意不同于Broker地址-s消息大小单位B-c发送次数 消息相关 名称含义命令选项说明queryMsgById根据offsetMsgId查询msg如果使用开源控制台应使用offsetMsgId此命令还有其他参数具体作用请阅读QueryMsgByIdSubCommand。-imsgId-h打印帮助-nNameServer 服务地址格式 ip:portqueryMsgByKey根据消息 Key 查询消息-kmsgKey-tTopic 名称-h打印帮助-nNameServer 服务地址格式 ip:portqueryMsgByOffset根据 Offset 查询消息-bBroker 名称这里需要注意 填写的是 Broker 的名称不是 Broker 的地址Broker 名称可以在 clusterList 查到-iquery 队列 id-ooffset 值-ttopic 名称-h打印帮助-nNameServer 服务地址格式 ip:portqueryMsgByUniqueKey根据msgId查询msgId不同于offsetMsgId区别详见常见运维问题。-g-d配合使用查到消息后尝试让特定的消费者消费消息并返回消费结果-h打印帮助-nNameServer 服务地址格式 ip:port-iuniqe msg id-gconsumerGroup-dclientId-ttopic名称checkMsgSendRT检测向topic发消息的RT功能类似clusterRT-h打印帮助-nNameServer 服务地址格式 ip:port-ttopic名称-a探测次数-s消息大小sendMessage发送一条消息可以根据配置发往特定Message Queue或普通发送。-h打印帮助-nNameServer 服务地址格式 ip:port-ttopic名称-pbody消息体-kkeys-ctags-bBrokerName-iqueueIdconsumeMessage消费消息。可以根据offset、开始结束时间戳、消息队列消费消息配置不同执行不同消费逻辑详见ConsumeMessageCommand。-h打印帮助-nNameServer 服务地址格式 ip:port-ttopic名称-bBrokerName-o从offset开始消费-iqueueId-g消费者分组-s开始时间戳格式详见-h-d结束时间戳-c消费多少条消息printMsg从Broker消费消息并打印可选时间段-h打印帮助-nNameServer 服务地址格式 ip:port-ttopic名称-c字符集例如UTF-8-ssubExpress过滤表达式-b开始时间戳格式参见-h-e结束时间戳-d是否打印消息体printMsgByQueue类似printMsg但指定Message Queue-h打印帮助-nNameServer 服务地址格式 ip:port-ttopic名称-iqueueId-aBrokerName-c字符集例如UTF-8-ssubExpress过滤表达式-b开始时间戳格式参见-h-e结束时间戳-p是否打印消息-d是否打印消息体-f是否统计tag数量并打印resetOffsetByTime按时间戳重置offsetBroker和consumer都会重置-h打印帮助-nNameServer 服务地址格式 ip:port-g消费者分组-ttopic名称-s重置为此时间戳对应的offset-f是否强制重置如果false只支持回溯offset如果true不管时间戳对应offset与consumeOffset关系-c 是否重置c客户端offset 消费者、消费组相关 名称含义命令选项说明consumerProgress查看订阅组消费状态可以查看具体的client IP的消息积累量-g消费者所属组名-s是否打印client IP-h打印帮助-nNameServer 服务地址格式 ip:portconsumerStatus查看消费者状态包括同一个分组中是否都是相同的订阅分析Process Queue是否堆积返回消费者jstack结果内容较多使用者参见ConsumerStatusSubCommand-h打印帮助-nNameServer 服务地址格式 ip:port-gconsumer group-iclientId-s是否执行jstackgetConsumerStatus获取 Consumer 消费进度-g消费者所属组名-t查询主题-iConsumer 客户端 ip-nNameServer 服务地址格式 ip:port-h打印帮助updateSubGroup更新或创建订阅关系-nNameServer 服务地址格式 ip:port-h打印帮助-bBroker地址-c集群名称-g消费者分组名称-s分组是否允许消费-m是否从最小offset开始消费-d是否是广播模式-q重试队列数量-r最大重试次数-i当slaveReadEnable开启时有效且还未达到从slave消费时建议从哪个BrokerId消费可以配置备机id主动从备机消费-w如果Broker建议从slave消费配置决定从哪个slave消费配置BrokerId例如1-a当消费者数量变化时是否通知其他消费者负载均衡deleteSubGroup从Broker删除订阅关系-nNameServer 服务地址格式 ip:port-h打印帮助-bBroker地址-c集群名称-g消费者分组名称cloneGroupOffset在目标群组中使用源群组的offset-nNameServer 服务地址格式 ip:port-h打印帮助-s源消费者组-d目标消费者组-ttopic名称-o暂未使用 连接相关 名称含义命令选项说明consumerConnec tion查询 Consumer 的网络连接-g消费者所属组名-nNameServer 服务地址格式 ip:port-h打印帮助producerConnec tion查询 Producer 的网络连接-g生产者所属组名-t主题名称-nNameServer 服务地址格式 ip:port-h打印帮助 ​NameServer相关​ 名称含义命令选项说明updateKvConfig更新NameServer的kv配置目前还未使用-s命名空间-kkey-vvalue-nNameServer 服务地址格式 ip:port-h打印帮助deleteKvConfig删除NameServer的kv配置-s命名空间-kkey-nNameServer 服务地址格式 ip:port-h打印帮助getNamesrvConfig获取NameServer配置-nNameServer 服务地址格式 ip:port-h打印帮助updateNamesrvConfig修改NameServer配置-nNameServer 服务地址格式 ip:port-h打印帮助-kkey-vvalue 其他 名称含义命令选项说明startMonitoring开启监控进程监控消息误删、重试队列消息数等-nNameServer 服务地址格式 ip:port-h打印帮助 ​注意事项 几乎所有命令都需要配置-n表示NameServer地址格式为ip:port 几乎所有命令都可以通过-h获取帮助 如果既有Broker地址-b配置项又有clusterName-c配置项则优先以Broker地址执行命令如果不配置Broker地址则对集群中所有主机执行命令 集群监控平台搭建 概述 RocketMQ有一个对其扩展的开源项目incubator-rocketmq-externals这个项目中有一个子模块叫rocketmq-console这个便是管理控制台项目了先将incubator-rocketmq-externals拉到本地因为我们需要自己对rocketmq-console进行编译打包运行。 下载并编译打包 git clone https://github.com/apache/rocketmq-externals cd rocketmq-console mvn clean package -Dmaven.test.skiptrue 注意打包前在rocketmq-console中配置namesrv集群地址 rocketmq.config.namesrvAddr192.168.25.135:9876;192.168.25.138:9876 启动rocketmq-console java -jar rocketmq-console-ng-1.0.0.jar 启动成功后我们就可以通过浏览器访问http://localhost:8080进入控制台界面了如下图 集群状态 消息样例 导入MQ客户端依赖 dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-client/artifactIdversion4.4.0/version /dependency 消息发送者步骤分析 创建消息生产者producer并制定生产者组名指定Nameserver地址启动producer创建消息对象指定主题Topic、Tag和消息体发送消息关闭生产者producer 消息消费者步骤分析 创建消费者Consumer制定消费者组名指定Nameserver地址订阅主题Topic和Tag设置回调函数处理消息启动消费者consumer 基本样例 消息发送 发送同步消息 这种可靠性同步地发送方式使用的比较广泛比如重要的消息通知短信通知。 public class SyncProducer {public static void main(String[] args) throws Exception {// 实例化消息生产者ProducerDefaultMQProducer producer new DefaultMQProducer(please_rename_unique_group_name);// 设置NameServer的地址producer.setNamesrvAddr(localhost:9876);// 启动Producer实例producer.start();for (int i 0; i 100; i) {// 创建消息并指定TopicTag和消息体Message msg new Message(TopicTest /* Topic */,TagA /* Tag */,(Hello RocketMQ i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);// 发送消息到一个BrokerSendResult sendResult producer.send(msg);// 通过sendResult返回消息是否成功送达System.out.printf(%s%n, sendResult);}// 如果不再发送消息关闭Producer实例。producer.shutdown();} } 发送异步消息 异步消息通常用在对响应时间敏感的业务场景即发送端不能容忍长时间地等待Broker的响应。 public class AsyncProducer {public static void main(String[] args) throws Exception {// 实例化消息生产者ProducerDefaultMQProducer producer new DefaultMQProducer(please_rename_unique_group_name);// 设置NameServer的地址producer.setNamesrvAddr(localhost:9876);// 启动Producer实例producer.start();producer.setRetryTimesWhenSendAsyncFailed(0);for (int i 0; i 100; i) {final int index i;// 创建消息并指定TopicTag和消息体Message msg new Message(TopicTest,TagA,OrderID188,Hello world.getBytes(RemotingHelper.DEFAULT_CHARSET));// SendCallback接收异步返回结果的回调producer.send(msg, new SendCallback() {Overridepublic void onSuccess(SendResult sendResult) {System.out.printf(%-10d OK %s %n, index,sendResult.getMsgId());}Overridepublic void onException(Throwable e) {System.out.printf(%-10d Exception %s %n, index, e);e.printStackTrace();}});}// 如果不再发送消息关闭Producer实例。producer.shutdown();} } 单向发送消息 这种方式主要用在不特别关心发送结果的场景例如日志发送。 public class OnewayProducer {public static void main(String[] args) throws Exception{// 实例化消息生产者ProducerDefaultMQProducer producer new DefaultMQProducer(please_rename_unique_group_name);// 设置NameServer的地址producer.setNamesrvAddr(localhost:9876);// 启动Producer实例producer.start();for (int i 0; i 100; i) {// 创建消息并指定TopicTag和消息体Message msg new Message(TopicTest /* Topic */,TagA /* Tag */,(Hello RocketMQ i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);// 发送单向消息没有任何返回结果producer.sendOneway(msg);}// 如果不再发送消息关闭Producer实例。producer.shutdown();} } 消费消息 负载均衡模式 消费者采用负载均衡方式消费消息多个消费者共同消费队列消息每个消费者处理的消息不同 public static void main(String[] args) throws Exception {// 实例化消息生产者,指定组名DefaultMQPushConsumer consumer new DefaultMQPushConsumer(group1);// 指定Namesrv地址信息.consumer.setNamesrvAddr(localhost:9876);// 订阅Topicconsumer.subscribe(Test, *);//负载均衡模式消费consumer.setMessageModel(MessageModel.CLUSTERING);// 注册回调函数处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs,ConsumeConcurrentlyContext context) {System.out.printf(%s Receive New Messages: %s %n, Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消息者consumer.start();System.out.printf(Consumer Started.%n); } 广播模式 消费者采用广播的方式消费消息每个消费者消费的消息都是相同的 public static void main(String[] args) throws Exception {// 实例化消息生产者,指定组名DefaultMQPushConsumer consumer new DefaultMQPushConsumer(group1);// 指定Namesrv地址信息.consumer.setNamesrvAddr(localhost:9876);// 订阅Topicconsumer.subscribe(Test, *);//广播模式消费consumer.setMessageModel(MessageModel.BROADCASTING);// 注册回调函数处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs,ConsumeConcurrentlyContext context) {System.out.printf(%s Receive New Messages: %s %n, Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消息者consumer.start();System.out.printf(Consumer Started.%n); } 顺序消息 消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序可以分为分区有序或者全局有序。 顺序消费的原理解析在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列)而消费消息的时候从多个queue上拉取消息这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中消费的时候只从这个queue上依次拉取则就保证了顺序。当发送和消费参与的queue只有一个则是全局有序如果多个queue参与则为分区有序即相对每个queue消息都是有序的。 下面用订单进行分区有序的示例。一个订单的顺序流程是创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中消费时同一个OrderId获取到的肯定是同一个队列。 顺序消息生产 /** * Producer发送顺序消息 */ public class Producer {public static void main(String[] args) throws Exception {DefaultMQProducer producer new DefaultMQProducer(please_rename_unique_group_name);producer.setNamesrvAddr(127.0.0.1:9876);producer.start();String[] tags new String[]{TagA, TagC, TagD};// 订单列表ListOrderStep orderList new Producer().buildOrders();Date date new Date();SimpleDateFormat sdf new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);String dateStr sdf.format(date);for (int i 0; i 10; i) {// 加个时间前缀String body dateStr Hello RocketMQ orderList.get(i);Message msg new Message(TopicTest, tags[i % tags.length], KEY i, body.getBytes());SendResult sendResult producer.send(msg, new MessageQueueSelector() {Overridepublic MessageQueue select(ListMessageQueue mqs, Message msg, Object arg) {Long id (Long) arg; //根据订单id选择发送queuelong index id % mqs.size();return mqs.get((int) index);}}, orderList.get(i).getOrderId());//订单idSystem.out.println(String.format(SendResult status:%s, queueId:%d, body:%s,sendResult.getSendStatus(),sendResult.getMessageQueue().getQueueId(),body));}producer.shutdown();}/*** 订单的步骤*/private static class OrderStep {private long orderId;private String desc;public long getOrderId() {return orderId;}public void setOrderId(long orderId) {this.orderId orderId;}public String getDesc() {return desc;}public void setDesc(String desc) {this.desc desc;}Overridepublic String toString() {return OrderStep{ orderId orderId , desc desc \ };}}/*** 生成模拟订单数据*/private ListOrderStep buildOrders() {ListOrderStep orderList new ArrayListOrderStep();OrderStep orderDemo new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc(创建);orderList.add(orderDemo);orderDemo new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc(创建);orderList.add(orderDemo);orderDemo new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc(付款);orderList.add(orderDemo);orderDemo new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc(创建);orderList.add(orderDemo);orderDemo new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc(付款);orderList.add(orderDemo);orderDemo new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc(付款);orderList.add(orderDemo);orderDemo new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc(完成);orderList.add(orderDemo);orderDemo new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc(推送);orderList.add(orderDemo);orderDemo new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc(完成);orderList.add(orderDemo);orderDemo new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc(完成);orderList.add(orderDemo);return orderList;} } 顺序消费消息 /** * 顺序消息消费带事务方式应用可控制Offset什么时候提交 */ public class ConsumerInOrder {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(please_rename_unique_group_name_3);consumer.setNamesrvAddr(127.0.0.1:9876);/*** 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费br* 如果非第一次启动那么按照上次消费的位置继续消费*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe(TopicTest, TagA || TagC || TagD);consumer.registerMessageListener(new MessageListenerOrderly() {Random random new Random();Overridepublic ConsumeOrderlyStatus consumeMessage(ListMessageExt msgs, ConsumeOrderlyContext context) {context.setAutoCommit(true);for (MessageExt msg : msgs) {// 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序System.out.println(consumeThread Thread.currentThread().getName() queueId msg.getQueueId() , content: new String(msg.getBody()));}try {//模拟业务逻辑处理中...TimeUnit.SECONDS.sleep(random.nextInt(10));} catch (Exception e) {e.printStackTrace();}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.println(Consumer Started.);} } 延时消息 比如电商里提交了一个订单就可以发送一个延时消息1h后去检查这个订单的状态如果还是未付款就取消订单释放库存。 启动消息消费者 public class ScheduledMessageConsumer {public static void main(String[] args) throws Exception {// 实例化消费者DefaultMQPushConsumer consumer new DefaultMQPushConsumer(ExampleConsumer);// 订阅Topicsconsumer.subscribe(TestTopic, *);// 注册消息监听者consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt messages, ConsumeConcurrentlyContext context) {for (MessageExt message : messages) {// Print approximate delay time periodSystem.out.println(Receive message[msgId message.getMsgId() ] (System.currentTimeMillis() - message.getStoreTimestamp()) ms later);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者consumer.start();} } 发送延时消息 public class ScheduledMessageProducer {public static void main(String[] args) throws Exception {// 实例化一个生产者来产生延时消息DefaultMQProducer producer new DefaultMQProducer(ExampleProducerGroup);// 启动生产者producer.start();int totalMessagesToSend 100;for (int i 0; i totalMessagesToSend; i) {Message message new Message(TestTopic, (Hello scheduled message i).getBytes());// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)message.setDelayTimeLevel(3);// 发送消息producer.send(message);}// 关闭生产者producer.shutdown();} } 验证 将会看到消息的消费比存储时间晚10秒 使用限制 // org/apache/rocketmq/store/config/MessageStoreConfig.java private String messageDelayLevel 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h; 现在RocketMq并不支持任意时间的延时需要设置几个固定的延时等级从1s到2h分别对应着等级1到18 批量消息 批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic相同的waitStoreMsgOK而且不能是延时消息。此外这一批消息的总大小不应超过4MB。 发送批量消息 如果每次只发送不超过4MB的消息则很容易使用批处理样例如下 String topic BatchTest; ListMessage messages new ArrayList(); messages.add(new Message(topic, TagA, OrderID001, Hello world 0.getBytes())); messages.add(new Message(topic, TagA, OrderID002, Hello world 1.getBytes())); messages.add(new Message(topic, TagA, OrderID003, Hello world 2.getBytes())); try {producer.send(messages); } catch (Exception e) {e.printStackTrace();//处理error } 如果消息的总长度可能大于4MB时这时候最好把消息进行分割 public class ListSplitter implements IteratorListMessage {private final int SIZE_LIMIT 1024 * 1024 * 4;private final ListMessage messages;private int currIndex;public ListSplitter(ListMessage messages) {this.messages messages;}Override public boolean hasNext() {return currIndex messages.size();}Override public ListMessage next() {int nextIndex currIndex;int totalSize 0;for (; nextIndex messages.size(); nextIndex) {Message message messages.get(nextIndex);int tmpSize message.getTopic().length() message.getBody().length;MapString, String properties message.getProperties();for (Map.EntryString, String entry : properties.entrySet()) {tmpSize entry.getKey().length() entry.getValue().length();}tmpSize tmpSize 20; // 增加日志的开销20字节if (tmpSize SIZE_LIMIT) {//单个消息超过了最大的限制//忽略,否则会阻塞分裂的进程if (nextIndex - currIndex 0) {//假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环nextIndex;}break;}if (tmpSize totalSize SIZE_LIMIT) {break;} else {totalSize tmpSize;}}ListMessage subList messages.subList(currIndex, nextIndex);currIndex nextIndex;return subList;} } //把大的消息分裂成若干个小的消息 ListSplitter splitter new ListSplitter(messages); while (splitter.hasNext()) {try {ListMessage listItem splitter.next();producer.send(listItem);} catch (Exception e) {e.printStackTrace();//处理error} } 过滤消息 在大多数情况下TAG是一个简单而有用的设计其可以来选择您想要的消息。例如 DefaultMQPushConsumer consumer new DefaultMQPushConsumer(CID_EXAMPLE); consumer.subscribe(TOPIC, TAGA || TAGB || TAGC); 消费者将接收包含TAGA或TAGB或TAGC的消息。但是限制是一个消息只能有一个标签这对于复杂的场景可能不起作用。在这种情况下可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算。在RocketMQ定义的语法下可以实现一些简单的逻辑。下面是一个例子 ------------ | message | |----------| a 5 AND b abc | a 10 | -------------------- Gotten | b abc| | c true | ------------ ------------ | message | |----------| a 5 AND b abc | a 1 | -------------------- Missed | b abc| | c true | ------------ SQL基本语法 RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。 数值比较比如BETWEEN 字符比较比如IN IS NULL 或者 IS NOT NULL 逻辑符号 ANDORNOT 常量支持类型为 数值比如1233.1415 字符比如abc必须用单引号包裹起来 NULL特殊的常量 布尔值TRUE 或 FALSE 只有使用push模式的消费者才能用使用SQL92标准的sql语句接口如下 public void subscribe(finalString topic, final MessageSelector messageSelector) 消息生产者 发送消息时你能通过putUserProperty来设置消息的属性 DefaultMQProducer producer new DefaultMQProducer(please_rename_unique_group_name); producer.start(); Message msg new Message(TopicTest,tag,(Hello RocketMQ i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); // 设置一些属性 msg.putUserProperty(a, String.valueOf(i)); SendResult sendResult producer.send(msg);producer.shutdown(); 消息消费者 用MessageSelector.bySql来使用sql筛选消息 DefaultMQPushConsumer consumer new DefaultMQPushConsumer(please_rename_unique_group_name_4); // 只有订阅的消息有这个属性a, a 0 and a 3 consumer.subscribe(TopicTest, MessageSelector.bySql(a between 0 and 3); consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) {return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} }); consumer.start(); 事务消息 流程分析 上图说明了事务消息的大致方案其中分为两个流程正常事务消息的发送及提交、事务消息的补偿流程。 事务消息发送及提交 (1) 发送消息half消息。 (2) 服务端响应消息写入结果。 (3) 根据发送结果执行本地事务如果写入失败此时half消息对业务不可见本地逻辑不执行。 (4) 根据本地事务状态执行Commit或者RollbackCommit操作生成消息索引消息对消费者可见 事务补偿 (1) 对没有Commit/Rollback的事务消息pending状态的消息从服务端发起一次“回查” (2) Producer收到回查消息检查回查消息对应的本地事务的状态 (3) 根据本地事务状态重新Commit或者Rollback 其中补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。 事务消息状态 事务消息共有三种状态提交状态、回滚状态、中间状态 TransactionStatus.CommitTransaction: 提交事务它允许消费者消费此消息。 TransactionStatus.RollbackTransaction: 回滚事务它代表该消息将被删除不允许被消费。 TransactionStatus.Unknown: 中间状态它代表需要检查消息队列来确定状态。 创建事务性生产者 使用 TransactionMQProducer类创建生产者并指定唯一的 ProducerGroup就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。回传的事务状态在请参考前一节。 public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {//创建事务监听器TransactionListener transactionListener new TransactionListenerImpl();//创建消息生产者TransactionMQProducer producer new TransactionMQProducer(group6);producer.setNamesrvAddr(192.168.25.135:9876;192.168.25.138:9876);//生产者这是监听器producer.setTransactionListener(transactionListener);//启动消息生产者producer.start();String[] tags new String[]{TagA, TagB, TagC};for (int i 0; i 3; i) {try {Message msg new Message(TransactionTopic, tags[i % tags.length], KEY i,(Hello RocketMQ i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult producer.sendMessageInTransaction(msg, null);System.out.printf(%s%n, sendResult);TimeUnit.SECONDS.sleep(1);} catch (MQClientException | UnsupportedEncodingException e) {e.printStackTrace();}}//producer.shutdown();} } 实现事务的监听接口 当发送半消息成功时我们使用 executeLocalTransaction 方法来执行本地事务。它返回前一节中提到的三个事务状态之一。checkLocalTranscation 方法用于检查本地事务状态并回应消息队列的检查请求。它也是返回前一节中提到的三个事务状态之一。 public class TransactionListenerImpl implements TransactionListener {Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {System.out.println(执行本地事务);if (StringUtils.equals(TagA, msg.getTags())) {return LocalTransactionState.COMMIT_MESSAGE;} else if (StringUtils.equals(TagB, msg.getTags())) {return LocalTransactionState.ROLLBACK_MESSAGE;} else {return LocalTransactionState.UNKNOW;}}Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {System.out.println(MQ检查消息Tag【msg.getTags()】的本地事务执行结果);return LocalTransactionState.COMMIT_MESSAGE;} } 使用限制 事务消息不支持延时消息和批量消息。 为了避免单个消息被检查太多次而导致半队列消息累积我们默认将单个消息的检查次数限制为 15 次但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话 N transactionCheckMax 则 Broker 将丢弃此消息并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制该参数优先于 transactionMsgTimeout 参数。 事务性消息可能不止一次被检查或消费。 提交给用户的目标主题消息可能会失败目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证如果希望确保事务消息不丢失、并且事务完整性得到保证建议使用同步的双重写入机制。 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。 相关文章 RocketMQ学习笔记基础篇_Cat凯94的博客-CSDN博客
http://www.pierceye.com/news/30088/

相关文章:

  • 网站建设 建议电商网站方案
  • 苏州网站开发培训班99设计网站
  • 那个公司可以做网站wordpress 仿站 主题
  • 设计师个人网站架构龙岩上杭县
  • 网站制作的英文网站401错误
  • 上市公司集团网站建设电商素材网站
  • 网站管理助手 mysql电动牙刷开发
  • 创建官方网站电子商务网店运营推广
  • 关于做网站的外语文献tk域名注册官网
  • 注册免费的网站号号网站开发
  • 商城网站入驻系统wordpress数组
  • 业务网站制作无锡响应式网站设计
  • 做视频网站软件有哪些江苏网站建设代理商
  • 网页设计思路怎么写优化关键词的方法有哪些
  • 成都市网站建设服务商wordpress 添加目录
  • 重庆市建设公共资源交易中心网站首页做网站有现成的程序
  • 正能量不良网站直接进入网站内容标签设计
  • 制作网站公司服务器租赁一年的费用做网站自己买服务器吗
  • 门户网站建设美丽广州联享网站建设公司怎么样
  • 如何用两台电脑做服务器建网站南通网站建设方案
  • 深圳网站建设是哪个千万pv网站开发成本
  • 哪种源码做视频网站好用网站推广员
  • 免费开发网站大全北京市网站制作
  • 福建省建设厅官方网站wordpress登陆后段后端
  • 装修平台网站排名前十名有哪些网站字体特效代码
  • 网站营销外包公司wordpress 2017
  • 建行网站登录剑阁县规划和建设局网站
  • 海口企业自助建站seo基础知识考试
  • 网站代理浏览器7网页qq登录记录网站
  • 给单位做网站需要备案吗wordpress wp option