1. 程式人生 > >netty解碼器詳解(小白也能看懂!)

netty解碼器詳解(小白也能看懂!)

什麼是編解碼器?

  首先,我們回顧一下netty的元件設計:Netty的主要元件有Channel、EventLoop、ChannelFuture、ChannelHandler、ChannelPipe等。

ChannelHandler

  ChannelHandler充當了處理入站和出站資料的應用程式邏輯的容器。例如,實現ChannelInboundHandler介面(或ChannelInboundHandlerAdapter),你就可以接收入站事件和資料,這些資料隨後會被你的應用程式的業務邏輯處理。當你要給連線的客戶端傳送響應時,也可以從ChannelInboundHandler沖刷資料。你的業務邏輯通常寫在一個或者多個ChannelInboundHandler中。ChannelOutboundHandler原理一樣,只不過它是用來處理出站資料的。

ChannelPipeline

  ChannelPipeline提供了ChannelHandler鏈的容器。以客戶端應用程式為例,如果事件的運動方向是從客戶端到服務端的,那麼我們稱這些事件為出站的,即客戶端傳送給服務端的資料會通過pipeline中的一系列ChannelOutboundHandler,並被這些Handler處理,反之則稱為入站的

編碼解碼器

  當你通過Netty傳送或者接受一個訊息的時候,就將會發生一次資料轉換。入站訊息會被解碼:從位元組轉換為另一種格式(比如java物件);如果是出站訊息,它會被編碼成位元組。

  Netty提供了一系列實用的編碼解碼器,他們都實現了ChannelInboundHadnler或者ChannelOutcoundHandler介面。在這些類中,channelRead方法已經被重寫了。以入站為例,對於每個從入站Channel讀取的訊息,這個方法會被呼叫。隨後,它將呼叫由已知解碼器所提供的decode()方法進行解碼,並將已經解碼的位元組轉發給ChannelPipeline中的下一個ChannelInboundHandler。

解碼器

抽象基類ByteToMessageDecoder

由於你不可能知道遠端節點是否會一次性發送一個完整的資訊,tcp有可能出現粘包拆包的問題,這個類會對入站資料進行緩衝,直到它準備好被處理。

主要api有兩個:

複製程式碼

public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {

    protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;

    protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.isReadable()) {
            // Only call decode() if there is something left in the buffer to decode.
            // See https://github.com/netty/netty/issues/4386
            decodeRemovalReentryProtection(ctx, in, out);
        }
    }
}

複製程式碼

decode方法:

  必須實現的方法,ByteBuf包含了傳入資料,List用來新增解碼後的訊息。對這個方法的呼叫將會重複進行,直到確定沒有新的元素被新增到該List,或者該ByteBuf中沒有更多可讀取的位元組時為止。然後如果該List不會空,那麼它的內容將會被傳遞給ChannelPipeline中的下一個ChannelInboundHandler。

decodeLast方法:

  當Channel的狀態變成非活動時,這個方法將會被呼叫一次。

 

最簡單的例子:

複製程式碼

public class ToIntegerDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.readableBytes() >= 4) {
            out.add(in.readInt());
        }
    }
}

複製程式碼

這個例子,每次入站從ByteBuf中讀取4位元組,將其解碼為一個int,然後將它新增到下一個List中。當沒有更多元素可以被新增到該List中時,它的內容將會被髮送給下一個ChannelInboundHandler。int在被新增到List中時,會被自動裝箱為Integer。在呼叫readInt()方法前必須驗證所輸入的ByteBuf是否具有足夠的資料。

 

一個實用的例子:

複製程式碼

public class MyDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.readableBytes() < 4) {
            return;
        }
     //在讀取前標記readerIndex
        in.markReaderIndex();
     //讀取頭部
        int length = in.readInt();
        if (in.readableBytes() < length) {
     //訊息不完整,無法處理,將readerIndex復位
            in.resetReaderIndex();
            return;
        }
        out.add(in.readBytes(length).toString(CharsetUtil.UTF_8));
    }
}

複製程式碼

 

抽象類ReplayingDecoder

public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder

  ReplayingDecoder擴充套件了ByteToMessageDecoder類,使用這個類,我們不必呼叫readableBytes()方法。引數S指定了使用者狀態管理的型別,其中Void代表不需要狀態管理。

以上程式碼可以簡化為:

複製程式碼

public class MySimpleDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        //傳入的ByteBuf是ReplayingDecoderByteBuf
        //首先從入站ByteBuf中讀取頭部,得到訊息體長度length,然後讀取length個位元組, 
        //並新增到解碼訊息的List中
        out.add(in.readBytes(in.readInt()).toString(CharsetUtil.UTF_8));
    }

複製程式碼

如何實現的?

ReplayingDecoder在呼叫decode方法時,傳入的是一個自定義的ByteBuf實現:

final class ReplayingDecoderByteBuf extends ByteBuf 

ReplayingDecoderByteBuf在讀取資料前,會先檢查是否有足夠的位元組可用,以readInt()為例:

複製程式碼

final class ReplayingDecoderByteBuf extends ByteBuf {

    private static final Signal REPLAY = ReplayingDecoder.REPLAY;

    ......    

     @Override
    public int readInt() {
        checkReadableBytes(4);
        return buffer.readInt();
    }

    private void checkReadableBytes(int readableBytes) {
        if (buffer.readableBytes() < readableBytes) {
            throw REPLAY;
        }
    }  

    ......

}

複製程式碼

