1. 程式人生 > >RPC遠端過程呼叫之我的理解(附帶專案希望有人交流)

RPC遠端過程呼叫之我的理解(附帶專案希望有人交流)

最近在學習開發過程中使用到了阿里開發的dubbo框架,將專案進行分散式。
最近的學習瞭解到了一些關於RPC的原理,心血來潮就試著實現了一下自己的RPC功能。
專案主要分為三個子專案
API 專案 定義了通訊的資料模型和序列化反序列化所使用的工具以及專案測試使用的bean和介面
Server 專案作為RPC的過程服務提供者
Client 專案是服務的呼叫者

我這裡使用的是MINA作為TCP伺服器 利用jdk自帶的方法進行序列化 在服務呼叫方做一個介面的代理讓介面執行socket通訊將呼叫服務的請求傳送到Server端在Server端反射出介面的實現類執行方法再講返回資料通過網路寫回去

先給大家看一下我的專案結果截圖

這裡是我的專案結構截圖

這裡是本專案的呼叫過程
這裡寫圖片描述

主要是利用了代理和反射 利用代理在客戶端呼叫時產生一個socket連線訪問服務端,在服務端根據資料反射出一個已經存在的介面實現型別來執行方法,再將執行結果返回給呼叫的客戶端

接下來直接上程式碼
這裡是我的API專案中的一些資料型別的定義
第一個是 請求傳送時的呼叫描述
包括 呼叫的介面名稱,呼叫方法名稱,呼叫方法引數列表,呼叫方法引數型別列表

/**
 * 用於描述本次呼叫的情況
 * @author Ming
 *
 */
public class NetModel implements Serializable
{
//類名稱 private String type; //方法名稱 private String method; //引數 private Object[] args; //引數的型別 private String[] types; public String getType() { return type; } public void setType(String type) { this.type = type; } public String getMethod() { return
method; } public void setMethod(String method) { this.method = method; } public Object[] getArgs() { return args; } public void setArgs(Object[] args) { this.args = args; } public String[] getTypes() { return types; } public void setTypes(String[] types) { this.types = types; } }

接下來就是序列化工具使用了jdk自帶的序列化方式,
這裡我還遇到了一些困難:單純的從流中讀取陣列因為不知道陣列的長度是多少所以在反序列化的過程中總是因為byte[]長度不準確不能正確的讀取出物件,所以在序列化的陣列前面多加了4位是一個int型別的資料表示的是這次序列化產生的byte[]長度這樣就可以準確的反序列化出想要的物件

/**
 * 網路通訊序列化工具
 * @author Ming
 *
 */
public class SerializationUtil {
    /**
     * 物件序列化成陣列
     * @param object
     * @return
     */
    public static byte[] objectToBytes(Object object){  
        ByteArrayOutputStream output = new ByteArrayOutputStream();  
        ObjectOutputStream objectOut;  
        try {  
            objectOut = new ObjectOutputStream(output);  
            objectOut.writeObject(object);  
            objectOut.close();  
            output.close();  
        } catch (IOException e) {  
            // TODO Auto-generated catch block  
            e.printStackTrace();  
        }  
        return output.toByteArray();  
    }
    /**
     * 陣列反序列化成物件
     * @param bytes
     * @return
     */
    public static Object byetsToObject(byte[] bytes){  
        ByteArrayInputStream input = new ByteArrayInputStream(bytes);  
        ObjectInputStream objectIn;  
        Object object = null;  
        try {  
            objectIn = new ObjectInputStream(input);      
            object = objectIn.readObject();  
            objectIn.close();  
            input.close();  

        } catch (IOException e) {  
            // TODO Auto-generated catch block  
            e.printStackTrace();  
        }catch (ClassNotFoundException e) {  
            // TODO Auto-generated catch block  
            e.printStackTrace();  
        }         
        return object;  
    }
    /**
     * 將int數值轉換為佔四個位元組的byte陣列,本方法適用於(低位在前,高位在後)的順序。  
     * @param value
     * @return
     */
    public static byte[] intToBytes(int value)   {   
        byte[] byte_src = new byte[4];  
        byte_src[3] = (byte) ((value & 0xFF000000)>>24);  
        byte_src[2] = (byte) ((value & 0x00FF0000)>>16);  
        byte_src[1] = (byte) ((value & 0x0000FF00)>>8);    
        byte_src[0] = (byte) ((value & 0x000000FF));          
        return byte_src;  
    } 

