网站域名和密码,网站导航的作用,湖南网站建设公司速来磐石网络,罗定网站优化文章目录 一、Hello World#xff08;简单#xff09;模式1.导入依赖2.消息生产者3.消息消费者 二、Work Queues#xff08;工作#xff09;模式1.抽取工具类2.启动两个工作线程3.启动一个发送线程4.结果 总结 一、Hello World#xff08;简单#xff09;模式
在下图中简单模式1.导入依赖2.消息生产者3.消息消费者 二、Work Queues工作模式1.抽取工具类2.启动两个工作线程3.启动一个发送线程4.结果 总结 一、Hello World简单模式
在下图中“ P”是我们的生产者“ C”是我们的消费者。中间的框是一个队列-RabbitMQ 代 表使用者保留的消息缓冲区 博主这里使用JAVA实现。
1.导入依赖
!--指定 jdk 编译版本--
buildpluginsplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-compiler-plugin/artifactIdconfigurationsource8/sourcetarget8/target/configuration/plugin/plugins
/build
dependencies!--rabbitmq 依赖客户端--dependencygroupIdcom.rabbitmq/groupIdartifactIdamqp-client/artifactIdversion5.8.0/version/dependency!--操作文件流的一个依赖--dependencygroupIdcommons-io/groupIdartifactIdcommons-io/artifactIdversion2.6/version/dependency
/dependencies2.消息生产者
public class Producer {private final static String QUEUE_NAME hello;public static void main(String[] args) throws Exception {//创建一个连接工厂ConnectionFactory factory new ConnectionFactory();factory.setHost(192.168.10.130);factory.setUsername(guest);factory.setPassword(guest);//channel 实现了自动 close 接口 自动关闭 不需要显示关闭try(Connection connection factory.newConnection();Channel channel
connection.createChannel()) {/*** 生成一个队列* 1.队列名称* 2.队列里面的消息是否持久化 默认消息存储在内存中* 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除* 5.其他参数*/channel.queueDeclare(QUEUE_NAME,false,false,false,null);String messagehello world;/*** 发送一个消息* 1.发送到那个交换机* 2.路由的 key 是哪个* 3.其他的参数信息* 4.发送消息的消息体*/channel.basicPublish(,QUEUE_NAME,null,message.getBytes());System.out.println(消息发送完毕);}}
}3.消息消费者
public class Consumer {private final static String QUEUE_NAME hello;public static void main(String[] args) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(192.168.10.130);factory.setUsername(guest);factory.setPassword(guest);Connection connection factory.newConnection();Channel channel connection.createChannel();System.out.println(等待接收消息....);//推送的消息如何进行消费的接口回调DeliverCallback deliverCallback(consumerTag,delivery)-{String message new String(delivery.getBody());System.out.println(message);};//取消消费的一个回调接口 如在消费的时候队列被删除掉了CancelCallback cancelCallback(consumerTag)-{System.out.println(消息消费被中断);};/*** 消费者消费消息* 1.消费哪个队列* 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答* 3.消费者未成功消费的回调*/channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}}二、Work Queues工作模式
工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务而不得不等待它完成。 相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进 程将弹出任务并最终执行作业。当有多个工作线程时这些工作线程将一起处理这些任务。
1.抽取工具类
public class RabbitMqUtils {//得到一个连接的 channelpublic static Channel getChannel() throws Exception{//创建一个连接工厂ConnectionFactory factory new ConnectionFactory();factory.setHost(192.168.10.130);factory.setUsername(guest);factory.setPassword(guest);Connection connection factory.newConnection();Channel channel connection.createChannel();return channel;}
}2.启动两个工作线程
public class Worker01 {private static final String QUEUE_NAMEhello;public static void main(String[] args) throws Exception {Channel channel RabbitMqUtils.getChannel();DeliverCallback deliverCallback(consumerTag,delivery)-{String receivedMessage new String(delivery.getBody());System.out.println(接收到消息:receivedMessage);};CancelCallback cancelCallback(consumerTag)-{System.out.println(consumerTag消费者取消消费接口回调逻辑);};System.out.println(C2 消费者启动等待消费......);channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}3.启动一个发送线程
public class Task01 {private static final String QUEUE_NAMEhello;public static void main(String[] args) throws Exception {try(Channel channelRabbitMqUtils.getChannel();) {channel.queueDeclare(QUEUE_NAME,false,false,false,null);//从控制台当中接受信息Scanner scanner new Scanner(System.in);while (scanner.hasNext()){String message scanner.next();channel.basicPublish(,QUEUE_NAME,null,message.getBytes());System.out.println(发送消息完成:message);}}}
}4.结果
通过程序执行发现生产者总共发送 4 个消息消费者 1 和消费者 2 分别分得两个消息并且 是按照有序的一个接收一次消息 总结
以上就是RabbitMQ 核心部分之简单模式和工作模式的相关知识希望对你有所帮助。