1. 程式人生 > >Netty中的ChannelPipeline原始碼分析

Netty中的ChannelPipeline原始碼分析

ChannelPipeline在Netty中是用來處理請求的責任鏈,預設實現是DefaultChannelPipeline,其構造方法如下:

 1 private final Channel channel;
 2 private final ChannelFuture succeededFuture;
 3 private final VoidChannelPromise voidPromise;
 4 final AbstractChannelHandlerContext head;
 5 final AbstractChannelHandlerContext tail;
 6 
 7 protected DefaultChannelPipeline(Channel channel) {
 8     this.channel = (Channel)ObjectUtil.checkNotNull(channel, "channel");
 9     this.succeededFuture = new SucceededChannelFuture(channel, (EventExecutor)null);
10     this.voidPromise = new VoidChannelPromise(channel, true);
11     this.tail = new DefaultChannelPipeline.TailContext(this);
12     this.head = new DefaultChannelPipeline.HeadContext(this);
13     this.head.next = this.tail;
14     this.tail.prev = this.head;
15 }

ChannelPipeline和Channel是一一對應關係,一個Channel繫結一條ChannelPipeline責任鏈
succeededFuture 和voidPromise用來處理非同步操作
AbstractChannelHandlerContext 是持有請求的上下文物件,其和ChannelHandler是對應關係(在使用Sharable註解的情況下,不同的AbstractChannelHandlerContext 還可以對應同一個ChannelHandler),ChannelPipeline責任鏈
處理的就AbstractChannelHandlerContext ,再將最後的AbstractChannelHandlerContext 交給ChannelHandler去做正真的邏輯處理

AbstractChannelHandlerContext構造方法如下:

 1 private final String name;
 2 private final DefaultChannelPipeline pipeline;
 3 final EventExecutor executor;
 4 private final boolean inbound;
 5 private final boolean outbound;
 6 private final boolean ordered;
 7 volatile AbstractChannelHandlerContext next;
 8 volatile AbstractChannelHandlerContext prev;
 9 
10 AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, boolean inbound, boolean outbound) {
11     this.name = (String)ObjectUtil.checkNotNull(name, "name");
12     this.pipeline = pipeline;
13     this.executor = executor;
14     this.inbound = inbound;
15     this.outbound = outbound;
16     this.ordered = executor == null || executor instanceof OrderedEventExecutor;
17 }

name是AbstractChannelHandlerContext的名稱,pipeline就是上面說的ChannelPipeline;executor是用來進行非同步操作的,預設使用的是在前面部落格中說過的NioEventLoop  (Netty中NioEventLoopGroup的建立原始碼分析)

inbound 和outbound 代表兩種請求處理方式,對應Netty中的I/O操作,若是inbound則處理Input操作,由ChannelPipeline從head 開始向後遍歷連結串列,並且只處理ChannelInboundHandler型別的AbstractChannelHandlerContext;若是outbound 則處理Output操作,由ChannelPipeline從tail開始向前遍歷連結串列,並且只處理ChannelOutboundHandler型別的AbstractChannelHandlerContext;
ordered 是判斷是否需要提供executor。

由next和prev成員可以知道,ChannelPipeline維護的是一條AbstractChannelHandlerContext的雙向連結串列
其頭節點head和尾節點tail分別預設初始化了HeadContext和TailContext

HeadContext的構造:

1 final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
2     private final Unsafe unsafe;
3     
4     HeadContext(DefaultChannelPipeline pipeline) {
5     super(pipeline, (EventExecutor)null, DefaultChannelPipeline.HEAD_NAME, false, true);
6     this.unsafe = pipeline.channel().unsafe();
7     this.setAddComplete();
8     }
9 }

其中setAddComplete是由AbstractChannelHandlerContext實現的:

1 final void setAddComplete() {
2     int oldState;
3     do {
4         oldState = this.handlerState;
5     } while(oldState != 3 && !HANDLER_STATE_UPDATER.compareAndSet(this, oldState, 2));
6 
7 }

