1. 程式人生 > >Netty原始碼分析第2章(NioEventLoop)---->第8節: 執行任務佇列

Netty原始碼分析第2章(NioEventLoop)---->第8節: 執行任務佇列

 

Netty原始碼分析第二章: NioEventLoop

 

第八節: 執行任務佇列

繼續回到NioEventLoop的run()方法:

protected void run() {
    for (;;) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue
; case SelectStrategy.SELECT: //輪詢io事件(1) select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } default: } cancelledKeys
= 0; needsToSelectAgain = false; //預設是50 final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { processSelectedKeys(); } finally { runAllTasks(); } }
else { //記錄下開始時間 final long ioStartTime = System.nanoTime(); try { //處理輪詢到的key(2) processSelectedKeys(); } finally { //計算耗時 final long ioTime = System.nanoTime() - ioStartTime; //執行task(3) runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } //程式碼省略 } }

我們看到處理完輪詢到的key之後, 首先記錄下耗時, 然後通過runAllTasks(ioTime * (100 - ioRatio) / ioRatio)執行taskQueue中的任務

我們知道ioRatio預設是50, 所以執行完ioTime * (100 - ioRatio) / ioRatio, 方法傳入的值為ioTime, 也就是processSelectedKeys()的執行時間:

 

跟進方法:

protected boolean runAllTasks(long timeoutNanos) {
    //定時任務佇列中聚合任務
    fetchFromScheduledTaskQueue();
    //從普通taskQ裡面拿一個任務
    Runnable task = pollTask();
    //task為空, 則直接返回
    if (task == null) {
        //跑完所有的任務執行收尾的操作
        afterRunningAllTasks();
        return false;
    }
    //如果佇列不為空
    //首先算一個截止時間(+50毫秒, 因為執行任務, 不要超過這個時間)
    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    //執行每一個任務
    for (;;) {
        safeExecute(task);
        //標記當前跑完的任務
        runTasks ++;
        //當跑完64個任務的時候, 會計算一下當前時間
        if ((runTasks & 0x3F) == 0) {
            //定時任務初始化到當前的時間
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            //如果超過截止時間則不執行(nanoTime()是耗時的)
            if (lastExecutionTime >= deadline) {
                break;
            }
        }
        //如果沒有超過這個時間, 則繼續從普通任務佇列拿任務
        task = pollTask();
        //直到沒有任務執行
        if (task == null) {
            //記錄下最後執行時間
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }
    //收尾工作
    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

首先會執行fetchFromScheduledTaskQueue()這個方法, 這個方法的意思是從定時任務佇列中聚合任務, 也就是將定時任務中找到可以執行的任務新增到taskQueue, 我們跟進去:

private boolean fetchFromScheduledTaskQueue() {
    long nanoTime = AbstractScheduledEventExecutor.nanoTime();
    //從定時任務佇列中抓取第一個定時任務
    //尋找截止時間為nanoTime的任務
    Runnable scheduledTask  = pollScheduledTask(nanoTime);
    //如果該定時任務佇列不為空, 則塞到普通任務佇列裡面
    while (scheduledTask != null) {
        //如果新增到普通任務佇列過程中失敗
        if (!taskQueue.offer(scheduledTask)) {
            //則重新新增到定時任務佇列中
            scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
            return false;
        }
        //繼續從定時任務佇列中拉取任務
        //方法執行完成之後, 所有符合執行條件的定時任務佇列, 都新增到了普通任務佇列中
        scheduledTask = pollScheduledTask(nanoTime);
    }
    return true;
}

long nanoTime = AbstractScheduledEventExecutor.nanoTime()代表從定時任務初始化到現在過去了多長時間

Runnable scheduledTask= pollScheduledTask(nanoTime)代表從定時任務佇列中拿到小於nanoTime時間的任務, 因為小於初始化到現在的時間, 說明該任務需要執行了

跟到其父類AbstractScheduledEventExecutorpollScheduledTask(nanoTime)方法中:

protected final Runnable pollScheduledTask(long nanoTime) {
    assert inEventLoop();
    //拿到定時任務佇列
    Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
    //peek()方法拿到第一個任務
    ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
    if (scheduledTask == null) {
        return null;
    }

    if (scheduledTask.deadlineNanos() <= nanoTime) {
        //從佇列中刪除
        scheduledTaskQueue.remove();
        //返回該
        return scheduledTask;
    }
    return null;
}

我們看到首先獲得當前類繫結的定時任務佇列的成員變數

如果不為空, 則通過scheduledTaskQueue.peek()彈出第一個任務

如果當前任務小於傳來的時間, 說明該任務需要執行, 則從定時任務佇列中刪除

我們繼續回到fetchFromScheduledTaskQueue()方法中:

private boolean fetchFromScheduledTaskQueue() {
    long nanoTime = AbstractScheduledEventExecutor.nanoTime();
    //從定時任務佇列中抓取第一個定時任務
    //尋找截止時間為nanoTime的任務
    Runnable scheduledTask  = pollScheduledTask(nanoTime);
    //如果該定時任務佇列不為空, 則塞到普通任務佇列裡面
    while (scheduledTask != null) {
        //如果新增到普通任務佇列過程中失敗
        if (!taskQueue.offer(scheduledTask)) {
            //則重新新增到定時任務佇列中
            scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
            return false;
        }
        //繼續從定時任務佇列中拉取任務
        //方法執行完成之後, 所有符合執行條件的定時任務佇列, 都新增到了普通任務佇列中
        scheduledTask = pollScheduledTask(nanoTime);
    }
    return true;
}

彈出需要執行的定時任務之後, 我們通過taskQueue.offer(scheduledTask)新增到taskQueue, 如果新增失敗, 則通過scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask)重新新增到定時任務佇列中

如果新增成功, 則通過pollScheduledTask(nanoTime)方法繼續新增, 直到沒有需要執行的任務

這樣就將定時任務佇列需要執行的任務新增到了taskQueue

回到runAllTasks(long timeoutNanos)方法中:

首先通過Runnable task = pollTask()taskQueue中拿一個任務

任務不為空, 則通過final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos計算一個截止時間, 任務的執行時間不能超過這個時間

然後在for迴圈中通過safeExecute(task)執行task, 我們跟到safeExecute(task):

protected static void safeExecute(Runnable task) {
    try {
        //直接呼叫run()方法執行
        task.run();
    } catch (Throwable t) {
        //發生異常不終止
        logger.warn("A task raised an exception. Task: {}", task, t);
    }
}

這裡直接呼叫taskrun()方法進行執行, 其中發生異常, 只打印一條日誌, 代表發生異常不終止, 繼續往下執行

回到runAllTasks(long timeoutNanos)方法:

每次執行完task, runTasks自增

這裡if ((runTasks & 0x3F) == 0)代表是否執行了64個任務, 如果執行了64個任務, 則會通過lastExecutionTime = ScheduledFutureTask.nanoTime()記錄定時任務初始化到現在的時間, 如果這個時間超過了截止時間, 則退出迴圈

如果沒有超過截止時間, 則通過task = pollTask()繼續彈出任務執行

這裡執行64個任務統計一次時間, 而不是每次執行任務都統計, 主要原因是因為獲取系統時間是個比較耗時的操作, 這裡是netty的一種優化方式

如果沒有task需要執行, 則通過afterRunningAllTasks()做收尾工作, 最後記錄下最後的執行時間

以上就是有關執行任務佇列的相關邏輯

 

 

本章總結

 

    本章學習了有關NioEventLoopGroup的建立, NioEventLoop的建立和啟動, 以及多路複用器的輪詢處理和task執行的相關邏輯, 通過本章學習, 我們應該掌握如下內容:

    1.NioEventLoopGroup如何選擇分配NioEventLoop

    2.NioEventLoop如何開啟

    3.NioEventLoop如何進行select操作

    4.NioEventLoop如何執行task