1. 程式人生 > >從零寫分散式RPC框架 系列 2.0 (2)RPC-Common模組設計實現

從零寫分散式RPC框架 系列 2.0 (2)RPC-Common模組設計實現

RPC-Common模組相對於1.0版本複雜了很多,最主要的變化在於將 Rpc的Netty處理器從RPC-Server和RPC-Client收回。1.0 版本的設計思路是儘可能減少冗餘依賴,所以RPC-Common一般只放通用的功能。現在則是儘可能都放在RPC-Common模組,以方便工程升級複雜化後的統一協調管理。以後功能將集中在一個模組下(名字不一定還是RPC-Common),RPC-Server和RPC-Client則會擔任將功能選擇性開放給使用者的輕量級角色。部分內容在 從零寫分散式RPC框架 系列 1.0 (2)RPC-Common模組設計實現 已有說明,本文不再贅述。

系列文章:

專欄:從零開始寫分散式RPC框架
專案GitHub地址:https://github.com/linshenkx/rpc-netty-spring-boot-starter

手寫通用型別負載均衡路由引擎(含隨機、輪詢、雜湊等及其帶權形式)
實現 序列化引擎(支援 JDK預設、Hessian、Json、Protostuff、Xml、Avro、ProtocolBuffer、Thrift等序列化方式)
從零寫分散式RPC框架 系列 2.0 (1)架構升級
從零寫分散式RPC框架 系列 2.0 (2)RPC-Common模組設計實現
從零寫分散式RPC框架 系列 2.0 (3)RPC-Server和RPC-Client模組改造


從零寫分散式RPC框架 系列 2.0 (4)使用BeanPostProcessor實現自定義@RpcReference註解注入

文章目錄

一 模組介紹

結構圖

結構圖

流程圖

  1. RPC-Client 將服務請求封裝到遠端傳輸實體裡,經過編碼傳送到RPC-Server
  2. RPC-Server獲取請求,執行業務處理,返回結果
  1. RPC-Client獲取結果進行處理

二 協議相關實體類

RemotingTransporter

這裡需要注意的是invokeId我使用的是原子遞增的方式來確保唯一性,因為只需要對單客戶端保證唯一性,所以這種方式應該是夠用的。如果單客戶端有高併發請求的話(雖然不太合理),可以考慮其他方式。

Flag只佔一個位元組大小,但我這裡實現實現的效率比較低,後面應該得優化。

另外訊息體BodyContent只是個介面,根據訊息頭的標識,BodyContent可以代表不同的內容。

/**
 * @version V1.0
 * @author: lin_shen
 * @date: 18-11-12
 * @Description: 自定義遠端傳輸實體 (magic+flag+invokeId+bodyLength+bodyContent)
 */
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RemotingTransporter {

    /**
     * 魔數
     */
    public static final short MAGIC=(short)0x9826;

    /**
     * 用原子遞增的方式來獲取不重複invokeId
     */
    private static final AtomicLong invokeIdGnerator=new AtomicLong(0L);

    /**
     * 標誌位, 一共8個地址位。
     * 低四位用來表示訊息體資料用的序列化工具的型別
     * 高四位中,第一位為1表示是request請求,為0表示是reponse應答
     * TODO:第二位為1表示雙向傳輸(即有返回response)
     * TODO:第三位為1表示是心跳ping事件
     * TODO:預留位
     */
    private Flag flag;

    @Getter
    @ToString
    public static class Flag{
        private boolean isRequest;
        private boolean isTwoway;
        private boolean isPing;
        private boolean isOther;

        private int serializeType;

        private byte thisByte;

        public Flag(boolean isRequest, boolean isTwoway, boolean isPing, boolean isOther, int serializeType) {

            if(serializeType<0||serializeType>15){
                throw new IllegalArgumentException("serializeType 對應整數應該在 0 到 15 之間");
            }

            this.isRequest = isRequest;
            this.isTwoway = isTwoway;
            this.isPing = isPing;
            this.isOther = isOther;
            this.serializeType = serializeType;

            int byteTem= (isRequest?1:0)<<7;
            byteTem=byteTem | ((isTwoway?1:0)<<6);
            byteTem=byteTem | ((isPing?1:0)<<5);
            byteTem=byteTem | ((isOther?1:0)<<4);
            byteTem=byteTem | serializeType;

            this.thisByte= (byte) byteTem;
        }

        public Flag(byte thisByte){
            this.thisByte=thisByte;

            isRequest=((thisByte>>>7)&1)==1;
            isTwoway=((thisByte>>>6)&1)==1;
            isPing=((thisByte>>>5)&1)==1;
            isOther=((thisByte>>>4)&1)==1;

            serializeType=thisByte & 15;

        }

    }

    /**
     * 每一個請求的唯一識別id(由於採用非同步通訊的方式,用來把請求request和返回的response對應上)
     */
    private long invokeId=invokeIdGnerator.getAndIncrement();

    /**
     * 訊息體位元組陣列長度
     */
    private int bodyLength;

    /**
     * 訊息體內容(還需要編碼序列化成位元組陣列)
     */
    private transient BodyContent bodyContent;

}

