整套网站建设视频教程,中国建设银行手机网站,泰安房产交易网官网,dw做网站投票百度上查的大部分都是一些很简单的单消费者或者单生产者的例子#xff0c;并且多是同一个服务器的配置#xff0c;本文的例子为多服务器配置下的消费生产和消费者配置。 参考资料#xff1a;https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/ht…百度上查的大部分都是一些很简单的单消费者或者单生产者的例子并且多是同一个服务器的配置本文的例子为多服务器配置下的消费生产和消费者配置。 参考资料https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_binder_implementations 1、POM引入spring-cloud-starter-stream-rabbit dependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-stream-rabbit/artifactId
/dependency 2、application.properties 通用配置 #rabbit的配置信息
spring.rabbitmq.addressesamqp://10.18.75.231:5672
spring.rabbitmq.usernameuser_admin
spring.rabbitmq.password12345678#下面这个配置优先级太高在配置中心分模块(分文件)的场景下后面的binder属性无法被覆盖如果有存在多个vhost的情况下建议将该属性注释掉spring.rabbitmq.virtual-hostboss 当存在多个binder时必须指定一个默认的binder # 设置一个默认的binder如果不配置将报错
spring.cloud.stream.defaultBinderboss 消费者配置 1 # 配置ecm消费者的服务器配置信息 2 spring.cloud.stream.binders.ecm.typerabbit3 #spring.cloud.stream.binders.ecm.environment.spring.rabbitmq.addresses${spring.rabbitmq.addresses}4 #spring.cloud.stream.binders.ecm.environment.spring.rabbitmq.username${spring.rabbitmq.username}5 #spring.cloud.stream.binders.ecm.environment.spring.rabbitmq.password${spring.rabbitmq.password}6 spring.cloud.stream.binders.ecm.environment.spring.rabbitmq.virtual-hostecm7 8 #交易系统ECM的货柜模板变更消费者9 spring.cloud.stream.bindings.ecm_shop_template.binderecm
10 spring.cloud.stream.bindings.ecm_shop_template.destination这里填exchange的名字
11 #默认情况下同一个队列的只能被同一个group的消费者消费
12 spring.cloud.stream.bindings.ecm_shop_template.group这里是消费者的名称
13 spring.cloud.stream.bindings.ecm_shop_template.contentTypetext/plain
14 #指定该主题的类型为广播模式
15 spring.cloud.stream.rabbit.bindings.ecm_shop_template.consumer.exchangeTypefanout
16 #消费失败的消息放入dlq队列
17 spring.cloud.stream.rabbit.bindings.ecm_shop_template.consumer.autoBindDlqtrue
18 spring.cloud.stream.rabbit.bindings.ecm_shop_template.consumer.republishToDlqtrue 配置死信队列会在消费者出现异常的时候重试3(默认为3可以配置)次后将消息放入死信队列中效果如下 生产者配置 1 # BOSS消息生产者服务器配置2 spring.cloud.stream.binders.boss.typerabbit3 #spring.cloud.stream.binders.boss.environment.spring.rabbitmq.addresses${spring.rabbitmq.addresses}4 #spring.cloud.stream.binders.boss.environment.spring.rabbitmq.username${spring.rabbitmq.username}5 #spring.cloud.stream.binders.boss.environment.spring.rabbitmq.password${spring.rabbitmq.password}6 spring.cloud.stream.binders.boss.environment.spring.rabbitmq.virtual-hostboss7 8 #BOSS基础信息生产者9 spring.cloud.stream.bindings.message_output.destinationexchange的名称
10 #exchange的类型为广播模式
11 spring.cloud.stream.rabbit.bindings.message_output.producer.exchangeTypefanout 下面是java代码 1、定义消息的Input和Output配置信息 1 import org.springframework.cloud.stream.annotation.Input;2 import org.springframework.cloud.stream.annotation.Output;3 import org.springframework.messaging.MessageChannel;4 5 /**6 * mq连接源定义7 * 8 * 其中类中的2个属性的值和properties里的配置需要一致9 **/
10 public interface MqMessageSource {
11 // BOSS生产者
12 String MESSAGE_OUTPUT message_output;
13 // ECM消费者
14 String ECM_SHOP_TEMPLATE_INPUT ecm_shop_template;
15
16 Output(MESSAGE_OUTPUT)
17 MessageChannel messageOutput();
18
19 Input(ECM_SHOP_TEMPLATE_INPUT)
20 MessageChannel messageInput();
21
22 } 2、消息消费 1 import org.springframework.beans.factory.annotation.Autowired;2 import org.springframework.cloud.stream.annotation.EnableBinding;3 import org.springframework.cloud.stream.annotation.StreamListener;4 import org.springframework.messaging.Message;5 6 import com.alibaba.fastjson.JSONObject;7 8 import lombok.extern.slf4j.Slf4j;9
10 /**
11 * MQ消费者
12 * author yangzhilong
13 *
14 */
15 Slf4j
16 EnableBinding(MqMessageSource.class)
17 public class MqMessageConsumer {
18
19 Autowired
20 private XXService xxService;
21
22 /**
23 * 消费ECM的货柜模板变更
24 * param message
25 */
26 StreamListener(MqMessageSource.ECM_SHOP_TEMPLATE_INPUT)
27 public void receive(MessageString message) {
28 log.info(接收货柜模板开始参数{}, JSONObject.toJSONString(message));
29 if (null message) {
30 return;
31 }
32 try {
33 String payload message.getPayload();
34 log.info(具体消息内容 {}, JSONObject.toJSONString(payload));
35 JSONObject jsonObject JSONObject.parseObject(payload);
36 ShopReqDto shopReqDto new ShopReqDto();
37 shopReqDto.setCode(jsonObject.getString(shopNo));
38 shopReqDto.setGoodsMarketTemplateId(jsonObject.getLong(goodsMarketTemplateId));
39 shopReqDto.setGoodsMarketTemplateName(jsonObject.getString(goodsMarketTemplateName));
40 ResponseResultString responseResult xxService.updateTemplateIdAndName(shopReqDto);
41 if(responseResult.isSuccess()){
42 log.info(【MQ消费货柜模板更新信息成功】);
43 }else{
44 log.error(【MQ消费货柜模板更新信息失败】,返回结果信息 JSONObject.toJSONString(responseResult));
45 }
46 } catch (Exception e) {
47 log.error(接收处理货柜模板MQ时出现异常:{}, e);
48 throw new RuntimeException(e);
49 }
50 }
51 } 3、消息生产者代码 1 import org.springframework.beans.factory.annotation.Autowired;2 import org.springframework.cloud.stream.annotation.EnableBinding;3 import org.springframework.cloud.stream.annotation.Output;4 import org.springframework.messaging.MessageChannel;5 import org.springframework.messaging.support.MessageBuilder;6 import com.alibaba.fastjson.JSON;7 import lombok.extern.slf4j.Slf4j;8 9 /**
10 * 消息生产者
11 *
12 **/
13 EnableBinding(MqMessageSource.class)
14 Slf4j
15 public class MqMessageProducer {
16 Autowired
17 Output(MqMessageSource.MESSAGE_OUTPUT)
18 private MessageChannel channel;
19
20
21 //品牌
22 public void sendBrandAdd(Brand brand) {
23 BossMessageBrand message new BossMessage();
24 message.setData(brand);
25 message.setOpType(MqMessageProducer.ADD);
26 message.setDataType(MqMessageProducer.BRAND);
27 channel.send(MessageBuilder.withPayload(JSON.toJSONString(message)).build());
28 log.info(【MQ发送内容】 JSON.toJSONString(message));
29 }
30 } 转载于:https://www.cnblogs.com/yangzhilong/p/7904461.html