1. 程式人生 > >Netty5原始碼分析(六) -- CodeC編解碼分析

Netty5原始碼分析(六) -- CodeC編解碼分析

Netty5的CodeC編解碼對以往的版本進行了簡化,沒有單獨的Encoder / Decoder介面,都繼承了ChannelHandlerApdater類,來實現ChannelHandler介面。

對Decoder來說,主要有兩個頂層的抽象類,一個是從位元組流到訊息的ByteToMessageDecoder,一個是中間訊息到業務訊息的MessageToMessageDecoder。

ByteToMessageDecoder放置在MessageToMessageDecoder前面,處理inbound事件。

拿讀IO資料來舉例,最初的資料來源與底層的SocketChannel的讀方法,比如UnpooledDirectByteBuf,先分配一個直接記憶體緩衝區DirectByteBuffer,然後把資料複製到ByteBuf,返回ByteBuf給Netty上層的類。

對UnpooledDirectByteBuf來說,它底層封裝了Java的ByteBuffer,使用ByteBuf來對ByteBuffer進行操作,併發ByteBuf拋給頂層

// NioSocketChannel.doReadBytes
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
        return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes());
    }
// AbstractByteBuf.writeBytes
 public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
        ensureWritable(length);
        int writtenBytes = setBytes(writerIndex, in, length);
        if (writtenBytes > 0) {
            writerIndex += writtenBytes;
        }
        return writtenBytes;
    }
//UnpooledDirectByteBuf.setBytes
 public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
        ensureAccessible();
        ByteBuffer tmpBuf = internalNioBuffer();
        tmpBuf.clear().position(index).limit(index + length);
        try {
            return in.read(tmpNioBuf);
        } catch (ClosedChannelException e) {
            return -1;
        }
    }

Netty框架拿到ByteBuf之後就拿到了讀到的位元組資料,就可以呼叫ByteToMessageDecoder來把位元組流轉化成業務訊息。把位元組流轉化成業務訊息也就是如何分幀的問題。具體有幾類:

1. 固定長度來分幀

2.根據制定的分隔符

3. 採用訊息頭+訊息體的方式,訊息頭制定訊息長度

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            RecyclableArrayList out = RecyclableArrayList.newInstance();
            try {
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null;
                if (first) {
                    cumulation = data;
                } else {
                    if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes()) {
                        expandCumulation(ctx, data.readableBytes());
                    }
                    cumulation.writeBytes(data);
                    data.release();
                }
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable t) {
                throw new DecoderException(t);
            } finally {
                if (cumulation != null && !cumulation.isReadable()) {
                    cumulation.release();
                    cumulation = null;
                }
                int size = out.size();
                decodeWasNull = size == 0;

                for (int i = 0; i < size; i ++) {
                    ctx.fireChannelRead(out.get(i));
                }
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            while (in.isReadable()) {
                int outSize = out.size();
                int oldInputLength = in.readableBytes();
                decode(ctx, in, out);

                // Check if this handler was removed before continuing the loop.
                // If it was removed, it is not safe to continue to operate on the buffer.
                //
                // See https://github.com/netty/netty/issues/1664
                if (ctx.isRemoved()) {
                    break;
                }

                if (outSize == out.size()) {
                    if (oldInputLength == in.readableBytes()) {
                        break;
                    } else {
                        continue;
                    }
                }

                if (oldInputLength == in.readableBytes()) {
                    throw new DecoderException(
                            StringUtil.simpleClassName(getClass()) +
                            ".decode() did not read anything but decoded a message.");
                }

                if (isSingleDecode()) {
                    break;
                }
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Throwable cause) {
            throw new DecoderException(cause);
        }
    }


把位元組流轉化成訊息後,這個訊息有時候是中間訊息,還需要把中間訊息轉化成具體的業務訊息,這時候呼叫MessageToMessageDecoder介面。

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        RecyclableArrayList out = RecyclableArrayList.newInstance();
        try {
            if (acceptInboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                try {
                    decode(ctx, cast, out);
                } finally {
                    ReferenceCountUtil.release(cast);
                }
            } else {
                out.add(msg);
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Exception e) {
            throw new DecoderException(e);
        } finally {
            int size = out.size();
            for (int i = 0; i < size; i ++) {
                ctx.fireChannelRead(out.get(i));
            }
            out.recycle();
        }
    }


// HttpContentDecoder的decode方法,把HttpResponse訊息轉化成具體的業務訊息
 protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out) throws Exception {
        if (msg instanceof HttpResponse && ((HttpResponse) msg).getStatus().code() == 100) {

            if (!(msg instanceof LastHttpContent)) {
                continueResponse = true;
            }
            // 100-continue response must be passed through.
            out.add(ReferenceCountUtil.retain(msg));
            return;
        }

        if (continueResponse) {
            if (msg instanceof LastHttpContent) {
                continueResponse = false;
            }
            // 100-continue response must be passed through.
            out.add(ReferenceCountUtil.retain(msg));
            return;
        }

        if (msg instanceof HttpMessage) {
            assert message == null;
            message = (HttpMessage) msg;
            decodeStarted = false;
            cleanup();
        }


最後當ChannelPipeline到達後端的業務ChannelHandler時,拿到的訊息是已經解碼後的業務訊息。

編碼的過程也是一樣,提供兩個頂層介面,

1. MessageToMessageEncoder負責把業務訊息轉化成中間訊息

2. MessageToByteEncoder負責把中間訊息/業務訊息轉化成位元組流

編碼過程是一個反向的過程,這裡就不重複展示程式碼了。MessgeToMessageEncoder/Decoder不是必須的元件,可以根據實際情況使用。

ByteToMessageDecoder / MessageToByteEncoder 是必須的元件