1. 程式人生 > >netty原始碼閱讀之編碼之MessageToByteEncoder

netty原始碼閱讀之編碼之MessageToByteEncoder

MessageToByteEncoder的write過程,我們分析以下幾步:

1、匹配物件

2、分配記憶體

3、編碼實現

4、釋放物件

5、傳播資料

6、釋放記憶體

原始碼在這裡:

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ByteBuf buf = null;
        try {
            if (acceptOutboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                buf = allocateBuffer(ctx, cast, preferDirect);
                try {
                    encode(ctx, cast, buf);
                } finally {
                    ReferenceCountUtil.release(cast);
                }

                if (buf.isReadable()) {
                    ctx.write(buf, promise);
                } else {
                    buf.release();
                    ctx.write(Unpooled.EMPTY_BUFFER, promise);
                }
                buf = null;
            } else {
                ctx.write(msg, promise);
            }
        } catch (EncoderException e) {
            throw e;
        } catch (Throwable e) {
            throw new EncoderException(e);
        } finally {
            if (buf != null) {
                buf.release();
            }
        }
    }

一、匹配物件

匹配物件從acceptOutboundMessage這裡進入,只要符合條件的物件才能進入下一步,否則繼續往下傳播,也就是

else {
                ctx.write(msg, promise);
            }

我們看 acceptOutboundMessage這個的原始碼吧:

    /**
     * Returns {@code true} if the given message should be handled. If {@code false} it will be passed to the next
     * {@link ChannelOutboundHandler} in the {@link ChannelPipeline}.
     */
    public boolean acceptOutboundMessage(Object msg) throws Exception {
        return matcher.match(msg);
    }

看看mater是何物:

    private final TypeParameterMatcher matcher;
     */
    protected MessageToByteEncoder(boolean preferDirect) {
        matcher = TypeParameterMatcher.find(this, MessageToByteEncoder.class, "I");
        this.preferDirect = preferDirect;
    }

 看match方法:

    private static final class ReflectiveMatcher extends TypeParameterMatcher {
        private final Class<?> type;

        ReflectiveMatcher(Class<?> type) {
            this.type = type;
        }

        @Override
        public boolean match(Object msg) {
            return type.isInstance(msg);
        }
    }

也就是通過isInstance判斷是否符合條件,稍微分析下就能知道結論了。

二、記憶體分配

netty建立一個byteBuf,來存放傳進來的資訊,這裡是為這個byteBuf分配記憶體。

記憶體分配就是這個函式:

                buf = allocateBuffer(ctx, cast, preferDirect);

進入:

    /**
     * Allocate a {@link ByteBuf} which will be used as argument of {@link #encode(ChannelHandlerContext, I, ByteBuf)}.
     * Sub-classes may override this method to returna {@link ByteBuf} with a perfect matching {@code initialCapacity}.
     */
    protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, @SuppressWarnings("unused") I msg,
                               boolean preferDirect) throws Exception {
        if (preferDirect) {
            return ctx.alloc().ioBuffer();
        } else {
            return ctx.alloc().heapBuffer();
        }
    }

 一般情況下netty預設使用堆外記憶體。

三、編碼實現

編碼實現呢,就是我們具體編碼器做的東西,例如我們Encoder實現的方法:

public class Encoder extends MessageToByteEncoder<User> {
    @Override
    protected void encode(ChannelHandlerContext ctx, User user, ByteBuf out) throws Exception {

        byte[] bytes = user.getName().getBytes();
        out.writeInt(4 + bytes.length);
        out.writeInt(user.getAge());
        out.writeBytes(bytes);
    }
}

還有很多netty自帶的encoder,大家可以看裡面的實現。

四、釋放物件

在上一步的編碼的時候,我們注意到這裡:

                try {
                    encode(ctx, cast, buf);
                } finally {
                    ReferenceCountUtil.release(cast);
                }

finally的時候,還需要釋放cast。cast就是我們傳進來的msg,我們注意看原始碼,它只有是ReferenceCounted的時候,才釋放物件,也就是傳入進來的本身就是byteBuf,那麼netty就會自動釋放這個物件,不需要使用者來釋放。

    /**
     * Try to call {@link ReferenceCounted#release()} if the specified message implements {@link ReferenceCounted}.
     * If the specified message doesn't implement {@link ReferenceCounted}, this method does nothing.
     */
    public static boolean release(Object msg) {
        if (msg instanceof ReferenceCounted) {
            return ((ReferenceCounted) msg).release();
        }
        return false;
    }

五、傳播資料

傳播資料的程式碼是這一段:

                if (buf.isReadable()) {
                    ctx.write(buf, promise);
                } else {
                    buf.release();
                    ctx.write(Unpooled.EMPTY_BUFFER, promise);
                }
                buf = null;

如果buf資料寫入成功,那就是可讀的,ctx.write(buf,promise)就會傳播,一直傳播到headcontext節點。

否則,傳播一個空的buf:

ctx.write(Unpooled.EMPTY_BUFFER, promise);

 最後釋放這個buf:buf=null

六、釋放記憶體

上一步最後buf=null的時候已經頗有釋放記憶體的意思了,但是有可能丟擲異常了走不到buf=null這一步,不管如何總是不能有記憶體洩漏,netty想得很周到,在fianlly的時候判斷不為空也就是沒有釋放過那就需要釋放:

finally {
            if (buf != null) {
                buf.release();
            }
        }

最後說明一下,一般情況下不會走到了這裡,只有出現異常才走到這裡。