1. 程式人生 > >分散式思想和rpc解決方案介紹

分散式思想和rpc解決方案介紹

1.RPC的誕生

RPC(Remote Procedure Call)遠端過程呼叫,通過這個rpc協議,呼叫遠端計算機上的服務,就像呼叫本地的服務一樣。

image

不同的服務部署在不同的機器上面,並且在啟動後在註冊中心進行註冊,如果要呼叫,可以通過rpc呼叫對應的服務。
如圖,在不同的Controller中可以從註冊中心(可以使用eureka,zookeeper實現,本文例子使用簡單的hash
map作為實現)獲取可以呼叫的服務,然後通過rpc進行呼叫。

2.java遠端的遠端呼叫-RMI(Remote method Invoke)

java提供了遠端的對於遠端服務呼叫的支援:RMI(Remote method Invoke)。

3.手寫一個RPC框架

3.1 實現的技術方案

設計技術點:Socket通訊、動態代理與反射、Java序列化

RPC本質是使用動態代理,通過網路通訊技術進行增強。

image

3.2程式碼實現

3.2.1 客戶端程式碼

image

    package main.java.rpc;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    import java.lang.reflect.Proxy;
    import java.net.InetSocketAddress;
    import java.net.Socket;
    
    //rpc框架的客戶端代理部分
    public class RpcClientFrame {
           /*動態代理類,實現了對遠端服務的訪問*/
        private static class DynProxy implements InvocationHandler{
            //遠端呼叫的服務
            private Class serviceClass;
            //遠端呼叫地址
            private final InetSocketAddress addr;
            public DynProxy(Class serviceClass,InetSocketAddress addr) {
                this.serviceClass = serviceClass;
                this.addr = addr;
            }
    
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                ObjectInputStream inputStream = null;
                ObjectOutputStream outputStream = null;
                Socket socket = null;
                try {
                    socket = new Socket();
                    socket.connect(addr);
                    //類名 方法名 方法型別列表  方法入參列表
                    outputStream = new ObjectOutputStream(socket.getOutputStream());
                    outputStream.writeUTF(serviceClass.getSimpleName());
                    outputStream.writeUTF(method.getName());
                    outputStream.writeObject(method.getParameterTypes());
                    outputStream.writeObject(args);
                    outputStream.flush();
                    
                    inputStream = new ObjectInputStream(socket.getInputStream());
                     //我們要把呼叫的細節打印出來
                    System.out.println("遠端呼叫成功!" + serviceClass.getName());
                    //最後要網路的請求返回給返回
                    return inputStream.readObject();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    socket.close();
                    inputStream.close();
                    outputStream.close();
                }
                return null;
            }
            
        }
        //定義客戶端要定義的服務
        package enjoyedu.service;

        /**
         * 享學課堂
         *類說明:服務員介面
         */
        public interface TechInterface {
            //洗腳服務
            String XJ(String name);
        }
    package main.java;

    import main.java.rpc.RpcClientFrame;
    import main.java.service.TechInterface;
    
    /**
     * rpc的客戶端呼叫遠端服務
     * @author hasee
     *
     */
    public class Client {
        public static void main(String[] args) {
            //動態代理獲取我們的物件
            TechInterface techInterface = (TechInterface) RpcClientFrame.getProxyObject(TechInterface.class);
            //進遠端呼叫我們的物件
            System.out.println(techInterface.XJ("luke"));
        }
    }

3.2.2服務端和註冊中心程式碼

image

1.//服務端定義要呼叫的服務介面
package service;
public interface TechInterface {
    //洗腳服務
    String XJ(String name);
}

2.//服務端定義要呼叫的服務的介面實現類
package service.impl;
import service.TechInterface;
public class TechImpl implements TechInterface {
      public String XJ(String name) {

            return "您好,13號技師為你服務:"+name;
        }
}

package server;
import java.io.IOException;
import javax.imageio.spi.RegisterableService;
import register.RegisterCenter;
import service.TechInterface;
import service.impl.TechImpl;

/**
 * rpc的服務端,提供服務
 * @author hasee
 *
 */
public class Server {
    public static void main(String[] args) throws IOException {
        RegisterCenter registerCenter = new RegisterCenter(8888);
        //註冊技師物件至註冊中心
        registerCenter.register(TechInterface.class, TechImpl.class);
        registerCenter.start();
    }
}

package register;
/**
 * 註冊中心,這個例子使用一個hashmap作為實現
 * @author hasee
 *
 */

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RegisterCenter {
    //執行緒池
    private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    //定義註冊中心的靜態物件
    private static Map<String, Class> serviceRegistry = new HashMap<String, Class>();
    //服務埠
    private static int port = 8888;
    
    /**
     *  註冊服務
     * @param serviceInterface 介面名字
     * @param impl 實現類的class物件
     */
    public void register(Class serviceInterface, Class impl) {
        //服務的註冊:socket通訊+反射
        serviceRegistry.put(serviceInterface.getSimpleName(), impl);
    }
    
    public RegisterCenter(int port) {
        this.port = port;
    }
    
     
    /**
     * 啟動服務端
     * @throws IOException
     */
    public static void start() throws IOException {
        // 建立ServerSocket例項監聽埠
        ServerSocket serverSocket = new ServerSocket(port);
        System.out.println("start server");
         // 1.監聽客戶端的TCP連線,接到TCP連線後將其封裝成task,由執行緒池執行,並且同時將socket送入(server.accept()=socket)
        try {
            while (true) {
                //serverSocket.accept()會阻塞直到服務端接受到客戶端的請求。
                executorService.execute(new ServiceTask(serverSocket.accept()));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    /**
     * 將客戶端的每一個請求都封裝成一個執行緒ServiceTask,投放到執行緒池裡面進行執行。
     * @author hasee
     *
     */
    private static class ServiceTask implements Runnable {
        private Socket client;
        public ServiceTask(Socket client) {
            this.client = client;
        }
        public void run() {
            //讀取socket中的流資料
            ObjectInputStream inputStream = null;
            ObjectOutputStream outputStream = null;
            try {
                // 類名、方法名、引數型別、引數值
                inputStream = new ObjectInputStream(client.getInputStream());
                //獲取呼叫服務名稱
                String serviceName = inputStream.readUTF();
                //獲取呼叫方法的名稱
                String methodName = inputStream.readUTF();
                //獲取引數型別列表
                Class<?>[] requiresTypes = (Class<?>[]) inputStream.readObject();
                //獲取引數列表
                Object[] args = (Object[]) inputStream.readObject();
                Class serviceClass = serviceRegistry.get(serviceName);
                //反射呼叫方法
                Method method = serviceClass.getMethod(methodName, requiresTypes);
                Object result = method.invoke(serviceClass.newInstance(), args);
                //把結果反饋到客戶端
                outputStream = new ObjectOutputStream(client.getOutputStream());
                outputStream.writeObject(result);
                outputStream.flush();
                //關閉io資源
                inputStream.close();
                client.close();
                
                
            } catch (Exception e) {
                e.printStackTrace();
            }
            
        }
        
    }
}

3.2.3 測試結果

  • 先啟動服務端
  • 其次啟動客戶端

輸出結果:您好,13號技師為你服務:luke