1. 程式人生 > >Netty原始碼分析 (六)----- 客戶端接入accept過程

Netty原始碼分析 (六)----- 客戶端接入accept過程

通讀本文,你會了解到
1.netty如何接受新的請求
2.netty如何給新請求分配reactor執行緒
3.netty如何給每個新連線增加ChannelHandler

netty中的reactor執行緒

netty中最核心的東西莫過於兩種型別的reactor執行緒,可以看作netty中兩種型別的發動機,驅動著netty整個框架的運轉

一種型別的reactor執行緒是boos執行緒組,專門用來接受新的連線,然後封裝成channel物件扔給worker執行緒組;還有一種型別的reactor執行緒是worker執行緒組,專門用來處理連線的讀寫

不管是boos執行緒還是worker執行緒,所做的事情均分為以下三個步驟

  1. 輪詢註冊在selector上的IO事件
  2. 處理IO事件
  3. 執行非同步task

對於boos執行緒來說,第一步輪詢出來的基本都是 accept 事件,表示有新的連線,而worker執行緒輪詢出來的基本都是read/write事件,表示網路的讀寫事件

新連線的建立

簡單來說,新連線的建立可以分為三個步驟
1.檢測到有新的連線
2.將新的連線註冊到worker執行緒組
3.註冊新連線的讀事件

檢測到有新連線進入

我們已經知道,當服務端綁啟動之後,服務端的channel已經註冊到boos reactor執行緒中,reactor不斷檢測有新的事件,直到檢測出有accept事件發生

NioEventLoop.java

private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final NioUnsafe unsafe = ch.unsafe();
    //檢查該SelectionKey是否有效,如果無效,則關閉channel
    if (!k.isValid()) {
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidPromise());
        return;
    }

    try {
        int readyOps = k.readyOps();
        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        // 如果準備好READ或ACCEPT則觸發unsafe.read() ,檢查是否為0,如上面的原始碼英文註釋所說:解決JDK可能會產生死迴圈的一個bug。
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            if (!ch.isOpen()) {//如果已經關閉,則直接返回即可,不需要再處理該channel的其他事件
                // Connection already closed - no need to handle write.
                return;
            }
        }
        // 如果準備好了WRITE則將緩衝區中的資料傳送出去,如果緩衝區中資料都發送完成,則清除之前關注的OP_WRITE標記
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }
        // 如果是OP_CONNECT,則需要移除OP_CONNECT否則Selector.select(timeout)將立即返回不會有任何阻塞,這樣可能會出現cpu 100%
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

該方法主要是對SelectionKey k進行了檢查,有如下幾種不同的情況

1)OP_ACCEPT,接受客戶端連線

2)OP_READ, 可讀事件, 即 Channel 中收到了新資料可供上層讀取。

3)OP_WRITE, 可寫事件, 即上層可以向 Channel 寫入資料。

4)OP_CONNECT, 連線建立事件, 即 TCP 連線已經建立, Channel 處於 active 狀態。

本篇博文主要來看下當boss執行緒 selector檢測到OP_ACCEPT事件時,內部幹了些什麼。

if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
    if (!ch.isOpen()) {//如果已經關閉,則直接返回即可,不需要再處理該channel的其他事件
        // Connection already closed - no need to handle write.
        return;
    }
}

boos reactor執行緒已經輪詢到 SelectionKey.OP_ACCEPT 事件,說明有新的連線進入,此時將呼叫channel的 unsafe來進行實際的操作,此時的channel為 NioServerSocketChannel,則unsafe為NioServerSocketChannel的屬性NioMessageUnsafe

那麼,我們進入到它的read方法,進入新連線處理的第二步

註冊到reactor執行緒

NioMessageUnsafe.java

private final List<Object> readBuf = new ArrayList<Object>();

