1. 程式人生 > >RabbitMQ指南之二:工作佇列(Work Queues)

RabbitMQ指南之二:工作佇列(Work Queues)

在上一章的指南中,我們寫了一個命名佇列:生產者往該命名佇列傳送訊息、消費從從該命名佇列中消費訊息。在本章中,我們將建立一個工作佇列,用於在多個工作者之間分配耗時的任務。工作佇列(即任務佇列)的主要思想是避免立即執行那些需要等他們執行完成的資源密集型任務。相反,我們將任務安排在稍後完成。我們將任務封裝為訊息並將其傳送到佇列,後臺執行的工作程序將取出任務並執行完成。如果你啟動了多個工作者,這些任務將在多個工作者之間分享。

這個概念也即我們說的非同步,在專案中,有時候一個簡單的Web請求,後臺要做一系統的操作,這時候,如果後臺執行完成之後再給前臺返回訊息將會導致瀏覽器頁面等待從而出現假死狀態。因此,通常的做法是,在這個Http請求到後臺,後臺獲取到正確的引數等資訊後立即給前臺返回一個成功標誌,然後後臺非同步地進行後續的操作。

1、準備

  本章中,我們將傳送字串訊息來模擬複雜的任務。這裡因為沒有一個真實的複雜任務,因此用Thread.sleep()方法來模擬複雜耗時的任務。我們用字串中的含點(“.")的數量來表示任務的複雜程度,一個點表示一秒鐘的耗時,例如:一個傳送”Hello ...“字串的任務將會耗時3秒鐘。

  我們可以直接將上一章中的Send.java程式碼拿過來修改,允許從命令列傳送訊息。本程式將會把任務除錯到工作佇列,因此,我們將類名改為NewTask.java:

String message = String.join(" ", argv);

channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

   此時完整的NewTask.java程式碼為:

 1 public class NewTask {
 2 
 3     private final static String QUEUE_NAME = "hello";
 4 
 5     public static void main(String[] argv) throws IOException, TimeoutException {
 6 
 7         ConnectionFactory connectionFactory = new ConnectionFactory();
 8         connectionFactory.setHost("HOST");
 9 
10         try(Connection connection = connectionFactory.newConnection();
11             Channel channel = connection.createChannel()) {
12 
13             channel.queueDeclare(QUEUE_NAME,false,false,false,null);
14 
15             String message = String.join(" ", argv);
16             
17             channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8"));
18             System.out.println(" [x] Sent '" + message + "'");
19         }
20     }
21 }

  之前的Recv.java也要做一些修改:模擬字串訊息中的每個點耗時1秒鐘,它將處理傳送過來的訊息並執行任務,因此,我們修改為Work.java:

 1 DeliverCallback deliverCallback = (consumerTag, delivery) -> {
 2   String message = new String(delivery.getBody(), "UTF-8");
 3 
 4   System.out.println(" [x] Received '" + message + "'");
 5   try {
 6     doWork(message);
 7   } finally {
 8     System.out.println(" [x] Done");
 9   }
10 };
11 boolean autoAck = true; // acknowledgment is covered below
12 channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

  我們模擬執行過程中耗時的偽任務:

1 private static void doWork(String task) throws InterruptedException {
2     for (char ch: task.toCharArray()) {
3         if (ch == '.') Thread.sleep(1000);
4     }
5 }

  此時完整的Work.java為:

 1 public class Worker {
 2     private final static String TASK_QUEUE_NAME = "hello";
 3 
 4     public static void main(String[] args) throws Exception {
 5 
 6         ConnectionFactory connectionFactory = new ConnectionFactory();
 7         connectionFactory.setHost("HOST");
 8 
 9         Connection connection = connectionFactory.newConnection();
10         Channel channel = connection.createChannel();
11         channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
12 
13         DeliverCallback deliverCallback = (consumerTag, delivery) -> {
14             String message = new String(delivery.getBody(), "UTF-8");
15 
16             System.out.println(" [x] Received '" + message + "'");
17             try {
18                 doWork(message);
19             } catch (InterruptedException e) {
20                 e.printStackTrace();
21             } finally {
22                 System.out.println(" [x] Done");
23             }
24         };
25 
26         boolean autoAck = true; // acknowledgment is covered below
27         channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
28     }
29 
30     private static void doWork(String task) throws InterruptedException {
31         for (char ch: task.toCharArray()) {
32             if (ch == '.') Thread.sleep(1000);
33         }
34     }
35 }

2、迴圈排程

  使用工作佇列的優點之一是能夠輕鬆地進行並行化操作。假設我們在做一個後臺日誌收集系統,我們可以很容易地增加更多的Worker從而提高系統性能。

  首先,我們同時啟動兩個Worker,同樣地,我這裡也放到IDEA中啟動:

  

  接下來,我們先後啟動5個Task,並分別通過main()引數傳入五個字串訊息:

1 First message.
2 Second message..
3 Third message...
4 Fourth message....
5 Fifth message.....

  

   執行五個傳送任務之後,來看一下兩個Worker都接收到了什麼樣的訊息:

  

        

  預設情況下,RabbitMQ將按順序將每個訊息傳送給下一個使用者。平均每個消費者將得到相同數量的訊息。這種訊息的排程方式稱之為迴圈排程,你可以開啟更多的Worker來進行測試。

3、訊息回執

  因為消費者執行一個任務會有時間耗時,假設一個消費者在執行一個任務執行一半的時候掛掉了將會怎樣?訊息會不會因此丟失?在我們目前的程式碼裡,一旦RabbitMq將一條訊息轉發給了一個消費者後,將會立即將訊息刪除(注意Worker.java裡的autoAck),因此,在我們上面例子裡,如kill掉一個正在處理資料的Worker,那麼該資料將會丟失。不僅如此,所有那些指派給該Worker的還未處理的訊息也會丟失。

  但在實際工作的,我們並不希望一個Worker掛掉之後就會丟失資料,我們希望的是:如果該Worker掛掉了,所有轉發給該Worker的訊息將會重新轉發給其他Worker進行處理(包括處理了一半的訊息)。為了確保一條訊息永不丟失,RabbitMq支援訊息回執。消費者在接收到一條訊息,並且成功處理完成之後會給RabbitMq回發一條確認ack確認訊息,RabbitMq此時才會刪除該條訊息。

  如果一個Worker正在處理一條訊息時掛掉了(通道關閉、連線關閉、TCP連線丟失),它將沒有機會發送ack回執,RabbitMq就認為該訊息沒有消費成功,於是便會將該訊息重新放到佇列中,如果此時有其他消費者還是線上狀態,RabbitMq會立即將該條訊息再轉發給其他線上的消費者。這種機制可以保證任何訊息都不會丟失。

  預設情況下,需要手動進行訊息確認,在前面的例子裡,我們通過autoAck=true顯示地關閉了手動訊息確認,因此,RabbitMq將採用自動訊息確認的機制。現在,我們修改我們的程式,採用手動傳送回執的方式,當我們完成對訊息的處理後,再手動傳送回執確認:

 1 channel.basicQos(1); // accept only one unack-ed message at a time (see below)
 2 
 3 DeliverCallback deliverCallback = (consumerTag, delivery) -> {
 4   String message = new String(delivery.getBody(), "UTF-8");
 5 
 6   System.out.println(" [x] Received '" + message + "'");
 7   try {
 8     doWork(message);
 9   } finally {
10     System.out.println(" [x] Done");
11     channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
12   }
13 };
14 boolean autoAck = false;
15 channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

  ack傳送通道必須和接收訊息的通道(channel)是同一個,如果嘗試通過一個不同的通道傳送ack回執,將會丟擲channel等級協議異常(官網說會丟擲異常,但是我在實際測試中並沒有拋異常,只是該條訊息得不到回執,從而也無法刪除)。

  一個常見的錯誤是忘了手動回執,雖然只是一個簡單的錯誤,但是帶來的後果卻是嚴重的,它將導致已經消費掉的消費不會被刪除,並且當消費該訊息的消費者在退出之後,RabbitMq會將該條訊息重新進行轉發,記憶體將被慢慢耗盡。我們可以通過正面的命令來檢查這種錯誤:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

  該命令有三列內容,第一列是在監聽的佇列名稱,第二列是Ready狀態的訊息數量,第三列是Unacked的訊息數量。

4、訊息的持久化

  在3中我們講解了如何保證當消費者掛掉之後訊息不被丟失,但是,如果RabbitMq服務或者部署RabbitMq的伺服器掛掉了之後,訊息仍然會丟失。當RabbitMq崩潰之後,它將會忘記所有的佇列和訊息,除非,有什麼機制讓RabbitMq將佇列資訊和訊息儲存下來。

  要確保訊息和佇列不會丟失,我們必須要確保兩件事情。

  首先,我們要確保RabbitMq永遠不丟失佇列,要做到這點,我們在定義的時候就需要告訴RabbitMq它是需要持久化的,通過指定durable引數實現:

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

  雖然這個命令本身是正確的,但是在我們目前它不能工作。因為我們前面已經定義了一個非持久化的hello佇列,RabbitMq不允許重新定義一個已經存在的佇列(用不同的引數),否則會丟擲異常:

Exception in thread "main" java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:126)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:122)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:144)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:962)
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.queueDeclare(AutorecoveringChannel.java:333)
    at myblog.myblog.java8.methodreference.rabbitmq.workqueue.NewTask.main(NewTask.java:23)
    Suppressed: com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, 
