1. 程式人生 > >4.Netty執行IO事件和非IO任務

4.Netty執行IO事件和非IO任務

  • 回顧NioEventLoop的run方法流程
  • IO事件與非IO任務
    • 處理IO事件
    • 處理非IO任務
      • 聚合定時任務到普通任務佇列
      • 從普通佇列中獲取任務
      • 計算任務執行的超時時間
      • 安全執行
      • 計算是否超時
    • 總結

回顧NioEventLoop的run方法流程

上文說到NioEventLoop的run方法可以分為3個步驟:

  1. 輪詢channel中就緒的IO事件
  2. 處理輪詢出的IO事件
  3. 處理所有任務,也包括定時任務

其中步驟1已在上一節講述,這裡接著講述下面2個步驟

IO事件與非IO任務

首先看一下在步驟2和步驟3的主幹程式碼

final int ioRatio = this.ioRatio;
// 將所有任務執行完
if (ioRatio == 100) {
    try {
        processSelectedKeys();
    } finally {
        // Ensure we always run tasks.
        runAllTasks();
    }
} else {
    // 記錄IO事件消耗的時間,然後按比例處理分配時間處理非IO任務
    final long ioStartTime = System.nanoTime();
    try {
        processSelectedKeys();
    } finally {
        // Ensure we always run tasks.
        final long ioTime = System.nanoTime() - ioStartTime;
        // ioRatio預設50,(100-ioRatio)/ioRatio剛好等於1,做到平均分配
        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
    }
}

ioRadio是NioEventLoop的一個成員變數,用來控制分配花費在IO事件與非IO任務時間的比例。預設情況下,ioRadio是50,表示IO事件與非IO任務
將分配相同時間。而當ioRatio為100時,該值失效,不再平衡兩種動作的時間分配比值。
瞭解了這一點,上述兩種分支程式碼就不難理解了,我們直接進入processSelectedKeys,看看netty如何執行IO事件

處理IO事件

先進入processSelectedKeys方法內部。

private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

可以看到這裡又根據selectedKeys是否為空這個條件來確定是處理優化過的keys還是普通keys。關於selectedKeys,在NioEventLoop介紹這一節中,
我們介紹了NioEventLoop的建立,在建立過程中,預設會將SelectedKeys由Hashset替換為陣列實現,此處的selectedKeys正是替換過後的實現。
我們繼續跟進到processSelectedKeysOptimized方法

private void processSelectedKeysOptimized() {
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        selectedKeys.keys[i] = null;
        final Object a = k.attachment();
        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }
        if (needsToSelectAgain) {
            selectedKeys.reset(i + 1);
            selectAgain();
            i = -1;
        }
    }
}

方法內部用一個for迴圈處理selectedKeys。key的attchment預設是在註冊時附加上去的NioServerSocketChannel和NioSocketChannel。
繼續跟進processSelectedKey(k, (AbstractNioChannel) a)方法。

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        final EventLoop eventLoop = ch.eventLoop();  
        if (eventLoop != this || eventLoop == null) {
            return;
        }
        unsafe.close(unsafe.voidPromise());
        return;
    }

    int readyOps = k.readyOps();
    if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
        int ops = k.interestOps();
        ops &= ~SelectionKey.OP_CONNECT;
        k.interestOps(ops);
        unsafe.finishConnect();
    }
    if ((readyOps & SelectionKey.OP_WRITE) != 0) {
        ch.unsafe().forceFlush();
    }
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
        unsafe.read();
    }
}

netty首先對selectionKey的有效性做了一個判斷。當key無效時,關閉key所在的channel。當key有效時,委託NioUnsafe物件對key進行IO操作。
注意這裡先進行OP_CONNECT,再執行OP_WRITE,最後執行OP_READ和OP_ACCEPT。關於Unsafe的這些IO操作留待以後分析。

processSelectedKeysPlain方法流程類似,略過

處理非IO任務

由於IoRatio預設為50,我們先進入runAllTasks(ioTime * (100 - ioRatio) / ioRatio)方法。

protected boolean runAllTasks(long timeoutNanos) {
    // 步驟1
    fetchFromScheduledTaskQueue();
    // 步驟2
    Runnable task = pollTask();
    if (task == null) {
        afterRunningAllTasks();
        return false;
    }
    // 步驟3
    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    for (;;) {
        // 步驟4
        safeExecute(task);
        runTasks ++;
        // 步驟5
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                break;
            }
        }
        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }
    // 步驟6
    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

