1. 程式人生 > >死磕Netty原始碼之Reactor執行緒模型詳解(二)NioEventLoop的執行

死磕Netty原始碼之Reactor執行緒模型詳解(二)NioEventLoop的執行

前言

在上一篇部落格介紹了NioEventLoop的啟動流程,這篇部落格將介紹NioEventLoop的執行流程

NioEventLoop執行

NioEventLoop的執行在run方法中完成,程式碼如下

protected void run() {
    for (;;) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue
; case SelectStrategy.SELECT: select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } default: } processSelectedKeys(); runAllTasks(ioTime * (100
- ioRatio) / ioRatio); } catch (Throwable t) { handleLoopException(t); } } }

NioEventLoop執行過程大致可以分為如下三個步驟

1.輪詢檢測IO事件
2.處理產生IO事件
3.處理非同步任務佇列

輪詢檢測IO事件

輪詢檢測IO事件在以下程式碼中完成

select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
    selector.wakeup();
}

wakenUp屬性表示是否應該喚醒正在阻塞的select操作,可以看到Netty在進行一次新的輪詢之前,都會將wakeUp被設定成false,標誌新的一輪輪詢的開始。接下來我們來看一下具體的select操作,它可以分為以下三個部分

定時任務截止事時間快到了,中斷本次輪詢

int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

for (;;) {
    long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
    if (timeoutMillis <= 0) {
        if (selectCnt == 0) {
            selector.selectNow();
            selectCnt = 1;
        }
        break;
    }
    ....
}

我們可以看到NioEventLoop中Reactor執行緒的select操作也是一個for迴圈,在for迴圈第一步中如果發現當前的定時任務佇列中有任務的截止事件快到了(<=0.5ms)就跳出迴圈。此外跳出之前如果發現目前為止還沒有進行過select操作那麼就會呼叫一次selectNow(),該方法會立即返回不會阻塞。Netty裡面定時任務佇列是按照延遲時間從小到大進行排序

protected long delayNanos(long currentTimeNanos) {
    ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
    if (scheduledTask == null) {
        return SCHEDULE_PURGE_INTERVAL;
    }

    return scheduledTask.delayNanos(currentTimeNanos);
}

delayNanos方法取出的是第一個定時任務的延遲時間,如果沒有任務預設值為1秒

輪詢過程中發現有任務加入或被喚醒,中斷本次輪詢

for (;;) {
    // 1.定時任務截至事時間快到了,中斷本次輪詢
    ...

    // 2.輪詢過程中發現有任務加入,中斷本次輪詢
    if (hasTasks() && wakenUp.compareAndSet(false, true)) {
        selector.selectNow();
        selectCnt = 1;
        break;
    }

    ....
}

判斷任務佇列是否為空或者是否被喚醒,如果不為空或者被喚醒就執行一次非阻塞select操作,跳出迴圈立即返回

阻塞式select操作

for (;;) {
    // 1.定時任務截至事時間快到了,中斷本次輪詢
    ...
    // 2.輪詢過程中發現有任務加入,中斷本次輪詢
    ...
    // 3.阻塞式select操作
    int selectedKeys = selector.select(timeoutMillis);
    selectCnt ++;
    if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
        break;
    }
    ....
}

執行到這一步說明Netty任務佇列裡面佇列為空,並且所有定時任務延遲時間還未到(大於0.5ms),於是在這裡進行一次阻塞式select操作,截止到第一個定時任務的截止時間。如果第一個定時任務的延遲非常長,比如一個小時,那麼執行緒有可能一直阻塞在select操作,但是隻要在這段時間內有新任務加入,該阻塞就會被釋放

外部執行緒呼叫execute方法新增任務

@Override
public void execute(Runnable task) { 
    ...
    // inEventLoop為false
    wakeup(inEventLoop); 

    ...
}

呼叫wakeup方法喚醒selector阻塞

protected void wakeup(boolean inEventLoop) {
    if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
        selector.wakeup();
    }
}

可以看到在外部執行緒新增任務的時候,會呼叫wakeup方法來喚醒selector.select(timeoutMillis)

阻塞select操作結束之後,Netty又做了一系列的狀態判斷來決定是否中斷本次輪詢,中斷本次輪詢的條件有

