1. 程式人生 > >Netty原始碼分析之ChannelPipeline(一)—ChannelPipeline的構造與初始化

Netty原始碼分析之ChannelPipeline(一)—ChannelPipeline的構造與初始化

Netty中ChannelPipeline實際上類似與一條資料管道,負責傳遞Channel中讀取的訊息,它本質上是基於責任鏈模式的設計與實現,無論是IO事件的攔截器,還是使用者自定義的ChannelHandler業務邏輯都做為一個個節點被新增到任務鏈上。

一、ChannelPipeline的設計與構成

 ChannelPipeline中做為Netty中的資料管道,作用就是通過控制與聯通不同的ChannelHandler,傳遞Channel中的訊息。每一個Channel,都對應一個ChannelPipeline作為ChannelHandler的容器,而ChannelHandlerContext則把ChannelHandler的封裝成每個節點,以雙向連結串列方式在容器中存在;我們可以通過下圖簡單看下它們之間的關係。

1、ChannelHandler

 使用過Netty的朋友們都清楚,ChannelHandler就是做為攔截器和業務處理邏輯的存在,它會處理Channel中讀寫的訊息;

首先看下ChannelHandler介面的定義

public interface ChannelHandler {

    //當ChannelHandler新增到Pipeline中時呼叫
    void handlerAdded(ChannelHandlerContext ctx) throws Exception;

    //當ChannelHandler在Pipeline中被移除時呼叫
    void handlerRemoved(ChannelHandlerContext ctx) throws Exception;

    //在執行過程中 發生異常時呼叫
    @Deprecated
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;

    @Inherited
    @Documented
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface Sharable {
        // no value
    }
}

ChannelHandler在處理或攔截IO操作時,分為出站和入站兩個方向,對應channelde讀寫兩個操作,所以Netty中又從ChannelHandler中派生出入站ChannelInboundHandler和出站ChannelOutboundHandler兩個介面

ChannelInboundHandler處理入站資料以及各種狀態的變化,下面列出了ChannelInboundHandler中資料被接收或者Channel狀態發生變化時被呼叫的方法,這些方法和Channel的生命週期密切相關

public interface ChannelInboundHandler extends ChannelHandler {
    //當Channel註冊對應的EventLoop並且能夠處理I/O操作是被呼叫
    void channelRegistered(ChannelHandlerContext ctx) throws Exception;

    //當Channel從它對應的EventLoop上登出,並且無法處理I/O操作是被呼叫
    void channelUnregistered(ChannelHandlerContext ctx) throws Exception;

    //當Channel已經連線時被呼叫
    void channelActive(ChannelHandlerContext ctx) throws Exception;

    //當Channel為非活動狀態,也就是斷開時被呼叫
    void channelInactive(ChannelHandlerContext ctx) throws Exception;

    //當從Channel讀取資料時被呼叫
    void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;

    //當Channel的上一個讀資料完成後被呼叫
    void channelReadComplete(ChannelHandlerContext ctx) throws Exception;

    //當呼叫fireUserEventTriggered方法時被呼叫
    void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;

    //當Channel的可寫狀態發生改變時被呼叫。使用者可以確保寫操作不會完成太快
    void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;

    @Override
    @SuppressWarnings("deprecation")
    //入站操作發生異常時呼叫
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}

ChannelOutboundHandler處理出站操作和資料

public interface ChannelOutboundHandler extends ChannelHandler {
    //當請求將Channel繫結到本地地址時被呼叫
    void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;

    //當請求將Channel連線到遠端節點時被呼叫
    void connect(
            ChannelHandlerContext ctx, SocketAddress remoteAddress,
            SocketAddress localAddress, ChannelPromise promise) throws Exception;

    //當請求將Channel從遠端節點斷開時被呼叫
    void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    //當請求關閉Channel時被呼叫
    void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    //當請求從對應的EventLoop中登出時被呼叫
    void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    //當請求從Channel讀取資料時被呼叫
    void read(ChannelHandlerContext ctx) throws Exception;

    //當請求通過Channel將資料寫到遠端節點時被呼叫
    void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;

    //當請求通過Channel將入佇列資料沖刷到遠端節點時被呼叫
    void flush(ChannelHandlerContext ctx) throws Exception;
}

2、ChannelHandlerContext

ChannelHandlerContext可以說是ChannelPipeline的核心,它代表了ChannelHandler和ChannelPipeline之間的關聯,我們首先要知道一個ChannelPipeline內部會維護一個雙向連結串列,每當一個ChannelHandler被新增到ChannelPipeline中時,它都會被包裝成為一個ChannelHandlerContext,組成連結串列的各個節點。

我們看下ChannelHandlerContext介面中定義的API介面