reply-text=PRECONDITION_FAILED - parameters for queue 'hello' in vhost '/' not equivalent, class-id=50, method-id=10)
        at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(AMQChannel.java:396)
        at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(ChannelN.java:292)
        at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:607)
        at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:541)
        at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:534)
        at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.close(AutorecoveringChannel.java:68)
        at myblog.myblog.java8.methodreference.rabbitmq.workqueue.NewTask.main(NewTask.java:29)

  要麼重啟RabbitMq讓該臨時佇列消失,要麼在控制檯將該佇列刪除,或者重新建立一個新的佇列:

1 boolean durable = true;
2 channel.queueDeclare("task_queue", durable, false, false, null);

  生產者和消費者要做同步修改。

  上面這一步,我們保證了佇列(task_quee)的持久化,此時,即便RabbitMq崩潰了也不會丟失該佇列,當RabbitMq重啟後將自動重新載入該佇列。

  其次,我們需要確保我們的訊息也被持久化,要做到這一點,在生產者釋出訊息的時候需要指定訊息的屬性為:PERSISTENT_TEXT_PLAIN。

1 import com.rabbitmq.client.MessageProperties;
2 
3 channel.basicPublish("", "task_queue",
4             MessageProperties.PERSISTENT_TEXT_PLAIN,
5             message.getBytes());

  注意,即便設定了訊息的持久化屬性也不能保證訊息會被100%地寫入到磁碟中,因為RabbitMq在接收到訊息和寫入到磁碟不是同步的,有可能訊息只是被寫入到快取中而還沒來和及寫入磁碟的時候,RabbitMq崩潰了,此時也會丟失訊息。但無論如何,比前面簡單的訊息佇列已經強大了很多。

