1. 程式人生 > >RabbitMQ: RPC 遠端過程呼叫

RabbitMQ: RPC 遠端過程呼叫

RabbitMQ RPC 就是通過訊息佇列(Message Queue)來實現rpc的功能,就是,客戶端向服務端傳送定義好的Queue訊息,其中攜帶的訊息就應該是服務端將要呼叫的方法的引數 ,並使用Propertis告訴服務端將結果返回到指定的Queue。

1.RabbitMQ RPC的特點

  • Message Queue把所有的請求訊息儲存起來,然後處理,和客戶端解耦。
  • Message Queue引入新的結點,系統的可靠性會受Message Queue結點的影響。
  • Message Queue是非同步單向的訊息。傳送訊息設計成是不需要等待訊息處理的完成。

所以對於有同步返回需求,Message Queue是個不錯的方向。

2.普通PRC的特點

  • 同步呼叫,對於要等待返回結果/處理結果的場景,RPC是可以非常自然直覺的使用方式。當然RPC也可以是非同步呼叫。
  • 由於等待結果,客戶端會有執行緒消耗。

如果以非同步RPC的方式使用,客戶端執行緒消耗可以去掉。但不能做到像訊息一樣暫存訊息請求,壓力會直接傳導到服務端。

3.適用場合說明

  • 希望同步得到結果的場合,RPC合適。
  • 希望使用簡單,則RPC;RPC操作基於介面,使用簡單,使用方式模擬本地呼叫。非同步的方式程式設計比較複雜。
  • 不希望客戶端受限於服務端的速度等,可以使用Message Queue。

4.RabbitMQ RPC工作流程:

 

基本概念:

Callback queue 回撥佇列

客戶端向伺服器傳送請求,伺服器端處理請求後,將其處理結果儲存在一個儲存體中。而客戶端為了獲得處理結果,那麼客戶在向伺服器傳送請求時,同時傳送一個回撥佇列地址reply_to。

Correlation id 關聯標識客戶端可能會發送多個請求給伺服器,當伺服器處理完後,客戶端無法辨別在回撥佇列中的響應具體和那個請求時對應的。為了處理這種情況,客戶端在傳送每個請求時,同時會附帶一個獨有correlation_id屬性,這樣客戶端在回撥佇列中根據correlation_id欄位的值就可以分辨此響應屬於哪個請求。

流程說明

  • 當客戶端啟動的時候,它建立一個匿名獨享的回撥佇列。
  • 在 RPC 請求中,客戶端傳送帶有兩個屬性的訊息:一個是設定回撥佇列的 reply_to 屬性,另一個是設定唯一值的 correlation_id 屬性。
  • 將請求傳送到一個 rpc_queue 佇列中。
  • 伺服器等待請求傳送到這個佇列中來。當請求出現的時候,它執行他的工作並且將帶有執行結果的訊息傳送給 reply_to 欄位指定的佇列。
  • 客戶端等待回撥佇列裡的資料。當有訊息出現的時候,它會檢查 correlation_id 屬性。如果此屬性的值與請求匹配,將它返回給應用

 5.完整程式碼:

  1. 建立兩個控制檯程式,作為RPC Server和RPC Client, 引用 RabbitMQ.Client,

  2. RPC Server

    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost", VirtualHost = "OrderQueue", UserName = "zhangweizhong", Password = "weizhong1988", Port = 5672 };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "rpc_queue",
                                     durable: false,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);
                channel.BasicQos(0, 1, false);
                var consumer = new QueueingBasicConsumer(channel);
                channel.BasicConsume(queue: "rpc_queue",
                                     noAck: false,
                                     consumer: consumer);
                Console.WriteLine(" [x] Awaiting RPC requests");

                while (true)
                {
                    string response = null;
                    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();

                    var body = ea.Body;
                    var props = ea.BasicProperties;
                    var replyProps = channel.CreateBasicProperties();
                    replyProps.CorrelationId = props.CorrelationId;

                    try
                    {
                        var message = Encoding.UTF8.GetString(body);
                        int n = int.Parse(message);
                        Console.WriteLine(" [.] fib({0})", message);
                        response = fib(n).ToString();
                    }
                    catch (Exception e)
                    {
                        Console.WriteLine(" [.] " + e.Message);
                        response = "";
                    }
                    finally
                    {
                        var responseBytes = Encoding.UTF8.GetBytes(response);
                        channel.BasicPublish(exchange: "",
                                             routingKey: props.ReplyTo,
                                             basicProperties: replyProps,
                                             body: responseBytes);
                        channel.BasicAck(deliveryTag: ea.DeliveryTag,
                                         multiple: false);
                    }
                }
            }
        }

        /// <summary>
        /// Assumes only valid positive integer input.
        /// Don't expect this one to work for big numbers,
        /// and it's probably the slowest recursive implementation possible.
        /// </summary>
        private static int fib(int n)
        {
            if (n == 0 || n == 1)
            {
                return n;
            }

            Thread.Sleep(1000 * 10);

            return n;
        }
    }

  3. RPC Client

    class Program
    {
        static void Main(string[] args)
        {
            for (int i = 0; i < 10; i++)
            {
                Stopwatch watch = new Stopwatch();

                watch.Start();

                var rpcClient = new RPCClient();

                Console.WriteLine(string.Format(" [x] Requesting fib({0})", i));

                var response = rpcClient.Call(i.ToString());

                Console.WriteLine(" [.] Got '{0}'", response);

                rpcClient.Close();

                watch.Stop();

                Console.WriteLine(string.Format(" [x] Requesting complete {0} ,cost {1} ms", i, watch.Elapsed.TotalMilliseconds));
            }

            Console.WriteLine(" complete!!!! ");


            Console.ReadLine();
        }
    }

    class RPCClient
    {
        private IConnection connection;
        private IModel channel;
        private string replyQueueName;
        private QueueingBasicConsumer consumer;

        public RPCClient()
        {
            var factory = new ConnectionFactory() { HostName = "localhost", VirtualHost = "OrderQueue", UserName = "zhangweizhong", Password = "weizhong1988", Port = 5672 };
            connection = factory.CreateConnection();
            channel = connection.CreateModel();
            replyQueueName = channel.QueueDeclare().QueueName;
            consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume(queue: replyQueueName,
                                 noAck: true,
                                 consumer: consumer);
        }

        public string Call(string message)
        {
            var corrId = Guid.NewGuid().ToString();
            var props = channel.CreateBasicProperties();
            props.ReplyTo = replyQueueName;
            props.CorrelationId = corrId;

            var messageBytes = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: "",
                                 routingKey: "rpc_queue",
                                 basicProperties: props,
                                 body: messageBytes);

            while (true)
            {
                var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                if (ea.BasicProperties.CorrelationId == corrId)
                {
                    return Encoding.UTF8.GetString(ea.Body);
                }
            }
        }

        public void Close()
        {
            connection.Close();
        }
    }

  4.分別執行Server和Client

