1. 程式人生 > >從頭建立一個簡單的RPC服務框架

從頭建立一個簡單的RPC服務框架

  • 概念解釋

    RPC(Remote Procedure Call Protocol)——遠端過程呼叫協議,它是一種通過網路從遠端計算機程式上請求服務,而不需要了解底層網路技術的協議。RPC採用客戶機/伺服器模式。請求程式就是一個客戶機,而服務提供程式就是一個伺服器。首先,客戶機呼叫程序傳送一個有程序引數的呼叫資訊到服務程序,然後等待應答資訊。在伺服器端,程序保持睡眠狀態直到呼叫資訊到達為止。當一個呼叫資訊到達,伺服器獲得程序引數,計算結果,傳送答覆資訊,然後等待下一個呼叫資訊,最後,客戶端呼叫程序接收答覆資訊,獲得程序結果,然後呼叫執行繼續進行。

  • ServerSocket 與 Socket

    • 客戶端與服務端建立連線基本原理
      http://hi.csdn.net/attachment/201009/12/0_1284280120c10H.gif
    • 客戶端與服務端建立連線流程
      http://hi.csdn.net/attachment/201009/12/0_1284280127Kfu0.gif
    • ServerSocket

      ServerSocket server = new ServerSocket(1234);  
      Socket socket = server.accept(); 

      建立並監聽一個埠為1234的ServerSocket物件,然後呼叫了ServerSocket物件的accept()方法,這個方法的執行將使Server端的程式處於阻塞狀態,程式將一直阻塞直到捕捉到一個來自Client端的請求,並返回一個用於與該Client通訊的Socket物件。此後Server程式只要向這個Socket物件讀寫資料,就可以實現向遠端的Client讀寫資料。結束監聽時,關閉ServerSocket物件

    • Socket

      Socket socket = new Socket("127.0.0.1", 1234);

      客戶端建立一個socket物件,與服務端建立連線,服務端管理客戶連線請求的任務由作業系統完成,作業系統把連線請求儲存在一個先進先出的佇列中,佇列長度一般為50。當服務端連線佇列長度大於50時,連線被拒絕。

  • 動態代理機制
    java的動態代理機制中,有兩個重要的類和介面,Proxy、InvocationHandler,他們都在java.lang.reflect包下,這個類和介面是在實現動態代理過程中必須用到的

    • Proxy
      Proxy用於建立一個代理類物件,它提供了許多方法,其中應用最多的是 newProxyInstance這個方法。

      public static Object newProxyInstance(ClassLoader loader, Class<?>[] interfaces, InvocationHandler h)

      引數解釋:
      loader: 一個ClassLoader物件,定義了代理物件是由哪個Classloader物件來載入
      interfaces: 一個interface陣列,表示代理我們將為代理物件提供一組什麼介面,如果我提供了一組介面,則代理物件就宣稱實現了該組介面,這樣我們就能通過代理物件呼叫相應介面了
      h: 一個InvocationHandler物件,表示動態代理物件在呼叫方法時具體執行的物件

    • InvocationHandler
      每個動態代理類都必須實現InvocationHandler介面,且每個代理類的例項都關聯到一個InvocationHandler物件,當代理物件呼叫一個方法時,這個方法的呼叫會對映到InvocationHandler的invoke方法執行

      public Object invoke(Object proxy, Method method, Object[] args)

      引數解釋:
      proxy: 指代代理的真實物件
      method: 指代所要呼叫的真實物件的某個方法
      args: 指代呼叫真實物件方法時接受的引數

    • 動態代理示例

      • 定義測試介面

        public interface TestHello {
        
            void hello(String name);
        
            void sayGood();
        }
      • 定義介面實現

        public class TestHelloImpl implements TestHello{
        
            @Override
            public void hello(String name) {
                System.out.println("hello " + name);
            }
        
            @Override
            public void sayGood() {
                System.out.println("you are the best!");
            }
        }
      • 定義動態代理類

        public class MyProxy implements InvocationHandler {
        
            private Object proxy;
        
            public MyProxy(Object proxy) {
                this.proxy = proxy;
            }
        
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                method.invoke(this.proxy, args);
                return null;
            }
        
        }
      • 模擬代理呼叫

        public class Client {
        
            public static void main(String[] args) {
        
                TestHello th = new TestHelloImpl();
        
                MyProxy mp = new MyProxy(th);
        
                TestHello thProxy =
                    (TestHello) Proxy.newProxyInstance(mp.getClass().getClassLoader(), th.getClass().getInterfaces(), mp);
        
                thProxy.hello("Jerry");
        
                thProxy.sayGood();
            }
        }
      • 測試輸出

        hello Jerry
        you are the best!
  • 一個簡單的RPC實現

    • 簡單RPC框架類,定義服務註冊方法 export, 服務消費方法 refer

      public class RpcFramework {
      
          /** 
           * 暴露服務 
           *  
           * @param service 服務實現 
           * @param port 服務埠 
           * @throws Exception 
           */  
          public static void export(final Object service, int port) throws Exception {  
              if (service == null)  
                  throw new IllegalArgumentException("service instance == null");  
              if (port <= 0 || port > 65535)  
                  throw new IllegalArgumentException("Invalid port " + port);  
              System.out.println("Export service " + service.getClass().getName() + " on port " + port);  
              ServerSocket server = new ServerSocket(port);  
              for(;;) {  
                  try {  
                      final Socket socket = server.accept();  
                      new Thread(new Runnable() {  
                          @Override  
                          public void run() { 
                              try {  
                                  try {  
                                      ObjectInputStream input = new ObjectInputStream(socket.getInputStream());  
                                      try {  
                                          String methodName = input.readUTF(); 
                                          Class<?>[] parameterTypes = (Class<?>[])input.readObject();  
                                          Object[] arguments = (Object[])input.readObject();  
                                          ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());  
                                          try {  
                                              Method method = service.getClass().getMethod(methodName, parameterTypes);  
                                              Object result = method.invoke(service, arguments);  
                                              output.writeObject(result);
                                          } catch (Throwable t) {  
                                              output.writeObject(t);  
                                          } finally {  
                                              output.close();  
                                          }  
                                      } finally {  
                                          input.close();  
                                      }  
                                  } finally {  
                                      socket.close();  
                                  }
                              } catch (Exception e) {  
                                  e.printStackTrace();  
                              }
                          }  
                      }).start();  
                  } catch (Exception e) {  
                      e.printStackTrace();  
                  }
              }
          }  
      
          /** 
           * 引用服務 
           *  
           * @param <T> 介面泛型 
           * @param interfaceClass 介面型別 
           * @param host 伺服器主機名 
           * @param port 伺服器埠 
           * @return 遠端服務 
           * @throws Exception 
           */  
          @SuppressWarnings("unchecked")  
          public static <T> T refer(final Class<T> interfaceClass, final String host, final int port) throws Exception {  
              if (interfaceClass == null)  
                  throw new IllegalArgumentException("Interface class == null");  
              if (! interfaceClass.isInterface())  
                  throw new IllegalArgumentException("The " + interfaceClass.getName() + " must be interface class!");  
              if (host == null || host.length() == 0)  
                  throw new IllegalArgumentException("Host == null!");  
              if (port <= 0 || port > 65535)  
                  throw new IllegalArgumentException("Invalid port " + port);  
              System.out.println("Get remote service " + interfaceClass.getName() + " from server " + host + ":" + port);  
              return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[] {interfaceClass}, new InvocationHandler() {  
                  public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {  
                      Socket socket = new Socket(host, port);  
                      try {  
                          ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());  
                          try {  
                              output.writeUTF(method.getName());  
                              output.writeObject(method.getParameterTypes());  
                              output.writeObject(arguments);  
                              InputStream is = socket.getInputStream();
                              ObjectInputStream input = new ObjectInputStream(is);  
                              try {  
                                  Object result = input.readObject();  
                                  if (result instanceof Throwable) {  
                                      throw (Throwable) result;  
                                  }  
                                  return result;  
                              } finally {  
                                  input.close();  
                              }  
                          } finally {  
                              output.close();  
                          }  
                      } finally {  
                          socket.close();  
                      }  
                  }  
              });  
      
      }  
      
      }
    • 定義具體介面和實現

      public interface HelloService {
      
          String hello(String name);  
      
      }
      public class HelloServiceImpl implements HelloService{
      
          @Override
          public String hello(String name) {
              return "hello " + name;
          }
      
      }
    • 生成並註冊服務物件到服務端

      public class RpcProvider {
      
          public static void main(String[] args) {
              try {
                  HelloService service = new HelloServiceImpl();  
                  RpcFramework.export(service, 1235);            
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }
      }
    • 客戶端宣告服務端定義的服務物件,使用服務

      public class RpcConsumer {
      
          public static void main(String[] args) {
              try {
                  HelloService service = RpcFramework.refer(HelloService.class, "127.0.0.1", 1235);  
                  for (int i = 0; i < Integer.MAX_VALUE; i ++) {  
                      String hello = service.hello("World" + i);  
                      System.out.println(hello);  
                      Thread.sleep(1000);  
                  }              
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }
      }
    • 本文多數內容非原創,僅供學習使用,歡迎指正!