netty原始碼分析之揭開reactor執行緒的面紗(一)
netty最核心的就是reactor執行緒,對應專案中使用廣泛的NioEventLoop,那麼NioEventLoop裡面到底在幹些什麼事?netty是如何保證事件迴圈的高效輪詢和任務的及時執行?又是如何來優雅地fix掉jdk的nio bug?帶著這些疑問,本篇文章將庖丁解牛,帶你逐步瞭解netty reactor執行緒的真相[原始碼基於4.1.6.Final]
reactor 執行緒的啟動
NioEventLoop的run方法是reactor執行緒的主體,在第一次新增任務的時候被啟動
NioEventLoop 父類 SingleThreadEventExecutor 的execute方法
@Override public void execute(Runnable task) { ... boolean inEventLoop = inEventLoop(); if (inEventLoop) { addTask(task); } else { startThread(); addTask(task); ... } ... } 複製程式碼
外部執行緒在往任務佇列裡面新增任務的時候執行 startThread()
,netty會判斷reactor執行緒有沒有被啟動,如果沒有被啟動,那就啟動執行緒再往任務佇列裡面新增任務
private void startThread() { if (STATE_UPDATER.get(this) == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { doStartThread(); } } } 複製程式碼
SingleThreadEventExecutor 在執行 doStartThread
的時候,會呼叫內部執行器 executor
的execute方法,將呼叫NioEventLoop的run方法的過程封裝成一個runnable塞到一個執行緒中去執行
private void doStartThread() { ... executor.execute(new Runnable() { @Override public void run() { thread = Thread.currentThread(); ... SingleThreadEventExecutor.this.run(); ... } } } 複製程式碼
該執行緒就是 executor
建立,對應netty的reactor執行緒實體。 executor
預設是 ThreadPerTaskExecutor
預設情況下, ThreadPerTaskExecutor
在每次執行 execute
方法的時候都會通過 DefaultThreadFactory
建立一個 FastThreadLocalThread
執行緒,而這個執行緒就是netty中的reactor執行緒實體
ThreadPerTaskExecutor
public void execute(Runnable command) { threadFactory.newThread(command).start(); } 複製程式碼
關於為啥是 ThreadPerTaskExecutor
和 DefaultThreadFactory
的組合來new一個 FastThreadLocalThread
,這裡就不再詳細描述,通過下面幾段程式碼來簡單說明
標準的netty程式會呼叫到 NioEventLoopGroup
的父類 MultithreadEventExecutorGroup
的如下程式碼
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } } 複製程式碼
然後通過newChild的方式傳遞給 NioEventLoop
@Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); } 複製程式碼
關於reactor執行緒的建立和啟動就先講這麼多,我們總結一下:netty的reactor執行緒在新增一個任務的時候被建立,該執行緒實體為 FastThreadLocalThread
(這玩意以後會開篇文章重點講講),最後執行緒執行主體為 NioEventLoop
的 run
方法。
reactor 執行緒的執行
那麼下面我們就重點剖析一下 NioEventLoop
的run方法
@Override protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } default: // fallthrough } processSelectedKeys(); runAllTasks(...); } } catch (Throwable t) { handleLoopException(t); } ... } 複製程式碼
我們抽取出主幹,reactor執行緒做的事情其實很簡單,用下面一幅圖就可以說明