6.最後

相關推薦

RabbitMQ: RPC 遠端過程呼叫

RabbitMQ RPC 就是通過訊息佇列(Message Queue)來實現rpc的功能,就是,客戶端向服務端傳送定義好的Queue訊息,其中攜帶的訊息就應該是服務端將要呼叫的方法的引數 ,並使用Propertis告訴服務端將結果返回到指定的Queue。 1.Rabbit

RabbitMQ的學習(四):RPC-遠端過程呼叫(純demo-可直接使用)

前言:在前面三篇文章中,分別對rabbitmq的搭建,並對rabbitmq常用的四種不同的交換機結合路由鍵編寫了各自的demo,可以參考: 1. RabbitMQ的學習(一):Windows下安裝及配置RabbitMQ,erlang環境變數; 2. RabbitMQ的學習(二):簡單的j

【圖文詳細 】Scala——RPC 遠端過程呼叫

1、RPC 遠端過程呼叫    1.1、RPC 概念  RPC(Remote Procedure Call)—遠端過程呼叫,它是一種通過網路從遠端計算機程式上請 求服務,而不需要了解底層網路技術的協議。RPC 協議假定某些傳輸協議的存在,如 TCP 或

用C程式碼簡要模擬實現一下RPC(遠端過程呼叫)並談談它在程式碼調測中的重要應用

        說明: 本文僅僅是一種模擬的RPC實現, 真正的RPC實現還是稍微有點複雜的。         我們來看看下面這個常見的場景: 在某系統中,我們要對某一函式進行調測, 但是, 很難很難構造出這個函式被呼叫的實際場景, 怎麼辦?         雖然很難構造

RPC遠端過程呼叫詳解

今天看了兩篇關於RPC遠端過程呼叫的,寫的很好,分享一下。 http://blog.csdn.net/mindfloating/article/details/39473807 http://blog.csdn.net/mindfloating/article/detail

淺談rpc(遠端過程呼叫)

資訊來源:邪惡八進位制資訊保安團隊 RPC協議2:這個協議是一個夠年頭的協議本文介紹用於ONC RPC協議規範。此協議使用XDR語言進行描述,並文不打算描述具體的使用細節而只介紹RPC協議本身。 ONC RPC是基於RPC呼叫模型,此模型和本地過程呼叫(LPC)類似。對於LPC而言,呼叫方只需要將參加放入一些

RPC(遠端過程呼叫協議)簡介

RPC框架解釋 誰能用通俗的語言解釋一下什麼是 RPC 框架? - 遠端過程呼叫協議RPC(Remote Procedure Call Protocol) 首先了解什麼叫RPC,為什麼要RPC,RPC是指遠端過程呼叫,也就是說兩臺伺服器A,B,一個應用部署在A伺服器上,想

