1. 程式人生 > >RabbitMQ (十二) 遠程過程調用(RPC)

RabbitMQ (十二) 遠程過程調用(RPC)

false name const 計算 包含 而不是 true 希望 helper

在遠程計算機上運行一個函數並等待結果,我們通常叫這種模式為遠程過程調用或者RPC.

通過 RabbitMQ 進行 RPC 很容易,客戶端發送請求消息,服務器回復響應消息.為了接收響應,我們需要發送帶有“回調”隊列地址的請求.

同時,這裏面涉及到幾個比較重要的消息屬性:

消息屬性

  • Durable : 將消息標記為持久或者非持久;
  • DeliveryMode:熟悉 AMQP 0-9-1協議的人可以選擇使用此屬性而不是Persistent,他們控制著同樣的事情;
  • ContentType:用於描述編碼的mime類型.例如,對於經常使用的JSON編碼,將此屬性設置為:application / json是一種很好的做法;

  • ReplyTo:通常用於命名回調隊列;
  • CorrelationId:用於將RPC響應與請求相關聯;

相關ID

在上面介紹的參數中,可以看 ReplyTo 屬性可以定義該消息的回調隊列,也就是說我們可以為每個RPC請求創建一個回調隊列。但這是非常低效的,更好的方法是為每個客戶端(多個消費者)創建一個回調隊列。

這引發了一個新問題,在該隊列中收到響應後,不清楚響應屬於哪個請求。

這時候, CorrelationId 屬性就發揮它的作用了

我們為每個請求的 CorrelationId 屬性設置為唯一值。然後,當我們在回調隊列中收到消息時,我們將查看此屬性,並根據該屬性,我們將能夠將響應與請求進行匹配。

如果我們看到未知的 CorrelationId 值,我們可以安全地丟棄該消息,因為它不屬於我們的請求。

為什麽我們應該忽略回調隊列中的未知消息,而不是因為錯誤而失敗?

這是由於服務器端存在競爭條件的可能性。盡管不太可能,但是在向我們發送答案之後,發送請求的確認消息之前,RPC服務器可能會死亡。如果發生這種情況,重新啟動的RPC服務器將再次處理請求。這就是為什麽在客戶端上我們必須優雅地處理重復的響應,理想情況下RPC應該是冪等的。

摘要

技術分享圖片

RPC工作流程:

  • 當客戶端啟動時,創建一個匿名的獨占回調隊列.(匿名最好,當然也可以不是匿名的)
  • 對於RPC請求,客戶端發送帶有兩個屬性的消息: ReplyTo
    (設置為回調隊列)和 CorrelationId(設置為每個請求的唯一值)。
  • 請求被發送到 rpc_queue隊列。
  • RPC worker(aka:server)正在等待該隊列上的請求。當請求出現時,它會執行函數並使用ReplyTo屬性中的隊列將結果返回給客戶端
  • 客戶端等待回調隊列上的數據。出現消息時,它會檢查CorrelationId屬性。如果它與請求中的值匹配,則將響應返回給應用程序。

思路的轉換

在進行RPC通信時,我們不再叫"生產者","消費者"了,而是改叫"客戶端","服務器".因為在 RPC 中,

客戶端即是一個生產者,因為它要發送請求消息給服務器,同時,它也是一個消費者,因為它還要接收服務器發送過來的響應消息.

而服務器即是一個消費者,因為它要接收客戶端發送過來的請求消息,同時,它也是一個生產者,因為它執行完函數後,還需要發送響應消息給客戶端.

我們把上面的圖一分為二來看:

技術分享圖片

服務器代碼

    internal class Program
    {
        private const string RequestQueueName = "rpc_queue";

        private static void Main(string[] args)
        {
            using (RabbitMQ.Client.IConnection connection = ConnectionHelper.GetConnection())
            using (RabbitMQ.Client.IModel channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: RequestQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
                channel.BasicQos(0, 1, false);

                EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                channel.BasicConsume(queue: RequestQueueName, autoAck: false, consumerTag: "", noLocal: false, exclusive: false, arguments: null, consumer: consumer);
                Console.WriteLine("server 開始等待 RPC 請求");

                consumer.Received += (s, e) =>
                {
                    string response = null;
                    byte[] bytes = e.Body;
                    RabbitMQ.Client.IBasicProperties pros = e.BasicProperties;//拿到這條請求消息的屬性
                    RabbitMQ.Client.IBasicProperties replyPros = channel.CreateBasicProperties();//創建響應消息的屬性
                    replyPros.CorrelationId = pros.CorrelationId;//將請求消息的id賦值給響應消息,這個id就相當於請求消息的身份證

                    try
                    {
                        string msg = Encoding.UTF8.GetString(bytes);
                        int n = int.Parse(msg);
                        Console.WriteLine($"執行函數 Fib(int n) , 入參為 {msg}");
                        response = Fib(n).ToString();//運行函數,拿到結果
                    }
                    catch (Exception exception)
                    {
                        Console.WriteLine(exception);
                        response = string.Empty;
                    }
                    finally
                    {
                        byte[] responseBytes = Encoding.UTF8.GetBytes(response);//創建響應消息的字節碼
                        //將響應消息發送到請求消息的屬性中指定的響應隊列
                        channel.BasicPublish(exchange: "", routingKey: pros.ReplyTo, mandatory: false, basicProperties: replyPros, body: responseBytes);

                        //發送響應消息後,手動確認已經收到請求消息
                        channel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false);
                    }
                };

                Console.WriteLine("按 enter 退出");
                Console.ReadLine();
            }
        }

        /// <summary>
        /// 服務器的函數
        /// </summary>
        /// <param name="n"></param>
        /// <returns></returns>
        private static int Fib(int n)
        {
            if (n == 0 || n == 1)
            {
                return n;
            }
            return Fib(n - 1) + Fib(n - 2);
        }
    }