BodyContent及其實現類

BodyContent只起標識作用,本身無規定任何方法。在傳輸過程中以位元組陣列形式存在,由 RemotingTransporter 的訊息頭 標識其代表型別及序列化方式。

目前其實現類有 RpcRequest 和 RpcResponse,分別代表Rpc請求和Rpc響應。注意這些實現類應該只包含自身業務資訊,其他的應由 RemotingTransporter 統一標識處理。

三 Netty編碼器和解碼器

RemotingTransporterEncoder 編碼器

編碼器的實現比較簡單。
這裡需要注意,相對於1.0版本 編碼器的處理型別由構造方法動態傳入的設計,這裡直接限制了處理型別為 RemotingTransporter 。這是因為在1.0版本中,不管是RpcRequest還是RpcResponse都會被編碼,而我們並不想對每種型別都專門寫一個編碼器,所以一個比較討巧的做法是將型別動態傳入。
而在2.0版本中,因為我們統一了不管是 RpcRequest還是 RpcResponse在傳輸過程中其表現實體都為RemotingTransporter,將其型別資訊封裝到訊息頭裡,遮蔽了各自傳輸實體的差異,所以直接指定 RemotingTransporter 即可。而且還可以減少型別資訊和程式碼的高耦合,將不同型別不同編解碼方式的判斷和處理收歸到編解碼器裡。

/**
 * @version V1.0
 * @author: lin_shen
 * @date: 18-11-12
 * @Description: TODO
 */
@Log4j2
public class RemotingTransporterEncoder extends MessageToByteEncoder<RemotingTransporter> {

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, RemotingTransporter remotingTransporter, ByteBuf byteBuf) throws Exception {
        //獲取請求體陣列
        //使用序列化引擎
        byte[] body= SerializerEngine.serialize(remotingTransporter.getBodyContent(), SerializeTypeEnum.queryByCode(remotingTransporter.getFlag().getSerializeType()));
        //magic+flag+invokeId+bodyLength+bodyContent
        byteBuf.writeShort(RemotingTransporter.MAGIC)
                .writeByte(remotingTransporter.getFlag().getThisByte())
                .writeLong(remotingTransporter.getInvokeId())
                .writeInt(body.length)
                .writeBytes(body);
        log.info("write end");

    }

}

RemotingTransporterDecoder 解碼器

需要注意,這裡解碼器沒有繼承自ByteToMessageDecoder而是使用了ReplayingDecoder(當然它也繼承自ByteToMessageDecoder),其泛型引數指定了用於狀態管理的型別(不需要狀態管理可以使用Void),這裡我們是用了內部列舉類 RemotingTransporterDecoder.State。

