netty原始碼閱讀之編碼之MessageToByteEncoder
MessageToByteEncoder的write過程,我們分析以下幾步:
1、匹配物件
2、分配記憶體
3、編碼實現
4、釋放物件
5、傳播資料
6、釋放記憶體
原始碼在這裡:
@Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ByteBuf buf = null; try { if (acceptOutboundMessage(msg)) { @SuppressWarnings("unchecked") I cast = (I) msg; buf = allocateBuffer(ctx, cast, preferDirect); try { encode(ctx, cast, buf); } finally { ReferenceCountUtil.release(cast); } if (buf.isReadable()) { ctx.write(buf, promise); } else { buf.release(); ctx.write(Unpooled.EMPTY_BUFFER, promise); } buf = null; } else { ctx.write(msg, promise); } } catch (EncoderException e) { throw e; } catch (Throwable e) { throw new EncoderException(e); } finally { if (buf != null) { buf.release(); } } }
一、匹配物件
匹配物件從acceptOutboundMessage這裡進入,只要符合條件的物件才能進入下一步,否則繼續往下傳播,也就是
else {
ctx.write(msg, promise);
}
我們看 acceptOutboundMessage這個的原始碼吧:
/** * Returns {@code true} if the given message should be handled. If {@code false} it will be passed to the next * {@link ChannelOutboundHandler} in the {@link ChannelPipeline}. */ public boolean acceptOutboundMessage(Object msg) throws Exception { return matcher.match(msg); }
看看mater是何物:
private final TypeParameterMatcher matcher;
*/
protected MessageToByteEncoder(boolean preferDirect) {
matcher = TypeParameterMatcher.find(this, MessageToByteEncoder.class, "I");
this.preferDirect = preferDirect;
}
看match方法:
private static final class ReflectiveMatcher extends TypeParameterMatcher { private final Class<?> type; ReflectiveMatcher(Class<?> type) { this.type = type; } @Override public boolean match(Object msg) { return type.isInstance(msg); } }
也就是通過isInstance判斷是否符合條件,稍微分析下就能知道結論了。
二、記憶體分配
netty建立一個byteBuf,來存放傳進來的資訊,這裡是為這個byteBuf分配記憶體。
記憶體分配就是這個函式:
buf = allocateBuffer(ctx, cast, preferDirect);
進入:
/**
* Allocate a {@link ByteBuf} which will be used as argument of {@link #encode(ChannelHandlerContext, I, ByteBuf)}.
* Sub-classes may override this method to returna {@link ByteBuf} with a perfect matching {@code initialCapacity}.
*/
protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, @SuppressWarnings("unused") I msg,
boolean preferDirect) throws Exception {
if (preferDirect) {
return ctx.alloc().ioBuffer();
} else {
return ctx.alloc().heapBuffer();
}
}
一般情況下netty預設使用堆外記憶體。
三、編碼實現
編碼實現呢,就是我們具體編碼器做的東西,例如我們Encoder實現的方法:
public class Encoder extends MessageToByteEncoder<User> {
@Override
protected void encode(ChannelHandlerContext ctx, User user, ByteBuf out) throws Exception {
byte[] bytes = user.getName().getBytes();
out.writeInt(4 + bytes.length);
out.writeInt(user.getAge());
out.writeBytes(bytes);
}
}
還有很多netty自帶的encoder,大家可以看裡面的實現。
四、釋放物件
在上一步的編碼的時候,我們注意到這裡:
try {
encode(ctx, cast, buf);
} finally {
ReferenceCountUtil.release(cast);
}
finally的時候,還需要釋放cast。cast就是我們傳進來的msg,我們注意看原始碼,它只有是ReferenceCounted的時候,才釋放物件,也就是傳入進來的本身就是byteBuf,那麼netty就會自動釋放這個物件,不需要使用者來釋放。
/**
* Try to call {@link ReferenceCounted#release()} if the specified message implements {@link ReferenceCounted}.
* If the specified message doesn't implement {@link ReferenceCounted}, this method does nothing.
*/
public static boolean release(Object msg) {
if (msg instanceof ReferenceCounted) {
return ((ReferenceCounted) msg).release();
}
return false;
}
五、傳播資料
傳播資料的程式碼是這一段:
if (buf.isReadable()) {
ctx.write(buf, promise);
} else {
buf.release();
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}
buf = null;
如果buf資料寫入成功,那就是可讀的,ctx.write(buf,promise)就會傳播,一直傳播到headcontext節點。
否則,傳播一個空的buf:
ctx.write(Unpooled.EMPTY_BUFFER, promise);
最後釋放這個buf:buf=null
六、釋放記憶體
上一步最後buf=null的時候已經頗有釋放記憶體的意思了,但是有可能丟擲異常了走不到buf=null這一步,不管如何總是不能有記憶體洩漏,netty想得很周到,在fianlly的時候判斷不為空也就是沒有釋放過那就需要釋放:
finally {
if (buf != null) {
buf.release();
}
}
最後說明一下,一般情況下不會走到了這裡,只有出現異常才走到這裡。