1. 程式人生 > >【c#】RabbitMQ學習文檔(四)Routing(路由)

【c#】RabbitMQ學習文檔(四)Routing(路由)

sin chan command 接收 color prop html 記錄 blank

原文:【c#】RabbitMQ學習文檔(四)Routing(路由)

(使用Net客戶端)

在上一個教程中,我們構建了一個簡單的日誌系統,我們能夠向許多消息接受者廣播發送日誌消息。

在本教程中,我們將為其添加一項功能 ,這個功能是我們將只訂閱消息的一個子集成為可能。 例如,我們可以只將關鍵的錯誤消息輸出到日誌文件(以節省磁盤空間),同時仍然可以在控制臺上打印所有日誌消息。

1、綁定

在以前的例子中,我們已經創建了綁定。 你可能會記得如下代碼:

channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");


【綁定】是【消息交換機】和【隊列】之間的關系紐帶,通過綁定把二者關聯起來。 這可以簡單地理解為:隊列可以接收來自此【消息交換機】的消息。

【綁定】可以占用額外的路由選擇參數。 為了避免與BasicPublish參數混淆,我們將其稱為【綁定鍵】。 以下代碼就是如何用一個鍵值來創建一個綁定:

channel.QueueBind(queue: queueName,
                  exchange: "direct_logs",
                  routingKey: "black");


【綁定鍵】的含義取決於交換類型。 以前我們使用的【Fanout】類型的【消息交換機】忽略了它的取值。

2、直接交換

在上一個教程中,我們的日誌記錄系統向所有【消費者】發送所有消息。 我們希望將其擴展為允許基於其嚴重性過濾消息。 例如,我們可能希望將寫入磁盤日誌消息的腳本僅接受嚴重錯誤,而不會在警告或信息日誌消息上浪費磁盤空間。

我們正在使用一個【Fanout】類型的【消息交換機】,它不會給我們帶來很大的靈活性 - 它只能無意識地發送。

我們將使用一個【Direct】類型的【消息交換機】。 直接轉換路由的背後的算法其實是很簡單的 - 把消息傳遞到【綁定鍵 binding key

】和消息的【路由鍵 routing key】完全匹配的隊列中。

為了說明,請考慮以下設置:

技術分享圖片

在這個設置中,我們可以看到【Direct】類型的【消息交換機】X與兩個隊列相綁定。 第一個隊列與【綁定鍵】的值是Orange相綁定的,第二個隊列有兩個綁定,一個【綁定鍵】的值是black,另一個【綁定鍵】的值是green。

在這樣的設置中,發布到具有【路由鍵】為orange的【消息交換機】的消息將被路由到隊列Q1。 具有black或green【路由鍵】的消息將轉到Q2。 所有其他消息將被丟棄。


3、多重綁定

技術分享圖片

使用相同的【綁定鍵】綁定多個隊列是完全合法的。 在我們的示例中,我們可以在X和Q1之間添加【綁定鍵】是black的綁定。 在這種情況下,【direct】類型的【消息交換機】將表現得像【Fanout】類型的【消息交換機】,並將消息發送到所有匹配的隊列。 具有【路由鍵】是black的消息將傳送到Q1和Q2。


4、發出日誌



我們將發送消息到【Direct】類型的【消息交換機】來替換【fanout】類型的【消息交換機】,在我們現在的日誌系統將使用此模型。 我們將提供日誌嚴重性作為【路由鍵】。 這樣接收腳本就能夠選擇想要接收的嚴重性。 我們首先關註發出日誌。

像以前一樣,我們首先要建立一個【消息交換機】:

channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");

現在,我們準備發送消息:

var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "direct_logs",
                     routingKey: severity,
                     basicProperties: null,
                     body: body);


為了簡化事情,我們假設“嚴重性”可以是“信息”,“警告”,“錯誤”之一。

5、訂閱

接收消息將像上一個教程一樣工作,除了一個例外 - 我們將為每個我們感興趣的嚴重性創建一個新的綁定。

技術分享圖片
var queueName = channel.QueueDeclare().QueueName;

foreach(var severity in args)
{
    channel.QueueBind(queue: queueName,
                      exchange: "direct_logs",
                      routingKey: severity);
}
技術分享圖片


