销售网站建设推广,wordpress的版本号,个人网站申请空间,百度热搜广告设计公司最近收到好几个类似的问题#xff1a;使用Spring Cloud Stream操作RabbitMQ或Kafka的时候#xff0c;出现消息重复消费的问题。通过沟通与排查下来主要还是用户对消费组的认识不够。其实#xff0c;在之前的博文以及《Spring Cloud微服务实战》一书中都有提到关于消费组的概…最近收到好几个类似的问题使用Spring Cloud Stream操作RabbitMQ或Kafka的时候出现消息重复消费的问题。通过沟通与排查下来主要还是用户对消费组的认识不够。其实在之前的博文以及《Spring Cloud微服务实战》一书中都有提到关于消费组的概念以及作用。
那么什么是消费组呢为什么要用消费组它解决什么问题呢摘录一段之前博文的内容来解答这些疑问 通常在生产环境我们的每个服务都不会以单节点的方式运行在生产环境当同一个服务启动多个实例的时候这些实例都会绑定到同一个消息通道的目标主题Topic上。默认情况下当生产者发出一条消息到绑定通道上这条消息会产生多个副本被每个消费者实例接收和处理出现上述重复消费问题。但是有些业务场景之下我们希望生产者产生的消息只被其中一个实例消费这个时候我们需要为这些消费者设置消费组来实现这样的功能。 详细也可查看原文消息驱动的微服务消费组。
下面通过一个例子来看看如何使用消费组
问题重现
构建消息消费端
第一步创建绑定接口绑定example-topic输入通道默认情况下会绑定到RabbitMQ的同名Exchange或Kafaka的同名Topic。
interface ExampleBinder { String NAME example-topic; Input(NAME) SubscribableChannel input();}第二步对上述输入通道创建监听与处理逻辑。
EnableBinding(ExampleBinder.class)public class ExampleReceiver { private static Logger logger LoggerFactory.getLogger(ExampleReceiver.class); StreamListener(ExampleBinder.NAME) public void receive(String payload) { logger.info(Received: payload); }}第三步创建应用主类和配置文件
SpringBootApplicationpublic class ExampleApplication { public static void main(String[] args) { SpringApplication.run(ExampleApplication.class, args); }}spring.application.namestream-consumer-groupserver.port0这里设置server.port0以方便在本地启动多实例来重现问题。
完成上述操作之后启动两个该应用的实例以备后续调用。
构建消息生产端
比较简单需要注意的是使用Output创建一个同名的输出绑定这样发出的消息才能被上述启动的实例接收到。具体实现如下
RunWith(SpringRunner.class)EnableBinding(value {ExampleApplicationTests.ExampleBinder.class})public class ExampleApplicationTests { Autowired private ExampleBinder exampleBinder; Test public void exampleBinderTester() { exampleBinder.output().send(MessageBuilder.withPayload(Produce a message from : http://blog.didispace.com).build()); } public interface ExampleBinder { String NAME example-topic; Output(NAME) MessageChannel output(); }}启动上述测试用例之后可以发现之前启动的两个实例都收到的消息并在日志中打印了Received: Produce a message from : http://blog.didispace.com。消息重复消费的问题成功重现
使用消费组解决问题
如何解决上述消息重复消费的问题呢我们只需要在配置文件中增加如下配置即可
spring.cloud.stream.bindings.example-topic.groupaaa当我们指定了某个绑定所指向的消费组之后往当前主题发送的消息在每个订阅消费组中只会有一个订阅者接收和消费从而实现了对消息的负载均衡。只所以之前会出现重复消费的问题是由于默认情况下任何订阅都会产生一个匿名消费组所以每个订阅实例都会有自己的消费组从而当有消息发送的时候就形成了广播的模式。
另外需要注意上述配置中example-topic是在代码中Output和Input中传入的名字。
代码示例
本文示例读者可以通过查看下面仓库的中的stream-consumer-group项目
GithubGitee
如果您对这些感兴趣欢迎star、follow、收藏、转发给予支持
以下专题教程也许您会有兴趣
Spring Boot基础教程Spring Cloud基础教程