1. 程式人生 > >工作佇列work queues 公平分發(fair dispatch) And 訊息應答與訊息持久化

工作佇列work queues 公平分發(fair dispatch) And 訊息應答與訊息持久化

生產者

 1 package cn.wh.work;
 2 
 3 import cn.wh.util.RabbitMqConnectionUtil;
 4 import com.rabbitmq.client.Channel;
 5 import com.rabbitmq.client.Connection;
 6 
 7 public class Send {
 8     private static final String QUEVE_NAME = "test_work_queue";
 9 
10     public static void main(String[] args) throws
Exception { 11 ; 12 Connection connection = RabbitMqConnectionUtil.getConnection(); 13 Channel channel = connection.createChannel(); 14 channel.queueDeclare(QUEVE_NAME, false, false, false, null); 15 16 int i1 =1 ; 17 channel.basicQos(i1); 18 for
(int i = 0; i < 50; i++) { 19 String msg = "hello " + i; 20 System.out.println(msg); 21 channel.basicPublish("", QUEVE_NAME, null, msg.getBytes()); 22 Thread.sleep(i * 20); 23 } 24 channel.close(); 25 connection.close();
26 } 27 }

 消費者 1

 1 package cn.wh.work;
 2 
 3 import cn.wh.util.RabbitMqConnectionUtil;
 4 import com.rabbitmq.client.*;
 5 
 6 import java.io.IOException;
 7 
 8 public class Recv1 {
 9     private static final String QUEVE_NAME = "test_work_queue";
10     public static void main(String[] args) throws Exception {
11         Connection connection = RabbitMqConnectionUtil.getConnection();
12       final   Channel channel = connection.createChannel();
13         int i1 =1 ;
14         channel.basicQos(i1);
15         DefaultConsumer consumer = new DefaultConsumer(channel) {
16             @Override
17             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
18                 String msg = new String(body);
19                 System.out.println("recv1"+msg);
20                 try {
21                     Thread.sleep(2000);
22                 } catch (InterruptedException e) {
23                     e.printStackTrace();
24                 }finally {
25                     System.out.println(1+"OK");
26                     channel.basicAck(envelope.getDeliveryTag(),false);
27                 }
28             }
29         };
30         boolean autoAck=false;
31         channel.basicConsume(QUEVE_NAME,autoAck,consumer);
32     }
33 }

消費者2

 1 package cn.wh.work;
 2 
 3 import cn.wh.util.RabbitMqConnectionUtil;
 4 import com.rabbitmq.client.*;
 5 
 6 import java.io.IOException;
 7 
 8 public class Recv2 {
 9 
10     private static final String QUEVE_NAME = "test_work_queue";
11     public static void main(String[] args) throws Exception {
12         Connection connection = RabbitMqConnectionUtil.getConnection();
13      final    Channel channel = connection.createChannel();
14         int i1 =1 ;
15         channel.basicQos(i1);
16         DefaultConsumer consumer = new DefaultConsumer(channel) {
17             @Override
18             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
19                 String msg = new String(body);
20                 System.out.println("recv2"+msg);
21 
22                 try {
23                     Thread.sleep(1000);
24                 } catch (InterruptedException e) {
25                     e.printStackTrace();
26                 }finally {
27                     System.out.println(2+"OK");
28                     channel.basicAck(envelope.getDeliveryTag(),false);
29                 }
30             }
31         };
32         boolean autoAck=false;
33         channel.basicConsume(QUEVE_NAME,autoAck,consumer);
34     }
35 
36 }

 這時候現象就是消費者 1 速度大於消費者

Message acknowledgment(訊息應答)

  • boolean autoAck = true;(自動確認模式)一旦 RabbitMQ 將訊息分發給了消費者,就會從記憶體中刪除。在這種情況下,如果殺死正在執行任務的消費者,會丟失正在處理的訊息,也會丟失已經分發給這個消費者但尚未處理的訊息。
  • boolean autoAck = false; (手動確認模式) 我們不想丟失任何任務,如果有一個消費者掛掉了,那麼我們應該將分發給它的任務交付給另一個消費者去處理。 為了確保訊息不會丟失,RabbitMQ 支援訊息應答。消費者送一個訊息應答,告訴 RabbitMQ 這個訊息已經接收並且處理完畢了。RabbitMQ 可以刪除它了。
  • 訊息應答是預設開啟的。也就是 boolean autoAck =false

 Message durability(訊息持久化)

我們已經瞭解瞭如何確保即使消費者死亡,任務也不會丟失。但是如果 RabbitMQ 伺服器停止,我們的任務仍將失去!當 RabbitMQ 退出或者崩潰,將會丟失佇列和訊息。除非你不要佇列和訊息。兩件事兒必須保證訊息不被丟失:我們必須把“佇列”和“訊息”設為持久化。

  1. boolean durable = true;

  2. channel.queueDeclare("test_queue_work", durable, false, false, null);那麼我們直接將程式裡面的 false 改成 true 就行了?? 不可以會 報異常 channel error; protocol method: #method<channel.close>(reply-code=406, replytext=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'test_queue_work'

  儘管這行程式碼是正確的,他不會執行成功。因為我們已經定義了一個名叫 test_queue_work 的未持久化的佇列。RabbitMQ 不允許使用不同的引數設定重新定義已經存在的佇列,並且會返回一個錯誤。一個快速的解決方案——就是宣告一個不同名字的佇列,比如 task_queue。或者我們登入控制檯將佇列刪除就可