佛山网站设计外包,网站开发文档,什么事网站开发,甘肃网站建设公司哪家好为什么要使用MQ#xff1f;
在Spring Boot Event这篇文章中已经通过Guava或者SpringBoot自身的Listener实现了事件驱动#xff0c;已经做到了对业务的解耦。为什么还要用到MQ来进行业务解耦呢#xff1f;
首先无论是通过Guava还是Spring Boot自身提供的监听注解来实现的事…为什么要使用MQ
在Spring Boot Event这篇文章中已经通过Guava或者SpringBoot自身的Listener实现了事件驱动已经做到了对业务的解耦。为什么还要用到MQ来进行业务解耦呢
首先无论是通过Guava还是Spring Boot自身提供的监听注解来实现的事件驱动他都是处于同一进程中的意思就是当前事件推送后只有当前的进程可以进行消费。通过MQ可以实现将事件推送到进程外的Broker中在多实例/分布式环境下其他的服务在订阅同一事件Topic时可以在各自的服务中进行消费最大化空闲服务的利用。 源码地址Gitee
整合RocketMQ
依赖版本 JDK 17 Spring Boot 3.2.0 RocketMQ-Client 5.0.4 RocketMQ-Starter 2.2.0
可以参考这篇进行RocketMQ安装
Spring Boot 3.0 取消了对spring.factories的支持。所以在导入时需要手动引入RocketMQ的配置类。
引入RocketMQ依赖
dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-client-java/artifactIdversion5.0.4/version
/dependency
dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-spring-boot-starter/artifactIdversion2.2.0/version
/dependency解决Spring Boot3不兼容 spring.factories
rocketmq-spring-boot-starter:2.2.2版本中
参考配置文件
# RocketMQ 配置
rocketmq:name-server: 127.0.0.1:9876consumer:group: event-mq-group# 一次拉取消息最大值注意是拉取消息的最大值而非消费最大值pull-batch-size: 1producer:# 发送同一类消息的设置为同一个group保证唯一group: event-mq-group# 发送消息超时时间默认3000sendMessageTimeout: 10000# 发送消息失败重试次数默认2retryTimesWhenSendFailed: 2# 异步消息重试此处默认2retryTimesWhenSendAsyncFailed: 2# 消息最大长度默认1024 * 1024 * 4(默认4M)maxMessageSize: 4096# 压缩消息阈值默认4k(1024 * 4)compressMessageBodyThreshold: 4096# 是否在内部发送失败时重试另一个broker默认falseretryNextServer: false参考Issue 方法一 通过Import(RocketMQAutoConfiguration.class)在配置类中引入 方法二在resources资源目录下创建文件夹及文件META-INF/springorg.springframework.boot.autoconfigure.AutoConfiguration.imports。 文件内容为RocketMQ自动配置类路径org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration
RocketMQ 使用
解决Spring Boot3不支持spring.factories的问题
import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Import;/*** 启动类*/
Import(RocketMQAutoConfiguration.class)
SpringBootApplication
public class MQEventApplication {public static void main(String[] args) {SpringApplication.run(MQEventApplication.class, args);}
}RocketMQ操作工具
RocketMQ Message实体
import cn.hutool.core.util.IdUtil;
import jakarta.validation.constraints.NotBlank;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;import java.io.Serializable;
import java.util.List;/*** RocketMQ 消息*/
Data
Builder
AllArgsConstructor
NoArgsConstructor
public class RocketMQMessageT implements Serializable {/*** 消息队列主题*/NotBlank(message MQ Topic 不能为空)private String topic;/*** 延迟级别*/Builder.Defaultprivate DelayLevel delayLevel DelayLevel.OFF;/*** 消息体*/private T message;/*** 消息体*/private ListT messages;/*** 使用有序消息发送时指定发送到队列*/private String hashKey;/*** 任务Id用于日志打印相关信息*/Builder.Defaultprivate String taskId IdUtil.fastSimpleUUID();
}
RocketMQTemplate 二次封装
import com.yiyan.study.domain.RocketMQMessage;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;/*** RocketMQ 消息工具类*/
Slf4j
Component
public class RocketMQService {Resourceprivate RocketMQTemplate rocketMQTemplate;Value(${rocketmq.producer.sendMessageTimeout})private int sendMessageTimeout;/*** 异步发送消息回调** param taskId 任务Id* param topic 消息主题* return the send callback*/private static SendCallback asyncSendCallback(String taskId, String topic) {return new SendCallback() {Overridepublic void onSuccess(SendResult sendResult) {log.info(ROCKETMQ 异步消息发送成功 : [TaskId:{}] - [Topic:{}] - [SendStatus:{}], taskId, topic, sendResult.getSendStatus());}Overridepublic void onException(Throwable throwable) {log.error(ROCKETMQ 异步消息发送失败 : [TaskId:{}] - [Topic:{}] - [ErrorMessage:{}], taskId, topic, throwable.getMessage());}};}/*** 发送同步消息使用有序发送请设置HashKey** param message 消息参数*/public T void syncSend(RocketMQMessageT message) {log.info(ROCKETMQ 同步消息发送 : [TaskId:{}] - [Topic:{}], message.getTaskId(), message.getTopic());SendResult sendResult;if (StringUtils.isNotBlank(message.getHashKey())) {sendResult rocketMQTemplate.syncSendOrderly(message.getTopic(), message.getMessage(), message.getHashKey());} else {sendResult rocketMQTemplate.syncSend(message.getTopic(), message.getMessage(), sendMessageTimeout, message.getDelayLevel().getLevel());}log.info(ROCKETMQ 同步消息发送结果 : [TaskId:{}] - [Topic:{}] - [MessageId:{}] - [SendStatus:{}],message.getTaskId(), message.getTopic(), sendResult.getMsgId(), sendResult.getSendStatus());}/*** 批量发送同步消息** param message 消息参数*/public T void syncSendBatch(RocketMQMessageT message) {log.info(ROCKETMQ 同步消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount:{}],message.getTaskId(), message.getTopic(), message.getMessages().size());SendResult sendResult;if (StringUtils.isNotBlank(message.getHashKey())) {sendResult rocketMQTemplate.syncSendOrderly(message.getTopic(), message.getMessages(), message.getHashKey());} else {sendResult rocketMQTemplate.syncSend(message.getTopic(), message.getMessages());}log.info(ROCKETMQ 同步消息-批量发送结果 : [TaskId:{}] - [Topic:{}] - [MessageId:{}] - [SendStatus:{}],message.getTaskId(), message.getTopic(), sendResult.getMsgId(), sendResult.getSendStatus());}/*** 异步发送消息异步返回消息结果** param message 消息参数*/public T void asyncSend(RocketMQMessageT message) {log.info(ROCKETMQ 异步消息发送 : [TaskId:{}] - [Topic:{}], message.getTaskId(), message.getTopic());if (StringUtils.isNotBlank(message.getHashKey())) {rocketMQTemplate.asyncSendOrderly(message.getTopic(), message.getMessage(), message.getHashKey(),asyncSendCallback(message.getTaskId(), message.getTopic()));} else {rocketMQTemplate.asyncSend(message.getTopic(), message.getMessage(),asyncSendCallback(message.getTaskId(), message.getTopic()), sendMessageTimeout, message.getDelayLevel().getLevel());}}/*** 批量异步发送消息** param message 消息参数*/public T void asyncSendBatch(RocketMQMessageT message) {log.info(ROCKETMQ 异步消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount:{}],message.getTaskId(), message.getTopic(), message.getMessages().size());if (StringUtils.isNotBlank(message.getHashKey())) {rocketMQTemplate.asyncSendOrderly(message.getTopic(), message.getMessages(), message.getHashKey(),asyncSendCallback(message.getTaskId(), message.getTopic()));} else {rocketMQTemplate.asyncSend(message.getTopic(), message.getMessages(),asyncSendCallback(message.getTaskId(), message.getTopic()));}}/*** 单向发送消息不关心返回结果容易消息丢失适合日志收集、不精确统计等消息发送;** param message 消息参数*/public T void sendOneWay(RocketMQMessageT message) {sendOneWay(message, false);}/*** 单向消息 - 批量发送** param message 消息体* param batch 是否为批量操作*/public T void sendOneWay(RocketMQMessageT message, boolean batch) {log.info((batch ? ROCKETMQ 单向消息发送 : [TaskId:{}] - [Topic:{}]: ROCKETMQ 单向消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount{}]),message.getTaskId(), message.getTopic(), message.getMessages().size());if (StringUtils.isNotBlank(message.getHashKey())) {if (batch) {message.getMessages().forEach(msg - rocketMQTemplate.sendOneWayOrderly(message.getTopic(), msg, message.getHashKey()));} else {rocketMQTemplate.sendOneWayOrderly(message.getTopic(), message.getMessage(), message.getHashKey());}} else {if (batch) {message.getMessages().forEach(msg - rocketMQTemplate.sendOneWay(message.getTopic(), msg));} else {rocketMQTemplate.sendOneWay(message.getTopic(), message.getMessage());}}}
}定义RocketMQ消费者
import com.yiyan.study.constants.MQConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;/*** MQ消息监听*/
Component
Slf4j
RocketMQMessageListener(topic MQConfig.EVENT_TOPIC,consumerGroup MQConfig.EVENT_CONSUMER_GROUP)
public class MQListener implements RocketMQListenerString {Overridepublic void onMessage(String message) {log.info(MQListener 接收消息 {}, message);}
}
定义测试类发送消息
import cn.hutool.core.thread.ThreadUtil;
import com.yiyan.study.constants.MQConfig;
import com.yiyan.study.domain.RocketMQMessage;
import com.yiyan.study.utils.RocketMQService;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;/*** MQ测试*/
SpringBootTest
public class MQTest {Resourceprivate RocketMQService rocketMQService;Testpublic void sendMessage() {int count 1;while (count 50) {rocketMQService.syncSend(RocketMQMessage.builder().topic(MQConfig.EVENT_TOPIC).message(count).build());}// 休眠等待消费消息ThreadUtil.sleep(2000L);}
}测试