1. 程式人生 > >Netty(十四)——EventLoop之式

Netty(十四)——EventLoop之式

       前邊講了ByteBuf、Channel、Unsafe、ChannelPipeline、ChannelHandler等核心的類。這篇來學習學習EventLoop(EventLoopGroup)——Netty的執行緒。Netty的執行緒模型是經過精心的設計,既提高了框架的併發效能,又能在很大程度上避免死鎖,區域性還是實現了無鎖化設計。非常值得學習的。

       一,Reactor執行緒模型:Netty執行緒模型本質上也是經典的Reactor執行緒模型,看下前邊轉過的一篇文章《

Reactor模式詳解(轉)》。其中包括了,Reactor單執行緒模型、Reactor多執行緒模型、Reactor主從多執行緒模型。大家可以參考Reactor執行緒模型圖進行對比檢視。

Reactor執行緒模型
型別 特點 不足
Reactor單執行緒模型

1,指所有的I/O操作都是在同一個NIO執行緒上完成,包括:a,接收客戶端的TCP連線;b,向服務端發起TCP連線;c,讀取通訊對端的請求或應答訊息;d,向通訊對端傳送訊息或者應答訊息。

2,簡單、容易理解並實現

1,一個NIO執行緒同時處理成百上千的鏈路,效能無法支撐;

2,當NIO執行緒負載過重,處理速度變慢,導致客戶端超時,超時後重發,更加重NIO執行緒的負載,最終導致大量訊息積壓和處理超時,成為系統瓶頸;

3,可靠性問題:一旦NIO執行緒意外,或者進入死迴圈,會導致整個系統通訊模組不可用,造成節點故障。

Reactor多執行緒模型

1,有一個專門NIO執行緒——acceptor執行緒用於監聽服務端,接受客戶端的TCP連線;

2,網路IO操作——讀寫等由一個NIO執行緒池負責,包含一個任務佇列和N個可用執行緒,由這些NIO執行緒負責訊息的讀取、解碼、編碼和傳送等。

3,一個NIO執行緒可以同時處理N條鏈路,但是一個鏈路只對應一個NIO執行緒,防止發生併發操作問題。

1,一個NIO執行緒負責監聽和處理所有的客戶端連線可能會存在效能問題。例如百萬客戶端連線,或者服務端對客戶端握手進行安全認證(耗費效能),可能會出現效能不足。
主從Reactor多執行緒模型

1,服務端用於接受客戶端連線的不再是一個單獨的NIO執行緒,而是一個獨立的NIO執行緒池;

2,sub reactor執行緒池用來負責SocketChannel的讀寫、編解碼等工作。

1,相對於上邊兩種更加複雜吧。

       二,Netty執行緒模型:Netty可以通過配置不同的啟動引數,支援上邊的幾種執行緒模型的,看下原理圖和程式碼吧:

            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try{

                ServerBootstrap b =new ServerBootstrap();
                b.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG,100).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        //解碼
                        socketChannel.pipeline().addLast(MarshallingCodeFactory.buildMarshallingDecoder());
                        //編碼
                        socketChannel.pipeline().addLast(MarshallingCodeFactory.buildMarshallingEncoder());
                        socketChannel.pipeline().addLast(new SubReqServerHandler());
                    }
                });
                ChannelFuture f =b.bind(port).sync();
                f.channel().closeFuture().sync();
Netty執行緒職責
類別 職責
用於接收Client請求的執行緒池

1,接收Client的TCP連線,初始化Channel引數;

2,將鏈路狀態變更時間通知給ChannelPipeline;

用於處理I/O操作的執行緒池

1,非同步讀取通訊對端的資料報,傳送讀時間到ChannelPipeline;

2,非同步傳送訊息到通訊對端,呼叫ChannelPipeline的訊息傳送介面;

3,執行系統呼叫Task;

