电商网站多少钱,做欧美市场的网站,wordpress foundation,如何介绍设计的网站模板已经不需要为RabbitMQ交换机的离去而感到伤心了#xff0c;接下来登场的是RabbitMQ-核心特性!!! 文章目录 核心特性消息过期机制消息确认机制死信队列 核心特性
消息过期机制
官方文档#xff1a;https://www.rabbitmq.com/ttl.html 可以给每条消息指定一个有效期#xf…已经不需要为RabbitMQ交换机的离去而感到伤心了接下来登场的是RabbitMQ-核心特性!!! 文章目录 核心特性消息过期机制消息确认机制死信队列 核心特性
消息过期机制
官方文档https://www.rabbitmq.com/ttl.html 可以给每条消息指定一个有效期一段时间内未被消费者处理就过期了 适用场景清理过期数据
1给队列中的所有消息指定过期时间
MapString, Object args new HashMapString, Object();
args.put(x-message-ttl, 5000);
// args 指定参数
channel.queueDeclare(QUEUE_NAME, false, false, false, args);如果在过期时间内还没有消费者取消息消息才会过期 注意如果消息已经接收到但是没确认是不会过期的 消费者中给队列中所有消息设置过期时间
public class TtlConsumer {private final static String QUEUE_NAME ttl_queue;public static void main(String[] argv) throws Exception {// 创建连接ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);Connection connection factory.newConnection();Channel channel connection.createChannel();// 创建队列指定消息过期参数MapString, Object args new HashMapString, Object();args.put(x-message-ttl, 5000);// args 指定参数channel.queueDeclare(QUEUE_NAME, false, false, false, args);System.out.println( [*] Waiting for messages. To exit press CTRLC);// 定义了如何处理消息DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println( [x] Received message );};// 消费消息会持续阻塞channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag - { });}
}2给某条消息指定过期时间
// 给消息指定过期时间
AMQP.BasicProperties properties new AMQP.BasicProperties.Builder().expiration(1000).build();
channel.basicPublish(my-exchange, routing-key, properties, message.getBytes(StandardCharsets.UTF_8));生产者给某条消息指定过期时间
public class TtlProducer {private final static String QUEUE_NAME ttl_queue;public static void main(String[] argv) throws Exception {// 创建连接工厂ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);
// factory.setUsername();
// factory.setPassword();
// factory.setPort();// 建立连接、创建频道try (Connection connection factory.newConnection();Channel channel connection.createChannel()) {// 发送消息String message Hello World!;// 给消息指定过期时间AMQP.BasicProperties properties new AMQP.BasicProperties.Builder().expiration(1000).build();channel.basicPublish(my-exchange, routing-key, properties, message.getBytes(StandardCharsets.UTF_8));System.out.println( [x] Sent message );}}
}消息确认机制
官方文档https://www.rabbitmq.com/confirms.html 为保证消息成功被消费rabbitmq提供了消息确认机制当消费者收到消息后要给一个成功反馈 ●ack消费成功 ●nack消费失败 ●reject拒绝 如果告诉 rabbitmq 服务器消费成功服务器才会放心地移除消息。 支持配置 autoack会自动执行 ack 命令接收到消息立刻就成功了。 channel.basicConsume(queueName, true, xiaoyuDeliverCallback, consumerTag - {});一般情况建议 autoack 改为 false根据实际情况去手动确认。 指定确认某条消息 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);第二个参数 multiple 批量确认是指是否要一次性确认所有的历史消息直到当前这条消息 指定拒绝某条消息
channel.basicNack(delivery.getEnvelope().getDeliveryTag(),false,false);第 3 个参数表示是否重新入队可用于重试
死信队列
官方文档https://www.rabbitmq.com/dlx.html 为了保证消息的可靠性比如每条消息都成功消费需要提供一个容错机制即失败的消息怎么处理 死信过期的消息拒收的消息消息队列满了处理失败的消息的统称 死信队列专门处理死信的队列 死信交换机专门给死信队列转发消息的交换机 示例场景 实现 1创建死信交换机和死信队列并且绑定关系 2给失败之后需要容错处理的队列绑定死信交换机 3可以给要容错的队列指定死信之后的转发规则死信应该再转发到哪个死信队列 4可以通过程序来读取死信队列中的消息从而进行处理 生产者代码
package com.yupi.springbootinit.mq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.util.Scanner;public class DlxDirectProducer {//死信交换机private static final String DEAD_EXCHANGE_NAME dlx-direct-exchange;//工作交换机private static final String WORK_EXCHANGE_NAME direct2-exchange;public static void main(String[] argv) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);try (Connection connection factory.newConnection();Channel channel connection.createChannel()) {// 声明死信交换机channel.exchangeDeclare(DEAD_EXCHANGE_NAME, direct);// 创建laoban死信队列String queueName laoban_dlx_queue;channel.queueDeclare(queueName, true, false, false, null);channel.queueBind(queueName, DEAD_EXCHANGE_NAME, laoban);//创建waibao死信队列String queueName2 waibao_dlx_queue;channel.queueDeclare(queueName2, true, false, false, null);channel.queueBind(queueName2, DEAD_EXCHANGE_NAME, waibao);DeliverCallback laobanDeliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);// 拒绝消息channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);System.out.println( [laoban] Received delivery.getEnvelope().getRoutingKey() : message );};DeliverCallback waibaoDeliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);// 拒绝消息channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);System.out.println( [waibao] Received delivery.getEnvelope().getRoutingKey() : message );};channel.basicConsume(queueName, false, laobanDeliverCallback, consumerTag - {});channel.basicConsume(queueName2, false, waibaoDeliverCallback, consumerTag - {});Scanner scanner new Scanner(System.in);while (scanner.hasNext()) {String userInput scanner.nextLine();String[] strings userInput.split( );if (strings.length 1) {continue;}String message strings[0];String routingKey strings[1];channel.basicPublish(WORK_EXCHANGE_NAME, routingKey, null, message.getBytes(UTF-8));System.out.println( [x] Sent message with routing: routingKey );}}}
}消费者代码
在这里插入代码片package com.yupi.springbootinit.mq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
import java.util.Map;public class DlxDirectConsumer {private static final String DEAD_EXCHANGE_NAME dlx-direct-exchange;private static final String WORK_EXCHANGE_NAME direct2-exchange;public static void main(String[] argv) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);Connection connection factory.newConnection();Channel channel connection.createChannel();channel.exchangeDeclare(WORK_EXCHANGE_NAME, direct);//小狗的死信要转发到waibao这个死信队列// 指定死信队列参数MapString, Object args new HashMap();// 要绑定到哪个交换机args.put(x-dead-letter-exchange, DEAD_EXCHANGE_NAME);// 指定死信要转发到哪个死信队列args.put(x-dead-letter-routing-key, waibao);// 创建队列随机分配一个队列名称String queueName xiaodog_queue;channel.queueDeclare(queueName, true, false, false, args);channel.queueBind(queueName, WORK_EXCHANGE_NAME, xiaodog);//小猫的死信要转发到laoban这个死信队列MapString, Object args2 new HashMap();args2.put(x-dead-letter-exchange, DEAD_EXCHANGE_NAME);args2.put(x-dead-letter-routing-key, laoban);// 创建队列随机分配一个队列名称String queueName2 xiaocat_queue;channel.queueDeclare(queueName2, true, false, false, args2);channel.queueBind(queueName2, WORK_EXCHANGE_NAME, xiaocat);System.out.println( [*] Waiting for messages. To exit press CTRLC);DeliverCallback xiaoyuDeliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);// 拒绝消息并且不要重新将消息放回队列只拒绝当前消息channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);System.out.println( [xiaodog] Received delivery.getEnvelope().getRoutingKey() : message );};DeliverCallback xiaopiDeliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);// 拒绝消息channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);System.out.println( [xiaocat] Received delivery.getEnvelope().getRoutingKey() : message );};channel.basicConsume(queueName, false, xiaoyuDeliverCallback, consumerTag - {});channel.basicConsume(queueName2, false, xiaopiDeliverCallback, consumerTag - {});}
}