1. 程式人生 > >RabbitMQ 官方NET教程(四)【路由選擇】

RabbitMQ 官方NET教程(四)【路由選擇】

在上一個教程中,我們構建了一個簡單的日誌記錄系統。 我們能夠廣播日誌訊息給所有你的接收者。

在本教程中,我們將為其新增一個功能 - 我們將讓日誌接收者可以僅訂閱一部分訊息。 例如,我們將能夠僅將關鍵的錯誤訊息寫入到日誌檔案(以節省磁碟空間),同時仍然能夠在控制檯上列印所有日誌訊息。

繫結(Bindings)

在以前的例子中,我們已經使用過繫結。類似下面的程式碼:

channel.QueueBind(queue: queueName,
                  exchange: "logs",
                  routingKey: "");

繫結表示轉發器與佇列之間的關係。我們也可以簡單的認為:佇列對該轉發器上的訊息感興趣。

繫結可以附帶一個額外的引數routingKey。 為了避免與BasicPublish引數混淆,我們將其稱為binding key。 這就是我們如何用一個鍵建立一個繫結:

channel.QueueBind(queue: queueName,
                  exchange: "direct_logs",
                  routingKey: "black");

繫結鍵的意義依賴於轉發器的型別。對於fanout型別,忽略此引數。

直接轉發(Direct exchange)

我們從上一個教程的日誌記錄系統向所有消費者廣播所有訊息。 我們希望將其擴充套件為允許基於其嚴重性進行過濾日誌訊息。 例如,我們可能希望將日誌訊息寫入磁碟的指令碼僅接收嚴重錯誤,而不會在警告或資訊日誌訊息上浪費磁碟空間。

我們正在使用一個fanout的交換機,它不給我們很大的靈活性 - 它只能無意識地轉發。

我們會使用direct轉發器。 direct型別的轉發器背後的路由演算法很簡單 - 訊息傳遞到binding key與訊息的routing key完全匹配的佇列。

為了說明,請考慮以下設定:
這裡寫圖片描述
在這個設定中,我們可以看到direct 型別的轉發器X與兩個佇列繫結。 第一個佇列與繫結鍵orange繫結,第二個佇列與轉發器間有兩個繫結,一個與繫結鍵black繫結,另一個與green繫結鍵繫結。

在這樣的設定中,釋出附帶一個選擇鍵(routing key) orange的訊息至交換機,將被導向到佇列Q1。 訊息附帶一個選擇鍵 (routing key)black

或者green將會被導向到Q2。 所有其他訊息將被丟棄。

多重繫結(multiple bindings)

這裡寫圖片描述
使用相同的繫結鍵繫結多個佇列是完全合法的。 在我們的示例中,我們可以在XQ1之間新增繫結鍵black。 在這種情況下,direct交換將表現得像fanout,並將訊息廣播到所有匹配的佇列。 附帶選擇鍵black的訊息將傳送到Q1和Q2。

傳送日誌(Emittinglogs)

我們將此模型用於日誌記錄系統。我們將訊息傳送到direct型別的轉發器而不是fanout型別。這樣的話, 接收程式可以根據嚴重性來選擇接收。 我們首先關注傳送日誌的程式碼:

一如以往,我們需要先建立一個轉發器:

channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");

然後我們準備傳送一條訊息:

var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "direct_logs",
                     routingKey: severity,
                     basicProperties: null,
                     body: body);

為了簡化程式碼,我們假定severityinfowarningerror中的一個。

訂閱

接收訊息將像上一個教程類似,只有一點不同 - 我們將為每個我們感興趣的嚴重性型別的日誌建立一個新的繫結。

var queueName = channel.QueueDeclare().QueueName;

foreach(var severity in args)
{
    channel.QueueBind(queue: queueName,
                      exchange: "direct_logs",
                      routingKey: severity);
}

完整的例項

這裡寫圖片描述

EmitLogDirect.cs 類的程式碼:

using System;
using System.Linq;
using RabbitMQ.Client;
using System.Text;

class EmitLogDirect
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "direct_logs",
                                    type: "direct");

            var severity = (args.Length > 0) ? args[0] : "info";
            var message = (args.Length > 1)
                          ? string.Join(" ", args.Skip( 1 ).ToArray())
                          : "Hello World!";
            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: "direct_logs",
                                 routingKey: severity,
                                 basicProperties: null,
                                 body: body);
            Console.WriteLine(" [x] Sent '{0}':'{1}'", severity, message);
        }

        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }
}

ReceiveLogsDirect.cs的程式碼:

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

class ReceiveLogsDirect
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "direct_logs",
                                    type: "direct");
            var queueName = channel.QueueDeclare().QueueName;

            if(args.Length < 1)
            {
                Console.Error.WriteLine("Usage: {0} [info] [warning] [error]",
                                        Environment.GetCommandLineArgs()[0]);
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
                Environment.ExitCode = 1;
                return;
            }

            foreach(var severity in args)
            {
                channel.QueueBind(queue: queueName,
                                  exchange: "direct_logs",
                                  routingKey: severity);
            }

            Console.WriteLine(" [*] Waiting for messages.");

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                var routingKey = ea.RoutingKey;
                Console.WriteLine(" [x] Received '{0}':'{1}'",
                                  routingKey, message);
            };
            channel.BasicConsume(queue: queueName,
                                 noAck: true,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

如果您只想將warningerror(而不是info)儲存到檔案中,只需開啟控制檯並鍵入:

cd ReceiveLogsDirect
dotnet run warning error > logs_from_rabbit.log

如果您想檢視螢幕上的所有日誌訊息,請開啟一個新終端,然後執行以下操作:

cd ReceiveLogsDirect
dotnet run info warning error
# => [*] Waiting for logs. To exit press CTRL+C

而且,例如,要發出error日誌訊息,只需鍵入:

cd EmitLogDirect
dotnet run error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'