1. 程式人生 > >基於Netty和SpringBoot實現一個輕量級RPC框架-協議篇

基於Netty和SpringBoot實現一個輕量級RPC框架-協議篇

基於Netty和SpringBoot實現一個輕量級RPC框架-協議篇

前提

最近對網路程式設計方面比較有興趣,在微服務實踐上也用到了相對主流的RPC框架如Spring Cloud Gateway底層也切換為Reactor-Netty,像Redisson底層也是使用Netty封裝通訊協議,最近調研和準備使用的SOFARpc也是基於Netty封裝實現了多種協議的相容。因此,基於Netty造一個輪子,在SpringBoot的加持下,實現一個輕量級的RPC框架。這篇博文介紹的是RPC框架協議的定義以及對應的編碼解碼處理的實現。

依賴引入

截止本文(2020-01-12)編寫完成之時,Netty的最新版本為4.1.44.Final

,而SpringBoot的最新版本為2.2.2.RELEASE,因此引入這兩個版本的依賴,加上其他工具包和序列化等等的支援,pom檔案的核心內容如下:

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring.boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>${netty.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.10</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.61</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>28.1-jre</version>
        </dependency>
    </dependencies>

部分引數的序列化會依賴到FastJson或者Jackson,具體看偏好而定。

自定義協議的定義

為了提高協議傳輸的效率,需要定製一套高效的RPC協議,設計協議所需的欄位和型別。

基礎Packet欄位:

欄位名 欄位型別 欄位功能 備註
magicNumber int 魔數,類似於Java的位元組碼檔案的魔數是0xcafebase
version int 版本號 預留欄位,預設為1
serialNumber java.lang.String 請求流水號 十分重要,每個請求的唯一標識
messageType MessageType 訊息型別 自定義的列舉型別,見下面的MessageType
attachments Map<String, String> 附件 K-V形式,類似於HTTP協議中的Header
// 訊息列舉型別
@RequiredArgsConstructor
public enum MessageType {

    /**
     * 請求
     */
    REQUEST((byte) 1),

    /**
     * 響應
     */
    RESPONSE((byte) 2),

    /**
     * PING
     */
    PING((byte) 3),

    /**
     * PONG
     */
    PONG((byte) 4),

    /**
     * NULL
     */
    NULL((byte) 5),

    ;

    @Getter
    private final Byte type;

    public static MessageType fromValue(byte value) {
        for (MessageType type : MessageType.values()) {
            if (type.getType() == value) {
                return type;
            }
        }
        throw new IllegalArgumentException(String.format("value = %s", value));
    }
}

// 基礎Packet
@Data
public abstract class BaseMessagePacket implements Serializable {

    /**
     * 魔數
     */
    private int magicNumber;

    /**
     * 版本號
     */
    private int version;

    /**
     * 流水號
     */
    private String serialNumber;

    /**
     * 訊息型別
     */
    private MessageType messageType;

    /**
     * 附件 - K-V形式
     */
    private Map<String, String> attachments = new HashMap<>();

    /**
     * 新增附件
     */
    public void addAttachment(String key, String value) {
        attachments.put(key, value);
    }
}

請求Packet擴充套件欄位:

欄位名 欄位型別 欄位功能 備註
interfaceName java.lang.String 介面全類名
methodName java.lang.String 方法名
methodArgumentSignatures java.lang.String[] 方法引數簽名字串陣列 存放方法引數型別全類名字串陣列
methodArguments java.lang.Object[] 方法引數陣列 因為未知方法引數型別,所以用Object表示
@EqualsAndHashCode(callSuper = true)
@Data
public class RequestMessagePacket extends BaseMessagePacket {

    /**
     * 介面全類名
     */
    private String interfaceName;

    /**
     * 方法名
     */
    private String methodName;

    /**
     * 方法引數簽名
     */
    private String[] methodArgumentSignatures;

    /**
     * 方法引數
     */
    private Object[] methodArguments;
}

響應Packet擴充套件欄位:

欄位名 欄位型別 欄位功能 備註
errorCode java.lang.Long 響應碼
message java.lang.String 響應訊息 如果出現異常,message就是對應的異常資訊
payload java.lang.Object 訊息載荷 業務處理返回的訊息載荷,定義為Object型別
@EqualsAndHashCode(callSuper = true)
@Data
public class ResponseMessagePacket extends BaseMessagePacket {

    /**
     * error code
     */
    private Long errorCode;

    /**
     * 訊息描述
     */
    private String message;

    /**
     * 訊息載荷
     */
    private Object payload;
}

需要注意以下幾點:

  • 非基本型別在序列化和反序列化的時候,一定注意要先寫入或者先讀取序列的長度,以java.lang.String型別為例:
// 序列化 - 流水號
out.writeInt(packet.getSerialNumber().length());
out.writeCharSequence(packet.getSerialNumber(), ProtocolConstant.UTF_8);

// 反序列化 - 流水號
int serialNumberLength = in.readInt();
packet.setSerialNumber(in.readCharSequence(serialNumberLength, ProtocolConstant.UTF_8).toString());
  • 特殊編碼的字串在序列化的時候,要注意字串編碼的長度,例如UTF-8編碼下一個中文字元佔3個位元組,這一點可以抽取一個工具類專門處理字串的序列化:
public enum ByteBufferUtils {

    // 單例
    X;

    public void encodeUtf8CharSequence(ByteBuf byteBuf, CharSequence charSequence) {
        int writerIndex = byteBuf.writerIndex();
        byteBuf.writeInt(0);
        int length = ByteBufUtil.writeUtf8(byteBuf, charSequence);
        byteBuf.setInt(writerIndex, length);
    }
}
  • 方法引數陣列的序列化和反序列化方案需要定製,筆者為了簡化自定義協議,定義了方法引數簽名陣列,長度和方法引數陣列一致,這樣做方便後面編寫服務端程式碼的時候,簡化對方法引數陣列進行反序列化以及宿主類目標方法的查詢。注意一下Object[]的序列化和反序列化相對特殊,因為ByteBuf無法處理自定義型別的寫入和讀取(這個很好理解,網路程式設計就是面向01的程式設計):
write Object --> ByteBuf#writeInt() && ByteBuf#writeBytes()

read Object --> ByteBuf#readInt() && ByteBuf#readBytes() [<== 這個方法返回值是ByteBuf例項]
  • 最後注意釋放ByteBuf的引用,否則有可能導致記憶體洩漏。

自定義協議編碼解碼實現

自定義協議編碼解碼主要包括四個部分的編碼解碼器:

  • 請求Packet編碼器:RequestMessagePacketEncoder,主要用於客戶端把RequestMessagePacket例項序列化為二進位制序列。
  • 請求Packet解碼器:RequestMessagePacketDecoder,主要用於服務端把二進位制序列反序列化為RequestMessagePacket例項。
  • 響應Packet編碼器:ResponseMessagePacketEncoder,主要用於服務端把ResponseMessagePacket例項序列化為二進位制序列。
  • 響應Packet解碼器:ResponseMessagePacketDecoder,主要用於客戶端把二進位制序列反序列化為ResponseMessagePacket例項。

畫個圖描述一下幾個元件的互動流程(省略了部分入站和出站處理器):

序列化器Serializer的程式碼如下:

public interface Serializer {

    byte[] encode(Object target);

    Object decode(byte[] bytes, Class<?> targetClass);
}

// FastJson實現
public enum FastJsonSerializer implements Serializer {

    // 單例
    X;

    @Override
    public byte[] encode(Object target) {
        return JSON.toJSONBytes(target);
    }

    @Override
    public Object decode(byte[] bytes, Class<?> targetClass) {
        return JSON.parseObject(bytes, targetClass);
    }
}

請求Packet編碼器RequestMessagePacketEncoder的程式碼如下:

@RequiredArgsConstructor
public class RequestMessagePacketEncoder extends MessageToByteEncoder<RequestMessagePacket> {

    private final Serializer serializer;

    @Override
    protected void encode(ChannelHandlerContext context, RequestMessagePacket packet, ByteBuf out) throws Exception {
        // 魔數
        out.writeInt(packet.getMagicNumber());
        // 版本
        out.writeInt(packet.getVersion());
        // 流水號
        out.writeInt(packet.getSerialNumber().length());
        out.writeCharSequence(packet.getSerialNumber(), ProtocolConstant.UTF_8);
        // 訊息型別
        out.writeByte(packet.getMessageType().getType());
        // 附件size
        Map<String, String> attachments = packet.getAttachments();
        out.writeInt(attachments.size());
        // 附件內容
        attachments.forEach((k, v) -> {
            out.writeInt(k.length());
            out.writeCharSequence(k, ProtocolConstant.UTF_8);
            out.writeInt(v.length());
            out.writeCharSequence(v, ProtocolConstant.UTF_8);
        });
        // 介面全類名
        out.writeInt(packet.getInterfaceName().length());
        out.writeCharSequence(packet.getInterfaceName(), ProtocolConstant.UTF_8);
        // 方法名
        out.writeInt(packet.getMethodName().length());
        out.writeCharSequence(packet.getMethodName(), ProtocolConstant.UTF_8);
        // 方法引數簽名(String[]型別) - 非必須
        if (null != packet.getMethodArgumentSignatures()) {
            int len = packet.getMethodArgumentSignatures().length;
            // 方法引數簽名陣列長度
            out.writeInt(len);
            for (int i = 0; i < len; i++) {
                String methodArgumentSignature = packet.getMethodArgumentSignatures()[i];
                out.writeInt(methodArgumentSignature.length());
                out.writeCharSequence(methodArgumentSignature, ProtocolConstant.UTF_8);
            }
        } else {
            out.writeInt(0);
        }
        // 方法引數(Object[]型別) - 非必須
        if (null != packet.getMethodArguments()) {
            int len = packet.getMethodArguments().length;
            // 方法引數陣列長度
            out.writeInt(len);
            for (int i = 0; i < len; i++) {
                byte[] bytes = serializer.encode(packet.getMethodArguments()[i]);
                out.writeInt(bytes.length);
                out.writeBytes(bytes);
            }
        } else {
            out.writeInt(0);
        }
    }
}

請求Packet解碼器RequestMessagePacketDecoder的程式碼如下:

@RequiredArgsConstructor
public class RequestMessagePacketDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext context, ByteBuf in, List<Object> list) throws Exception {
        RequestMessagePacket packet = new RequestMessagePacket();
        // 魔數
        packet.setMagicNumber(in.readInt());
        // 版本
        packet.setVersion(in.readInt());
        // 流水號
        int serialNumberLength = in.readInt();
        packet.setSerialNumber(in.readCharSequence(serialNumberLength, ProtocolConstant.UTF_8).toString());
        // 訊息型別
        byte messageTypeByte = in.readByte();
        packet.setMessageType(MessageType.fromValue(messageTypeByte));
        // 附件
        Map<String, String> attachments = Maps.newHashMap();
        packet.setAttachments(attachments);
        int attachmentSize = in.readInt();
        if (attachmentSize > 0) {
            for (int i = 0; i < attachmentSize; i++) {
                int keyLength = in.readInt();
                String key = in.readCharSequence(keyLength, ProtocolConstant.UTF_8).toString();
                int valueLength = in.readInt();
                String value = in.readCharSequence(valueLength, ProtocolConstant.UTF_8).toString();
                attachments.put(key, value);
            }
        }
        // 介面全類名
        int interfaceNameLength = in.readInt();
        packet.setInterfaceName(in.readCharSequence(interfaceNameLength, ProtocolConstant.UTF_8).toString());
        // 方法名
        int methodNameLength = in.readInt();
        packet.setMethodName(in.readCharSequence(methodNameLength, ProtocolConstant.UTF_8).toString());
        // 方法引數簽名
        int methodArgumentSignatureArrayLength = in.readInt();
        if (methodArgumentSignatureArrayLength > 0) {
            String[] methodArgumentSignatures = new String[methodArgumentSignatureArrayLength];
            for (int i = 0; i < methodArgumentSignatureArrayLength; i++) {
                int methodArgumentSignatureLength = in.readInt();
                methodArgumentSignatures[i] = in.readCharSequence(methodArgumentSignatureLength, ProtocolConstant.UTF_8).toString();
            }
            packet.setMethodArgumentSignatures(methodArgumentSignatures);
        }
        // 方法引數
        int methodArgumentArrayLength = in.readInt();
        if (methodArgumentArrayLength > 0) {
            // 這裡的Object[]實際上是ByteBuf[] - 後面需要二次加工為對應型別的例項
            Object[] methodArguments = new Object[methodArgumentArrayLength];
            for (int i = 0; i < methodArgumentArrayLength; i++) {
                int byteLength = in.readInt();
                methodArguments[i] = in.readBytes(byteLength);
            }
            packet.setMethodArguments(methodArguments);
        }
        list.add(packet);
    }
}

