1. 程式人生 > >用Java實現RPC框架例項

用Java實現RPC框架例項

一、RPC簡介

RPC,全稱為Remote Procedure Call,即遠端過程呼叫,它是一個計算機通訊協議。它允許像呼叫本地服務一樣呼叫遠端服務。它可以有不同的實現方式。如RMI(遠端方法呼叫)、Hessian、Http invoker等。另外,RPC是與語言無關的。

RPC示意圖

 

如上圖所示,假設Computer1在呼叫sayHi()方法,對於Computer1而言呼叫sayHi()方法就像呼叫本地方法一樣,呼叫 –>返回。但從後續呼叫可以看出Computer1呼叫的是Computer2中的sayHi()方法,RPC遮蔽了底層的實現細節,讓呼叫者無需關注網路通訊,資料傳輸等細節。

二、RPC框架的實現

上面介紹了RPC的核心原理:RPC能夠讓本地應用簡單、高效地呼叫伺服器中的過程(服務)。它主要應用在分散式系統。如Hadoop中的IPC元件。但怎樣實現一個RPC框架呢?
 

從下面幾個方面思考,僅供參考:

1.通訊模型:假設通訊的為A機器與B機器,A與B之間有通訊模型,在Java中一般基於BIO或NIO;。

2.過程(服務)定位:使用給定的通訊方式,與確定IP與埠及方法名稱確定具體的過程或方法;

3.遠端代理物件:本地呼叫的方法(服務)其實是遠端方法的本地代理,因此可能需要一個遠端代理物件,對於Java而言,遠端代理物件可以使用Java的動態物件實現,封裝了呼叫遠端方法呼叫;

4.序列化,將物件名稱、方法名稱、引數等物件資訊進行網路傳輸需要轉換成二進位制傳輸,這裡可能需要不同的序列化技術方案。如:protobuf,Arvo等。

三、Java實現RPC框架

1、實現技術方案

下面使用比較原始的方案實現RPC框架,採用Socket通訊、動態代理與反射與Java原生的序列化。

2、RPC框架架構

RPC架構分為三部分:

1)服務提供者,執行在伺服器端,提供服務介面定義與服務實現類。

2)服務中心,執行在伺服器端,負責將本地服務釋出成遠端服務,管理遠端服務,提供給服務消費者使用。

3)服務消費者,執行在客戶端,通過遠端代理物件呼叫遠端服務。

3、 具體實現

服務提供者介面定義與實現,程式碼如下:

HelloServices介面實現類:

 

public interface HelloService {
    String sayHi(String name);
}

public class HelloServiceImpl implements HelloService {

    @Override
    public String sayHi(String name) {
        return "hi " + name;
    }
}

 

服務中心程式碼實現,程式碼如下:

 

public interface Server {

    void stop();

    void start() throws IOException;

    void register(Class serviceInterface, Class impl);

    boolean isRunning();

    int getPort();

}

服務中心實現類:

 

package com.example.demoms.rpc.server;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

 

public class ServerImpl implements Server {


    private ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    private Map<String, Class> serviceRegistry = new HashMap<>();

    private boolean isRunning = false;

    private int port;

    public ServerImpl(int port) {
        this.port = port;
    }

    public ServerImpl() {
    }

    @Override
    public void stop() {
        isRunning = false;
        executorService.shutdown();
    }

    @Override
    public void start() throws IOException {

        try (ServerSocket serverSocket = new ServerSocket(port)) {
            //serverSocket.bind(new InetSocketAddress(port));
            System.out.println("server start...");
            try {
                while (true) {
                    executorService.execute(new ServerTask(serverSocket.accept()));
                }
            } finally {
                serverSocket.close();
            }

        }

    }

    @Override
    public void register(Class serviceInterface, Class impl) {
        serviceRegistry.put(serviceInterface.getName(), impl);
    }

    @Override
    public boolean isRunning() {
        return isRunning;
    }

