1. 程式人生 > >簡易RPC框架-私有協議棧

簡易RPC框架-私有協議棧

rem nowrap adding document list highlight fine repl alt

HTTP協議

客戶機與服務端之間的數據交互需要遵守一定的約定,比如協議版本,數據類型,是否有緩存,是否有壓縮等,只有在這些約定的基礎上才能相互之間愉快的工作。

技術分享

Netty通信過程中的編解碼

這時說的是基於TCP/IP的Netty之間的通信。TCP/IP協議下客戶端與服務端之間要進行數據交互,一般需要將數據轉換成二進制格式,直接傳java bean是不能支持的。在RPC模式下客戶端在向服務端發起請求前需要將數據做編碼,服務端在接收客戶端發的數據後需要做解碼之後才能正常工作。

  • 解碼流程

技術分享

  • 編碼流程

技術分享

Netty 私有協議棧

為了更好的控制RPC客戶端與服務端之間的通信,也可以編寫私有的協議棧來支撐。

定義消息體

類似HTTP協議,包含頭信息以及內容信息。


public class RpcMessage implements Serializable {

    private RpcMessageHeader messageHeader;

    private Object messageBody;

}

頭信息,包含內容體長度,消息類型等信息。可以根據消息類型來做不同的業務,比如區分是心跳信息還是業務或者是監控之類的信息。


public class RpcMessageHeader implements Serializable {
    private int length;

    private int type;
   
}

定義解碼器

因為TCP/IP協議容易出現粘包拆包現象,這裏為了簡單直接選擇繼承組件提供的LengthFieldBasedFrameDecoder,只需要重寫下面的方法即可:


 public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        ByteBuf frame=(ByteBuf)super.decode(ctx,in);
        if(null==frame){
            return null;
        }

        RpcMessage message=new RpcMessage();
        RpcMessageHeader messageHeader=new RpcMessageHeader();
        messageHeader.setLength(frame.readInt());
        message.setMessageHeader(messageHeader);

        byte[] data = new byte[message.getMessageHeader().getLength()];
        frame.readBytes(data);

        Object obj = ProtoStuffSerializeUtil.deserialize(data, genericClass);
        message.setMessageBody(obj);
        return message;
    }

定義編碼器

編碼器繼承MessageToByteEncoder,將對象轉換成字節的編碼器


public class RpcEncoder extends MessageToByteEncoder<RpcMessage>

重點是下面的編碼函數,在ByteBuf中輸出數據長度以及數據體,如有其它需要可以補充其它的字段,比如消息類型。


 public void encode(ChannelHandlerContext ctx, RpcMessage in, ByteBuf out) throws Exception {
        if(null==in){
            throw new RpcException("RpcMessage is null");
        }
        if (genericClass.isInstance(in.getMessageBody())) {
            byte[] data = ProtoStuffSerializeUtil.serialize(in.getMessageBody());
            out.writeInt(data.length);
            out.writeBytes(data);
        }
    }

ServerHandle

  • 修改服務端執行器消息實體類型為新定義的RpcMessage

public class RpcServerInvoker extends AbstractInvoker<RpcMessage> 
  • 修改服務端回調

從服務端方法獲取到返回的結果後,重新封裝成消息對象(RpcMessage)發送給客戶端。


protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcMessage message) {

        this.executor.execute(new Runnable() {
            @Override
            public void run() {
                RpcInvoker rpcInvoker=RpcServerInvoker.this.buildInvokerChain(RpcServerInvoker.this);
                RpcResponse response=(RpcResponse) rpcInvoker.invoke(RpcServerInvoker.this.buildRpcInvocation((RpcRequest) message.getMessageBody()));
                RpcMessage responseMessage=new RpcMessage();
                byte[] data = ProtoStuffSerializeUtil.serialize(response);
                RpcMessageHeader messageHeader=new RpcMessageHeader();
                messageHeader.setLength(data.length);
                responseMessage.setMessageHeader(messageHeader);
                responseMessage.setMessageBody(response);
                channelHandlerContext.writeAndFlush(responseMessage);
            }
        });

    }

ClientHandle

  • 修改客戶端執行器消息實體類型為新定義的RpcMessage

public class RpcClientInvoker extends AbstractInvoker<RpcMessage>
  • 修改客戶端回調方法

接收的返回結果修改為RpcMessage,從body屬性中獲取原來的RpcResponse對象


public void channelRead0(ChannelHandlerContext ctx, RpcMessage message) {
        RpcResponse response=(RpcResponse) message.getMessageBody();
        String requestId = response.getRequestId();
        ResponseFuture responseFuture = pendingRPC.get(requestId);
        if (responseFuture != null) {
            pendingRPC.remove(requestId);
            responseFuture.done(response);
        }
    }
  • 修改發送請求的消息對象,組裝成RpcMessage發送

public ResponseFuture invoke(RpcInvocation invocation) {
        RpcRequest request=this.getRpcRequest();
        ResponseFuture responseFuture = new ResponseFuture(request);
        pendingRPC.put(request.getRequestId(), responseFuture);
        RpcMessage message=new RpcMessage();
        byte[] data = ProtoStuffSerializeUtil.serialize(request);
        RpcMessageHeader messageHeader=new RpcMessageHeader();
        messageHeader.setLength(data.length);
        message.setMessageHeader(messageHeader);
        message.setMessageBody(request);
        channel.writeAndFlush(message);
        return responseFuture;
    }

本文源碼

https://github.com/jiangmin168168/jim-framework

文中代碼是依賴上述項目的,如果有不明白的可下載源碼

引用

  • 文中插圖來自來網絡
  • 文中的思路參考了Netty權威指南

簡易RPC框架-私有協議棧