1. 程式人生 > >RPC框架原理探討

RPC框架原理探討

摘要:

  本文闡述了RPC框架與遠端呼叫的產生背景,介紹了RPC的基本概念和使用背景,之後手動實現了簡易的RPC框架並佐以例項進行演示,以便讓各位看官對RPC有一個感性、清晰和完整的認識,最後討論了RPC框架幾個較為重要問題。總之,RPC框架的精髓在於動態代理和反射,通過它們使得遠端呼叫“本地化”,對使用者透明且友好。


一. 引子

  上學時我們寫得應用大都比較簡單,基本上都屬於單體應用,服務呼叫也侷限於本地,如下所示:

// 服務介面
public interface HelloService {

    String hello(String name);

    String hi(String msg);
}

// 服務本地實現
public class HelloServiceImpl implements HelloService{
    @Override
    public String hello(String name) {
        return "Hello " + name;
    }

    @Override
    public String hi(String msg) {
        return "Hi, " + msg;
    }
}

// 服務本地呼叫
public class Main {
    public static void main(String[] args) {
        HelloService helloService = new HelloServiceImpl();
        helloServiceProxy.hello("Panda");
        helloServiceProxy.hi("Panda");
    }/** Output
           hello : Hello rico
           hi : Hi, panda
    **/  
}
  • 我們寫這樣的單體應用來學習、做實驗正常且合理,但是在生產環境中,單體應用在各方面的效能上和可維護性方面就遠遠不能滿足需求了。應用內各項業務互相糾纏、耦合性太大,不利於後期的維護和升級,主要表現在以下兩點上:
  • 可用性低。所有雞蛋都放在同一個籃子裡,一旦有問題導致單體應用掛掉,所有業務都不能訪問,穩定性要求難以滿足;

  • 不利於各業務團隊進行合作,開發效率低。單體應用各業務耦合度太高,不同業務團隊開發進度和實現細節不盡相同,難以高效協作。


  將不同的業務拆分到多個應用中,讓不同的應用分別承擔不同的功能是解決這些問題的必殺技。將不同業務分拆到不同的應用後,不但可以大幅度提升系統的穩定性還有助於豐富技術選型,進一步保證系統的效能。總的來說,從單體應用到分散式多體應用是系統升級必經之路。

  當一個單體應用演化成多體應用後,遠端呼叫就粉墨登場了。在一個應用時,相互通訊直接通過本地呼叫就可完成,而變為多體應用時,相互通訊就得依賴遠端呼叫了,這時一個高效穩定的RPC框架就顯得非常必要了。可能有的同學會覺得,沒必要非得用RPC框架啊,簡單的HTTP呼叫不是也可以實現遠端通訊嗎?確實,簡單的HTTP呼叫確實也可以實現遠端通訊,但是它不是那麼的合適,原因有二:

  • RPC遠端呼叫像本地呼叫一樣乾淨簡潔,但其他方式對程式碼的侵入性就比較強;

  • 一般使用RPC框架實現遠端通訊效率比其他方式效率要高一些。


  當我們踏入公司尤其是大型網際網路公司就會發現,公司的系統都由成千上萬大大小小的服務組成,各服務部署在不同的機器上,由不同的團隊負責。這時就會有兩個很關鍵的問題:

  • 要搭建一個新服務,免不了需要依賴已有的服務,而現在已有的服務都在遠端,怎麼呼叫?

  • 其它團隊想使用我們的新服務,我們的服務該怎麼釋出以便他人呼叫?


  下文將對RPC框架的基本原理進行介紹,並對這兩個問題展開探討,同時參考前輩的博文《RPC框架幾行程式碼就夠了》手寫一個簡易RPC框架以加深對PRC原理的理解。


