Approach 1: set exclusive on the consumer

Conditions to note

The exclusive flag applies to basic.consume, and it is not supported on quorum queues.

When two consumers, A and B, both call basic.consume on the same queue with exclusive set to true, the second consumer throws an exception:

    com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=403, reply-text=ACCESS_REFUSED - queue 'test' in vhost '/' in exclusive use, class-id=60, method-id=20)
        at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:190)
        at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:223)
        at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:981)
        at com.dms.rabbitmq.TopicSender.lambda$main$2(TopicSender.java:63)
        at java.base/java.lang.Thread.run(Thread.java:840)

How Spring AMQP uses exclusive to achieve ordered consumption

Core logic
The startup loop of DirectMessageListenerContainer (excerpt from the Spring AMQP source, not runnable on its own):

    while (!DirectMessageListenerContainer.this.started && isRunning()) {
        this.cancellationLock.reset();
        try {
            // Try to register a consumer on every configured queue.
            for (String queue : queueNames) {
                consumeFromQueue(queue);
            }
        }
        catch (AmqpConnectException | AmqpIOException e) {
            long nextBackOff = backOffExecution.nextBackOff();
            // A negative back-off means the retry budget is exhausted.
            if (nextBackOff < 0 || e.getCause() instanceof AmqpApplicationContextClosedException) {
                DirectMessageListenerContainer.this.aborted = true;
                shutdown();
                this.logger.error("Failed to start container - fatal error or backOffs exhausted", e);
                this.taskScheduler.schedule(this::stop, Instant.now());
                break;
            }
            this.logger.error("Error creating consumer; retrying in " + nextBackOff, e);
            doShutdown();
            try {
                Thread.sleep(nextBackOff); // NOSONAR
            }
            catch (InterruptedException e1) {
                Thread.currentThread().interrupt();
            }
            continue; // initialization failed; try again having rested for backOff-interval
        }
        DirectMessageListenerContainer.this.started = true;
        DirectMessageListenerContainer.this.startedLatch.countDown();
    }

After the exception is thrown, the container retries. The retry interval and the number of attempts are controlled by recoveryInterval (unlimited attempts by default) and recoveryBackOff.
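The retry behaviour described above can be pictured without a broker. The sketch below is a simplified, JDK-only model, not Spring AMQP or RabbitMQ client code: `ExclusiveQueue`, `AccessRefusedException`, and `startWithRetry` are hypothetical names invented for illustration, and a fixed back-off with a bounded attempt count is assumed.

```java
/** Broker-free model: one exclusive owner per queue; a second consumer retries with a fixed back-off. */
class ExclusiveRetrySketch {

    /** Hypothetical stand-in for the broker's ACCESS_REFUSED (403) reply. */
    static class AccessRefusedException extends RuntimeException {
        AccessRefusedException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for a queue that admits a single exclusive consumer. */
    static class ExclusiveQueue {
        private String owner; // tag of the current exclusive consumer, or null

        synchronized void consume(String consumerTag) {
            if (owner != null) {
                throw new AccessRefusedException("queue in exclusive use by " + owner);
            }
            owner = consumerTag;
        }

        synchronized void cancel(String consumerTag) {
            if (consumerTag.equals(owner)) {
                owner = null;
            }
        }
    }

    /**
     * Calls consume() until it succeeds or maxAttempts is used up.
     * Returns the number of attempts taken, or -1 if every attempt was refused
     * (or the thread was interrupted while waiting).
     */
    static int startWithRetry(ExclusiveQueue queue, String tag, long backOffMillis, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                queue.consume(tag);
                return attempt;
            } catch (AccessRefusedException e) {
                try {
                    Thread.sleep(backOffMillis); // rest for the back-off interval, then try again
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                    return -1;
                }
            }
        }
        return -1;
    }

    public static void main(String[] args) throws InterruptedException {
        ExclusiveQueue queue = new ExclusiveQueue();
        queue.consume("A"); // A wins exclusive access first

        // Release the queue on behalf of A after a short delay (simulates A going away).
        Thread releaser = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            queue.cancel("A");
        });
        releaser.start();

        int attempts = startWithRetry(queue, "B", 20, 50);
        releaser.join();
        System.out.println("B became the consumer after " + attempts + " attempt(s)");
    }
}
```

In this model, consumer B only takes over once A has released the queue, so at most one consumer processes messages at any time, which is what preserves ordering. In Spring AMQP itself the equivalent knobs are the container's exclusive, recoveryInterval, and recoveryBackOff properties.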
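Approach 2: single active consumer

How it works

When a queue is declared with the x-single-active-consumer argument, several consumers may register on it, but only one of them receives messages at a time. If the active consumer is cancelled or dies, the broker fails over to another registered consumer.

Code example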
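Before looking at the queue declaration, the failover behaviour can be modelled without a broker. This is a simplified, JDK-only sketch rather than RabbitMQ client code: `SacQueue` and its methods are hypothetical names, and the model assumes the next consumer in registration order takes over.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Broker-free model of a queue with single active consumer enabled. */
class SingleActiveConsumerSketch {

    /** Hypothetical stand-in for a queue declared with x-single-active-consumer=true. */
    static class SacQueue {
        // Consumers in registration order; the head of the deque is the active one.
        private final Deque<String> consumers = new ArrayDeque<>();
        // Delivery log as "consumer:message" entries, kept for inspection.
        private final List<String> deliveries = new ArrayList<>();

        void register(String consumerTag) {
            consumers.addLast(consumerTag);
        }

        void cancel(String consumerTag) {
            consumers.remove(consumerTag);
        }

        String activeConsumer() {
            return consumers.peekFirst();
        }

        void publish(String message) {
            String active = activeConsumer();
            // A real queue would keep the message until a consumer appears;
            // this model ignores buffering and only records actual deliveries.
            if (active != null) {
                deliveries.add(active + ":" + message);
            }
        }

        List<String> deliveries() {
            return deliveries;
        }
    }

    public static void main(String[] args) {
        SacQueue queue = new SacQueue();
        queue.register("A");
        queue.register("B"); // B is registered but stays on standby

        queue.publish("m1");
        queue.publish("m2");
        queue.cancel("A");   // A goes away, B becomes active
        queue.publish("m3");

        System.out.println(queue.deliveries()); // prints [A:m1, A:m2, B:m3]
    }
}
```

Unlike the exclusive flag, the standby consumer here is never rejected with an error; it simply waits until it is promoted.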
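Declaring the queue with the x-single-active-consumer argument enables the feature:

    // Requires com.rabbitmq.client.Channel, java.util.Map and java.util.HashMap.
    Channel ch = ...;
    Map<String, Object> arguments = new HashMap<>();
    arguments.put("x-single-active-consumer", true);
    // queueDeclare(queue, durable, exclusive, autoDelete, arguments)
    ch.queueDeclare("my-queue", false, false, false, arguments);

References

https://www.rabbitmq.com/blog/2022/07/05/rabbitmq-3-11-feature-preview-single-active-consumer-for-streams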