其工作原理可簡單理解為:ByteBuf源源不斷地傳遞給解碼器,解碼器需要根據當前狀態判斷應該執行什麼讀取操作以進行解碼。因此一個狀態通常代表著傳輸體的一個變數,根據前面所說,這裡的狀態應該有:HEADER_MAGIC, HEADER_FLAG, HEADER_INVOKE_ID, HEADER_BODY_LENGTH, BODY 。另外需要注意讀取是狀態時連續且迴圈的,因此在根據狀態 switch 時應按順序編寫,並可不使用 break 以提高效率。每個狀態對應的解碼操作執行完的時候都應該將狀態及時切換,最後一個狀態執行完切換回預設初始狀態。最初的初始狀態在父類構造方法傳入。

/**
 * @version V1.0
 * @author: lin_shen
 * @date: 18-11-12
 * @Description: TODO
 */
@Log4j2
public class RemotingTransporterDecoder extends ReplayingDecoder<RemotingTransporterDecoder.State> {

    private static final int MAX_BODY_SIZE = 1024 * 1024 * 5;

    /**
     * 用於暫存解碼RemotingTransporter資訊,一個就夠了
     */
    private static RemotingTransporter remotingTransporter=RemotingTransporter.builder().build();

    /**
     * 用於ReplayingDecoder的狀態管理
     */
    enum State {
        HEADER_MAGIC, HEADER_FLAG, HEADER_INVOKE_ID, HEADER_BODY_LENGTH, BODY
    }

    public RemotingTransporterDecoder( ){
        //設定 state() 的初始值,以便進入switch
        super(State.HEADER_MAGIC);
    }

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        //注意這裡在BODY之前都沒有break
        switch (this.state()){
            case HEADER_MAGIC:
                checkMagic(byteBuf.readShort());
                //移到下一檢查點(一是改變state的值的狀態,二是獲取到最新的讀指標的下標)
                checkpoint(State.HEADER_FLAG);
            case HEADER_FLAG:
                remotingTransporter.setFlag(new RemotingTransporter.Flag(byteBuf.readByte()));
                checkpoint(State.HEADER_INVOKE_ID);
            case HEADER_INVOKE_ID:
                remotingTransporter.setInvokeId(byteBuf.readLong());
                checkpoint(State.HEADER_BODY_LENGTH);
            case HEADER_BODY_LENGTH:
                remotingTransporter.setBodyLength(byteBuf.readInt());
                checkpoint(State.HEADER_BODY_LENGTH);
            case BODY:
                int bodyLength = checkBodyLength(remotingTransporter.getBodyLength());
                byte[] bytes=new byte[bodyLength];
                byteBuf.readBytes(bytes);
                Class genericClass=remotingTransporter.getFlag().isRequest()?RpcRequest.class: RpcResponse.class;
                BodyContent bodyContent= (BodyContent) SerializerEngine.deserialize(bytes,genericClass,SerializeTypeEnum.queryByCode(remotingTransporter.getFlag().getSerializeType()));
                RemotingTransporter remotingTransporter1=RemotingTransporter.builder()
                        .flag(remotingTransporter.getFlag())
                        .invokeId(remotingTransporter.getInvokeId())
                        .bodyLength(remotingTransporter.getBodyLength())
                        .bodyContent(bodyContent)
                        .build();
                list.add(remotingTransporter1);
                break;
            default:
                break;
        }
        //順利讀完body後應置回起點
        checkpoint(State.HEADER_MAGIC);

    }

    private int checkBodyLength(int bodyLength) throws RemotingContextException {
        if (bodyLength > MAX_BODY_SIZE) {
            throw new RemotingContextException("body of request is bigger than limit value "+ MAX_BODY_SIZE);
        }
        return bodyLength;
    }

    private void checkMagic(short magic) throws RemotingContextException{
        //檢查魔數
        if (RemotingTransporter.MAGIC != magic) {
            log.error("魔數不匹配");
            throw new RemotingContextException("magic value is not equal "+RemotingTransporter.MAGIC);
        }
    }

}

四 Netty服務端和客戶端處理器

RpcServerHandler 服務端處理器

服務端處理器的主要變化時增加了Map<String, Semaphore> serviceSemaphoreMap用於限制不同服務的工作執行緒數。使其同一時間進入handle的執行緒數受到限制。

