1. 程式人生 > >【c#】RabbitMQ學習文檔(三)Publish/Subscribe(發布/訂閱)

【c#】RabbitMQ學習文檔(三)Publish/Subscribe(發布/訂閱)

信息 factory ges 客戶端 直接 運行 com mit 在屏幕上

原文:【c#】RabbitMQ學習文檔(三)Publish/Subscribe(發布/訂閱)

(本教程是使用Net客戶端,也就是針對微軟技術平臺的)

在前一個教程中,我們創建了一個工作隊列。工作隊列背後的假設是每個任務會被交付給一個【工人】。在這一部分我們將做一些完全不同的事情--我們將向多個【消費者】傳遞信息。這種模式被稱為“發布/訂閱”。

為了說明這種模式,我們將構建一個簡單的日誌系統。它將包括兩個程序,第一個將發出日誌消息,第二個將接收並打印它們。

在我們的日誌系統中每個接收程序的運行副本都會得到消息。這樣我們就可以運行一個接收者程序,將日誌記錄到磁盤;同時我們可以運行另一個接收者程序,並在屏幕上看到打印出來的日誌。

從本質上講,已發布的日誌消息將被廣播到所有的接收者程序。

1、消息交換機【Exchange】



在教程的前面部分,我們從隊列中發送和接收消息。在RabbitMQ中,現在是時候引入全消息模型。

讓我們快速看看我們以前的教程講了什麽:

【生產者】:就是一個用於發送消息的用戶程序

【消費者】:就是一個用於接收和使用消息的用戶程序

【隊列】:是一個暫存消息的緩存區

RabbitMQ消息傳遞模型的核心思想是,【生產者】不直接發送任何信息到隊列。事實上,【生產者】根本就不知道消息是否會被傳送到任何隊列。

相反,【生產者】只能發送消息到【消息交換機】。交換是件很簡單的事。一方面它接收來自【生產者】的消息,另一方面是將接收到消息推送到隊列中。【消息交換機】必須知道它如何處理接收消息的確切方法。是否應該發送到特定隊列?它應該被發送到多個隊列呢?或者它應該被丟棄。該規則由【消息交換機】的類型來定義。
技術分享圖片

這裏有一些可用的【消息交換機】的類型:【Direct】直接,【Topic】主題,【Headers】標題和【Fanout】扇出。我們將集中關註最後一個-【Fanout】扇出。讓我們創建一個這種類型的【消息交換機】,並給它命名為Logs:

channel.ExchangeDeclare("logs", "fanout");


【Fanout】類型的【消息交換機】非常簡單。正如你從名字可能猜出的,它只是傳播它收到的所有消息去它知道所有的隊列中。這正是我們需要我們的日誌記錄器。

顯示【消息交換機】的列表:

使用Rabbitmqctl列出在服務器上可以運行的最有用的【消息交換機】

 sudo rabbitmqctl list_exchanges   

在這個列表中會有一些amq.*【消息交換機】和默認(未命名)消息交換機。這些都是默認創建的,但現在不太可能需要使用它們。

默認的消息交換機

在教程前面的部分我們隊【消息交換機】是一無所知,但是我依然可以發送消息去想去的隊列,那是因為我們使用了默認的【消息交換機】,這些默認的消息 交換機我用使用兩個雙引號“”來標識。

我們回憶一下以前是如何發送消息的:

var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);

第一個參數是【消息交換機】的名稱。空字符串表示默認或未命名的消息交換機:消息會被路由到指定的routingkey名稱的隊列,如果它存在的話。

現在,我們可以發布到我們命名的【消息交換機】:

var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "logs", routingKey: "",  basicProperties: null, body: body);


2、臨時隊列

也許你還記得以前我們使用的隊列所指定的名稱(記得Hello和task_queue嗎?)對我們來說,能夠給一個隊列指定名稱是至關重要的--因為我們需要把【Worker】指向同一個隊列。如果要在【生產者】和【消費者】之間共享隊列,給隊列命名是很重要的。

但這不是我們的日誌記錄器的情況。我們想聽到所有的日誌消息,而不僅僅是其中的一個子集。我們也只對當前剛剛收到的消息感興趣,而不是對舊的。為了解決上述問題,我們需要做兩件事。

首先,無論何時當我們連接到Rabbit的時候,我們都需要一個新的並且是空的隊列。要做到這一點,我們可以創建一個具有隨機名稱的隊列,或者,甚至更好一點-讓服務器為我們選擇一個隨機隊列名稱。

其次,一旦我們斷開與【消費者】的隊列就應該自動刪除該隊列。

在.NET客戶端中,當我們沒有為queueDeclare()提供參數時,我們創建了一個具有生成名稱的非持久,排他,自動刪除隊列:

var queueName = channel.QueueDeclare().QueueName;

在這點上,QueueName包含隨機隊列名稱。例如,它可能看起來像amq.gen-jzty20brgko-hjmujj0wlg。