public void read() {
    assert eventLoop().inEventLoop();
    final ChannelPipeline pipeline = pipeline();
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    do {
        int localRead = doReadMessages(readBuf);
        if (localRead == 0) {
            break;
        }
        if (localRead < 0) {
            closed = true;
            break;
        }
    } while (allocHandle.continueReading());
    int size = readBuf.size();
    for (int i = 0; i < size; i ++) {
        pipeline.fireChannelRead(readBuf.get(i));
    }
    readBuf.clear();
    pipeline.fireChannelReadComplete();
}

呼叫 doReadMessages 方法不斷地讀取訊息,用 readBuf 作為容器,這裡,其實可以猜到讀取的是一個個連線,然後呼叫 pipeline.fireChannelRead(),將每條新連線經過一層服務端channel的洗禮,之後清理容器,觸發 pipeline.fireChannelReadComplete()

下面我們具體看下這兩個方法

1.doReadMessages(List)
2.pipeline.fireChannelRead(NioSocketChannel)

doReadMessages()

protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = javaChannel().accept();

    try {
        if (ch != null) {
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0;
}

我們終於窺探到netty呼叫jdk底層nio的邊界 javaChannel().accept();,由於netty中reactor執行緒第一步就掃描到有accept事件發生,因此,這裡的accept方法是立即返回的,返回jdk底層nio建立的一條channel

ServerSocketChannel有阻塞和非阻塞兩種模式:

a、阻塞模式:ServerSocketChannel.accept() 方法監聽新進來的連線,當 accept()方法返回的時候,它返回一個包含新進來的連線的 SocketChannel。阻塞模式下, accept()方法會一直阻塞到有新連線到達。

b、非阻塞模式:,accept() 方法會立刻返回,如果還沒有新進來的連線,返回的將是null。 因此,需要檢查返回的SocketChannel是否是null.

在NioServerSocketChannel的建構函式分析中,我們知道,其通過ch.configureBlocking(false);語句設定當前的ServerSocketChannel為非阻塞的。

netty將jdk的 SocketChannel 封裝成自定義的 NioSocketChannel,加入到list裡面,這樣外層就可以遍歷該list,做後續處理

從上一篇文章中,我們已經知道服務端的建立過程中會建立netty中一系列的核心元件,包括pipeline,unsafe等等,那麼,接受一條新連線的時候是否也會建立這一系列的元件呢?

帶著這個疑問,我們跟進去

NioSocketChannel.java

public NioSocketChannel(Channel parent, SocketChannel socket) {
    super(parent, socket);
    config = new NioSocketChannelConfig(this, socket.socket());
}

我們重點分析 super(parent, socket),NioSocketChannel的父類為 AbstractNioByteChannel

AbstractNioByteChannel.java

protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
    super(parent, ch, SelectionKey.OP_READ);
}

這裡,我們看到jdk nio裡面熟悉的影子—— SelectionKey.OP_READ,一般在原生的jdk nio程式設計中,也會註冊這樣一個事件,表示對channel的讀感興趣

我們繼續往上,追蹤到AbstractNioByteChannel的父類 AbstractNioChannel, 這裡,我相信讀了上一篇文章你對於這部分程式碼肯定是有印象的

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }
        }
        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}

在建立服務端channel的時候,最終也會進入到這個方法,super(parent), 便是在AbstractChannel中建立一系列和該channel繫結的元件,如下

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

而這裡的 readInterestOp 表示該channel關心的事件是 SelectionKey.OP_READ,後續會將該事件註冊到selector,之後設定該通道為非阻塞模式,在channel中建立 unsafe 和一條 pipeline 

pipeline.fireChannelRead(NioSocketChannel)

對於 pipeline我們前面已經瞭解過,在netty的各種型別的channel中,都會包含一個pipeline,字面意思是管道,我們可以理解為一條流水線工藝,流水線工藝有起點,有結束,中間還有各種各樣的流水線關卡,一件物品,在流水線起點開始處理,經過各個流水線關卡的加工,最終到流水線結束

對應到netty裡面,流水線的開始就是HeadContxt,流水線的結束就是TailConextHeadContxt中呼叫Unsafe做具體的操作,TailConext中用於向用戶丟擲pipeline中未處理異常以及對未處理訊息的警告

