1. 程式人生 > >RabbitMQ入門:遠端過程呼叫(RPC)

RabbitMQ入門:遠端過程呼叫(RPC)

假如我們想要呼叫遠端的一個方法或函式並等待執行結果,也就是我們通常說的遠端過程呼叫(Remote Procedure Call)。怎麼辦?

今天我們就用RabbitMQ來實現一個簡單的RPC系統:客戶端傳送一個請求訊息,服務端以一個響應訊息迴應。為了能夠接收到響應,客戶端在傳送訊息的同時傳送一個回撥佇列用來告訴服務端響應訊息傳送到哪個佇列裡面。也就是說每個訊息一個回撥佇列,在此基礎上我們變下,將回調佇列定義成類的屬性,這個每個客戶端一個佇列,同一個客戶端的請求共用一個佇列。那麼接下來有個問題,怎麼知道這個佇列裡面的響應訊息是屬於哪個佇列的呢?

我們會用到關聯標識(correlationId),每個請求我們都會生成一個唯一的值作為correlationId,這樣每次有響應訊息來的時候,我們就去看correlationId來確定到底是哪個請求的響應訊息,將請求和響應關聯起來。如果收到一個不知道的correlationId,就可以確定不是這個客戶端的請求的響應,可以直接丟棄掉。

一、工作模型

  1. 客戶端傳送啟動後,會建立獨特的回撥佇列。對於一個請求傳送配置了兩個屬性的訊息:一個是回撥佇列(圖中的replay_to),一個是correlation。 這個請求會發送到rpc_queue佇列,然後到達服務端處理。
  2. 服務端等待rpc_queue佇列的請求。當有請求到來時,它就會開始幹活並將結果通過傳送訊息來返回,該返回訊息傳送到replyTo指定的佇列。
  3. 客戶端將等待回撥佇列返回資料。當返回的訊息到達時,它將檢查correlation id屬性。如果該屬性值和請求匹配,就將響應返回給程式。

二、程式碼實現

接下來看程式碼實現:

  1.  客戶端
    public
    class RpcClient { Connection connection = null; Channel channel = null; //回撥佇列:用來接收服務端的響應訊息 String queueName = ""; // 定義RpcClient public RpcClient() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(
    "localhost"); connection = factory.newConnection(); channel = connection.createChannel(); queueName = channel.queueDeclare().getQueue(); } // 真正的處理邏輯 public String call(String msg) throws IOException, InterruptedException { final String uuid = UUID.randomUUID().toString(); //後續,服務端根據"replyTo"來指定將返回資訊寫入到哪個佇列 //後續,服務端根據關聯標識"correlationId"來指定返回的響應是哪個請求的 AMQP.BasicProperties prop = new AMQP.BasicProperties().builder().replyTo(queueName).correlationId(uuid).build(); channel.basicPublish("", RpcServer.QUEUE_NAME, prop, msg.getBytes()); final BlockingQueue<String> blockQueue = new ArrayBlockingQueue<String>(1); channel.basicConsume(queueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException { if (properties.getCorrelationId().equals(uuid)) { String msg = new String(body, "UTF-8"); blockQueue.offer(msg); System.out.println("**** rpc client reciver response :[" + msg + "]"); } } }); return blockQueue.take(); } //關閉連線 public void close() throws IOException { connection.close(); } public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { RpcClient client = new RpcClient(); client.call("4"); client.close(); } }

    傳送請求的時候,它是生產者;接受響應的時候,它是消費者。

  2. 服務端
    public class RpcServer {
    
        //RPC佇列名
        public static final String QUEUE_NAME = "rpc_queue";
    
        //斐波那契數列,用來模擬工作任務
        public static int fib(int num) {
            if (num == 0)
                return 0;
            if (num == 1)
                return 1;
            return fib(num - 1) + fib(num - 2);
        }
    
        public static void main(String[] args) throws InterruptedException {
    
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = null;
            try {
                // 1.connection & channel
                connection = factory.newConnection();
                final Channel channel = connection.createChannel();
    
                // 2.declare queue
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
                System.out.println("****** rpc server waiting for client request ......");
    
                // 3.每次只接收一個訊息(任務)
                channel.basicQos(1);
                //4.獲取消費例項
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                            byte[] body) throws IOException {
                        BasicProperties prop = new BasicProperties().builder().correlationId(properties.getCorrelationId())
                                .build();
                        String resp = "";
                        try {
                            String msg = new String(body, "UTF-8");
                            resp = fib(Integer.valueOf(msg)) + "";
                            System.out.println("*** will response to rpc client :" + resp);
                        } catch (Exception ex) {
                            ex.printStackTrace();
                        } finally {
                            channel.basicPublish("", properties.getReplyTo(), prop, resp.getBytes());
                            channel.basicAck(envelope.getDeliveryTag(), false);
                        }
    
                    }
                };
                // 5.消費訊息(處理任務)
                channel.basicConsume(QUEUE_NAME, false, consumer);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    
    }

    接受請求的時候,它是消費者;傳送響應的時候,它是生產者。

  3. 執行服務端,開始等待請求
  4. 然後執行客戶端,控制檯log:
    服務端(多了一條列印):
    ****** rpc server waiting for client request ......
    *** will response to rpc client :3
    
    客戶端:
    **** rpc client reciver response :[3]

