Netty框架學習之路(五)—— EventLoop及事件迴圈機制
在前面的博文中,我們大致分析瞭解了Channel及其相關概念。在Netty的執行緒模型中,每個channel都有唯一的一個eventLoop與之相繫結,那麼在這篇博文中我們來看一下EvenLoop及其相關概念。
在傳統的Java NIO程式設計中,我們經常使用到如下程式碼:
public static void main(String[] args) {
try {
//建立選擇器
Selector selector = Selector.open();
//開啟通道
ServerSocketChannel channel = ServerSocketChannel.open();
//開啟非阻塞模式
channel.configureBlocking(false);
//服務端socket監聽指定埠
channel.socket().bind(new InetSocketAddress(port), 1024);
// 將 channel 註冊到 selector 中,
// 通常我們都是先註冊一個 OP_ACCEPT 事件,
// 然後在 OP_ACCEPT 到來時, 再將這個 Channel 的 OP_READ 註冊到 Selector 中。
channel.register(selector, SelectionKey.OP_ACCEPT);
while (true){
// 通過呼叫 select 方法, 阻塞地等待 channel I/O 可操作
selector.select(500);
// 獲取 I/O 操作就緒的 SelectionKey, 通過 SelectionKey 可以知道哪些 Channel 的哪類 I/O 操作已經就緒.
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
while (it.hasNext()){
SelectionKey key = it.next();
// 當獲取一個 SelectionKey 後, 就要將它刪除, 表示我們已經對這個 IO 事件進行了處理。
it.remove();
try {
if(key.isAcceptable()) {
//處理新的請求 三次握手 建立連線
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
//在 OP_ACCEPT 到來時, 再將這個 Channel 的 OP_READ 註冊到 Selector 中.
sc.register(selector, SelectionKey.OP_READ);
}
………………
}catch(Exception e){
if(key != null){
key.cancel();
if(key.channel() != null){
key.channel().close();
}
}
}
}
}
}catch (IOException e){
e.printStackTrace();
}
}
上述操作中的第一步通過Selector.open() 開啟一個 Selector,我們以NioServerSocketChannel為例,當建立NioServerSocketChannel時,Netty通過反射呼叫NioServerSocketChannel的無引數構造方法(具體過程後面專門介紹):
channel = this.channelFactory.newChannel();
NioSocketChannel的無引數構造方法如下:
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
public NioServerSocketChannel() {
this(DEFAULT_SELECTOR_PROVIDER);
}
public NioServerSocketChannel(SelectorProvider provider) {
this(newSocket(provider));
}
private static NioServerSocketChannel newSocket(SelectorProvider provider) {
try {
//呼叫 SelectorProvider.openSocketChannel() 來開啟一個新的 Java NIO SocketChannel:
return provider.openSocketChannel();
} catch (IOException var2) {
throw new ChannelException("Failed to open a socket.", var2);
}
}
第二步 將 Channel 註冊到 Selector 中, 並設定需要監聽的事件。在channel的註冊過程中(具體過程後面專門介紹),會呼叫AbstractUnsafe.register0方法:
private void register0(ChannelPromise promise) {
……
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (firstRegistration && isActive()) {
pipeline.fireChannelActive();
}
}
register0 又呼叫了 AbstractNioChannel.doRegister方法:
protected void doRegister() throws Exception {
// 省略錯誤處理
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
}
此處的引數0說明僅僅將 Channel 註冊到 Selector 中, 但是不設定interest set。那到底在哪裡設定的呢?其實在NioServerSocketChannel的構造方法中:
public NioServerSocketChannel(ServerSocketChannel channel) {
//表示關注OP_ACCEPT事件
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
第一、二步都完成了,那麼第三步迴圈部分在哪呢?事實上 NioEventLoop 本身就是一個 SingleThreadEventExecutor,因此 NioEventLoop 的啟動 其實就是 NioEventLoop 所繫結的本地 Java 執行緒的啟動。在SingleThreadEventExecutor.doStartThread方法中建立執行緒並呼叫SingleThreadEventExecutor.this.run()方法,而run方法為抽象方法,具體實現在NioEventLoop的run方法中。
protected void run() {
for (;;) {
try {
//通過hasTasks方法判斷當前taskQueue是否為空
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
// fallthrough
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
}
……
}
}
}
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
};
此處for(;;) 所構成的死迴圈構成了NioEventLoop事件迴圈的核心。這裡有兩個方法需要注意,selector.selectNow()會檢查當前是否有就緒的 IO 事件,如果有,則返回就緒 IO 事件的個數,如果沒有,則返回0。selectNow() 是立即返回的,不會阻塞當前執行緒;selector.select(timeoutMillis)會阻塞住當前執行緒的,timeoutMillis 是阻塞的超時時間。
程式碼中有個名為ioRatio的屬性,它表示的是此執行緒分配給 IO 操作所佔的時間比(即執行 processSelectedKeys 耗時在整個迴圈中所佔用的時間)。計算公式:
設 IO 操作耗時為 ioTime, ioTime 佔的時間比例為 ioRatio, 則:
ioTime / ioRatio = taskTime / taskRatio
taskRatio = 100 - ioRatio
=> taskTime = ioTime * (100 - ioRatio) / ioRatio
再來看IO處理過程,即processSelectedKeys方法,
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
這個方法中會根據 selectedKeys 欄位是否為空,而分別呼叫 processSelectedKeysOptimized 或 processSelectedKeysPlain。 其實兩者沒有太大的區別,此處以 processSelectedKeysOptimized 為例分析一下工作流程。
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
程式碼中k.attachment()返回值是什麼呢?其實我們可以猜測一下應該是附著在SelectionKey的事物,聯想到在selector上註冊channel時候指定了SelectionKey,可以想到返回值其實就是channel自身。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
……
try {
int readyOps = k.readyOps();
//OP_CONNECT事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
//OP_WRITE事件
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
//OP_READ事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
OP_WRITE 可寫事件比較簡單,沒有詳細分析的必要了。這裡寫程式碼片
OP_READ事件處理過程有點長,具體可以看一下read方法:
public final void read() {
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
……
}
}
歸納一下大概做了三件事情:分配 ByteBuf;從 SocketChannel 中讀取資料;呼叫 pipeline.fireChannelRead 傳送一個 inbound 事件。如果瞭解過channel相關內容,產生inbound事件之後便是channelPipeline的事了,具體如何處理請翻閱之前的博文。
OP_CONNECT 事件處理過程:將 OP_CONNECT 從就緒事件集中清除;呼叫 unsafe.finishConnect() 通知上層連線已建立。
unsafe.finishConnect方法最後會呼叫到 pipeline().fireChannelActive(),產生一個 inbound 事件,通知 pipeline 中的各個 handler TCP 通道已建立(即 ChannelInboundHandler.channelActive 方法會被呼叫)。
到了這裡, 我們整個 NioEventLoop 的 IO 操作部分已經瞭解完了