1. 程式人生 > >[譯]RabbitMQ教程C#版

[譯]RabbitMQ教程C#版

先決條件
本教程假定 RabbitMQ 已經安裝,並執行在localhost標準埠(5672)。如果你使用不同的主機、埠或證書,則需要調整連線設定。

從哪裡獲得幫助
如果您在閱讀本教程時遇到困難,可以通過郵件列表 聯絡我們

在第 教程[2] 中,我們學習瞭如何使用工作佇列在多個工作單元之間分配耗時任務。

但是如果我們想要執行一個在遠端計算機上的函式並等待其結果呢?這將是另外一回事了。這種模式通常被稱為 遠端過程呼叫RPC

在本篇教程中,我們將使用 RabbitMQ 構建一個 RPC 系統:一個客戶端和一個可擴充套件的 RPC 伺服器。由於我們沒有什麼耗時任務值得分發,那乾脆就建立一個返回斐波那契數列的虛擬 RPC 服務吧。

客戶端介面

為了說明如何使用 RPC 服務,我們將建立一個簡單的客戶端類。該類將暴露一個名為Call的方法,用來發送 RPC 請求並且保持阻塞狀態,直到接收到應答為止。

var rpcClient = new RPCClient();

Console.WriteLine(" [x] Requesting fib(30)");
var response = rpcClient.Call("30");
Console.WriteLine(" [.] Got '{0}'", response);

rpcClient.Close();

關於 RPC 的說明

儘管 RPC 在計算機中是一種很常見的模式,但它經常受到批評。問題出現在當程式設計師不知道一個函式是本地呼叫還是一個耗時的 RPC 請求。這樣的混淆,會導致系統不可預測,以及給除錯增加不必要的複雜性。誤用 RPC 可能會導致不可維護的混亂程式碼,而不是簡化軟體。

牢記這些限制,請考慮如下建議:

  • 確保可以明顯區分哪些函式是本地呼叫,哪些是遠端呼叫。
  • 為您的系統編寫文件,明確元件之間的依賴關係。
  • 捕獲異常,當 RPC 服務長時間宕機時客戶端該如何應對。

當有疑問的時候可以先避免使用 RPC。如果可以的話,考慮使用非同步管道 - 而不是類似 RPC 的阻塞,其會將結果以非同步的方式推送到下一個計算階段。

回撥佇列

一般來講,基於 RabbitMQ 進行 RPC 通訊是非常簡單的,客戶端傳送一個請求訊息,然後服務端用一個響應訊息作為應答。為了能接收到響應,我們需要在傳送請求過程中指定一個'callback'佇列地址。

var props = channel.CreateBasicProperties();
props.ReplyTo = replyQueueName;

var messageBytes = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
                     routingKey: "rpc_queue",
                     basicProperties: props,
                     body: messageBytes);

// ... then code to read a response message from the callback_queue ...

訊息屬性

AMQP 0-9-1 協議在訊息中預定義了一個包含 14 個屬性的集合,大多數屬性很少使用,但以下情況除外:
Persistent:將訊息標記為持久的(值為2)或者瞬時的(其他值),可以參考 教程[2]
DeliveryMode:熟悉 AMQP 協議的人可以選擇此屬性而不是熟悉協議的人可以選擇使用此屬性而不是Persistent,它們控制的東西是一樣的。
ContentType:用於描述編碼的 mime 型別。例如,對於經常使用的 JSON 編碼,將此屬性設定為:application/json是一種很好的做法。
ReplyTo:通常用於命名回撥佇列。
CorrelationId:用於將 RPC 響應與請求相關聯。

關聯ID

在上面介紹的方法中,我們建議為每個 RPC 請求建立一個回撥佇列,但是這種方式效率低。幸運的是我們有一種更好的方式,那就是為每個客戶端建立一個獨立的回撥佇列。

這種方式會引出一個新的問題,在收到響應的回撥佇列中,它無法區分響應屬於哪一個請求,此時便是CorrelationId屬性的所用之處。我們將為每個請求的CorrelationId設定一個唯一值。之後當我們在回撥佇列接收到響應的時候,再去檢查下這個屬性是否和請求中的值匹配,如此一來,我們就可以把響應和請求關聯起來了。如果出現一個未知的CorrelationId值,我們可以安全的銷燬這個訊息,因為這個訊息不屬於我們的請求。