服務器代碼非常簡單:

  • 像往常一樣,我們首先建立連接,通道和聲明隊列。
  • 我們可能希望運行多個服務器進程。為了在多個服務器上平均分配負載,我們需要channel.BasicQos中設置 prefetchCount設置
  • 我們使用BasicConsume來訪問隊列。然後我們註冊一個交付處理程序,我們在其中完成工作並發回響應。

客戶端代碼

    public class MyClient
    {
        private readonly IConnection connection;
        private readonly IModel channel;
        private readonly IBasicProperties pros;//請求消息屬性
        private readonly EventingBasicConsumer consumer;
        private readonly string replyQueueName;//響應隊列名稱
        private const string requestQueueName = "rpc_queue";//請求隊列名稱
        private readonly BlockingCollection<string> responseQueue = new BlockingCollection<string>();//存儲響應消息

        public MyClient()
        {
            connection = ConnectionHelper.GetConnection();
            channel = connection.CreateModel();
            replyQueueName = channel.QueueDeclare().QueueName;//聲明一個隨機的,獨占的,自動刪除的,非持久化的響應隊列
            consumer = new EventingBasicConsumer(channel);//創建一個消費者
            pros = channel.CreateBasicProperties();
            string correlationId = Guid.NewGuid().ToString();//創建一個"身份證"
            pros.CorrelationId = correlationId;
            pros.ReplyTo = replyQueueName;//設置回調隊列

            consumer.Received += (s, e) =>
            {
                string response = Encoding.UTF8.GetString(e.Body);//拿到響應消息
                if (e.BasicProperties.CorrelationId.Equals(correlationId))//確認身份
                {
                    responseQueue.Add(response);
                }
            };
        }


        /// <summary>
        /// 發起請求
        /// </summary>
        /// <param name="msg">請求消息</param>
        /// <returns>請求的結果</returns>
        public string Call(string msg)
        {
            byte[] bytes = Encoding.UTF8.GetBytes(msg);
            channel.BasicPublish(exchange: "", routingKey: requestQueueName, basicProperties: pros, body: bytes);//向請求隊列發送請求消息.

            //在發送請求消息(發起請求)後,再定義客戶端需要消費的回復隊列,並且設置應答模式為 自動應答.因為RPC中,服務器不用關心客戶端是否收到了響應
            channel.BasicConsume(queue: replyQueueName, autoAck: true, consumer: consumer);

            return responseQueue.Take();//返回本次請求的結果
        }


        /// <summary>
        /// 關閉客戶端
        /// </summary>
        public void Close()
        {
            channel.Close();
            connection.Close();
        }
    }


    internal class Program
    {
        private static void Main(string[] args)
        {
            MyClient client = new MyClient();
            while (true)
            {
                Console.WriteLine("請輸入您要發送的請求消息 : ");
                string request = Console.ReadLine();
                if (string.IsNullOrWhiteSpace(request))
                {
                    continue;
                }
                if (request.ToLower().Equals("q"))
                {
                    break;
                }
                string response = client.Call(request);
                Console.WriteLine("請求的結果 : " + response);
            }
            client.Close();
        }
    }

客戶端代碼稍微復雜一些:

  • 我們建立一個連接和通道,並為回復聲明一個獨有的“回調”隊列。
  • 我們訂閱了‘回調‘隊列,以便我們可以接收RPC響應。
  • 我們的Call方法生成實際的RPC請求。
  • 在這裏,我們首先生成一個唯一的CorrelationId 數並保存它 - 客戶端將使用該值來捕獲適當的響應。
  • 接下來,我們發布請求消息,其中包含兩個屬性: ReplyToCorrelationId
  • 在這一點上,我們可以坐下來等待正確的響應到來。
  • 客戶端正在做一個非常簡單的工作,對於每個響應消息,它檢查CorrelationId 是否是我們正在尋找的那個。如果是這樣,它會保存響應。
  • 最後,我們將響應返回給用戶。

此處介紹的設計並不是RPC服務的唯一可能實現,但它具有一些重要優勢:

  • 如果RPC服務器太慢,您可以通過運行另一個服務器來擴展。嘗試在新控制臺中運行第二個RPCServer
  • 在客戶端,RPC只需要發送和接收一條消息。不需要像QueueDeclare這樣的同步調用因此,對於單個RPC請求,RPC客戶端只需要一次網絡往返。

我們的代碼仍然相當簡單,並不試圖解決更復雜(但重要)的問題,例如:

  • 如果沒有運行服務器,客戶應該如何反應?
  • 客戶端是否應該為RPC設置某種超時?
  • 如果服務器出現故障並引發異常,是否應將其轉發給客戶端?
  • 在處理之前防止無效的傳入消息(例如檢查邊界,類型)。

RabbitMQ (十二) 遠程過程調用(RPC)