二. RPC 框架介紹

  對於多體應用,由於各服務部署在不同機器,服務間的呼叫免不了網路通訊過程,服務消費方每呼叫一個服務都要寫一坨網路通訊相關的程式碼,不僅複雜而且極易出錯。如果有一種方式能讓我們像呼叫本地服務一樣呼叫遠端服務,而讓呼叫者對網路通訊這些細節透明,那麼將大大解放程式設計師的雙手,大幅度提高生產力。比如,服務消費方在執行helloService.hi(“Panda”)時,實質上呼叫的是遠端的服務。這種方式其實就是RPC(Remote Procedure Call Protocol),在各大網際網路公司中被廣泛使用,如阿里巴巴的HSF、Dubbo(開源)、Facebook的Thrift(開源)、Google GRPC(開源)、Twitter的Finagle(開源)等。

  RPC的主要功能目標是讓構建分散式計算(應用)更容易,在提供強大的遠端呼叫能力時不損失本地呼叫的語義簡潔性。為實現該目標,RPC框架需提供一種透明呼叫機制讓使用者不必顯式的區分本地呼叫和遠端呼叫。要讓網路通訊細節對使用者透明,我們需要對通訊細節進行封裝,下面是一個RPC的經典呼叫的流程,並且反映了所涉及到的一些通訊細節:

 

   這裡寫圖片描述

 (1). 服務消費方(client)以本地呼叫方式呼叫服務; 
 (2). client stub接收到呼叫後負責將方法、引數等組裝成能夠進行網路傳輸的訊息體; 
 (3). client stub找到服務地址,並將訊息傳送到服務端; 
 (4). server stub收到訊息後進行解碼; 
 (5). server stub根據解碼結果 反射呼叫 本地的服務; 
 (6). 本地服務執行並將結果返回給server stub; 
 (7). server stub將返回結果打包成訊息併發送至消費方; 
 (8). client stub接收到訊息,並進行解碼; 
 (9). 服務消費方得到最終結果。

  RPC框架就是要將2~8這些步驟封裝起來,讓使用者對這些細節透明,使得遠端方法呼叫看起來像呼叫本地方法一樣。


三. RPC框架簡易實現及其例項分析

(1).服務端

  服務端提供客戶端所期待的服務,一般包括三個部分:服務介面,服務實現以及服務的註冊暴露三部分,如下:

  • 服務介面
public interface HelloService {

    String hello(String name);