輪詢到IO事件
oldWakenUp引數為true
任務佇列裡面有任務hasTasks
第一個定時任務即將要被執行
使用者主動喚醒 => wakenUp.get()

避免JDK空輪詢BUG

long currentTimeNanos = System.nanoTime();
for (;;) {
    // 1.定時任務截止事時間快到了,中斷本次輪詢
    ...
    // 2.輪詢過程中發現有任務加入,中斷本次輪詢
    ...
    // 3.阻塞式select操作
    selector.select(timeoutMillis);
    // 4.解決jdk的nio bug
    long time = System.nanoTime();
    if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
        selectCnt = 1;
    } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {

        rebuildSelector();
        selector = this.selector;
        selector.selectNow();
        selectCnt = 1;
        break;
    }
    currentTimeNanos = time; 
    ...
 }

Netty在每次進行selector.select(timeoutMillis)之前記錄一下開始時間currentTimeNanos,在select之後記錄一下結束時間,判斷select操作是否至少持續timeoutMillis秒(將time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos改成time - currentTimeNanos >= TimeUnit.MILLISECONDS.toNanos(timeoutMillis)或許更好理解一些),如果持續的時間大於等於timeoutMillis說明就是一次有效的輪詢重置selectCnt標誌,否則表明該阻塞方法並沒有阻塞這麼長時間,可能觸發了JDK的空輪詢BUG,當空輪詢的次數超過一個閥值的時候(預設是512)就開始重建selector。接下來分析一下Netty的rebuildSelector過程

public void rebuildSelector() {
    final Selector oldSelector = selector;
    final Selector newSelector;
    newSelector = openSelector();

    int nChannels = 0;
     try {
        for (;;) {
                for (SelectionKey key: oldSelector.keys()) {
                    Object a = key.attachment();
                     if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
                         continue;
                     }
                     int interestOps = key.interestOps();
                     key.cancel();
                     SelectionKey newKey = key.channel().register(newSelector, interestOps, a);
                     if (a instanceof AbstractNioChannel) {
                         ((AbstractNioChannel) a).selectionKey = newKey;
                      }
                     nChannels ++;
                }
                break;
        }
    } catch (ConcurrentModificationException e) {
        continue;
    }
    selector = newSelector;
    oldSelector.close();
}

rebuildSelector的操作其實很簡單:new一個新的selector,將之前註冊到老的selector上的channel重新轉移到新的selector上。它是通過openSelector()方法建立一個新的selector然後執行一個死迴圈,只要執行過程中出現過一次併發修改selectionKeys異常就重新開始轉移,具體的轉移步驟為

1.拿到有效的key
2.取消該key在舊的selector上的事件註冊
3.將該key對應的channel註冊到新的selector上
4.重新繫結channel和新的key的關係

轉移完成之後就可以將原有的selector廢棄,後面所有的輪詢都是在新的selector進行

處理產生IO事件

處理IO事件的過程是在processSelectedKeys()中完成

private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

這裡的selectedKeys是SelectedSelectionKeySet物件的例項,它是在NioEventLoop的構造方法中呼叫的openSelector時初始化的

private Selector NioEventLoop.openSelector() {
    //...
    final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
    // selectorImplClass -> sun.nio.ch.SelectorImpl
    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
    selectedKeysField.setAccessible(true);
    publicSelectedKeysField.setAccessible(true);
    selectedKeysField.set(selector, selectedKeySet);
    publicSelectedKeysField.set(selector, selectedKeySet);
    //...
    selectedKeys = selectedKeySet;
}

通過反射將selectedKeys與sun.nio.ch.SelectorImpl中的兩個field繫結,這兩個field其實是兩個HashSet。selector在呼叫select()方法的時候如果有IO事件發生,就會往裡面的兩個field中塞相應的selectionKey,相當於往一個hashSet中add元素,Netty通過反射將jdk中的兩個field替換掉,接下來我們看一下Netty自定義SelectedSelectionKeySet的add方法做了哪些優化?

