1. 程式人生 > >RabbitMQ學習總結(5)——釋出和訂閱例項詳解

RabbitMQ學習總結(5)——釋出和訂閱例項詳解

一、Publish/Subscribe(釋出/訂閱)(using the Java Client)

在前面的教程中,我們建立了一個work Queue(工作佇列)。工作佇列背後的假設是每個任務是交付給一個工作者(worker) 也就是均勻分給每個消費者。在本部分,我們將做一些完全不同的事情,我們將提供一個訊息到多個消費者。這種模式被稱為“釋出/訂閱”。
	為了說明這個模式,我們將構建一個簡單的日誌系統。它將包括兩個專案:
  1. 第一個將發出日誌訊息
  2. 第二個將接收並列印它們。
	在我們的日誌系統,每執行一次,接收器專案將得到訊息的副本。這樣我們能夠執行一個接收機並且可以直接記錄到磁碟,同時我們可以執行另一個接收器,看到螢幕上的日誌。
注:從本質上講,發表日誌訊息廣播給所有的接收者。
	下面讓我們腦中帶幾個問題,讓我們一步一步去解決:
  • 如果我把訊息分配給所有的消費者,我們將怎麼做呢?

二、Exchanges(交換機)

在前部分的教程中,我們從一個佇列傳送和接收訊息。現在是時候讓Rabbit推出完整的訊息模型。 讓我們快速複習我們前面的教程::
  • 生產者是一個使用者傳送訊息的應用程式。
  • 一個佇列是儲存訊息的緩衝區。
  • 消費者是一個使用者應用程式接收訊息。
RabbitMQ的訊息模型的核心思想是,生產者從未直接向佇列傳送任何訊息。實際上,經常生產者甚至不知道訊息是否會被運送到任何佇列。
相反,生產者只能傳送Exchanges(訊息交換區)
。交換是一個非常簡單的事情。一方面它從生產者那收到訊息並推他們到另一邊佇列。交換區必須知道如何處理它收到一條訊息:
  1. 它應該被加到一個特定的佇列嗎?
  2. 它應該被加到多佇列?
  3. 或者它應該丟棄嗎?
交換的規則定義的型別。 如上圖所示:X表示Exchange(交換機); 有一些可用的交換型別directtopicheaders and fanout。我們將專注於最後一個——fanout。讓我們建立一個這種型別的交換,稱之為日誌:
channel.exchangeDeclare("logs", "fanout");
fanout交換非常簡單。你大概可以猜到的名字,只是廣播所有的訊息接收佇列它知道。而這正是我們需要為我們的記錄器。 問題:
① exchange list 列出所有 (交換機)列表
$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
        direct
amq.direct      direct
amq.fanout      fanout
amq.headers     headers
amq.match       headers
amq.rabbitmq.log        topic
amq.rabbitmq.trace      topic
amq.topic       topic
logs    fanout
...done.
在此列表中有一些amq* 交換器 與預設(匿名)交換。這些都是預設建立的,但可能你不需要使用它們。 ② 預設名字的 exchange(交換機)
在前部分的教程中我們對exchange 一無所知,,但仍然能夠將訊息傳送到佇列。這是可能的,因為我們是使用一個預設的交換,我們確定的空字串(" ") 記得之前我們釋出一個訊息:
    channel.basicPublish("", "hello", null, message.getBytes());
第一個引數是該交換區的名稱;空字串表示預設或無名的交換,:如果routingKey存在的話,訊息路由到指定的佇列的名稱。 現在,我們可以釋出我們的交換器:
    channel.basicPublish( "logs", "", null, message.getBytes());

三、Temporary queues(臨時佇列)

你可能記得以前我們使用的佇列都是指定名稱的(還記得hello和task_queue嗎?)。對我們來說命名一個佇列是至關重要的,
當你想在生產者和消費者中分享佇列的時候,給一個佇列的名稱是必須的。	
    但是那些都不是日誌記錄系統所需要的,我們希望能夠獲得所有的日誌資訊,而不只是其中的一部分,而且我們只對當前正在傳遞的資訊感興趣,
    對舊的日誌資訊不感興趣,要解決這些問題,我們需要分兩個步驟:
  • 首先當我們連結到RabbitMQ伺服器的時候,需要一個新的、空的佇列,為了做到這點,可以建立一個隨機名的佇列,
或者更好的方法就是讓伺服器選擇一個隨機的佇列名。
  • 其次,當斷開與佇列的連線時,消費者應該被自動刪除掉