     /**  
        * byte陣列中取int數值,本方法適用於(低位在前,高位在後)的順序。 
        *   
        * @param ary  
        *            byte陣列  
        * @param offset  
        *            從陣列的第offset位開始  
        * @return int數值  
        */    
    public static int bytesToInt(byte[] ary, int offset) {  
        int value;    
        value = (int) ((ary[offset]&0xFF)   
                | ((ary[offset+1]<<8) & 0xFF00)  
                | ((ary[offset+2]<<16)& 0xFF0000)   
                | ((ary[offset+3]<<24) & 0xFF000000));  
        return value;  
    }  

    /**
     * 將物件直接序列化成為RPC網路通訊中使用的二進位制陣列
     * @param obj
     * @return
     */
    public static byte[] objToNetBytes(Object obj) {
        byte[] objBytes = objectToBytes(obj);
        int length = objBytes.length;
        byte[] lengthBytes = intToBytes(length);
        byte[] RPCBytes = new byte[length+4];
        System.arraycopy(lengthBytes, 0, RPCBytes, 0, 4);
        System.arraycopy(objBytes, 0, RPCBytes, 4, length);
        return RPCBytes;    
    }

    /**
     * 將RPC網路通訊中的二進位制陣列序列化成為使用物件
     * @param bytes
     * @return
     */
    public static Object netBytesToObj(byte [] RPCbytes) {
        int length = bytesToInt(RPCbytes, 0);
        byte [] objBytes = new byte[length];
        System.arraycopy(RPCbytes, 4, objBytes, 0, length);
        Object object = byetsToObject(objBytes);
        return object;      
    }
}

API專案基本介紹完成了剩下的就是介面和JavaBean就不再多做說明了,在文章最後我會帶上我的專案。直接上程式碼了

public class Player implements Serializable{
    @Override
    public String toString() {
        return "Player [year=" + year + ", name=" + name + ", date=" + date + "]";
    }

    private int year;

    private String name;

    private Date date;


    public Player(int year, String name, Date date) {
        super();
        this.year = year;
        this.name = name;
        this.date = date;
    }

    public Player() {

    }

    public int getYear() {
        return year;
    }

    public void setYear(int year) {
        this.year = year;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Date getDate() {
        return date;
    }

    public void setDate(Date date) {
        this.date = date;
    }
}
public interface PlayerService {
    Player creat();

    List<Player> carList(Integer x);

    List<Player> list(Integer count,String name);
}

下一步介紹的是Server端的專案
先上一個專案結構圖
專案截圖

className.properties是一個介面實現類的配置檔案,key是介面名稱,value是介面的實現,我這裡就只有一個測試的介面和介面實現類
所以就只有一行。

com.learn.api.service.PlayerService=com.learn.server.serviceImpl.PlayerServiceImpl

conf.properties檔案是給MINA使用的用來說明監聽的地址和埠
ip=127.0.0.1
port=10800

MinaStart 是一個MINA框架的入口根據conf.properties開啟一個埠監聽

/**
 * mina程式入口
 * @author Ming
 *
 */
public class MinaStart {
    private static IoAcceptor accept = new NioSocketAcceptor();
    public static Properties className = new Properties();

    public static void start(int port,String ip,IoHandlerAdapter adapter ) throws Exception{
        accept.getSessionConfig().setReadBufferSize(2048 * 10);
        accept.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE,60*5);

        LoggingFilter lf = new LoggingFilter();
        lf.setSessionCreatedLogLevel(LogLevel.INFO);
        lf.setSessionOpenedLogLevel(LogLevel.INFO);
        lf.setMessageReceivedLogLevel(LogLevel.INFO);
        lf.setMessageSentLogLevel(LogLevel.INFO);