響應Packet編碼器ResponseMessagePacketEncoder的程式碼如下:

@RequiredArgsConstructor
public class ResponseMessagePacketEncoder extends MessageToByteEncoder<ResponseMessagePacket> {

    private final Serializer serializer;

    @Override
    protected void encode(ChannelHandlerContext ctx, ResponseMessagePacket packet, ByteBuf out) throws Exception {
        // 魔數
        out.writeInt(packet.getMagicNumber());
        // 版本
        out.writeInt(packet.getVersion());
        // 流水號
        out.writeInt(packet.getSerialNumber().length());
        out.writeCharSequence(packet.getSerialNumber(), ProtocolConstant.UTF_8);
        // 訊息型別
        out.writeByte(packet.getMessageType().getType());
        // 附件size
        Map<String, String> attachments = packet.getAttachments();
        out.writeInt(attachments.size());
        // 附件內容
        attachments.forEach((k, v) -> {
            out.writeInt(k.length());
            out.writeCharSequence(k, ProtocolConstant.UTF_8);
            out.writeInt(v.length());
            out.writeCharSequence(v, ProtocolConstant.UTF_8);
        });
        // error code
        out.writeLong(packet.getErrorCode());
        // message
        String message = packet.getMessage();
        ByteBufferUtils.X.encodeUtf8CharSequence(out, message);
        // payload
        byte[] bytes = serializer.encode(packet.getPayload());
        out.writeInt(bytes.length);
        out.writeBytes(bytes);
    }
}