5、公平排程

  您可能已經注意到,任務排程仍然不能完全按照我們希望的方式工作。舉個例子,在只有兩個Worker的環境中,奇數的訊息比較重,偶數的訊息比較輕時,一個Worker將會一直處於忙碌狀態,而另一個Worker將會一直處於空閒狀態,但RabbitMq並不知道這種情況,它會依然均衡地向兩個Worker傳遞訊息。

  發生這種情況是因為,當一個訊息進入佇列之後,RabbitMq只是盲目地將該第n個訊息轉發給第n個消費者,它並不關注每個消費者發了多少個回執。

  為了解決這個問題,我們可以通過呼叫basicQos方法,給它傳入1。這將告訴RabbitMq不要同時給一個佇列轉發多於1條的訊息,換句話說,在一個消費者沒有完成並回執前一條訊息時,不要再給它轉發其他訊息。

1 int prefetchCount = 1;
2 channel.basicQos(prefetchCount);

6、完整的程式碼

  一、NewTask.java

 1 import com.rabbitmq.client.Channel;
 2 import com.rabbitmq.client.Connection;
 3 import com.rabbitmq.client.ConnectionFactory;
 4 import com.rabbitmq.client.MessageProperties;
 5 
 6 public class NewTask {
 7 
 8   private static final String TASK_QUEUE_NAME = "task_queue";
 9 
10   public static void main(String[] argv) throws Exception {
11     ConnectionFactory factory = new ConnectionFactory();
12     factory.setHost("localhost");
13     try (Connection connection = factory.newConnection();
14          Channel channel = connection.createChannel()) {
15         channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
16 
17         String message = String.join(" ", argv);
18 
19         channel.basicPublish("", TASK_QUEUE_NAME,
20                 MessageProperties.PERSISTENT_TEXT_PLAIN,
21                 message.getBytes("UTF-8"));
22         System.out.println(" [x] Sent '" + message + "'");
23     }
24   }
25 
26 }

