当前位置: 首页 > news >正文

佛山网站设计外包网站开发文档

佛山网站设计外包,网站开发文档,什么事网站开发,甘肃网站建设公司哪家好为什么要使用MQ#xff1f; 在Spring Boot Event这篇文章中已经通过Guava或者SpringBoot自身的Listener实现了事件驱动#xff0c;已经做到了对业务的解耦。为什么还要用到MQ来进行业务解耦呢#xff1f; 首先无论是通过Guava还是Spring Boot自身提供的监听注解来实现的事…为什么要使用MQ 在Spring Boot Event这篇文章中已经通过Guava或者SpringBoot自身的Listener实现了事件驱动已经做到了对业务的解耦。为什么还要用到MQ来进行业务解耦呢 首先无论是通过Guava还是Spring Boot自身提供的监听注解来实现的事件驱动他都是处于同一进程中的意思就是当前事件推送后只有当前的进程可以进行消费。通过MQ可以实现将事件推送到进程外的Broker中在多实例/分布式环境下其他的服务在订阅同一事件Topic时可以在各自的服务中进行消费最大化空闲服务的利用。 源码地址Gitee 整合RocketMQ 依赖版本 JDK 17 Spring Boot 3.2.0 RocketMQ-Client 5.0.4 RocketMQ-Starter 2.2.0 可以参考这篇进行RocketMQ安装 Spring Boot 3.0 取消了对spring.factories的支持。所以在导入时需要手动引入RocketMQ的配置类。 引入RocketMQ依赖 dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-client-java/artifactIdversion5.0.4/version /dependency dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-spring-boot-starter/artifactIdversion2.2.0/version /dependency解决Spring Boot3不兼容 spring.factories rocketmq-spring-boot-starter:2.2.2版本中 参考配置文件 # RocketMQ 配置 rocketmq:name-server: 127.0.0.1:9876consumer:group: event-mq-group# 一次拉取消息最大值注意是拉取消息的最大值而非消费最大值pull-batch-size: 1producer:# 发送同一类消息的设置为同一个group保证唯一group: event-mq-group# 发送消息超时时间默认3000sendMessageTimeout: 10000# 发送消息失败重试次数默认2retryTimesWhenSendFailed: 2# 异步消息重试此处默认2retryTimesWhenSendAsyncFailed: 2# 消息最大长度默认1024 * 1024 * 4(默认4M)maxMessageSize: 4096# 压缩消息阈值默认4k(1024 * 4)compressMessageBodyThreshold: 4096# 是否在内部发送失败时重试另一个broker默认falseretryNextServer: false参考Issue 方法一 通过Import(RocketMQAutoConfiguration.class)在配置类中引入 方法二在resources资源目录下创建文件夹及文件META-INF/springorg.springframework.boot.autoconfigure.AutoConfiguration.imports。 文件内容为RocketMQ自动配置类路径org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration RocketMQ 使用 解决Spring Boot3不支持spring.factories的问题 import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Import;/*** 启动类*/ Import(RocketMQAutoConfiguration.class) SpringBootApplication public class MQEventApplication {public static void main(String[] args) {SpringApplication.run(MQEventApplication.class, args);} }RocketMQ操作工具 RocketMQ Message实体 import cn.hutool.core.util.IdUtil; import jakarta.validation.constraints.NotBlank; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.ObjectUtils; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder;import java.io.Serializable; import java.util.List;/*** RocketMQ 消息*/ Data Builder AllArgsConstructor NoArgsConstructor public class RocketMQMessageT implements Serializable {/*** 消息队列主题*/NotBlank(message MQ Topic 不能为空)private String topic;/*** 延迟级别*/Builder.Defaultprivate DelayLevel delayLevel DelayLevel.OFF;/*** 消息体*/private T message;/*** 消息体*/private ListT messages;/*** 使用有序消息发送时指定发送到队列*/private String hashKey;/*** 任务Id用于日志打印相关信息*/Builder.Defaultprivate String taskId IdUtil.fastSimpleUUID(); } RocketMQTemplate 二次封装 import com.yiyan.study.domain.RocketMQMessage; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component;/*** RocketMQ 消息工具类*/ Slf4j Component public class RocketMQService {Resourceprivate RocketMQTemplate rocketMQTemplate;Value(${rocketmq.producer.sendMessageTimeout})private int sendMessageTimeout;/*** 异步发送消息回调** param taskId 任务Id* param topic 消息主题* return the send callback*/private static SendCallback asyncSendCallback(String taskId, String topic) {return new SendCallback() {Overridepublic void onSuccess(SendResult sendResult) {log.info(ROCKETMQ 异步消息发送成功 : [TaskId:{}] - [Topic:{}] - [SendStatus:{}], taskId, topic, sendResult.getSendStatus());}Overridepublic void onException(Throwable throwable) {log.error(ROCKETMQ 异步消息发送失败 : [TaskId:{}] - [Topic:{}] - [ErrorMessage:{}], taskId, topic, throwable.getMessage());}};}/*** 发送同步消息使用有序发送请设置HashKey** param message 消息参数*/public T void syncSend(RocketMQMessageT message) {log.info(ROCKETMQ 同步消息发送 : [TaskId:{}] - [Topic:{}], message.getTaskId(), message.getTopic());SendResult sendResult;if (StringUtils.isNotBlank(message.getHashKey())) {sendResult rocketMQTemplate.syncSendOrderly(message.getTopic(), message.getMessage(), message.getHashKey());} else {sendResult rocketMQTemplate.syncSend(message.getTopic(), message.getMessage(), sendMessageTimeout, message.getDelayLevel().getLevel());}log.info(ROCKETMQ 同步消息发送结果 : [TaskId:{}] - [Topic:{}] - [MessageId:{}] - [SendStatus:{}],message.getTaskId(), message.getTopic(), sendResult.getMsgId(), sendResult.getSendStatus());}/*** 批量发送同步消息** param message 消息参数*/public T void syncSendBatch(RocketMQMessageT message) {log.info(ROCKETMQ 同步消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount:{}],message.getTaskId(), message.getTopic(), message.getMessages().size());SendResult sendResult;if (StringUtils.isNotBlank(message.getHashKey())) {sendResult rocketMQTemplate.syncSendOrderly(message.getTopic(), message.getMessages(), message.getHashKey());} else {sendResult rocketMQTemplate.syncSend(message.getTopic(), message.getMessages());}log.info(ROCKETMQ 同步消息-批量发送结果 : [TaskId:{}] - [Topic:{}] - [MessageId:{}] - [SendStatus:{}],message.getTaskId(), message.getTopic(), sendResult.getMsgId(), sendResult.getSendStatus());}/*** 异步发送消息异步返回消息结果** param message 消息参数*/public T void asyncSend(RocketMQMessageT message) {log.info(ROCKETMQ 异步消息发送 : [TaskId:{}] - [Topic:{}], message.getTaskId(), message.getTopic());if (StringUtils.isNotBlank(message.getHashKey())) {rocketMQTemplate.asyncSendOrderly(message.getTopic(), message.getMessage(), message.getHashKey(),asyncSendCallback(message.getTaskId(), message.getTopic()));} else {rocketMQTemplate.asyncSend(message.getTopic(), message.getMessage(),asyncSendCallback(message.getTaskId(), message.getTopic()), sendMessageTimeout, message.getDelayLevel().getLevel());}}/*** 批量异步发送消息** param message 消息参数*/public T void asyncSendBatch(RocketMQMessageT message) {log.info(ROCKETMQ 异步消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount:{}],message.getTaskId(), message.getTopic(), message.getMessages().size());if (StringUtils.isNotBlank(message.getHashKey())) {rocketMQTemplate.asyncSendOrderly(message.getTopic(), message.getMessages(), message.getHashKey(),asyncSendCallback(message.getTaskId(), message.getTopic()));} else {rocketMQTemplate.asyncSend(message.getTopic(), message.getMessages(),asyncSendCallback(message.getTaskId(), message.getTopic()));}}/*** 单向发送消息不关心返回结果容易消息丢失适合日志收集、不精确统计等消息发送;** param message 消息参数*/public T void sendOneWay(RocketMQMessageT message) {sendOneWay(message, false);}/*** 单向消息 - 批量发送** param message 消息体* param batch 是否为批量操作*/public T void sendOneWay(RocketMQMessageT message, boolean batch) {log.info((batch ? ROCKETMQ 单向消息发送 : [TaskId:{}] - [Topic:{}]: ROCKETMQ 单向消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount{}]),message.getTaskId(), message.getTopic(), message.getMessages().size());if (StringUtils.isNotBlank(message.getHashKey())) {if (batch) {message.getMessages().forEach(msg - rocketMQTemplate.sendOneWayOrderly(message.getTopic(), msg, message.getHashKey()));} else {rocketMQTemplate.sendOneWayOrderly(message.getTopic(), message.getMessage(), message.getHashKey());}} else {if (batch) {message.getMessages().forEach(msg - rocketMQTemplate.sendOneWay(message.getTopic(), msg));} else {rocketMQTemplate.sendOneWay(message.getTopic(), message.getMessage());}}} }定义RocketMQ消费者 import com.yiyan.study.constants.MQConfig; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component;/*** MQ消息监听*/ Component Slf4j RocketMQMessageListener(topic MQConfig.EVENT_TOPIC,consumerGroup MQConfig.EVENT_CONSUMER_GROUP) public class MQListener implements RocketMQListenerString {Overridepublic void onMessage(String message) {log.info(MQListener 接收消息 {}, message);} } 定义测试类发送消息 import cn.hutool.core.thread.ThreadUtil; import com.yiyan.study.constants.MQConfig; import com.yiyan.study.domain.RocketMQMessage; import com.yiyan.study.utils.RocketMQService; import jakarta.annotation.Resource; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest;/*** MQ测试*/ SpringBootTest public class MQTest {Resourceprivate RocketMQService rocketMQService;Testpublic void sendMessage() {int count 1;while (count 50) {rocketMQService.syncSend(RocketMQMessage.builder().topic(MQConfig.EVENT_TOPIC).message(count).build());}// 休眠等待消费消息ThreadUtil.sleep(2000L);} }测试
http://www.pierceye.com/news/359400/

