1. 程式人生 > >Netty原始碼分析 (五)----- 資料如何在 pipeline 中流動

Netty原始碼分析 (五)----- 資料如何在 pipeline 中流動

在上一篇文章中,我們已經瞭解了pipeline在netty中所處的角色,像是一條流水線,控制著位元組流的讀寫,本文,我們在這個基礎上繼續深挖pipeline在事件傳播

Unsafe

顧名思義,unsafe是不安全的意思,就是告訴你不要在應用程式裡面直接使用Unsafe以及他的衍生類物件。

netty官方的解釋如下

Unsafe operations that should never be called from user-code. These methods are only provided to implement the actual transport, and must be invoked from an I/O thread

Unsafe 在Channel定義,屬於Channel的內部類,表明Unsafe和Channel密切相關

下面是unsafe介面的所有方法

interface Unsafe {
   RecvByteBufAllocator.Handle recvBufAllocHandle();
   
   SocketAddress localAddress();
   SocketAddress remoteAddress();

   void register(EventLoop eventLoop, ChannelPromise promise);
   void bind(SocketAddress localAddress, ChannelPromise promise);
   void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
   void disconnect(ChannelPromise promise);
   void close(ChannelPromise promise);
   void closeForcibly();
   void beginRead();
   void write(Object msg, ChannelPromise promise);
   void flush();
   
   ChannelPromise voidPromise();
   ChannelOutboundBuffer outboundBuffer();
}

按功能可以分為分配記憶體,Socket四元組資訊,註冊事件迴圈,繫結網絡卡埠,Socket的連線和關閉,Socket的讀寫,看的出來,這些操作都是和jdk底層相關

Unsafe 繼承結構

 

 

NioUnsafe 在 Unsafe基礎上增加了以下幾個介面

public interface NioUnsafe extends Unsafe {
    SelectableChannel ch();
    void finishConnect();
    void read();
    void forceFlush();
}

從增加的介面以及類名上來看,NioUnsafe 增加了可以訪問底層jdk的SelectableChannel的功能,定義了從SelectableChannel讀取資料的read方法

Unsafe的分類

從以上繼承結構來看,我們可以總結出兩種型別的Unsafe分類,一個是與連線的位元組資料讀寫相關的NioByteUnsafe,一個是與新連線建立操作相關的NioMessageUnsafe

NioByteUnsafe中的讀:委託到外部類NioSocketChannel

protected int doReadBytes(ByteBuf byteBuf) throws Exception {
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.attemptedBytesRead(byteBuf.writableBytes());
    return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}

最後一行已經與jdk底層以及netty中的ByteBuf相關,將jdk的 SelectableChannel的位元組資料讀取到netty的ByteBuf

NioMessageUnsafe中的讀:委託到外部類NioSocketChannel

protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = javaChannel().accept();

    if (ch != null) {
        buf.add(new NioSocketChannel(this, ch));
        return 1;
    }
    return 0;
}

NioMessageUnsafe 的讀操作很簡單,就是呼叫jdk的accept()方法,新建立一條連線

NioByteUnsafe中的寫:委託到外部類NioSocketChannel

@Override
protected int doWriteBytes(ByteBuf buf) throws Exception {
    final int expectedWrittenBytes = buf.readableBytes();
    return buf.readBytes(javaChannel(), expectedWrittenBytes);
}

最後一行已經與jdk底層以及netty中的ByteBuf相關,將netty的ByteBuf中的位元組資料寫到jdk的 SelectableChannel

pipeline中的head

NioEventLoop

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
     final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
     //新連線的已準備接入或者已存在的連線有資料可讀
     if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
         unsafe.read();
     }
}

NioByteUnsafe