handlerState表示AbstractChannelHandlerContext對應的ChannelHandler的狀態,有一下幾種:

1 private static final int ADD_PENDING = 1;
2 private static final int ADD_COMPLETE = 2;
3 private static final int REMOVE_COMPLETE = 3;
4 private static final int INIT = 0;
5 private volatile int handlerState = 0;    

handlerState初始化預設是INIT狀態。

HANDLER_STATE_UPDATER是一個原子更新器:

1 private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");

所以setAddComplete方法,就是通過CAS操作,將handlerState狀態更新為ADD_COMPLETE

TailContext的構造:

1 final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
2     TailContext(DefaultChannelPipeline pipeline) {
3         super(pipeline, (EventExecutor)null, DefaultChannelPipeline.TAIL_NAME, true, false);
4         this.setAddComplete();
5     }
6 }

和HeadContext一樣,將handlerState狀態更新為ADD_COMPLETE


結合官方給出的ChannelPipeline的圖示更容易理解:

 1                                              I/O Request
 2                                         via Channel or
 3                                     ChannelHandlerContext
 4                                                   |
 5 +---------------------------------------------------+---------------+
 6 |                           ChannelPipeline         |               |
 7 |                                                  \|/              |
 8 |    +---------------------+            +-----------+----------+    |
 9 |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
10 |    +----------+----------+            +-----------+----------+    |
11 |              /|\                                  |               |
12 |               |                                  \|/              |
13 |    +----------+----------+            +-----------+----------+    |
14 |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
15 |    +----------+----------+            +-----------+----------+    |
16 |              /|\                                  .               |
17 |               .                                   .               |
18 | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
19 |        [ method call]                       [method call]         |
20 |               .                                   .               |
21 |               .                                  \|/              |
22 |    +----------+----------+            +-----------+----------+    |
23 |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
24 |    +----------+----------+            +-----------+----------+    |
25 |              /|\                                  |               |
26 |               |                                  \|/              |
27 |    +----------+----------+            +-----------+----------+    |
28 |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
29 |    +----------+----------+            +-----------+----------+    |
30 |              /|\                                  |               |
31 +---------------+-----------------------------------+---------------+
32               |                                  \|/
33 +---------------+-----------------------------------+---------------+
34 |               |                                   |               |
35 |       [ Socket.read() ]                    [ Socket.write() ]     |
36 |                                                                   |
37 |  Netty Internal I/O Threads (Transport Implementation)            |
38 +-------------------------------------------------------------------+

 

下面對一些主要方法分析:
addFirst方法,有如下幾種過載:

 1 public final ChannelPipeline addFirst(ChannelHandler handler) {
 2     return this.addFirst((String)null, (ChannelHandler)handler);
 3 }
 4 
 5 public final ChannelPipeline addFirst(String name, ChannelHandler handler) {
 6     return this.addFirst((EventExecutorGroup)null, name, handler);
 7 }
 8 
 9 public final ChannelPipeline addFirst(ChannelHandler... handlers) {
10     return this.addFirst((EventExecutorGroup)null, (ChannelHandler[])handlers);
11 }
12 
13 public final ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) {
14     if (handlers == null) {
15         throw new NullPointerException("handlers");
16     } else if (handlers.length != 0 && handlers[0] != null) {
17         int size;
18         for(size = 1; size < handlers.length && handlers[size] != null; ++size) {
19             ;
20         }
21 
22         for(int i = size - 1; i >= 0; --i) {
23             ChannelHandler h = handlers[i];
24             this.addFirst(executor, (String)null, h);
25         }
26 
27         return this;
28     } else {
29         return this;
30     }
31 }
32 
33 public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
34     final AbstractChannelHandlerContext newCtx;
35     synchronized(this) {
36         checkMultiplicity(handler);
37         name = this.filterName(name, handler);
38         newCtx = this.newContext(group, name, handler);
39         this.addFirst0(newCtx);
40         if (!this.registered) {
41             newCtx.setAddPending();
42             this.callHandlerCallbackLater(newCtx, true);
43             return this;
44         }
45 
46         EventExecutor executor = newCtx.executor();
47         if (!executor.inEventLoop()) {
48             newCtx.setAddPending();
49             executor.execute(new Runnable() {
50                 public void run() {
51                     DefaultChannelPipeline.this.callHandlerAdded0(newCtx);
52                 }
53             });
54             return this;
55         }
56     }
57 
58     this.callHandlerAdded0(newCtx);
59     return this;
60 }

