1. 程式人生 > >【RabbitMQ】RabbitMQ的公平轉發和持久化

【RabbitMQ】RabbitMQ的公平轉發和持久化

問題:公平轉發

目前訊息轉發機制是平均分配,這樣就會出現倆個消費者,奇數的任務很耗時,偶數的任何工作量很小,造成的原因就是近當訊息到達佇列進行轉發訊息。並不在乎有多少任務消費者並未傳遞一個應答給RabbitMQ。僅僅盲目轉發所有的奇數給一個消費者,偶數給另一個消費者。

為了解決這樣的問題,我們可以使用basicQos方法,傳遞引數為prefetchCount= 1。這樣告訴RabbitMQ不要在同一時間給一個消費者超過一條訊息。

換句話說,只有在消費者空閒的時候會發送下一條資訊。

排程分發訊息的方式,也就是告訴RabbitMQ每次只給消費者處理一條訊息,也就是等待消費者處理完畢並自己對剛剛處理的訊息進行確認之後,才傳送下一條訊息,防止消費者太過於忙碌,也防止它太過去清閒。

intprefetchCount = 1;

channel.basicQos(prefetchCount);


問題:持久化:

          訊息持久化在訊息宣告和通道釋出的時候來宣告他為持久化,同時在消費端宣告的時候也需要宣告為持久化。

          沒有持久化,重新啟動RabbitMQ,發現沒有了。

持久化之後,重新啟動RabbitMQ,發現是有的,執行消費者,可以獲取該訊息。

  1. package cn.itcast.rabbitmq.durable;  
  1. import com.rabbitmq.client.ConnectionFactory;  
  2. import
     com.rabbitmq.client.Connection;  
  3. import com.rabbitmq.client.Channel;  
  4. import com.rabbitmq.client.QueueingConsumer;  
  1. publicclass ClientReceive1  
  1. {
  2.     public static final String queue_name = "my_queue";  
  3.     public static final boolean autoAck = false;  
  4.     public static final boolean durable = true
    ;  
  5.     public static void main(String[] args) throws java.io.IOException,  
  6.         java.lang.InterruptedException  
  7.     {  
  8.         ConnectionFactory factory = new ConnectionFactory();  
  9.         factory.setHost("localhost");  
  10.         factory.setPort(5672);  
  11. // 設定賬號資訊,使用者名稱、密碼、vhost
  12.         factory.setVirtualHost("/taotao");  
  13.     factory.setUsername("taotao");  
  14.     factory.setPassword("taotao");  
  15.     Connection connection = factory.newConnection();  
  16.     Channel channel = connection.createChannel();  
  17.     channel.queueDeclare(queue_name, durable, falsefalsenull);  
  18.     System.out.println("Wait for message");  
  19.     channel.basicQos(1); // 訊息分發處理
  20.     QueueingConsumer consumer = new QueueingConsumer(channel);  
  21.     channel.basicConsume(queue_name, autoAck, consumer);  
  22.     while (true)  
  23.     {  
  24.         Thread.sleep(500);  
  25.         QueueingConsumer.Delivery deliver = consumer.nextDelivery();  
  26.         String message = new String(deliver.getBody());  
  27.         System.out.println("Message received:" + message);  
  28.         channel.basicAck(deliver.getEnvelope().getDeliveryTag(), false);  
  29.     }  
  30.     }  
  31. }

程式碼是 從world中貼上過來的,格式有點不好,請諒解!