1. 程式人生 > >netty原始碼閱讀之NioEventLoop之NioEventLoop執行-----runAllTask

netty原始碼閱讀之NioEventLoop之NioEventLoop執行-----runAllTask

processSelectedKey()和runAllTask()

                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }

這裡有個ioRatio,就是執行io任務和非io任務的時間比。使用者可以自行設定。預設為50,可以看原始碼。如果是100的話,先執行完io任務,執行完之後才執行非io任務。我們現在學習比較複雜的第二種情況。

在runAllTasks(long timeoutNanos)裡面,主要分享以下三件事情:

1、task的分類和新增

2、任務的聚合

3、任務的真正執行

點進去runAllTasks(long timeoutNanos)方法的實現:

    protected boolean runAllTasks(long timeoutNanos) {
        fetchFromScheduledTaskQueue();
        Runnable task = pollTask();
        if (task == null) {
            afterRunningAllTasks();
            return false;
        }

        final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
            safeExecute(task);

            runTasks ++;

            // Check timeout every 64 tasks because nanoTime() is relatively expensive.
            // XXX: Hard-coded value - will make it configurable if it is really a problem.
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }

            task = pollTask();
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }

        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }

第一步是把定時任務讀出來, 第二步是從任務佇列裡面取任務,第三步是safeExecute()執行任務,通過(runTasks & 0x3F) == 0判斷執行的任務次數是否達到64個,如果到達,那麼就判斷是否到達我們非io任務的時間,是的話就退出,並且記錄下最後執行任務的時間lastExecutionTime,否則繼續取任務去執行。所有任務執行完了,會呼叫afterRunningAllTasks()進行收尾工作。

關於64,在原始碼裡面也就解釋,為什麼不執行完一次任務檢查一次?檢查也是相對耗時的操作,這裡他們用了硬編碼的方式,如果實在需要,可以修改為可配置的。

我們首先分析fetchFromScheduledTaskQueue()

    private boolean fetchFromScheduledTaskQueue() {
        long nanoTime = AbstractScheduledEventExecutor.nanoTime();
        Runnable scheduledTask  = pollScheduledTask(nanoTime);
        while (scheduledTask != null) {
            if (!taskQueue.offer(scheduledTask)) {
                // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
                scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
                return false;
            }
            scheduledTask  = pollScheduledTask(nanoTime);
        }
        return true;
    }

也就是每次從定時任務佇列scheduledTaskQueue裡面取出到期的任務,新增到我們taskQueue裡面,這就是任務的聚合。如果新增不成功,也就是有可能taskQueue滿了,就要添加回定時任務佇列,否者,這個任務可能丟失。

在深入一點看pollScheduledTask():

  protected final Runnable pollScheduledTask(long nanoTime) {
        assert inEventLoop();

        Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
        ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
        if (scheduledTask == null) {
            return null;
        }

        if (scheduledTask.deadlineNanos() <= nanoTime) {
            scheduledTaskQueue.remove();
            return scheduledTask;
        }
        return null;
    }

在這裡可以看到,到期的定時任務會返回並從定時任務佇列裡面刪除。最快到期的定時任務會在最前面,關於這個定時任務佇列,我們繼續看this.scheduledTaskQueue這個的實現:

  Queue<ScheduledFutureTask<?>> scheduledTaskQueue() {
        if (scheduledTaskQueue == null) {
            scheduledTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
        }
        return scheduledTaskQueue;
    }

使用了優先佇列的實現,所以可以比較。而比較的方法在它的定時任務ScheduledFutureTask裡面:

@SuppressWarnings("ComparableImplementedButEqualsNotOverridden")
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V> {
    ...

    @Override
    public int compareTo(Delayed o) {
        if (this == o) {
            return 0;
        }

        ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
        long d = deadlineNanos() - that.deadlineNanos();
        if (d < 0) {
            return -1;
        } else if (d > 0) {
            return 1;
        } else if (id < that.id) {
            return -1;
        } else if (id == that.id) {
            throw new Error();
        } else {
            return 1;
        }
    }

    ...
}

可以看到,deadline時間最小的,是排在最前面的,如果deadline時間相同,那麼就比較id,可以嚴格保證任務的先後順序。

關於NioEventLoop的定時任務的實現,還有一個細節,在它父類AbstractScheduledEventExecutor的schedule方法裡面:

    <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
        if (inEventLoop()) {
            scheduledTaskQueue().add(task);
        } else {
            execute(new Runnable() {
                @Override
                public void run() {
                    scheduledTaskQueue().add(task);
                }
            });
        }

        return task;
    }

如果丟定時任務的執行緒是NioEventLoop的執行緒,那麼就把它放到定時任務佇列裡面,這是新增定時任務,但是我們這裡關注的是,如果不是NioEventLoop的執行緒,那就會呼叫execute方法,新建一個執行緒來把任務丟到定時任務佇列,這個新建的執行緒最終會繫結到NioEventLoop,通過這種方式保證了執行緒的安全。細細體會作者這種方法,特別巧妙。

接下去就是Runnable task = pollTask();,進入這個方法,最終可以看到就是從我們的taskQueue裡面把在最前面的任務取出來,這裡的任務已經包括可能的定時任務了。

 protected final Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
        for (;;) {
            Runnable task = taskQueue.poll();
            if (task == WAKEUP_TASK) {
                continue;
            }
            return task;
        }
    }

最後是safeExecute(task),直接呼叫task的run方法:

/**
     * Try to execute the given {@link Runnable} and just log if it throws a {@link Throwable}.
     */
    protected static void safeExecute(Runnable task) {
        try {
            task.run();
        } catch (Throwable t) {
            logger.warn("A task raised an exception. Task: {}", task, t);
        }
    }

如果執行錯誤,會列印日誌,不影響別的任務的使用。有的原始碼的實現,一個任務不行,別的任務也都崩潰了。