1. 程式人生 > >Netty(編解碼器框架)

Netty(編解碼器框架)

    每個網路應用程式都必須定義如何解析在兩個節點之間來回傳輸的原始位元組,以及如何將其和目標應用程式的資料格式做相互轉換。這種轉換邏輯由編解碼器處理,編解碼器  由編碼器和解碼器組成,它們每種都可以將位元組流從一種格式轉換為另一種格式。

    如果將訊息看作是對於特定的應用程式具有具體含義的結構化的位元組序列—它的資料。那麼編碼器是將訊息轉換為適合於傳輸的格式(最有可能的就是位元組流);而對應的解碼器則是將網路位元組流轉換回應用程式的訊息格式。因此,編碼器操作出站資料,而解碼器處理入站資料。

解碼器

因為解碼器是負責將入站資料從一種格式轉換到另一種格式的,所以Netty的解碼器實現了ChannelInboundHandler。

  • 將位元組解碼為訊息——ByteToMessageDecoder和ReplayingDecoder;
  • 將一種訊息型別解碼為另一種——MessageToMessageDecoder。

抽象類ByteToMessageDecoder 

    將位元組解碼為訊息(或者另一個位元組序列),Netty為它提供了一個抽象的基類:ByteToMessageDecoder。由於你不可能知道遠端節點是否會一次性地傳送一個完整的訊息,所以這個類會對入站資料進行緩衝,直到它準備好處理。

方法

描述

decode(

            ChannelHandlerContext ctx,

            ByteBuf in,

            List<Object> out)

必須實現的唯一抽象方法。

方法被呼叫時傳入一個包含傳入資料的ByteBuf,和一個新增解碼訊息的List。

對方法的呼叫會重複進行,知道沒有新元素被新增到List,或ByteBuf中沒有更多可讀取的位元組。

如果List不為空,它的內容會被傳遞給ChannelPipeline中的下一個ChannelInboundHandler

decodeLast(

            ChannelHandlerContext ctx,

            ByteBuf in,

            List<Object> out)

簡單呼叫decode()方法,

當Channel狀態為非活動時,這個方法會被呼叫一次。

可以重寫該方法已提供特殊處理

   假設接收了一個包含簡單int的位元組流,每個int都需要被單獨處理。在這種情況下,需要從入站ByteBuf中讀取每個int,並將它傳遞給ChannelPipeline中的下一個ChannelInboundHandler。為了解碼這個位元組流,需要擴充套件ByteToMessageDecoder類。 

public class ToIntegerDecoder extends ByteToMessageDecoder {
    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.readableBytes() >= 4) {
            out.add(in.readInt());
        }
    }
}

編解碼器中的引用計數

    引用計數需要特別的注意。對於編碼器和解碼器來說,其過程也是相當的簡單:一旦訊息被編碼或者解碼,它就會被ReferenceCountUtil.release(message)呼叫自動釋放。如果你需要保留引用以便稍後使用,那麼你可以呼叫ReferenceCountUtil.retain(message)方法。這將會增加該引用計數,從而防止該訊息被釋放。

抽象類ReplayingDecoder

    ReplayingDecoder擴充套件了ByteToMessageDecoder類, 使得我們不必呼叫readableBytes()方法。它通過使用一個自定義的ByteBuf實現,ReplayingDecoderByteBuf,包裝傳入的ByteBuf實現了這一點,其將在內部執行該呼叫。

public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder

型別引數S指定了用於狀態管理的型別,其中Void代表不需要狀態管理。

public class ToIntegerDecoder2 extends ReplayingDecoder<Void> {//擴充套件Replaying-Decoder<Void>以將位元組解碼為訊息
    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)//傳入的ByteBuf是ReplayingDecoderByteBuf
            throws
            Exception {
        out.add(in.readInt());//從入站ByteBuf中讀取一個int,並將其新增到解碼訊息的List中
    }
}

    和之前一樣,從ByteBuf中提取的int將會被新增到List中。如果沒有足夠的位元組可用,這個readInt()方法的實現將會丟擲一個Error,其將在基類中被捕獲並處理。當有更多的資料可供讀取時,該decode()方法將會被再次呼叫。

注意ReplayingDecoderByteBuf的下面這些方面:

  • 並不是所有的ByteBuf操作都被支援,如果呼叫了一個不被支援的方法,將會丟擲一個UnsupportedOperationException;
  • ReplayingDecoder稍慢於ByteToMessageDecoder。

