1. 程式人生 > >RabbitMQ:入門(五)——程式碼編寫

RabbitMQ:入門(五)——程式碼編寫

一:入門

1.RabbitMQ介紹

1.1、RabbitMQ簡介

RabbitMQ是一個訊息代理:它接受和轉發訊息。你可以把它想象成一個郵局:當你把你想要釋出的郵件放在郵箱中時,你可以確定郵差先生最終將郵件傳送給你的收件人。在這個比喻中,RabbitMQ是郵政信箱,郵局和郵遞員。 
RabbitMQ和郵局的主要區別在於它不處理紙張,而是接受,儲存和轉發二進位制資料塊 - 訊息。引自(https://www.rabbitmq.com/tutorials/tutorial-one-java.html)官網介紹。 
儘管訊息流經RabbitMQ,但它們只能儲存在佇列中。一個佇列只受主機記憶體和磁碟限制的約束,它本質上是一個很大的訊息緩衝區。許多生產者可以傳送進入一個佇列的訊息,並且許多消費者可以嘗試從一個佇列接收資料。實質上是生產者——消費者關係。

1.2、什麼叫訊息佇列

訊息(Message)是指在應用間傳送的資料。訊息可以非常簡單,比如只包含文字字串,也可以更復雜,可能包含嵌入物件。 
訊息佇列(Message Queue)是一種應用間的通訊方式,訊息傳送後可以立即返回,由訊息系統來確保訊息的可靠傳遞。訊息釋出者只管把訊息釋出到 MQ 中而不用管誰來取,訊息使用者只管從 MQ 中取訊息而不管是誰釋出的。這樣釋出者和使用者都不用知道對方的存在。 
(注:該段引用來源:https://www.jianshu.com/p/79ca08116d57

1.3、為何用訊息佇列

從上面的描述中可以看出訊息佇列是一種應用間的非同步協作機制,那什麼時候需要使用 MQ 呢? 
以常見的訂單系統為例,使用者點選【下單】按鈕之後的業務邏輯可能包括:扣減庫存、生成相應單據、發紅包、發簡訊通知。在業務發展初期這些邏輯可能放在一起同步執行,隨著業務的發展訂單量增長,需要提升系統服務的效能,這時可以將一些不需要立即生效的操作拆分出來非同步執行,比如發放紅包、發簡訊通知等。這種場景下就可以用 MQ ,在下單的主流程(比如扣減庫存、生成相應單據)完成之後傳送一條訊息到 MQ 讓主流程快速完結,而由另外的單獨執行緒拉取MQ的訊息(或者由 MQ 推送訊息),當發現 MQ 中有發紅包或發簡訊之類的訊息時,執行相應的業務邏輯。 
以上是用於業務解耦的情況,其它常見場景包括最終一致性、廣播、錯峰流控等等。

1.4、RabbitMQ 特點

RabbitMQ 是一個由 Erlang 語言開發的 AMQP 的開源實現。

AMQP :Advanced Message Queue,高階訊息佇列協議。它是應用層協議的一個開放標準,為面向訊息的中介軟體設計,基於此協議的客戶端與訊息中介軟體可傳遞訊息,並不受產品、開發語言等條件的限制。

RabbitMQ 最初起源於金融系統,用於在分散式系統中儲存轉發訊息,在易用性、擴充套件性、高可用性等方面表現不俗。具體特點包括:

(1)可靠性(Reliability) 
RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸確認、釋出確認。

(2)靈活的路由(Flexible Routing) 
在訊息進入佇列之前,通過 Exchange 來路由訊息的。對於典型的路由功能,RabbitMQ 已經提供了一些內建的 Exchange 來實現。針對更復雜的路由功能,可以將多個 Exchange 繫結在一起,也通過外掛機制實現自己的 Exchange 。

(3)訊息叢集(Clustering) 
多個 RabbitMQ 伺服器可以組成一個叢集,形成一個邏輯 Broker 。

(4)高可用(Highly Available Queues) 
佇列可以在叢集中的機器上進行映象,使得在部分節點出問題的情況下佇列仍然可用。

(5)多種協議(Multi-protocol) 
RabbitMQ 支援多種訊息佇列協議,比如 STOMP、MQTT 等等。

(6)多語言客戶端(Many Clients) 
RabbitMQ 幾乎支援所有常用語言,比如 Java、.NET、Ruby 等等。

(7)管理介面(Management UI) 
RabbitMQ 提供了一個易用的使用者介面,使得使用者可以監控和管理訊息 Broker 的許多方面。

(8)跟蹤機制(Tracing) 
如果訊息異常,RabbitMQ 提供了訊息跟蹤機制,使用者可以找出發生了什麼。

(9)外掛機制(Plugin System) 
RabbitMQ 提供了許多外掛,來從多方面進行擴充套件,也可以編寫自己的外掛。

(注:該段引用來源:https://www.jianshu.com/p/79ca08116d57

1.5、RabbitMQ 中的概念模型——訊息模型

所有 MQ 產品從模型抽象上來說都是一樣的過程: 
消費者(consumer)訂閱某個佇列。生產者(producer)建立訊息,然後釋出到佇列(queue)中,最後將訊息傳送到監聽的消費者。 
這裡寫圖片描述

1.6、RabbitMQ 基本概念

上面只是最簡單抽象的描述,具體到 RabbitMQ 則有更詳細的概念需要解釋。上面介紹過 RabbitMQ 是 AMQP 協議的一個開源實現,所以其內部實際上也是 AMQP 中的基本概念: 
這裡寫圖片描述

(1)Message 
訊息,訊息是不具名的,它由訊息頭和訊息體組成。訊息體是不透明的,而訊息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對於其他訊息的優先權)、delivery-mode(指出該訊息可能需要永續性儲存)等。 
(2)Publisher 
訊息的生產者,也是一個向交換器釋出訊息的客戶端應用程式。 
(3)Exchange 
交換器,用來接收生產者傳送的訊息並將這些訊息路由給伺服器中的佇列。 
(4)Binding 
繫結,用於訊息佇列和交換器之間的關聯。一個繫結就是基於路由鍵將交換器和訊息佇列連線起來的路由規則,所以可以將交換器理解成一個由繫結構成的路由表。 
(5)Queue 
訊息佇列,用來儲存訊息直到傳送給消費者。它是訊息的容器,也是訊息的終點。一個訊息可投入一個或多個佇列。訊息一直在佇列裡面,等待消費者連線到這個佇列將其取走。 
(6)Connection 
網路連線,比如一個TCP連線。 
(7)Channel 
通道,多路複用連線中的一條獨立的雙向資料流通道。通道是建立在真實的TCP連線內地虛擬連線,AMQP 命令都是通過通道發出去的,不管是釋出訊息、訂閱佇列還是接收訊息,這些動作都是通過通道完成。因為對於作業系統來說建立和銷燬 TCP 都是非常昂貴的開銷,所以引入了通道的概念,以複用一條 TCP 連線。 
(8)Consumer 
訊息的消費者,表示一個從訊息佇列中取得訊息的客戶端應用程式。 
(9)Virtual Host 
虛擬主機,表示一批交換器、訊息佇列和相關物件。虛擬主機是共享相同的身份認證和加密環境的獨立伺服器域。每個 vhost 本質上就是一個 mini 版的 RabbitMQ 伺服器,擁有自己的佇列、交換器、繫結和許可權機制。vhost 是 AMQP 概念的基礎,必須在連線時指定,RabbitMQ 預設的 vhost 是 / 。 
(10)Broker 
表示訊息佇列伺服器實體。

(注:該段引用來源:https://www.jianshu.com/p/79ca08116d57


2.安裝Erlang

首先,您需要安裝支援的 Windows 版Erlang。下載並執行Erlang for Windows 安裝程式。下載地址http://www.erlang.org/downloads,我是64位的所以下載的64位版本。官網下載速度很慢,可以通過我雲盤下載:https://pan.baidu.com/s/1eTkk5BO 密碼:wo1b,下載完成後直接安裝,一直NEXT就行。

這裡寫圖片描述

3.安裝RabbitMQ

執行RabbitMQ安裝程式rabbitmq-server-3.7.3.exe(下載地址 http://www.rabbitmq.com/install-windows.html )注意版本,當前最新版本為3.7.3。它將RabbitMQ安裝為Windows服務並使用預設配置啟動它。同樣,一直NEXT就行。 
這裡寫圖片描述

4.配置

4.1、自定義huanjingbian環境變數

該服務將使用其預設設定正常執行。你可以自定義RabbitMQ環境或編輯配置。 
(1)erl環境變數配置

ERLANG_HOME=C:\Program Files\erl9.2
  • 1

這裡寫圖片描述
在Path中加入

 %ERLANG_HOME%\bin;

這裡寫圖片描述
測試erl配置是否正確,開始-執行-cmd,輸入erl,顯示如下,證明配置正確 
這裡寫圖片描述 
(2)RabbitMQ環境變數配置 
這裡注意,看好你RabbitMQ的安裝位置,以及安裝的版本,我的版本為3.7.3

RABBITMQ_SERVER=C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.3

這裡寫圖片描述 
在Path中加入

%RABBITMQ_SERVER%\sbin;

這裡寫圖片描述


4.2、啟用 RabbitMQ’s Management Plugin

在CMD中鍵入如下命令

"C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.3\sbin\rabbitmq-plugins.bat" enable rabbitmq_management

顯示如下: 
這裡寫圖片描述

4.3、啟動RabbitMQ服務

直接在命令列介面鍵入如下命令

net start RabbitMQ

這裡寫圖片描述 
因為RabbitMQ預設啟動的,當鍵入啟動命令時,會出現如下情況,直接關閉RabbitMQ服務,在啟動就行, 
這裡寫圖片描述 
關閉RabbitMQ服務命令如下:

net stop RabbitMQ

這裡寫圖片描述

賬號密碼:guest

4.4、測試

測試地址 http://localhost:15672/ 
預設的使用者名稱:guest 
預設的密碼為:guest

這裡寫圖片描述



5.下載maven

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.springboot.rabbitmq</groupId>
  <artifactId>springboot-rabbitmq</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>springboot-rabbitmq</name>
  <description>springboot-rabbitmq</description>
  
  <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.4.1.RELEASE</version>
  </parent>
  <dependencies>
     <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
     </dependency>
     <dependency>  
            <groupId>org.springframework.boot</groupId>  
            <artifactId>spring-boot-starter-test</artifactId>  
            <scope>test</scope>  
     </dependency>  
     <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
  </dependencies>
</project>

6.建立傳送者(訊息生產者)

操作步驟:
  1. 建立連線工廠ConnectionFactory
  2. 獲取連線Connection
  3. 通過連接獲取通訊通道Channel
  4. 傳送訊息
public class Send {

    //佇列名稱
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws java.io.IOException, TimeoutException {
        /**
         * 建立連線連線到MabbitMQ
         */
        ConnectionFactory factory = new ConnectionFactory();
        //設定MabbitMQ所在主機ip或者主機名
        factory.setHost("localhost");
        //建立一個連線
        Connection connection = factory.newConnection();
        //建立一個頻道
        Channel channel = connection.createChannel();
        //指定一個佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //傳送的訊息
        String message = "hello world!";
        //往佇列中發出一條訊息
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        //關閉頻道和連線
        channel.close();
        connection.close();
    }
}

列印

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
 [x] Sent 'hello world!'

Process finished with exit code 0

7.建立接受者(訊息消費者)

操作步驟:
  1. 建立連線工廠ConnectionFactory
  2. 獲取連線Connection
  3. 通過連接獲取通訊通道Channel
  4. 宣告交換機Exchange:交換機型別分為四類

        FanoutExchange: 將訊息分發到所有的繫結佇列,無routingkey的概念

            HeadersExchange :通過新增屬性key-value匹配

            DirectExchange:按照routingkey分發到指定佇列

            TopicExchange:多關鍵字匹配

  5. 宣告佇列Queue

  6. 將佇列和交換機繫結

  7. 建立消費者

  8. 執行訊息的消費

public class Rec {
    //佇列名稱
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws java.io.IOException,
            java.lang.InterruptedException, TimeoutException {
        //開啟連線和建立頻道,與傳送端一樣
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //宣告佇列,主要為了防止訊息接收者先執行此程式,佇列還不存在時建立佇列。
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        //建立佇列消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //指定消費佇列
        channel.basicConsume(QUEUE_NAME, true, consumer);
        while (true)
        {
            //nextDelivery是一個阻塞方法(內部實現其實是阻塞佇列的take方法)
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
        }

    }
}

列印

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'hello world!'

二:工作佇列

1.傳送訊息

public class NewTask
{
    //佇列名稱
    private final static String QUEUE_NAME = "workqueue";

    public static void main(String[] args) throws IOException, TimeoutException {

        //建立連工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設定ip
        factory.setHost("localhost");
        //建立連線
        Connection connection = factory.newConnection();
        //建立佇列
        Channel channel = connection.createChannel();
        //宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //傳送10條訊息,依次在訊息後面附加1-10個點
        for (int i = 0; i < 10; i++)
        {
            String dots = "";
            for (int j = 0; j <= i; j++)
            {
                dots += ".";
            }
            //拼資料
            String message = "helloworld" + dots+dots.length();
            //推送到rabbitmq中
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            //推送完成,列印結束語句
            System.out.println(" [x] Sent '" + message + "'");
        }
        //關閉佇列
        channel.close();
        //關閉訊息
        connection.close();

    }


}
 [x] Sent 'helloworld.1'
 [x] Sent 'helloworld..2'
 [x] Sent 'helloworld...3'
 [x] Sent 'helloworld....4'
 [x] Sent 'helloworld.....5'
 [x] Sent 'helloworld......6'
 [x] Sent 'helloworld.......7'
 [x] Sent 'helloworld........8'
 [x] Sent 'helloworld.........9'
 [x] Sent 'helloworld..........10'

2.接收訊息

執行兩個Work類

public class Work
{
    //佇列名稱
    private final static String QUEUE_NAME = "workqueue";

    public static void main(String[] argv) throws java.io.IOException,
            java.lang.InterruptedException, TimeoutException {

        //區分不同工作程序的輸出
        int hashCode = Work.class.hashCode();

        //建立連線工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設定ip
        factory.setHost("localhost");
        //建立連線
        Connection connection = factory.newConnection();
        //建立佇列
        Channel channel = connection.createChannel();
        //宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(hashCode
                + " [*] Waiting for messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
       // 指定消費佇列
        //關閉應答機制,會丟失訊息
        channel.basicConsume(QUEUE_NAME, true, consumer);
        //開啟應答機制,不會丟失訊息
        channel.basicConsume(QUEUE_NAME, false, consumer);
        while (true)
        {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());

            System.out.println(hashCode + " [x] Received '" + message + "'");
//            doWork(message);
            System.out.println(hashCode + " [x] Done");

        }

    }

    /**
     * 每個點耗時1s
     * @param task
     * @throws InterruptedException
     */
    private static void doWork(String task) throws InterruptedException
    {
        for (char ch : task.toCharArray())
        {
            if (ch == '.')
                Thread.sleep(1000);
        }
    }
}
746292446 [x] Received 'helloworld.1'
746292446 [x] Done
746292446 [x] Received 'helloworld...3'
746292446 [x] Done
746292446 [x] Received 'helloworld.....5'
746292446 [x] Done
746292446 [x] Received 'helloworld.......7'
746292446 [x] Done
746292446 [x] Received 'helloworld.........9'
746292446 [x] Done
242131142 [x] Received 'helloworld..2'
242131142 [x] Done
242131142 [x] Received 'helloworld....4'
242131142 [x] Done
242131142 [x] Received 'helloworld......6'
242131142 [x] Done
242131142 [x] Received 'helloworld........8'
242131142 [x] Done
242131142 [x] Received 'helloworld..........10'
242131142 [x] Done

可以看到,預設的,RabbitMQ會一個一個的傳送資訊給下一個消費者(consumer),而不考慮每個任務的時長等等,且是一次性分配,並非一個一個分配。平均的每個消費者將會獲得相等數量的訊息。這樣分發訊息的方式叫做round-robin。

3.訊息應答(message acknowledgments)

我們首先開啟兩個任務,然後執行傳送任務的程式碼(NewTask.java),然後立即關閉第二個任務,兩個加起來打印出來的資料會有缺失

一旦RabbItMQ交付了一個資訊給消費者,會馬上從記憶體中移除這個資訊。在這種情況下,如果殺死正在執行任務的某個工作者,我們會丟失它正在處理的資訊。我們也會丟失已經轉發給這個工作者且它還未執行的訊息。

為了保證訊息永遠不會丟失,RabbitMQ支援訊息應答(message acknowledgments)。

  • 消費者傳送應答給RabbitMQ,告訴它資訊已經被接收和處理,然後RabbitMQ可以自由的進行資訊刪除。
  • 如果消費者被殺死而沒有傳送應答,RabbitMQ會認為該資訊沒有被完全的處理,然後將會重新轉發給別的消費者。通過這種方式,你可以確認資訊不會被丟失,即使消者偶爾被殺死。
  • 這種機制並沒有超時時間這麼一說,RabbitMQ只有在消費者連線斷開是重新轉發此資訊。如果消費者處理一個資訊需要耗費特別特別長的時間是允許的。

訊息應答預設是開啟的。上面的程式碼中我們通過顯示的設定autoAsk=true關閉了這種機制。

boolean ack = false ; //開啟應答機制  
channel.basicConsume(QUEUE_NAME, ack, consumer);  
//另外需要在每次處理完成一個訊息後,手動傳送一次應答。  
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  
public class Work  
{  
    //佇列名稱  
    private final static String QUEUE_NAME = "workqueue";  

    public static void main(String[] argv) throws java.io.IOException,  
            java.lang.InterruptedException  
    {  
        //區分不同工作程序的輸出  
        int hashCode = Work.class.hashCode();  
        //建立連線和頻道  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("localhost");  
        Connection connection = factory.newConnection();  
        Channel channel = connection.createChannel();  
        //宣告佇列  
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);  
        System.out.println(hashCode  
                + " [*] Waiting for messages. To exit press CTRL+C");  
        QueueingConsumer consumer = new QueueingConsumer(channel);  
        // 指定消費佇列  
        boolean ack = false ; //開啟應答機制  
        channel.basicConsume(QUEUE_NAME, ack, consumer);  
        while (true)  
        {  
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
            String message = new String(delivery.getBody());  

            System.out.println(hashCode + " [x] Received '" + message + "'");  
            doWork(message);  
            System.out.println(hashCode + " [x] Done");  
            //傳送應答  
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  

        }  

    }  
} 

4.訊息持久化(Message durability)

我們已經學習了即使消費者被殺死,訊息也不會被丟失。但是如果此時RabbitMQ服務被停止,我們的訊息仍然會丟失

當RabbitMQ退出或者異常退出,將會丟失所有的佇列和資訊,除非你告訴它不要丟失。

我們需要做兩件事來確保資訊不會被丟失:我們需要給所有的佇列訊息設定持久化的標誌。

  • 第一, 我們需要確認RabbitMQ永遠不會丟失我們的佇列。為了這樣,我們需要宣告它為持久化的。
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

注:RabbitMQ不允許使用不同的引數重新定義一個佇列,所以已經存在的佇列,我們無法修改其屬性。

  • 第二, 我們需要標識我們的資訊為持久化的。通過設定MessageProperties(implements BasicProperties)值為PERSISTENT_TEXT_PLAIN。
channel.basicPublish("", "task_queue",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

現在你可以執行一個傳送訊息的程式,然後關閉服務,再重新啟動服務,執行消費者程式做下實驗。

5.公平轉發(Fair dispatch)

對於兩個消費者,有一系列的任務,奇數任務特別耗時,而偶數任務卻很輕鬆,這樣造成一個消費者一直繁忙,另一個消費者卻很快執行完任務後等待。 
造成這樣的原因是因為RabbitMQ僅僅是當訊息到達佇列進行轉發訊息。並不在乎有多少任務消費者並未傳遞一個應答給RabbitMQ。僅僅盲目轉發所有的奇數給一個消費者,偶數給另一個消費者。

int prefetchCount = 1;  
channel.basicQos(prefetchCount);  
public class NewTask  
{  
    // 佇列名稱  
    private final static String QUEUE_NAME = "workqueue_persistence";  

    public static void main(String[] args) throws IOException  
    {  
        // 建立連線和頻道  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("localhost");  
        Connection connection = factory.newConnection();  
        Channel channel = connection.createChannel();  
        // 宣告佇列  
        boolean durable = true;// 1、設定佇列持久化  
        channel.queueDeclare(QUEUE_NAME, durable, false, false, null);  
        // 傳送10條訊息,依次在訊息後面附加1-10個點  
        for (int i = 5; i > 0; i--)  
        {  
            String dots = "";  
            for (int j = 0; j <= i; j++)  
            {  
                dots += ".";  
            }  
            String message = "helloworld" + dots + dots.length();  
            // MessageProperties 2、設定訊息持久化  
            channel.basicPublish("", QUEUE_NAME,  
                    MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());  
            System.out.println(" [x] Sent '" + message + "'");  
        }  
        // 關閉頻道和資源  
        channel.close();  
        connection.close();  

    }  

} 
public class Work  
{  
    // 佇列名稱  
    private final static String QUEUE_NAME = "workqueue_persistence";  

    public static void main(String[] argv) throws java.io.IOException,  
            java.lang.InterruptedException  
    {  
        // 區分不同工作程序的輸出  
        int hashCode = Work.class.hashCode();  
        // 建立連線和頻道  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("localhost");  
        Connection connection = factory.newConnection();  
        Channel channel = connection.createChannel();  
        // 宣告佇列  
        boolean durable = true;  
        channel.queueDeclare(QUEUE_NAME, durable, false, false, null);  
        System.out.println(hashCode  
                + " [*] Waiting for messages. To exit press CTRL+C");  
        //設定最大服務轉發訊息數量  
        int prefetchCount = 1;  
        channel.basicQos(prefetchCount);  
        QueueingConsumer consumer = new QueueingConsumer(channel);  
        // 指定消費佇列  
        boolean ack = false; // 開啟應答機制  
        channel.basicConsume(QUEUE_NAME, ack, consumer);  
        while (true)  
        {  
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
            String message = new String(delivery.getBody());  

            System.out.println(hashCode + " [x] Received '" + message + "'");  
            doWork(message);  
            System.out.println(hashCode + " [x] Done");  
            //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  

        }  

    }  

    /** 
     * 每個點耗時1s 
     *  
     * @param task 
     * @throws InterruptedException 
     */  
    private static void doWork(String task) throws InterruptedException  
    {  
        for (char ch : task.toCharArray())  
        {  
            if (ch == '.')  
                Thread.sleep(1000);  
        }  
    }  
}  

三:釋出/訂閱

工作佇列中的一個任務只會發給一個工作者

就是把一個訊息發給多個消費者,這種模式稱之為釋出/訂閱(類似觀察者模式)。

為了驗證這種模式,我們準備構建一個簡單的日誌系統。這個系統包含兩類程式,

一類程式發動日誌,另一類程式接收和處理日誌。

我們實現,一個接收者將接收到的資料寫到硬碟上,與此同時,另一個接收者把接收到的訊息展現在螢幕上。

1:轉發器(Exchanges)

RabbitMQ訊息模型的核心理念是生產者永遠不會直接傳送任何訊息給佇列,一般的情況生產者甚至不知道訊息應該傳送到哪些佇列。

相反的,生產者只能傳送訊息給轉發器(Exchange)。轉發器是非常簡單的,一邊接收從生產者發來的訊息,另一邊把訊息推送到佇列中。轉發器必須清楚的知道訊息如何處理它收到的每一條訊息。是否應該追加到一個指定的佇列?是否應該追加到多個佇列?或者是否應該丟棄?這些規則通過轉發器的型別進行定義。

可用的轉發器型別:

  • Direct
  • Topic
  • Headers
  • Fanout

宣告轉發器型別的