前面幾種都是間接呼叫的第四種沒什麼好說的,直接看第四種addFirst
首先呼叫checkMultiplicity,檢查ChannelHandlerAdapter在不共享的情況下是否重複:

 1 private static void checkMultiplicity(ChannelHandler handler) {
 2     if (handler instanceof ChannelHandlerAdapter) {
 3         ChannelHandlerAdapter h = (ChannelHandlerAdapter)handler;
 4         if (!h.isSharable() && h.added) {
 5             throw new ChannelPipelineException(h.getClass().getName() + " is not a @Sharable handler, so can't be added or removed multiple times.");
 6         }
 7 
 8         h.added = true;
 9     }
10 
11 }

isSharable方法:

 1 public boolean isSharable() {
 2     Class<?> clazz = this.getClass();
 3     Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
 4     Boolean sharable = (Boolean)cache.get(clazz);
 5     if (sharable == null) {
 6         sharable = clazz.isAnnotationPresent(Sharable.class);
 7         cache.put(clazz, sharable);
 8     }
 9 
10     return sharable;
11 }

首先嚐試從當前執行緒的InternalThreadLocalMap中獲取handlerSharableCache,(InternalThreadLocalMap是在Netty中使用高效的FastThreadLocal替代JDK的ThreadLocal使用的 Netty中FastThreadLocal原始碼分析)
InternalThreadLocalMap的handlerSharableCache方法:

1 public Map<Class<?>, Boolean> handlerSharableCache() {
2     Map<Class<?>, Boolean> cache = this.handlerSharableCache;
3     if (cache == null) {
4         this.handlerSharableCache = (Map)(cache = new WeakHashMap(4));
5     }
6 
7     return (Map)cache;
8 }

噹噹前執行緒的InternalThreadLocalMap中沒有handlerSharableCache時,直接建立一個大小為4的WeakHashMap弱引用Map;

根據clazz從map中get,若是沒有,需要檢測當前clazz是否有Sharable註解,添加了Sharable註解的ChannelHandlerAdapter可以在不同Channel中共享使用一個單例,前提是確保執行緒安全;
之後會將該clazz以及是否實現Sharable註解的情況新增在cache快取中;
其中ChannelHandler的added是用來標識是否新增過;

回到addFirst方法:
checkMultiplicity成功結束後,呼叫filterName方法,給當前要產生的AbstractChannelHandlerContext物件產生一個名稱,
然後呼叫newContext方法,產生AbstractChannelHandlerContext物件:

1 private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
2     return new DefaultChannelHandlerContext(this, this.childExecutor(group), name, handler);
3 }

這裡實際上產生了一個DefaultChannelHandlerContext物件:

 1 final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
 2     private final ChannelHandler handler;
 3 
 4     DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
 5         super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
 6         if (handler == null) {
 7             throw new NullPointerException("handler");
 8         } else {
 9             this.handler = handler;
10         }
11     }
12 
13     public ChannelHandler handler() {
14         return this.handler;
15     }
16 
17     private static boolean isInbound(ChannelHandler handler) {
18         return handler instanceof ChannelInboundHandler;
19     }
20 
21     private static boolean isOutbound(ChannelHandler handler) {
22         return handler instanceof ChannelOutboundHandler;
23     }
24 }

可以看到DefaultChannelHandlerContext 僅僅是將AbstractChannelHandlerContext和ChannelHandler封裝了

在產生了DefaultChannelHandlerContext 物件後,呼叫addFirst0方法:

1 private void addFirst0(AbstractChannelHandlerContext newCtx) {
2     AbstractChannelHandlerContext nextCtx = this.head.next;
3     newCtx.prev = this.head;
4     newCtx.next = nextCtx;
5     this.head.next = newCtx;
6     nextCtx.prev = newCtx;
7 }