RPC遠端過程呼叫之我的理解(附帶專案希望有人交流)

最近在學習開發過程中使用到了阿里開發的dubbo框架,將專案進行分散式。 最近的學習瞭解到了一些關於RPC的原理,心血來潮就試著實現了一下自己的RPC功能。 專案主要分為三個子專案 API 專案 定義了通訊的資料模型和序列化反序列化所使用的工具以及專案測

RPC遠端過程呼叫原理及模擬RPC的Demo

什麼是 RPC由於各服務部署在不同機器,服務間的呼叫免不了網路通訊過程,服務消費方每呼叫一個服務都要寫一坨網路通訊相關的程式碼,不僅複雜而且極易出錯。如果有一種方式能讓我們像呼叫本地服務一樣呼叫遠端服務,而讓呼叫者對網路通訊這些細節透明,那麼將大大提高生產力,比如服務消費方在

RPC (遠端過程呼叫)

1、簡單元件介紹: 以Dubbo為例 RPC中可以認為有四個角色,消費者(Consumer),提供者(Provider),註冊中心(Registry),監控中心(Monitor),這個還是很好理解的,以前在同一系統的方法的呼叫者因為網路的存在,變

[譯]RabbitMQ教程C#版 - 遠端過程呼叫(RPC)

原文: [譯]RabbitMQ教程C#版 - 遠端過程呼叫(RPC) 先決條件 本教程假定 RabbitMQ 已經安裝,並執行在localhost標準埠(5672)。如果你使用不同的主機、埠或證書,則需要調整連線設定。 從哪裡獲得幫助 如果您在閱讀本教程時遇到困難,可以通過郵件列表 聯絡我們。

輕鬆搞定RabbitMQ(七)——遠端過程呼叫RPC

翻譯:http://www.rabbitmq.com/tutorials/tutorial-six-java.html在第二篇博文中,我們已經瞭解到瞭如何使用工作佇列來向多個消費者分散耗時任務。但是付過我們需要在遠端電腦上執行一個方法然後等待結果,該怎麼辦?這是不同的需求。

.Net下RabbitMQ的使用(8) -- 遠端過程呼叫RPC

RPC是在計算中是一種常見的模式,是通常我要用訊息佇列來實現RPC有3個關鍵點: 1. 服務的定址 2. 訊息的接收 3. 訊息的關聯 在RabbitMQ的.net客戶端裡,提供了2個類:SimpleRpcClient 和 SimpleRpcServer

RabbitMQ入門:遠端過程呼叫(RPC)

假如我們想要呼叫遠端的一個方法或函式並等待執行結果,也就是我們通常說的遠端過程呼叫(Remote Procedure Call)。怎麼辦? 今天我們就用RabbitMQ來實現一個簡單的RPC系統:客戶端傳送一個請求訊息,服務端以一個響應訊息迴應。為了能夠接收到響應,客戶端在傳送訊息的同時傳送一個回撥佇列用來

.Net下RabbitMQ的使用(7) -- 遠端過程呼叫RPC

RPC是在計算中是一種常見的模式,是通常我要用訊息佇列來實現RPC有3個關鍵點: 1. 服務的定址 2. 訊息的接收 3. 訊息的關聯 在RabbitMQ的.net客戶端裡,提供了2個類:SimpleRpcClient 和 SimpleRpcServer 來讓我們

RPC 協議 Remote process call 遠端過程呼叫

RPC資訊協議由兩個不同結構組成:呼叫資訊和答覆資訊。 簡單的說,RPC就是從一臺機器(客戶端)上通過引數傳遞的方式呼叫另  一臺機器(伺服器)上的一個函式或方法(可以統稱為服務)並得到返回的結果。 RPC 會隱藏底層的通訊細節(不需要直接處理Socket通訊或Http通訊

RPC遠端過程呼叫)簡介

RPC(Remote Procedure Call Protocol)——遠端過程呼叫協議,它是一種通過網路從遠端計算機程式上請求服務,而不需要了解底層網路技術的協議。 之前聽過這個名詞,但是也只是大概記住了“遠端呼叫”之類的關鍵詞,而其他並沒有太多瞭解。 來到TX實習,確實如別人所說的那樣

遠端過程呼叫 RPC 及其協議

遠端過程呼叫 簡介 RPC是遠端過程呼叫(Remote Procedure Call)的縮寫形式。SAP系統RPC呼叫的原理其實很簡單,有一些類似於三層構架的C/S系統,第三方的客戶程式通過介面呼叫SAP內部的標準或自定義函式,獲得函式返回的資料進行處理後顯示或列印。

C++ RPC遠端過程呼叫

目的 最近由於摩爾定律已經不太適用,隨著大資料、計算量不斷增加,導致單機處理能力不能滿足需求,所以需要分散式計算,這就需要RPC(遠端過程呼叫),下面簡單介紹一下這個demo,來自於GitHub上的一個專案 client程式碼 #include <stri