/**
 * @version V1.0
 * @author: lin_shen
 * @date: 2018/10/31
 * @Description: RPC服務端處理器(處理RpcRequest)
 */
@Log4j2
public class RpcServerHandler extends SimpleChannelInboundHandler<RemotingTransporter> {

  /**
   * 存放 服務名稱 與 服務例項 之間的對映關係
   */
  private final Map<String, Object> handlerMap;

  /**
   * 存放 服務名稱 與 訊號量 之間的對映關係
   * 用於限制每個服務的工作執行緒數
   */
  private final Map<String, Semaphore> serviceSemaphoreMap;

  public RpcServerHandler(Map<String, Object> handlerMap,Map<String, Semaphore> serviceSemaphoreMap) {
    this.handlerMap = handlerMap;
    this.serviceSemaphoreMap=serviceSemaphoreMap;
  }

  @Override
  protected void channelRead0(ChannelHandlerContext channelHandlerContext, RemotingTransporter remotingTransporter) throws Exception {
    log.info("channelRead0 begin");
    remotingTransporter.setFlag(new RemotingTransporter.Flag(false,true,false,false,remotingTransporter.getFlag().getSerializeType()));
    RpcResponse rpcResponse=new RpcResponse();
    RpcRequest rpcRequest=(RpcRequest)remotingTransporter.getBodyContent();
    Semaphore semaphore = serviceSemaphoreMap.get(rpcRequest.getInterfaceName());
    boolean acquire=false;
        try {
        // 處理 RPC 請求成功
        log.info("進入限流");
        acquire=semaphore.tryAcquire();
        if(acquire){
          Object result= handle(rpcRequest);
          rpcResponse.setResult(result);
        }

      } catch (Exception e) {
        // 處理 RPC 請求失敗
        rpcResponse.setException(e);
        log.error("handle result failure", e);
      } finally {
        if(acquire){
          semaphore.release();
          log.info("釋放訊號量");
        }
      }
    remotingTransporter.setBodyContent(rpcResponse);
    channelHandlerContext.writeAndFlush(remotingTransporter).addListener(ChannelFutureListener.CLOSE);
  }


  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    log.error("server caught exception", cause);
    ctx.close();
  }

  private Object handle(RpcRequest request) throws Exception {
    log.info("開始執行handle");
    // 獲取服務例項
    String serviceName = request.getInterfaceName();
    Object serviceBean = handlerMap.get(serviceName);
    if (serviceBean == null) {
      throw new RuntimeException(String.format("can not find service bean by key: %s", serviceName));
    }
    // 獲取反射呼叫所需的變數
    Class<?> serviceClass = serviceBean.getClass();
    String methodName = request.getMethodName();
    log.info(methodName);
    Class<?>[] parameterTypes = request.getParameterTypes();
    log.info(parameterTypes[0].getName());
    Object[] parameters = request.getParameters();
    // 執行反射呼叫
    Method method = serviceClass.getMethod(methodName, parameterTypes);
    method.setAccessible(true);
    return method.invoke(serviceBean, parameters);
  }


}

RpcClientHandler 客戶端處理器

主要是將請求Id對應Response改成了對應RemotingTransporter,其他基本不變

/**
 * @version V1.0
 * @author: lin_shen
 * @date: 2018/11/1
 * @Description: TODO
 */
@Log4j2
public class RpcClientHandler extends SimpleChannelInboundHandler<RemotingTransporter> {

    private ConcurrentMap<Long,RemotingTransporter> remotingTransporterMap;

    public RpcClientHandler(ConcurrentMap<Long,RemotingTransporter> remotingTransporterMap){
        this.remotingTransporterMap=remotingTransporterMap;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, RemotingTransporter remotingTransporter) throws Exception {
        log.info("read a Response,invokeId: "+remotingTransporter.getInvokeId());
        remotingTransporterMap.put(remotingTransporter.getInvokeId(),remotingTransporter);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        log.error("client caught exception",cause);
        ctx.close();
    }

}