這裡就是一個簡單的雙向連結串列的操作,將newCtx節點插入到了head後面

然後判斷registered成員的狀態:

1 private boolean registered;

在初始化時是false

registered若是false,首先呼叫AbstractChannelHandlerContext的setAddPending方法:

1 final void setAddPending() {
2    boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, 0, 1);
3 
4     assert updated;
5 
6 }

和前面說過的setAddComplete方法同理,通過CAS操作,將handlerState狀態設定為ADD_PENDING
接著呼叫callHandlerCallbackLater方法:

 1 private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
 2     assert !this.registered;
 3 
 4     DefaultChannelPipeline.PendingHandlerCallback task = added ? new DefaultChannelPipeline.PendingHandlerAddedTask(ctx) : new DefaultChannelPipeline.PendingHandlerRemovedTask(ctx);
 5     DefaultChannelPipeline.PendingHandlerCallback pending = this.pendingHandlerCallbackHead;
 6     if (pending == null) {
 7         this.pendingHandlerCallbackHead = (DefaultChannelPipeline.PendingHandlerCallback)task;
 8     } else {
 9         while(pending.next != null) {
10             pending = pending.next;
11         }
12 
13         pending.next = (DefaultChannelPipeline.PendingHandlerCallback)task;
14     }
15 
16 }

首先斷言判斷registered可能存在的多執行緒改變,然後根據added判斷產生何種型別的PendingHandlerCallback
PendingHandlerCallback是用來處理ChannelHandler的兩種回撥,定義如下:

 1 private abstract static class PendingHandlerCallback implements Runnable {
 2     final AbstractChannelHandlerContext ctx;
 3     DefaultChannelPipeline.PendingHandlerCallback next;
 4 
 5     PendingHandlerCallback(AbstractChannelHandlerContext ctx) {
 6         this.ctx = ctx;
 7     }
 8 
 9     abstract void execute();
10 }


PendingHandlerAddedTask定義如下:

 1 private final class PendingHandlerAddedTask extends DefaultChannelPipeline.PendingHandlerCallback {
 2     PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
 3         super(ctx);
 4     }
 5 
 6     public void run() {
 7         DefaultChannelPipeline.this.callHandlerAdded0(this.ctx);
 8     }
 9 
10     void execute() {
11         EventExecutor executor = this.ctx.executor();
12         if (executor.inEventLoop()) {
13             DefaultChannelPipeline.this.callHandlerAdded0(this.ctx);
14         } else {
15             try {
16                 executor.execute(this);
17             } catch (RejectedExecutionException var3) {
18                 if (DefaultChannelPipeline.logger.isWarnEnabled()) {
19                     DefaultChannelPipeline.logger.warn("Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.", new Object[]{executor, this.ctx.name(), var3});
20                 }
21 
22                 DefaultChannelPipeline.remove0(this.ctx);
23                 this.ctx.setRemoved();
24             }
25         }
26 
27     }
28 }

除去異常處理,無論是在execute方法還是在run方法中,主要核心是非同步執行callHandlerAdded0方法:

 1 private void callHandlerAdded0(AbstractChannelHandlerContext ctx) {
 2     try {
 3         ctx.setAddComplete();
 4         ctx.handler().handlerAdded(ctx);
 5     } catch (Throwable var10) {
 6         boolean removed = false;
 7 
 8         try {
 9             remove0(ctx);
10 
11             try {
12                 ctx.handler().handlerRemoved(ctx);
13             } finally {
14                 ctx.setRemoved();
15             }
16 
17             removed = true;
18         } catch (Throwable var9) {
19             if (logger.isWarnEnabled()) {
20                 logger.warn("Failed to remove a handler: " + ctx.name(), var9);
21             }
22         }
23 
24         if (removed) {
25             this.fireExceptionCaught(new ChannelPipelineException(ctx.handler().getClass().getName() + ".handlerAdded() has thrown an exception; removed.", var10));
26         } else {
27             this.fireExceptionCaught(new ChannelPipelineException(ctx.handler().getClass().getName() + ".handlerAdded() has thrown an exception; also failed to remove.", var10));
28         }
29     }
30 
31 }