相关文章:

  • wordpress搜索代码制做优化精灵
  • 连云港做网站推广东莞seo
  • 专业网站设计公司和普通设计公司的区别微信分销网站建设
  • 青海个人旅游网站建设网站建设教程软件下载
  • 做AMC12的题的网站龙华网站建设专业公司
  • 莱州网站制作友情链接交换形式
  • 如何编写网站做美食类网站现状
  • 一站式推广平台做家装模型的效果图网站
  • 企业电子商务网站开发实验报告苏州建筑设计公司排名
  • 网站的优化与网站建设有关吗网站先做移动站在做pc站可行吗
  • 河北网站制作公司电话建设网站的情况说明
  • 高校网站平台建设wordpress小工具不见了
  • 网站建设 会计处理北京垡头网站建设公司
  • 唐山网站制作案例网站建设中标
  • 网站开发培训费济南网络优化推广公司哪家好
  • 谷歌网站优化可以做物理题的网站
  • 公司的网站建设是什么部门品牌餐饮加盟网站建设
  • 深圳品牌网站建设公司哪家好学建网站 必须学那些知识
  • 国内设计网站推荐山东省建设安全生产协会网站
  • 南京专业网站开发团队如何用手机建网站
  • 在婚恋网站上做红娘怎么样正规网络推广服务
  • 网络媒体设计是做什么的西安网站优化公司
  • 有项目去哪里找投资人河南网站优化排名
  • 灯塔建设网站网上做流量对网站有什么影响
  • 网站模板 黑色建设网站安全措施
  • 临沂企业网站建设珠海建设网站的公司哪家好
  • 中国网站建设公司排行榜网站建设精美模板
  • 国家对于学校网站建设深圳网站建设公司网络服务
  • 承德建站公司福田庆三整鼻子好吗
  • 域名和网站关联seo优化是指