你可能會問,為什麼我們應該忽略回撥佇列中的未知的訊息,而不是用錯誤來標識失敗呢?這是因為於伺服器端可能存在競爭條件。雖然不太可能,但是 RPC 伺服器可能在僅傳送了響應訊息而未傳送訊息確認的情況下掛掉,如果出現這種情況,RPC 伺服器重啟之後將會重新處理該請求。這就是為什麼在客戶端上我們必須優雅地處理重複的響應,並且理想情況下 RPC 應該是冪等的。

總結

我們的 RPC 會是這樣工作:

  • 客戶端啟動時,會建立一個匿名的獨佔回撥佇列。
  • 對於 RPC 請求,客戶端傳送帶有兩個屬性的訊息:ReplyTo(設定為回撥佇列)和CorrelationId(為每個請求設定唯一值)。
  • 請求被髮送到rpc_queue佇列。
  • RPC 工作執行緒(或者叫:伺服器)正在等待該佇列上的請求。當出現請求時,它會執行該作業,並使用ReplyTo屬性設定的佇列將帶有結果的訊息傳送回客戶端。
  • 客戶端等待回撥佇列上的資料。出現訊息時,它會檢查CorrelationId屬性。如果它與請求中的值匹配,則返回對應用程式的響應。

組合在一起

斐波納契 任務:

private static int fib(int n)
{
    if (n == 0 || n == 1) return n;
    return fib(n - 1) + fib(n - 2);
}

我們宣佈我們的斐波那契函式。並假定只允許有效的正整數輸入。 (不要期望這個適用於大數字,它可能是最慢的遞迴實現)。

我們的 RPC 服務端程式碼 RPCServer.cs 看起來如下所示:

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

class RPCServer
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "rpc_queue", durable: false,
              exclusive: false, autoDelete: false, arguments: null);
            channel.BasicQos(0, 1, false);
            var consumer = new EventingBasicConsumer(channel);
            channel.BasicConsume(queue: "rpc_queue",
              autoAck: false, consumer: consumer);
            Console.WriteLine(" [x] Awaiting RPC requests");

            consumer.Received += (model, ea) =>
            {
                string response = null;

                var body = ea.Body;
                var props = ea.BasicProperties;
                var replyProps = channel.CreateBasicProperties();
                replyProps.CorrelationId = props.CorrelationId;

                try
                {
                    var message = Encoding.UTF8.GetString(body);
                    int n = int.Parse(message);
                    Console.WriteLine(" [.] fib({0})", message);
                    response = fib(n).ToString();
                }
                catch (Exception e)
                {
                    Console.WriteLine(" [.] " + e.Message);
                    response = "";
                }
                finally
                {
                    var responseBytes = Encoding.UTF8.GetBytes(response);
                    channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,
                      basicProperties: replyProps, body: responseBytes);
                    channel.BasicAck(deliveryTag: ea.DeliveryTag,
                      multiple: false);
                }
            };

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }

    /// 

    /// Assumes only valid positive integer input.
    /// Don't expect this one to work for big numbers, and it's
    /// probably the slowest recursive implementation possible.
    /// 
    private static int fib(int n)
    {
        if (n == 0 || n == 1)
        {
            return n;
        }

        return fib(n - 1) + fib(n - 2);
    }
}

服務端程式碼非常簡單:

  • 像往常一樣,首先建立連線,通道和宣告佇列。
  • 我們可能希望執行多個伺服器程序。為了在多個伺服器上平均分配負載,我們需要設定channel.BasicQos中的prefetchCount值。
  • 使用BasicConsume訪問佇列,然後註冊一個交付處理程式,並在其中完成工作併發迴響應。