三、小插曲

剛開始我在寫demo的時候,client中沒有用到阻塞佇列final BlockingQueue<String> blockQueue = new ArrayBlockingQueue<String>(1);,而是直接這樣寫:

@Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {

                if (properties.getCorrelationId().equals(uuid)) {
                    String msg = new String(body, "UTF-8");

                    //blockQueue.offer(msg);
                    System.out.println("**** rpc client reciver response :[" + msg + "]");
                }
            }

期望能打印出結果來,但是執行後發現並沒有列印:**** rpc client reciver response :[" + msg + "]的值。

原因是handleDelivery()這個方法是在子執行緒中執行的,這個子執行緒執行的時候,主執行緒會繼續往後執行直到執行了client.close();方法而結束了。

由於主執行緒終止了,導致沒有打印出結果。加了阻塞佇列之後將主執行緒阻塞不執行close()方法,問題就解決了。

相關推薦

RabbitMQ入門遠端過程呼叫(RPC)

假如我們想要呼叫遠端的一個方法或函式並等待執行結果,也就是我們通常說的遠端過程呼叫(Remote Procedure Call)。怎麼辦? 今天我們就用RabbitMQ來實現一個簡單的RPC系統:客戶端傳送一個請求訊息,服務端以一個響應訊息迴應。為了能夠接收到響應,客戶端在傳送訊息的同時傳送一個回撥佇列用來

[譯]RabbitMQ教程C#版 - 遠端過程呼叫(RPC)

原文: [譯]RabbitMQ教程C#版 - 遠端過程呼叫(RPC) 先決條件 本教程假定 RabbitMQ 已經安裝,並執行在localhost標準埠(5672)。如果你使用不同的主機、埠或證書,則需要調整連線設定。 從哪裡獲得幫助 如果您在閱讀本教程時遇到困難,可以通過郵件列表 聯絡我們。

輕鬆搞定RabbitMQ(七)——遠端過程呼叫RPC

翻譯:http://www.rabbitmq.com/tutorials/tutorial-six-java.html在第二篇博文中,我們已經瞭解到瞭如何使用工作佇列來向多個消費者分散耗時任務。但是付過我們需要在遠端電腦上執行一個方法然後等待結果,該怎麼辦?這是不同的需求。

.Net下RabbitMQ的使用(8) -- 遠端過程呼叫RPC

RPC是在計算中是一種常見的模式,是通常我要用訊息佇列來實現RPC有3個關鍵點: 1. 服務的定址 2. 訊息的接收 3. 訊息的關聯 在RabbitMQ的.net客戶端裡,提供了2個類:SimpleRpcClient 和 SimpleRpcServer

奇怪的資料插入異常傳入的表格格式資料流(TDS)遠端過程呼叫(RPC)協議流不正確。

前天完成了手頭的工作後,經理交給我一個bug讓我看下,我接過後看了下Bug資訊,是從未見過的異常,但根據異常資訊提示又很容易判斷出異常原因。 異常資訊:傳入的表格格式資料流(TDS)遠端過程呼叫(RPC)協議流不正確。引數 7 ("@ExchangeRate"): 提供的值不

.Net下RabbitMQ的使用(7) -- 遠端過程呼叫RPC

RPC是在計算中是一種常見的模式,是通常我要用訊息佇列來實現RPC有3個關鍵點: 1. 服務的定址 2. 訊息的接收 3. 訊息的關聯 在RabbitMQ的.net客戶端裡,提供了2個類:SimpleRpcClient 和 SimpleRpcServer 來讓我們

遠端過程呼叫 RPC 及其協議

遠端過程呼叫 簡介 RPC是遠端過程呼叫(Remote Procedure Call)的縮寫形式。SAP系統RPC呼叫的原理其實很簡單,有一些類似於三層構架的C/S系統,第三方的客戶程式通過介面呼叫SAP內部的標準或自定義函式,獲得函式返回的資料進行處理後顯示或列印。

PHP實現遠端過程呼叫RPC

