1. 程式人生 > >RabbitMQ入門:工作佇列(Work Queue)

RabbitMQ入門:工作佇列(Work Queue)

假設有這一些比較耗時的任務,按照上一次的那種方式,我們要一直等前面的耗時任務完成了之後才能接著處理後面耗時的任務,那要等多久才能處理完?別擔心,我們今天的主角--工作佇列就可以解決該問題。我們將圍繞下面這個索引展開:

  1. 什麼是工作佇列
  2. 程式碼準備
  3. 迴圈分發
  4. 訊息確認
  5. 公平分發
  6. 訊息持久化

廢話少說,直接展開。

一、什麼是工作佇列

工作佇列--用來將耗時的任務分發給多個消費者(工作者),主要解決這樣的問題:處理資源密集型任務,並且還要等他完成。有了工作佇列,我們就可以將具體的工作放到後面去做,將工作封裝為一個訊息,傳送到佇列中,一個工作程序就可以取出訊息並完成工作。如果啟動了多個工作程序,那麼工作就可以在多個程序間共享。

二、程式碼準備

  1. 生產者類:NewTask.java
    public class NewTask {
        //佇列名稱
        public static final String QUEUE_NAME = "TASK_QUEUE";
        //佇列是否需要持久化
        public static final boolean DURABLE = false;
        
        //需要傳送的訊息列表
        public static final String[] msgs = {"task 1", "task 2", "task 3", "task 4", "task 5", "task 6"};
        
        
    public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = null; Channel channel = null; try { // 1.connection & channel connection = factory.newConnection(); channel
    = connection.createChannel(); // 2.queue channel.queueDeclare(QUEUE_NAME, DURABLE, false, false, null); // 3.publish msg for (int i = 0; i < msgs.length; i++) { channel.basicPublish("", QUEUE_NAME, null, msgs[i].getBytes()); System.out.println("** new task ****:" + msgs[i]); } } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { if (channel != null) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
  2. 消費者類:Work.java
    public class Work {
    
        public static void main(String[] args) {
            System.out.println("*** Work ***");
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
    
            try {
                //1.connection & channel
                final Channel channel = factory.newConnection().createChannel();
                
                //2.queue
                channel.queueDeclare(NewTask.QUEUE_NAME, NewTask.DURABLE, false, false, null);
    
                //3. consumer instance
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                            byte[] body) throws IOException {
                        String msg = new String(body, "UTF-8");
                        //deal task
                        doWork(msg);
    
                    }
                };
                
                //4.do consumer
                boolean autoAck = true;
                channel.basicConsume(NewTask.QUEUE_NAME, autoAck, consumer);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    
        private static void doWork(String msg) {
            try {
                System.out.println("**** deal task begin :" + msg);
                
                //假裝task比較耗時,通過sleep()來模擬需要消耗的時間
                if ("sleep".equals(msg)) {
                    Thread.sleep(1000 * 60);
                } else {
                    Thread.sleep(1000);
                }
    
                System.out.println("**** deal task finish :" + msg);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
    }
  3. 再來一個消費者類:Work2.java,程式碼同Work.java一模一樣。

三、迴圈分發

我們先啟動Work和Work2,然後啟動NewTask,執行結果如下:

NewTask執行結果:

Work執行結果:

Work2執行結果:

 

我們發現,訊息生產者傳送了6條訊息,消費者work和work2分別分到了3個訊息,而且是迴圈輪流分發到的,這種分發的方式就是迴圈分發。

四、訊息確認

假如我們在傳送的訊息裡面新增“sleep"

//需要傳送的訊息列表
    public static final String[] msgs = {"sleep", "task 1", "task 2", "task 3", "task 4", "task 5", "task 6"};

根據程式碼中的實現,這個sleep要耗時1分鐘,萬一在這1分鐘之內,工作程序崩潰了或者被kill了,會發生什麼情況呢?根據上面的程式碼:

//4.do consumer
            boolean autoAck = true;
            channel.basicConsume(NewTask.QUEUE_NAME, autoAck, consumer);

自動確認為true,每次RabbitMQ向消費者傳送訊息之後,會自動發確認訊息(我工作你放心,不會有問題),這個時候訊息會立即從記憶體中刪除。如果工作者掛了,那將會丟失它正在處理和未處理的所有工作,而且這些工作還不能再交由其他工作者處理,這種丟失屬於客戶端丟失。

我們來驗證下,和剛才的步驟一樣執行程式:

1.NewTask的控制檯列印結果:
** new task ****:sleep
** new task ****:task 1
** new task ****:task 2
** new task ****:task 3
** new task ****:task 4
** new task ****:task 5
** new task ****:task 6

2.Work的控制檯列印結果:
**** deal task begin :sleep

3.Work2的控制檯列印結果:
**** deal task begin :task 1
**** deal task finish :task 1
**** deal task begin :task 3
**** deal task finish :task 3
**** deal task begin :task 5
**** deal task finish :task 5

根據上面的內容,訊息生產者傳送了7條訊息, work2消費了1、3、5 三條,那剩下的sleep、2、4、6 這四條訊息肯定是work來處理,只是sleep耗時一分鐘 ,時間差後面的還沒來得及處理,這個時候我們kill掉work,去看下RabbitMQ 管理頁面,沒有未處理的訊息,訊息隨著work被kill也跟著丟失了。

是不是很可怕?

為了應對這種情況,RabbitMQ支援訊息確認。消費者處理完訊息之後,會發送一個確認訊息告訴RabbitMQ,訊息處理完了,你可以刪掉它了。

程式碼修改(Work.java和Work2.java同步修改):1.將自動確認改為false,2.訊息處理之後再通過channel.basicAck進行訊息確認

 修改完後,執行程式:

1.NewTask的控制檯列印結果:
** new task ****:sleep
** new task ****:task 1
** new task ****:task 2
** new task ****:task 3
** new task ****:task 4
** new task ****:task 5
** new task ****:task 6

2.Work的控制檯列印結果:
**** deal task begin :sleep

3.Work2的控制檯列印結果:
**** deal task begin :task 1
**** deal task finish :task 1
**** deal task begin :task 3
**** deal task finish :task 3
**** deal task begin :task 5
**** deal task finish :task 5

然後kill掉work,去看RabbitMQ管理頁面,會發現有4條未確認:

再去看下work2的控制檯,work2將work未處理完和未來得及處理的訊息都給處理了:

等work2處理完後,你再去看RabbitMQ管理頁面,會發現頁面的訊息數值也都變成0 了。

五、公平分發

按照上面那種迴圈分發的方式,每個消費者會分到相同數量的任務,這樣會有一個問題:假如有一些task非常耗時,之前的任務還沒有完成,後面又來了那麼多工,來不及處理,那咋辦? 有的消費者忙的不可開交,有的消費者卻很快處理完事情然後無所事事浪費資源,那咋整?答案就是:公平分發。 怎麼實現呢?

 發生上述問題的原因就是RabbitMQ收到訊息後就立即分發出去,而沒有確認各個工作者未返回確認的訊息數量。因此我們可以使用basicQos方法,並將引數prefetchCount設為1,告訴RabbitMQ 我每次值處理一條訊息,你要等我處理完了再分給我下一個。這樣RabbitMQ就不會輪流分發了,而是尋找空閒的工作者進行分發。

程式碼修改(work和Work2同步修改):

執行程式碼:

1.NewTask的控制檯列印結果:
** new task ****:sleep
** new task ****:task 1
** new task ****:task 2
** new task ****:task 3
** new task ****:task 4
** new task ****:task 5
** new task ****:task 6

2.Work的控制檯列印結果:
**** deal task begin :sleep
**** deal task finish :sleep

3.Work2的控制檯列印結果:
**** deal task begin :task 1
**** deal task finish :task 1
**** deal task begin :task 2
**** deal task finish :task 2
**** deal task begin :task 3
**** deal task finish :task 3
**** deal task begin :task 4
**** deal task finish :task 4
**** deal task begin :task 5
**** deal task finish :task 5
**** deal task begin :task 6
**** deal task finish :task 6

Work只處理了sleep,Work2處理了1、2、3、4、5、6 這個六條訊息。

六、訊息持久化

上面說到訊息確認的時候,提到了工作者被kill的情況。那如果RabbitMQ被stop掉了呢?我們來看下:

這次只啟動Work和NewTask,不啟動Work2,所有訊息都交給Work來處理,控制檯列印資訊:

1.NewTask的控制檯列印結果:
** new task ****:sleep
** new task ****:task 1
** new task ****:task 2
** new task ****:task 3
** new task ****:task 4
** new task ****:task 5
** new task ****:task 6

2.Work的控制檯列印結果:
**** deal task begin :sleep

在work處理sleep的過程中,我們停掉RabbitMQ服務

然後重新start服務並執行rabbitmq-plugins enable rabbitmq_management命令,然後檢視管理頁面:

你會發現,所有訊息都將被清空了。這種丟失屬於服務端丟失

因此需要將訊息進行持久化來應對這種情況。

持久化需要做兩件事情:

  1. 佇列持久化,在宣告佇列的時候,將第二個引數設為true

          

     另外,由於RabbitMQ不允許重新定義已經存在的佇列,否則就會報錯(上一篇部落格中已經提到過了),因此我們將這次的佇列名改下:

  2. 訊息持久化,在傳送訊息的時候,將第三個引數設為2

然後執行程式碼,在work處理sleep的時候將服務停掉,並重新啟動且執行rabbitmq-plugins enable rabbitmq_management命令,然後檢視管理頁面:

 

一共7條訊息,未確認的1條(sleep)和ready的6條(1、2、3、4、5、6)。訊息被儲存了下來。

 重新啟動Work,所有訊息被消費:

相關推薦

RabbitMQ入門工作佇列(Work Queue)

假設有這一些比較耗時的任務,按照上一次的那種方式,我們要一直等前面的耗時任務完成了之後才能接著處理後面耗時的任務,那要等多久才能處理完?別擔心,我們今天的主角--工作佇列就可以解決該問題。我們將圍繞下面這個索引展開: 什麼是工作佇列 程式碼準備 迴圈分發 訊息確認 公平分發 訊息持久化

Linux中的工作佇列(work queue)

工作佇列(work queue)是Linux kernel中將工作推後執行的一種機制。這種機制和BH(bottom half)或Tasklets不同之處在於工作佇列是把推後的工作交由一個核心執行緒去執行,因此工作佇列的優勢就在於它允許重新排程甚至睡眠。 linux 2.6.

中斷下半部_工作佇列(work queue)

1>work_queue:<linux/workqueue.h> __3.0.4 2>description: 中斷下半部,在核心執行緒的程序上下文中執行推後的工作. 它是唯一能在程序上下文執行的中斷下半部實現機制,也只有它才可以睡眠. 3>建

RabbitMQ指南之二工作佇列Work Queues)

在上一章的指南中,我們寫了一個命名佇列:生產者往該命名佇列傳送訊息、消費從從該命名佇列中消費訊息。在本章中,我們將建立一個工作佇列

RabbitMQ3.7.2入門到進階之工作佇列Work queues公平分發fair dispatch

大綱 1.訊息中介軟體概述,使用場景(日誌處理,非同步,系統解耦,流量削鋒) 2.Rabbitmq3.7.2安裝,控制檯簡介,管理員新增 3.使用者vhost配置以及介紹 4.java操作簡單佇列,生產者傳送訊息到佇列,消費者接收訊息 5.

rabbitmq分散式工作佇列work queues

開發十年,就只剩下這套架構體系了! >>>   

RabbitMQ入門主題路由器(Topic Exchange)

AI orange topic 都是 erro col host nfa 匹配 上一篇博文中,我們使用direct exchange 代替了fanout exchange,這次我們來看下topic exchange。 一、Topic Exchange介紹 topic e

工作佇列work queues 公平分發(fair dispatch) And 訊息應答與訊息持久化

生產者 1 package cn.wh.work; 2 3 import cn.wh.util.RabbitMqConnectionUtil; 4 import com.rabbitmq.client.Channel; 5 import com.rabbitmq.client.Co

Python 訊息佇列rabbitmq使用之工作佇列使用多個worker接收訊息

前面已經介紹過怎麼安裝rabbitmq以及要使用的三方庫 因此這裡直接進入例項 1、釋出端程式碼 # new_task.py import pika # 匯入pika import sys

RabbitMQ入門與訊息佇列模式詳解

1.RabbitMQ概述 簡介  MQ全稱為Message Queue,訊息佇列是應用程式和應用程式之間的通訊方法; RabbitMQ是開源的,實現了AMQP協議的,採用Erlang(面向併發程式語言)編寫的,可複用的企業級訊息系統; AMQP(高階訊息佇列協議)是一個非

RabbitMQ】3.工作佇列及釋出訂閱

一、工作佇列 (一個任務只發給一個消費者,根據設定,若消費者異常,才可轉發給另一個消費者) 當有的消費者(Consumer)需要大量的運算時,RabbitMQ Server需要一定的分發機制來balance(平衡)每個Consumer(生產者)的load,即負載均衡。通過建立一個工作佇列用來

RabbitMQ入門釋出/訂閱(Publish/Subscribe)

在前面的兩篇部落格中 遇到的例項都是一個訊息只發送給一個消費者(工作者),他們的訊息模型分別為(P代表生產者,C代表消費者,紅色代表隊列): 這次我們來看下將一個訊息傳送給多個消費者(工作者),這種模式一般被稱為“釋出/訂閱”模式。其工作模型為(P代表生產者,X代表Exchange(路由器/交

RabbitMQ入門Hello RabbitMQ 程式碼例項

在之前的一篇部落格RabbitMQ入門:認識並安裝RabbitMQ(以Windows系統為例)中,我們安裝了RabbitMQ並且對其也有的初步的認識,今天就來寫個入門小例子來加深概念理解並瞭解程式碼怎麼實現。 本篇部落格圍繞下面幾個方面展開: 程式碼前的理論熱身 程式碼例項:Hello Rabbit

RabbitMQ入門路由(Routing)

在上一篇部落格《RabbitMQ入門:釋出/訂閱(Publish/Subscribe)》中,我們認識了fanout型別的exchange,它是一種通過廣播方式傳送訊息的路由器,所有和exchange建立的繫結關係的佇列都會接收到訊息。但是有一些場景只需要訂閱到一部分訊息,這個時候就不能使用fanout 型別的

RabbitMQ入門認識並安裝RabbitMQ(以Windows系統為例)

專案需求剛剛遞交,新需求還沒來。閒下來了,寫寫部落格放鬆下。 ===========華麗的分割線================= 最近在學習Spring Cloud,其中訊息匯流排Spring Cloud Bus是必不可少的,但是Spring Cloud Bus目前只支援RabbitMQ和kafka,因

RabbitMQ入門總結

隨著上一篇博文的釋出,RabbitMQ的基礎內容我也學習完了,RabbitMQ入門系列的部落格跟著收官了,以後有機會的話再寫一些在實戰中的應用分享,多謝大家一直以來的支援和認可。 RabbitMQ入門系列一共有8篇隨筆: (adsbygoogle = window.ad

RabbitMQ入門在Spring Boot 應用中整合RabbitMQ

在上一篇隨筆中我們認識並安裝了RabbitMQ,接下來我們來看下怎麼在Spring Boot 應用中整合RabbitMQ。 先給出最終目錄結構: 搭建步驟如下: 新建maven工程amqp 修改pom檔案,引入spring-boot-starter-amqp和spring-boot-sta

RabbitMQ入門遠端過程呼叫(RPC)

假如我們想要呼叫遠端的一個方法或函式並等待執行結果,也就是我們通常說的遠端過程呼叫(Remote Procedure Call)。怎麼辦? 今天我們就用RabbitMQ來實現一個簡單的RPC系統:客戶端傳送一個請求訊息,服務端以一個響應訊息迴應。為了能夠接收到響應,客戶端在傳送訊息的同時傳送一個回撥佇列用來

棧和佇列2.佇列Queue)及其C語言實現

佇列是線性表的一種,在操作資料元素時,和棧一樣,有自己的規則:使用佇列存取資料元素時,資料元素只能從表的一端進入佇列,另一端出佇列,如圖1。 圖1 佇列示意圖 稱進入佇列的一端為“隊尾”;出佇列的一端為“隊頭”。資料元素全部由隊尾陸續進佇列,由隊頭陸續出佇列。 佇

linux work queue及併發可管理工作佇列

慢工作機制 為什麼說是“提供過核心中還曾短暫出現過慢工作機制 (slow work mechanism)”,原因是在 mainline核心中,曾經出現過慢工作機制 (slow work mechanism),但隨著併發管理工作佇列 (cmwq) 的出現,它已經全部被