6、整合

技術分享圖片

以下是EmitLogDirect.cs類的代碼:

技術分享圖片
 1 using System;
 2 using System.Linq;
 3 using RabbitMQ.Client;
 4 using System.Text;
 5 
 6 class EmitLogDirect
 7 {
 8     public static void Main(string[] args)
 9     {
10         var factory = new ConnectionFactory() { HostName = "localhost" };
11         using(var connection = factory.CreateConnection())
12         using(var channel = connection.CreateModel())
13         {
14             channel.ExchangeDeclare(exchange: "direct_logs",
15                                     type: "direct");
16 
17             var severity = (args.Length > 0) ? args[0] : "info";
18             var message = (args.Length > 1)
19                           ? string.Join(" ", args.Skip( 1 ).ToArray())
20                           : "Hello World!";
21             var body = Encoding.UTF8.GetBytes(message);
22             channel.BasicPublish(exchange: "direct_logs",
23                                  routingKey: severity,
24                                  basicProperties: null,
25                                  body: body);
26             Console.WriteLine(" [x] Sent ‘{0}‘:‘{1}‘", severity, message);
27         }
28 
29         Console.WriteLine(" Press [enter] to exit.");
30         Console.ReadLine();
31     }
32 }
技術分享圖片


以下是ReceiveLogsDirect.cs類的代碼:

技術分享圖片
 1 using System;
 2 using RabbitMQ.Client;
 3 using RabbitMQ.Client.Events;
 4 using System.Text;
 5 
 6 class ReceiveLogsDirect
 7 {
 8     public static void Main(string[] args)
 9     {
10         var factory = new ConnectionFactory() { HostName = "localhost" };
11         using(var connection = factory.CreateConnection())
12         using(var channel = connection.CreateModel())
13         {
14             channel.ExchangeDeclare(exchange: "direct_logs",
15                                     type: "direct");
16             var queueName = channel.QueueDeclare().QueueName;
17 
18             if(args.Length < 1)
19             {
20                 Console.Error.WriteLine("Usage: {0} [info] [warning] [error]",
21                                         Environment.GetCommandLineArgs()[0]);
22                 Console.WriteLine(" Press [enter] to exit.");
23                 Console.ReadLine();
24                 Environment.ExitCode = 1;
25                 return;
26             }
27 
28             foreach(var severity in args)
29             {
30                 channel.QueueBind(queue: queueName,
31                                   exchange: "direct_logs",
32                                   routingKey: severity);
33             }
34 
35             Console.WriteLine(" [*] Waiting for messages.");
36 
37             var consumer = new EventingBasicConsumer(channel);
38             consumer.Received += (model, ea) =>
39             {
40                 var body = ea.Body;
41                 var message = Encoding.UTF8.GetString(body);
42                 var routingKey = ea.RoutingKey;
43                 Console.WriteLine(" [x] Received ‘{0}‘:‘{1}‘",
44                                   routingKey, message);
45             };
46             channel.BasicConsume(queue: queueName,
47                                  noAck: true,
48                                  consumer: consumer);
49 
50             Console.WriteLine(" Press [enter] to exit.");
51             Console.ReadLine();
52         }
53     }
54 }
技術分享圖片


如果您只想將“警告”和“錯誤”(而不是“信息”)保存到文件中,只需打開控制臺並鍵入:

cd ReceiveLogsDirect
dotnet run warning error > logs_from_rabbit.log


如果您想查看屏幕上的所有日誌消息,請打開一個新終端,然後執行以下操作:

cd ReceiveLogsDirect
dotnet run info warning error
# => [*] Waiting for logs. To exit press CTRL+C


而且,例如,要發出錯誤日誌消息,只需鍵入:

cd EmitLogDirect
dotnet run error "Run. Run. Or it will explode."
# => [x] Sent error:Run. Run. Or it will explode.


以下是原文地址:http://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html

今天這篇文章終於翻譯完了,整個系列還有幾篇沒翻譯。英文水平有限,錯誤在所難免,歡迎大家提出來,共同學習。

【c#】RabbitMQ學習文檔(四)Routing(路由)