一、初識RPC RPC(Remote Procedure Call)—遠端過程呼叫,它是一種通過網路從遠端計算機程式上請求服務,而不需要了解底層網路技術的協議。 二、工作原理 執行時,一次客戶機對伺服器的RPC呼叫,其內部操作大致有如下十步: 1.呼叫客戶端控制代碼;執行傳送引數

com.microsoft.sqlserver.jdbc.SQLServerException: 傳入的表格格式資料流(TDS)遠端過程呼叫(RPC)協議流不正確。此 RPC 請求中提供了過多的引數。

sqlserver在做批量插入的時候出現這個錯誤: com.microsoft.sqlserver.jdbc.SQLServerException: 傳入的表格格式資料流(TDS)遠端過程呼叫(RPC)協議流不正確。此 RPC 請求中提供了過多的引數。最多應為 2100。

遠端過程呼叫(RPC)詳解

本文介紹了什麼是遠端過程呼叫(RPC),RPC 有哪些常用的方法,RPC 經歷了哪些發展階段,以及比較了各種 RPC 技術的優劣。 什麼是 RPC RPC 是遠端過程呼叫(Remote Procedure Call)的縮寫形式,Birrell 和 N

第2部分 啟用遠端過程呼叫RPC

1 第2部分 啟用遠端過程呼叫——讓我們構建一個烤麵包機 烤麵包機樣例的第二部分將會加入一些烤麵包行為,為了完成這個任務,我們將會在toaster yang 資料模型中定義一個RPC(遠端過程呼叫)並且會寫一個實現。 1.1 定義yang RPC 編輯現有的toaster.

RabbitMQ的學習(四)RPC-遠端過程呼叫(純demo-可直接使用)

前言:在前面三篇文章中,分別對rabbitmq的搭建,並對rabbitmq常用的四種不同的交換機結合路由鍵編寫了各自的demo,可以參考: 1. RabbitMQ的學習(一):Windows下安裝及配置RabbitMQ,erlang環境變數; 2. RabbitMQ的學習(二):簡單的j

Java分散式RPC遠端過程呼叫

Java分散式:RPC(遠端過程呼叫) 引入RPC   比如我們有一個查詢的介面IDBQuery,以及其實現類DBQueryImp,如果我們執行IDBQuery查詢方法,只需要new一個DBQueryImp然後呼叫request方法即可,這就是本地函式呼叫,因為在同一個地址空間或者同一塊記憶體,通過方法棧

RabbitMQ: RPC 遠端過程呼叫

RabbitMQ RPC 就是通過訊息佇列(Message Queue)來實現rpc的功能,就是,客戶端向服務端傳送定義好的Queue訊息,其中攜帶的訊息就應該是服務端將要呼叫的方法的引數 ,並使用Propertis告訴服務端將結果返回到指定的Queue。 1.Rabbit

RPC 協議 Remote process call 遠端過程呼叫

RPC資訊協議由兩個不同結構組成:呼叫資訊和答覆資訊。 簡單的說,RPC就是從一臺機器(客戶端)上通過引數傳遞的方式呼叫另  一臺機器(伺服器)上的一個函式或方法(可以統稱為服務)並得到返回的結果。 RPC 會隱藏底層的通訊細節(不需要直接處理Socket通訊或Http通訊

RPC遠端過程呼叫)簡介

RPC(Remote Procedure Call Protocol)——遠端過程呼叫協議,它是一種通過網路從遠端計算機程式上請求服務,而不需要了解底層網路技術的協議。 之前聽過這個名詞,但是也只是大概記住了“遠端呼叫”之類的關鍵詞,而其他並沒有太多瞭解。 來到TX實習,確實如別人所說的那樣

C++ RPC遠端過程呼叫

目的 最近由於摩爾定律已經不太適用,隨著大資料、計算量不斷增加,導致單機處理能力不能滿足需求,所以需要分散式計算,這就需要RPC(遠端過程呼叫),下面簡單介紹一下這個demo,來自於GitHub上的一個專案 client程式碼 #include <stri

【圖文詳細 】Scala——RPC 遠端過程呼叫

1、RPC 遠端過程呼叫    1.1、RPC 概念  RPC(Remote Procedure Call)—遠端過程呼叫,它是一種通過網路從遠端計算機程式上請 求服務,而不需要了解底層網路技術的協議。RPC 協議假定某些傳輸協議的存在,如 TCP 或

用C程式碼簡要模擬實現一下RPC(遠端過程呼叫)並談談它在程式碼調測中的重要應用

        說明: 本文僅僅是一種模擬的RPC實現, 真正的RPC實現還是稍微有點複雜的。         我們來看看下面這個常見的場景: 在某系統中,我們要對某一函式進行調測, 但是, 很難很難構造出這個函式被呼叫的實際場景, 怎麼辦?         雖然很難構造