public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {

    /**
     * Return the {@link Channel} which is bound to the {@link ChannelHandlerContext}.
     */
    //每個ChannelHandlerContext都會對一個Channel
    Channel channel();

    /**
     * Returns the {@link EventExecutor} which is used to execute an arbitrary task.
     */
    //返回用於執行的EventExecutor任務
    EventExecutor executor();

    /**
     * The unique name of the {@link ChannelHandlerContext}.The name was used when then {@link ChannelHandler}
     * was added to the {@link ChannelPipeline}. This name can also be used to access the registered
     * {@link ChannelHandler} from the {@link ChannelPipeline}.
     */
    //返回定義的name名稱
    String name();

    /**
     * The {@link ChannelHandler} that is bound this {@link ChannelHandlerContext}.
     */
    ChannelHandler handler();

    /**
     * Return {@code true} if the {@link ChannelHandler} which belongs to this context was removed
     * from the {@link ChannelPipeline}. Note that this method is only meant to be called from with in the
     * {@link EventLoop}.
     */
    //如果繫結到ChannelPipeline的ChannelHandler被刪除,返回true
    boolean isRemoved();

    //觸發下一個ChannelInboundHandler中fireChannelRegistered方法
    @Override
    ChannelHandlerContext fireChannelRegistered();
    //觸發下一個ChannelInboundHandler中fireChannelUnregistered方法
    @Override
    ChannelHandlerContext fireChannelUnregistered();
    //觸發下一個ChannelInboundHandler中fireChannelActive方法
    @Override
    ChannelHandlerContext fireChannelActive();
    //觸發下一個ChannelInboundHandler中fireChannelInactive方法
    @Override
    ChannelHandlerContext fireChannelInactive();
    //觸發下一個ChannelInboundHandler中fireExceptionCaught方法
    @Override
    ChannelHandlerContext fireExceptionCaught(Throwable cause);
    //觸發下一個ChannelInboundHandler中fireUserEventTriggered方法
    @Override
    ChannelHandlerContext fireUserEventTriggered(Object evt);
    //觸發下一個ChannelInboundHandler中fireChannelRead方法
    @Override
    ChannelHandlerContext fireChannelRead(Object msg);
    //觸發下一個ChannelInboundHandler中fireChannelReadComplete方法
    @Override
    ChannelHandlerContext fireChannelReadComplete();
    //觸發下一個ChannelInboundHandler中fireChannelWritabilityChanged方法
    @Override
    ChannelHandlerContext fireChannelWritabilityChanged();
    //觸發下一個ChannelInboundHandler中channelRead方法,如果是最後一個ChannelInboundHandler,則讀取完成後觸發channelReadComplete
    @Override
    ChannelHandlerContext read();
    //觸發下一個ChannelOutboundHandler中flush方法
    @Override
    ChannelHandlerContext flush();

    /**
     * Return the assigned {@link ChannelPipeline}
     */
    ChannelPipeline pipeline();

    /**
     * Return the assigned {@link ByteBufAllocator} which will be used to allocate {@link ByteBuf}s.
     */
    //返回繫結該channel 的 ByteBufAllocator
    ByteBufAllocator alloc();

    /**
     * @deprecated Use {@link Channel#attr(AttributeKey)}
     */
    @Deprecated
    @Override
    //返回Attribute
    <T> Attribute<T> attr(AttributeKey<T> key);

    /**
     * @deprecated Use {@link Channel#hasAttr(AttributeKey)}
     */
    @Deprecated
    @Override
    //是否包含指定的AttributeKey
    <T> boolean hasAttr(AttributeKey<T> key);
} 

二、ChannelPipeline的初始化

在AbstractChannel的建構函式中我們可以看到對ChannelPipeline的初始化

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();//初始化ChannelPipeline
    }

看下newChannelPipeline()內部的實現

    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }

在這裡建立了一個DefaultChannelPipeline 物件,並傳入Channel物件。DefaultChannelPipeline 實現了ChannelPipeline的介面

進入DefaultChannelPipeline類內部,看下其具體構造

    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);//定義一個頭部節點
        head = new HeadContext(this);//定義一個尾部節點

        //連線頭尾節點,構成雙向連結串列
        head.next = tail;
        tail.prev = head;
    }

在這裡我們可以看到DefaultChannelPipeline內部通過宣告頭尾兩個Context節點物件,構建了一個雙向連結串列結構我們;其實這裡的TailContext與HeadContext都是ChannelHandlerContext介面的具體實現;

三、總結

通過上面的內容,我們可以看出ChannelPipeline就是一個用於攔截Channel入站和出站事件的ChannelHandler例項鏈,而ChannelHandlerContext就是這個例項鏈上的節點,每一個新建立的Channel都會被分配一個新的ChannelPipeline。這篇文章我們對ChannelPipeline的構造和設計進行了大概的總結,其中如有不足與不正確的地方還望指出與海涵。後面我會對ChannelPipeline中ChannelHandler的新增、刪除等具體操作與事件如何在管道中流通傳遞進行具體的分析。

 

關注微信公眾號,檢視更多技術文章。

 

&n