響應Packet解碼器ResponseMessagePacketDecoder的程式碼如下:

public class ResponseMessagePacketDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        ResponseMessagePacket packet = new ResponseMessagePacket();
        // 魔數
        packet.setMagicNumber(in.readInt());
        // 版本
        packet.setVersion(in.readInt());
        // 流水號
        int serialNumberLength = in.readInt();
        packet.setSerialNumber(in.readCharSequence(serialNumberLength, ProtocolConstant.UTF_8).toString());
        // 訊息型別
        byte messageTypeByte = in.readByte();
        packet.setMessageType(MessageType.fromValue(messageTypeByte));
        // 附件
        Map<String, String> attachments = Maps.newHashMap();
        packet.setAttachments(attachments);
        int attachmentSize = in.readInt();
        if (attachmentSize > 0) {
            for (int i = 0; i < attachmentSize; i++) {
                int keyLength = in.readInt();
                String key = in.readCharSequence(keyLength, ProtocolConstant.UTF_8).toString();
                int valueLength = in.readInt();
                String value = in.readCharSequence(valueLength, ProtocolConstant.UTF_8).toString();
                attachments.put(key, value);
            }
        }
        // error code
        packet.setErrorCode(in.readLong());
        // message
        int messageLength = in.readInt();
        packet.setMessage(in.readCharSequence(messageLength, ProtocolConstant.UTF_8).toString());
        // payload - ByteBuf例項
        int payloadLength = in.readInt();
        packet.setPayload(in.readBytes(payloadLength));
        out.add(packet);
    }
}