除去異常處理,主要核心就兩行程式碼,首先通過setAddComplete方法,設定handlerState狀態為ADD_COMPLETE,然後回撥ChannelHandler的handlerAdded方法,這個handlerAdded方法就很熟悉了,在使用Netty處理業務邏輯時,會覆蓋這個方法。

PendingHandlerRemovedTask定義如下:

 1 private final class PendingHandlerRemovedTask extends DefaultChannelPipeline.PendingHandlerCallback {
 2     PendingHandlerRemovedTask(AbstractChannelHandlerContext ctx) {
 3         super(ctx);
 4     }
 5 
 6     public void run() {
 7         DefaultChannelPipeline.this.callHandlerRemoved0(this.ctx);
 8     }
 9 
10     void execute() {
11         EventExecutor executor = this.ctx.executor();
12         if (executor.inEventLoop()) {
13             DefaultChannelPipeline.this.callHandlerRemoved0(this.ctx);
14         } else {
15             try {
16                 executor.execute(this);
17             } catch (RejectedExecutionException var3) {
18                 if (DefaultChannelPipeline.logger.isWarnEnabled()) {
19                     DefaultChannelPipeline.logger.warn("Can't invoke handlerRemoved() as the EventExecutor {} rejected it, removing handler {}.", new Object[]{executor, this.ctx.name(), var3});
20                 }
21 
22                 this.ctx.setRemoved();
23             }
24         }
25 
26     }
27 }

和PendingHandlerAddedTask一樣,主要還是非同步呼叫callHandlerRemoved0方法:

 1 private void callHandlerRemoved0(AbstractChannelHandlerContext ctx) {
 2     try {
 3         try {
 4             ctx.handler().handlerRemoved(ctx);
 5         } finally {
 6             ctx.setRemoved();
 7         }
 8     } catch (Throwable var6) {
 9         this.fireExceptionCaught(new ChannelPipelineException(ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", var6));
10     }
11 
12 }

首先直接回調ChannelHandler的handlerRemoved方法,然後通過setRemoved方法將handlerState狀態設定為REMOVE_COMPLETE

回到callHandlerCallbackLater,其中成員pendingHandlerCallbackHead定義:

1 private DefaultChannelPipeline.PendingHandlerCallback pendingHandlerCallbackHead;

結合PendingHandlerCallback 可知,這個pendingHandlerCallbackHead是 DefaultChannelPipeline儲存的一條PendingHandlerCallback單鏈表,用來處理ChannelHandler的handlerAdded和handlerRemoved的回撥,在add的這些方法裡呼叫callHandlerCallbackLater時,added引數都為true,所以add的ChannelHandler只向pendingHandlerCallbackHead添加了handlerAdded的回撥。

回到addFirst方法,若是registered為true,先獲取EventExecutor,判斷是否處於輪詢中,若不是,則需要開啟輪詢執行緒直接非同步執行callHandlerAdded0方法,若處於輪詢,由於ChannelPipeline的呼叫是發生在輪詢時的,所以還是直接非同步執行callHandlerAdded0方法。

addFirst方法到此結束,再來看addLast方法,同樣有好幾種過載:

 1 public final ChannelPipeline addLast(ChannelHandler handler) {
 2     return this.addLast((String)null, (ChannelHandler)handler);
 3 }
 4 
 5 public final ChannelPipeline addLast(String name, ChannelHandler handler) {
 6     return this.addLast((EventExecutorGroup)null, name, handler);
 7 }
 8 
 9 public final ChannelPipeline addLast(ChannelHandler... handlers) {
10     return this.addLast((EventExecutorGroup)null, (ChannelHandler[])handlers);
11 }
12 
13 public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
14     if (handlers == null) {
15         throw new NullPointerException("handlers");
16     } else {
17         ChannelHandler[] var3 = handlers;
18         int var4 = handlers.length;
19 
20         for(int var5 = 0; var5 < var4; ++var5) {
21             ChannelHandler h = var3[var5];
22             if (h == null) {
23                 break;
24             }
25 
26             this.addLast(executor, (String)null, h);
27         }
28 
29         return this;
30     }
31 }
32 
33 public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
34     final AbstractChannelHandlerContext newCtx;
35     synchronized(this) {
36         checkMultiplicity(handler);
37         newCtx = this.newContext(group, this.filterName(name, handler), handler);
38         this.addLast0(newCtx);
39         if (!this.registered) {
40             newCtx.setAddPending();
41             this.callHandlerCallbackLater(newCtx, true);
42             return this;
43         }
44 
45         EventExecutor executor = newCtx.executor();
46         if (!executor.inEventLoop()) {
47             newCtx.setAddPending();
48             executor.execute(new Runnable() {
49                 public void run() {
50                     DefaultChannelPipeline.this.callHandlerAdded0(newCtx);
51                 }
52             });
53             return this;
54         }
55     }
56 
57     this.callHandlerAdded0(newCtx);
58     return this;
59 }