這裡有一個簡單的準則:如果使用ByteToMessageDecoder不會引入太多的複雜性,那麼請使用它;否則,請使用ReplayingDecoder。 

更多的解碼器

io.netty.handler.codec.LineBasedFrameDecoder—這個類在Netty內部也有使用,它使用了行尾控制字元(\n或者\r\n)來解析訊息資料;

io.netty.handler.codec.http.HttpObjectDecoder—一個 HTTP資料的解碼器。

在io.netty.handler.codec子包下面,你將會發現更多用於特定用例的編碼器和解碼器實現。

抽象類MessageToMessageDecoder

public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter

型別引數I指定了decode()方法的輸入引數msg的型別,它是你必須實現的唯一方法。

方法

描述

decode (

          ChannelHandlerContext ctx,

          I msg,

          List<Object> out)

對於每個需要被解碼為另一種格式的入站訊息來說,該方法都將被呼叫。

解碼訊息隨後會傳遞給 ChannelPipeline中的下一個ChannelInboundHandler

     示例,編寫一個IntegerToStringDecoder解碼器來擴充套件MessageTo-MessageDecoder<Integer>。它的decode()方法會把Integer引數轉換為它的String表示:

public class IntegerToStringDecoder extends MessageToMessageDecoder<Integer>//擴充套件了MessageToMessageDecoder<Integer>
{
    @Override
    protected void decode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception {
        out.add(String.valueOf(msg));//將Integer訊息轉換為它的String表示,並將其新增到輸出的List中
    }
}

io.netty.handler.codec.http.HttpObjectAggregator類,它擴充套件了MessageToMessageDecoder<HttpObject>

TooLongFrameException類

    由於Netty是一個非同步框架,所以需要在位元組可以解碼之前在記憶體中緩衝它們。因此,不能讓解碼器緩衝大量的資料以至於耗盡可用的記憶體。為了解除這個常  見的顧慮,Netty提供了TooLongFrameException類,其將由解碼器在幀超出指定的大小限制時丟擲。

    為了避免這種情況,你可以設定一個最大位元組數的閾值,如果超出該閾值,則會導致丟擲一個TooLongFrameException(隨後會被ChannelHandler.exceptionCaught()方法捕獲)。然後,如何處理該異常則完全取決於該解碼器的使用者。某些協議(如HTTP)可能允許你返回一個特殊的響應。而在其他的情況下,唯一的選擇可能就是關閉對應的連線。

public class ToIntegerDecoder extends ByteToMessageDecoder {
    private static final int MAX_FRAME_SIZE= 1024;
    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int readable = in.readableBytes();
        if (readable  > MAX_FRAME_SIZE)//檢查緩衝區中是否有超過MAX_FRAME_SIZE個位元組
        {
            in.skipBytes(readable);//130第10章  編解碼器框架將Integer訊息轉換為它的String表示,並將其新增到輸出的List中跳過所有的可讀位元組,丟擲TooLongFrame-Exception並通知ChannelHandler
            throw new TooLongFrameException("Frametoo  big!");
        }
            if (readable >= 4) {
            out.add(in.readInt());
        }
    }
}

編碼器

編碼器實現了ChannelOutboundHandler,並將出站資料從一種格式轉換為另一種格式,和解碼器的功能正好相反。

  • 將訊息編碼為位元組;
  • 將訊息編碼為訊息。

抽象類MessageToByteEncoder

方法

描述

encode(

          ChannelHandlerContext ctx,

          I msg,

          ByteBuf out)

encode()方法是你需要實現的唯一抽象方法。它被呼叫時將會傳入要被該類編碼為ByteBuf的(型別為I的)出站訊息。該ByteBuf隨後將會被轉發給ChannelPipeline中的下一個ChannelOutboundHandler

    這個類只有一個方法,而解碼器有兩個。原因是解碼器通常需要在Channel關閉之後產生最後一個訊息(因此也就有了decodeLast()方法)。這顯然不適用於編碼器的場景——在連線被關閉之後仍然產生一個訊息是毫無意義的。

    示例,ShortToByteEncoder,其接受一個Short型別的例項作為訊息,將它編碼為Short的原子型別值,並將它寫入ByteBuf中,其將隨後被轉發給ChannelPipeline中的下一個ChannelOutboundHandler。每個傳出的Short值都將會佔用ByteBuf中的2位元組。

