1. 程式人生 > >Netty原始碼分析之流水線

Netty原始碼分析之流水線

上一篇分析Netty執行緒模型,今天分析Netty另外一個重點流水線Pipe

一、流水線處理邏輯

Netty把各個事件放到Pipe中,進行自動化處理,這個做法非常棒!!思想非常獨特,使用者不需要關心是哪個事件?怎麼處理?使用者只需要把ChannelHandler設定到Pipe中就行了。下圖是Netty流水線經典處理流程:


左側是接收到的訊息處理流程,右側是傳送的訊息處理流程。Netty流水線實現方式很簡單,就是雙向連結串列(非迴圈連結串列)。它把所有的ChannelHandler放到連結串列中,如果是read事件,它從連結串列頭開始處理,如果是write事件,它從連結串列尾開處理。

二、流水線

Flags

流水線Pipe不需要使用者建立,這部分對使用者來說,完全是透明的。那麼在什麼地方建立的呢?

protected AbstractChannel(Channel parent, EventLoop eventLoop) {
    this.parent = parent;
    this.eventLoop = validate(eventLoop);
    unsafe = newUnsafe();
    pipeline = new DefaultChannelPipeline(this);
}

由上面程式碼可知,流水線是在建立Channel的時候,new出來的Pipe。下面先看兩個函式,這個兩個函式邏輯比較繞,首先是

skipFlags0方法

private static int skipFlags0(Class<? extends ChannelHandler> handlerType) {
    int flags = 0;
    try {
        if (handlerType.getMethod(
                "handlerAdded", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) {
            flags |= MASK_HANDLER_ADDED;
        }
        if (handlerType.getMethod(
                "handlerRemoved", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) {
            flags |= MASK_HANDLER_REMOVED;
        }
        ...
    } catch (Exception e) {
        // Should never reach here.
        PlatformDependent.throwException(e);
    }

    return flags;
}

這個裡面大部分都是if判斷,主要檢查某個方法是否存在,並且是否設定了skip(註解),才會設定flags bit位置。我們從反方向考慮:ChannelHandlerAdapter父類這些方法都已經實現並且都存在skip註解,那麼要是想某個方法不進入這個if分支,該如何做呢?

1)繼承ChannelHandlerAdapter

2)複寫父類中某個方法並且不要設定skip標誌。

第二個方法,是findContextInbound/findContextOutbound

public ChannelHandlerContext fireChannelRegistered() {
    DefaultChannelHandlerContext next = findContextInbound(MASK_CHANNEL_REGISTERED);
    next.invoker.invokeChannelRegistered(next);
    return this;
}

private DefaultChannelHandlerContext findContextInbound(int mask) {
    DefaultChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while ((ctx.skipFlags & mask) != 0);
    return ctx;
}

最繞人的邏輯就是 (ctx.skipflags & mask) != 0

假設maskMASK_CHANNEL_REGISTERED,如果skipFlagsmask&運算是非0,那麼while迴圈會繼續,會取下一個上下文。

從方法名findContextInbound可知,是要根據mask查詢上下文。如果根據mask要想查詢某個上下文,需要怎麼做呢?

1) 繼承ChannelHandlerAdapter

2) 複寫父類中某個方法並且不要設定skip標誌。

這段邏輯不好理解,可能描述也沒有描述太清楚,希望網友自己在去理會啊!!說了這麼多flags,它到到底有什麼作用呢?它的最大作用是用於區別InBound/OutBound。不知道大家有沒有疑慮,通過介面channel.pipeline().addLast(...),設定流水線,那麼netty是否怎麼知道,這個handler是用於序列化?還是反序列化呢?其實就是用flags判斷。這個也是netty5.0netty4.0最大的區別。

三、流水線動態新增

Netty流水線中ChannelHandler是支援動態插入、刪除的。這個特點在《Netty權威指南》中也有提到。接下來分析一下動態插入,首先看一下程式碼。

public void bind(int port) throws Exception {
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childHandler(new ChildChannelHandler());
        ChannelFuture f = b.bind(port).sync();
        f.channel().closeFuture().sync();
    } finally {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

    protected void initChannel(SocketChannel ch) throws Exception {
        // 將監聽事件 註冊到ChannelPipe流水線中 放到連結串列中  也可以註冊多個監聽事件 可以指定名字如果沒有名字 會自動生成
        ch.pipeline().addLast("GetTime", new TimeServerHandler());
    }
}

由上面程式碼可知,在啟動服務的時候,設定了一個內部類ChildChannelHandler。該類主要用於將解碼器,新增到流水線中。最開始我認為,服務啟動之後,initChannel方法就會呼叫,其實不然,initChannel方法是在客戶端與服務建鏈成功之後才會被呼叫到。那麼是在具體什麼程式碼中呼叫到呢?

我們在上一篇介紹執行緒模型時提到了register0方法,在該方法中有一行這樣的程式碼pipeline.fireChannelRegistered(),然後呼叫下面幾個方法:

@Override
public ChannelHandlerContext fireChannelRegistered() {
    DefaultChannelHandlerContext next = findContextInbound(MASK_CHANNEL_REGISTERED);
    next.invoker.invokeChannelRegistered(next);
    return this;
}

ChannelInitializer.java中的

public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    ChannelPipeline pipeline = ctx.pipeline();
    boolean success = false;
    try {
        initChannel((C) ctx.channel());
        pipeline.remove(this);
        ctx.fireChannelRegistered();
        success = true;
    } catch (Throwable t) {
        logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
    } finally {
        if (pipeline.context(this) != null) {
            pipeline.remove(this);
        }
        if (!success) {
            ctx.close();
        }
    }
}

在這個方法中有initChannel((C)ctx.channel());這段程式碼就會呼叫到上面initChannel方法,然後在流水線中動態新增解碼器。

通過上面程式碼,有一個地方需要再詳細介紹一下,在查詢ChannelHandleContext上下文,是通過MASK_CHANNEL_REGISTERED,那麼在什麼地方設定的這個標誌呢?

private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

    protected void initChannel(SocketChannel ch) throws Exception {
        // 將監聽事件 註冊到ChannelPipe流水線中 放到連結串列中  也可以註冊多個監聽事件 可以指定名字如果沒有名字 會自動生成
        ch.pipeline().addLast("GetTime", new TimeServerHandler());
    }
}

在設定childHandler時候定義了一個內部私有類,這個類繼承了ChannelInitializer。父類ChannelInitializer覆寫了方法channelRegistered,該方法就是MASK_CHANNEL_REGISTERED

至此,分析Netty的流水線已經完成,有分析不到的位請大家多多指點。