還是間接呼叫最後一種:
對比addFirst來看,只有addLast0不一樣:

1 private void addLast0(AbstractChannelHandlerContext newCtx) {
2     AbstractChannelHandlerContext prev = this.tail.prev;
3     newCtx.prev = prev;
4     newCtx.next = this.tail;
5     prev.next = newCtx;
6     this.tail.prev = newCtx;
7 }

還是非常簡單的雙向連結串列基本操作,只不過這次,是將AbstractChannelHandlerContext插入到了tail之前
還有兩個,addBefore和addAfter方法,和上述方法類似,就不再累贅


接下來看看ChannelPipeline是如何完成請求的傳遞的:
invokeHandlerAddedIfNeeded方法:

1 final void invokeHandlerAddedIfNeeded() {
2     assert this.channel.eventLoop().inEventLoop();
3 
4     if (this.firstRegistration) {
5         this.firstRegistration = false;
6         this.callHandlerAddedForAllHandlers();
7     }
8 
9 }

斷言判斷是否處於輪詢執行緒(ChannelPipeline處理請求都是在輪詢執行緒中,都需要非同步處理)
其中firstRegistration成員在DefaultChannelPipeline初始化時為true:

1 private boolean firstRegistration = true;

此時設定為false,表示第一次呼叫,以後都不再呼叫後面的callHandlerAddedForAllHandlers:

 1 private void callHandlerAddedForAllHandlers() {
 2     DefaultChannelPipeline.PendingHandlerCallback pendingHandlerCallbackHead;
 3     synchronized(this) {
 4         assert !this.registered;
 5 
 6         this.registered = true;
 7         pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
 8         this.pendingHandlerCallbackHead = null;
 9     }
10 
11     for(DefaultChannelPipeline.PendingHandlerCallback task = pendingHandlerCallbackHead; task != null; task = task.next) {
12         task.execute();
13     }
14 
15 }

剛才說過registered初始是false,在這裡判斷符合,之後就令其為true,然後獲取處理ChannelHandler的回撥連結串列pendingHandlerCallbackHead,並且將pendingHandlerCallbackHead置為null
然後遍歷這個單鏈表,處理ChannelHandler的handlerAdded和handlerRemoved的回撥

fireChannelRegistered方法,當Channel完成了向Selector的註冊後,會由channel的Unsafe進行回撥,非同步處理:

1 public final ChannelPipeline fireChannelRegistered() {
2     AbstractChannelHandlerContext.invokeChannelRegistered(this.head);
3     return this;
4 }

實際上的處理由AbstractChannelHandlerContext的靜態方法invokeChannelRegistered完成,這裡傳遞的引數head就是DefaultChannelPipeline初始化時建立的HeadContext:

 1 static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
 2     EventExecutor executor = next.executor();
 3     if (executor.inEventLoop()) {
 4         next.invokeChannelRegistered();
 5     } else {
 6         executor.execute(new Runnable() {
 7             public void run() {
 8                 next.invokeChannelRegistered();
 9             }
10         });
11     }
12 
13 }

