RabbitMQ 消費端的限流策略
假設一個場景,由於我們的消費端突然全部不可用了,導致 rabbitMQ 伺服器上有上萬條未處理的訊息,這時候如果沒做任何現在,隨便開啟一個消費端客戶端,就會導致巨量的訊息瞬間全部推送過來,但是我們單個客戶端無法同時處理這麼多的資料,就會導致消費端變得巨卡,有可能直接崩潰不可用了。所以在實際生產中,限流保護是很重要的。
rabbitMQ 提供了一種 qos (服務質量保證)功能,即在非自動確認訊息的前提下,如果一定數目的訊息(通過基於 consume 或者 channel 設定 QOS 的值)未被確認前,不進行消費新的訊息。關鍵程式碼就是在宣告消費者程式碼裡面的
void basicQos(unit prefetchSize , ushort prefetchCount, bool global ) 複製程式碼
-
prefetchSize:0
-
prefetchCount:會告訴 RabbitMQ 不要同時給一個消費者推送多於 N 個訊息,即一旦有 N 個訊息還沒有 ack,則該 consumer 將 block 掉,直到有訊息 ack
-
global:true、false 是否將上面設定應用於 channel,簡單點說,就是上面限制是 channel 級別的還是 consumer 級別
備註:prefetchSize 和 global 這兩項,rabbitmq 沒有實現,暫且不研究。特別注意一點,prefetchCount 在 no_ask=false 的情況下才生效,即在自動應答的情況下這兩個值是不生效的。
程式碼演示:
程式碼地址:https://github.com/hmilyos/rabbitmqdemo.gitrabbitmq-api 專案下 複製程式碼
生產端程式碼基本沒變化,改了 exchange 和 routingKey 而已
public class Procuder { private static final Logger log = LoggerFactory.getLogger(Procuder.class); public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST); connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT); connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String msg = "Hello RabbitMQ limit Message"; for(int i = 0; i < 5; i ++){ log.info("生產端傳送:{}", msg + i); channel.basicPublish(Consumer.EXCHANGE_NAME, Consumer.ROUTING_KEY, true, null, (msg + i).getBytes()); } } } 複製程式碼
autoAck 設定為 false **
增加 ** channel.basicQos(0, 1, false);
完整的消費端程式碼如下
/** * 使用自定義消費者 */ public class Consumer { private static final Logger log = LoggerFactory.getLogger(Consumer.class); public static final String EXCHANGE_NAME = "test_qos_exchange"; public static final String EXCHANGE_TYPE = "topic"; public static final String ROUTING_KEY_TYPE = "qos.#"; public static final String ROUTING_KEY = "cqos.save"; public static final String QUEUE_NAME = "test_qos_queue"; public static void main(String[] args) throws IOException, TimeoutException { //1 建立ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST); connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT); connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST); //2 獲取Connection Connection connection = connectionFactory.newConnection(); //3 通過Connection建立一個新的Channel Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, null); channel.queueDeclare(QUEUE_NAME, true, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_TYPE); /** * prefetchSize:0 prefetchCount:會告訴RabbitMQ不要同時給一個消費者推送多於N個訊息,限速N個 即一旦有 N 個訊息還沒有 ack,則該 consumer 將 block 掉,直到有訊息 ack 回來,你再發送 N 個過來 global:true\false 是否將上面設定應用於channel級別,false是consumer級別 prefetchSize 和global這兩項,rabbitmq沒有實現,暫且不研究 */ channel.basicQos(0, 1, false); //使用自定義消費者 //1 限流方式第一件事就是 autoAck設定為 false //使用自定義消費者 channel.basicConsume(QUEUE_NAME, false, new MyConsumer(channel)); log.info("消費端啟動成功"); } } 複製程式碼
自定義消費者
public class MyConsumer extends DefaultConsumer { private static final Logger log = LoggerFactory.getLogger(MyConsumer.class); private Channel channel; public MyConsumer(Channel channel) { super(channel); this.channel = channel; } @Override public void handleDelivery(String consumerTag,//消費者標籤 Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { log.info("------limit-----consume message----------"); log.info("consumerTag: " + consumerTag); log.info("envelope: " + envelope); log.info("properties: " + properties); log.info("body: " + new String(body)); //一定要手動ACK回去 //channel.basicAck(envelope.getDeliveryTag(), false); } } 複製程式碼
然後啟動消費端,上管控臺檢視 test_qos_exchange 和 test_qos_queue 是否生成了

確認 test_qos_exchange 上綁定了 test_qos_queue

啟動生產端傳送 5 條訊息

發現消費端只打印了一條訊息

從管控臺上也看到總共 5 條訊息,有 4 條等待著,一條消費了但是沒有 ack 回去

修改自定義消費者裡面的程式碼,如下所示
public class MyConsumer extends DefaultConsumer { private static final Logger log = LoggerFactory.getLogger(MyConsumer.class); private Channel channel; public MyConsumer(Channel channel) { super(channel); this.channel = channel; } @Override public void handleDelivery(String consumerTag,//消費者標籤 Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { log.info("------limit-----consume message----------"); log.info("consumerTag: " + consumerTag); log.info("envelope: " + envelope); log.info("properties: " + properties); log.info("body: " + new String(body)); //一定要手動ACK回去 channel.basicAck(envelope.getDeliveryTag(), false); } } 複製程式碼
重啟消費端,看到消費端就按照一條一條消費,並且 ACK 回去了


如上所示就是簡單的RabbitMQ消費端的限流策略