1. 程式人生 > >netty原始碼解解析(4.0)-20 ChannelHandler: 自己實現一個自定義協議的伺服器和客戶端

netty原始碼解解析(4.0)-20 ChannelHandler: 自己實現一個自定義協議的伺服器和客戶端

  本章不會直接分析Netty原始碼,而是通過使用Netty的能力實現一個自定義協議的伺服器和客戶端。通過這樣的實踐,可以更深刻地理解Netty的相關程式碼,同時可以瞭解,在設計實現自定義協議的過程中需要解決的一些關鍵問題。

  本週章涉及到的程式碼可以從github上下載: https://github.com/brandonlyg/tinytransport.git。

 

設計協議

  本章要設計的協議是基於TCP的應用層協議。在設計一個協議之前需要先回答以下幾個問題:

  • 使用場景是什麼?
  • 這個協議有哪些功能?
  • 效能上有什麼要求?
  • 對網路頻寬有什麼要求?
  • 安全上有哪些要求?  

  接下來依次回答這些問題:

  

  使用場景

  在可信任的內部網路中,不同程序之間高速交換訊息。

 

  功能

  • 在客戶端和伺服器進行訊息交換。
  • 傳送訊息然後非同步接收響應。
  • 客戶端和伺服器之間可以保持長連線。
  • 傳輸大量的資料。

 

  效能

  資料包的提取效能接近記憶體copy。

  

  擴充套件性

  可以通過擴充套件header欄位,進而擴充套件協議的功能。

 

  頻寬

  儘量少的冗餘資料,佔用儘量小的頻寬。

  

  安全

  由於是在可信任的內網中互動訊息,沒有特別端安全性要求。

 

  這些問題的答案,就是整個協議的設計要求。下面就按照這些設計要求來設計一套完整的協議,具體類容包括以下兩個部分:

  • 資料包的格式。
  • 客戶端和伺服器端訊息的互動規則。

 

資料包格式的設計

  設計自己的資料包格式之前,我們先來回顧以下LengthFieldBasedFrameDecoder能夠處理的資料包格式: 

  | header | contentLength | conent | 

  這個類把header的設計留給了子類,現在我們的注意力只需要集中在header欄位上即可。下面是header設計:

  | begin | version | cmd | contentType | compression | sequenceId | resCode | 

  整個資料包的格式就是:

  | begin | version | cmd | contentType | compression | sequenceId | resCode | contentLength | content |

  現在來看一下這個資料包能實現哪些設計要求。

  begin

  型別: 32位無符號整數(uint32),這欄位是一個常量,用來準確第定位到資料包的開始位置,這樣就能更準確地分離出資料包,進而保證了“客戶端和伺服器端進行訊息交換”。它的設計還要平衡資料包提取效能和準確性。嚴格來說,資料包中只能有一個begin,形式化描述如下:

  1. 設一個數據包P的長度是L,P(i)表示資料包中任意一個Byte,begin=0XADEF4BC9(這個值可以任意選擇,儘量不選擇有意義的數字)。

  2. 設反序列化一個uint32的演算法是ui=deserUint32(i), i>=0 && i < L。

  3. 必須滿足: deserUint32(0) == begin, 且deserUint32(i) != begin, i > 0 && i < L。

  要在(1)(2)兩個前提條件下滿足第(3)點,需要設計一個轉義符EC=0xFF, 對P中除begin以外的部分進行轉義,轉義演算法是:

  如果deserUint32(i)==begin或P(i)==EC,  在P(i)前面插入EC。

  找到begin的演算法是:

  如果deserUint32(i)==begin且P(i-1)!=EC。

  逆轉義演算法是:

  如果P(i)==EC, P(i+1)==EC或deserUint32(i+1)==begin,  刪除P(i)。

  以上使用轉義符的方案,雖然能夠準確地找到begin,但演算法複雜度是O(L),顯然不能滿足“接近記憶體copy"這個要求。但是如果不使用轉義符,就可以達到這個效能要求。如果仔細計算一下begin重複的概率就會發現, 它的重複概率只有1/0x100000000,如果再結合length欄位一起檢查資料包的正確性,得到錯誤資料包的概率就會更低。不使用轉義符,以極小的出錯概率換取效能大幅提升是一筆合適的買賣。

  總的來說,begin可以滿足兩個設計要求: 訊息交換,資料包的提取效能接近記憶體copy。

  

  version

  型別:uint8。協議的版本號,這個欄位用來滿足“擴充套件性”要求。每個version對應一種不同的header結構,換言之,知道了版本號,就知道怎樣解析header。 

 

  cmd

  型別: uint8。這個欄位用來定義不同資料包的功能。可以使用這個欄位定義心跳資料包,使用心跳資料包讓"伺服器和客戶端保持長連線"。此外業務層可使用這個欄位定義自己需要的資料包。

 

  contentType

  型別: uint8。這個欄位是content的型別。使用這個欄位可以在content資料交給業務層之前,對他進行一下特殊的處理。使用者可以定義自己的的訊息型別。它可以加"訊息交換"的能力。

  

  compression

  型別: uint8。 壓縮演算法。這個欄位可以用來表示content使用的壓縮演算法。通過使用適當的壓縮演算法,壓縮滿足"傳輸大量資料"和"頻寬"的要求。

  

  sequenceId

  型別: uint32。這個欄位是資料包的唯一序列號。只需要保證在一個socket連線建立-斷開週期內保證它的唯一性即可。使用這個ID,可以實現“傳送訊息然後非同步接收響應”。

  

  resCode

  型別: uint8。響應資料包的狀態碼,用來在響應資料包中附帶異常資訊。  

 

  至此資料包的格式已經設計完畢。接下來設計必要的互動規則。

 