在Java客戶端,我們通過一個無引數的queueDeclare()方法為我們建立一個非持久的、唯一的、能自動刪除的佇列與佇列名稱
 String queueName = channel.queueDeclare().getQueue();
在這點上,queueName包含了一個隨機佇列名稱。例如它可能看起來像amq.gen-JzTY20BRgKO-HjmUJj0wLg。

四、Bindings(繫結)

我們已經建立了一個fanout exchange和一個佇列,現在我們需要告訴exchange去傳送訊息到佇列中,exchange和佇列之間的關係被稱為一個繫結(binding)
	channel.queueBind(queueName, "logs", "");
注意:從現在開始我們從logs exchange將被新增訊息到佇列中,使用rabbitmqctl list_bingdins能列出所有的繫結。

五、Putting it all together(釋出者/訂閱者 實現)

生產者程式碼和之前的傳送訊息的程式碼並沒有太大的區別,最重要的變化是,我們現在要將釋出的訊息傳遞給logs exchange來代替無名的exchange(之前的是"") 在傳送訊息時需要提供一個routingKey,它對於fanout exchange是非常重要的,不能被忽視的,這裡的EmitLog.java程式碼如下
[java] view plaincopyprint?在CODE上檢視程式碼片派生到我的程式碼片
  1. </pre><pre name="code"class="java">import java.io.IOException;  
  2. import com.rabbitmq.client.ConnectionFactory;  
  3. import com.rabbitmq.client.Connection;  
  4. import com.rabbitmq.client.Channel;  
  5. publicclass EmitLog {  
  6.     privatestaticfinal String EXCHANGE_NAME = "logs";  
  7.     publicstaticvoid main(String[] argv)  
  8.                   throws java.io.IOException {  
  9.         ConnectionFactory factory = new ConnectionFactory();  
  10.         factory.setHost("localhost");  
  11.         Connection connection = factory.newConnection();  
  12.         Channel channel = connection.createChannel();  
  13.         channel.exchangeDeclare(EXCHANGE_NAME, "fanout");  
  14.         String message = getMessage(argv);  
  15.         channel.basicPublish(EXCHANGE_NAME, ""null, message.getBytes());  
  16.         System.out.println(" [x] Sent '" + message + "'");  
  17.         channel.close();  
  18.         connection.close();  
  19.     }  
  20.     //...
  21. }  

接收端: [java] view plaincopyprint?在CODE上檢視程式碼片派生到我的程式碼片
  1. import java.io.IOException;  
  2. import com.rabbitmq.client.ConnectionFactory;  
  3. import com.rabbitmq.client.Connection;  
  4. import com.rabbitmq.client.Channel;  
  5. import com.rabbitmq.client.QueueingConsumer;  
  6. publicclass ReceiveLogs {  
  7.     privatestaticfinal String EXCHANGE_NAME = "logs";  
  8.     publicstaticvoid main(String[] argv)  
  9.                   throws java.io.IOException,  
  10.                   java.lang.InterruptedException {  
  11.         ConnectionFactory factory = new ConnectionFactory();  
  12.         factory.setHost("localhost");  
  13.         Connection connection = factory.newConnection();  
  14.         Channel channel = connection.createChannel();  
  15.         channel.exchangeDeclare(EXCHANGE_NAME, "fanout");  
  16.         String queueName = channel.queueDeclare().getQueue();  
  17.         channel.queueBind(queueName, EXCHANGE_NAME, "");  
  18.         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");  
  19.         QueueingConsumer consumer = new QueueingConsumer(channel);  
  20.         channel.basicConsume(queueName, true, consumer);  
  21.         while (true) {  
  22.             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
  23.             String message = new String(delivery.getBody());  
  24.             System.out.println(" [x] Received '" + message + "'");  
  25.         }  
  26.     }  
  27. }  


像以前一樣,我們開始做編譯
$ javac -cp rabbitmq-client.jar EmitLog.java ReceiveLogs.java
如果你想將日誌儲存到一個檔案,開啟一個控制檯並執行
$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar ReceiveLogs > logs_from_rabbit.log
如果你想看到日誌在你的螢幕上,產生一個新的終端並執行:
$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar ReceiveLogs
釋出日誌型別:
$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar EmitLog
使用rabbitmqctl list_bindings實際上您可以驗證繫結和佇列的程式碼是否是我們想要的? 有兩個ReceiveLogs。
$ sudo rabbitmqctl list_bindings
Listing bindings ...
logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
...done.