二、Worker.java

 1 import com.rabbitmq.client.Channel;
 2 import com.rabbitmq.client.Connection;
 3 import com.rabbitmq.client.ConnectionFactory;
 4 import com.rabbitmq.client.DeliverCallback;
 5 
 6 public class Worker {
 7 
 8   private static final String TASK_QUEUE_NAME = "task_queue";
 9 
10   public static void main(String[] argv) throws Exception {
11     ConnectionFactory factory = new ConnectionFactory();
12     factory.setHost("localhost");
13     final Connection connection = factory.newConnection();
14     final Channel channel = connection.createChannel();
15 
16     channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
17     System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
18 
19     channel.basicQos(1);
20 
21     DeliverCallback deliverCallback = (consumerTag, delivery) -> {
22         String message = new String(delivery.getBody(), "UTF-8");
23 
24         System.out.println(" [x] Received '" + message + "'");
25         try {
26             doWork(message);
27         } finally {
28             System.out.println(" [x] Done");
29             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
30         }
31     };
32     channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
33   }
34 
35   private static void doWork(String task) {
36     for (char ch : task.toCharArray()) {
37         if (ch == '.') {
38             try {
39                 Thread.sleep(1000);
40             } catch (InterruptedException _ignored) {
41                 Thread.currentThread().interrupt();
42             }
43         }
44     }
45   }
46 }

點關注,不迷路,這是一個程式設計師都想要關注的二維碼

相關推薦

RabbitMQ指南工作佇列Work Queues

在上一章的指南中,我們寫了一個命名佇列:生產者往該命名佇列傳送訊息、消費從從該命名佇列中消費訊息。在本章中,我們將建立一個工作佇列

RabbitMQ指南工作隊列Work Queues

chan 需要 講解 nts rec 並行化 之一 RoCE edge 原文:RabbitMQ指南之二:工作隊列(Work Queues)   在上一章的指南中,我們寫了一個命名隊列:生產者往該命名隊列發送消息、消費從從該命名隊列中消費消息。在本章中,我們將創建一個工作隊

RabbitMQ系列教程工作隊列Work Queues

我們 one 排隊 設置 gem 異步 actor 獲得 targe 原文:RabbitMQ系列教程之二:工作隊列(Work Queues) 今天開始RabbitMQ教程的第二講,廢話不多說,直接進入話題。 (使用.NET 客戶端 進行事例演示)

Appium+Python 自動化測試啟動APPAndroid篇

android版本 inf 環境 哪些 aapt code div port ons 前一篇寫了環境安裝,這篇記錄是記錄如何啟動APP以及啟動APP之前要做哪些事。 Appium 啟動APP至少需要5個參數,分別是‘platformName‘,‘platformVersio

RabbitMQ指南發布/訂閱模式Publish/Subscribe

問題 除了 消息 模型 server fan 以及 color let   在上一章中,我們創建了一個工作隊列,工作隊列模式的設想是每一條消息只會被轉發給一個消費者。本章將會講解完全不一樣的場景: 我們會把一個消息轉發給多個消費者,這種模式稱之為發布-訂閱模式。   為了

RabbitMQ指南主題交換器Topic Exchange

vmw 有效 組成 oot alt basic env serve .com   在上一章中,我們完善了我們的日誌系統,用direct交換器替換了fanout交換器,使得我們可以有選擇性地接收消息。盡管如此,仍然還有限制:不能基於多個標準進行路由。在我們的日誌系統中,我們可

RabbitMQ指南路由Routing和直連交換機Direct Exchange

on() call basic play logs ued void emit 依賴 原文:RabbitMQ指南之四:路由(Routing)和直連交換機(Direct Exchange)  在上一章中,我們構建了一個簡單的日誌系統,我們可以把消息廣播給很多的消費者。在本章中

RabbitMQ指南釋出/訂閱模式Publish/Subscribe

在上一章中,我們建立了一個工作佇列,工作佇列模式的設想是每一條訊息只會被轉發給一個消費者。本章將會講解完全不一樣的場景: 我們會把

