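【4】Netty4 Source Code Analysis - The Thread Execution Logic Implemented by NioEventLoop
Reposted from http://xw-z1985.iteye.com/blog/1928244
The article on Netty server startup (thread creation) showed that the execution logic of the thread held by SingleThreadEventExecutor is implemented by NioEventLoop. This article analyzes that logic: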
// NioEventLoop
protected void run() {
    for (;;) {
        oldWakenUp = wakenUp.getAndSet(false);
        try {
            if (hasTasks()) {
                selectNow();
            } else {
                select();
                if (wakenUp.get()) {
                    selector.wakeup();
                }
            }
            cancelledKeys = 0;
            final long ioStartTime = System.nanoTime();
            needsToSelectAgain = false;
            if (selectedKeys != null) {
                processSelectedKeysOptimized(selectedKeys.flip());
            } else {
                processSelectedKeysPlain(selector.selectedKeys());
            }
            final long ioTime = System.nanoTime() - ioStartTime;
            final int ioRatio = this.ioRatio;
            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    break;
                }
            }
        } catch (Throwable t) {
            logger.warn("Unexpected exception in the selector loop.", t);
            // Prevent possible consecutive immediate failures that lead to
            // excessive CPU consumption.
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Ignore.
            }
        }
    }
}
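The last step of each loop iteration hands runAllTasks a time budget derived from the time just spent on IO. The following is a minimal, self-contained sketch of that arithmetic only; nonIoBudgetNanos is a hypothetical helper, not a Netty method, and its range check is an assumption added here to avoid a division by zero.

```java
public class IoRatioBudget {
    // Same expression as the argument passed to runAllTasks(...) in run():
    // ioTime * (100 - ioRatio) / ioRatio, evaluated with long arithmetic.
    static long nonIoBudgetNanos(long ioTimeNanos, int ioRatio) {
        // Assumption of this sketch: reject ratios that would divide by zero or go negative.
        if (ioRatio <= 0 || ioRatio > 100) {
            throw new IllegalArgumentException("ioRatio must be in (0, 100]");
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 2_000_000L; // assume the IO phase took 2 ms
        System.out.println(nonIoBudgetNanos(ioTime, 50)); // 2000000: same as the IO time
        System.out.println(nonIoBudgetNanos(ioTime, 80)); // 500000: a quarter of the IO time
        System.out.println(nonIoBudgetNanos(ioTime, 20)); // 8000000: four times the IO time
    }
}
```

With the default ioRatio of 50 the non-IO budget equals the IO time; raising ioRatio shrinks the share left for non-IO tasks.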
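Analysis:
- NioEventLoop runs two kinds of tasks: IO tasks and non-IO tasks. IO tasks are the events that are ready on a selectionKey, such as accept, connect, read, and write; non-IO tasks are the tasks added to taskQueue, such as the register0, bind, and channelActive tasks analyzed in earlier articles.
- The two kinds run in the order IO tasks, then non-IO tasks. IO tasks are triggered by processSelectedKeysOptimized(selectedKeys.flip()) or processSelectedKeysPlain(selector.selectedKeys()); non-IO tasks are triggered by runAllTasks(ioTime * (100 - ioRatio) / ioRatio).
- The ratio between the time spent on the two kinds is controlled by ioRatio. For example, ioRatio = 50 (the default) means non-IO tasks may run for as long as the IO tasks just took.
- Before IO tasks can run, a select is needed to find out whether any previously registered channel has an event of interest ready.
- If the task queue contains non-IO tasks, the non-blocking selectNow() method is called: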
// NioEventLoop
void selectNow() throws IOException {
    try {
        selector.selectNow();
    } finally {
        // restore wakeup state if needed
        if (wakenUp.get()) {
            selector.wakeup();
        }
    }
}
Otherwise, the blocking select() method is called:
// NioEventLoop
private void select() throws IOException {
    Selector selector = this.selector;
    try {
        int selectCnt = 0;
        long currentTimeNanos = System.nanoTime();
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
        for (;;) {
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            if (timeoutMillis <= 0) {
                if (selectCnt == 0) {
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }
            int selectedKeys = selector.select(timeoutMillis);
            selectCnt ++;
            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks()) {
                // Selected something,
                // waken up by user, or
                // the task queue has a pending task.
                break;
            }
            if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                    selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                // The selector returned prematurely many times in a row.
                // Rebuild the selector to work around the problem.
                logger.warn(
                        "Selector.select() returned prematurely {} times in a row; rebuilding selector.",
                        selectCnt);
                rebuildSelector();
                selector = this.selector;
                // Select again to populate selectedKeys.
                selector.selectNow();
                selectCnt = 1;
                break;
            }
            currentTimeNanos = System.nanoTime();
        }
        if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
            if (logger.isDebugEnabled()) {
                logger.debug("Selector.select() returned prematurely {} times in a row.", selectCnt - 1);
            }
        }
    } catch (CancelledKeyException e) {
        if (logger.isDebugEnabled()) {
            logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", e);
        }
        // Harmless exception - log anyway
    }
}
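The timeout computation at the top of the loop can be checked in isolation. The sketch below copies only that expression into a hypothetical helper, timeoutMillis, and evaluates it for a few made-up timestamps, including a second iteration that starts just after a first select has blocked for its full timeout.

```java
public class SelectTimeout {
    // Same formula as in select(): (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L.
    // Adding 500000 ns (half a millisecond) before the integer division rounds the
    // remaining time to the nearest millisecond instead of truncating it.
    static long timeoutMillis(long selectDeadLineNanos, long currentTimeNanos) {
        return (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
    }

    public static void main(String[] args) {
        long deadline = 10_000_000L;                             // hypothetical deadline: 10 ms
        System.out.println(timeoutMillis(deadline, 0L));         // 10 ms remain -> 10
        System.out.println(timeoutMillis(deadline, 1_400_000L)); // 8.6 ms remain -> 9 (rounded up)
        System.out.println(timeoutMillis(deadline, 9_600_000L)); // 0.4 ms remain -> 0, loop breaks

        // Second iteration after the first select blocked for its full t1 milliseconds.
        long c1 = 1_400_000L;
        long t1 = timeoutMillis(deadline, c1);                   // 9
        long c2 = c1 + t1 * 1_000_000L + 1;                      // slightly more than t1 ms later
        System.out.println(timeoutMillis(deadline, c2));         // 0: no second blocking select
    }
}
```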
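The blocking select() method works as follows:
- First, delayNanos(currentTimeNanos) is called. It returns how many nanoseconds remain until the first task in the delayed-task queue is due (that is, how much longer that task can still wait), so selectDeadLineNanos is the moment that task must run; if the queue is empty it returns SCHEDULE_PURGE_INTERVAL. Note that every SingleThreadEventExecutor holds a priority queue of delayed tasks: final Queue<ScheduledFutureTask<?>> delayedTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();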
//SingleThreadEventExecutor
protected long delayNanos(long currentTimeNanos) {
    ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek();
    if (delayedTask == null) {
        return SCHEDULE_PURGE_INTERVAL;
    }
    return delayedTask.delayNanos(currentTimeNanos);
}
//ScheduledFutureTask
public long delayNanos(long currentTimeNanos) {
    return Math.max(0, deadlineNanos() - (currentTimeNanos - START_TIME));
}
public long deadlineNanos() {
    return deadlineNanos;
}
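To see what delayNanos returns, here is a simplified model, assuming deadlines are plain long values stored relative to a start time, as ScheduledFutureTask does with START_TIME. DelayModel and FALLBACK_NANOS are hypothetical; the one-second fallback merely stands in for SCHEDULE_PURGE_INTERVAL, whose value is not shown in the code above.

```java
import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;

public class DelayModel {
    // Assumed fallback when no delayed task exists (stand-in for SCHEDULE_PURGE_INTERVAL).
    static final long FALLBACK_NANOS = TimeUnit.SECONDS.toNanos(1);

    final long startTime;                                      // plays the role of START_TIME
    final PriorityQueue<Long> deadlines = new PriorityQueue<Long>(); // smallest deadline at the head

    DelayModel(long startTime) {
        this.startTime = startTime;
    }

    void schedule(long relativeDeadlineNanos) {
        deadlines.add(relativeDeadlineNanos);
    }

    // Nanoseconds until the earliest delayed task is due; never negative.
    long delayNanos(long currentTimeNanos) {
        Long first = deadlines.peek();
        if (first == null) {
            return FALLBACK_NANOS;
        }
        return Math.max(0, first - (currentTimeNanos - startTime));
    }

    public static void main(String[] args) {
        DelayModel m = new DelayModel(1_000L);
        System.out.println(m.delayNanos(1_000L));  // empty queue -> 1000000000
        m.schedule(5_000L);
        m.schedule(3_000L);
        System.out.println(m.delayNanos(2_000L));  // earliest deadline 3000, 1000 elapsed -> 2000
        System.out.println(m.delayNanos(10_000L)); // already overdue -> 0
    }
}
```

A result of 0 makes selectDeadLineNanos equal to the current time, which is why select() then falls back to selectNow() instead of blocking.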
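- If timeoutMillis <= 0, meaning that less than 0.5 ms (500000 ns) remains before the deadline, the delayed task cannot be postponed any further. (Why 500000? Adding half a millisecond before the integer division by 1000000 rounds the remaining nanoseconds to the nearest millisecond instead of truncating them.) If no select has been performed yet in this call (checked via selectCnt == 0), the non-blocking selectNow() is called once and the loop exits; otherwise the loop exits immediately.
- Otherwise there is still time before the delayed task is due, so select may block a little longer (the extra time may well turn up a ready IO event), and the blocking selector.select(timeoutMillis) is called.
- If a selectionKey is ready, the selector has been woken up, or a new task has been added to the non-IO task queue, the loop exits and the method returns immediately.
- Otherwise this call to selector.select(timeoutMillis) returned 0 selected keys. If the number of consecutive zero-result selects has not yet reached SELECTOR_AUTO_REBUILD_THRESHOLD (512 by default), the next loop iteration starts. Note that by the formula timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L, as currentTimeNanos grows, the timeoutMillis computed on the second iteration is normally (that is, when no selectionKey became ready and selector.select(timeoutMillis) really blocked for the full timeoutMillis milliseconds before returning 0) at most 0: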
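Let D = selectDeadLineNanos, let c1 and c2 be currentTimeNanos on entering the first and second iterations, and let t1 and t2 be the corresponding timeoutMillis values. Because the first select blocked for t1 milliseconds, c2 > c1 + t1 * 1000000. Read every fraction below as Java's long integer division, which never decreases as its numerator grows; the numerator on the right, D - c1 + 500000 - t1 * 1000000, is exactly the remainder of the first division and therefore lies in [0, 1000000):
$$t_2 = \frac{D - c_2 + 500000}{1000000} \le \frac{D - (c_1 + t_1 \cdot 1000000) + 500000}{1000000} = t_1 - t_1 = 0$$
That is, t2 <= 0. The second iteration therefore does not select again; it exits the loop and returns (selectCnt is already 1, so selectNow() is skipped as well).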
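- If, instead, selector.select(timeoutMillis) keeps returning 0 over and over until selectCnt reaches SELECTOR_AUTO_REBUILD_THRESHOLD, the calls did not block for timeoutMillis at all but returned immediately with 0. This means the epoll bug that drives the CPU to 100% has been triggered (https://github.com/netty/netty/issues/327). The workaround is to rebuild the selector: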
public void rebuildSelector() {
    if (!inEventLoop()) {
        execute(new Runnable() {
            @Override
            public void run() {
                rebuildSelector();
            }
        });
        return;
    }
    final Selector oldSelector = selector;
    final Selector newSelector;
    if (oldSelector == null) {
        return;
    }
    try {
        newSelector = openSelector();
    } catch (Exception e) {
        logger.warn("Failed to create a new Selector.", e);
        return;
    }
    // Register all channels to the new Selector.
    int nChannels = 0;
    for (;;) {
        try {
            for (SelectionKey key: oldSelector.keys()) {
                Object a = key.attachment();
                try {
                    if (key.channel().keyFor(newSelector) != null) {
                        continue;
                    }
                    int interestOps = key.interestOps();
                    key.cancel();
                    key.channel().register(newSelector, interestOps, a);
                    nChannels ++;
                } catch (Exception e) {
                    logger.warn("Failed to re-register a Channel to the new Selector.", e);
                    if (a instanceof AbstractNioChannel) {
                        AbstractNioChannel ch = (AbstractNioChannel) a;
                        ch.unsafe().close(ch.unsafe().voidPromise());
                    } else {
                        @SuppressWarnings("unchecked")
                        NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                        invokeChannelUnregistered(task, key, e);
                    }
                }
            }
        } catch (ConcurrentModificationException e) {
            // Probably due to concurrent modification of the key set.
            continue;
        }
        break;
    }
    selector = newSelector;
    try {
        // time to close the old selector as everything else is registered to the new one
        oldSelector.close();
    } catch (Throwable t) {
        if (logger.isWarnEnabled()) {
            logger.warn("Failed to close the old Selector.", t);
        }
    }
    logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
}
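The essence of the rebuild: create a new selector, register every channel that was registered with the old selector on the new one while cancelling each old selectionKey, and finally close the old selector. After the rebuild, select() calls selectNow() once more to check for selectionKeys that are already ready.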
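The rebuild procedure can be reproduced with plain java.nio. The sketch below is a single-threaded simplification of rebuildSelector(), assuming no other thread touches the key set, so the ConcurrentModificationException retry and the per-channel error handling are omitted. RebuildSelectorSketch is a hypothetical name, and a Pipe source channel stands in for a Netty channel.

```java
import java.io.IOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class RebuildSelectorSketch {
    // Moves every valid key of oldSelector to a freshly opened selector, keeping the
    // interest ops and the attachment, then closes the old selector.
    static Selector rebuild(Selector oldSelector) throws IOException {
        Selector newSelector = Selector.open();
        for (SelectionKey key : oldSelector.keys()) {
            if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
                continue; // already cancelled or already migrated
            }
            int interestOps = key.interestOps();
            Object attachment = key.attachment();
            key.cancel(); // the old key is deregistered when the old selector is closed
            key.channel().register(newSelector, interestOps, attachment);
        }
        oldSelector.close();
        return newSelector;
    }

    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        Pipe pipe = Pipe.open();
        pipe.source().configureBlocking(false); // register() requires non-blocking mode
        pipe.source().register(selector, SelectionKey.OP_READ, "source");
        Selector rebuilt = rebuild(selector);
        System.out.println(rebuilt.keys().size() + " " + selector.isOpen()); // prints "1 false"
    }
}
```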
- After select() or selectNow(), if any selectionKey is ready, IO processing begins. processSelectedKeysOptimized and processSelectedKeysPlain have very similar logic:
// NioEventLoop
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
    for (int i = 0;; i ++) {
        final SelectionKey k = selectedKeys[i];
        if (k == null) {
            break;
        }
        final Object a = k.attachment();
        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }
        if (needsToSelectAgain) {
            selectAgain();
            // Need to flip the optimized selectedKeys to get the right reference to the array
            // and reset the index to -1 which will then set to 0 on the for loop
            // to start over again.
            //
            // See https://github.com/netty/netty/issues/1523
            selectedKeys = this.selectedKeys.flip();
            i = -1;
        }
    }
}
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
    // check if the set is empty and if so just return to not create garbage by
    // creating a new Iterator every time even if there is nothing to process.
    // See https://github.com/netty/netty/issues/597
    if (selectedKeys.isEmpty()) {
        return;
    }
    Iterator<SelectionKey> i = selectedKeys.iterator();
    for (;;) {
        final SelectionKey k = i.next();
        final Object a = k.attachment();
        i.remove();
        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }
        if (!i.hasNext()) {
            break;
        }
        if (needsToSelectAgain) {
            selectAgain();
            selectedKeys = selector.selectedKeys();
            // Create the iterator again to avoid ConcurrentModificationException
            if (selectedKeys.isEmpty()) {
                break;
            } else {
                i = selectedKeys.iterator();
            }
        }
    }
}
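Only processSelectedKeysOptimized is analyzed here; the differences between the two methods are left for a later article. It loops over every selected selectionKey and dispatches each one on the type of its attachment: if the attachment is an AbstractNioChannel, one branch runs; otherwise, the other branch runs. This article covers only the AbstractNioChannel branch and leaves the other for later.
Before dispatching on the attachment type, we need to know when the attachment was associated with the selectionKey. Recall the register0 task analyzed in the socket article: AbstractNioChannel contains the following code: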
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
This attaches this (the AbstractNioChannel itself) to the selectionKey, initially with no interest ops (0).
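A minimal sketch of the attachment round trip and the type-based dispatch, assuming a Pipe source channel in place of a real socket. FakeNioChannel is a hypothetical stand-in for AbstractNioChannel, and dispatch only reports which branch processSelectedKeysOptimized would take.

```java
import java.io.IOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class AttachmentDispatch {
    // Hypothetical stand-in for AbstractNioChannel.
    static class FakeNioChannel { }

    // Mirrors the instanceof check in processSelectedKeysOptimized.
    static String dispatch(SelectionKey k) {
        Object a = k.attachment();
        if (a instanceof FakeNioChannel) {
            return "channel"; // would call processSelectedKey(k, (AbstractNioChannel) a)
        }
        return "task";        // would treat the attachment as a NioTask
    }

    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        Pipe pipe = Pipe.open();
        pipe.source().configureBlocking(false);
        FakeNioChannel ch = new FakeNioChannel();
        // Like register0: interest ops 0 and the channel object itself as the attachment.
        SelectionKey key = pipe.source().register(selector, 0, ch);
        System.out.println(key.attachment() == ch); // true
        System.out.println(dispatch(key));          // channel
        selector.close();
    }
}
```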
Now for the AbstractNioChannel branch, starting with the implementation of processSelectedKey(k, (AbstractNioChannel) a):
//NioEventLoop
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidPromise());
        return;
    }
    int readyOps = -1;
    try {
        readyOps = k.readyOps();
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            if (!ch.isOpen()) {
                // Connection already closed - no need to handle write.
                return;
            }
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            processWritable(ch);
        }
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);
            unsafe.finishConnect();
        }
    } catch (CancelledKeyException e) {
        if (readyOps != -1 && (readyOps & SelectionKey.OP_WRITE) != 0) {
            unregisterWritableTasks(ch);
        }
        unsafe.close(unsafe.voidPromise());
    }
}
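Here, at last, is familiar NIO handling code: it dispatches on the selectionKey's readyOps. The next article analyzes the handling when readyOps is accept. As for final NioUnsafe unsafe = ch.unsafe(), recall from the socket article that, for the server channel analyzed there, the NioUnsafe is instantiated by AbstractNioMessageChannel (a subclass of AbstractChannel) and its concrete type is NioMessageUnsafe, which defines the read method, i.e., the handling logic for an accept readyOps.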
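The readyOps dispatch is plain bit arithmetic on SelectionKey constants. The hypothetical helpers below, actions and clearConnect, model only the order of the checks in processSelectedKey and the OP_CONNECT removal; they call no Netty code and ignore the early return taken when read() closes the channel.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

public class ReadyOpsDispatch {
    // Lists, in order, the handlers processSelectedKey would invoke for the given readyOps.
    static List<String> actions(int readyOps) {
        List<String> out = new ArrayList<String>();
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            out.add("read");          // unsafe.read(): read data or accept a connection
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            out.add("write");         // processWritable(ch)
        }
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            out.add("finishConnect"); // unsafe.finishConnect(), after clearing OP_CONNECT
        }
        return out;
    }

    // The interestOps update made before finishConnect(): clear only the OP_CONNECT bit.
    static int clearConnect(int interestOps) {
        return interestOps & ~SelectionKey.OP_CONNECT;
    }

    public static void main(String[] args) {
        System.out.println(actions(SelectionKey.OP_ACCEPT));                       // [read]
        System.out.println(actions(SelectionKey.OP_READ | SelectionKey.OP_WRITE)); // [read, write]
        System.out.println(actions(0));                                            // [read]
        System.out.println(clearConnect(SelectionKey.OP_CONNECT | SelectionKey.OP_READ)
                == SelectionKey.OP_READ);                                          // true
    }
}
```

Clearing OP_CONNECT matters because a connect-ready key that stays interested in OP_CONNECT would make every later select return at once, as the Netty comment notes.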
- After the IO tasks have been processed, the non-IO tasks run: runAllTasks(ioTime * (100 - ioRatio) / ioRatio)
// SingleThreadEventExecutor
protected boolean runAllTasks(long timeoutNanos) {
    fetchFromDelayedQueue();
    Runnable task = pollTask();
    if (task == null) {
        return false;
    }
    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    for (;;) {
        try {
            task.run();
        } catch (Throwable t) {
            logger.warn("A task raised an exception.", t);
        }
        runTasks ++;
        // Check timeout every 64 tasks because nanoTime() is relatively expensive.
        // XXX: Hard-coded value - will make it configurable if it is really a problem.
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                break;
            }
        }
        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }
    this.lastExecutionTime = lastExecutionTime;
    return true;
}
First, fetchFromDelayedQueue(), a private method of SingleThreadEventExecutor:
// SingleThreadEventExecutor
private void fetchFromDelayedQueue() {
    long nanoTime = 0L;
    for (;;) {
        ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek();
        if (delayedTask == null) {
            break;
        }
        if (nanoTime == 0L) {
            nanoTime = ScheduledFutureTask.nanoTime();
        }
        if (delayedTask.deadlineNanos() <= nanoTime) {
            delayedTaskQueue.remove();
            taskQueue.add(delayedTask);
        } else {
            break;
        }
    }
}
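It moves every task in the delayed-task queue (delayedTaskQueue) whose scheduled time has passed into the non-IO task queue (taskQueue). runAllTasks then takes tasks from taskQueue one at a time and runs them. After every 64 tasks it checks the elapsed time, and once the precomputed time budget has been exceeded it stops running non-IO tasks, so that a large backlog of non-IO tasks cannot hold up IO processing.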
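Both steps can be sketched together in a small, self-contained model. Everything here is hypothetical (TaskBudgetSketch, Delayed, and the injected clock): it keeps the structure described above (migrate due delayed tasks, run tasks, check the deadline only every 64 tasks) but replaces ScheduledFutureTask.nanoTime() with a fake clock so the behavior is deterministic.

```java
import java.util.ArrayDeque;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.function.LongSupplier;

public class TaskBudgetSketch {
    // Hypothetical delayed task: becomes runnable once the clock reaches its deadline.
    static final class Delayed implements Comparable<Delayed> {
        final long deadline;
        final Runnable task;

        Delayed(long deadline, Runnable task) {
            this.deadline = deadline;
            this.task = task;
        }

        @Override
        public int compareTo(Delayed o) {
            return Long.compare(deadline, o.deadline); // earliest deadline at the head
        }
    }

    final Queue<Runnable> taskQueue = new ArrayDeque<Runnable>();
    final PriorityQueue<Delayed> delayedTaskQueue = new PriorityQueue<Delayed>();
    final LongSupplier clock; // fake nanoTime() source

    TaskBudgetSketch(LongSupplier clock) {
        this.clock = clock;
    }

    // Move every delayed task whose deadline has passed into taskQueue.
    void fetchFromDelayedQueue() {
        long now = clock.getAsLong();
        Delayed d;
        while ((d = delayedTaskQueue.peek()) != null && d.deadline <= now) {
            delayedTaskQueue.remove();
            taskQueue.add(d.task);
        }
    }

    // Runs tasks until the queue is empty or (checked every 64 tasks) the budget is used up.
    // Returns how many tasks were run.
    long runAllTasks(long timeoutNanos) {
        fetchFromDelayedQueue();
        Runnable task = taskQueue.poll();
        if (task == null) {
            return 0;
        }
        final long deadline = clock.getAsLong() + timeoutNanos;
        long runTasks = 0;
        for (;;) {
            try {
                task.run();
            } catch (Throwable t) {
                // Netty logs the exception and carries on; this sketch just ignores it.
            }
            runTasks++;
            if ((runTasks & 0x3F) == 0 && clock.getAsLong() >= deadline) { // 0x3F == 63
                break;
            }
            task = taskQueue.poll();
            if (task == null) {
                break;
            }
        }
        return runTasks;
    }

    public static void main(String[] args) {
        final long[] now = {0};                     // fake clock in nanoseconds
        TaskBudgetSketch loop = new TaskBudgetSketch(() -> now[0]);
        for (int i = 0; i < 200; i++) {
            loop.taskQueue.add(() -> now[0] += 10); // each task "takes" 10 ns
        }
        System.out.println(loop.runAllTasks(100));  // 64: budget already exceeded at the first check
        System.out.println(loop.taskQueue.size());  // 136: left for the next loop iteration
    }
}
```

Because the deadline is checked only every 64 tasks, a run can keep going for up to 63 more tasks after its budget is used up; that is the price of calling nanoTime() less often.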
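Summary: the thread execution logic implemented by NioEventLoop does the following:
- It runs IO tasks and then non-IO tasks. The ratio of time spent on each is controlled by ioRatio; by default, non-IO tasks may run for as long as the IO tasks just took.
- If taskQueue contains non-IO tasks, it calls the non-blocking selectNow(); otherwise it calls the blocking select(), which itself falls back to a single selectNow() when a task in delayedTaskQueue is due within half a millisecond.
- If the blocking select(timeoutMillis) returns 0 prematurely (without blocking) so many times in a row that a threshold is reached (512 by default), the epoll CPU-100% bug has been triggered. The fix is to rebuild the selector: create a new selector, register every channel from the old selector with the new one, cancel all the old selectionKeys, and finally close the old selector.
- If select returns a nonzero result, each ready selectionKey is processed in turn and dispatched on its readyOps (accept, read, write, connect, and so on).
- After the IO tasks, the non-IO tasks run; delayed tasks in delayedTaskQueue that have become due are first moved into taskQueue. Every 64 tasks the elapsed time is checked, and if it exceeds the non-IO budget computed from ioRatio and the time just spent on IO tasks, the remaining non-IO tasks are left for the next iteration.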