1. 程式人生 > >RabbitMQ實現RPC(java)

RabbitMQ實現RPC(java)

如果我們需要在遠端計算機上執行一個函式並等待結果,這種模式通常被稱為遠端過程呼叫或RPC。

在本教程中,我們將使用RabbitMQ構建一個RPC系統:一個客戶端和一個RPC伺服器。我們將建立一個返回斐波那契數字的模擬RPC服務。

整個過程示意圖如下:
這裡寫圖片描述
客戶端將請求傳送至rpc_queue(我們定義的訊息佇列),然後等待響應;服務端獲取請求,並處理請求,然後將請求結果返回給佇列,客戶端得知請求被響應後獲取結果。

在結果被響應之前,客戶端是被阻塞的,主執行緒會等待RPC響應

如果每個RPC請求都建立一個回撥佇列。這是非常低效,我們建立一個單一的客戶端回撥佇列。

這引發了一個新的問題,在該佇列中收到回覆時,不清楚回覆屬於哪個請求。這就需要用到 correlationId屬性。我們為沒有請求設定唯一的correlationId值。然後,當我們在回撥佇列中收到一條訊息時,我們將獲取這個值,將響應與請求的進行correlationId匹配。如果我們一致就是我們需要的結果,否則就不是。

客戶端代RPCClient 碼如下:

