1. 程式人生 > >Netty進階:Pilpeline原理分析

Netty進階:Pilpeline原理分析

在上篇文章中提到每個SocketCahnnel或者ServerSocketChannel的父類AbstractChannel的建構函式中會例項化DefaultChannelPipeline。在本文中會詳細的介紹ChannelPiple例項化過程中的細節、以及ChannelPiple的工作原理。

1. ChannelPiple的例項化細節

首先來看看DefaultChannelPipeline類的繼承關係圖:

netty

看到DefaultChannelPipeline實現了 ChannelInboundInvoker及ChannelOutboundInvoker兩個介面。顧名思義,一個是處理通道的inbound事件呼叫器,另一個是處理通道的outbound事件呼叫器。

  • inbound: 本質上就是執行I/O執行緒將從外部read到的資料傳遞給業務執行緒的一個過程。
  • outbound: 本質上就是業務執行緒將資料傳遞給I/O執行緒, 直至傳送給外部的一個過程。

再回顧一下上篇文章中已經提到過的DefaultChannelPipeline的構造方法

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null
); voidPromise = new VoidChannelPromise(channel, true); tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }

在此建構函式中綁定了當前channel例項,然後初始化雙向列表的頭尾節點。其中head是HeadContext的例項,tail是TailContext的例項,HeadContext與TailContext都是DefaultChannelPipeline的內部類。下面看看它們的類的繼承結構。

HeadContext類:

這裡寫圖片描述
TailContext類:
這裡寫圖片描述
從類繼承圖我們可以看出:
  1. HeadContext與TailContext都是通道的handler(中文一般叫做處理器)
  2. HeadContext既可以用於outbound過程的handler,也可以用於inbound過程的handler
  3. TailContext只可以用於inbound過程的handler
  4. HeadContext 與 TailContext 同時也是一個處理器上下文物件

下面繼續進入到HeadContext的構造方法

HeadContext(DefaultChannelPipeline pipeline) {
    super(pipeline, null, HEAD_NAME, false, true);
    //獲取channel中的unsafe物件
    unsafe = pipeline.channel().unsafe();
    setAddComplete();
}

AbstractChannelHandlerContext的構造方法:

AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                                  boolean inbound, boolean outbound) {
    this.name = ObjectUtil.checkNotNull(name, "name");
    this.pipeline = pipeline;
    this.executor = executor;
    this.inbound = inbound;
    this.outbound = outbound;
    // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
    ordered = executor == null || executor instanceof OrderedEventExecutor;
}

此構造方法,只是設定了當前context物件對應的Pipeline以及此context是作用於outbound。最後一行程式碼setAddComplete設定當前節點的狀態,通過sun.misc.Unsafe的CAS操作來完成的。

小結:

  1. 每個channel初始化時,都會建立一個與之對應的pipeline
  2. 此pipeline內部就是一個雙向連結串列
  3. 雙向連結串列的頭結點是處理outbound過程的handler,尾節點是處理inbound過程的handler;
  4. 雙向連結串列的結點同時還是handler上下文物件;

2. 新增Handller

通過上一節 知道了ChannelPipeline 中含有兩個 ChannelHandlerContext(同時也是 ChannelHandler), 但是這個 Pipeline是如何新增我們指定的handller呢?一般在我們服務端會有如下的程式碼:

 .handler(new LoggingHandler(LogLevel.INFO))
 .childHandler(new ChannelInitializer<SocketChannel>() {
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         p.addLast(new Someonehandller());
  }

如果看過ServerBootStrap原始碼分析Netty進階篇:ServerBootStrap原始碼分析就知道這裡只是設定了ServerBootStrap中的childHandller和AbstractBootStart中的handller屬性,當然handler()不是必須的。那麼這裡設定的handler和何時新增到PipleLine中呢?

AbstractBootstrap.doBind -> AbstractBootstrap.initAndRegister->init(channel)

ServerBootStrap重寫了init方法,比BootStartp實現稍微複雜一些。

@Override
void init(Channel channel) throws Exception {
    //省略部分程式碼
    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    //...
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(Channel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }
            ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }

            });
        }
    }
}

這個方法此處傳入的引數channel是NioServerSocketChannel,channel.pipeline()獲取此channel關聯的PipleLine,接下來是呼叫關聯的pipeline的addLast()方法,是new了一個ChannelInitializer的匿名物件,下面看看ChannelInitializer的繼承關係

netty
通過類繼承圖,我們得知ChannelInitializer的匿名物件其實就是一個處理inbound過程的處理器。