核心的編碼解碼器已經編寫完,接著要注意一下TCP協議二進位制包傳送的時候只保證了包的傳送順序、確認傳送以及重傳,無法保證二進位制包是否完整(有些部落格也稱此類場景為粘包、半包等等,其實網路協議裡面並沒有定義這些術語,估計是有人杜撰出來),因此這裡採取了定長幀編碼和解碼器LengthFieldPrependerLengthFieldBasedFrameDecoder,簡單來說就是在訊息幀的開頭幾位定義了整個幀的長度,讀取到整個長度的訊息幀才認為是一個完整的二進位制報文。舉個幾個例子:

|<--------packet frame--------->|
| Length Field | Actual Content |
序號 Length Field Actual Content
0 4 abcd
1 9 throwable
2 14 {"name":"doge"}

編寫測試客戶端和服務端

客戶端程式碼如下:

@Slf4j
public class TestProtocolClient {

    public static void main(String[] args) throws Exception {
        int port = 9092;
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        try {
            bootstrap.group(workerGroup);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
            bootstrap.option(ChannelOption.TCP_NODELAY, Boolean.TRUE);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {

                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
                    ch.pipeline().addLast(new LengthFieldPrepender(4));
                    ch.pipeline().addLast(new RequestMessagePacketEncoder(FastJsonSerializer.X));
                    ch.pipeline().addLast(new ResponseMessagePacketDecoder());
                    ch.pipeline().addLast(new SimpleChannelInboundHandler<ResponseMessagePacket>() {
                        @Override
                        protected void channelRead0(ChannelHandlerContext ctx, ResponseMessagePacket packet) throws Exception {
                            Object targetPayload = packet.getPayload();
                            if (targetPayload instanceof ByteBuf) {
                                ByteBuf byteBuf = (ByteBuf) targetPayload;
                                int readableByteLength = byteBuf.readableBytes();
                                byte[] bytes = new byte[readableByteLength];
                                byteBuf.readBytes(bytes);
                                targetPayload = FastJsonSerializer.X.decode(bytes, String.class);
                                byteBuf.release();
                            }
                            packet.setPayload(targetPayload);
                            log.info("接收到來自服務端的響應訊息,訊息內容:{}", JSON.toJSONString(packet));
                        }
                    });
                }
            });
            ChannelFuture future = bootstrap.connect("localhost", port).sync();
            log.info("啟動NettyClient[{}]成功...", port);
            Channel channel = future.channel();
            RequestMessagePacket packet = new RequestMessagePacket();
            packet.setMagicNumber(ProtocolConstant.MAGIC_NUMBER);
            packet.setVersion(ProtocolConstant.VERSION);
            packet.setSerialNumber(SerialNumberUtils.X.generateSerialNumber());
            packet.setMessageType(MessageType.REQUEST);
            packet.setInterfaceName("club.throwable.contract.HelloService");
            packet.setMethodName("sayHello");
            packet.setMethodArgumentSignatures(new String[]{"java.lang.String"});
            packet.setMethodArguments(new Object[]{"doge"});
            channel.writeAndFlush(packet);
            future.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

服務端程式碼如下:

@Slf4j
public class TestProtocolServer {

