平阳住房和城乡规划建设局网站,宿州高端网站建设公司哪家好,个人网站网址,论坛定制Work消息模型
* work模型#xff1a;
* 多个消费者消费同一个队列中的消息#xff0c;每个消费者获取到的消息唯一#xff0c;且只能消费一次
* 作用#xff1a;提高消息的消费速度#xff0c;避免消息的堆积
* 默认采用轮询的方式分发消息
* 如果某…Work消息模型
* work模型
* 多个消费者消费同一个队列中的消息每个消费者获取到的消息唯一且只能消费一次
* 作用提高消息的消费速度避免消息的堆积
* 默认采用轮询的方式分发消息
* 如果某个消费者处理消息慢会导致消息堆积生产者
package com.example.demo02.mq.work;import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;import java.io.IOException;
import java.util.concurrent.TimeUnit;/*** author Allen* 4/10/2024 9:37 PM* version 1.0* description: work模式发送者** work模型* 多个消费者消费同一个队列中的消息每个消费者获取到的消息唯一且只能消费一次* 作用提高消息的消费速度避免消息的堆积* 默认采用轮询的方式分发消息* 如果某个消费者处理消息慢会导致消息堆积*/
public class WorkSender {public static void main(String[] args) throws Exception {
// 1获取连接Connection connection ConnectionUtils.getConnection();
// 2创建通道Channel channel connection.createChannel();
// 3声明队列// 参数1队列名称 参数2是否持久化 参数3是否排他性 参数4是否自动删除 参数5队列的属性channel.queueDeclare(work.queue, false, false, false, null);
// 4发送100条消息
for (int i 0; i 100; i) {String msg work模式消息 i;//休眠i*5毫秒TimeUnit.MILLISECONDS.sleep(i * 5);// 参数1交换机名称 参数2队列名称 参数3消息的其他属性 参数4消息的内容channel.basicPublish(, work.queue, null, msg.getBytes());System.out.println(work模式发送消息 msg);}
// 5关闭通道channel.close();
// 6关闭连接connection.close();}
}消费者1
能者多劳角色
package com.example.demo02.mq.work;import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.*;import java.io.IOException;/*** author Allen* 4/10/2024 9:37 PM* version 1.0* description: work模式消费者1号*/
public class WorkReciver1 {public static void main(String[] args) throws Exception {// 1获取连接Connection connection ConnectionUtils.getConnection();// 2创建通道Channel channel connection.createChannel();// 3声明队列// 参数1队列名称 参数2是否持久化 参数3是否排他性 参数4是否自动删除 参数5队列的属性channel.queueDeclare(work.queue, false, false, false, null);// 4定义消费者消费消息// 参数1队列名称 参数2是否自动确认消息 参数3消费者对象Consumer consumer new DefaultConsumer(channel) {// 消费者接收消息调用此方法// 参数1消费者标签 参数2队列参数 参数3消息属性 参数4消息内容Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 获取消息String msg new String(body);System.out.println(work模式消费者1号接收消息 msg);channel.basicAck(envelope.getDeliveryTag(), false);}};channel.basicConsume(work.queue, false, consumer);}
}消费者2
消费能力差
package com.example.demo02.mq.work;import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeUnit;/*** author Allen* 4/10/2024 9:37 PM* version 1.0* description: work模式消费者1号*/
public class WorkReciver2 {public static void main(String[] args) throws Exception {// 1获取连接Connection connection ConnectionUtils.getConnection();// 2创建通道Channel channel connection.createChannel();// 3声明队列// 参数1队列名称 参数2是否持久化 参数3是否排他性 参数4是否自动删除 参数5队列的属性channel.queueDeclare(work.queue, false, false, false, null);//如果此消费者性能较差配置能者多劳指定一次获取几条信息消息消费成功后 ack之后 mq才会发送下一条消息channel.basicQos(1);// 4定义消费者消费消息// 参数1队列名称 参数2是否自动确认消息 参数3消费者对象Consumer consumer new DefaultConsumer(channel) {// 消费者接收消息调用此方法// 参数1消费者标签 参数2队列参数 参数3消息属性 参数4消息内容Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//模拟二号消费者处理消息慢try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}// 获取消息:执行业务String msg new String(body);System.out.println(work模式消费者2号接收消息 msg);channel.basicAck(envelope.getDeliveryTag(), false);}};// 参数1队列名称 参数2ACK是否自动确认 参数3消费者对象//必须手动确认消息否则会报406错误channel.basicConsume(work.queue, false, consumer);}
}结果
能者多劳