1. 程式人生 > >程式設計思想:如何設計一個好的通訊網路協議

程式設計思想:如何設計一個好的通訊網路協議

當網路中兩個程序需要通訊時,我們往往會使用 `Socket` 來實現。`Socket` 都不陌生。當三次握手成功後,客戶端與服務端就能通訊,並且,彼此之間通訊的資料包格式都是二進位制,由` TCP/IP` 協議負責傳輸。 當客戶端和服務端取得了二進位制資料包後,我們往往需要『萃取』出想要的資料,這樣才能更好的執行業務邏輯。所以,我們需要定義好資料結構來描述這些二進位制資料的格式,這就是通訊網路協議。簡單講,就是需要約定好二進位制資料包中每一段位元組的含義,比如從第 n 位元組開始的 m 長度是核心資料,有了這樣的約定後,我們就能解碼出想要的資料,執行業務邏輯,這樣我們就能暢通無阻的通訊了。 ### 網路協議的設計 **概要劃分** 一個最基本的網路協議必須包含 - 資料的長度 - 資料 瞭解 `TCP` 協議的同學一定聽說過`粘包、拆包` 這兩個術語。因為`TCP`協議是資料流協議,它的底層根據二進位制緩衝區的實際情況進行包的劃分。所以,不可避免的會出現`粘包,拆包` 現象 。為了解決它們,我們的網路協議往往會使用一個 4 位元組的 `int` 型別來表示資料的大小。比如,`Netty` 就為我們提供了 `LengthFieldBasedFrameDecoder` 解碼器,它可以有效的使用自定義長度幀來解決上述問題。 同時一個好的網路協議,還會將動作和業務資料分離。試想一下, `HTTP` 協議的分為請求頭,請求體—— - 請求頭:定義了介面地址、`Http Method`、`HTTP` 版本 - 請求體:定義了需要傳遞的資料 這就是一種分離關注點的思想。所以自定義的網路協議也可以包含: - 動作指令:比如定義 `code` 來分門別類的代表不同的業務邏輯 - 序列化演算法:描述了 `JAVA` 物件和二進位制之間轉換的形式,提供多種序列化/反序列化方式。比如 `json`、`protobuf` 等等,甚至是自定義演算法。比如:`rocketmq ` 等等。 同時,協議的開頭可以定義一個約定的`魔數`。這個固定值(4位元組),一般用來判斷當前的資料包是否合法。比如,當我們使用 `telnet` 傳送錯誤的資料包時,很顯然,它不合法,會導致解碼失敗。所以,為了減輕伺服器的壓力,我們可以取出資料包的前`4`個位元組與固定的`魔數`對比,如果是非法的格式,直接關閉連線,不繼續解碼。 **網路協議結構如下所示**: ```javascript +--------------+-----------+------------+-----------+----------+ | 魔數(4) | code(1) |序列化演算法(1) |資料長度(4) |資料(n) | +--------------+-----------+------------+-----------+----------+ ``` ### RocketMQ 通訊網路協議的實現 **RocketMQ 網路協議** 這一小節,我們從`RocketMQ` 中,分析優秀通訊網路協議的實現。`RocketMQ` 專案中,客戶端和服務端的通訊是基於 Netty 之上構建的。同時,為了更加有效的通訊,往往需要對傳送的訊息自定義網路協議。 `RocketMQ` 的網路協議,從資料分類的角度上看,可分為兩大類 - 訊息頭資料(Header Data) - 訊息體資料(Body Data) ![](https://img2020.cnblogs.com/blog/299214/202003/299214-20200330040625987-1566249621.png) 從左到右 - 第一段:4 個位元組整數,等於2、3、4 長度總和 - 第二段:4 個位元組整數,等於3 的長度。特別的 `byte[0]` 代表序列化演算法,`byte[1~3]`才是真正的長度 - 第三段:代表訊息頭資料,結構如下 ```java { "code":0, "language":"JAVA", "version":0, "opaque":0, "flag":1, "remark":"hello, I am respponse /127.0.0.1:27603", "extFields":{ "count":"0", "messageTitle":"HelloMessageTitle" } } ``` - 第四段:代表訊息體資料 **RocketMQ 訊息頭協議詳細如下:** | Header 欄位名 | 型別 | Request | Response | | ------------- | ---------------------- | ------------------------------------------------------------ | -------------------------------------------- | | code | 整數 | 請求操作程式碼,請求接收方根據不同的程式碼做不同的操作 | 應答結果程式碼,0表示成功,非0表示各種錯誤程式碼 | | language | 字串 | 請求發起方實現語言,預設JAVA | 應答接收方實現語言 | | version | 整數 | 請求發起方程式版本 | 應答接收方程式版本 | | opaque | 整數 | 請求發起方在同一連線上不同的請求標識程式碼,多執行緒連線複用使用 | 應答方不做修改,直接返回 | | flag | 整數 | 通訊層的標誌位 | 通訊層的標誌位 | | remark | 字串 | 傳輸自定義文字資訊 | 錯誤詳細描述資訊 | | extFields | HashMap | 請求自定義欄位 | 應答自定義欄位 | **編碼過程** `RocketMQ` 的通訊模組是基於 `Netty`的。通過定義 `NettyEncoder` 來實現對每一個 `Channel`的 出棧資料進行編碼,如下所示: ```java @ChannelHandler.Sharable public class NettyEncoder extends MessageToByteEncoder { @Override public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out) throws Exception { try { ByteBuffer header = remotingCommand.encodeHeader(); out.writeBytes(header); byte[] body = remotingCommand.getBody(); if (body != null) { out.writeBytes(body); } } catch (Exception e) { ... } } } ``` 其中,核心的編碼過程位於 `RemotingCommand` 物件中,`encodeHeader` 階段,需要統計出訊息總長度,即: - 定義訊息頭長度,一個整數表示:佔4個位元組 - 定義訊息頭資料,並計算其長度 - 定義訊息體資料,並計算其長度 - 額外再加 4是因為需要加入訊息總長度,一個整數表示:佔4個位元組 ```java public ByteBuffer encodeHeader(final int bodyLength) { // 1> 訊息頭長度,一個整數表示:佔4個位元組 int length = 4; // 2> 訊息頭資料 byte[] headerData; headerData = this.headerEncode(); // 再加訊息頭資料長度 length += headerData.length; // 3> 再加訊息體資料長度 length += bodyLength; // 4> 額外加 4是因為需要加入訊息總長度,一個整數表示:佔4個位元組 ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength); // 5> 將訊息總長度加入 ByteBuffer result.putInt(length); // 6> 將訊息的頭長度加入 ByteBuffer result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC)); // 7> 將訊息頭資料加入 ByteBuffer result.put(headerData); result.flip(); return result; } ``` 其中,`encode` 階段會將 `CommandCustomHeader` 資料轉換 `HashMap`,方便序列化 ```java public void makeCustomHeaderToNet() { if (this.customHeader != null) { Field[] fields = getClazzFields(customHeader.getClass()); if (null == this.extFields) { this.extFields = new HashMap(); } for (Field field : fields) { if (!Modifier.isStatic(field.getModifiers())) { String name = field.getName(); if (!name.startsWith("this")) { Object value = null; try { field.setAccessible(true); value = field.get(this.customHeader); } catch (Exception e) { log.error("Failed to access field [{}]", name, e); } if (value != null) { this.extFields.put(name, value.toString()); } } } } } } ``` 特別的,訊息頭序列化支援兩種演算法: - `JSON` - `RocketMQ` ```java private byte[] headerEncode() { this.makeCustomHeaderToNet(); if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) { return RocketMQSerializable.rocketMQProtocolEncode(this); } else { return RemotingSerializable.encode(this); } } ``` 這兒需要值得注意的是,`encode`階段將當前 `RPC` 型別和 `headerData`長度編碼到一個 `byte[4]` 陣列中,`byte[0]` 位序列化型別。 ```java public static byte[] markProtocolType(int source, SerializeType type) { byte[] result = new byte[4]; result[0] = type.getCode(); result[1] = (byte) ((source >> 16) & 0xFF); result[2] = (byte) ((source >> 8) & 0xFF); result[3] = (byte) (source & 0xFF); return result; } ``` 其中,通過與運算 `& 0xFF` 取低八位資料。 所以, 最終 `length` 長度等於序列化型別 + header length + header data + body data 的位元組的長度。 **解碼過程** `RocketMQ` 解碼通過`NettyDecoder`來實現,它繼承自 `LengthFieldBasedFrameDecoder`,其中呼叫了父類`LengthFieldBasedFrameDecoder`的建構函式 ```java super(FRAME_MAX_LENGTH, 0, 4, 0, 4); ``` 這些引數設定`4`個位元組代表 `length`總長度,同時解碼時跳過最開始的`4`個位元組: ```java frame = (ByteBuf) super.decode(ctx, in); ``` 所以,得到的 `frame`= 序列化型別 + header length + header data + body data 。解碼如下所示: ```java public static RemotingCommand decode(final ByteBuffer byteBuffer) { //總長度 int length = byteBuffer.limit(); //原始的 header length,4位 int oriHeaderLen = byteBuffer.getInt(); //真正的 header data 長度。忽略 byte[0]的 serializeType int headerLength = getHeaderLength(oriHeaderLen); byte[] headerData = new byte[headerLength]; byteBuffer.get(headerData); RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen)); int bodyLength = length - 4 - headerLength; byte[] bodyData = null; if (bodyLength > 0) { bodyData = new byte[bodyLength]; byteBuffer.get(bodyData); } cmd.body = bodyData; return cmd; } private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) { switch (type) { case JSON: RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class); resultJson.setSerializeTypeCurrentRPC(type); return resultJson; case ROCKETMQ: RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData); resultRMQ.setSerializeTypeCurrentRPC(type); return resultRMQ; default: break; } return null; } ``` 其中,`getProtocolType`,右移 `24`位,拿到 `serializeType`: ```java public static SerializeType getProtocolType(int source) { return SerializeType.valueOf((byte) ((source >> 24) & 0xFF)); } ``` `getHeaderLength` 拿到 0-24 位代表的 `headerData` length: ```java public static int getHeaderLength(int length) { return length & 0xFFFFFF; } ``` ### 小結 對於諸多中介軟體而言,底層的網路通訊模組往往會使用 `Netty`。`Netty` 提供了諸多的編解碼器,可以快速方便的上手。本文從如何設計一個網路協議入手,最終切入到 `RocketMQ` 底層網路協議的實現。可以看到,它並不複雜。仔細研讀幾遍變能理解其奧義。具體參考類`NettyEncoder`、`NettyDecoder`、`RemotingCom