Netty執行緒模型及EventLoop和EventLoopGroup原始碼解析
阿新 • • 發佈:2019-02-09
1、netty執行緒模型
一般在討論netty的執行緒模型的時候,我們會考慮到經典的Reactor執行緒模型,下面分別說明下經典的Reactor執行緒模型1、1 Reactor單執行緒模型
這個執行緒模型指的是所有的nio操作都是在一個執行緒中去完成的。nio執行緒的職責如下: 作為nio服務端,接收客戶端的tcp連線; 作為no客戶端,向服務端發起tcp連線; 讀取通訊對端的請求或是應答訊息; 向通訊對端傳送訊息或者應答訊息。 其單執行緒模型如下圖所示: 由於Reactor模式是非同步非阻塞i/o,所有的i/o操作都不會導致阻塞。理論上一個執行緒可以獨立處理所有的i/o相關的操作。例如,通過Acceptor接收客戶端的連線,當鏈路建立完成後通過Dispatch將對應的ByteBuffer派發到指定的handler上,進行訊息解碼。使用者執行緒訊息編碼後通過nio執行緒將訊息傳送給客戶端。1、2 Reactor多執行緒模型
它與單執行緒模型最大的區別就是多了一組nio執行緒池來處理io操作,其模型如下:1、3 Reactor主從多執行緒模型
Reactor主從多執行緒模型,用於監聽客戶端的連線不在是一個Nio執行緒了,它是一個nio執行緒池進行監聽客戶端的連線包括安全認證,當鏈路建立後就將網路讀取的操作放在另外一個執行緒去進行讀取。其模型如下:1、4 Netty執行緒模型
netty的執行緒模型不是一層不變的,它取決於使用者的啟動引數配置。通過設定不同的啟動引數,netty可以同時支援Reactor單執行緒模型、多執行緒模型、主從Reactor執行緒模型。 netty的執行緒模型如下: netty服務端啟動程式碼如下:- /配置服務端的nio執行緒組,
- //EventLoopGroup是一個執行緒組,它包含了一組nio執行緒,專門用於網路事件的處理,實際上他們就是Reactor執行緒組
- //這裡建立2個的原因是一個用於服務端接受客戶的連線,另一個用於SockentChannel的網路讀寫。
- EventLoopGroup bossGroup = new NioEventLoopGroup();
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- try {
- ServerBootstrap b = new ServerBootstrap();
- b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChildChannelHandler());
- //繫結埠,同步等待成功
- ChannelFuture f = b.bind(port).sync();
- //等待服務端監聽埠關閉;
- f.channel().closeFuture().sync();
- } finally {
- bossGroup.shutdownGracefully();
- workerGroup.shutdownGracefully();
- }
2、NioEventLoop設計原理
NioEventLoop的設計並不是一個單純的io讀寫,還兼顧處理了以下2類任務: 系統task:通過呼叫NioEventLoop的execute(Runnable task)方法實現,Netty有很多系統task,建立他們的主要原因是:當io執行緒和使用者執行緒同時操作網路資源的時候,為了防止併發操作導致的鎖競爭,將使用者執行緒的操作封裝成Task放入訊息佇列中,由i/o執行緒負責執行,這樣就實現了局部無鎖化。 定時任務:執行schedule()方法 其類圖的結構如下:NioEventLoopGroup是NioEventLoop的組合,用於管理NioEventLoop。 nioEventLoop需要處理網路的讀寫,因此它必須會有一個多路複用器物件。下面是Selector的定義: Selector selector; private SelectedSelectionKeySet selectedKeys; private final SelectorProvider provider; 它的初始化非常簡單直接是Selector.open方法進行初始化。 網路的讀取操作是在run方法中去執行的,首先看有沒有未執行的任務,有的話直接執行,否則就去輪訓看是否有就緒的Channel,如下: @Override protected void run() { for (;;) { oldWakenUp = wakenUp.getAndSet(false); try { if (hasTasks()) { selectNow(); } else { select(); // 'wakenUp.compareAndSet(false, true)' is always evaluated // before calling 'selector.wakeup()' to reduce the wake-up // overhead. (Selector.wakeup() is an expensive operation.) // // However, there is a race condition in this approach. // The race condition is triggered when 'wakenUp' is set to // true too early. // // 'wakenUp' is set to true too early if: // 1) Selector is waken up between 'wakenUp.set(false)' and // 'selector.select(...)'. (BAD) // 2) Selector is waken up between 'selector.select(...)' and // 'if (wakenUp.get()) { ... }'. (OK) // // In the first case, 'wakenUp' is set to true and the // following 'selector.select(...)' will wake up immediately. // Until 'wakenUp' is set to false again in the next round, // 'wakenUp.compareAndSet(false, true)' will fail, and therefore // any attempt to wake up the Selector will fail, too, causing // the following 'selector.select(...)' call to block // unnecessarily. // // To fix this problem, we wake up the selector again if wakenUp // is true immediately after selector.select(...). // It is inefficient in that it wakes up the selector for both // the first case (BAD - wake-up required) and the second case // (OK - no wake-up required). if (wakenUp.get()) { selector.wakeup(); } } cancelledKeys = 0; final long ioStartTime = System.nanoTime(); needsToSelectAgain = false; if (selectedKeys != null) { processSelectedKeysOptimized(selectedKeys.flip()); } else { processSelectedKeysPlain(selector.selectedKeys()); } final long ioTime = System.nanoTime() - ioStartTime; final int ioRatio = this.ioRatio; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { break; } } } catch (Throwable t) { logger.warn("Unexpected exception in the selector loop.", t); // Prevent possible consecutive immediate failures that lead to // excessive CPU consumption. try { Thread.sleep(1000); } catch (InterruptedException e) { // Ignore. } } } } 在進行輪訓的時候,可能為空,也沒有wakeup操作或是最新的訊息處理,則說明本次輪訓是一個空輪訓,此時會觸發jdk的epoll bug,它會導致Selector進行空輪訓,使i/o執行緒處於100%。為了避免這個bug。需要對selector進行統計: 1)對selector操作週期進行統計 2)每完成一次輪訓進行一次計數 3)當在某個週期內超過一定次數說明觸發了bug,此時需要進行重新建立Selector,並賦值新值,將原來的進行關閉。 呼叫rebuildSelector方法。 當輪訓到有就緒的Channel時,就進行網路的讀寫操作,其程式碼如下:
cancelledKeys = 0;
final long ioStartTime = System.nanoTime();
needsToSelectAgain = false;
if (selectedKeys != null) {
processSelectedKeysOptimized(selectedKeys.flip());
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
final long ioTime = System.nanoTime() - ioStartTime;
final int ioRatio = this.ioRatio;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
break;
}
}
} catch (Throwable t) {
logger.warn("Unexpected exception in the selector loop.", t);
// Prevent possible consecutive immediate failures that lead to
// excessive CPU consumption.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore.
}
}
}
此時會去呼叫processSelectedKeysPlain方法,預設沒有開啟SelectedKey的優化方法。這裡執行的方法如下:
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
// check if the set is empty and if so just return to not create garbage by
// creating a new Iterator every time even if there is nothing to process.
// See https://github.com/netty/netty/issues/597
if (selectedKeys.isEmpty()) {
return;
}
Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {
final SelectionKey k = i.next();
final Object a = k.attachment();
i.remove();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (!i.hasNext()) {
break;
}
if (needsToSelectAgain) {
selectAgain();
selectedKeys = selector.selectedKeys();
// Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty()) {
break;
} else {
i = selectedKeys.iterator();
}
}
}
}
這裡會去判斷selectedkey是否為空,如果不為空就去獲取selectedkey上的channel(為NioServerSocketChannel或是NioSocketChannel),獲取到channel後執行,判斷其型別,這裡netty的都是AbstractNioChannel類,執行的方法如下:
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps();
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
return;
}
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
} catch (CancelledKeyException e) {
unsafe.close(unsafe.voidPromise());
}
}
這裡會看選擇鍵是否可用,可用然後對位進行判斷,如果是讀或者是連線操作,則呼叫Unsafe的read方法。此處的Unsafe的實現是一個多型。(可能是呼叫NioServerSocketChannel或是NioSocketChannel的doReadBytes方法),對於服務端處理連線的請求如下:
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = javaChannel().accept();
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, childEventLoopGroup().next(), ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
對於客戶端的呼叫如下:
@Override
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes());
}
當網路位為寫的時候,則說明有半包訊息沒有傳送完成,需要繼續呼叫flush方法進行傳送。
後面的如果網路操作位為連線狀態,則需要對連線結果進行判斷。
處理完網路的io後,Eventloop要執行一些非io的系統task和定時任務,程式碼如下:
final long ioTime = System.nanoTime() - ioStartTime;
final int ioRatio = this.ioRatio;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
由於要同時執行io和非io的操作,為了充分使用cpu,會按一定的比例去進行執行,如果io的任務大於定時任務和task,則可以將io比例調大。反之調小,預設是50%,其執行方法如下:
protected boolean runAllTasks(long timeoutNanos) {
fetchFromDelayedQueue();
Runnable task = pollTask();
if (task == null) {
return false;
}
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception.", t);
}
runTasks ++;
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
this.lastExecutionTime = lastExecutionTime;
return true;
}
這裡先去獲取task,如果task為空,則退出,然後迴圈去執行task,當執行的task為64的時候,這時候會去比較上次執行時間和延時的關係,如果大於延時那麼就退出,這裡獲取naotime是每64次進行獲取一次。這樣做一來是獲取naotime比較耗時,另外也不能長時間執行task,讓io阻塞,所以一般每64個任務就會返回。
最後eventloop的run方法,會判斷是否優雅關閉,如果是優雅關閉會執行closeAll方法,如下:
private void closeAll() {
selectAgain();
Set<SelectionKey> keys = selector.keys();
Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size());
for (SelectionKey k: keys) {
Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
channels.add((AbstractNioChannel) a);
} else {
k.cancel();
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, k, null);
}
}
for (AbstractNioChannel ch: channels) {
ch.unsafe().close(ch.unsafe().voidPromise());
}
}
上述的程式碼解析是netty5的。