協議互動規則設計

  使用心跳保持長連線

  cmd: PING(0x01), PONG(0x02)。客戶端連線到伺服器之後,每隔一段時間傳送一個PING包,伺服器端收到之後立即響應PONG包。伺服器端在一個超時時間後沒有收到PING就認為TCP連線不可用,主動端開。客戶端在傳送PING之後,經過一個超時時間後沒有收到PONG就認為連線不可用,重新建立連線。

 

  訊息的請求和響應

  cmd: REQUEST(0x10), RESPONSE(0x02)。客戶端使用REQUEST包向伺服器傳送請求,服務使用RESPONSE包響應。請求和響應的sequenceId一致。

  

  推送訊息

  cmd: PUSH(0x20)。使用PUSH向對方推送訊息,不需要響應。

 

程式碼分析

  這個輕量級的客戶端和伺服器框架在架構上分為4個部分:

  • 資料包: Frame, FrameDecoder, FrameEncoder, FrameGzipCodec。
  • 訊息: FMessage, FrameToMessageDecoder, MessageToFrameEncode, FMessageHandler, FMessageTrait, FMTraits。
  • 客戶端框架: TcpConnector, TcpClient。
  • 伺服器端框架: TcpServer。

 

  由於前面已經詳細講解了設計原理,這裡只重點分析一下關鍵程式碼。

  Frame

  Frame是資料包型別,它的主要功能是資料包的序列化(encode方法)和反序列化(decode)。

  序列化方法:

 1 /**
 2      * 把Frame物件編碼成資料包
 3      * @param out
 4      */
 5     public void encode(ByteBuf out){
 6         out.writeInt(BEGIN);
 7         out.writeByte(header.getVersion());
 8         out.writeByte(header.getCmd().getValue());
 9         out.writeByte(header.getContentType());
10         out.writeByte(header.getCompression());
11         out.writeInt(header.getSequenceId());
12         out.writeByte(header.getResCode());
13 
14         int contentLength = 0;
15         if(null != content){
16             contentLength = content.readableBytes();
17         }
18         if(contentLength > MAX_CONTENT_LENGTH){
19             throw new TooLongFrameException("content too long. contentLength:"+contentLength);
20         }
21         out.writeShort(contentLength);
22         if(null != content){
23             out.writeBytes(content);
24         }
25     }

  6-12行,序列化header中除contentLength的其他欄位。

  14-21行,序列化contentLength欄位。

  22-24行,序列content。

 

  反序列化方法

 1 /**
 2      * 從資料包解碼得到Frame
 3      * @param in 一個完整的資料包
 4      * @return Frame物件
 5      */
 6     public static Frame decode(ByteBuf in){
 7         if(in.readableBytes() < HEADER_LENGTH){
 8             throw new CorruptedFrameException("pack length less than header length("+HEADER_LENGTH+")");
 9         }
10 
11         //得到header
12         Header header = new Header();
13         in.readInt();
14         header.setVersion(in.readByte());
15         header.setCmd(Command.valueOf(in.readByte() & 0xFF));
16         header.setContentType((byte)(in.readByte() & 0xFF));
17         header.setCompression((byte)(in.readByte() & 0xFF));
18         header.setSequenceId(in.readInt());
19         header.setResCode((byte)(in.readByte() & 0xFF));
20 
21         //讀出content
22         int contentLength = in.readShort() & 0xFFFF;
23         if(in.readableBytes() != contentLength){
24             throw new CorruptedFrameException("content is not match."+in.readableBytes() + "-" + contentLength);
25         }
26 
27         ByteBuf content = contentLength > 0 ? in.retainedSlice(in.readerIndex(), contentLength) : null;
28         in.skipBytes(contentLength);
29 
30         //建立Frame物件
31         Frame frame = new Frame();
32         frame.setHeader(header);
33         frame.setContent(content);
34 
35         if(null != content) content.release();
36 
37         return frame;
38     }

  這段程式碼,註釋已經比較清晰了,這裡就不再多說。

 

  FrameDecoder

   這個類繼承了LengthFieldBasedFrameDecoder,所以只需要很少的程式碼就可以從Byte流中分離出資料包。

 1     public FrameDecoder(){
 2         super(Frame.MAX_LENGTH, Frame.HEADER_LENGTH - 2, 2);
 3     }
 4 
 5     @Override
 6     protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
 7         //找到begin位置
 8         int start = in.readerIndex();
 9         int begin = in.getInt(start + 0);