3、綁定【Binding】

技術分享圖片

我們已經創建了一個【Fanout】類型的【消息交換機】和隊列。現在我們需要告訴【消息交換機】向我們的隊列發送消息。【消息交換機】和【隊列】之間的關系稱為綁定。

channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");


從現在開始,日誌的【消息交換機】就可以將消息推送到我們定義的隊列中去了。

我們可以通過以下語句查看【binding】列表數據:

rabbitmqctl list_bindings

4、把所有的代碼整合到一起

【生產者】的程序,它發出的日誌消息,看起來並沒有和以前的教程有很大的不同。最重要的變化是,我們現在想發送的消息是到達我們指定名稱的日誌【消息交換機】,而不是無名的。我們在發送消息的時候需要提供一個routingkey表示的名稱,但【Fanout】類型的【消息交換機】會容忽視該routingKey的值的。這裏有EmitLog.cs文件代碼:

技術分享圖片
 1 using System;
 2 using RabbitMQ.Client;
 3 using System.Text;
 4 
 5 class EmitLog
 6 {
 7     public static void Main(string[] args)
 8     {
 9         var factory = new ConnectionFactory() { HostName = "localhost" };
10         using(var connection = factory.CreateConnection())
11         using(var channel = connection.CreateModel())
12         {
13             channel.ExchangeDeclare(exchange: "logs", type: "fanout");
14 
15             var message = GetMessage(args);
16             var body = Encoding.UTF8.GetBytes(message);
17             channel.BasicPublish(exchange: "logs",
18                                  routingKey: "",
19                                  basicProperties: null,
20                                  body: body);
21             Console.WriteLine(" [x] Sent {0}", message);
22         }
23 
24         Console.WriteLine(" Press [enter] to exit.");
25         Console.ReadLine();
26     }
27 
28     private static string GetMessage(string[] args)
29     {
30         return ((args.Length > 0)
31                ? string.Join(" ", args)
32                : "info: Hello World!");
33     }
34 }
技術分享圖片

(EmitLog.cs 的源碼)

如你所見,在建立連接後,我們聲明了【消息交換機】。此步驟是必要的,因為對非存在【消息交換機】的發送是被禁止的。

如果沒有隊列綁定到【消息交換機】,消息將會丟失,但對我們來說沒有問題;如果沒有【消費者】正在偵聽,我們可以安全地丟棄消息。

以下是ReceiveLogs.cs的代碼:

技術分享圖片
 1 using System;
 2 using RabbitMQ.Client;
 3 using RabbitMQ.Client.Events;
 4 using System.Text;
 5 
 6 class ReceiveLogs
 7 {
 8     public static void Main()
 9     {
10         var factory = new ConnectionFactory() { HostName = "localhost" };
11         using(var connection = factory.CreateConnection())
12         using(var channel = connection.CreateModel())
13         {
14             channel.ExchangeDeclare(exchange: "logs", type: "fanout");
15 
16             var queueName = channel.QueueDeclare().QueueName;
17             channel.QueueBind(queue: queueName,
18                               exchange: "logs",
19                               routingKey: "");
20 
21             Console.WriteLine(" [*] Waiting for logs.");
22 
23             var consumer = new EventingBasicConsumer(channel);
24             consumer.Received += (model, ea) =>
25             {
26                 var body = ea.Body;
27                 var message = Encoding.UTF8.GetString(body);
28                 Console.WriteLine(" [x] {0}", message);
29             };
30             channel.BasicConsume(queue: queueName,
31                                  noAck: true,
32                                  consumer: consumer);
33 
34             Console.WriteLine(" Press [enter] to exit.");
35             Console.ReadLine();
36         }
37     }
38 }
技術分享圖片


(ReceiveLogs.cs 的源碼)

按照教程一的安裝說明生成的EmitLogs和ReceiveLogs兩個項目文件。

如果要將日誌保存到文件,只需打開控制臺並鍵入:

cd ReceiveLogs
dotnet run > logs_from_rabbit.log

如果你希望看到你的屏幕上的日誌,生成一個新的終端和運行:

cd ReceiveLogs
dotnet run

當然,要發送日誌類型:

cd EmitLog
dotnet run

使用rabbitmqctl list_bindings可以驗證代碼確實創建了我們想要的【綁定】和【隊列】。運行兩個ReceiveLogs.cs程序時,您應該看到如下所示:

rabbitmqctl list_bindings
# => Listing bindings ...
# => logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
# => logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
# => ...done.


對結果的解釋很簡單:來自【消息交換機】日誌的數據轉到具有服務器分配名稱的兩個隊列。 這正是我們的意圖。

好了,終於翻譯了第三篇教程了,翻譯的不好,請見諒。如有大家英文比較好可以查看原文地址:http://www.rabbitmq.com/tutorials/tutorial-three-dotnet.html

【c#】RabbitMQ學習文檔(三)Publish/Subscribe(發布/訂閱)