final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {

    private SelectionKey[] keysA;
    private int keysASize;
    private SelectionKey[] keysB;
    private int keysBSize;
    private boolean isA = true;

    SelectedSelectionKeySet() {
        keysA = new SelectionKey[1024];
        keysB = keysA.clone();
    }

    @Override
    public boolean add(SelectionKey o) {
        if (o == null) {
            return false;
        }

        if (isA) {
            int size = keysASize;
            keysA[size ++] = o;
            keysASize = size;
            if (size == keysA.length) {
                doubleCapacityA();
            }
        } else {
            int size = keysBSize;
            keysB[size ++] = o;
            keysBSize = size;
            if (size == keysB.length) {
                doubleCapacityB();
            }
        }

        return true;
    }

    private void doubleCapacityA() {
        SelectionKey[] newKeysA = new SelectionKey[keysA.length << 1];
        System.arraycopy(keysA, 0, newKeysA, 0, keysASize);
        keysA = newKeysA;
    }

    private void doubleCapacityB() {
        SelectionKey[] newKeysB = new SelectionKey[keysB.length << 1];
        System.arraycopy(keysB, 0, newKeysB, 0, keysBSize);
        keysB = newKeysB;
    }

    SelectionKey[] flip() {
        if (isA) {
            isA = false;
            keysA[keysASize] = null;
            keysBSize = 0;
            return keysA;
        } else {
            isA = true;
            keysB[keysBSize] = null;
            keysASize = 0;
            return keysB;
        }
    }

    @Override
    public int size() {
        if (isA) {
            return keysASize;
        } else {
            return keysBSize;
        }
    }

    @Override
    public boolean remove(Object o) {
        return false;
    }

    @Override
    public boolean contains(Object o) {
        return false;
    }

    @Override
    public Iterator<SelectionKey> iterator() {
        throw new UnsupportedOperationException();
    }
}

該類繼承了AbstractSet說明該類可以當作一個set來用,但是底層使用兩個陣列來交替使用,在add方法中判斷當前使用哪個陣列,找到對應的陣列,然後經歷下面三個步驟

1.將SelectionKey塞到該陣列的邏輯尾部
2.更新該陣列的邏輯長度+1
3.如果該陣列的邏輯長度等於陣列的物理長度,就將該陣列擴容

待程式跑過一段時間,等陣列的長度足夠長每次在輪詢到NIO事件的時候,Netty只需要O(1)的時間複雜度就能將 SelectionKey塞到set中去,而JDK底層使用的hashSet需要O(lgn)的時間複雜度,接下來我們繼續跟進processSelectedKeysOptimized方法

private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
     for (int i = 0;; i ++) {
         // 1.取出IO事件以及對應的channel
         final SelectionKey k = selectedKeys[i];
         if (k == null) {
             break;
         }
         selectedKeys[i] = null;
         final Object a = k.attachment();
         // 2.處理該channel
         if (a instanceof AbstractNioChannel) {
             processSelectedKey(k, (AbstractNioChannel) a);
         } else {
             NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
             processSelectedKey(k, task);
         }
         // 3.判斷是否該再來次輪詢
         if (needsToSelectAgain) {
             for (;;) {
                 i++;
                 if (selectedKeys[i] == null) {
                     break;
                 }
                 selectedKeys[i] = null;
             }
             selectAgain();
             selectedKeys = this.selectedKeys.flip();
             i = -1;
         }
     }
}

取出IO事件以及對應的Netty Channel類

final SelectionKey k = selectedKeys[i];
if (k == null) {
    break;
}
selectedKeys[i] = null;
final Object a = k.attachment();

這裡其實也能體會到優化過的SelectedSelectionKeySet的好處,遍歷的時候遍歷的是陣列相對JDK原生的HashSet效率有所提高

拿到當前SelectionKey之後將selectedKeys[i]置為null,這裡解釋一下這麼做的理由:想象一下這種場景,假設一個NioEventLoop平均每次輪詢出N個IO事件高峰期輪詢出3N個事件,那麼selectedKeys的物理長度要大於等於3N,如果每次處理這些key不設定selectedKeys[i]為空,高峰期一過這些存在陣列尾部的selectedKeys[i]對應的SelectionKey將一直無法被回收,SelectionKey對應的物件可能不大,但是它的attachment可能很大,這樣一來這些元素是GC Root可達的很容易造成GC不掉,記憶體洩漏就發生了

處理該channel

processSelectedKey(k, (AbstractNioChannel) a);

接下來分析一下這裡的attachment物件是啥玩意,我們回顧一下Channel註冊的過程

selectionKey = javaChannel().register(eventLoop().selector, 0, this);

