netty原始碼分析(21)- inBound事件的傳播
上一節學習了刪除 ChannelHandler
的過程,至此我們已經瞭解了 pipeline
和 ChannelHandlerContext
, ChannelHandler
著三者之間的關係。 pipeline
通過維持一個連結串列結構,連結串列節點是 ChannelHandlerContext
,該節點持有 ChannelHandler
。部分對 ChannelHandler
的操作直接暴露給 ChannelHandlerContext
,因此我們可以直接操作 ChannelHandlerContext
來間接操作 ChannelHandler
。
本節以 ChannelRead
事件為例,學習inBound事件的傳播過程。
class DataServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new InboundHandlerA(), new InboundHandlerB(), new InboundHandlerC() ); } } class InboundHandlerA extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("InboundHandlerA = " + msg); ctx.fireChannelRead(msg); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //呼叫通道的fireChannelRead方法是從頭節點HeadContext開始傳播 ctx.channel().pipeline().fireChannelRead("hello world"); } } class InboundHandlerB extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("InboundHandlerB = " + msg); //呼叫資料節點的傳播方法是從當前節點往下傳播事件 ctx.fireChannelRead(msg); } } class InboundHandlerC extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("InboundHandlerC = " + msg); ctx.fireChannelRead(msg); } }
啟動服務並新增一個連線,百變Handler的新增順序。發現 InboundHandler
是順序執行的

改變順序之前
class DataServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new InboundHandlerB(), new InboundHandlerA(), new InboundHandlerC() ); } }

改變順序之後
-
handler
之間的傳播資訊通過fireXXX
方法,以channelRead
事件為例,則時通過fireChannelRead()
方法,可以用ChannelContextHandler
呼叫,也可以用pipeline
呼叫。其區別是從哪個節點開始傳播。
//呼叫通道的fireChannelRead方法是從頭節點HeadContext開始傳播 ctx.channel().pipeline().fireChannelRead("hello world"); //從本節點往下傳播 ctx.fireChannelRead(msg);
-
ctx.channel().pipeline().fireChannelRead("hello world")
傳播過程
@Override public final ChannelPipeline fireChannelRead(Object msg) { //從頭節點開始執行channelRead方法 AbstractChannelHandlerContext.invokeChannelRead(head, msg); return this; } static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { //執行channelRead() next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } } private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { //獲取對應的handler並呼叫channelRead方法 ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRead(msg); } }
最終傳播到 HeadContext
,呼叫 channelRead
方法再往下進行傳播
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { //從當前節點往下傳播channelRead事件 ctx.fireChannelRead(msg); }
fireChannelRead()
呼叫了 findContextInbound()
通過 inbound
屬性輪詢出下一個 ChannelInboundHandler
。
@Override public ChannelHandlerContext fireChannelRead(final Object msg) { //先找到下一個節點,再執行channelRead方法 //findContextInbound : 找到下一個節點 invokeChannelRead(findContextInbound(), msg); return this; } private AbstractChannelHandlerContext findContextInbound() { AbstractChannelHandlerContext ctx = this; //通過inbound屬性輪詢出下一個inboundHandlerContext do { ctx = ctx.next; } while (!ctx.inbound); return ctx; }
而之前的章節也有提到過,inbound和outbound屬性是在新增 ChannelHandler
的時候,建立 ChannelHandlerContext
時被新增。而判斷是否時inbound則用的是 instanceof
關鍵字。
DefaultChannelHandlerContext( DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) { super(pipeline, executor, name, isInbound(handler), isOutbound(handler)); if (handler == null) { throw new NullPointerException("handler"); } this.handler = handler; } private static boolean isInbound(ChannelHandler handler) { return handler instanceof ChannelInboundHandler; } private static boolean isOutbound(ChannelHandler handler) { return handler instanceof ChannelOutboundHandler; }
- 最終inbound事件的傳播過程,是從頭節點開始,逐個往下傳遞並觸發使用者回撥函式,在這過程當中,可以手動呼叫
pipeline
的傳播事件的方法,從任何一個節點開始從頭開始觸發傳播事件,也可以直接通過ChannelHandlerContext
的傳播事件方法,一次從本節點開始往下傳播事件。最後傳到尾節點TailContext
以channelRead
為例,當走到這個方法則表明,通道內未對傳播的內容進行處理,並且佔用的記憶體未釋放,在尾節點列印了日誌並最終釋放了記憶體。
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { onUnhandledInboundMessage(msg); } protected void onUnhandledInboundMessage(Object msg) { try { logger.debug( "Discarded inbound message {} that reached at the tail of the pipeline. " + "Please check your pipeline configuration.", msg); } finally { //釋放記憶體 ReferenceCountUtil.release(msg); } }