addLast最終具體實現如下:

 public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);
            newCtx = newContext(group, filterName(name, handler), handler);
            addLast0(newCtx);
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }

newContext方法的作用就是對傳入的handler進行包裝,最後返回一個綁定了handler的context物件,也就是DefaultChannelHandlerContext例項,形成和PipeLine中head和tail相似的格式,但是前面也提到過head和tail都有in/outboud屬性,如何對這個Handler設定呢?看到原始碼後真的不得不歎服Netty開發者博大精深!

newContext方法中呼叫了下面構造方法:

DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
    }

內部呼叫了父類AbstractChannelHandlerContext構造方法,下面看看isInbound方法

private static boolean isInbound(ChannelHandler handler) {
        return handler instanceof ChannelInboundHandler;
    }

接下里看addlast0方法:

private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;
}

看到這裡就很舒服了,這就是在尾節點前面插入一個節點的操作。

init方法就是給NioServerSocket的pipeLine中新增一個匿名ChannelInitializer類,那麼有一個疑問,該匿名類的initChannel方法什麼時候執行?也就是ServerBootstrapAcceptor什麼時候被新增到pipleLine中?

在ServerBootStarp原始碼分析中提到過,當channel註冊到selector後,會呼叫pipeline.fireChannelRegistered() 方法,如下:

@Override
public final ChannelPipeline fireChannelRegistered() {
    AbstractChannelHandlerContext.invokeChannelRegistered(head);
    return this;
}

static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRegistered();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRegistered();
                }
            });
        }
    }

引數head 是一個 AbstractChannelHandlerContext 例項, 並且它沒有重寫 invokeChannelRegistered方法,因此呼叫的AbstractChannelHandlerContext中實現的方法。

 private void invokeChannelRegistered() {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRegistered(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRegistered();
        }
    }

handler() 返回的, 其實就是一開始我們例項化匿名hannelInitializer 物件, 並接著呼叫了 ChannelInitializer.channelRegistered 方法. 繼續進入此方法

public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        // Normally this method will never be called as handlerAdded(...) should call initChannel(...) and remove
        // the handler.
        if (initChannel(ctx)) {
            // we called initChannel(...) so we need to call now pipeline.fireChannelRegistered() to ensure we not
            // miss an event.
            ctx.pipeline().fireChannelRegistered();
        } else {
            // Called initChannel(...) before which is the expected behavior, so just forward the event.
            ctx.fireChannelRegistered();
        }
    }


private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
            try {
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
                // We do so to prevent multiple calls to initChannel(...).
                exceptionCaught(ctx, cause);
            } finally {
                remove(ctx);
            }
            return true;
        }
        return false;
    }

initChannel(C ch) 這個方法就是我們重寫的方法,在此處呼叫, 當添加了自定義的 ServerBootstrapAcceptor 後, 會刪除 ChannelInitializer 這個 ChannelHandler, 即 remove(ctx);, 因此最後的 Pipeline 。

3. 事件傳輸機制

                                              I/O Request
 *                                            via {@link Channel} or
 *                                        {@link ChannelHandlerContext}
 *                                                      |
 *  +---------------------------------------------------+---------------+
 *  |                           ChannelPipeline         |               |
 *  |                                                  \|/              |
 *  |    +---------------------+            +-----------+----------+    |
 *  |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  |               |                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  .               |
 *  |               .                                   .               |
 *  | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
 *  |        [ method call]                       [method call]         |
 *  |               .                                   .               |
 *  |               .                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  |               |                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  +---------------+-----------------------------------+---------------+
 *                  |                                  \|/
 *  +---------------+-----------------------------------+---------------+
 *  |               |                                   |               |
 *  |       [ Socket.read() ]                    [ Socket.write() ]     |
 *  |                                                                   |
 *  |  Netty Internal I/O Threads (Transport Implementation)            |
 *  +-------------------------------------------------------------------+

Outbound

(1). Outbound 事件的傳播方向是 tail -> customContext -> head. 在客戶端啟動分析中當呼叫了 Bootstrap.connect 方法時,就會觸發一個 Connect 請求事件,回顧一下呼叫過程,pipeline 的 connect 程式碼如下:

public final ChannelFuture connect(
            SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
        return tail.connect(remoteAddress, localAddress, promise);
}

(2). 當 outbound 事件(這裡是 connect 事件)傳遞到 Pipeline 後, 它其實是以 tail 為起點開始傳播的.而 tail.connect 其實呼叫的是 AbstractChannelHandlerContext.connect 方法:

public ChannelFuture connect(
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {

        final AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeConnect(remoteAddress, localAddress, promise);
        } else {
            safeExecute(executor, new Runnable() {
                @Override
                public void run() {
                    next.invokeConnect(remoteAddress, localAddress, promise);
                }
            }, promise, null);
        }
        return promise;
    }