10         if(begin != Frame.BEGIN){
11             dropFailedData(in);
12         }
13 
14         //解碼得到Frame物件
15         ByteBuf dataPack = null;
16         try{
17             dataPack = (ByteBuf)super.decode(ctx, in);
18             Frame frame = Frame.decode(dataPack);
19             return frame;
20         }finally {
21             if(null != dataPack){
22                 dataPack.release();
23             }
24         }
25     }

  2行,設定了資料包的最大長度Frame.MAX_LENGTH, 資料包header除contentLength之外的長度Frame.HEADER_LENGTH-2, contentLength欄位的長度。這樣,只要正確地找到資料包的開始位置就能LengthFieldBasedFrameDecoder就能幫助我們把資料包提取出來。

  8-12行,確定資料包的開始位置。

  17-18行,提取資料包,並把資料包反序列化成Frame。

 

  FMessageTrait

  為了能夠靈活地處理FMessage的content, 框架中定義了FMessageTrait介面,可以使用不同個FMessageTrait實現處理不同的content型別。

 1 /**
 2  * FMessage訊息特徵介面,根據不同的contentType進行Frame和FMessage之間的轉換
 3  */
 4 public interface FMessageTrait {
 5 
 6     /**
 7      * 得到匹配的contentType
 8      * @return contentType的值
 9      */
10     int getContentType();
11 
12     /**
13      * 把FMessage轉換成Frame
14      * @param fmsg
15      * @return
16      * @throws EncoderException
17      */
18     Frame encode(FMessage fmsg) throws EncoderException;
19 
20     /**
21      * 把Frame轉換成FMessage
22      * @param frame
23      * @return
24      * @throws DecoderException
25      */
26     FMessage decode(Frame frame) throws DecoderException;
27 }

  FrameToMessageDecoder和MessageToFrameEncoder使用FMessageTrait進行FMessage和Frame之間的轉換。

 1 /**
 2  * 把Frame轉換成FMessage
 3  */
 4 @ChannelHandler.Sharable
 5 public class FrameToMessageDecoder extends MessageToMessageDecoder<Frame> {
 6 
 7     private Map<Integer, FMessageTrait> fmTraits = new HashMap<>();
 8 
 9 
10     public void addFMessageTrait(FMessageTrait trait){
11         fmTraits.put(trait.getContentType(), trait);
12     }
13 
14     @Override
15     protected void decode(ChannelHandlerContext ctx, Frame frame, List<Object> out) throws Exception {
16         int contentType = frame.getHeader().getContentType();
17         FMessageTrait trait = fmTraits.get(contentType);
18         if(null == trait){
19             throw new EncoderException("can't find trait. contentType:"+contentType);
20         }
21 
22         FMessage fmsg = trait.decode(frame);
23         out.add(fmsg);
24     }
25 }

  10-12行,把FMessageTrait放入map中。構建contentType-FMessageTrait之間的對映。

  17行,從map中得到FMessageTrait。

  22行,使用FMessageTrait把Frame轉換成FMessage。

  MessageToFrameEncoder的實現類似。不同的是在22處呼叫FMessageTrait的encode方法把FMessage轉換成Frame。

  FMTraits中給出了幾種常見的FMessageTrait實現:

  • FMTraitBytes:  處理byte array型別的content。
  • FMTraitString: 處理String型別的content。
  • FMTraitJson: 處理Json格式是content。
  • FMTraitProtobuf: 處理protobuf格式的content。

  他們都有一個共同的祖先AbstractFMTrait, 這個抽象類實現FMessageTrait的encode和decode方法,定義了兩個抽象方法encodeContent和decodeContent,子類只需專注於content的處理就可以了。

  下面以FMTraitBytes為例,講解一下FMessageTrait的具體實現。FMTraitBytes處理的FMessage型別要求conent是byte[]型別。

 1     public static final int BYTES = 0x01;
 2     public static final FMessageTrait FMTBytes = new FMTraitBytes();
 3     public static class FMTraitBytes extends AbstractFMTrait {
 4         protected int contentType;
 5 
 6         public FMTraitBytes(){
 7             this(BYTES);
 8         }
 9 
10         public FMTraitBytes(int contentType){
11             this.contentType = contentType;
12         }
13 
14         @Override
15         public int getContentType() {
16             return contentType;
17         }
18 
19         @Override
20         protected ByteBuf encodeContent(FMessage fmsg) throws EncoderException{
21             byte[] bytes = (byte[])fmsg.getContent();
22 
23             ByteBuf buf = null;
24             if(null != bytes && bytes.length > 0){
25                 buf = ByteBufAllocator.DEFAULT.buffer(bytes.length);
26                 buf.writeBytes(bytes);
27             }
28 
29             return buf;
30         }
31 
32         @Override
33         protected Object decodeContent(Frame frame) throws DecoderException {
34             ByteBuf buf = frame.getContent();
35             byte[] bytes = null;
36             if(null != buf && buf.readableBytes() > 0){
37                 bytes = new byte[buf.readableBytes()];
38                 buf.readBytes(bytes);
39             }
40 
41             return bytes;
42         }
43     }

  6-17行,實現了contentType的設定和獲取。

  21-29行,把FMessage的content轉換成ByteBuf。

  34-42行, 發Frame的content轉換成byte[]。

 

  FMessageHandler

  這是一個專門用來處理FMessage的ChannelInboundHandler。channelRead0方法負責把不同cmd的FMessage派發到專用方法處理,這些方法有:

  • onPing: 收到PING, 會自動響應一個PONG。
  • onPong: 收到PONG。
  • onRequest: 收到REQUEST。
  • onResponse: 收到RESPONSE。
  • onPush: 收到PUSH。

 

  客戶端框架

  TcpConnector功能是發起連線,它的主要功能集中在以下三個方法中。

 1    public void addFMessageTrait(FMessageTrait trait){
 2         fmEncoder.addFMessageTrait(trait);
 3         fmDecoder.addFMessageTrait(trait);
 4     }
 5 
 6     public TcpClient connect(InetSocketAddress address) throws Exception{
 7         ChannelFuture future = bootstrap.connect(address);
 8         Channel channel = future.channel();
 9 
10         TcpClient client = new TcpClient(channel, workerElg.next());
11         channel.attr(TcpClient.CLIENT).set(client);
12 
13         future.sync();
14 
15         return client;
16     }
17 
18    protected void doInitChannel(SocketChannel ch) throws Exception {
19         ChannelPipeline pl = ch.pipeline();
20 
21         pl.addLast(H_FRAME_DECODER, new FrameDecoder());
22         pl.addLast(H_FRAME_ENCODER, frameEncoder);
23 
24         pl.addLast(H_READ_TIMEOUT, new ReadTimeoutHandler(readTimeout, TimeUnit.SECONDS));
25 
26         pl.addLast(H_FM_DECODER, fmDecoder);
27         pl.addLast(H_FM_ENCODER, fmEncoder);
28 
29         pl.addLast(H_FM_HANDLER, clientHandler);
30     }

  addFMessageTrait設定FMessageTrait,開發者可以根據需要定製FMessage的處理能力,FMTraitBytes會預設新增。

  connect用來發起連線,建立TcpClient物件。

  doInitChannel初始化Channel, 開發者可以覆蓋這個方法,定製channel的ChannelHandler。

  另外,TcpConnector內部實現了一個FMessageHandler的派生類ClientHandler。這個類的channelActive方法中啟動一個定時任務定時傳送PING。onResponse方法負責呼叫TcpClient的onResponse方法。

 

  TcpClient是客戶端連線物件,它主要有兩個方法:

  public boolean send(FMessage msg);

  public Promise<FMessage> send(FMessage msg, TimeUnit timeUnit, long timeout);

  第一個不處理響應。第二個可以非同步數量響應。

  另外還有一個給TcpConnector使用的onResponse方法,用來觸發第二個send返回Promise物件的回撥。

 

  伺服器端框架

  TcpServer是伺服器端框架,它比較簡單。開發者只需要覆蓋doInitChannel,新增自己的ChannelHandler,就可以實現伺服器端的定製。  

  

  

  

  

&n