1. 程式人生 > >RabbitMQ學習(六).NET Client之RPC

RabbitMQ學習(六).NET Client之RPC

Remote procedure call (RPC)

(using the .NET client)

在第二個教程second tutorial 中我們已經瞭解到了工作佇列如何將耗時任務分配給多個workers。

但是假如我們需要在遠端機器上面執行一個函式並且等待結果返回呢?這通常叫做RPC,即遠端過程呼叫。

這裡我們將用RabbitMQ構造一個RPC系統,客戶端請求呼叫服務端的計算斐波納契數列值得一個函式,並等待計算結果。

Client interface(客戶端呼叫介面)

首先看一下客戶端介面,我們定義一個RPC呼叫類,其中提供了一個叫做Call的介面,這個介面內部所做的事情就是將呼叫服務端計算斐波那契數列的請求(包含引數)傳送到指定的訊息佇列,然後再另一個臨時佇列阻塞等待服務端將計算結果放入到這個臨時佇列。

var rpcClient = new RPCClient();

Console.WriteLine(" [x] Requesting fib(30)");
var response = rpcClient.Call("30");
Console.WriteLine(" [.] Got '{0}'", response);

rpcClient.Close
();

Callback queue(回撥佇列)

下面程式碼是Call介面內部實現的一部分,為了等待服務端RPC呼叫的結果,我們需要告訴服務端將計算結果放到哪個佇列中,這裡props引數就已經制定了計算結果的存放佇列名稱,同時還附上了每個RPC請求的ID,方便讀取response的時候能夠知道對應於哪個請求:

var corrId = Guid.NewGuid().ToString();
var props = channel.CreateBasicProperties();
props.ReplyTo = replyQueueName;
props.CorrelationId = corrId
; var messageBytes = Encoding.UTF8.GetBytes(message); channel.BasicPublish("", "rpc_queue", props, messageBytes); // ... then code to read a response message from the callback_queue ...

Correlation Id(RPC請求ID)

很顯然我們不可能為每個RPC呼叫都建立一個存放呼叫結果的回撥佇列,我們可以為每個client端都建立一個。

至於每個RPC請求發出去之後,收到迴應時如何知道這個response是對應於哪個RPC請求,就需要用到 correlationId 屬性. 

這個ID值可以有多重方法生成,比如客戶端IP+計數值,或者一個唯一的GUID等都可以。

Summary(總結)

RPC呼叫工作流程:

  • 客戶端啟動時候建立一個匿名獨佔的回撥佇列。
  • 進行RPC呼叫時,傳送帶有兩個屬性的訊息(RPC請求)到指定佇列,這兩個屬性是指定回撥佇列名稱的 replyTo 屬性,以及唯一標識一個RPC請求的 correlationId 屬性。
  • 將RPC請求傳送到約定好的 rpc_queue 佇列。
  • 服務端的RPC worker收到RPC請求之後開始呼叫函式,執行完成之後將執行結果放到 replyTo 指定的回撥佇列中。
  • 客戶端在回撥佇列上等待呼叫結果,當收到訊息之後檢視 correlationId 屬性時候是剛才的RPC請求的response,如果是就返回。

Putting it all together(程式碼總覽)

斐波那契數列計算函式:

private static int fib(int n)
{
    if (n == 0 || n == 1) return n;
    return fib(n - 1) + fib(n - 2);
}
class RPCServer
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare("rpc_queue", false, false, false, null);
                channel.BasicQos(0, 1, false);
                var consumer = new QueueingBasicConsumer(channel);
                channel.BasicConsume("rpc_queue", false, consumer);
                Console.WriteLine(" [x] Awaiting RPC requests");

                while (true)
                {
                    string response = null;
                    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();

                    var body = ea.Body;
                    var props = ea.BasicProperties;
                    var replyProps = channel.CreateBasicProperties();
                    replyProps.CorrelationId = props.CorrelationId;

                    try
                    {
                        var message = Encoding.UTF8.GetString(body);
                        int n = int.Parse(message);
                        Console.WriteLine(" [.] fib({0})", message);
                        response = fib(n).ToString();
                    }
                    catch (Exception e)
                    {
                        Console.WriteLine(" [.] " + e.Message);
                        response = "";
                    }
                    finally
                    {
                        var responseBytes = Encoding.UTF8.GetBytes(response);
                        channel.BasicPublish("", props.ReplyTo, replyProps, responseBytes);
                        channel.BasicAck(ea.DeliveryTag, false);
                    }
                }
            }
        }
    }

    private static int fib(int n)
    {
        if (n == 0 || n == 1) return n;
        return fib(n - 1) + fib(n - 2);
    }
}

The server code is rather straightforward:

  • 和往常一樣建立連線,channel,以及申明佇列。
  • 你可能想要執行多個server程序,為了讓請求均勻地負載到多個servers上面,我們需要呼叫 channel.basicQos. 告訴RabbitMQ不要將超過一個的訊息同時分配給同一個worker,詳見:http://blog.csdn.net/jiyiqinlovexx/article/details/38946955
  • 呼叫basicConsume 訪問佇列. 
  • 然後進入迴圈,等待訊息,執行函式呼叫,返回response。.
