1. 程式人生 > >從零寫分散式RPC框架 系列 1.0 (2)RPC-Common模組設計實現

從零寫分散式RPC框架 系列 1.0 (2)RPC-Common模組設計實現

RPC-Common模組提供RPC-Server和RPC-Client的通用物件,封裝統一規則,使RPC Server和RPC Client 可以基於同一協議通訊。主要包含底層通訊的Netty所需的編碼解碼器(RpcEncoder,RpcDecoder),實現自定義協議的傳輸物件(RpcRequest、RpcResponse)以及編碼解碼器對Java物件序列化(反序列化)使用的工具 ProtoSerializationUtil。

文章目錄

一 模組介紹

結構

結構如下圖
rpc-netty-common

流程圖

  1. RPC-Client封裝服務請求傳送到RPC-Server
  1. RPC-Server獲取請求,執行業務處理,返回結果
  1. RPC-Client獲取結果進行處理

二 pom檔案

RpcRequest和RpcResponse是Java bean,不需要多餘的依賴。
RpcEncoder和RpcDecoder是Netty編解碼器,依賴於netty-all。
ProtoSerializationUtil是基於ProtoStuff的序列化工具,並依賴objenesis實現更高階的反射功能,進行物件反序列化。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <artifactId>rpc-netty-common</artifactId> <parent> <groupId>com.github.linshenkx</groupId> <artifactId>rpc-netty-spring-boot-starter</artifactId> <version>1.0.5.RELEASE</version> <relativePath>../</relativePath> </parent> <dependencies> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> </dependency> <dependency> <groupId>io.protostuff</groupId> <artifactId>protostuff-core</artifactId> </dependency> <dependency> <groupId>io.protostuff</groupId> <artifactId>protostuff-runtime</artifactId> </dependency> <dependency> <groupId>org.objenesis</groupId> <artifactId>objenesis</artifactId> </dependency> </dependencies> </project>

三 RpcRequest和RpcResponse

RpcRequest

RpcRequest是Rpc請求物件,其屬性相當於自定義協議規則,如下
其中requestId是為了區分對相同服務的不同請求,還可以加上超時時間、版本資訊等來定製自己的協議

@Data
public class RpcRequest {
    /**
     * 請求ID
     */
    private String requestId;
    /**
     * 介面名稱
     */
    private String interfaceName;
    /**
     * 方法名
     */
    private String methodName;
    /**
     * 方法引數型別列表
     */
    private Class<?>[] parameterTypes;
    /**
     * 引數列表
     */
    private Object[] parameters;
}

RpcResponse

以下為最簡單也是最基本的RpcResponse,複雜一點可以區分訊息頭和訊息體,在訊息頭裡指定編碼長度,編碼型別等

@Data
public class RpcResponse {
    /**
     * 對應請求的requestId
     */
    private String requestId;
    /**
     * 異常資訊
     */
    private Exception exception;
    /**
     * 響應結果
     */
    private Object result;
}

四 序列化工具

目前只提供了基於protostuff的序列化工具,後面可以升級成一個序列化引擎,可以動態選擇序列化方式
protostuff的GitHub地址為:https://github.com/protostuff/protostuff

public class ProtoSerializationUtil {

    private static final Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<>();

    private static final Objenesis objenesis = new ObjenesisStd(true);

    /**
     * 序列化(物件 -> 位元組陣列)
     */
    @SuppressWarnings("unchecked")
    public static <T> byte[] serialize(T obj) {
        Class<T> cls = (Class<T>) obj.getClass();
        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
        try {
            Schema<T> schema = getSchema(cls);
            return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        } finally {
            buffer.clear();
        }
    }

    /**
     * 反序列化(位元組陣列 -> 物件)
     */
    public static <T> T deserialize(byte[] data, Class<T> cls) {
        try {
            T message = objenesis.newInstance(cls);
            Schema<T> schema = getSchema(cls);
            ProtostuffIOUtil.mergeFrom(data, message, schema);
            return message;
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    @SuppressWarnings("unchecked")
    private static <T> Schema<T> getSchema(Class<T> cls) {
        Schema<T> schema = (Schema<T>) cachedSchema.get(cls);
        if (schema == null) {
            schema = RuntimeSchema.createFrom(cls);
            cachedSchema.put(cls, schema);
        }
        return schema;
    }
}

五 RpcEncoder和RpcDecoder

RpcEncoder編碼器

編碼器負責將Java物件序列化成位元組陣列,再封裝在Netty 的ByteBuf裡面

public class RpcEncoder extends MessageToByteEncoder {

    private Class<?> genericClass;

    public RpcEncoder(Class<?> genericClass){
        this.genericClass=genericClass;
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        if(genericClass.isInstance(msg)){
            byte[] data= ProtoSerializationUtil.serialize(msg);
            out.writeInt(data.length);
            out.writeBytes(data);
        }
    }
}

RpcDecoder

傳送的時候比較簡單,因為byteBuf是動態擴充套件的。但接收的時候就需要考慮半包、粘包的問題了。這裡只是用了(in.readableBytes()<4)來簡單處理,但如果傳輸的物件比較大,就應該考慮加其他的Decoder來解決。這裡暫未擴充套件。

public class RpcDecoder extends ByteToMessageDecoder {
    private Class<?> genericClass;

    public RpcDecoder(Class<?> genericClass){
        this.genericClass=genericClass;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if(in.readableBytes()<4){
            return;
        }
        in.markReaderIndex();
        int dataLength=in.readInt();
        if(in.readableBytes()<dataLength){
            in.resetReaderIndex();
            return;
        }
        byte[] data=new byte[dataLength];
        in.readBytes(data);
        out.add(ProtoSerializationUtil.deserialize(data,genericClass));
    }
}