如果位元組數量不夠,會丟擲一個Error(實際是一個Signal public final class Signal extends Error implements Constant<Signal> ),然後會在上層被捕獲並處理,它會把ByteBuf中的ReadIndex恢復到讀之前的位置,以供下次讀取。當有更多資料可供讀取時,該decode()方法將會被再次呼叫。最終結果和之前一樣,從ByteBuf中提取的String將會被新增到List中。

 

雖然ReplayingDecoder使用方便,但它也有一些侷限性:

1. 並不是所有的 ByteBuf 操作都被支援,如果呼叫了一個不被支援的方法,將會丟擲一個 UnsupportedOperationException。

2. ReplayingDecoder 在某些情況下可能稍慢於 ByteToMessageDecoder,例如網路緩慢並且訊息格式複雜時,訊息被拆成了多個碎片,於是decode()方法會被多次呼叫反覆地解析一個訊息。

3. 你需要時刻注意decode()方法在同一個訊息上可能被多次呼叫.。

錯誤用法:

一個簡單的echo服務,客戶端在連線建立時,向服務端傳送訊息(兩個1)。服務端需要一次拿到兩個Integer,並做處理。

EchoServerHandler

複製程式碼

public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("msg from client: " + msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

複製程式碼

 

EchoClientHandler

複製程式碼

public class EchoClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("sent to server: 11");
        ctx.writeAndFlush(1);
        
        Thread.sleep(1000);
        ctx.writeAndFlush(1);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    }


    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

複製程式碼

 

解碼器

複製程式碼

public class MyReplayingDecoder extends ReplayingDecoder<Void> {

    private final Queue<Integer> values = new LinkedList<>();

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        values.add(in.readInt());
        values.add(in.readInt());

        assert values.size() == 2;

        out.add(values.poll() + values.poll());
    }
}

複製程式碼

執行程式,就會發現斷言失敗。

我們通過在decode()方法中列印日誌或者打斷點的方式,可以看到,decode()方法是被呼叫了兩次的,分別在服務端兩次接受到訊息的時候:

第一次呼叫時,由於緩衝區中只有四個位元組,在第二句 values.add(in.readInt()) 中丟擲了異常REPLAY,在ReplayingDecoder中被捕獲,並復位ReadIndex。此時values.size() = 1。

第二次呼叫時,從頭開始讀取到兩個Integer並放入values,因此values.size() = 3。

正確用法:

複製程式碼

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    //清空佇列
    values.clear();
    values.add(in.readInt());
    values.add(in.readInt());

    assert values.size() == 2;

    out.add(values.poll() + values.poll());
}

複製程式碼

 

如何提高ReplayingDecoder的效能?如上所說,使用ReplayingDecoder存在對一個訊息多次重複解碼的問題,我們可以通過Netty提供的狀態控制來解決這個問題。

首先我們將訊息結構設計為:header(4個位元組,存放訊息體長度),body(訊息體)

根據訊息的結構,我們定義兩個狀態:

複製程式碼

public enum MyDecoderState {
    /**
     * 未讀頭部
     */
    READ_LENGTH,

    /**
     * 未讀內容
     */
    READ_CONTENT;
}

複製程式碼

EchoClientHandler

複製程式碼

public class EchoClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for (int i = 0; i < 10; i++) {
            System.out.println("sent to server: msg" + i);
            ctx.writeAndFlush("msg" + i);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    }


    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

複製程式碼

 

EchoServerHandler

複製程式碼

public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("msg from client: " +
            ((ByteBuf) msg).toString(CharsetUtil.UTF_8));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

複製程式碼

 

解碼器

複製程式碼

public class IntegerHeaderFrameDecoder extends ReplayingDecoder<MyDecoderState> {

    private int length;

    public IntegerHeaderFrameDecoder() {
        // Set the initial state.
        super(MyDecoderState.READ_LENGTH);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {

        switch (state()) {
            case READ_LENGTH:
                length = in.readInt();
                checkpoint(MyDecoderState.READ_CONTENT);
            case READ_CONTENT:
                ByteBuf frame = in.readBytes(length);
                checkpoint(MyDecoderState.READ_LENGTH);
                out.add(frame);
                break;
            default:
                throw new Error("Shouldn't reach here.");
        }
    }
}

複製程式碼

 

編碼器

複製程式碼

public class MyEncoder extends MessageToByteEncoder<String> {
    @Override
    protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
        byte[] b = msg.getBytes();
        int length = b.length;

        //write length of msg
        out.writeInt(length);

        //write msg
        out.writeBytes(b);
    }
}

複製程式碼

當頭部被成功讀取到時,我們呼叫 checkpoint(MyDecoderState.READ_CONTENT) 設定狀態為“未讀訊息”,相當於設定一個標誌位,如果在後續讀取時丟擲異常,那麼readIndex會被複位到上一次你呼叫checkpoint()方法的地方。下一次接收到訊息,再次呼叫decode()方法時,就能夠從checkpoint處開始讀取,避免了又從頭開始讀。

更多解碼器:

LineBasedFrameDecoder

這個類在Netty內部也有使用,它使用行尾控制字元(\n或者\r\n)作為分隔符來解析資料。

DelimiterBasedFrameDecoder

使用自定義的特殊字元作為訊息的分隔符。

HttpObjectDecoder

一個HTTP資料的解碼器。

 

這些解碼器也非常實用,下次更新關於這些解碼器的原理和詳細使用。

更多詳細內容參見《netty in action》 或者netty原始碼的英文註釋。

 

鄭州看婦科哪家好

鄭州割包皮醫院

鄭州婦科醫院

鄭州婦科醫院哪家好