javaChannel()返回Netty類SelectableChannel對應的JDK底層channel物件

protected SelectableChannel javaChannel() {
    return ch;
}

檢視SelectableChannel的register方法,不難推斷出Netty的輪詢註冊機制其實是將SelectableChannel物件註冊到JDK類Selctor物件上去,並且將AbstractNioChannel類作為一個attachment附屬上,這樣在JDK輪詢出某條SelectableChannel有IO事件發生時,就可以直接取出AbstractNioChannel進行後續操作,關於processSelectedKey(SelectionKey k, AbstractNioChannel ch)我們將在下一篇部落格中詳細介紹

判斷是否該再來次輪詢

if (needsToSelectAgain) {
    for (;;) {
        i++;
        if (selectedKeys[i] == null) {
            break;
        }
        selectedKeys[i] = null;
    }
    selectAgain();
    selectedKeys = this.selectedKeys.flip();
    i = -1;
}

每次在抓到IO事件之後都會將needsToSelectAgain重置為false,那麼什麼時候needsToSelectAgain會重新被設定成true呢?這裡的needsToSelectAgain物件通過開發工具可以很方便的找到被引用的地方

void cancel(SelectionKey key) {
    key.cancel();
    cancelledKeys ++;
    if (cancelledKeys >= CLEANUP_INTERVAL) {
        cancelledKeys = 0;
        needsToSelectAgain = true;
    }
}

繼續檢視cancel函式被呼叫的地方

protected void doDeregister() throws Exception {
    eventLoop().cancel(selectionKey());
}

不難看出在Channel從selector上移除的時候呼叫cancel函式將key取消,並且當被去掉的key到達CLEANUP_INTERVAL(預設值為256)的時候設定needsToSelectAgain為true。每滿256次就會進入到if的程式碼塊,首先將selectedKeys的內部陣列全部清空,方便被JVM垃圾回收,然後重新呼叫selectAgain重新填裝一下selectionKey。Netty這麼做的目的我想應該是每隔256次channel斷線,重新清理一下selectionKey保證現存的selectionKey及時有效

處理非同步任務佇列

我們取三種典型的Task使用場景來分析

使用者自定義普通任務

ctx.channel().eventLoop().execute(new Runnable() {
    @Override
    public void run() {
        //...
    }
});

我們跟進execute方法,看重點

public void execute(Runnable task) {
    //...
    addTask(task);
    //...
}

execute方法呼叫addTask方法

protected void addTask(Runnable task) {
    // ...
    if (!offerTask(task)) {
        reject(task);
    }
}

final boolean offerTask(Runnable task) {
    // ...
    return taskQueue.offer(task);
}

跟到offerTask方法基本上task就落地了,Netty內部使用一個taskQueue將task儲存起來,那麼這個taskQueue又是何方神聖

private final Queue<Runnable> taskQueue;

taskQueue = newTaskQueue(this.maxPendingTasks);

@Override
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
    // This event loop never calls takeTask()
    return PlatformDependent.newMpscQueue(maxPendingTasks);
}

taskQueue在SingleThreadEventExecutor構造方法中被初始化,我們發現taskQueue在NioEventLoop中預設是mpsc佇列,mpsc佇列即多生產者單消費者佇列,Netty使用mpsc方便的將外部執行緒的task聚集,在Reactor執行緒內部用單執行緒來序列執行,我們可以借鑑Netty的任務執行模式來處理類似多執行緒資料上報,定時聚合的應用。在本節討論的任務場景中,所有程式碼的執行都是在Reactor執行緒中的,所以所有呼叫inEventLoop()的地方都返回true,既然都是在reactor執行緒中執行那麼其實這裡的mpsc佇列其實沒有發揮真正的作用,mpsc大顯身手的地方其實在第二種場景

非當前reactor執行緒呼叫channel的各種方法

channel.write(...)

它會呼叫AbstractChannelHandlerContext的write方法

private void write(Object msg, boolean flush, ChannelPromise promise) {
    // ...
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        }  else {
            task = WriteTask.newInstance(next, m, promise);
        }
        safeExecute(executor, task, promise, m);
    }
}

外部執行緒在呼叫write的時候executor.inEventLoop()會返回false,直接進入到else分支將write封裝成一個WriteTask(這裡僅僅是write而沒有flush,因此flush引數為false), 然後呼叫safeExecute方法