    @Override
    public int getPort() {
        return port;
    }


    class ServerTask implements Runnable {

        private Socket client;

        public ServerTask(Socket accept) {
            this.client = accept;
        }

        @Override
        public void run() {
            ObjectInputStream inputStream = null;
            ObjectOutputStream outputStream = null;
            try {
                inputStream = new ObjectInputStream(client.getInputStream());
                String serviceName = inputStream.readUTF();
                String methodName = inputStream.readUTF();
                Class<?>[] argsType = ((Class<?>[]) inputStream.readObject());
                Object[] args = (Object[]) inputStream.readObject();

                Class serviceClass = serviceRegistry.get(serviceName);
                if (serviceClass == null) {
                    throw new ClassNotFoundException("遠端呼叫類不存在:" + serviceName);
                }
                Method method = serviceClass.getMethod(methodName, argsType);
                Object result = method.invoke(serviceClass.newInstance(), args);

                outputStream = new ObjectOutputStream(client.getOutputStream());
                outputStream.writeObject(result);

            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    if (outputStream != null) {
                        outputStream.close();
                    }
                    if (inputStream != null) {
                        inputStream.close();
                    }
                    if (client != null) {
                        client.close();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }


}

客戶端的遠端代理物件:

 

package com.example.demoms.rpc.consumer;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;

public class RPCClient {

    public static <T> T getRemoteProxyObj(final Class<?> serviceInterface, final InetSocketAddress address) {
        return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class[]{serviceInterface},
                (proxy, method, args) -> {
                    Socket socket = null;
                    ObjectInputStream inputStream = null;
                    ObjectOutputStream outputStream = null;
                    try {
                        socket = new Socket();
                        socket.connect(address);

                        outputStream = new ObjectOutputStream(socket.getOutputStream());
                        outputStream.writeUTF(serviceInterface.getName());
                        outputStream.writeUTF(method.getName());
                        outputStream.writeObject(method.getParameterTypes());
                        outputStream.writeObject(args);

                        inputStream = new ObjectInputStream(socket.getInputStream());
                        return inputStream.readObject();


                    } finally {
                        if (outputStream != null) outputStream.close();
                        if (inputStream != null) inputStream.close();
                        if (socket != null) socket.close();

                    }
                });
    }

}
 

最後為測試類:

 

package com.example.demoms.rpc.consumer;

import com.example.demoms.rpc.provider.HelloService;
import com.example.demoms.rpc.provider.HelloServiceImpl;
import com.example.demoms.rpc.server.Server;
import com.example.demoms.rpc.server.ServerImpl;

import java.io.IOException;
import java.net.InetSocketAddress;

 

public class RPCTest {

    public static void main(String[] args) {
        new Thread(() -> {
            Server server = new ServerImpl(8888);
            server.register(HelloService.class, HelloServiceImpl.class);
            try {
                server.start();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();

        HelloService service = RPCClient.getRemoteProxyObj(HelloService.class, new InetSocketAddress("localhost", 8888));
        System.out.println(service.sayHi("hello"));
    }

}
 

執行結果:

 

server start...
hi hello

四、總結 
      RPC本質為訊息處理模型,RPC遮蔽了底層不同主機間的通訊細節,讓程序呼叫遠端的服務就像是本地的服務一樣。

五、可以改進的地方 
     這裡實現的簡單RPC框架是使用Java語言開發,與Java語言高度耦合,並且通訊方式採用的Socket是基於BIO實現的,IO效率不高,還有Java原生的序列化機制佔記憶體太多,執行效率也不高。可以考慮從下面幾種方法改進。

可以採用基於JSON資料傳輸的RPC框架; 
可以使用NIO或直接使用Netty替代BIO實現; 
使用開源的序列化機制,如Hadoop Avro與Google protobuf等; 
服務註冊可以使用Zookeeper進行管理,能夠讓應用更加穩定。