RabbitMQ (十二) 遠程過程調用(RPC)
在遠程計算機上運行一個函數並等待結果,我們通常叫這種模式為遠程過程調用或者RPC.
通過 RabbitMQ 進行 RPC 很容易,客戶端發送請求消息,服務器回復響應消息.為了接收響應,我們需要發送帶有“回調”隊列地址的請求.
同時,這裏面涉及到幾個比較重要的消息屬性:
消息屬性
- Durable : 將消息標記為持久或者非持久;
- DeliveryMode:熟悉 AMQP 0-9-1協議的人可以選擇使用此屬性而不是Persistent,他們控制著同樣的事情;
- ContentType:用於描述編碼的mime類型.例如,對於經常使用的JSON編碼,將此屬性設置為:application / json是一種很好的做法;
- ReplyTo:通常用於命名回調隊列;
- CorrelationId:用於將RPC響應與請求相關聯;
相關ID
在上面介紹的參數中,可以看 ReplyTo 屬性可以定義該消息的回調隊列,也就是說我們可以為每個RPC請求創建一個回調隊列。但這是非常低效的,更好的方法是為每個客戶端(多個消費者)創建一個回調隊列。
這引發了一個新問題,在該隊列中收到響應後,不清楚響應屬於哪個請求。
這時候, CorrelationId 屬性就發揮它的作用了 。
我們為每個請求的 CorrelationId 屬性設置為唯一值。然後,當我們在回調隊列中收到消息時,我們將查看此屬性,並根據該屬性,我們將能夠將響應與請求進行匹配。
為什麽我們應該忽略回調隊列中的未知消息,而不是因為錯誤而失敗?
這是由於服務器端存在競爭條件的可能性。盡管不太可能,但是在向我們發送答案之後,發送請求的確認消息之前,RPC服務器可能會死亡。如果發生這種情況,重新啟動的RPC服務器將再次處理請求。這就是為什麽在客戶端上我們必須優雅地處理重復的響應,理想情況下RPC應該是冪等的。
摘要
RPC工作流程:
- 當客戶端啟動時,創建一個匿名的獨占回調隊列.(匿名最好,當然也可以不是匿名的)
- 對於RPC請求,客戶端發送帶有兩個屬性的消息: ReplyTo
- 請求被發送到 rpc_queue隊列。
- RPC worker(aka:server)正在等待該隊列上的請求。當請求出現時,它會執行函數並使用ReplyTo屬性中的隊列將結果返回給客戶端。
- 客戶端等待回調隊列上的數據。出現消息時,它會檢查CorrelationId屬性。如果它與請求中的值匹配,則將響應返回給應用程序。
思路的轉換
在進行RPC通信時,我們不再叫"生產者","消費者"了,而是改叫"客戶端","服務器".因為在 RPC 中,
客戶端即是一個生產者,因為它要發送請求消息給服務器,同時,它也是一個消費者,因為它還要接收服務器發送過來的響應消息.
而服務器即是一個消費者,因為它要接收客戶端發送過來的請求消息,同時,它也是一個生產者,因為它執行完函數後,還需要發送響應消息給客戶端.
我們把上面的圖一分為二來看:
服務器代碼
internal class Program { private const string RequestQueueName = "rpc_queue"; private static void Main(string[] args) { using (RabbitMQ.Client.IConnection connection = ConnectionHelper.GetConnection()) using (RabbitMQ.Client.IModel channel = connection.CreateModel()) { channel.QueueDeclare(queue: RequestQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(0, 1, false); EventingBasicConsumer consumer = new EventingBasicConsumer(channel); channel.BasicConsume(queue: RequestQueueName, autoAck: false, consumerTag: "", noLocal: false, exclusive: false, arguments: null, consumer: consumer); Console.WriteLine("server 開始等待 RPC 請求"); consumer.Received += (s, e) => { string response = null; byte[] bytes = e.Body; RabbitMQ.Client.IBasicProperties pros = e.BasicProperties;//拿到這條請求消息的屬性 RabbitMQ.Client.IBasicProperties replyPros = channel.CreateBasicProperties();//創建響應消息的屬性 replyPros.CorrelationId = pros.CorrelationId;//將請求消息的id賦值給響應消息,這個id就相當於請求消息的身份證 try { string msg = Encoding.UTF8.GetString(bytes); int n = int.Parse(msg); Console.WriteLine($"執行函數 Fib(int n) , 入參為 {msg}"); response = Fib(n).ToString();//運行函數,拿到結果 } catch (Exception exception) { Console.WriteLine(exception); response = string.Empty; } finally { byte[] responseBytes = Encoding.UTF8.GetBytes(response);//創建響應消息的字節碼 //將響應消息發送到請求消息的屬性中指定的響應隊列 channel.BasicPublish(exchange: "", routingKey: pros.ReplyTo, mandatory: false, basicProperties: replyPros, body: responseBytes); //發送響應消息後,手動確認已經收到請求消息 channel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false); } }; Console.WriteLine("按 enter 退出"); Console.ReadLine(); } } /// <summary> /// 服務器的函數 /// </summary> /// <param name="n"></param> /// <returns></returns> private static int Fib(int n) { if (n == 0 || n == 1) { return n; } return Fib(n - 1) + Fib(n - 2); } }
服務器代碼非常簡單:
- 像往常一樣,我們首先建立連接,通道和聲明隊列。
- 我們可能希望運行多個服務器進程。為了在多個服務器上平均分配負載,我們需要在channel.BasicQos中設置 prefetchCount設置。
- 我們使用BasicConsume來訪問隊列。然後我們註冊一個交付處理程序,我們在其中完成工作並發回響應。
客戶端代碼
public class MyClient { private readonly IConnection connection; private readonly IModel channel; private readonly IBasicProperties pros;//請求消息屬性 private readonly EventingBasicConsumer consumer; private readonly string replyQueueName;//響應隊列名稱 private const string requestQueueName = "rpc_queue";//請求隊列名稱 private readonly BlockingCollection<string> responseQueue = new BlockingCollection<string>();//存儲響應消息 public MyClient() { connection = ConnectionHelper.GetConnection(); channel = connection.CreateModel(); replyQueueName = channel.QueueDeclare().QueueName;//聲明一個隨機的,獨占的,自動刪除的,非持久化的響應隊列 consumer = new EventingBasicConsumer(channel);//創建一個消費者 pros = channel.CreateBasicProperties(); string correlationId = Guid.NewGuid().ToString();//創建一個"身份證" pros.CorrelationId = correlationId; pros.ReplyTo = replyQueueName;//設置回調隊列 consumer.Received += (s, e) => { string response = Encoding.UTF8.GetString(e.Body);//拿到響應消息 if (e.BasicProperties.CorrelationId.Equals(correlationId))//確認身份 { responseQueue.Add(response); } }; } /// <summary> /// 發起請求 /// </summary> /// <param name="msg">請求消息</param> /// <returns>請求的結果</returns> public string Call(string msg) { byte[] bytes = Encoding.UTF8.GetBytes(msg); channel.BasicPublish(exchange: "", routingKey: requestQueueName, basicProperties: pros, body: bytes);//向請求隊列發送請求消息. //在發送請求消息(發起請求)後,再定義客戶端需要消費的回復隊列,並且設置應答模式為 自動應答.因為RPC中,服務器不用關心客戶端是否收到了響應 channel.BasicConsume(queue: replyQueueName, autoAck: true, consumer: consumer); return responseQueue.Take();//返回本次請求的結果 } /// <summary> /// 關閉客戶端 /// </summary> public void Close() { channel.Close(); connection.Close(); } } internal class Program { private static void Main(string[] args) { MyClient client = new MyClient(); while (true) { Console.WriteLine("請輸入您要發送的請求消息 : "); string request = Console.ReadLine(); if (string.IsNullOrWhiteSpace(request)) { continue; } if (request.ToLower().Equals("q")) { break; } string response = client.Call(request); Console.WriteLine("請求的結果 : " + response); } client.Close(); } }
客戶端代碼稍微復雜一些:
- 我們建立一個連接和通道,並為回復聲明一個獨有的“回調”隊列。
- 我們訂閱了‘回調‘隊列,以便我們可以接收RPC響應。
- 我們的Call方法生成實際的RPC請求。
- 在這裏,我們首先生成一個唯一的CorrelationId 數並保存它 - 客戶端將使用該值來捕獲適當的響應。
- 接下來,我們發布請求消息,其中包含兩個屬性: ReplyTo和CorrelationId。
- 在這一點上,我們可以坐下來等待正確的響應到來。
- 客戶端正在做一個非常簡單的工作,對於每個響應消息,它檢查CorrelationId 是否是我們正在尋找的那個。如果是這樣,它會保存響應。
- 最後,我們將響應返回給用戶。
此處介紹的設計並不是RPC服務的唯一可能實現,但它具有一些重要優勢:
- 如果RPC服務器太慢,您可以通過運行另一個服務器來擴展。嘗試在新控制臺中運行第二個RPCServer。
- 在客戶端,RPC只需要發送和接收一條消息。不需要像QueueDeclare這樣的同步調用 。因此,對於單個RPC請求,RPC客戶端只需要一次網絡往返。
我們的代碼仍然相當簡單,並不試圖解決更復雜(但重要)的問題,例如:
- 如果沒有運行服務器,客戶應該如何反應?
- 客戶端是否應該為RPC設置某種超時?
- 如果服務器出現故障並引發異常,是否應將其轉發給客戶端?
- 在處理之前防止無效的傳入消息(例如檢查邊界,類型)。
RabbitMQ (十二) 遠程過程調用(RPC)