可以看到實際上是非同步執行head物件的invokeChannelRegistered方法:

 1 private void invokeChannelRegistered() {
 2     if (this.invokeHandler()) {
 3         try {
 4             ((ChannelInboundHandler)this.handler()).channelRegistered(this);
 5         } catch (Throwable var2) {
 6             this.notifyHandlerException(var2);
 7         }
 8     } else {
 9         this.fireChannelRegistered();
10     }
11 
12 }


其中invokeHandler是用來判斷當前的handlerState狀態:

1 private boolean invokeHandler() {
2     int handlerState = this.handlerState;
3     return handlerState == 2 || !this.ordered && handlerState == 1;
4 }

若是當前handlerState狀態為ADD_COMPLETE,或者不需要提供EventExecutor並且狀態為ADD_PENDING時返回true,否則返回false
在成立的情況下,呼叫ChannelInboundHandler的channelRegistered方法,由於當前是head,所以由HeadContext實現了:

1 public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
2     DefaultChannelPipeline.this.invokeHandlerAddedIfNeeded();
3     ctx.fireChannelRegistered();
4 }

首先呼叫invokeHandlerAddedIfNeeded,處理ChannelHandler的handlerAdded和handlerRemoved的回撥
然後呼叫ctx的fireChannelRegistered方法:

1 public ChannelHandlerContext fireChannelRegistered() {
2     invokeChannelRegistered(this.findContextInbound());
3     return this;
4 }

findContextInbound方法,用來找出下一個ChannelInboundInvoker:

1 private AbstractChannelHandlerContext findContextInbound() {
2     AbstractChannelHandlerContext ctx = this;
3 
4     do {
5         ctx = ctx.next;
6     } while(!ctx.inbound);
7 
8     return ctx;
9 }

從當前節點向後遍歷,inbound之前說過,該方法就是找到下一個ChannelInboundInvoker的型別的AbstractChannelHandlerContext,然後呼叫靜態方法invokeChannelRegistered,重複上述操作,若是在ChannelInboundHandler中沒有重寫channelRegistered方法,會一直執直到完所有ChannelHandler的channelRegistered方法。
ChannelInboundHandlerAdapter中的預設channelRegistered方法:

1 public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
2     ctx.fireChannelRegistered();
3 }

比HeadContext中的實現還簡單,直接呼叫fireChannelRegistered向後傳遞


fireChannelRead方法,是在Selector輪循到讀事件就緒,會由channel的Unsafe進行回撥,非同步處理:

1 public final ChannelPipeline fireChannelRead(Object msg) {
2     AbstractChannelHandlerContext.invokeChannelRead(this.head, msg);
3     return this;
4 }

還是從head開始呼叫AbstractChannelHandlerContext的靜態方法invokeChannelRead:

 1 static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
 2     final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
 3     EventExecutor executor = next.executor();
 4     if (executor.inEventLoop()) {
 5         next.invokeChannelRead(m);
 6     } else {
 7         executor.execute(new Runnable() {
 8             public void run() {
 9                 next.invokeChannelRead(m);
10             }
11         });
12     }
13 
14 }

和上面一個邏輯非同步呼叫AbstractChannelHandlerContext物件的invokeChannelRead方法:

 1 private void invokeChannelRead(Object msg) {
 2     if (this.invokeHandler()) {
 3         try {
 4             ((ChannelInboundHandler)this.handler()).channelRead(this, msg);
 5         } catch (Throwable var3) {
 6             this.notifyHandlerException(var3);
 7         }
 8     } else {
 9         this.fireChannelRead(msg);
10     }
11 
12 }

這裡也和上面一樣,呼叫了HeadContext的channelRead方法:

1 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
2     ctx.fireChannelRead(msg);
3 }

這裡直接不處理,呼叫ChannelHandlerContext 的fireChannelRead方法:

1 public ChannelHandlerContext fireChannelRead(Object msg) {
2     invokeChannelRead(this.findContextInbound(), msg);
3     return this;
4 }