通過前面的文章中,我們已經知道在服務端的channel初始化時,在pipeline中,已經自動添加了一個pipeline處理器 ServerBootstrapAcceptor, 並已經將使用者程式碼中設定的一系列的引數傳入了建構函式,接下來,我們就來看下ServerBootstrapAcceptor

ServerBootstrapAcceptor.java

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
    private final EventLoopGroup childGroup;
    private final ChannelHandler childHandler;
    private final Entry<ChannelOption<?>, Object>[] childOptions;
    private final Entry<AttributeKey<?>, Object>[] childAttrs;

    ServerBootstrapAcceptor(
            EventLoopGroup childGroup, ChannelHandler childHandler,
            Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
        this.childGroup = childGroup;
        this.childHandler = childHandler;
        this.childOptions = childOptions;
        this.childAttrs = childAttrs;
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final Channel child = (Channel) msg;

        child.pipeline().addLast(childHandler);

        for (Entry<ChannelOption<?>, Object> e: childOptions) {
            try {
                if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                    logger.warn("Unknown channel option: " + e);
                }
            } catch (Throwable t) {
                logger.warn("Failed to set a channel option: " + child, t);
            }
        }

        for (Entry<AttributeKey<?>, Object> e: childAttrs) {
            child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }

        try {
            childGroup.register(child).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        forceClose(child, future.cause());
                    }
                }
            });
        } catch (Throwable t) {
            forceClose(child, t);
        }
    }
}

前面的 pipeline.fireChannelRead(NioSocketChannel); 最終通過head->unsafe->ServerBootstrapAcceptor的呼叫鏈,呼叫到這裡的 ServerBootstrapAcceptor 的channelRead方法,而 channelRead 一上來就把這裡的msg強制轉換為 Channel

然後,拿到該channel,也就是我們之前new出來的 NioSocketChannel中對應的pipeline,將使用者程式碼中的 childHandler,新增到pipeline,這裡的 childHandler 在使用者程式碼中的體現為

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 .childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         p.addLast(new EchoServerHandler());
     }
 });

其實對應的是 ChannelInitializer,到了這裡,NioSocketChannel中pipeline對應的處理器為 head->ChannelInitializer->tail,牢記,後面會再次提到!

接著,設定 NioSocketChannel 對應的 attr和option,然後進入到 childGroup.register(child),這裡的childGroup就是我們在啟動程式碼中new出來的NioEventLoopGroup

我們進入到NioEventLoopGroupregister方法,代理到其父類MultithreadEventLoopGroup

MultithreadEventLoopGroup.java

public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

這裡又扯出來一個 next()方法,我們跟進去

MultithreadEventLoopGroup.java

@Override
public EventLoop next() {
    return (EventLoop) super.next();
}

回到其父類

MultithreadEventExecutorGroup.java

@Override
public EventExecutor next() {
    return chooser.next();
}

這裡的chooser對應的類為 EventExecutorChooser,字面意思為事件執行器選擇器,放到我們這裡的上下文中的作用就是從worker reactor執行緒組中選擇一個reactor執行緒

public interface EventExecutorChooserFactory {

    /**
     * Returns a new {@link EventExecutorChooser}.
     */
    EventExecutorChooser newChooser(EventExecutor[] executors);

    /**
     * Chooses the next {@link EventExecutor} to use.
     */
    @UnstableApi
    interface EventExecutorChooser {

        /**
         * Returns the new {@link EventExecutor} to use.
         */
        EventExecutor next();
    }
}

chooser的實現有兩種

public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {

    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

    private DefaultEventExecutorChooserFactory() { }

    @SuppressWarnings("unchecked")
    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTowEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTowEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }
}

預設情況下,chooser通過 DefaultEventExecutorChooserFactory被建立,在建立reactor執行緒選擇器的時候,會判斷reactor執行緒的個數,如果是2的冪,就建立PowerOfTowEventExecutorChooser,否則,建立GenericEventExecutorChooser