reactor執行緒大概做的事情分為對三個步驟不斷迴圈
1.首先輪詢註冊到reactor執行緒對用的selector上的所有的channel的IO事件
select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } 複製程式碼
2.處理產生網路IO事件的channel
processSelectedKeys(); 複製程式碼
3.處理任務佇列
runAllTasks(...); 複製程式碼
下面對每個步驟詳細說明
select操作
select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } 複製程式碼
wakenUp
表示是否應該喚醒正在阻塞的select操作,可以看到netty在進行一次新的loop之前,都會將 wakeUp
被設定成false,標誌新的一輪loop的開始,具體的select操作我們也拆分開來看
1.定時任務截止事時間快到了,中斷本次輪詢
int selectCnt = 0; long currentTimeNanos = System.nanoTime(); long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); for (;;) { long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; if (timeoutMillis <= 0) { if (selectCnt == 0) { selector.selectNow(); selectCnt = 1; } break; } .... } 複製程式碼
我們可以看到,NioEventLoop中reactor執行緒的select操作也是一個for迴圈,在for迴圈第一步中,如果發現當前的定時任務佇列中有任務的截止事件快到了(<=0.5ms),就跳出迴圈。此外,跳出之前如果發現目前為止還沒有進行過select操作( if (selectCnt == 0)
),那麼就呼叫一次 selectNow()
,該方法會立即返回,不會阻塞
這裡說明一點,netty裡面定時任務佇列是按照延遲時間從小到大進行排序, delayNanos(currentTimeNanos)
方法即取出第一個定時任務的延遲時間
protected long delayNanos(long currentTimeNanos) { ScheduledFutureTask<?> scheduledTask = peekScheduledTask(); if (scheduledTask == null) { return SCHEDULE_PURGE_INTERVAL; } return scheduledTask.delayNanos(currentTimeNanos); } 複製程式碼
關於netty的任務佇列(包括普通任務,定時任務,tail task)相關的細節後面會另起一片文章,這裡不過多展開
2.輪詢過程中發現有任務加入,中斷本次輪詢
for (;;) { // 1.定時任務截至事時間快到了,中斷本次輪詢 ... // 2.輪詢過程中發現有任務加入,中斷本次輪詢 if (hasTasks() && wakenUp.compareAndSet(false, true)) { selector.selectNow(); selectCnt = 1; break; } .... } 複製程式碼
netty為了保證任務佇列能夠及時執行,在進行阻塞select操作的時候會判斷任務佇列是否為空,如果不為空,就執行一次非阻塞select操作,跳出迴圈
3.阻塞式select操作
for (;;) { // 1.定時任務截至事時間快到了,中斷本次輪詢 ... // 2.輪詢過程中發現有任務加入,中斷本次輪詢 ... // 3.阻塞式select操作 int selectedKeys = selector.select(timeoutMillis); selectCnt ++; if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { break; } .... } 複製程式碼
執行到這一步,說明netty任務佇列裡面佇列為空,並且所有定時任務延遲時間還未到(大於0.5ms),於是,在這裡進行一次阻塞select操作,截止到第一個定時任務的截止時間
這裡,我們可以問自己一個問題,如果第一個定時任務的延遲非常長,比如一個小時,那麼有沒有可能執行緒一直阻塞在select操作,當然有可能!But,只要在這段時間內,有新任務加入,該阻塞就會被釋放
外部執行緒呼叫execute方法新增任務
@Override public void execute(Runnable task) { ... wakeup(inEventLoop); // inEventLoop為false ... } 複製程式碼
呼叫wakeup方法喚醒selector阻塞
protected void wakeup(boolean inEventLoop) { if (!inEventLoop && wakenUp.compareAndSet(false, true)) { selector.wakeup(); } } 複製程式碼
可以看到,在外部執行緒新增任務的時候,會呼叫wakeup方法來喚醒 selector.select(timeoutMillis)
阻塞select操作結束之後,netty又做了一系列的狀態判斷來決定是否中斷本次輪詢,中斷本次輪詢的條件有
selectedKeys != 0 hasTasks hasScheduledTasks() wakenUp.get()
4.解決jdk的nio bug
關於該bug的描述見 ofollow,noindex">bugs.java.com/bugdatabase…
該bug會導致Selector一直空輪詢,最終導致cpu 100%,nio server不可用,嚴格意義上來說,netty沒有解決jdk的bug,而是通過一種方式來巧妙地避開了這個bug,具體做法如下
long currentTimeNanos = System.nanoTime(); for (;;) { // 1.定時任務截止事時間快到了,中斷本次輪詢 ... // 2.輪詢過程中發現有任務加入,中斷本次輪詢 ... // 3.阻塞式select操作 selector.select(timeoutMillis); // 4.解決jdk的nio bug long time = System.nanoTime(); if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { selectCnt = 1; } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { rebuildSelector(); selector = this.selector; selector.selectNow(); selectCnt = 1; break; } currentTimeNanos = time; ... } 複製程式碼
netty 會在每次進行 selector.select(timeoutMillis)
之前記錄一下開始時間 currentTimeNanos
,在select之後記錄一下結束時間,判斷select操作是否至少持續了 timeoutMillis
秒(這裡將 time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos
改成 time - currentTimeNanos >= TimeUnit.MILLISECONDS.toNanos(timeoutMillis)
或許更好理解一些), 如果持續的時間大於等於timeoutMillis,說明就是一次有效的輪詢,重置 selectCnt
標誌,否則,表明該阻塞方法並沒有阻塞這麼長時間,可能觸發了jdk的空輪詢bug,當空輪詢的次數超過一個閥值的時候,預設是512,就開始重建selector
空輪詢閥值相關的設定程式碼如下
int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512); if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) { selectorAutoRebuildThreshold = 0; } SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold; 複製程式碼
下面我們簡單描述一下netty 通過 rebuildSelector
來fix空輪詢bug的過程, rebuildSelector
的操作其實很簡單:new一個新的selector,將之前註冊到老的selector上的的channel重新轉移到新的selector上。我們抽取完主要程式碼之後的骨架如下
public void rebuildSelector() { final Selector oldSelector = selector; final Selector newSelector; newSelector = openSelector(); int nChannels = 0; try { for (;;) { for (SelectionKey key: oldSelector.keys()) { Object a = key.attachment(); if (!key.isValid() || key.channel().keyFor(newSelector) != null) { continue; } int interestOps = key.interestOps(); key.cancel(); SelectionKey newKey = key.channel().register(newSelector, interestOps, a); if (a instanceof AbstractNioChannel) { ((AbstractNioChannel) a).selectionKey = newKey; } nChannels ++; } break; } } catch (ConcurrentModificationException e) { // Probably due to concurrent modification of the key set. continue; } selector = newSelector; oldSelector.close(); } 複製程式碼
首先,通過 openSelector()
方法建立一個新的selector,然後執行一個死迴圈,只要執行過程中出現過一次併發修改selectionKeys異常,就重新開始轉移
具體的轉移步驟為
- 拿到有效的key
- 取消該key在舊的selector上的事件註冊
- 將該key對應的channel註冊到新的selector上
- 重新繫結channel和新的key的關係
轉移完成之後,就可以將原有的selector廢棄,後面所有的輪詢都是在新的selector進行
最後,我們總結reactor執行緒select步驟做的事情:不斷地輪詢是否有IO事件發生,並且在輪詢的過程中不斷檢查是否有定時任務和普通任務,保證了netty的任務佇列中的任務得到有效執行,輪詢過程順帶用一個計數器避開了了jdk空輪詢的bug,過程清晰明瞭
由於篇幅原因,下面兩個過程將分別放到一篇文章中去講述,盡請期待