小白帶你認識netty(三)之NioEventLoop的執行緒(或者reactor執行緒)啟動(三)
在上一章中,我們看了處理IO事件的過程,今天,我們瞅瞅處理非同步任務佇列。
3、處理非同步任務佇列
在執行完processSelectedKeys方法後,netty會繼續執行runAllTasks方法,在觀摩這個方法之前,我們瞭解下netty的task。
在初始化NioEventLoop的時候,會例項化兩種task:普通task和scheduledTask,我們分別看看他們:
(1)普通的task
當我們呼叫NioEventLoop的execute方法,看看是NioEventLoop都做了什麼操作:
無論是不是外部執行緒呼叫execute方法,都會執行addTask方法,進入該方法:
進入offerTask方法:
這個taskQueue是什麼?還記的mpsc佇列麼?這個是一個多生產者單消費者安全執行緒,因此,execute方法會向mpsc佇列中塞入task,該操作是執行緒安全的。
(2)scheduledTask
在呼叫NioEventLoop的schedule方法時候,瞅瞅NioEventLoop做了啥?
進入schedule方法:
如果是reactor執行緒,就執行scheduledTaskQueue().add(task);否則就通過execute執行scheduledTaskQueue().add(task);這兩個有什麼區別呢?
首先,reactor執行緒內是本執行緒執行scheduledTaskQueue().add(task);,所以是執行緒安全。當外部執行緒呼叫schedule方法時,就有可能會出現執行緒安全問題,那麼這裡通過execute方法執行scheduledTaskQueue().add(task);,說明scheduledTaskQueue應該不是執行緒安全的佇列。為了驗證我們的猜想,我們進入scheduledTaskQueue方法瞧瞧:
很明顯,這個scheduledTaskQueue是一個非現場安全的佇列,因此證明了我們的觀點。先讓scheduledTaskQueue().add(task);是執行緒安全,就得在把該操作放入執行緒安全的mpsc佇列中。
OK,知道了NioEventLoop的兩個任務佇列,我們進入主題,瞧瞧runAllTasks方法。進入該方法:
/** * Poll all tasks from the task queue and run them via {@link Runnable#run()} method. This method stops running * the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}. */ protected boolean runAllTasks(long timeoutNanos) { fetchFromScheduledTaskQueue(); Runnable task = pollTask(); if (task == null) { afterRunningAllTasks(); return false; } final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; for (;;) { safeExecute(task); runTasks ++; // Check timeout every 64 tasks because nanoTime() is relatively expensive. // XXX: Hard-coded value - will make it configurable if it is really a problem. if ((runTasks & 0x3F) == 0) { lastExecutionTime = ScheduledFutureTask.nanoTime(); if (lastExecutionTime >= deadline) { break; } } task = pollTask(); if (task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime; return true; }
先看一下聚合聚合方法fetchFromScheduledTaskQueue:
進入pollScheduledTask方法:
該方法是從scheduledTaskQueue獲取第一個scheduledTask。再回到fetchFromScheduledTaskQueue:
迴圈從scheduledTaskQueue中獲取scheduledTask塞入到taskQueue中,此處的處理還是非常小心的,如果塞入失敗後,再將該task放回到scheduledTaskQueue。回到runAllTasks方法中,我們繼續向下看。
進入pooTask方法:
繼續進入pollTaskFrom方法:
從taskQueue中獲取第一個task。回到runAllTasks方法上,繼續向下看:
這一步就是本章核心程式碼了。首先呼叫safeExecute方法,進入該方法摟一眼:
僅僅是執行task的run方法。
執行完safeExecute方法後,就將runTasks自增1。然後每隔0x3F即64個任務就判斷當前時間是否超過了本次reactor任務迴圈的截至時間,如果超過了就執行break,否則就繼續取task執行。
好了,處理非同步任務佇列講完了,總結一下:
首先,netty會將已經到期的定時任務放入到MPSC佇列中,然後迴圈執行task的run方法。