    String hi(String msg);
}

  • 服務實現
  • public class HelloServiceImpl implements HelloService{
        @Override
        public String hello(String name) {
            return "Hello " + name;
        }
    
        @Override
        public String hi(String msg) {
            return "Hi, " + msg;
        }
    }

  • 服務暴露:只有把服務暴露出來,才能讓客戶端進行呼叫,這是RPC框架功能之一。
  • public class RpcProvider {
        public static void main(String[] args) throws Exception {
            HelloService service = new HelloServiceImpl();
            // RPC框架將服務暴露出來,供客戶端消費
            RpcFramework.export(service, 1234);
        }
    }

  • (2).客戶端

      客戶端消費服務端所提供的服務,一般包括兩個部分:服務介面和服務引用兩個部分,如下:

  • 服務介面:與服務端共享同一個服務介面
  • public interface HelloService {
    
        String hello(String name);
    
        String hi(String msg);
    }

  • 服務引用:消費端通過RPC框架進行遠端呼叫,這也是RPC框架功能之一
  • public class RpcConsumer {
        public static void main(String[] args) throws Exception {
            // 由RpcFramework生成的HelloService的代理
            HelloService service = RpcFramework.refer(HelloService.class, "127.0.0.1", 1234);
            String hello = service.hello("World");
            System.out.println("客戶端收到遠端呼叫的結果 : " + hello);
        }
    }
  • (3).RPC框架原型實現

      RPC框架主要包括兩大功能:一個用於服務端暴露服務,一個用於客戶端引用服務。

  • 服務端暴露服務
  •     /**
         * 暴露服務
         *
         * @param service 服務實現
         * @param port    服務埠
         * @throws Exception
         */
        public static void export(final Object service, int port) throws Exception {
            if (service == null) {
                throw new IllegalArgumentException("service instance == null");
            }
            if (port <= 0 || port > 65535) {
                throw new IllegalArgumentException("Invalid port " + port);
            }
            System.out.println("Export service " + service.getClass().getName() + " on port " + port);
            // 建立Socket服務端
            ServerSocket server = new ServerSocket(port);
            for (; ; ) {
                try {
                    // 監聽Socket請求
                    final Socket socket = server.accept();
                    new Thread(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                try {
                                    /* 獲取請求流,Server解析並獲取請求*/
                                    // 構建物件輸入流,從源中讀取物件到程式中
                                    ObjectInputStream input = new ObjectInputStream(
                                        socket.getInputStream());
                                    try {
    
                                        System.out.println("\nServer解析請求 : ");
                                        String methodName = input.readUTF();
                                        System.out.println("methodName : " + methodName);
                                        // 泛型與陣列是不相容的,除了萬用字元作泛型引數以外
                                        Class<?>[] parameterTypes = (Class<?>[])input.readObject();
                                        System.out.println(
                                            "parameterTypes : " + Arrays.toString(parameterTypes));
                                        Object[] arguments = (Object[])input.readObject();
                                        System.out.println("arguments : " + Arrays.toString(arguments));
    
    
                                        /* Server 處理請求,進行響應*/
                                        ObjectOutputStream output = new ObjectOutputStream(
                                            socket.getOutputStream());
                                        try {
                                            // service型別為Object的(可以釋出任何服務),故只能通過反射呼叫處理請求
                                            // 反射呼叫,處理請求
                                            Method method = service.getClass().getMethod(methodName,
                                                parameterTypes);
                                            Object result = method.invoke(service, arguments);
                                            System.out.println("\nServer 處理並生成響應 :");
                                            System.out.println("result : " + result);
                                            output.writeObject(result);
                                        } catch (Throwable t) {
                                            output.writeObject(t);
                                        } finally {
                                            output.close();
                                        }
                                    } finally {
                                        input.close();
                                    }
                                } finally {
                                    socket.close();
                                }
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    }).start();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

      從該RPC框架的簡易實現來看,RPC服務端邏輯是:首先建立ServerSocket負責監聽特定埠並接收客戶連線請求,然後使用Java原生的序列化/反序列化機制來解析得到請求,包括所呼叫方法的名稱、引數列表和實參,最後反射呼叫服務端對服務介面的具體實現並將得到的結果回傳至客戶端。至此,一次簡單PRC呼叫的服務端流程執行完畢。

  • 客戶端引用服務
  •     /**
         * 引用服務
         *
         * @param <T>            介面泛型
         * @param interfaceClass 介面型別
         * @param host           伺服器主機名
         * @param port           伺服器埠
         * @return 遠端服務,返回代理物件
         * @throws Exception
         */
        @SuppressWarnings("unchecked")
        public static <T> T refer(final Class<T> interfaceClass, final String host, final int port)
            throws Exception {
            if (interfaceClass == null) {
                throw new IllegalArgumentException("Interface class == null");
            }
            // JDK 動態代理的約束,只能實現對介面的代理
            if (!interfaceClass.isInterface()) {
                throw new IllegalArgumentException(
                    "The " + interfaceClass.getName() + " must be interface class!");
            }
            if (host == null || host.length() == 0) {
                throw new IllegalArgumentException("Host == null!");
            }
            if (port <= 0 || port > 65535) {
                throw new IllegalArgumentException("Invalid port " + port);
            }
            System.out.println(
                "Get remote service " + interfaceClass.getName() + " from server " + host + ":" + port);
    
            // JDK 動態代理
            T proxy = (T)Proxy.newProxyInstance(interfaceClass.getClassLoader(),
                new Class<?>[] {interfaceClass}, new InvocationHandler() {
                    // invoke方法本意是對目標方法的增強,在這裡用於傳送RPC請求和接收響應
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] arguments)
                        throws Throwable {
                        // 建立Socket客戶端,並與服務端建立連結
                        Socket socket = new Socket(host, port);
                        try {
                            /* 客戶端像服務端進行請求,並將請求引數寫入流中*/
                            // 將物件寫入到物件輸出流,並將其傳送到Socket流中去
                            ObjectOutputStream output = new ObjectOutputStream(
                                socket.getOutputStream());
                            try {
                                // 傳送請求
                                System.out.println("\nClient傳送請求 : ");
                                output.writeUTF(method.getName());
                                System.out.println("methodName : " + method.getName());
                                output.writeObject(method.getParameterTypes());
                                System.out.println("parameterTypes : " + Arrays.toString(method
                                    .getParameterTypes()));
                                output.writeObject(arguments);
                                System.out.println("arguments : " + Arrays.toString(arguments));
    
    
                                /* 客戶端讀取並返回服務端的響應*/
                                ObjectInputStream input = new ObjectInputStream(
                                    socket.getInputStream());
                                try {
                                    Object result = input.readObject();
                                    if (result instanceof Throwable) {
                                        throw (Throwable)result;
                                    }
                                    System.out.println("\nClient收到響應 : ");
                                    System.out.println("result : " + result);
                                    return result;
                                } finally {
                                    input.close();
                                }
                            } finally {
                                output.close();
                            }
                        } finally {
                            socket.close();
                        }
                    }
                });
            return proxy;
        }
  • 從該RPC框架的簡易實現來看,RPC客戶端邏輯是:首先建立Socket客戶端並與服務端建立連結,然後使用Java原生的序列化/反序列化機制將呼叫請求傳送給客戶端,包括所呼叫方法的名稱、引數列表將服務端的響應返回給使用者即可。至此,一次簡單PRC呼叫的客戶端流程執行完畢。特別地,從程式碼實現來看,實現透明的PRC呼叫的關鍵就是 動態代理,這是RPC框架實現的靈魂所在。
  • RPC原型實現
  • public class RpcFramework {
        /**
         * 暴露服務
         *
         * @param service 服務實現
         * @param port    服務埠
         * @throws Exception
         */
        public static void export(final Object service, int port) throws Exception {
            if (service == null) {
                throw new IllegalArgumentException("service instance == null");
            }
            if (port <= 0 || port > 65535) {
                throw new IllegalArgumentException("Invalid port " + port);
            }
            System.out.println("Export service " + service.getClass().getName() + " on port " + port);
            // 建立Socket服務端
            ServerSocket server = new ServerSocket(port);
            for (; ; ) {
                try {
                    // 監聽Socket請求
                    final Socket socket = server.accept();
                    new Thread(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                try {
                                    /* 獲取請求流,Server解析並獲取請求*/
                                    // 構建物件輸入流,從源中讀取物件到程式中
                                    ObjectInputStream input = new ObjectInputStream(
                                        socket.getInputStream());
                                    try {
    
                                        System.out.println("\nServer解析請求 : ");
                                        String methodName = input.readUTF();
                                        System.out.println("methodName : " + methodName);
                                        // 泛型與陣列是不相容的,除了萬用字元作泛型引數以外
                                        Class<?>[] parameterTypes = (Class<?>[])input.readObject();
                                        System.out.println(
                                            "parameterTypes : " + Arrays.toString(parameterTypes));
                                        Object[] arguments = (Object[])input.readObject();
                                        System.out.println("arguments : " + Arrays.toString(arguments));
    
    
                                        /* Server 處理請求,進行響應*/
                                        ObjectOutputStream output = new ObjectOutputStream(
                                            socket.getOutputStream());
                                        try {
                                            // service型別為Object的(可以釋出任何服務),故只能通過反射呼叫處理請求
                                            // 反射呼叫,處理請求
                                            Method method = service.getClass().getMethod(methodName,
                                                parameterTypes);
                                            Object result = method.invoke(service, arguments);
                                            System.out.println("\nServer 處理並生成響應 :");
                                            System.out.println("result : " + result);
                                            output.writeObject(result);
                                        } catch (Throwable t) {
                                            output.writeObject(t);
                                        } finally {
                                            output.close();
                                        }
                                    } finally {
                                        input.close();
                                    }
                                } finally {
                                    socket.close();
                                }
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    }).start();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
    
        /**
         * 引用服務
         *
         * @param <T>            介面泛型
         * @param interfaceClass 介面型別
         * @param host           伺服器主機名
         * @param port           伺服器埠
         * @return 遠端服務,返回代理物件
         * @throws Exception
         */
        @SuppressWarnings("unchecked")
        public static <T> T refer(final Class<T> interfaceClass, final String host, final int port)
            throws Exception {
            if (interfaceClass == null) {
                throw new IllegalArgumentException("Interface class == null");
            }
            // JDK 動態代理的約束,只能實現對介面的代理
            if (!interfaceClass.isInterface()) {
                throw new IllegalArgumentException(
                    "The " + interfaceClass.getName() + " must be interface class!");
            }
            if (host == null || host.length() == 0) {
                throw new IllegalArgumentException("Host == null!");
            }
            if (port <= 0 || port > 65535) {
                throw new IllegalArgumentException("Invalid port " + port);
            }
            System.out.println(
                "Get remote service " + interfaceClass.getName() + " from server " + host + ":" + port);
    
            // JDK 動態代理
            T proxy = (T)Proxy.newProxyInstance(interfaceClass.getClassLoader(),
                new Class<?>[] {interfaceClass}, new InvocationHandler() {
                    // invoke方法本意是對目標方法的增強,在這裡用於傳送RPC請求和接收響應
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] arguments)
                        throws Throwable {
                        // 建立Socket客戶端,並與服務端建立連結
                        Socket socket = new Socket(host, port);
                        try {
                            /* 客戶端像服務端進行請求,並將請求引數寫入流中*/
                            // 將物件寫入到物件輸出流,並將其傳送到Socket流中去
                            ObjectOutputStream output = new ObjectOutputStream(
                                socket.getOutputStream());
                            try {
                                // 傳送請求
                                System.out.println("\nClient傳送請求 : ");
                                output.writeUTF(method.getName());
                                System.out.println("methodName : " + method.getName());
                                output.writeObject(method.getParameterTypes());
                                System.out.println("parameterTypes : " + Arrays.toString(method
                                    .getParameterTypes()));
                                output.writeObject(arguments);
                                System.out.println("arguments : " + Arrays.toString(arguments));
    
    
                                /* 客戶端讀取並返回服務端的響應*/
                                ObjectInputStream input = new ObjectInputStream(
                                    socket.getInputStream());
                                try {
                                    Object result = input.readObject();
                                    if (result instanceof Throwable) {
                                        throw (Throwable)result;
                                    }
                                    System.out.println("\nClient收到響應 : ");
                                    System.out.println("result : " + result);
                                    return result;
                                } finally {
                                    input.close();
                                }
                            } finally {
                                output.close();
                            }
                        } finally {
                            socket.close();
                        }
                    }
                });
            return proxy;
        }
    }
  •   以上是簡易RPC框架實現的簡易完整程式碼。


    四. 關於RPC框架的若干問題說明

    (1).RPC框架如何做到透明化遠端服務呼叫?

      如何封裝通訊細節才能讓使用者像以本地呼叫方式呼叫遠端服務呢?就Java而言,動態代理恰是解決之道。Java動態代理有JDK動態代理和CGLIB動態代理兩種方式。儘管位元組碼生成方式實現的代理更為強大和高效,但程式碼維護不易,因此RPC框架的大部分實現還是選擇JDK動態代理的方式。在上面的例子中,RPCFramework實現中的invoke方法封裝了與遠端服務通訊的細節,消費方首先從RPCFramework獲得服務提供方的介面,當執行helloService.hi(“Panda”)方法時就會呼叫invoke方法。


    (2).如何釋出自己的服務?

      如何讓別人使用我們的服務呢?難道就像我們上面的程式碼一樣直接寫死服務的IP以及埠就可以了嗎?事實上,在實際生產實現中,使用人肉告知的方式是不現實的,因為實際生產中服務機器上/下線太頻繁了。如果你發現一臺機器提供服務不夠,要再新增一臺,這個時候就要告訴呼叫者我現在有兩個IP了,你們要輪詢呼叫來實現負載均衡;呼叫者咬咬牙改了,結果某天一臺機器掛了,呼叫者發現服務有一半不可用,他又只能手動修改程式碼來刪除掛掉那臺機器的ip。這必然是相當痛苦的!

      有沒有一種方法能實現自動告知,即機器的上線/下線對呼叫方透明,呼叫者不再需要寫死服務提供方地址?當然可以,生產中的RPC框架都採用的是自動告知的方式,比如,阿里內部使用的RPC框架HSF是通過ConfigServer來完成這項任務的。此外,Zookeeper也被廣泛用於實現服務自動註冊與發現功能。不管具體採用何種技術,他們大都採用的都是 釋出/訂閱模式。


    (3).序列化與反序列化

      我們知道,Java物件是無法直接在網路中進行傳輸的。那麼,我們的RPC請求如何發給服務端,客戶端又如何接收來自服務端的響應呢?答案是,在傳輸Java物件時,首先對其進行序列化,然後在相應的終端進行反序列化還原物件以便進行處理。事實上,序列化/反序列化技術也有很多種,比如Java的原生序列化方式、JSON、阿里的Hessian和ProtoBuff序列化等,它們在效率上存在差異,但又有各自的特點。


      除上面提到的三個問題外,生產中使用的RPC框架要考慮的東西還有很多,在此就不作探討了。本文的目的就是為了讓各位看官對RPC框架有一個感性的、較為深入的瞭解,如果達到了這一目的,筆者的目的基本就算達到了。


    五. 總結

      本文闡述了遠端呼叫的產生背景,然後介紹了RPC的基本概念和要解決的問題,之後手動實現了簡易得RPC框架並佐以例項進行演示,使看官們對RPC有一個感性完整的認識,最後討論了RPC框架的幾個重要問題。總之,RPC框架的精髓在於動態代理和反射,通過它們使得遠端呼叫“本地化”,對使用者透明且友好。