private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
    // ...
    executor.execute(runnable);
    // ...
}

接下來的呼叫鏈就進入到第一種場景了,但是和第一種場景有個明顯的區別就是,第一種場景的呼叫鏈的發起執行緒是Reactor執行緒,第二種場景的呼叫鏈的發起執行緒是使用者執行緒,使用者執行緒可能會有很多個,顯然多個執行緒併發寫taskQueue可能出現執行緒同步問題,於是這種場景下Netty的mpsc queue就有了用武之地

使用者自定義定時任務

ctx.channel().eventLoop().schedule(new Runnable() {
    @Override
    public void run() {

    }
}, 60, TimeUnit.SECONDS);

第三種場景就是定時任務邏輯了,用的最多的便是如上方法:在一定時間之後執行任務,我們跟進schedule方法

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
    // ...
    return schedule(new ScheduledFutureTask<Void>(this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
}

通過ScheduledFutureTask將使用者自定義任務再次包裝成一個Netty內部的任務

<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
    if (inEventLoop()) {
        scheduledTaskQueue().add(task);
    } else {
        execute(new Runnable() {
            @Override
            public void run() {
                scheduledTaskQueue().add(task);
            }
        });
    }

    return task;
}

在執行定時任務前Netty會先判斷當前是否在內部執行緒進行呼叫,如果是在內部執行緒則直接將任務新增進佇列,如果是在外部執行緒呼叫schedule,Netty將新增定時任務的邏輯封裝成一個普通的task,這個task的任務是新增[新增定時任務]的任務,而不是新增定時任務,其實也就是第二種場景,這樣對PriorityQueue的訪問就變成單執行緒即只有Reactor執行緒,確保執行緒安全

scheduledTaskQueue()方法會返回一個優先順序佇列,然後呼叫add方法將定時任務加入到佇列中去

Queue<ScheduledFutureTask<?>> scheduledTaskQueue() {
    if (scheduledTaskQueue == null) {
        scheduledTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
    }
    return scheduledTaskQueue;
}

接下來我們分析一下優先順序佇列中定時任務的比較規則,程式碼如下

public int compareTo(Delayed o) {
    if (this == o) {
        return 0;
    }

    ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
    long d = deadlineNanos() - that.deadlineNanos();
    if (d < 0) {
        return -1;
    } else if (d > 0) {
        return 1;
    } else if (id < that.id) {
        return -1;
    } else if (id == that.id) {
        throw new Error();
    } else {
        return 1;
    }
}

兩個定時任務的比較是先比較任務的截止時間,截止時間相同的情況下再比較id(即任務新增的順序),若ID再相同拋異常,這樣在執行定時任務的時候就能保證最近截止時間的任務先執行。下面我們再來看下Netty是如何來保證各種定時任務的執行的,Netty裡面的定時任務分以下三種

1.若干時間後執行一次
2.每隔一段時間執行一次
3.每次執行結束,隔一定時間再執行一次

Netty使用一個periodNanos來區分這三種情況

public void run() {
    if (periodNanos == 0) {
        V result = task.call();
        setSuccessInternal(result);
    } else { 
        task.call();
        long p = periodNanos;
        if (p > 0) {
            deadlineNanos += p;
        } else {
            deadlineNanos = nanoTime() - p;
        }
            scheduledTaskQueue.add(this);
        }
    }
}

if(periodNanos == 0)對應若干時間後執行一次的定時任務型別執行完了該任務就結束了,否則進入到else程式碼塊先執行任務然後再區分是哪種型別的任務,periodNanos大於0表示是以固定頻率執行某個任務和任務的持續時間無關,然後設定該任務的下一次截止時間為本次的截止時間加上間隔時間periodNanos,否則就是每次任務執行完畢之後間隔多長時間之後再次執行,截止時間為當前時間加上間隔時間(-p就表示加上一個正的間隔時間),最後將當前任務物件再次加入到佇列實現任務的定時執行

任務的排程

在瞭解了Netty內部的任務新增機制後,我們回到處理非同步任務佇列方法中

runAllTasks(long timeoutNanos);