    public static void main(String[] args) throws Exception {
        int port = 9092;
        ServerBootstrap bootstrap = new ServerBootstrap();
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
                            ch.pipeline().addLast(new LengthFieldPrepender(4));
                            ch.pipeline().addLast(new RequestMessagePacketDecoder());
                            ch.pipeline().addLast(new ResponseMessagePacketEncoder(FastJsonSerializer.X));
                            ch.pipeline().addLast(new SimpleChannelInboundHandler<RequestMessagePacket>() {

                                @Override
                                protected void channelRead0(ChannelHandlerContext ctx, RequestMessagePacket packet) throws Exception {
                                    log.info("接收到來自客戶端的請求訊息,訊息內容:{}", JSON.toJSONString(packet));
                                    ResponseMessagePacket response = new ResponseMessagePacket();
                                    response.setMagicNumber(packet.getMagicNumber());
                                    response.setVersion(packet.getVersion());
                                    response.setSerialNumber(packet.getSerialNumber());
                                    response.setAttachments(packet.getAttachments());
                                    response.setMessageType(MessageType.RESPONSE);
                                    response.setErrorCode(200L);
                                    response.setMessage("Success");
                                    response.setPayload("{\"name\":\"throwable\"}");
                                    ctx.writeAndFlush(response);
                                }
                            });
                        }
                    });
            ChannelFuture future = bootstrap.bind(port).sync();
            log.info("啟動NettyServer[{}]成功...", port);
            future.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

這裡在測試的環境中,最大的訊息幀長度暫時定義為1024。先啟動服務端,再啟動客戶端,見控制檯輸出如下:

// 服務端
22:29:32.596 [main] INFO club.throwable.protocol.TestProtocolServer - 啟動NettyServer[9092]成功...
...省略其他日誌...
22:29:53.538 [nioEventLoopGroup-3-1] INFO club.throwable.protocol.TestProtocolServer - 接收到來自客戶端的請求訊息,訊息內容:{"attachments":{},"interfaceName":"club.throwable.contract.HelloService","magicNumber":10086,"messageType":"REQUEST","methodArgumentSignatures":["java.lang.String"],"methodArguments":[{"contiguous":true,"direct":true,"readOnly":false,"readable":true,"writable":false}],"methodName":"sayHello","serialNumber":"7f992c7cf9f445258601def1cac9bec0","version":1}

// 客戶端
22:31:28.360 [main] INFO club.throwable.protocol.TestProtocolClient - 啟動NettyClient[9092]成功...
...省略其他日誌...
22:31:39.320 [nioEventLoopGroup-2-1] INFO club.throwable.protocol.TestProtocolClient - 接收到來自服務端的響應訊息,訊息內容:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":"{\"name\":\"throwable\"}","serialNumber":"320808e709b34edbb91ba557780b58ad","version":1}

小結

一個基於Netty實現的簡單的自定義協議基本完成,但是要編寫一個優秀的RPC框架,還需要做服務端的宿主類和目標方法查詢、呼叫,客戶端的動態代理,NettyNIO模式下的同步呼叫改造,心跳處理,異常處理等等。後面會使用多篇文章逐個問題解決,網路程式設計其實挺好玩了,就是編碼量會比較大(゜-゜)つロ

Demo專案:

  • ch0-custom-rpc-protocol

(e-a-20200112 c-1-d