netty原始碼解解析(4.0)-6 執行緒模型-IO執行緒EventLoopGroup和NIO實現(一)
介面定義
io.netty.channel.EventLoopGroup extends EventExecutorGroup
方法 |
說明 |
ChannelFuture register(Channel channel) |
把一個channel註冊到一個EventLoop |
ChannelFuture register(Channel channel, ChannelPromise promise); |
同上 |
io.netty.channel.EventLoop extends OrderedEventExecutor, EventLoopGroup
方法 |
說明 |
EventLoopGroup parent() |
得到建立這個eventLoop的EventLoopGroup |
EventLoopGroup定義的主要方法是register, 這個方法的語義是把channel和eventLoop繫結在一起。一個channel對應一個eventLoop, 一個eventLoop會持有多個channel。
I/O執行緒EventLoopGroup的抽象實現
io.netty.channel.MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup
io.netty.channel.SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop
兩個類主功能都是實現了EventLoopGroup定義的register方法
MultithreadEventLoopGroup
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
public ChannelFuture register(Channel channel, ChannelPromise promise) {
return next().register(channel, promise);
}
SingleThreadEventLoop
public ChannelFuture register(Channel channel) {
return register(channel, new DefaultChannelPromise(channel, this));
}
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
channel.unsafe().register(this, promise);
return promise;
}
register的實現主要是為了呼叫Channel.Unsafe例項的register方法。
NIO實現
io.netty.channel.nio.NioEventLoopGroup extends MultithreadEventLoopGroup
io.netty.channel.nio.NioEventLoop extends SingleThreadEventLoop
NioEventLoopGroup是在MultithreadEventLoopGroup基礎上實現了對JDK NIO Selector的封裝, 它實現以下幾個功能:
- 建立selector
- 在selector上註冊channel感興趣的NIO事件
- 實現EventExecutor的run方法,定義NIO事件和Executor任務的處理流程。
- 把NIO事件轉換成對channel unsafe的呼叫或NioTask的呼叫
- 控制執行緒執行I/O操作和排隊任務的用時比例
- 處理epoll selector cpu 100%的bug
下面來具體分析這幾個功能的實現。
建立Selector
NioEventLoop#openSelector()實現了建立selector的功能,預設情況下,使用SelectorProvider#openSelector()方法建立一個新個selector:
final Selector unwrappedSelector = provider.openSelector();
如果設定環境變數io.netty.noKeySetOptimization=true, 會建立一個selectedKeySet = new SelectedSelectionKeySet(), 然後使用java的反射機制把selector的selectedKeys和publicSelectedKeys替換成selectedKeySet,具體步驟是:
1.得到selector的真正型別: sun.nio.ch.SelectorImpl
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl ",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
2.替換selector是屬性unwrappedSelector
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys ");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys ");
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
之所以會設計一個這樣的優化選項,是因為一般情況下呼叫完selector的select或selectNow方法後需要呼叫Selector#selectedKeys()得到觸發NIO事件的的SelectableChannel,這樣優化之後,可以直接從selectedKeySet中得到已經觸發了NIO事件的SelectableChannel。
在selector上註冊channel感興趣的NIO事件
NioEventLoop提供了unwrappedSelector方法,這個方法返回了它建立好的Selector例項。這樣任何的外部類都可以把任意的SelectableChannel註冊到這selector上。在AbstractNioChannel中, doRegister方法的實現就是使用了這個方法:
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
另外,它還提供了一個register方法:
public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task)
這個方法會把task當成SelectableChannel的附件註冊到selector上:
ch.register(selector, interestOps, task);
實現EventExecutor的run方法,定義NIO事件和Executor任務的處理流程
在NioEventLoop的run方法中實現NIO事件和EventExecutor的任務處理邏輯,這個run方法在io.netty.util.concurrent.SingleThreadEventExecutor中定義。在上一章中,我們看到了DefaultEventExecutor中是如何實現這個run方法的,這裡我們將要看到這run方法的另一個實現。和SingleThreadEventExecutor中的run方法相比,NioEventLoop的run方法不僅要及時地執行taskQueue中的任務,還要能及時地處理NIO事件,因此它會同時檢查selector中的NIO事件和和taskQueue佇列,任何一箇中有事件需要處理或有任務需要執行,它不會阻塞執行緒。同時它也保證了在沒有NIO事件和任務的情況下執行緒不會無謂的空轉浪費CUP資源。
run主要實現如下,為了更清晰的說明它的主要功能,我對原來的程式碼進行了一些刪減。
for(;;){
try{
//phase1: 同時檢查NIO事件和任務
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false)); //在taskQueue中沒有任務的時候執行select
}
//phase2: 進入處理NIO事件,執行executor任務
try{
//處理NIO事件
processSelectedKeys();
}finally{
//處理taskQueu中的任務
runAllTasks();
}
}catch(Throwable t){
handleLoopException(t);
}
}
run方法有兩個階段構成:
phase1: 檢查NIO事件或executor任務,如果有任何的NIO事件或executor任務進入phase2。
這樣階段的主要工作在selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())和select中完成。
selectStrategy.calculateStrategy實現
selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())
這行程式碼的含義是: 如果hasTasks() == true, 呼叫以下selector#selectNow, 然後進入phase2。 否則呼叫select。這裡使用了strategy模式,預設的strategy實現是io.netty.channe.DefaultSelectStrategy implements SelectStrategy
@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
DefaultSelectStrategy實現了SelectStrategy介面,這介面定義了兩個常量:
int SELECT = -1;
int CONTINUE = -2;
執行時selectSuppler引數傳入的是selectNowSupplier, 它的實現如下:
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
};
這裡的get方法呼叫了selectNow, selectNow呼叫的是Selector#selectNew方法,這個方法的返回值是>=0。
hashTasks的傳入的引數是hasTask()的返回值: return !taskQueue.isEmpty();
程式碼讀到這裡就會發現,使用預設的的SelectStrategy實現,calculateStrategy在hasTasks()==true時返回值>=0, hasTasks() == false時返回值是SelectStrategy.SELECT,不會返回SelectStrategy.CONTINUE。
select實現
select的執行邏輯是:
1. 計算超select方法的結束時間selectDeadLineNanos
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
2. 進入迴圈,檢查超時--超時跳出迴圈。
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
3. 如果在select執行過程中有executor任務提交或可以當前的wakeUp由false變成true, 跳出迴圈
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
4. 呼叫selector#select等待NIO事件。
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
5. 如果滿足這些條件的任何一個,跳出迴圈: 有NIO事件、wakeUp的新舊值都是true、taskQueue中有任務、有定時任務到期。
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
break;
}
6. 如果執行緒被中斷,跳出迴圈。
if (Thread.interrupted()) {
break;
}
7. 如果selector.select超時,沒有檢查到任何NIO事件, 會在下次迴圈開始時跳出迴圈。 如果每次超時,跳到第2步繼續下一次迴圈。
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
selectCnt = 1;
}
currentTimeNanos = time;
select 最遲會在當前時間>= selectDeadLineNanos時返回,這個時間是最近一個到期的定時任務執行的時間,換言之如果沒有任何的NIO事件或executor任務,select會在定時任務到期時返回。如果沒有定時任務,delayNanos(currentTimeNanos)返回的值是 TimeUnit.SECONDS.toNanos(1),即1秒。 select會在檢查到任何NIO事件或executor任務時返回,為了保證這點,在selector.select(timeoutMillis)前後都會呼叫hasTasks檢查executor任務,為了能在呼叫executet提交任務時喚醒selector.select,NioEventLoop覆蓋了SingleThreadEventExecutor的wake方法:
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}
這個方法會及時的喚醒selector.select, 保證新提交的任務可以得到及時的執行。
phase2: 進入處理NIO事件,執行executor任務
這個階段是先呼叫processSelectedKeys()處理NIO事件,然後掉用 runAllTasks()處理所有已經到期的定時任務和已經在排隊的任務。這個階段還實現了NIO事件和executor任務的用時比例管理,這個特性稍後會詳細分析。