@Override
public final void read() {
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    // 建立ByteBuf分配器
    final ByteBufAllocator allocator = config.getAllocator();
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);

    ByteBuf byteBuf = null;
    do {
        // 分配一個ByteBuf
        byteBuf = allocHandle.allocate(allocator);
        // 將資料讀取到分配的ByteBuf中去
        allocHandle.lastBytesRead(doReadBytes(byteBuf));
        if (allocHandle.lastBytesRead() <= 0) {
            byteBuf.release();
            byteBuf = null;
            close = allocHandle.lastBytesRead() < 0;
            break;
        }

        // 觸發事件,將會引發pipeline的讀事件傳播
        pipeline.fireChannelRead(byteBuf);
        byteBuf = null;
    } while (allocHandle.continueReading());
    pipeline.fireChannelReadComplete();
}

同樣,我抽出了核心程式碼,細枝末節先剪去,NioByteUnsafe 要做的事情可以簡單地分為以下幾個步驟

  1. 拿到Channel的config之後拿到ByteBuf分配器,用分配器來分配一個ByteBuf,ByteBuf是netty裡面的位元組資料載體,後面讀取的資料都讀到這個物件裡面
  2. 將Channel中的資料讀取到ByteBuf
  3. 資料讀完之後,呼叫 pipeline.fireChannelRead(byteBuf); 從head節點開始傳播至整個pipeline
  4. 最後呼叫fireChannelReadComplete();

這裡,我們的重點其實就是 pipeline.fireChannelRead(byteBuf);

DefaultChannelPipeline

final AbstractChannelHandlerContext head;
//...
head = new HeadContext(this);

public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

結合這幅圖

 

 

可以看到,資料從head節點開始流入,在進行下一步之前,我們先把head節點的功能過一遍

HeadContext

final class HeadContext extends AbstractChannelHandlerContext
        implements ChannelOutboundHandler, ChannelInboundHandler {

    private final Unsafe unsafe;

    HeadContext(DefaultChannelPipeline pipeline) {
        super(pipeline, null, HEAD_NAME, false, true);
        unsafe = pipeline.channel().unsafe();
        setAddComplete();
    }

    @Override
    public ChannelHandler handler() {
        return this;
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // NOOP
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        // NOOP
    }

    @Override
    public void bind(
            ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
            throws Exception {
        unsafe.bind(localAddress, promise);
    }

    @Override
    public void connect(
            ChannelHandlerContext ctx,
            SocketAddress remoteAddress, SocketAddress localAddress,
            ChannelPromise promise) throws Exception {
        unsafe.connect(remoteAddress, localAddress, promise);
    }

    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        unsafe.disconnect(promise);
    }

    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        unsafe.close(promise);
    }

    @Override
    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        unsafe.deregister(promise);
    }

    @Override
    public void read(ChannelHandlerContext ctx) {
        unsafe.beginRead();
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        unsafe.write(msg, promise);
    }

    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception {
        unsafe.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.fireExceptionCaught(cause);
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        invokeHandlerAddedIfNeeded();
        ctx.fireChannelRegistered();
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelUnregistered();

        // Remove all handlers sequentially if channel is closed and unregistered.
        if (!channel.isOpen()) {
            destroy();
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();

        readIfIsAutoRead();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelInactive();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelReadComplete();

        readIfIsAutoRead();
    }

    private void readIfIsAutoRead() {
        if (channel.config().isAutoRead()) {
            channel.read();
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        ctx.fireUserEventTriggered(evt);
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelWritabilityChanged();
    }
}

從head節點繼承的兩個介面看,TA既是一個ChannelHandlerContext,同時又屬於inBound和outBound Handler

在傳播讀寫事件的時候,head的功能只是簡單地將事件傳播下去,如ctx.fireChannelRead(msg);

在真正執行讀寫操作的時候,例如在呼叫writeAndFlush()等方法的時候,最終都會委託到unsafe執行,而當一次資料讀完,channelReadComplete方法會被呼叫

pipeline中的inBound事件傳播

我們接著上面的 AbstractChannelHandlerContext.invokeChannelRead(head, msg); 這個靜態方法看,引數傳入了 head,我們知道入站資料都是從 head 開始的,以保證後面所有的 handler 都由機會處理資料流。

我們看看這個靜態方法內部是怎麼樣的:

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

呼叫這個 Context (也就是 head) 的 invokeChannelRead 方法,並傳入資料。我們再看看head中 invokeChannelRead 方法的實現,實際上是在headContext的父類AbstractChannelHandlerContext中:

AbstractChannelHandlerContext

private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

public ChannelHandler handler() {
    return this;
}

上面 handler()就是headContext中的handler,也就是headContext自身,也就是呼叫 head 的 channelRead 方法。那麼這個方法是怎麼實現的呢?

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ctx.fireChannelRead(msg);
}