python並發編程多進程()互斥鎖同步鎖&進程其他屬性&進程間通信queue&生產者消費者模型

互斥 數據 socket pan copy src too 如果 搶票 一,互斥鎖,同步鎖 進程之間數據不共享,但是共享同一套文件系統,所以訪問同一個文件,或同一個打印終端,是沒有問題的, 競爭帶來的結果就是錯亂,如何控制,就是加鎖處理 part1:多個進程共享同

Linux驅動字元裝置工作原理未完

字元裝置驅動工作原理 系統整體工作原理 應用層->API->裝置驅動->硬體? API:open、read、write、close等? 驅動原始碼中提供真正的open、read、write、close等函式實體? file_

Spark程式設計指南向Spark運算元傳遞函式

文章目錄 向Spark運算元傳遞函式 Java的兩種方法 匿名內部類 建立類實現Function介面 Scala的兩種方法 傳遞匿名函式 定義全域性單例物件中的靜態方法

敏捷外包工程系列人員結構敏捷外包工程,敏捷開發,產品負責人,客戶價值

本文是敏捷外包工程系列的第二篇。(之一,之二,之三,之四)敏捷開發整體上適合小團隊、產品研發(所以才有product owner一稱)的環境,而外包軟體開發中常常存在的則相反,因此在建立團隊的時候要充分認識到這一點。下文提到“企業”時指軟體開發公司即乙方,而“客戶”指政府、銀

Docker下RabbitMQ四部曲細說RabbitMQ映象製作

本章是《Docker下RabbitMQ四部曲》系列的第二篇,將詳細簡述Docker下製作RabbitMQ映象的技術細節,包括以下內容: 1. 列舉製作RabbitMQ映象時用到的所有材料; 2. 編寫Dockerfile; 3. 編寫容器啟動時執行的指令碼

RabbitMQ系列教程釋出\/訂閱Publish\/Subscribe

在前一個教程中,我們建立了一個工作佇列。工作佇列背後的假設是每個任務會被交付給一個【工人】。在這一部分我們將做一些完全不同的事情--我們將向多個【消費者】傳遞資訊。這種模式被稱為“釋出/訂閱”。   為了說明這種模式,我們將構建一個簡單的日誌系統。它將包括兩個程式,第一個將發

docker容器自動化部署落地實踐搭建gitlab

在自動化部署工程的時候 version control這裡會起到一個很重要的作用 當我們完成一段程式碼需要部署的時候 那麼push到version control 讓他幫我們去告訴持續整合工具 說需要構建部署了 那麼持續整合工具才會去弄 本篇文章的gitlab版本非最新版本

框架源碼系列八Spring源碼學習Spring核心工作原理很重要

ted pos avi Edito 重要 explicit mon alt 構造函數 目錄:一、搞清楚ApplicationContext實例化Bean的過程二、搞清楚這個過程中涉及的核心類三、搞清楚IOC容器提供的擴展點有哪些,學會擴展四、學會IOC容器這裏使用的設計模式

日誌收集使用rsyslog v5版本進行日誌匯總

rsyslog日誌匯總rsyslog相關: 一般系統默認安裝的都是舊版本,如果不升級,使用v5版本的配置語法v5配置參照:https://www.rsyslog.com/doc/v5-stable/監聽端口:514(使用UDP協議,減少系統負載)自定義設備號使用約定:local0 ~ local7 loca

資料結構實現迴圈佇列C++版

資料結構實現(四):迴圈佇列(C++版) 1. 概念及基本框架 2. 基本操作程式實現 2.1 入隊操作 2.2 出隊操作 2.3 查詢操作 2.4 其他操作 3. 演算法複雜度分析 3.1 入

資料結構實現陣列佇列C++版

資料結構實現(三):陣列佇列(C++版) 1. 概念及基本框架 2. 基本操作程式實現 2.1 入隊操作 2.2 出隊操作 2.3 查詢操作 2.4 其他操作 3. 演算法複雜度分析 3.1 入

Boost.Interprocess使用手冊翻譯同步機制Synchronization mechanisms

六.          同步機制 同步機制概述 互斥量 條件變數 訊號量 升級互斥量 通過移動語義轉移鎖 檔案鎖 訊息佇列   同步