1. 程式人生 > >小白帶你認識netty(三)之NioEventLoop的執行緒(或者reactor執行緒)啟動(三)

小白帶你認識netty(三)之NioEventLoop的執行緒(或者reactor執行緒)啟動(三)

上一章中,我們看了處理IO事件的過程,今天,我們瞅瞅處理非同步任務佇列。

3、處理非同步任務佇列

在執行完processSelectedKeys方法後,netty會繼續執行runAllTasks方法,在觀摩這個方法之前,我們瞭解下netty的task。

在初始化NioEventLoop的時候,會例項化兩種task:普通task和scheduledTask,我們分別看看他們:

(1)普通的task

當我們呼叫NioEventLoop的execute方法,看看是NioEventLoop都做了什麼操作:

無論是不是外部執行緒呼叫execute方法,都會執行addTask方法,進入該方法:

進入offerTask方法:

這個taskQueue是什麼?還記的mpsc佇列麼?這個是一個多生產者單消費者安全執行緒,因此,execute方法會向mpsc佇列中塞入task,該操作是執行緒安全的。

(2)scheduledTask

在呼叫NioEventLoop的schedule方法時候,瞅瞅NioEventLoop做了啥?

進入schedule方法:

如果是reactor執行緒,就執行scheduledTaskQueue().add(task);否則就通過execute執行scheduledTaskQueue().add(task);這兩個有什麼區別呢?

首先,reactor執行緒內是本執行緒執行scheduledTaskQueue().add(task);,所以是執行緒安全。當外部執行緒呼叫schedule方法時,就有可能會出現執行緒安全問題,那麼這裡通過execute方法執行scheduledTaskQueue().add(task);,說明scheduledTaskQueue應該不是執行緒安全的佇列。為了驗證我們的猜想,我們進入scheduledTaskQueue方法瞧瞧:

很明顯,這個scheduledTaskQueue是一個非現場安全的佇列,因此證明了我們的觀點。先讓scheduledTaskQueue().add(task);是執行緒安全,就得在把該操作放入執行緒安全的mpsc佇列中。

OK,知道了NioEventLoop的兩個任務佇列,我們進入主題,瞧瞧runAllTasks方法。進入該方法:

/**
     * Poll all tasks from the task queue and run them via {@link Runnable#run()} method.  This method stops running
     * the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}.
     */
    protected boolean runAllTasks(long timeoutNanos) {
        fetchFromScheduledTaskQueue();
        Runnable task = pollTask();
        if (task == null) {
            afterRunningAllTasks();
            return false;
        }

        final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
            safeExecute(task);

            runTasks ++;

            // Check timeout every 64 tasks because nanoTime() is relatively expensive.
            // XXX: Hard-coded value - will make it configurable if it is really a problem.
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }

            task = pollTask();
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }

        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }

先看一下聚合聚合方法fetchFromScheduledTaskQueue:

進入pollScheduledTask方法:

該方法是從scheduledTaskQueue獲取第一個scheduledTask。再回到fetchFromScheduledTaskQueue:

迴圈從scheduledTaskQueue中獲取scheduledTask塞入到taskQueue中,此處的處理還是非常小心的,如果塞入失敗後,再將該task放回到scheduledTaskQueue。回到runAllTasks方法中,我們繼續向下看。

進入pooTask方法:

繼續進入pollTaskFrom方法:

從taskQueue中獲取第一個task。回到runAllTasks方法上,繼續向下看:

這一步就是本章核心程式碼了。首先呼叫safeExecute方法,進入該方法摟一眼:

僅僅是執行task的run方法。

執行完safeExecute方法後,就將runTasks自增1。然後每隔0x3F即64個任務就判斷當前時間是否超過了本次reactor任務迴圈的截至時間,如果超過了就執行break,否則就繼續取task執行。

好了,處理非同步任務佇列講完了,總結一下:

首先,netty會將已經到期的定時任務放入到MPSC佇列中,然後迴圈執行task的run方法。