class RPCClient
{
    private IConnection connection;
    private IModel channel;
    private string replyQueueName;
    private QueueingBasicConsumer consumer;

    public RPCClient()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        connection = factory.CreateConnection();
        channel = connection.CreateModel();
        replyQueueName = channel.QueueDeclare();
        consumer = new QueueingBasicConsumer(channel);
        channel.BasicConsume(replyQueueName, true, consumer);
    }

    public string Call(string message)
    {
        var corrId = Guid.NewGuid().ToString();
        var props = channel.CreateBasicProperties();
        props.ReplyTo = replyQueueName;
        props.CorrelationId = corrId;

        var messageBytes = Encoding.UTF8.GetBytes(message);
        channel.BasicPublish("", "rpc_queue", props, messageBytes);

        while (true)
        {
            var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
            if (ea.BasicProperties.CorrelationId == corrId)
            {
                return Encoding.UTF8.GetString(ea.Body);
            }
        }
    }

    public void Close()
    {
        connection.Close();
    }
}

class RPC
{
    public static void Main()
    {
        var rpcClient = new RPCClient();

        Console.WriteLine(" [x] Requesting fib(30)");
        var response = rpcClient.Call("30");
        Console.WriteLine(" [.] Got '{0}'", response);

        rpcClient.Close();
    }
}

The client code is slightly more involved:

  • 建立connection和channel,申明一個獨佔的臨時回撥佇列。
  • 訂閱回撥佇列,以便能夠接收RPC請求的response。
  • 實現Call方法,用於真正傳送RPC請求
  • 首先生成唯一的 correlationId 值儲存下來,while迴圈中將會用這個值來匹配response。
  • 建立RPC請求訊息,包含兩個屬性: replyTo and correlationId.
  • 發出RPC請求,等待迴應.
  • while迴圈對於每一個response都會檢查 thecorrelationId 是否匹配,如果匹配就儲存下來這個response.
  • 最後返回response給使用者.

測試程式,開始進行RPC呼叫:

RPCClient fibonacciRpc = new RPCClient();

System.out.println(" [x] Requesting fib(30)");
String response = fibonacciRpc.call("30");
System.out.println(" [.] Got '" + response + "'");

fibonacciRpc.close();

Now is a good time to take a look at our full example source code (which includes basic exception handling) for RPCClient.cs and RPCServer.cs.

Compile as usual (see tutorial one):

$ csc /r:"RabbitMQ.Client.dll" RPCClient.cs
$ csc /r:"RabbitMQ.Client.dll" RPCServer.cs

Our RPC service is now ready. We can start the server:

$ RPCServer.exe
 [x] Awaiting RPC requests

To request a fibonacci number run the client:

$ RPCClient.exe
 [x] Requesting fib(30)

The design presented here is not the only possible implementation of a RPC service, but it has some important advantages:

  • If the RPC server is too slow, you can scale up by just running another one. Try running a second RPCServer in a new console.
  • On the client side, the RPC requires sending and receiving only one message. No synchronous calls like queueDeclare are required. As a result the RPC client needs only one network round trip for a single RPC request.

Our code is still pretty simplistic and doesn't try to solve more complex (but important) problems, like:

  • How should the client react if there are no servers running?
  • Should a client have some kind of timeout for the RPC?
  • If the server malfunctions and raises an exception, should it be forwarded to the client?
  • Protecting against invalid incoming messages (eg checking bounds, type) before processing.

If you want to experiment, you may find the rabbitmq-management plugin useful for viewing the queues.

相關推薦

RabbitMQ學習.NET ClientRPC

6 RPC Remote procedure call implementation Python | Java | Ruby | PHP| C# Remote procedure call (RPC) (using the .NET client) 在第二

RabbitMQ學習:遠程結果調用

cells actor ble 隨機 get getenv all 求和 int 場景:我們需要在傳輸消息時得到結果 客服端在發送請求時會發送回調隊列,服務端處理事情完成後會將結果返回到回調隊列中,在增加關聯標誌關聯每個請求和服務返回 客戶端代碼: public

MyBatis的學習——關聯對映一對一關聯

一對一關聯中主要需要在介面實現配置檔案中使用到標籤元素association 需求: 根據班級id查詢班級資訊(帶老師的資訊) 建立一張教師表和班級表,這裡我們假設一個老師只負責教一個班,那麼老師和班級之間的關係就是一種一對一的關係 建立teacher實體類:

訊息中介軟體--RabbitMQ學習

Fanout Exchange學習 Fanout Exchange介紹 不處理路由鍵,只需要簡單的將佇列繫結到交換機上 傳送到交換機的訊息都會被轉發到與該交換機繫結的所有佇列上 Fanout交換機轉發訊息是最快的 只要交換機跟佇列有繫結,就能夠傳送訊息過去。