public class ShortToByteEncoder extends MessageToByteEncoder<Short> {
    @Override
    public void
    encode(ChannelHandlerContext ctx, Short msg, ByteBuf out) throws Exception {
        out.writeShort(msg);//將Short寫入ByteBuf中
    }
}

 抽象類MessageToMessageEncoder

方法

描述

encode(ChannelHandlerContext ctx,I msg,List<Object> out) 這是你需要實現的唯一方法。每個通過write()方法寫入的訊息都將會被傳遞給encode()方法,以編碼為一個或者多個出站訊息。隨後,這些出站訊息將會被轉發給ChannelPipeline中的下一個ChannelOutboundHandler

示例,編碼器將每個出站Integer的String表示新增到了該List中。 

public class IntegerToStringEncoder extends MessageToMessageEncoder<Integer> {
    @Override
    public void encode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception {
        out.add(String.valueOf(msg));
    }
}

抽象的編解碼器類

    它結合了ByteToMessageDecoder以及它的逆向——MessageToByteEncoder

方法名稱

描述

decode(

            ChannelHandlerContext ctx,

             ByteBuf in,

            List<Object>)

只要有位元組可以被消費,這個方法就將被呼叫。它將入站ByteBuf 轉換為指定的訊息格式,並將其轉發給ChannelPipeline 中的下一個ChannelInboundHandler

decodeLast(

            ChannelHandlerContext ctx,

            ByteBuf in,

            List<Object>  out)

這個方法預設實現委託給decode()方法。在Channel狀態變為非活動時被呼叫一次。可以被重寫以實現特殊處理。

encode(

            ChannelHandlerContext ctx,

            I msg,

            ByteBuf  out)

對每個將被編碼並寫入出站ByteBuf的訊息來說,這個方法都會被呼叫。

抽象類MessageToMessageCodec 

通過使用MessageToMessageCodec,我們可以在一個單個的類中實現該轉換的往返過程。

MessageToMessageCodec是一個引數化的類:

public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN> extends ChannelDuplexHandler

方法名稱

描述

protected abstract decode(

            ChannelHandlerContext ctx,

            INBOUND_IN msg,

            List<Object> out)

這個方法被呼叫時會被傳入INBOUND_IN型別的訊息。它將把它們解碼為OUTBOUND_IN型別的訊息,這些訊息將被轉發給ChannelPipeline中的下一個Channel-InboundHandler

protected abstract encode(

            ChannelHandlerContext ctx,

           OUTBOUND_IN msg,

            List<Object> out)

對於每個OUTBOUND_IN型別的訊息,這個方法都將會被呼叫。這些訊息將會被編碼為INBOUND_IN型別的訊息,然後被轉發給ChannelPipeline中的下一個ChannelOutboundHandler

    decode()方法是  將INBOUND_IN型別的訊息轉換為OUTBOUND_IN型別的訊息,而encode()方法則進行它的逆向操作。將INBOUND_IN型別的訊息看作是通過網路傳送的型別,而將OUTBOUND_IN型別的訊息看作是應用程式所處理的型別,將可能有助於理解。

CombinedChannelDuplexHandler類

    這個類充當了ChannelInboundHandler和ChannelOutboundHandler(該類的型別引數I和O)的容器。通過提供分別繼承瞭解碼器類和編碼器類的型別,我們可以實現一個編解碼器,而又不必直接擴充套件抽象的編解碼器類。

public class ByteToCharDecoder extends ByteToMessageDecoder {
    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
            throws Exception {
        while (in.readableBytes() >= 2) {
            out.add(in.readChar());
        }
    }
}

這裡的decode()方法一次將從ByteBuf中提取2位元組,並將它們作為char寫入到List中,其將會被自動裝箱為Character物件。

public class CharToByteEncoder extends MessageToByteEncoder<Character>
{
    @Override
    public void encode(ChannelHandlerContext ctx, Character msg, ByteBuf out) throws Exception
    {
        out.writeChar(msg);//將Character解碼為char,並將其寫入到出站ByteBuf中
    }
}

有了解碼器和編碼器,將它們來構建一個編解碼器。 

public class CombinedByteCharCodec extends CombinedChannelDuplexHandler<ByteToCharDecoder, CharToByteEncoder>
{
    public CombinedByteCharCodec()
    {
        super(new ByteToCharDecoder(), new CharToByteEncoder());
    }
}

參考《Netty實戰》