package com.adtec.rabbitmq;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import
java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeoutException; public class RPCClient { private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; private
String replyQueueName; public RPCClient() throws IOException, TimeoutException { //建立一個連線和一個通道,併為回撥宣告一個唯一的'回撥'佇列 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); //定義一個臨時變數的接受佇列名 replyQueueName = channel.queueDeclare().getQueue(); } //傳送RPC請求 public String call(String message) throws IOException, InterruptedException { //生成一個唯一的字串作為回撥佇列的編號 String corrId = UUID.randomUUID().toString(); //傳送請求訊息,訊息使用了兩個屬性:replyto和correlationId //服務端根據replyto返回結果,客戶端根據correlationId判斷響應是不是給自己的 AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName) .build(); //釋出一個訊息,requestQueueName路由規則 channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8")); //由於我們的消費者交易處理是在單獨的執行緒中進行的,因此我們需要在響應到達之前暫停主執行緒。 //這裡我們建立的 容量為1的阻塞佇列ArrayBlockingQueue,因為我們只需要等待一個響應。 final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1); // String basicConsume(String queue, boolean autoAck, Consumer callback) channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //檢查它的correlationId是否是我們所要找的那個 if (properties.getCorrelationId().equals(corrId)) { //如果是,則響應BlockingQueue response.offer(new String(body, "UTF-8")); } } }); return response.take(); } public void close() throws IOException { connection.close(); } public static void main(String[] argv) { RPCClient fibonacciRpc = null; String response = null; try { fibonacciRpc = new RPCClient(); System.out.println(" [x] Requesting fib(30)"); response = fibonacciRpc.call("30"); System.out.println(" [.] Got '" + response + "'"); } catch (IOException | TimeoutException | InterruptedException e) { e.printStackTrace(); } finally { if (fibonacciRpc != null) { try { fibonacciRpc.close(); } catch (IOException _ignore) { } } } } }


服務端代RPCServer 碼如下:

package rabbitmq;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RPCServer {

    private static final String RPC_QUEUE_NAME = "rpc_queue";

    //具體處理方法
    private static int fib(int n) {
        if (n == 0)
            return 0;
        if (n == 1)
            return 1;
        return fib(n - 1) + fib(n - 2);
    }

    public static void main(String[] argv) {
         //建立連線、通道,並宣告佇列 
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = null;
        try {
            connection = factory.newConnection();
            final Channel channel = connection.createChannel();

            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

            channel.basicQos(1);

            System.out.println(" [x] Awaiting RPC requests");

            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                        byte[] body) throws IOException {
                    AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
                            .correlationId(properties.getCorrelationId()).build();

                    String response = "";

                    try {
                        String message = new String(body, "UTF-8");
                        int n = Integer.parseInt(message);

                        System.out.println(" [.] fib(" + message + ")");
                        response += fib(n);
                    } catch (RuntimeException e) {
                        System.out.println(" [.] " + e.toString());
                    } finally {
                        // 返回處理結果佇列
                        channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
                        //  確認訊息,已經收到後面引數 multiple:是否批量.true:將一次性確認所有小於envelope.getDeliveryTag()的訊息。
                        channel.basicAck(envelope.getDeliveryTag(), false);
                        // RabbitMq consumer worker thread notifies the RPC
                        // server owner thread
                        synchronized (this) {
                            this.notify();
                        }
                    }
                }
            };
            //取消自動確認
            boolean autoAck = false ;
            channel.basicConsume(RPC_QUEUE_NAME, autoAck, consumer);
            // Wait and be prepared to consume the message from RPC client.
            while (true) {
                synchronized (consumer) {
                    try {
                        consumer.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        } finally {
            if (connection != null)
                try {
                    connection.close();
                } catch (IOException _ignore) {
                }
        }
    }
}

測試時先執行服務端,再執行客戶端

為了方便觀察結果,最好將客戶端和服務端在不同workspace實現


客戶端結果

相關推薦

RabbitMQ實現RPC(java)

如果我們需要在遠端計算機上執行一個函式並等待結果,這種模式通常被稱為遠端過程呼叫或RPC。 在本教程中,我們將使用RabbitMQ構建一個RPC系統:一個客戶端和一個RPC伺服器。我們將建立一個返回斐波那契數字的模擬RPC服務。 整個過程示意圖如下:

Spring-rabbitmq 實現RPC 風格呼叫例項

1.  背景 專案中原來利用rabbitmq的RPC實現遠端方法呼叫,比較簡陋,封裝的比較差,而且topic等模式均為阻塞,如今我負責進行改造,今天重點看看如何利用spring-rabbitmq實現RPC風格的呼叫 簡單說來,RPC,主要目的是利用message實現遠端方

rabbitmq實現RPC例項

最近閱讀《RabbitMq實戰指南》瞭解了rpc(remote procedure call 遠端過程呼叫)的實現。下面是測試的例子: 服務端 /** * <p> * * rpc伺服器,1、開啟佇列,2、消費訊息,3、把response傳送到回撥佇列。

(八) RabbitMQ實戰教程(面向Java開發人員)之RabbitMQ實現非同步RPC

RabbitMQ實現非同步RPC RabbitMQ Java Client 服務端步驟: 1.服務端監聽一個佇列,監聽客戶端傳送過來的訊息 2.收到訊息之後呼叫RPC服務得到呼叫結果 3.從訊息屬性中獲取reply_to,correlation_i

RabbitMq初探——用隊列實現RPC

await 生產 通過 empty 分享 qos load lose ima rabbitmq構造rpc 前言 rpc——remote procedure call 遠程調用。在我接觸的使用過http協議、thrift框架來實現遠程調用。其實消息隊列rabbitmq也

Python-RabbitMQ消息隊列實現rpc

llb author bject roc read uuid tin rip rabbit 客戶端通過發送命令來調用服務端的某些服務,服務端把結果再返回給客戶端 這樣使得RabbitMQ的消息發送端和接收端都能發送消息 返回結果的時候需要指定另一個隊列 服務器端 # -

RPC使用rabbitmq實現

bsp 本地服務 font clear 自動 配置 tran contain 學習 兩天時間重寫公司架構在本地實現測試學習 雙向連接客戶端和服務端配置: 連接rabbitmq服務器 定義消息隊列 配置發送請求的模板:交換機、消息隊列。 配置監聽處理:監聽的隊列、消息轉換處

Java實現RPC框架例項

一、RPC簡介 RPC,全稱為Remote Procedure Call,即遠端過程呼叫,它是一個計算機通訊協議。它允許像呼叫本地服務一樣呼叫遠端服務。它可以有不同的實現方式。如RMI(遠端方法呼叫)、Hessian、Http invoker等。另外,RPC是與語言無關的。 RPC示意圖

rabbitmq學習(四):利用rabbitmq實現遠端rpc呼叫

一、rabbitmq實現rpc呼叫的原理 ·rabbitmq實現rpc的原理是:客戶端向一個佇列中傳送訊息,並註冊一個回撥的佇列用於接收服務端返回的訊息,該訊息需要宣告一個叫做correaltionId的屬性,該屬性將是該次請求的唯一標識。服務端在接受到訊息(在需要時可以驗證correaltionId)後,

rabbitmq學習(四):利用rabbitmq實現遠程rpc調用

ext new urn trace cat ued 創建 exc false 一、rabbitmq實現rpc調用的原理 ·rabbitmq實現rpc的原理是:客戶端向一個隊列中發送消息,並註冊一個回調的隊列用於接收服務端返回的消息,該消息需要聲明一個叫做correaltio

Java實現RPC(服務物件使用註解並自動注入)

使用到的技術: 註解和反射機制 包掃描以及jar包掃描 CGlib動態代理 類似於spring框架的控制反轉依賴自動注入技術 目錄結構: RPCclass註解 @Retention(RetentionPolicy.RUNTIME) @Target(ElementTy

Java通過Hadoop實現RPC通訊簡單例項

一、定義server端程式碼 1.定義一個介面,該介面繼承org.apache.hadoop.ipc.VersionedProtocol介面 import org.apache.hadoop.ipc.VersionedProtocol; /** * 1.伺服器定義介面

RabbitMQ .NET訊息佇列使用入門(三)【MVC實現RPC例子】

每一個孤獨的靈魂都需要陪伴 RPC(Remote Procedure Call Protocol)——遠端過程呼叫協議,它是一種通過網路從遠端計算機程式上請求服務,而不需要了解底層網路技術的協議。RPC協議假定某些傳輸協議的存在,如TCP或UDP,為通訊程式之

Java基於String Boot、Thrift、Zookeeper實現RPC

Thrift的優勢是支援異構系統,相對於http協議效率較高,之前專案一直用的是dubbo,最近想了解一下thrift相關內容,以下是嘗試工程中的一些內容,做一些記錄 說明 1.程式碼基於JDK8、spring/boot實現 2.註冊發現採用zooke

java序列化和阻塞IO模型實現RPC

RPC是遠端過程呼叫,對於java而言,就是兩個JVM通訊,一個JVM a想要呼叫另一個JVM b中的類。b把執行結果在傳送給a的過程。好,我們就是要來實現這個過程。 兩個介面: public interface IDiff { double diff(double a,d

java+rabbitMQ實現一對一聊天

原始碼地址: https://download.csdn.net/download/weixin_40461281/10321780 上一篇文章講了RabbitMQ的安裝 接下來介紹一下具體的應用 使用java + rabbitMQ實現聊天功能的demo , 非常有助於

RabbitMQRPC實現

什麼是RPC? RPC是指遠端過程呼叫,也就是說兩臺伺服器A,B,一個應用部署在A伺服器上,想要呼叫B伺服器上應用提供的函式/方法,由於不在一個記憶體空間,不能直接呼叫,需要通過網路來表達呼叫的語義和傳達呼叫的資料。 為什麼RPC呢?就是無法在一個程序內,甚

RabbitMQRPC實現及其通信機制

pub elf tcl consumer 兩個 rabbit client margin result RabbitMQ中RPC的實現:客戶端發送請求消息,服務端回復響應消息,為了接受響應response,客戶端需要發送一個回調隊列的地址來接受響應,每條消息在發送的時候會帶

簡單的RPC java實現

 我也承認,RPC的名聲大噪之時是在2003年,那一個“衝擊波”病毒(Blaster Worm virus)襲捲全球的一年。而“衝擊波”正是用著RPC這把刀來敲開了遠端電腦的大門。當然RPC 有更多正面的應用,比如NFS、Web Service等等。 一、RPC的介紹

RabbitMQ系列-實現RPC非同步呼叫

使用Spring AMQP實現RPC非同步呼叫 示列 伺服器端 應用啟動類程式碼, import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org