非IO任務的執行可以分為6個步驟

  1. 從定時任務佇列聚合任務到普通任務佇列
  2. 從普通佇列中獲取任務
  3. 計算任務執行的超時時間
  4. 安全執行任務
  5. 任務執行到一定次數,計算是否超時
  6. 執行完taskQueue普通佇列裡的任務後,再去執行tailTaskQueue裡的任務。但目前暫時沒有看到tailTaskQueue使用的地方,也許是一個擴充套件點吧,這裡先略過。

我們一個一個步驟講解

聚合定時任務到普通任務佇列

首先看一下整體流程

private boolean fetchFromScheduledTaskQueue() {
    if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
        return true;
    }
    long nanoTime = AbstractScheduledEventExecutor.nanoTime();
    for (;;) {
        Runnable scheduledTask = pollScheduledTask(nanoTime);
        if (scheduledTask == null) {
            return true;
        }
        if (!taskQueue.offer(scheduledTask)) {
            scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
            return false;
        }
    }
}

首先先判斷定時任務佇列是否有任務,然後呼叫了一個AbstractScheduledEventExecutor.nanoTime(),該方法返回ScheduledFutureTask類從初始化
到當前時刻的差值。也即將ScheduledFutureTask初始化的時刻當成零時刻。
獲取到零時刻到當前時刻的差值後,用一個for迴圈不斷去定時任務佇列裡獲取終止時刻在當前時刻之後的任務(scheduledTask.deadlineNanos() - nanoTime<=0)
當獲取到定時任務後,將它新增到普通任務佇列taskQueue裡。同時新增失敗後,還會再重新添加回定時任務佇列,防止任務直接丟失。

說到定時任務佇列,也少不了一探其實現。scheduledTaskQueue初始化程式碼如下:

PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {
    if (scheduledTaskQueue == null) {
        scheduledTaskQueue = new DefaultPriorityQueue<>(
                SCHEDULED_FUTURE_TASK_COMPARATOR,
                11);
    }
    return scheduledTaskQueue;
}

採用的是一個懶載入的方式,在呼叫scheduledTaskQueue()建立定時任務時才進行初始化。從名字可以看出,它是一個優先順序佇列,初始化容量為11,
採用的Comparator是呼叫2個ScheduledFutureTask的compareTo方法,首先比較任務的終止時間,然後比較兩個任務的id。程式碼較簡單,就不列了。

然後我們看下排程方法schedule

private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
    if (inEventLoop()) {
        scheduledTaskQueue().add(task.setId(nextTaskId++));
    } else {
        executeScheduledRunnable(new Runnable() {
            @Override
            public void run() {
                scheduledTaskQueue().add(task.setId(nextTaskId++));
            }
        }, true, task.deadlineNanos());
    }
    return task;
}

可以發現,netty將"新增定時任務"也當做一個任務,放入任務佇列裡。

從普通佇列中獲取任務

// NioEventLoop中定義的pollTask方法
protected Runnable pollTask() {
    Runnable task = super.pollTask();
    if (needsToSelectAgain) {
        selectAgain();
    }
    return task;
}
// super.pollTask呼叫了此方法,定義在SingleThreadEventExecutor中
protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
        for (;;) {
            Runnable task = taskQueue.poll();
            if (task != WAKEUP_TASK) {
                return task;
            }
        }
    }

這裡依然是通過輪詢從任務佇列裡取出任務,並且忽略WAKEUP_TASK這個標記性任務。

計算任務執行的超時時間

在當前時間上,加上IO事件執行的時間,作為非IO任務執行的超時時間

安全執行

protected static void safeExecute(Runnable task) {
    try {
        task.run();
    } catch (Throwable t) {
        logger.warn("A task raised an exception. Task: {}", task, t);
    }
}

捕獲所有異常,使得定時任務報錯時不退出

計算是否超時

由於nanoTime()是一個相對耗時的操作,netty預設執行了64次非IO任務後,才計算是否超時。若執行了超過64個任務沒或者任務佇列已經沒有任務,
就打斷迴圈,並將當前時間更新為lastExecutionTime。

總結

到了這裡,我們已經介紹完了大部分NioEventLoop的內容,限於筆者水平和文章篇幅,nioEventLoop所使用的任務佇列MpscQueue和ScheduleFutureTask
內部執行原理不再進一步深究。但這也已經足夠對NioEventLoop塑造一個比較整體性的認識了