4,執行定時任務Task,例如鏈路空閒狀態監測定時任務。

       序列操作(無鎖化設計):上篇Netty(十二)——ChannelPipeline之觀Netty(十三)——ChannelHandler之意 我們看到事件的處理是在ChannelPipeline中傳輸像職責鏈一樣,經過ChannelHandler時進行處理,這種啟動多個序列化的執行緒並行執行(避免鎖的競爭),比一個佇列一個工作執行緒效能更優。

       三,Netty實踐建議:

      1,建立兩個NioEventLoopGroup,用於邏輯隔離NIO Acceptor和NIO IO操作執行緒;

      2,儘量不要在ChannelHandler中啟動使用者執行緒(解碼後用於將POJO訊息派發到後端業務執行緒的除外);

      3,解碼要放在NIO執行緒呼叫的解碼Handler中進行,不要切換到使用者執行緒中完成訊息的解碼;

      4,如果業務邏輯操作簡單,沒有複雜的業務邏輯計算,沒有可能導致執行緒被阻塞的磁碟操作、資料庫操作、網路操作等,可以直接在NIO執行緒上完成業務邏輯編排,不需要切換到使用者執行緒。

      5,如果業務邏輯處理複雜,不要在NIO執行緒上完成,建議將解碼後的POJO訊息封裝成Task,派發到業務執行緒中執行,以保證NIO執行緒儘快被釋放,處理其他的IO操作。

       四,NioEventLoop的原始碼分析:

       1,先看下NioEventLoop的類關係圖:

       2,看一下NioEventLoop的原始碼:

       2.1,首先看下多路複用器Selector在NioEventLoop中的初始化。

    /**
     * 一,聚合的多路複用器selector
     */
    Selector selector;
    private SelectedSelectionKeySet selectedKeys;

    private final SelectorProvider provider;

    /**
     * 二,初始化NioEventLoop,selector = openSelector();
     */
    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider) {
        super(parent, executor, false);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        provider = selectorProvider;
        selector = openSelector();
    }

    /**
     * 二-1,selector = openSelector();
     */
    private Selector openSelector() {
        final Selector selector;
        try {
            selector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }
        //如果沒有開啟selectedKeys優化開關,直接返回provider.openSelector()。
        if (DISABLE_KEYSET_OPTIMIZATION) {
            return selector;
        }
        
        //如果開啟了,通過反射從selector中獲取selectedKeys和publicSelectedKeys,將其設定為可寫,通過反射的方式使用Netty構造的selectedKeys = selectedKeySet;替換JDK的。
        try {
            SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

            Class<?> selectorImplClass =
                    Class.forName("sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader());

            // Ensure the current selector implementation is what we can instrument.
            if (!selectorImplClass.isAssignableFrom(selector.getClass())) {
                return selector;
            }

            Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
            Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

            selectedKeysField.setAccessible(true);
            publicSelectedKeysField.setAccessible(true);

            selectedKeysField.set(selector, selectedKeySet);
            publicSelectedKeysField.set(selector, selectedKeySet);

            selectedKeys = selectedKeySet;
            logger.trace("Instrumented an optimized java.util.Set into: {}", selector);
        } catch (Throwable t) {
            selectedKeys = null;
            logger.trace("Failed to instrument an optimized java.util.Set into: {}", selector, t);
        }

        return selector;
    }

       2.2,分析run方法的實現,這個方法基本呼叫到NioEventLoop中的所有封裝方法,有的呼叫層級還比較深,不過耐心檢視,跟下去,大概流程就清楚了。(這裡我在註釋中用數字表示了層級跟蹤,例如4-1-1 為三級方法呼叫)

    @Override
    protected void run() {
        //1-將wakeup還原false,並將之前的狀態儲存到oldWakeUp中
        boolean oldWakenUp = wakenUp.getAndSet(false);
        try {
            //2判斷當前訊息佇列中是否有訊息尚未處理,如果有,selectNow返回一次select操作。
            if (hasTasks()) {
                selectNow();
            } else {
                //3,輪詢,看是否有準備就緒的Channel
                select(oldWakenUp);
                if (wakenUp.get()) {
                    selector.wakeup();
                }
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            //先後執行IO任務和非IO任務,兩類任務的執行時間比由變數ioRatio控制,預設是非IO任務允許執行和IO任務相同的時間
            //Netty中控制IO執行比例佔比-分析,得到就緒狀態的SocketChannel
            if (ioRatio == 100) {
                //4
                processSelectedKeys();
                //5
                runAllTasks();
            } else {
                final long ioStartTime = System.nanoTime();

                processSelectedKeys();

                final long ioTime = System.nanoTime() - ioStartTime;
                runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
            }
            //如果關閉,則進行優雅停機,呼叫closeAll,釋放資源
            if (isShuttingDown()) {
                //6,
                closeAll();
                if (confirmShutdown()) {
                    cleanupAndTerminate(true);
                    return;
                }
            }
        } catch (Throwable t) {
            logger.warn("Unexpected exception in the selector loop.", t);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Ignore.
            }
        }

        scheduleExecution();
    }


    /**
     * [email protected] {@link Queue#isEmpty()}
     */
    protected boolean hasTasks() {
        assert inEventLoop();
        return !taskQueue.isEmpty();
    }
    /**
     * 2-2 selectNow
     */
    void selectNow() throws IOException {
        try {
            selector.selectNow();
        } finally {
            // restore wakup state if needed
            if (wakenUp.get()) {
                selector.wakeup();
            }
        }
    }


    /**
     * 3-1 select
     */
    private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        try {
            int selectCnt = 0;
            //取當前系統的納秒時間
            long currentTimeNanos = System.nanoTime();
            //呼叫delayNanos計算定時任務的觸發時間
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
            //死迴圈
            for (;;) {
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                //如果需要立即執行,或已經超時,則selector.selectNow();並退出當前迴圈
                if (timeoutMillis <= 0) {
                    if (selectCnt == 0) {
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }
                
                //將定時任務剩餘的超時時間作為引數進行select操作,沒完成一次select操作,對selectCnt+1
                int selectedKeys = selector.select(timeoutMillis);
                selectCnt ++;
                //如果有下列情況,則進行退出迴圈
                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                    // - Selected something,
                    // - waken up by user, or
                    // - the task queue has a pending task.
                    // - a scheduled task is ready for processing
                    break;
                }
                //thread interrupted也break退出
                if (Thread.interrupted()) {
                    // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                    // As this is most likely a bug in the handler of the user or it's client library we will
                    // also log it.
                    //
                    // See https://github.com/netty/netty/issues/2426
                    if (logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely because " +
                                "Thread.currentThread().interrupt() was called. Use " +
                                "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                    }
                    selectCnt = 1;
                    break;
                }

                long time = System.nanoTime();
                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                    // timeoutMillis elapsed without anything selected.
                    selectCnt = 1;
                //如果死迴圈了,則進行重建Selector方式,讓系統恢復正常rebuildSelector
                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                    // The selector returned prematurely many times in a row.
                    // Rebuild the selector to work around the problem.
                    logger.warn(
                            "Selector.select() returned prematurely {} times in a row; rebuilding selector.",
                            selectCnt);
                    //3-1-1
                    rebuildSelector();
                    selector = this.selector;

                    // Select again to populate selectedKeys.
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

                currentTimeNanos = time;
            }
            
            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row.", selectCnt - 1);
                }
            }
        } catch (CancelledKeyException e) {
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", e);
            }
            // Harmless exception - log anyway
        }
    }


    /**
     * 3-1-1
     * Replaces the current {@link Selector} of this event loop with newly created {@link Selector}s to work
     * around the infamous epoll 100% CPU bug.
     */
    public void rebuildSelector() {
        //如果為其它執行緒發起,則為了避免多執行緒併發操作,將rebuildSelector()封裝成task放到訊息佇列中,由NioEventLoop負責呼叫。
        if (!inEventLoop()) {
            execute(new Runnable() {
                @Override
                public void run() {
                    rebuildSelector();
                }
            });
            return;
        }

        final Selector oldSelector = selector;
        final Selector newSelector;

        if (oldSelector == null) {
            return;
        }

        try {
            //建立新的Selector
            newSelector = openSelector();
        } catch (Exception e) {
            logger.warn("Failed to create a new Selector.", e);
            return;
        }

        // Register all channels to the new Selector.(將SocketChannel從舊的Selector移動到新的上)
        int nChannels = 0;
        for (;;) {
            try {
                for (SelectionKey key: oldSelector.keys()) {
                    Object a = key.attachment();
                    try {
                        if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
                            continue;
                        }

                        int interestOps = key.interestOps();
                        key.cancel();
                        SelectionKey newKey = key.channel().register(newSelector, interestOps, a);
                        if (a instanceof AbstractNioChannel) {
                            // Update SelectionKey
                            ((AbstractNioChannel) a).selectionKey = newKey;
                        }
                        nChannels ++;
                    } catch (Exception e) {
                        logger.warn("Failed to re-register a Channel to the new Selector.", e);
                        if (a instanceof AbstractNioChannel) {
                            AbstractNioChannel ch = (AbstractNioChannel) a;
                            ch.unsafe().close(ch.unsafe().voidPromise());
                        } else {
                            @SuppressWarnings("unchecked")
                            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                            invokeChannelUnregistered(task, key, e);
                        }
                    }
                }
            } catch (ConcurrentModificationException e) {
                // Probably due to concurrent modification of the key set.
                continue;
            }

            break;
        }

        selector = newSelector;

        try {
            // time to close the old selector as everything else is registered to the new one 銷燬舊的selector
            oldSelector.close();
        } catch (Throwable t) {
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to close the old Selector.", t);
            }
        }

        logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
    }


    /**
     * 4-1
     */
    private void processSelectedKeys() {
        //如果開啟selectedKeys優化功能走processSelectedKeysOptimized,否則走processSelectedKeysPlain
        if (selectedKeys != null) {
            //4-1-1
            processSelectedKeysOptimized(selectedKeys.flip());
        } else {
            //4-1-2
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }

    /**
     *4-1-1
     **/
    private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
        for (int i = 0;; i ++) {
            final SelectionKey k = selectedKeys[i];
            if (k == null) {
                break;
            }
            // null out entry in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys[i] = null;

            final Object a = k.attachment();

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (needsToSelectAgain) {
                // null out entries in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                for (;;) {
                    if (selectedKeys[i] == null) {
                        break;
                    }
                    selectedKeys[i] = null;
                    i++;
                }

                selectAgain();
                // Need to flip the optimized selectedKeys to get the right reference to the array
                // and reset the index to -1 which will then set to 0 on the for loop
                // to start over again.
                //
                // See https://github.com/netty/netty/issues/1523
                selectedKeys = this.selectedKeys.flip();
                i = -1;
            }
        }
    }

    /**
     *4-1-2
     **/
    private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
        // check if the set is empty and if so just return to not create garbage by
        // creating a new Iterator every time even if there is nothing to process.
        // See https://github.com/netty/netty/issues/597
        if (selectedKeys.isEmpty()) {
            return;
        }
        //迴圈遍歷selectedKeys,進行操作
        Iterator<SelectionKey> i = selectedKeys.iterator();
        for (;;) {
            //獲取單個的進行,並從迭代器中刪除
            final SelectionKey k = i.next();
            final Object a = k.attachment();
            i.remove();

            //如果為AbstractNioChannel型別,進行IO讀寫相關操作
            if (a instanceof AbstractNioChannel) {
                //4-1-2-1
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                //taks型別
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (!i.hasNext()) {
                break;
            }

            if (needsToSelectAgain) {
                selectAgain();
                selectedKeys = selector.selectedKeys();

                // Create the iterator again to avoid ConcurrentModificationException
                if (selectedKeys.isEmpty()) {
                    break;
                } else {
                    i = selectedKeys.iterator();
                }
            }
        }
    }

    /**
     *4-1-2-1
     **/
    private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        //判斷是否可用,不可用直接關閉返回
        if (!k.isValid()) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }

        //進行readyOps判斷並呼叫unsafe進行相應的操作,讀、寫
        try {
            int readyOps = k.readyOps();
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
                if (!ch.isOpen()) {
                    // Connection already closed - no need to handle write.
                    return;
                }
            }
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }


    /**
     * 5-1 執行定時任務
     * Poll all tasks from the task queue and run them via {@link Runnable#run()} method.
     *
     * @return {@code true} if and only if at least one task was run
     */
    protected boolean runAllTasks() {
        //5-1-1
        fetchFromScheduledTaskQueue();
        Runnable task = pollTask();
        if (task == null) {
            return false;
        }

        for (;;) {
            try {
                task.run();
            } catch (Throwable t) {
                logger.warn("A task raised an exception.", t);
            }

            task = pollTask();
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                return true;
            }
        }
    }

    /**
     * 5-1-1將到時間的任務加入到taskqueue中供執行
     */
    private void fetchFromScheduledTaskQueue() {
        if (hasScheduledTasks()) {
            long nanoTime = AbstractScheduledEventExecutor.nanoTime();
            for (;;) {
                Runnable scheduledTask = pollScheduledTask(nanoTime);
                if (scheduledTask == null) {
                    break;
                }
                taskQueue.add(scheduledTask);
            }
        }
    }


    /**
     * 6-1 關閉所有鏈路,釋放執行緒、各種資源
     */
    private void closeAll() {
        selectAgain();
        Set<SelectionKey> keys = selector.keys();
        Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size());
        for (SelectionKey k: keys) {
            Object a = k.attachment();
            if (a instanceof AbstractNioChannel) {
                channels.add((AbstractNioChannel) a);
            } else {
                k.cancel();
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                invokeChannelUnregistered(task, k, null);
            }
        }

        for (AbstractNioChannel ch: channels) {
            ch.unsafe().close(ch.unsafe().voidPromise());
        }
    }

        Netty的執行緒模型,Reactor的設計非常牛逼的,直接決定了軟體的效能和併發處理能力。多學習,多思考,多反覆,多總結……  繼續中……