石家庄外贸做网站,手机怎么安装网站程序,一键建站免费,营业执照公示网消费端限流1. 为什么要对消费端限流假设一个场景#xff0c;首先#xff0c;我们 Rabbitmq 服务器积压了有上万条未处理的消息#xff0c;我们随便打开一个消费者客户端#xff0c;会出现这样情况: 巨量的消息瞬间全部推送过来#xff0c;但是我们单个客户端无法同时处理这…消费端限流1. 为什么要对消费端限流假设一个场景首先我们 Rabbitmq 服务器积压了有上万条未处理的消息我们随便打开一个消费者客户端会出现这样情况: 巨量的消息瞬间全部推送过来但是我们单个客户端无法同时处理这么多数据!当数据量特别大的时候我们对生产端限流肯定是不科学的因为有时候并发量就是特别大有时候并发量又特别少我们无法约束生产端这是用户的行为。所以我们应该对消费端限流用于保持消费端的稳定当消息数量激增的时候很有可能造成资源耗尽以及影响服务的性能导致系统的卡顿甚至直接崩溃。2.限流的 api 讲解RabbitMQ 提供了一种 qos (服务质量保证)功能即在非自动确认消息的前提下如果一定数目的消息(通过基于 consume 或者 channel 设置 Qos 的值)未被确认前不进行消费新的消息。/*** Request specific quality of service settings.* These settings impose limits on the amount of data the server* will deliver to consumers before requiring acknowledgements.* Thus they provide a means of consumer-initiated flow control.* param prefetchSize maximum amount of content (measured in* octets) that the server will deliver, 0 if unlimited* param prefetchCount maximum number of messages that the server* will deliver, 0 if unlimited* param global true if the settings should be applied to the* entire channel rather than each consumer* throws java.io.IOException if an error is encountered*/void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;prefetchSize0单条消息大小限制0代表不限制prefetchCount一次性消费的消息数量。会告诉 RabbitMQ 不要同时给一个消费者推送多于 N 个消息即一旦有 N 个消息还没有 ack则该 consumer 将 block 掉直到有消息 ack。globaltrue、false 是否将上面设置应用于 channel简单点说就是上面限制是 channel 级别的还是 consumer 级别。当我们设置为 false 的时候生效设置为 true 的时候没有了限流功能因为 channel 级别尚未实现。注意prefetchSize 和 global 这两项rabbitmq 没有实现暂且不研究。特别注意一点prefetchCount 在 no_askfalse 的情况下才生效即在自动应答的情况下这两个值是不生效的。3.如何对消费端进行限流首先第一步我们既然要使用消费端限流我们需要关闭自动 ack将 autoAck 设置为 falsechannel.basicConsume(queueName, false, consumer);第二步我们来设置具体的限流大小以及数量。channel.basicQos(0, 15, false);第三步在消费者的 handleDelivery 消费方法中手动 ack并且设置批量处理 ack 回应为 truechannel.basicAck(envelope.getDeliveryTag(), true);这是生产端代码与前几章的生产端代码没有做任何改变主要的操作集中在消费端。import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class QosProducer { public static void main(String[] args) throws Exception { //1. 创建一个 ConnectionFactory 并进行设置 ConnectionFactory factory new ConnectionFactory(); factory.setHost(localhost); factory.setVirtualHost(/); factory.setUsername(guest); factory.setPassword(guest); //2. 通过连接工厂来创建连接 Connection connection factory.newConnection(); //3. 通过 Connection 来创建 Channel Channel channel connection.createChannel(); //4. 声明 String exchangeName test_qos_exchange; String routingKey item.add; //5. 发送 String msg this is qos msg; for (int i 0; i 10; i) { String tem msg : i; channel.basicPublish(exchangeName, routingKey, null, tem.getBytes()); System.out.println(Send message : tem); } //6. 关闭连接 channel.close(); connection.close(); }}这里我们创建了两个消费者以方便验证限流api中的 global 参数设置为 true 时不起作用.。整体结构如下图所示两个 Consumer 都绑定在同一个队列上这样的话两个消费者将共同消费发送的10条消息。import com.rabbitmq.client.*;import java.io.IOException;public class QosConsumer { public static void main(String[] args) throws Exception { //1. 创建一个 ConnectionFactory 并进行设置 ConnectionFactory factory new ConnectionFactory(); factory.setHost(localhost); factory.setVirtualHost(/); factory.setUsername(guest); factory.setPassword(guest); factory.setAutomaticRecoveryEnabled(true); factory.setNetworkRecoveryInterval(3000); //2. 通过连接工厂来创建连接 Connection connection factory.newConnection(); //3. 通过 Connection 来创建 Channel final Channel channel connection.createChannel(); //4. 声明 String exchangeName test_qos_exchange; String queueName test_qos_queue; String queueName1 test_qos_queue_1; String routingKey item.#; channel.exchangeDeclare(exchangeName, topic