和之前註冊一樣,選擇下一個ChannelInboundHandler,重複執行上述操作。


再來看到writeAndFlush方法,和上面的就不太一樣,這個發生在輪詢前,使用者通過channel來間接呼叫,在AbstractChannel中實現:

1 public ChannelFuture writeAndFlush(Object msg) {
2     return this.pipeline.writeAndFlush(msg);
3 }

實際上直接呼叫了DefaultChannelPipeline的writeAndFlush方法:

1 public final ChannelFuture writeAndFlush(Object msg) {
2     return this.tail.writeAndFlush(msg);
3 }

這裡又有些不一樣了,呼叫了tail的writeAndFlush方法,即TailContext的writeAndFlush,在AbstractChannelHandlerContext中實現:

1 public ChannelFuture writeAndFlush(Object msg) {
2     return this.writeAndFlush(msg, this.newPromise());
3 }

newPromise產生了一個ChannelPromise,用來處理非同步事件的;實際上呼叫了writeAndFlush的過載:

 1 public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
 2     if (msg == null) {
 3         throw new NullPointerException("msg");
 4     } else if (this.isNotValidPromise(promise, true)) {
 5         ReferenceCountUtil.release(msg);
 6         return promise;
 7     } else {
 8         this.write(msg, true, promise);
 9         return promise;
10     }
11 }

繼續呼叫write方法:

 1 private void write(Object msg, boolean flush, ChannelPromise promise) {
 2     AbstractChannelHandlerContext next = this.findContextOutbound();
 3     Object m = this.pipeline.touch(msg, next);
 4     EventExecutor executor = next.executor();
 5     if (executor.inEventLoop()) {
 6         if (flush) {
 7             next.invokeWriteAndFlush(m, promise);
 8         } else {
 9             next.invokeWrite(m, promise);
10         }
11     } else {
12         Object task;
13         if (flush) {
14             task = AbstractChannelHandlerContext.WriteAndFlushTask.newInstance(next, m, promise);
15         } else {
16             task = AbstractChannelHandlerContext.WriteTask.newInstance(next, m, promise);
17         }
18 
19         safeExecute(executor, (Runnable)task, promise, m);
20     }
21 
22 }

還是很相似,只不過先呼叫findContextOutbound找到下一個ChannelOutboundInvoker型別的ChannelHandlerContext,而且這裡是從尾部往前遍歷的,這樣來看前面所給的圖是沒有任何問題的
在找到ChannelOutboundInvoker後,呼叫invokeWriteAndFlush或者invokeWrite方法:
invokeWriteAndFlush方法:

 1 private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
 2     if (this.invokeHandler()) {
 3         this.invokeWrite0(msg, promise);
 4         this.invokeFlush0();
 5     } else {
 6         this.writeAndFlush(msg, promise);
 7     }
 8 
 9 }
10 
11 private void invokeWrite0(Object msg, ChannelPromise promise) {
12     try {
13         ((ChannelOutboundHandler)this.handler()).write(this, msg, promise);
14     } catch (Throwable var4) {
15         notifyOutboundHandlerException(var4, promise);
16     }
17 
18 }
19 
20 private void invokeFlush0() {
21     try {
22         ((ChannelOutboundHandler)this.handler()).flush(this);
23     } catch (Throwable var2) {
24         this.notifyHandlerException(var2);
25     }
26 
27 }

可以看到invokeWriteAndFlush回調了ChannelOutboundHandler的write和flush方法

最終會呼叫HeadContext的write和flush方法:

1 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
2     this.unsafe.write(msg, promise);
3 }
4 
5 public void flush(ChannelHandlerContext ctx) throws Exception {
6     this.unsafe.flush();
7 }

可以看到呼叫了unsafe的write和flush方法,向unsafe緩衝區寫入了訊息,當Selector輪詢到寫事件就緒時,就會通過unsafe將剛才寫入的內容交由JDK的SocketChannel完成最終的write操作。


ChannelPipeline的分析到此全部結束。

&n