(3). findContextOutbound() 顧名思義, 它的作用是以當前 Context 為起點, 向 Pipeline 中的 Context 雙向連結串列的前端尋找第一個 outbound 屬性為真的 Context(即關聯著 ChannelOutboundHandler 的 Context), 然後返回。找到了一個 outbound 的 Context 後, 就呼叫它的 invokeConnect 方法, 這個方法中會呼叫 Context 所關聯著的 ChannelHandler 的 connect 方法:

private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
        if (invokeHandler()) {
            try {
                ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        } else {
            connect(remoteAddress, localAddress, promise);
        }
    }

(4). 如果使用者沒有重寫 ChannelHandler 的 connect 方法, 那麼會呼叫 ChannelOutboundHandlerAdapter 所實現的方法:

public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
        SocketAddress localAddress, ChannelPromise promise) throws Exception {
    ctx.connect(remoteAddress, localAddress, promise);
}

ChannelOutboundHandlerAdapter.connect 僅僅呼叫了 ctx.connect,因此又回到了(2),繼續尋找outboundhandler,直到 connect 事件傳遞到DefaultChannelPipeline 的雙向連結串列的頭節點, 即 head 中,在head節點中呼叫connect:

 @Override
public void connect(
    ChannelHandlerContext ctx,
    SocketAddress remoteAddress, SocketAddress localAddress,
    ChannelPromise promise) throws Exception {
    unsafe.connect(remoteAddress, localAddress, promise);
}

下面以一幅圖來描述一個整個 Connect 請求事件的處理過程: netty

和connect類似的Outbound事件還有:

ChannelHandlerContext.bind(SocketAddress, ChannelPromise)
ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
ChannelHandlerContext.write(Object, ChannelPromise)
ChannelHandlerContext.flush()
ChannelHandlerContext.read()
ChannelHandlerContext.disconnect(ChannelPromise)
ChannelHandlerContext.close(ChannelPromise)

注意handler和context的的區別,比如context中connect方法是事件傳輸的介質,handler中connect是真正的處理事件。 Inbound 事件傳播方法有:

ChannelHandlerContext.fireChannelRegistered()
ChannelHandlerContext.fireChannelActive()
ChannelHandlerContext.fireChannelRead(Object)
ChannelHandlerContext.fireChannelReadComplete()
ChannelHandlerContext.fireExceptionCaught(Throwable)
ChannelHandlerContext.fireUserEventTriggered(Object)
ChannelHandlerContext.fireChannelWritabilityChanged()
ChannelHandlerContext.fireChannelInactive()
ChannelHandlerContext.fireChannelUnregistered()

如果我們捕獲了一個事件, 並且想讓這個事件繼續傳遞下去, 那麼需要呼叫 Context 相應的傳播方法.

總結 對於 Outbound事件:

Outbound 事件是請求事件(由 Connect 發起一個請求, 並最終由 unsafe 處理這個請求)

Outbound 事件的發起者是 Channel

Outbound 事件的處理者是 unsafe

Outbound 事件在 Pipeline 中的傳輸方向是 tail -> head.

在 ChannelHandler 中處理事件時, 如果這個 Handler 不是最後一個 Hnalder, 則需要呼叫 ctx.xxx (例如 ctx.connect) 將此事件繼續傳播下去. 如果不這樣做, 那麼此事件的傳播會提前終止.

Outbound 事件流: Context.OUT_EVT -> Connect.findContextOutbound -> nextContext.invokeOUT_EVT -> nextHandler.OUT_EVT -> nextContext.OUT_EVT

對於 Inbound 事件:

Inbound 事件是通知事件, 當某件事情已經就緒後, 通知上層.

Inbound 事件發起者是 unsafe

Inbound 事件的處理者是 Channel, 如果使用者沒有實現自定義的處理方法, 那麼Inbound 事件預設的處理者是 TailContext, 並且其處理方法是空實現.

Inbound 事件在 Pipeline 中傳輸方向是 head -> tail

在 ChannelHandler 中處理事件時, 如果這個 Handler 不是最後一個 Hnalder, 則需要呼叫 ctx.fireIN_EVT (例如 ctx.fireChannelActive) 將此事件繼續傳播下去. 如果不這樣做, 那麼此事件的傳播會提前終止.

Outbound 事件流: Context.fireIN_EVT -> Connect.findContextInbound -> nextContext.invokeIN_EVT -> nextHandler.IN_EVT -> nextContext.fireIN_EVT