云南建设人才网站首页,ppt模板免费的网站,公司网站建设多少费用济南兴田德润团队怎么样,佛山网站建设哪家专业目录 一:交换机
1:Direct交换机
1.1生产者端代码: 1.2:消费者端代码:
2:Topic主题交换机
2.1:生产者代码:
2.2:消费者代码: 二:核心特性
2.1:消息过期机制
2.1.1:给队列中的全部消息指定过期时间
2.1.2:给某条消息指定过期时间
2.2:死信队列 一:交换机
1:Direct交…目录 一:交换机
1:Direct交换机
1.1生产者端代码: 1.2:消费者端代码:
2:Topic主题交换机
2.1:生产者代码:
2.2:消费者代码: 二:核心特性
2.1:消息过期机制
2.1.1:给队列中的全部消息指定过期时间
2.1.2:给某条消息指定过期时间
2.2:死信队列 一:交换机
1:Direct交换机 绑定:让交换机和队列进行关联可以指定让交换机把什么样的消息发送给队列。
rountingkey:路由键控制消息要发送哪个队列。
特点:根据路由键指定要转发到指定的队列
场景:特定的消息指定给特定的队列
1.1生产者端代码:
我们规定通过控制台输入消息和路由来指定谁完成该任务。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.util.Scanner;public class DirectProducer {private static final String EXCHANGE_NAME 2;public static void main(String[] argv) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);try (Connection connection factory.newConnection();Channel channel connection.createChannel()) {//创建交换机的名称channel.exchangeDeclare(EXCHANGE_NAME, direct);Scanner scannernew Scanner(System.in);while(scanner.hasNext()){String userInputscanner.nextLine();String[] s userInput.split( );if(s.length1){continue;}//指定路由keyString messages[0];String routingKeys[1];//发布消息/*第一个参数:发布到哪个交换机第二个参数:路由键*/channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes(UTF-8));System.out.println([x] Sentmessagewith rountingroutingKey );}}}//..} 1.2:消费者端代码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.util.Scanner;public class DirectProducer {private static final String EXCHANGE_NAME 2;public static void main(String[] argv) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);try (Connection connection factory.newConnection();Channel channel connection.createChannel()) {//创建交换机的名称channel.exchangeDeclare(EXCHANGE_NAME, direct);Scanner scannernew Scanner(System.in);while(scanner.hasNext()){String userInputscanner.nextLine();String[] s userInput.split( );if(s.length1){continue;}//指定路由keyString messages[0];String routingKeys[1];//发布消息/*第一个参数:发布到哪个交换机第二个参数:路由键*/channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes(UTF-8));System.out.println([x] Sentmessagewith rountingroutingKey );}}}//..}
运行结果: 2:Topic主题交换机
特点:消息会根据一个模糊的路由键转发到指定的队列中。
场景:特定的一类消息只交给特定的一类系统(程序来处理)。
绑定关系:模糊匹配消息队列 *:匹配一个单词 #:匹配0个或多个单词 2.1:生产者代码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.util.Scanner;public class TopicProducer {private static final String EXCHANGE_NAME 3;public static void main(String[] argv) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);try (Connection connection factory.newConnection();Channel channel connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, topic);Scanner scannernew Scanner(System.in);while(scanner.hasNext()){String userInputscanner.nextLine();String[] s userInput.split( );if(s.length1){continue;}//指定路由keyString messages[0];String routingKeys[1];//发布消息/*第一个参数:发布到哪个交换机第二个参数:路由键*/channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes(UTF-8));System.out.println([x] Sentmessagewith rountingroutingKey );}}}
}2.2:消费者代码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;public class TopicConsumer {private static final String EXCHANGE_NAME 3;public static void main(String[] argv) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);Connection connection factory.newConnection();Channel channel connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, topic);//创建消息队列String queueNamefronted_queue;channel.queueDeclare(queueName,true,false,false,null);channel.queueBind(queueName,EXCHANGE_NAME,#.前端.#);String queueName2backed-_queue;channel.queueDeclare(queueName2,true,false,false,null);channel.queueBind(queueName2,EXCHANGE_NAME,#.后端.#);String queueName3product_queue;channel.queueDeclare(queueName3,true,false,false,null);channel.queueBind(queueName3,EXCHANGE_NAME,#.产品.#);System.out.println( [*] Waiting for messages. To exit press CTRLC);DeliverCallback deliverCallback1 (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [前端] Received delivery.getEnvelope().getRoutingKey() : message );};DeliverCallback deliverCallback2 (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [后端] Received delivery.getEnvelope().getRoutingKey() : message );};DeliverCallback deliverCallback3 (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [产品] Received delivery.getEnvelope().getRoutingKey() : message );};channel.basicConsume(queueName, true, deliverCallback1, consumerTag - { });channel.basicConsume(queueName2, true, deliverCallback2, consumerTag - { });channel.basicConsume(queueName3, true, deliverCallback3, consumerTag - { });}}运行结果: 二:核心特性
2.1:消息过期机制
特点:给每条消息指定一个有效期一段时间内未被消费就过期了。
2.1.1:给队列中的全部消息指定过期时间
在消费者中对于队列的全部消息指定过期时间如果在过期时间内还没有消费者取消息消息才会过期如果消息已经接收到但是没确认是不会过期的。 public class TTLConsumer {private final static String QUEUE_NAME ttl_queue;public static void main(String[] argv) throws Exception {//创建连接工厂ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);//创建频道提供通信Connection connection factory.newConnection();Channel channel connection.createChannel();//指定消息队列的过期时间MapString ,Object argsnew HashMap();args.put(x-message-ttl,5000);//args:指定参数channel.queueDeclare(QUEUE_NAME, false, false,false, args);System.out.println( [*] Waiting for messages. To exit press CTRLC);//如何处理消息DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received message );};channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag - { });}
}2.1.2:给某条消息指定过期时间 //在发送者这边设置过期时间
public class TTLProducer {private final static String QUEUE_NAME ttl_queue;public static void main(String[] argv) throws Exception {//创建连接工厂ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);try (Connection connection factory.newConnection();//频道相当于客户端(jdbcClient,redisClient),提供了和消队列server建立通信程序通过channel进行发送消息Channel channel connection.createChannel()) {//创建消息队列第二个参数(durable):是否开启持久化,第三个参数exclusiove:是否允许当前这个创建消息队列的//连接操作消息队列 第四个参数:没有人使用队列是否需要删除String message Hello World!;//给消息指定过期时间AMQP.BasicProperties propertiesnew AMQP.BasicProperties.Builder().expiration(1000).build();channel.basicPublish(, QUEUE_NAME, properties, message.getBytes(StandardCharsets.UTF_8));System.out.println( [x] Sent message );}}2.2:死信队列
为了保证消息的可靠性比如每条消息都成功消费需要提供一个容错机制即失败的消息怎么处理相当于死信。
死信:过期的消息拒收的消息处理失败的消息消息队列满了统称为死信。
死信队列:处理死信的队列。
死信交换机:给死信队列发送消息的交换机也存在路由绑定。 a:创建死信交换机和死信队列 //声明死信交换机channel.exchangeDeclare(WORK_NAME,direct);//声明死信队列String queueNameboss_queue;channel.queueDeclare(queueName,true,false,false,null);channel.queueBind(queueName,EXCHANGE_Name,boss);String queueName2waibao_queue;channel.queueDeclare(queueName2, false, false, false, null);channel.queueBind(queueName2,EXCHANGE_Name,waibao); b:给失败后的需要容错的队列绑定死信交换机 //声明交换机channel.exchangeDeclare(WORK_NAME, direct);MapString,Object mapnew HashMap();//声明要绑定的死信交换机map.put(x-dead-letter-exchange,DEAD_EXCHANGE_NAME);//声明要绑定的死信队列map.put(x-dead-letter-routing-key,waibao_queue);//创建消息队列String queueNamexiaodog_queue;channel.queueDeclare(queueName,true,false,false,map);channel.queueBind(queueName,WORK_NAME,xiaodog);MapString,Object map2new HashMap();//声明要绑定的死信交换机map2.put(x-dead-letter-exchange,DEAD_EXCHANGE_NAME);map2.put(x-dead-letter-routing-key,boss_queue);String queueName2xiaocat_queue;channel.queueDeclare(queueName2,true,false,false,map2);channel.queueBind(queueName2,WORK_NAME,xiaocat);