RabbitMQ(四):RPC的實現
一、RPC
RPC(Remote Procedure Call)—遠端過程呼叫,它是一種通過網路從遠端計算機程式上請求服務,而不需要了解底層網路技術的協議。有很多方式可以實現,譬如UNIX RPC、REST API、WCF和SOAP。這些傳統的RPC實現方法有共同之處:那就是客戶端和伺服器端緊密相連,客戶端直接連線上伺服器,傳送一個請求,然後就停下來等待伺服器的應答。
這種點對點的性質模式有很多好處,它使得在小範圍內的拓撲變得簡單。但是當有眾多伺服器的時候,客戶端如何發現在那臺伺服器上可以找到其他想要的服務就變的麻煩,SOAP和大多數的企業RPC已經採用複雜的補充協議和服務目錄,但也帶來了額外的複雜度和眾多故障點。
但是,用RabbitMQ來實現RPC可以無需關心由那臺伺服器來處理,也不必擔心伺服器奔潰,只需要簡單的傳送訊息,然後等待響應即可。一般接觸RabbitMQ的都是用發後即忘模型,用於傳送郵件等通知或者處理其他並行處理事件,也就是AMQP的訊息是單向的。如何才能讓伺服器將處理結果返回給原始的客戶端呢?
二、訊息應答和私有佇列
RabbitMQ有一個優雅的解決方案:使用訊息來發迴應答。在每個AMQP訊息頭裡有個欄位reply_to .訊息的生產者可以通過該欄位來確定佇列的名稱,並監聽應答佇列等待應答。然後接收訊息的RPC伺服器能偶檢查reply_to欄位,並建立包含應答內容的新的訊息,並以佇列名稱為路由鍵,通過應答佇列將處理結果發回給生產者。這裡我們不需要建立應答佇列的名字也不需要將應答佇列繫結到交換器上,這是因為沒有宣告佇列的名稱RabbitMQ會自動申明,訊息釋出到RabbitMQ在沒有指名交換器的時候,RabbitMQ就會讓位目的地是應答佇列,而路由鍵就是應答佇列名稱。
所以RabbitMQ實現RPC需要比一般的訊息通訊多以下幾個步驟:
- 生產者建立一個應答佇列,並監聽該佇列。
- 生產者為訊息頭中的Reply_to和CorrelationId欄位賦值。reply_to是應答佇列的名稱,CorrelationId是相關標識由消費者返回後對比確認是返回我們的結果。
- 消費者返回生產者傳送的訊息頭,並且不需要繫結交換器,並將Reply_to引數作為路由鍵傳送訊息到應答佇列。
三、自己實現簡單的RPC
其實簡單的講就是生產者在傳送訊息後接收訊息,消費者在接受訊息後傳送訊息,生產者多了一步接收處理訊息,消費者多了一步傳送訊息。我這裡簡化了一些操作,爭取用最少的程式碼實現,具體程式碼如下:
生產者:
private static void MySelfRPCProducer() { var conn_factory = new ConnectionFactory(){HostName = "localhost",UserName = "guest",Password = "guest",Port = 5672}; using (var conn = conn_factory.CreateConnection()) { using (var channel = conn.CreateModel()) { IBasicProperties pro = channel.CreateBasicProperties(); pro.ReplyTo = channel.QueueDeclare().QueueName;//建立應答佇列並返回佇列名稱,這個方法建立的佇列exclusive和auto_delete都是true,這樣可以確保沒有人能竊取資訊 pro.ContentType = "text/plain"; string corrId = Guid.NewGuid().ToString(); pro.CorrelationId = corrId; channel.BasicPublish("", "rpc_queue", pro, Encoding.UTF8.GetBytes("小黃")); var consumer = new EventingBasicConsumer(channel); consumer.Received += (ea, ch) => { //比較CorrelationId確認是返回的我們的訊息 if (ch.BasicProperties.CorrelationId == corrId) { //處理返回結果 string msg = Encoding.UTF8.GetString(ch.Body); Console.WriteLine(msg); } }; string consumer_tag = channel.BasicConsume(pro.ReplyTo, true, consumer);//監聽應答佇列 channel.BasicCancel(consumer_tag); } } Console.ReadLine(); }
消費者:
private static void MySelfRPCCousmer() { var conn_factory = new ConnectionFactory(){HostName = "localhost",UserName = "guest",Password = "guest",Port = 5672}; using (var conn = conn_factory.CreateConnection()) { using (var channel = conn.CreateModel()) { channel.QueueDeclare("rpc_queue", false, false, false, null); var consumer = new EventingBasicConsumer(channel); channel.BasicQos(0, 1, false); consumer.Received += (ea, ch) => { string msg = Encoding.UTF8.GetString(ch.Body); Console.WriteLine("接收到訊息:" + msg); //傳送處理結果 channel.BasicPublish("", ch.BasicProperties.ReplyTo, ch.BasicProperties, Encoding.UTF8.GetBytes(msg + "給我回電話了")); channel.BasicAck(ch.DeliveryTag, false); }; string consumer_tag = channel.BasicConsume("rpc_queue", false, consumer); Console.ReadLine();//這裡先停止執行下面的程式碼,因為需要持續監聽,通道斷開就監聽不了了 channel.BasicCancel(consumer_tag); } } }
四、RabbitMQ封裝好的RPC
其實RabbitMQ已經封裝好了RPC相應的物件,分別是SimpleRpcClient和SimpleRpcServer。客戶端在初始化SimpleRpcClient後主要可以通過Call方法傳送訊息並返回服務端處理結果。服務端的SimpleRpcServer內部定義了很多虛方法,具體的訊息處理是我們自己決定的,所以需要繼承SimpleRpcServer後實現相應方法,通過實現重寫HandleSimpleCall方法可以返回給客戶端資料。具體程式碼如下所示:
客戶端:
private static void RabbitMQRPCProducer() { var conn_factory = new ConnectionFactory() { HostName = "localhost", UserName = "guest", Password = "guest", Port = 5672 }; using (var conn = conn_factory.CreateConnection()) { using (var channel = conn.CreateModel()) { //建立client的rpc SimpleRpcClient client = new SimpleRpcClient(channel, new PublicationAddress(exchangeType: ExchangeType.Direct, exchangeName: string.Empty, routingKey: "rpc_queue")); bool flag = true; var sendmsg = ""; while (flag) { Console.WriteLine("請輸入要傳送的訊息"); sendmsg = Console.ReadLine(); if (string.IsNullOrWhiteSpace(sendmsg)) { Console.Write("請輸入訊息"); continue; } var msg = client.Call(Encoding.UTF8.GetBytes(sendmsg)); Console.WriteLine(Encoding.UTF8.GetString(msg)); } Console.ReadKey(); } } }
服務端:
private static void RabbitMQRPCCousmer() { var conn_factory = new ConnectionFactory() { HostName = "localhost", UserName = "guest", Password = "guest", Port = 5672 }; using (var conn = conn_factory.CreateConnection()) { //建立返回一個新的頻道 using (var channel = conn.CreateModel()) { channel.QueueDeclare("rpc_queue", false, false, false, null);//建立一個rpc queue SimpleRpcServer rpc = new MySimpleRpcServer(new Subscription(channel, "rpc_queue")); Console.WriteLine("服務端啟動成功"); rpc.MainLoop(); Console.ReadKey(); } } }
繼承實現方法:
class MySimpleRpcServer : SimpleRpcServer { public MySimpleRpcServer(Subscription subscription) : base(subscription) { } /// <summary> /// 執行完成後進行回撥 /// </summary> public override byte[] HandleSimpleCall(bool isRedelivered, IBasicProperties requestProperties, byte[] body, out IBasicProperties replyProperties) { replyProperties = null; return Encoding.UTF8.GetBytes($"給{Encoding.UTF8.GetString(body)}傳送簡訊成功"); } }
五、小結
以上就是RabbitMQ對於RPC的最簡單的實現,與大家共勉。