using System;
using System.Collections.Concurrent;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public class RpcClient
{
    private readonly IConnection connection;
    private readonly IModel channel;
    private readonly string replyQueueName;
    private readonly EventingBasicConsumer consumer;
    private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
    private readonly IBasicProperties props;

public RpcClient()
{
        var factory = new ConnectionFactory() { HostName = "localhost" };

        connection = factory.CreateConnection();
        channel = connection.CreateModel();
        replyQueueName = channel.QueueDeclare().QueueName;
        consumer = new EventingBasicConsumer(channel);

        props = channel.CreateBasicProperties();
        var correlationId = Guid.NewGuid().ToString();
        props.CorrelationId = correlationId;
        props.ReplyTo = replyQueueName;

        consumer.Received += (model, ea) =>
        {
            var body = ea.Body;
            var response = Encoding.UTF8.GetString(body);
            if (ea.BasicProperties.CorrelationId == correlationId)
            {
                respQueue.Add(response);
            }
        };
    }

    public string Call(string message)
    {
        var messageBytes = Encoding.UTF8.GetBytes(message);
        channel.BasicPublish(
            exchange: "",
            routingKey: "rpc_queue",
            basicProperties: props,
            body: messageBytes);

        channel.BasicConsume(
            consumer: consumer,
            queue: replyQueueName,
            autoAck: true);

        return respQueue.Take(); ;
    }

    public void Close()
    {
        connection.Close();
    }
}

public class Rpc
{
    public static void Main()
    {
        var rpcClient = new RpcClient();

        Console.WriteLine(" [x] Requesting fib(30)");
        var response = rpcClient.Call("30");

        Console.WriteLine(" [.] Got '{0}'", response);
        rpcClient.Close();
    }
}

客戶端程式碼稍微複雜一些:

  • 建立連線和通道,併為響應宣告一個獨有的 'callback' 佇列。
  • 訂閱這個 'callback' 佇列,以便可以接收到 RPC 響應。
  • Call方法用來生成實際的 RPC 請求。
  • 在這裡,我們首先生成一個唯一的CorrelationId編號並儲存它,while 迴圈會使用該值來捕獲匹配的響應。
  • 接下來,我們釋出請求訊息,其中包含兩個屬性:ReplyToCorrelationId
  • 此時,我們可以坐下來稍微一等,直到指定的響應到來。
  • while 迴圈做的工作非常簡單,對於每個響應訊息,它都會檢查CorrelationId是否是我們正在尋找的那一個。如果是這樣,它就會儲存該響應。
  • 最後,我們將響應返回給使用者。

客戶發出請求:

var rpcClient = new RPCClient();

Console.WriteLine(" [x] Requesting fib(30)");
var response = rpcClient.Call("30");
Console.WriteLine(" [.] Got '{0}'", response);

rpcClient.Close();

像往常一樣設定(請參見 教程[1]]):

我們的 RPC 服務現已準備就緒,現在可以啟動服務端:

cd RPCServer
dotnet run
# => [x] Awaiting RPC requests

要請求斐波納契數,請執行客戶端:

cd RPCClient
dotnet run
# => [x] Requesting fib(30)

這裡介紹的設計並不是 RPC 服務的唯一可能實現,但它仍具有一些重要優勢:

  • 如果 RPC 伺服器太慢,您可以通過執行另一個伺服器來擴充套件。嘗試在新開一個控制檯,執行第二個 RPCServer。
  • 在客戶端,RPC 只需要傳送和接收一條訊息。不需要像QueueDeclare一樣同步呼叫。因此,對於單個 RPC 請求,RPC 客戶端只需要一次網路往返。

我們的程式碼很簡單,也並沒有嘗試去解決更復雜(但很重要)的問題,比如就像:

  • 如果服務端沒有執行,客戶端應該如何反應?
  • 客戶端是否應該為 RPC 設定某種超時機制?
  • 如果服務端出現故障並引發異常,是否應將其轉發給客戶端?
  • 在處理之前防止無效的傳入訊息(例如:檢查邊界、型別)。

如果您想進行實驗,您可能會發現 管理 UI 對於檢視佇列非常有用。

寫在最後

本文翻譯自 RabbitMQ 官方教程 C# 版本。如本文介紹內容與官方有所出入,請以官方最新內容為準。水平有限,翻譯的不好請見諒,如有翻譯錯誤還請指正。

  • 實驗環境:RabbitMQ 3.7.4 、.NET Core 2.1.3、Visual Studio Code
  • 最後更新:2018-11-17