1. 程式人生 > >Netty框架學習之路(五)—— EventLoop及事件迴圈機制

Netty框架學習之路(五)—— EventLoop及事件迴圈機制

在前面的博文中,我們大致分析瞭解了Channel及其相關概念。在Netty的執行緒模型中,每個channel都有唯一的一個eventLoop與之相繫結,那麼在這篇博文中我們來看一下EvenLoop及其相關概念。

在傳統的Java NIO程式設計中,我們經常使用到如下程式碼:

    public static void main(String[] args) {
        try {
            //建立選擇器
            Selector selector = Selector.open();
            //開啟通道
            ServerSocketChannel channel = ServerSocketChannel.open();
            //開啟非阻塞模式
channel.configureBlocking(false); //服務端socket監聽指定埠 channel.socket().bind(new InetSocketAddress(port), 1024); // 將 channel 註冊到 selector 中, // 通常我們都是先註冊一個 OP_ACCEPT 事件, // 然後在 OP_ACCEPT 到來時, 再將這個 Channel 的 OP_READ 註冊到 Selector 中。 channel.register(selector, SelectionKey.OP_ACCEPT); while
(true){ // 通過呼叫 select 方法, 阻塞地等待 channel I/O 可操作 selector.select(500); // 獲取 I/O 操作就緒的 SelectionKey, 通過 SelectionKey 可以知道哪些 Channel 的哪類 I/O 操作已經就緒. Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> it = keys.iterator(); while
(it.hasNext()){ SelectionKey key = it.next(); // 當獲取一個 SelectionKey 後, 就要將它刪除, 表示我們已經對這個 IO 事件進行了處理。 it.remove(); try { if(key.isAcceptable()) { //處理新的請求 三次握手 建立連線 ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel sc = ssc.accept(); sc.configureBlocking(false); //在 OP_ACCEPT 到來時, 再將這個 Channel 的 OP_READ 註冊到 Selector 中. sc.register(selector, SelectionKey.OP_READ); } ……………… }catch(Exception e){ if(key != null){ key.cancel(); if(key.channel() != null){ key.channel().close(); } } } } } }catch (IOException e){ e.printStackTrace(); } }

上述操作中的第一步通過Selector.open() 開啟一個 Selector,我們以NioServerSocketChannel為例,當建立NioServerSocketChannel時,Netty通過反射呼叫NioServerSocketChannel的無引數構造方法(具體過程後面專門介紹):

channel = this.channelFactory.newChannel();

NioSocketChannel的無引數構造方法如下:

private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

public NioServerSocketChannel() {
    this(DEFAULT_SELECTOR_PROVIDER);
}

public NioServerSocketChannel(SelectorProvider provider) {
    this(newSocket(provider));
}

private static NioServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        //呼叫 SelectorProvider.openSocketChannel() 來開啟一個新的 Java NIO SocketChannel:
        return provider.openSocketChannel();
    } catch (IOException var2) {
        throw new ChannelException("Failed to open a socket.", var2);
    }
}

第二步 將 Channel 註冊到 Selector 中, 並設定需要監聽的事件。在channel的註冊過程中(具體過程後面專門介紹),會呼叫AbstractUnsafe.register0方法:

private void register0(ChannelPromise promise) {
    ……
    boolean firstRegistration = neverRegistered;
    doRegister();
    neverRegistered = false;
    registered = true;
    safeSetSuccess(promise);
    pipeline.fireChannelRegistered();
    // Only fire a channelActive if the channel has never been registered. This prevents firing
    // multiple channel actives if the channel is deregistered and re-registered.
    if (firstRegistration && isActive()) {
        pipeline.fireChannelActive();
    }
}

register0 又呼叫了 AbstractNioChannel.doRegister方法:

protected void doRegister() throws Exception {
    // 省略錯誤處理
    selectionKey = javaChannel().register(eventLoop().selector, 0, this);
}

此處的引數0說明僅僅將 Channel 註冊到 Selector 中, 但是不設定interest set。那到底在哪裡設定的呢?其實在NioServerSocketChannel的構造方法中:

public NioServerSocketChannel(ServerSocketChannel channel) {
    //表示關注OP_ACCEPT事件
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

第一、二步都完成了,那麼第三步迴圈部分在哪呢?事實上 NioEventLoop 本身就是一個 SingleThreadEventExecutor,因此 NioEventLoop 的啟動 其實就是 NioEventLoop 所繫結的本地 Java 執行緒的啟動。在SingleThreadEventExecutor.doStartThread方法中建立執行緒並呼叫SingleThreadEventExecutor.this.run()方法,而run方法為抽象方法,具體實現在NioEventLoop的run方法中。

    protected void run() {
        for (;;) {
            try {
                //通過hasTasks方法判斷當前taskQueue是否為空
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                        // fallthrough
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } 
                    ……
                }
           }
    }

    public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
        return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
    }

    private final IntSupplier selectNowSupplier = new IntSupplier() {
        @Override
        public int get() throws Exception {
            return selectNow();
        }
    };

此處for(;;) 所構成的死迴圈構成了NioEventLoop事件迴圈的核心。這裡有兩個方法需要注意,selector.selectNow()會檢查當前是否有就緒的 IO 事件,如果有,則返回就緒 IO 事件的個數,如果沒有,則返回0。selectNow() 是立即返回的,不會阻塞當前執行緒;selector.select(timeoutMillis)會阻塞住當前執行緒的,timeoutMillis 是阻塞的超時時間。

程式碼中有個名為ioRatio的屬性,它表示的是此執行緒分配給 IO 操作所佔的時間比(即執行 processSelectedKeys 耗時在整個迴圈中所佔用的時間)。計算公式:

設 IO 操作耗時為 ioTime, ioTime 佔的時間比例為 ioRatio, 則:
    ioTime / ioRatio = taskTime / taskRatio
    taskRatio = 100 - ioRatio
    => taskTime = ioTime * (100 - ioRatio) / ioRatio

再來看IO處理過程,即processSelectedKeys方法,

    private void processSelectedKeys() {
        if (selectedKeys != null) {
            processSelectedKeysOptimized();
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }

這個方法中會根據 selectedKeys 欄位是否為空,而分別呼叫 processSelectedKeysOptimized 或 processSelectedKeysPlain。 其實兩者沒有太大的區別,此處以 processSelectedKeysOptimized 為例分析一下工作流程。

    private void processSelectedKeysOptimized() {
        for (int i = 0; i < selectedKeys.size; ++i) {
            final SelectionKey k = selectedKeys.keys[i];
            // null out entry in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.keys[i] = null;

            final Object a = k.attachment();

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (needsToSelectAgain) {
                // null out entries in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                selectedKeys.reset(i + 1);

                selectAgain();
                i = -1;
            }
        }
    }

程式碼中k.attachment()返回值是什麼呢?其實我們可以猜測一下應該是附著在SelectionKey的事物,聯想到在selector上註冊channel時候指定了SelectionKey,可以想到返回值其實就是channel自身。

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        ……
        try {
            int readyOps = k.readyOps();

            //OP_CONNECT事件
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }

            //OP_WRITE事件
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }

            //OP_READ事件
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

OP_WRITE 可寫事件比較簡單,沒有詳細分析的必要了。這裡寫程式碼片
OP_READ事件處理過程有點長,具體可以看一下read方法:

public final void read() {
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final ByteBufAllocator allocator = config.getAllocator();
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            allocHandle.reset(config);

            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    byteBuf = allocHandle.allocate(allocator);
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    if (allocHandle.lastBytesRead() <= 0) {
                        // nothing was read. release the buffer.
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        break;
                    }

                    allocHandle.incMessagesRead(1);
                    readPending = false;
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;
                } while (allocHandle.continueReading());

                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (close) {
                    closeOnRead(pipeline);
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
                ……
            }
        }

歸納一下大概做了三件事情:分配 ByteBuf;從 SocketChannel 中讀取資料;呼叫 pipeline.fireChannelRead 傳送一個 inbound 事件。如果瞭解過channel相關內容,產生inbound事件之後便是channelPipeline的事了,具體如何處理請翻閱之前的博文。

OP_CONNECT 事件處理過程:將 OP_CONNECT 從就緒事件集中清除;呼叫 unsafe.finishConnect() 通知上層連線已建立。

unsafe.finishConnect方法最後會呼叫到 pipeline().fireChannelActive(),產生一個 inbound 事件,通知 pipeline 中的各個 handler TCP 通道已建立(即 ChannelInboundHandler.channelActive 方法會被呼叫)。

到了這裡, 我們整個 NioEventLoop 的 IO 操作部分已經瞭解完了