這行程式碼表示了儘量在一定的時間內將所有的任務都取出來run一遍,timeoutNanos表示該方法最多執行這麼長時間(這裡有個ioRatio的變數意思是IO任務所佔的比重(預設是50),也就是說IO任務和非IO任務所分配的時間是1:1)。因為如果Reactor執行緒在此停留的時間過長,那麼將積攢許多的IO事件無法處理最終導致大量客戶端請求阻塞,因此預設情況下Netty將控制內部佇列的執行時間

protected boolean runAllTasks(long timeoutNanos) {
    fetchFromScheduledTaskQueue();
    Runnable task = pollTask();
    //...

    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    for (;;) {
        safeExecute(task);
        runTasks ++;
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                break;
            }
        }

        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }

    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

這段程式碼便是Reactor執行task的所有邏輯,可以拆解成下面幾個步驟

1.從scheduledTaskQueue轉移定時任務到taskQueue(mpsc queue)
2.計算本次任務迴圈的截止時間
3.執行任務
4.收尾

從scheduledTaskQueue轉移定時任務到taskQueue(mpsc queue)

首先呼叫fetchFromScheduledTaskQueue()方法,將到期的定時任務轉移到mpsc queue裡面

private boolean fetchFromScheduledTaskQueue() {
    long nanoTime = AbstractScheduledEventExecutor.nanoTime();
    Runnable scheduledTask  = pollScheduledTask(nanoTime);
    while (scheduledTask != null) {
        if (!taskQueue.offer(scheduledTask)) {
            // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
            scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
            return false;
        }
        scheduledTask  = pollScheduledTask(nanoTime);
    }
    return true;
}

NioEventLoop會維護兩個任務佇列(一個定時任務佇列一個正常任務佇列),這裡的邏輯就是把定時任務佇列中已經到執行時間的任務取出來放到正常的任務佇列中去,來看下pollScheduledTask這個方法

protected final Runnable pollScheduledTask(long nanoTime) {
    assert inEventLoop();

    Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
    ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
    if (scheduledTask == null) {
        return null;
    }

    if (scheduledTask.deadlineNanos() <= nanoTime) {
        scheduledTaskQueue.remove();
        return scheduledTask;
    }
    return null;
}

可以看到每次pollScheduledTask的時候,只有在當前任務的截止時間已經到了才會取出來

計算本次任務迴圈的截止時間

Runnable task = pollTask();
// ...
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;

這一步將取出第一個任務用Reactor執行緒傳入的超時時間timeoutNanos來計算出當前任務迴圈的deadline,並且使用runTasks,lastExecutionTime來時刻記錄任務的狀態

迴圈執行任務

for (;;) {
    safeExecute(task);
    runTasks ++;
    if ((runTasks & 0x3F) == 0) {
        lastExecutionTime = ScheduledFutureTask.nanoTime();
        if (lastExecutionTime >= deadline) {
            break;
        }
    }

    task = pollTask();
    if (task == null) {
        lastExecutionTime = ScheduledFutureTask.nanoTime();
        break;
    }
}

這一步便是Netty裡面執行所有任務的核心程式碼了。首先呼叫safeExecute來確保任務安全執行忽略任何異常

protected static void safeExecute(Runnable task) {
    try {
        task.run();
    } catch (Throwable t) {
        logger.warn("A task raised an exception. Task: {}", task, t);
    }
}

然後將已執行任務runTasks加一,每隔0x3F任務即每執行完64個任務之後判斷當前時間是否超過本次reactor任務迴圈的截止時間了,如果超過那就break掉,如果沒有超過那就繼續執行。可以看到Netty對效能的優化考慮地相當的周到,假設Netty任務佇列裡面如果有海量小任務,如果每次都要執行完任務都要判斷一下是否到截止時間,那麼效率是比較低下的

收尾

afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;

收尾工作很簡單,呼叫一下afterRunningAllTasks方法

protected void afterRunningAllTasks() {
    runAllTasksFrom(tailTasks);
}

NioEventLoop可以通過父類SingleThreadEventLoop的executeAfterEventLoopIteration方法向tailTasks中新增收尾任務,比如你想統計一下一次執行一次任務迴圈花了多長時間就可以呼叫此方法

public final void executeAfterEventLoopIteration(Runnable task) {
        // ...
        if (!tailTasks.offer(task)) {
            reject(task);
        }
        // ...
}

至此Reactor執行緒模型原始碼分析完畢,在下一篇部落格中將介紹新連線的接入過程