兩種型別的選擇器在選擇reactor執行緒的時候,都是通過Round-Robin的方式選擇reactor執行緒,唯一不同的是,PowerOfTowEventExecutorChooser是通過與運算,而GenericEventExecutorChooser是通過取餘運算,與運算的效率要高於求餘運算

選擇完一個reactor執行緒,即 NioEventLoop 之後,我們回到註冊的地方

public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

SingleThreadEventLoop.java

@Override
public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

其實,這裡已經和服務端啟動的過程一樣了,可以參考我前面的文章

AbstractNioChannel.java

private void register0(ChannelPromise promise) {
    boolean firstRegistration = neverRegistered;
    doRegister();
    neverRegistered = false;
    registered = true;

    pipeline.invokeHandlerAddedIfNeeded();

    safeSetSuccess(promise);
    pipeline.fireChannelRegistered();
    if (isActive()) {
        if (firstRegistration) {
            pipeline.fireChannelActive();
        } else if (config().isAutoRead()) {
            beginRead();
        }
    }
}

和服務端啟動過程一樣,先是呼叫 doRegister();做真正的註冊過程,如下

protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().selector, 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                eventLoop().selectNow();
                selected = true;
            } else {
                throw e;
            }
        }
    }
}

將該條channel繫結到一個selector上去,一個selector被一個reactor執行緒使用,後續該channel的事件輪詢,以及事件處理,非同步task執行都是由此reactor執行緒來負責

繫結完reactor執行緒之後,呼叫 pipeline.invokeHandlerAddedIfNeeded()

前面我們說到,到目前為止NioSocketChannel 的pipeline中有三個處理器,head->ChannelInitializer->tail,最終會呼叫到 ChannelInitializer 的 handlerAdded 方法

public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isRegistered()) {
        initChannel(ctx);
    }
}

handlerAdded方法呼叫 initChannel 方法之後,呼叫remove(ctx);將自身刪除,如下

AbstractNioChannel.java

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { 
        try {
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            exceptionCaught(ctx, cause);
        } finally {
            remove(ctx);
        }
        return true;
    }
    return false;
}

而這裡的 initChannel 方法又是神馬玩意?讓我們回到使用者方法,比如下面這段使用者程式碼

使用者程式碼

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 .option(ChannelOption.SO_BACKLOG, 100)
 .handler(new LoggingHandler(LogLevel.INFO))
 .childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         p.addLast(new LoggingHandler(LogLevel.INFO));
         p.addLast(new EchoServerHandler());
     }
 });

原來最終跑到我們自己的程式碼裡去了啊!完了之後,NioSocketChannel繫結的pipeline的處理器就包括 head->LoggingHandler->EchoServerHandler->tail

註冊讀事件

接下來,我們還剩下這些程式碼沒有分析完

AbstractNioChannel.java

private void register0(ChannelPromise promise) {
    // ..
    pipeline.fireChannelRegistered();
    if (isActive()) {
        if (firstRegistration) {
            pipeline.fireChannelActive();
        } else if (config().isAutoRead()) {
            beginRead();
        }
    }
}

pipeline.fireChannelRegistered();,其實沒有幹啥有意義的事情,最終無非是再呼叫一下業務pipeline中每個處理器的 ChannelHandlerAdded方法處理下回調

isActive()在連線已經建立的情況下返回true,所以進入方法塊,進入到 pipeline.fireChannelActive();在這裡我詳細步驟先省略,直接進入到關鍵環節

AbstractNioChannel.java

@Override
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

這裡其實就是將 SelectionKey.OP_READ事件註冊到selector中去,表示這條通道已經可以開始處理read事件了

總結

至此,netty中關於新連線的處理已經向你展示完了,我們做下總結

1.boos reactor執行緒輪詢到有新的連線進入
2.通過封裝jdk底層的channel建立 NioSocketChannel以及一系列的netty核心元件
3.將該條連線通過chooser,選擇一條worker reactor執行緒繫結上去
4.註冊讀事件,開始新連線的讀寫