Hive學習Hive SQL數據類型和存儲格式

OS big api 而且 好的 存儲 array 文本文件 字符串 一、數據類型 1、基本數據類型 Hive 支持關系型數據中大多數基本數據類型 類型描述示例 boolean true/false TRUE tinyint 1字

【JMeter4.0學習邏輯控制器說明

style var pan cond png 是否 AD lse sample 簡述一些遇到問題的。 一、如果控制器 Interpret Condition as Variable Expression?:選中這一項時表示:判斷變量值是否等於字符串true

C++學習 輸出

分享 2.3 span pri hello 2.4 小數位 bubuko 整型 輸出學習時的筆記(其實也沒什麽用,留著給自己看的) printf 用於輸出內容 控制臺黑窗口printf("要輸出的內容"); //可以是任意內容-->如果要輸出變量 1、格式占

QT學習QT貪吃蛇

1、開始介面 對話方塊設定:設定對話方塊控制元件以及標題   1.        GameStart::GameStart(QDialog*parent) 2.&nbs

spring深入學習 IOC 解析 bean 標籤:開啟解析程序

import 標籤解析完畢了,再看 Spring 中最複雜也是最重要的標籤 bean 標籤的解析過程。 在方法 parseDefaultElement() 中,如果遇到標籤 為 bean 則呼叫 processBeanDefinition() 方法進行 b

rabbitmq學習rabbitmq扇形交換機、主題交換機

 前言 上篇我們學習了rabbitmq的作用以及直連交換機的程式碼實現,這篇我們繼續看如何用程式碼實現扇形交換機和主題交換機 一、扇形交換機   1.生產者    /** * 生產者 */ public class LogProducer { //交換機名稱 pu

rabbitmq學習rabbitmq消息隊列的作用以及rabbitmq直連交換機

tde pub 假設 代碼 持久化 tor 安裝 live 服務 前言   上篇介紹了AMQP的基本概念,組成及其與rabbitmq的關系。了解了這些東西後,下面我們開始學習rabbitmq(消息隊列)的作用以及用java代碼和rabbitmq通訊進行消息發布和接收。因為消

演算法工程師修仙路:吳恩達機器學習

吳恩達機器學習筆記及作業程式碼實現中文版 第五章 正則化 過擬合問題 線性迴歸和邏輯迴歸能夠有效地解決許多問題,但是當將它們應用到某些特定的機器學習應用時,會遇到過擬合(over-fitting)的問題,可能會導致它們效果很差。 正則化(regulari

床頭筆記Android開發學習

初識Acitivity 目錄: 認識acitivity 建立一個acitivity專案 新增控制元件,實現想要的功能及介面 認識acitivity: Activity 是一個應用元件。 每個 Activity 都會獲得一個用於繪製其使用者介面的視窗。 使用者

nginx學習——nginx的配置系統3upstream_module

upstream upstream可以用來定義一組伺服器,這些伺服器可以通過proxy_pass, fastcgi_pass, uwsgi_pass, scgi_pass, and memcached_pass這些指令指定的域名關聯起來,比如下面這組配置,backend就

spring深入學習 IOC 解析 bean 標籤:開啟解析程序

import 標籤解析完畢了,再看 Spring 中最複雜也是最重要的標籤 bean 標籤的解析過程。 在方法 parseDefaultElement() 中,如果遇到標籤 為 bean 則呼叫 processBeanDefinition() 方法進行 bean 標籤解析,

PowerMock學習Mock Final的使用

Mock Final mockfinal相對來說就比較簡單了,使用powermock來測試使用final修飾的method或class,比較簡單,介面呼叫部分,還是service呼叫dao。 對於介面及場景這裡就不細說了,特別簡單。 service層 具體程式碼示例如下: package com

SpringBoot學習—— springboot快速整合RabbitMQ

目錄 Rabbit MQ訊息佇列 簡介 Rabbit MQ工作模式 交換機模式 引入RabbitMQ佇列 程式碼實戰 Rabbi

java學習面向對象 final關鍵字

hello int java學習 xtend 最終 .sh 方法 div ext 1.被fnial修飾的方法不能被重寫,常見的為修飾類,方法,變量 /* final可以修飾類,方法,變量 特點: final可以修飾類,該類不能被繼

線程學習--單例和多線程、ThreadLocal

pen single cal final ride args ash public 線程 一、ThreadLocal 使用wait/notify方式實現的線程安全,性能將受到很大影響。解決方案是用空間換時間,不用鎖也能實現線程安全。 來看一個小例子,在線程內的set、get

Linux 網卡驅動學習應用層、tcp 層、ip 層、設備層和驅動層作用解析

local acc 每次 letter auto sizeof style article inode 本文將介紹網絡連接建立的過程、收發包流程,以及當中應用層、tcp層、ip層、設備層和驅動層各層發揮的作用。 1、應用層 對於使用socket進行網絡連接的serv