什麼都沒做,呼叫 Context 的 fire 系列方法,將請求轉發給下一個節點。我們這裡是 fireChannelRead 方法,注意,這裡方法名字都挺像的。需要細心區分。下面我們看看 Context 的成員方法 fireChannelRead:

AbstractChannelHandlerContext

@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(), msg);
    return this;
}

這個是 head 的抽象父類 AbstractChannelHandlerContext 的實現,該方法再次呼叫了靜態 fire 系列方法,但和上次不同的是,不再放入 head 引數了,而是使用 findContextInbound 方法的返回值。從這個方法的名字可以看出,是找到入站型別的 handler。我們看看方法實現:

private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while (!ctx.inbound);
    return ctx;
}

該方法很簡單,找到當前 Context 的 next 節點(inbound 型別的)並返回。這樣就能將請求傳遞給後面的 inbound handler 了。我們來看看 invokeChannelRead(findContextInbound(), msg);

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }

}

上面我們找到了next節點(inbound型別的),然後直接呼叫 next.invokeChannelRead(m);如果這個next是我們自定義的handler,此時我們自定義的handler的父類是AbstractChannelHandlerContext,則又回到了AbstractChannelHandlerContext中實現的invokeChannelRead,程式碼如下:

AbstractChannelHandlerContext

private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

public ChannelHandler handler() {
    return this;
}

此時的handler()就是我們自定義的handler了,然後呼叫我們自定義handler中的 channelRead(this, msg);

請求進來時,pipeline 會從 head 節點開始輸送,通過配合 invoker 介面的 fire 系列方法,實現 Context 鏈在 pipeline 中的完美傳遞。最終到達我們自定義的 handler。

注意:此時如果我們想繼續向後傳遞該怎麼辦呢?我們前面說過,可以呼叫 Context 的 fire 系列方法,就像 head 的 channelRead 方法一樣,呼叫 fire 系列方法,直接向後傳遞就 ok 了。

如果所有的handler都呼叫了fire系列方法,則會傳遞到最後一個inbound型別的handler,也就是——tail節點,那我們就來看看tail節點

pipeline中的tail

