1. 程式人生 > >Netty學習:ChannelPipeline

Netty學習:ChannelPipeline

一個{@link ChannelHandler}的列表,它處理或攔截{@link Channel}的入站事件和出站操作。

建立管道

 每個通道都有自己的管道,在建立新通道時自動建立管道。

事件如何在管道中流動

下圖描述了在{@link ChannelPipeline}中{@link ChannelHandler}s如何處理I/O事件。I/O事件由{@link ChannelInboundHandler}或{@link ChannelOutboundHandler}處理,並通過呼叫{@link ChannelHandlerContext}中定義的事件傳播方法(例如{@link ChannelHandlerContext#fireChannelRead(Object)}和{@link ChannelHandlerContext#write(Object)})轉發到最近的處理程式。

 *                                                 I/O Request
 *                                            via {@link Channel} or
 *                                        {@link ChannelHandlerContext}
 *                                                      |
 *  +---------------------------------------------------+---------------+
 *  |                           ChannelPipeline         |               |
 *  |                                                  \|/              |
 *  |    +---------------------+            +-----------+----------+    |
 *  |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  |               |                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  .               |
 *  |               .                                   .               |
 *  | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
 *  |        [ method call]                       [method call]         |
 *  |               .                                   .               |
 *  |               .                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  |               |                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  +---------------+-----------------------------------+---------------+
 *                  |                                  \|/
 *  +---------------+-----------------------------------+---------------+
 *  |               |                                   |               |
 *  |       [ Socket.read() ]                    [ Socket.write() ]     |
 *  |                                                                   |
 *  |  Netty Internal I/O Threads (Transport Implementation)            |
 *  +-------------------------------------------------------------------+

入站事件由入站處理程式按照自底向上的方向處理,如圖左側所示。入站處理程式通常處理由圖底部的I/O執行緒生成的入站資料。入站資料通常通過實際輸入操作從遠端對等點讀取,例如{@link SocketChannel#read(ByteBuffer)}。如果入站事件超出了頂級入站處理程式,則會悄無聲息地丟棄該事件,或者在需要您注意時記錄該事件。

出站事件由出站處理程式按照自頂向下的方向處理,如圖右側所示。出站處理程式通常生成或轉換出站通訊流,如寫請求。如果出站事件超出底部出站處理程式,則由與 {@link Channel}關聯的I/O執行緒處理。I/O執行緒通常執行實際的輸出操作,例如{@link SocketChannel#write(ByteBuffer)}。

例如,假設我們建立了以下管道:

 * p.addLast("1", new InboundHandlerA());
 * p.addLast("2", new InboundHandlerB());
 * p.addLast("3", new OutboundHandlerA());
 * p.addLast("4", new OutboundHandlerB());
 * p.addLast("5", new InboundOutboundHandlerX());

在上面的示例中,名稱以{@code Inbound}開頭的類意味著它是入站處理程式。名稱以{@code Outbound}開頭的類意味著它是一個出站處理程式。

在給定的示例配置中,當事件進入入站時,處理程式的評估順序是1、2、3、4、5。當事件出站時,順序是5,4,3,2,1。在此原則之上,{@link ChannelPipeline}跳過了對某些處理程式的評估,以縮短堆疊深度:

  • 3和4沒有實現{@link ChannelInboundHandler},因此入站事件的實際計算順序是:1、2和5。
  • 1和2沒有實現{@link ChannelOutboundHandler},因此出站事件的實際計算順序是:5、4和3。
  • 如果5同時實現{@link ChannelInboundHandler}和{@link ChannelOutboundHandler},則入站和出站事件的計算順序分別為125和543。

將事件轉發到下一個處理程式

正如您在圖中可能注意到的,處理程式必須呼叫{@link ChannelHandlerContext}中的事件傳播方法,以便將事件轉發給下一個處理程式。這些方法包括:

入站事件傳播方法:

  • {@link ChannelHandlerContext#fireChannelRegistered()}
  • {@link ChannelHandlerContext#fireChannelActive()}
  • {@link ChannelHandlerContext#fireChannelRead(Object)}
  • {@link ChannelHandlerContext#fireChannelReadComplete()}
  • {@link ChannelHandlerContext#fireExceptionCaught(Throwable)}
  • {@link ChannelHandlerContext#fireUserEventTriggered(Object)}
  • {@link ChannelHandlerContext#fireChannelWritabilityChanged()}
  • {@link ChannelHandlerContext#fireChannelInactive()}
  • {@link ChannelHandlerContext#fireChannelUnregistered()}

 出站事件傳播方法:

  • {@link ChannelHandlerContext#bind(SocketAddress, ChannelPromise)}
  • {@link ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise)}
  • {@link ChannelHandlerContext#write(Object, ChannelPromise)}
  • {@link ChannelHandlerContext#flush()}
  • {@link ChannelHandlerContext#read()}
  • {@link ChannelHandlerContext#disconnect(ChannelPromise)}
  • {@link ChannelHandlerContext#close(ChannelPromise)}
  • {@link ChannelHandlerContext#deregister(ChannelPromise)}

下面的例子展示了事件傳播通常是如何完成的: 

 * public class MyInboundHandler extends {@link ChannelInboundHandlerAdapter} {
 *     {@code @Override}
 *     public void channelActive({@link ChannelHandlerContext} ctx) {
 *         System.out.println("Connected!");
 *         ctx.fireChannelActive();
 *     }
 * }
 *
 * public class MyOutboundHandler extends {@link ChannelOutboundHandlerAdapter} {
 *     {@code @Override}
 *     public void close({@link ChannelHandlerContext} ctx, {@link ChannelPromise} promise) {
 *         System.out.println("Closing ..");
 *         ctx.close(promise);
 *     }
 * }

建立一個管道

使用者應該在管道中有一個或多個{@link ChannelHandler}來接收I/O事件(例如讀取)和請求I/O操作(例如寫入和關閉)。例如,一個典型的伺服器在每個通道的管道中都有以下處理程式,但是根據協議和業務邏輯的複雜性和特徵,您的里程可能會有所不同:

  • 協議解碼器——將二進位制資料(例如{@link ByteBuf})轉換為Java物件。
  • 協議編碼器——將Java物件轉換為二進位制資料。
  • 業務邏輯Handler——執行實際的業務邏輯(例如資料庫訪問)。

可以表示為,如下例所示:

  * static final {@link EventExecutorGroup} group = new {@link DefaultEventExecutorGroup}(16);
 * ...
 *
 * {@link ChannelPipeline} pipeline = ch.pipeline();
 *
 * pipeline.addLast("decoder", new MyProtocolDecoder());
 * pipeline.addLast("encoder", new MyProtocolEncoder());

//告訴管道在與I/O執行緒不同的執行緒中執行MyBusinessLogicHandler的事件處理程式方法,這樣I/O執行緒就不會被耗時的任務阻塞。

//如果您的業務邏輯是完全非同步的,或者非常快地完成,則不需要指定組。

* pipeline.addLast(group, "handler", new MyBusinessLogicHandler());

執行緒安全

可以在任何時候新增或刪除{@link ChannelHandler},因為{@link ChannelPipeline}是執行緒安全的。例如,您可以在即將交換敏感資訊時插入加密處理程式,並在交換後刪除它。

ChannelPipeline addFirst(String name, ChannelHandler handler);在管道的第一個位置插入一個{@link ChannelHandler}。

ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler);在管道的第一個位置插入一個{@link ChannelHandler}。{@link EventExecutorGroup}將用於執行{@link ChannelHandler}方法

ChannelPipeline addLast(String name, ChannelHandler handler);在管道的最後一個位置追加一個{@link ChannelHandler}。

 ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);在管道的最後一個位置追加一個{@link ChannelHandler}。{@link EventExecutorGroup}將用於執行{@link ChannelHandler}方法

ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);在該管道的現有handler之前插入{@link ChannelHandler}。

ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);在該管道的現有handler之前插入{@link ChannelHandler}。{@link EventExecutorGroup}將用於執行{@link ChannelHandler}方法

ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);在該管道的現有處理程式之後插入{@link ChannelHandler}。

ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);在該管道的現有處理程式之後插入{@link ChannelHandler}。{@link EventExecutorGroup}將用於執行{@link ChannelHandler}方法

ChannelPipeline addFirst(ChannelHandler... handlers);在管道的第一個位置插入{@link ChannelHandler}s。

ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers);在管道的第一個位置插入{@link ChannelHandler}s。{@link EventExecutorGroup}將用於執行{@link ChannelHandler}方法

ChannelPipeline addLast(ChannelHandler... handlers);在管道的最後一個位置插入{@link ChannelHandler}s。

ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);在管道的最後一個位置插入{@link ChannelHandler}s。{@link EventExecutorGroup}將用於執行{@link ChannelHandler}方法

ChannelPipeline remove(ChannelHandler handler);從該管道中刪除指定的{@link ChannelHandler}。

ChannelHandler remove(String name);從管道中刪除具有指定名稱的{@link ChannelHandler}。

<T extends ChannelHandler> T remove(Class<T> handlerType);從管道中刪除指定型別的{@link ChannelHandler}。

ChannelHandler removeFirst();刪除管道中的第一個{@link ChannelHandler}。

ChannelHandler removeLast();刪除此管道中的最後一個{@link ChannelHandler}。

ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler);用管道中的newHandler替換指定的{@link ChannelHandler}。

ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler);用管道中的新處理程式替換指定名稱的{@link ChannelHandler}。

<T extends ChannelHandler> T replace(Class<T> oldHandlerType, String newName,
                                         ChannelHandler newHandler);用管道中的新處理程式替換指定型別的{@link ChannelHandler}。

ChannelHandler first();返回此管道中的第一個{@link ChannelHandler}。

ChannelHandlerContext firstContext();返回管道中第一個{@link ChannelHandler}的上下文。

ChannelHandler last();返回此管道中的最後一個{@link ChannelHandler}。

ChannelHandlerContext lastContext();返回此管道中最後一個{@link ChannelHandler}的上下文。

ChannelHandler get(String name);返回管道中指定名稱的{@link ChannelHandler}。

<T extends ChannelHandler> T get(Class<T> handlerType);返回管道中指定型別的{@link ChannelHandler}。

ChannelHandlerContext context(ChannelHandler handler);返回此管道中指定的{@link ChannelHandler}的上下文物件。

ChannelHandlerContext context(String name);返回管道中指定名稱的{@link ChannelHandler}的上下文物件。

ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType);返回管道中指定型別的{@link ChannelHandler}的上下文物件。

Channel channel();返回管道所連線的 {@link Channel}

List<String> names();返回handler的名稱LIST

Map<String, ChannelHandler> toMap();將此管道轉換為有序的{@link對映},其鍵為處理程式名稱,其值為處理程式。