网站建设步骤 文档,沈阳无痛人流大概多少费用,怎么用h5网站做动效,网易企业邮箱免费注册1、简介
Spring Cloud Stream是一个用来为微服务应用构建 消息驱动 能力的框架。通过使用 Spring Cloud Strea m #xff0c;可以有效简化开发人员对消息中间件的使用复杂度#xff0c;降低代码与消息中间件间的耦合度#xff0c;屏蔽消息中间件 之 间的差异性#xff0c;…1、简介
Spring Cloud Stream是一个用来为微服务应用构建 消息驱动 能力的框架。通过使用 Spring Cloud Strea m 可以有效简化开发人员对消息中间件的使用复杂度降低代码与消息中间件间的耦合度屏蔽消息中间件 之 间的差异性让开发人员可以有更多的精力关注于核心业务逻辑的处理。
主要有以下几个组件
1、目的地绑定器Destination Binders负责提供与外部消息系统集成的组件。
2、固定器Bindings介于外部消息系统与应用程序间的桥梁 这个应用程序提供了生产者和消费者的消息 由 Destination Binders 创建。
3、输入管道Input Bindings消费者通过Input Bindings 连接 Binder 而 Binder 与 MQ 连接即消费者通过 Input Bindings 从 MQ 读取数据。
4、输出管道Output Bindings生产者通过Output Bindings 连接 Binder 而 Binder 与 MQ 连接即生产者通过 Output Bindings 向 MQ 写入数据。
5、消息Message生产者和消费者使用的规范数据结构用于与 Binders 通信从而通过外部消息系统与其他应用程序通信。
2、具体应用示例1MQ使用kafka
引入依赖
dependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-stream-kafka/artifactId
/dependency
2.1、生产者
配置文件
server:port: 8090
spring:cloud:stream:kafka:binder:brokers: 192.168.30.88:9092,192.168.30.89:9092bindings:producer-out-0:destination: topic1content-type: application/json
代码实现
Autowired
private StreamBridge streamBridge;GetMapping(/test/send)
public String sendMsg(RequestParam(msg) String msg){MapString , Object map new HashMap();map.put(tag, tags);MessageHeaders headers new MessageHeaders(map);// 封装消息MessageString message MessageBuilder.createMessage(msg, headers);//发送消息streamBridge.send(producer-out-0, message);return msg;
}
2.2、消费者
配置文件
server:port: 8091
spring:cloud:stream:kafka:binder:brokers: 192.168.30.88:9092,192.168.30.89:9092function:definition: consumer # 这个名称要和下面bindings的consumer-in-0第一个单词一样bindings:consumer-in-0:destination: topic1content-type: application/json
代码实现
// 向容器中添加ConsumerMessageString类型的bean即可
Bean
public ConsumerMessageString consumer(){return msg - {System.out.println(接收到消息 msg.getPayload());};
}
3、具体应用示例2MQ使用Rocketmq
引入依赖
dependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-stream-rocketmq/artifactId
/dependency
代码实现
Autowired
private StreamBridge streamBridge;GetMapping(/test/send)
public String sendMsg(RequestParam(msg) String msg){MapString , Object map new HashMap();map.put(MessageConst.PROPERTY_TAGS, tags);MessageHeaders headers new MessageHeaders(map);// 封装消息MessageString message MessageBuilder.createMessage(msg, headers);//发送消息streamBridge.send(producer-out-0, message);return JSON.toJSONString(message);
}
3.2、消费者
配置文件
server:port: 8091
spring:cloud:stream:rocketmq:binder:name-server: 192.168.30.88:9876function:definition: consumer # 这个名称要和下面bindings的consumer-in-0第一个单词一样bindings:consumer-in-0:destination: topic1content-type: application/json
代码实现:
// 向容器中添加ConsumerMessageString类型的bean即可
Bean
public ConsumerMessageString consumer(){return msg - {System.out.println(接收到消息 msg.getPayload());};
}
注
1、在spring-cloud-stream 3.1.0之前的版本还有采用定义Source、Sink等方式编写消息生产者和消费者在3.1.0以后的版本中弃用StreamListener的方式而采用函数式编程的方式接入使用StreamBrige来进行发送。
2、注意binding的名称命名规则
例如上面的代码中定义的consumer。
# 输入: 方法名 -in- index
# 输出: 方法名 -out- index
总结本文介绍Stream统一消息中间件的模型给出基于kafka和Rocketmq两种消息中间件模型下的使用案例以及给出废弃使用老版本的Source、Sink模式解释。帮助大家快速上手Stream的使用。 本人是一个从小白自学计算机技术对运维、后端、各种中间件技术、大数据等有一定的学习心得想获取自学总结资料pdf版本或者希望共同学习关注微信公众号上了年纪的小男孩。后台回复相应技术名称/技术点即可获得。本人学习宗旨学会了就要免费分享