        accept.getFilterChain().addLast("logger", lf);
        accept.getFilterChain().addLast("exceutor",
                new ExecutorFilter(Executors.newCachedThreadPool()));
        accept.setHandler(adapter);
        accept.bind(new InetSocketAddress(ip, port));
        System.out.println("啟動MINA服務 監聽埠:"+port);
    }

    public static void main(String[] args) throws Exception{
        Properties conf = new Properties(); 
        conf.load(MinaStart.class.getResourceAsStream("/conf.properties"));
        String ip = conf.getProperty("ip");
        int port = Integer.parseInt(conf.getProperty("port"));
        start(port, ip, new RpcInvokeHandler());
    }
}

ClassNameManerge 是一個介面實現類的管理類,順便就用了一個不怎麼嚴謹的單例模式…根據className.properties 反射獲取例項

public class ClassNameManerge {
    private static Map<String, Object> instances = new HashMap<String, Object>();

    private static Properties classNames = new Properties();

    static {
        try {
            classNames.load(ClassNameManerge.class.getResourceAsStream("/className.properties"));
        } catch (Exception e) {
            System.out.println("配置檔案讀取異常");
            e.printStackTrace();
        }
    }

    /**
     * 根據配置檔案中的配置介面實現類獲取對應的介面實習 (使用了一個不怎麼嚴謹的單例模式)
     * @param className 介面的類名稱
     * @return 介面的一個實現
     */
    public static Object getInstance(String className) {
        String type = classNames.getProperty(className);
        Object instance = instances.get(type);
        if(instance!=null) {
            return instance;
        }
        try {
            Class clazz = Class.forName(type);
            instance = clazz.newInstance();
            instances.put(className, instance);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return instance;
    }
}

接下來就是server端主要處理的業務了是一個MINA框架的執行邏輯處理的型別,其中messageReceived方法指的是當訊息接收到時所執行的程式碼

public class RpcInvokeHandler extends IoHandlerAdapter{
    /**
     * 提供了一個在Server端執行方法的過程
     * 這裡偷懶使用的是MINA框架
     */
    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        try {
            /**
             * 從資料流中讀取出描述呼叫的物件
             */
            IoBuffer bf = (IoBuffer) message;
            byte[] netBytes = new byte[bf.limit()];
            bf.get(netBytes);
            Object obj = SerializationUtil.netBytesToObj(netBytes);
            NetModel model = (NetModel) obj;

            Object[] args = model.getArgs();
            String methodName = model.getMethod();
            String type = model.getType();
            String[] types = model.getTypes();

            Class [] classes = null;
            if(types!=null) {
                classes = new Class[types.length];
                for (int i = 0; i < classes.length; i++) {
                    classes[i] = Class.forName(types[i]);
                }
            }

            Object service = ClassNameManerge.getInstance(type);
            Class<? extends Object> clazz = service.getClass();
            Method method = clazz.getMethod(methodName, classes);
            Object invoke = method.invoke(service, args);
            byte[] bytes = SerializationUtil.objToNetBytes(invoke);

            /**
             * 這裡一段可以註釋掉只是用來展示說明呼叫方法說明的
             */
            System.out.println("RPC >>>");
            System.out.println("\t inerface:"+type);
            System.out.println("\t method:"+methodName);
            String ss ="";
            for (String b : types) {
                ss += (b+"--");
            }
            System.out.println("\t argsType:"+ss);
            String tt ="";
            for (Object b : args) {
                tt += (b+"--");
            }
            System.out.println("\t args:"+tt);

            /**
             * 將呼叫結果寫回呼叫客戶端
             */
            IoBuffer buffer = IoBuffer.allocate(bytes.length);  
            buffer.put(bytes, 0, bytes.length);    
            buffer.flip();    
            session.write(buffer);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

最後還有一個介面的實現類

public class PlayerServiceImpl implements PlayerService{

    public Player creat() {
        return new Player(22,"cmm",new Date());
    }

    public List<Player> carList(Integer x) {
        // TODO Auto-generated method stub
        List<Player> list = new ArrayList<Player>();
        for (int i = 0; i < x; i++) {
            list.add(new Player(22*i,"cmm"+i,new Date()));
        }
        return list;
    }

    public List<Player> list(Integer count, String name) {
        List<Player> list = new ArrayList<Player>();
        for (int i = 0; i < count; i++) {
            list.add(new Player(22*i,name+i,new Date()));
        }
        return list;
    }

}

到這裡server端的處理程式碼也介紹完了

下一步就是client端的程式碼介紹了

也上一個結構圖吧
client端專案結構

config.properties檔案配置的是server端的ip以及埠

ip=127.0.0.1
port=10800

RpcClient是一個socket呼叫客戶端裡面其實就只有一個方法就是把呼叫說明給server 返回server的處理結果


/**
 * scoket 通訊客戶端
 * @author Ming
 *
 */
public class RpcClient {
    private static Properties conf = new Properties();

    private static int port = 0;

    private static String ip = null;

    static {
        try {
            conf.load(RpcClient.class.getResourceAsStream("/config.properties"));
            ip = conf.getProperty("ip");
            port = Integer.parseInt(conf.getProperty("port"));
        } catch (Exception e) {
            System.out.println("配置檔案讀取失敗");
            e.printStackTrace();
        }
    }

    /**
     * 呼叫server端的socket訪問
     * @param model 本次呼叫的描述
     * @return 呼叫結果
     */
    public Object invoke (NetModel model) {
        Object value = null; 
        try {
            Socket socket = new Socket(ip, port);
            OutputStream out = socket.getOutputStream();
            byte[] bytes = SerializationUtil.objToNetBytes(model);
            out.write(bytes);
            InputStream in = socket.getInputStream();
            byte [] legnthBytes = new byte[4];
            in.read(legnthBytes);
            int legnth = SerializationUtil.bytesToInt(legnthBytes, 0);
            byte [] objBytes =  new byte [legnth];
            in.read(objBytes);
            value = SerializationUtil.byetsToObject(objBytes);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return value;

    }

}

ProxyFactory看名字就應該明白是一個代理物件的工廠,用於生成客戶端的介面代理物件

public class ProxyFactory {

    private static InvocationHandler handler = new InvocationHandler() {

        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            RpcClient client = new RpcClient();
            NetModel model = new NetModel();

            Class<?>[] classes = proxy.getClass().getInterfaces();
            String className = classes[0].getName();

            String [] types = null; 
            if(args!=null) {
                types = new String [args.length];
                for (int i = 0; i < types.length; i++) {
                    types[i] = args[i].getClass().getName();
                }
            }

            model.setArgs(args);
            model.setTypes(types);
            model.setType(className);
            model.setMethod(method.getName());

            Object invoke = client.invoke(model);
            return invoke;
        }
    };

    public static <T> T getInstance(Class<T> clazz) {
        ClassLoader classLoader = clazz.getClassLoader();   
        Class<?>[] interfaces = new Class[] {clazz};  
        return (T) Proxy.newProxyInstance(classLoader, interfaces, handler);  
    }
}

最後就是test測試類了

/**
 * 測試類
 * @author Ming
 *
 */
public class Test {
    public static void main(String[] args) {
        name();
    }

    public static void name() {
        PlayerService service = ProxyFactory.getInstance(PlayerService.class);
        List<Player> carList = service.carList(6);
        for (Player player : carList) {
            System.out.println(player);
        }
        System.out.println("-----------------------");
        List<Player> list = service.list(5,"James");
        for (Player player : list) {
            System.out.println(player);
        }
    }
}

先執行server 端 MinaStart的main方法 再執行client端Test的main方法
然後當然就是執行結果的截圖了
這裡寫圖片描述
這裡寫圖片描述

最後是我的程式碼了,在csdn的檔案裡面我加了那張過程的圖片在部落格中的展現效果不是很理想

對了如果使用git下載的同學要說聲對不起,因為我是先上傳的git再寫的本篇部落格,可能有一些程式碼塊的註釋沒有那麼完整,不過基本的註釋還是有的。
最後希望讀過我這篇部落格的人,能夠給我一些意見一起交流學習謝謝。