netty原始碼閱讀之NioEventLoop之NioEventLoop執行-----runAllTask
processSelectedKey()和runAllTask()
final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { processSelectedKeys(); } finally { // Ensure we always run tasks. runAllTasks(); } } else { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { // Ensure we always run tasks. final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); }
這裡有個ioRatio,就是執行io任務和非io任務的時間比。使用者可以自行設定。預設為50,可以看原始碼。如果是100的話,先執行完io任務,執行完之後才執行非io任務。我們現在學習比較複雜的第二種情況。
在runAllTasks(long timeoutNanos)裡面,主要分享以下三件事情:
1、task的分類和新增
2、任務的聚合
3、任務的真正執行
點進去runAllTasks(long timeoutNanos)方法的實現:
protected boolean runAllTasks(long timeoutNanos) { fetchFromScheduledTaskQueue(); Runnable task = pollTask(); if (task == null) { afterRunningAllTasks(); return false; } final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; for (;;) { safeExecute(task); runTasks ++; // Check timeout every 64 tasks because nanoTime() is relatively expensive. // XXX: Hard-coded value - will make it configurable if it is really a problem. if ((runTasks & 0x3F) == 0) { lastExecutionTime = ScheduledFutureTask.nanoTime(); if (lastExecutionTime >= deadline) { break; } } task = pollTask(); if (task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime; return true; }
第一步是把定時任務讀出來, 第二步是從任務佇列裡面取任務,第三步是safeExecute()執行任務,通過(runTasks & 0x3F) == 0判斷執行的任務次數是否達到64個,如果到達,那麼就判斷是否到達我們非io任務的時間,是的話就退出,並且記錄下最後執行任務的時間lastExecutionTime,否則繼續取任務去執行。所有任務執行完了,會呼叫afterRunningAllTasks()進行收尾工作。
關於64,在原始碼裡面也就解釋,為什麼不執行完一次任務檢查一次?檢查也是相對耗時的操作,這裡他們用了硬編碼的方式,如果實在需要,可以修改為可配置的。
我們首先分析fetchFromScheduledTaskQueue()
private boolean fetchFromScheduledTaskQueue() {
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
Runnable scheduledTask = pollScheduledTask(nanoTime);
while (scheduledTask != null) {
if (!taskQueue.offer(scheduledTask)) {
// No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
scheduledTask = pollScheduledTask(nanoTime);
}
return true;
}
也就是每次從定時任務佇列scheduledTaskQueue裡面取出到期的任務,新增到我們taskQueue裡面,這就是任務的聚合。如果新增不成功,也就是有可能taskQueue滿了,就要添加回定時任務佇列,否者,這個任務可能丟失。
在深入一點看pollScheduledTask():
protected final Runnable pollScheduledTask(long nanoTime) {
assert inEventLoop();
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
if (scheduledTask == null) {
return null;
}
if (scheduledTask.deadlineNanos() <= nanoTime) {
scheduledTaskQueue.remove();
return scheduledTask;
}
return null;
}
在這裡可以看到,到期的定時任務會返回並從定時任務佇列裡面刪除。最快到期的定時任務會在最前面,關於這個定時任務佇列,我們繼續看this.scheduledTaskQueue這個的實現:
Queue<ScheduledFutureTask<?>> scheduledTaskQueue() {
if (scheduledTaskQueue == null) {
scheduledTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
}
return scheduledTaskQueue;
}
使用了優先佇列的實現,所以可以比較。而比較的方法在它的定時任務ScheduledFutureTask裡面:
@SuppressWarnings("ComparableImplementedButEqualsNotOverridden")
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V> {
...
@Override
public int compareTo(Delayed o) {
if (this == o) {
return 0;
}
ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
long d = deadlineNanos() - that.deadlineNanos();
if (d < 0) {
return -1;
} else if (d > 0) {
return 1;
} else if (id < that.id) {
return -1;
} else if (id == that.id) {
throw new Error();
} else {
return 1;
}
}
...
}
可以看到,deadline時間最小的,是排在最前面的,如果deadline時間相同,那麼就比較id,可以嚴格保證任務的先後順序。
關於NioEventLoop的定時任務的實現,還有一個細節,在它父類AbstractScheduledEventExecutor的schedule方法裡面:
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
if (inEventLoop()) {
scheduledTaskQueue().add(task);
} else {
execute(new Runnable() {
@Override
public void run() {
scheduledTaskQueue().add(task);
}
});
}
return task;
}
如果丟定時任務的執行緒是NioEventLoop的執行緒,那麼就把它放到定時任務佇列裡面,這是新增定時任務,但是我們這裡關注的是,如果不是NioEventLoop的執行緒,那就會呼叫execute方法,新建一個執行緒來把任務丟到定時任務佇列,這個新建的執行緒最終會繫結到NioEventLoop,通過這種方式保證了執行緒的安全。細細體會作者這種方法,特別巧妙。
接下去就是Runnable task = pollTask();,進入這個方法,最終可以看到就是從我們的taskQueue裡面把在最前面的任務取出來,這裡的任務已經包括可能的定時任務了。
protected final Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
for (;;) {
Runnable task = taskQueue.poll();
if (task == WAKEUP_TASK) {
continue;
}
return task;
}
}
最後是safeExecute(task),直接呼叫task的run方法:
/**
* Try to execute the given {@link Runnable} and just log if it throws a {@link Throwable}.
*/
protected static void safeExecute(Runnable task) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception. Task: {}", task, t);
}
}
如果執行錯誤,會列印日誌,不影響別的任務的使用。有的原始碼的實現,一個任務不行,別的任務也都崩潰了。