final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

    TailContext(DefaultChannelPipeline pipeline) {
        super(pipeline, null, TAIL_NAME, true, false);
        setAddComplete();
    }

    @Override
    public ChannelHandler handler() {
        return this;
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception { }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception { }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception { }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception { }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        // This may not be a configuration error and so don't log anything.
        // The event may be superfluous for the current pipeline configuration.
        ReferenceCountUtil.release(evt);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        onUnhandledInboundException(cause);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        onUnhandledInboundMessage(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { }
}

正如我們前面所提到的,tail節點的大部分作用即終止事件的傳播(方法體為空)

channelRead

protected void onUnhandledInboundMessage(Object msg) {
    try {
        logger.debug(
                "Discarded inbound message {} that reached at the tail of the pipeline. " +
                        "Please check your pipeline configuration.", msg);
    } finally {
        ReferenceCountUtil.release(msg);
    }
}

tail節點在發現位元組資料(ByteBuf)或者decoder之後的業務物件在pipeline流轉過程中沒有被消費,落到tail節點,tail節點就會給你發出一個警告,告訴你,我已經將你未處理的資料給丟掉了

總結一下,tail節點的作用就是結束事件傳播,並且對一些重要的事件做一些善意提醒

pipeline中的outBound事件傳播

上一節中,我們在闡述tail節點的功能時,忽略了其父類AbstractChannelHandlerContext所具有的功能,這一節中,我們以最常見的writeAndFlush操作來看下pipeline中的outBound事件是如何向外傳播的

典型的訊息推送系統中,會有類似下面的一段程式碼

Channel channel = getChannel(userInfo);
channel.writeAndFlush(pushInfo);

這段程式碼的含義就是根據使用者資訊拿到對應的Channel,然後給使用者推送訊息,跟進 channel.writeAndFlush

NioSocketChannel

public ChannelFuture writeAndFlush(Object msg) {
    return pipeline.writeAndFlush(msg);
}

從pipeline開始往外傳播

public final ChannelFuture writeAndFlush(Object msg) {
    return tail.writeAndFlush(msg);
}

Channel 中大部分outBound事件都是從tail開始往外傳播, writeAndFlush()方法是tail繼承而來的方法,我們跟進去

AbstractChannelHandlerContext

public ChannelFuture writeAndFlush(Object msg) {
    return writeAndFlush(msg, newPromise());
}

public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
    write(msg, true, promise);

    return promise;
}

AbstractChannelHandlerContext

private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        }  else {
            task = WriteTask.newInstance(next, m, promise);
        }
        safeExecute(executor, task, promise, m);
    }
}

先呼叫findContextOutbound()方法找到下一個outBound()節點

AbstractChannelHandlerContext

private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.prev;
    } while (!ctx.outbound);
    return ctx;
}

找outBound節點的過程和找inBound節點類似,反方向遍歷pipeline中的雙向連結串列,直到第一個outBound節點next,然後呼叫next.invokeWriteAndFlush(m, promise)

AbstractChannelHandlerContext

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
        invokeFlush0();
    } else {
        writeAndFlush(msg, promise);
    }
}

呼叫該節點的ChannelHandler的write方法,flush方法我們暫且忽略,後面會專門講writeAndFlush的完整流程

AbstractChannelHandlerContext

private void invokeWrite0(Object msg, ChannelPromise promise) {
    try {
        ((ChannelOutboundHandler) handler()).write(this, msg, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}

可以看到,資料開始出站,從後向前開始流動,和入站的方向是反的。那麼最後會走到哪裡呢,當然是走到 head 節點,因為 head 節點就是 outbound 型別的 handler。

HeadContext

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}

呼叫了 底層的 unsafe 操作資料,這裡,加深了我們對head節點的理解,即所有的資料寫出都會經過head節點

當執行完這個 write 方法後,方法開始退棧。逐步退到 unsafe 的 read 方法,回到最初開始的地方,然後繼續呼叫 pipeline.fireChannelReadComplete() 方法

總結

總結一下一個請求在 pipeline 中的流轉過程:

  1. 呼叫 pipeline 的 fire 系列方法,這些方法是介面 invoker 設計的,pipeline 實現了 invoker 的所有方法,inbound 事件從 head 開始流入,outbound 事件從 tail 開始流出。
  2. pipeline 會將請求交給 Context,然後 Context 通過抽象父類 AbstractChannelHandlerContext 的 invoke 系列方法(靜態和非靜態的)配合 AbstractChannelHandlerContext 的 fire 系列方法再配合 findContextInbound 和 findContextOutbound 方法完成各個 Context 的資料流轉。
  3. 當入站過程中,呼叫 了出站的方法,那麼請求就不會向後走了。後面的處理器將不會有任何作用。想繼續相會傳遞就呼叫 Context 的 fire 系列方法,讓 Netty 在內部幫你傳遞資料到下一個節點。如果你想在整個通道傳遞,就在 handler 中呼叫 channel 或者 pipeline 的對應方法,這兩個方法會將資料從頭到尾或者從尾到頭的流轉一遍。

&n