1. 程式人生 > >【Netty 專欄】Netty原始碼分析之NioEventLoop

【Netty 專欄】Netty原始碼分析之NioEventLoop

點選上方“芋道原始碼”,選擇“置頂公眾號”

技術文章第一時間送達!

原始碼精品專欄

 

上一章節中,我們分析了Netty服務的啟動過程,本章節分析Netty的NioEventLoop是如工作的。

img

NioEventLoop中維護了一個執行緒,執行緒啟動時會呼叫NioEventLoop的run方法,執行I/O任務和非I/O任務:

I/O任務
即selectionKey中ready的事件,如accept、connect、read、write等,由processSelectedKeys方法觸發。

非IO任務
新增到taskQueue中的任務,如register0、bind0等任務,由runAllTasks方法觸發。

兩種任務的執行時間比由變數ioRatio控制,預設為50,則表示允許非IO任務執行的時間與IO任務的執行時間相等。

NioEventLoop.run 方法實現

protected void run() {
   for (;;) {
       boolean oldWakenUp = wakenUp.getAndSet(false);
       try {
           if (hasTasks()) {
               selectNow();
           } else {
               select(oldWakenUp);
               if (wakenUp.get()) {
                   selector.wakeup();
               }
           }
           cancelledKeys = 0
;
           needsToSelectAgain = false;
           final int ioRatio = this.ioRatio;
           if (ioRatio == 100) {
               processSelectedKeys();
               runAllTasks();
           } else {
               final long ioStartTime = System.nanoTime();
               processSelectedKeys();
               final
long ioTime = System.nanoTime() - ioStartTime;
               runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
           }
           if (isShuttingDown()) {
               closeAll();
               if (confirmShutdown()) {
                   break;
               }
           }
       } catch (Throwable t) {
           logger.warn("Unexpected exception in the selector loop.", t);
           // Prevent possible consecutive immediate failures that lead to
           // excessive CPU consumption.
           try {
               Thread.sleep(1000);
           } catch (InterruptedException e) {
               // Ignore.
           }
       }
   }
}

hasTasks()方法判斷當前taskQueue是否有元素。
1、 如果taskQueue中有元素,執行 selectNow() 方法,最終執行selector.selectNow(),該方法會立即返回。

void selectNow() throws IOException {
   try {
       selector.selectNow();
   } finally {
       // restore wakup state if needed
       if (wakenUp.get()) {
           selector.wakeup();
       }
   }
}

2、 如果taskQueue沒有元素,執行 select(oldWakenUp) 方法,程式碼如下:

private void select(boolean oldWakenUp) throws IOException {
   Selector selector = this.selector;
   try {
       int selectCnt = 0;
       long currentTimeNanos = System.nanoTime();
       long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
       for (;;) {
           long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
           if (timeoutMillis <= 0) {
               if (selectCnt == 0) {
                   selector.selectNow();
                   selectCnt = 1;
               }
               break;
           }
           int selectedKeys = selector.select(timeoutMillis);
           selectCnt ++;
           if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
               // - Selected something,
               // - waken up by user, or
               // - the task queue has a pending task.
               // - a scheduled task is ready for processing
               break;
           }
           if (Thread.interrupted()) {
               // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
               // As this is most likely a bug in the handler of the user or it's client library we will
               // also log it.
               //
               // See https://github.com/netty/netty/issues/2426
               if (logger.isDebugEnabled()) {
                   logger.debug("Selector.select() returned prematurely because " +
                           "Thread.currentThread().interrupt() was called. Use " +
                           "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
               }
               selectCnt = 1;
               break;
           }
           long time = System.nanoTime();
           if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
               // timeoutMillis elapsed without anything selected.
               selectCnt = 1;
           } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                   selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
               // The selector returned prematurely many times in a row.
               // Rebuild the selector to work around the problem.
               logger.warn(
                       "Selector.select() returned prematurely {} times in a row; rebuilding selector.",
                       selectCnt);
               rebuildSelector();
               selector = this.selector;
               // Select again to populate selectedKeys.
               selector.selectNow();
               selectCnt = 1;
               break;
           }
           currentTimeNanos = time;
       }
       if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
           if (logger.isDebugEnabled()) {
               logger.debug("Selector.select() returned prematurely {} times in a row.", selectCnt - 1);
           }
       }
   } catch (CancelledKeyException e) {
       if (logger.isDebugEnabled()) {
           logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", e);
       }
       // Harmless exception - log anyway
   }
}

這個方法解決了Nio中臭名昭著的bug:selector的select方法導致cpu100%。
1、delayNanos(currentTimeNanos):計算延遲任務佇列中第一個任務的到期執行時間(即最晚還能延遲多長時間執行),預設返回1s。每個SingleThreadEventExecutor都持有一個延遲執行任務的優先佇列PriorityQueue,啟動執行緒時,往佇列中加入一個任務。

protected long delayNanos(long currentTimeNanos) {  
   ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek();  
   if (delayedTask == null) {  
       return SCHEDULE_PURGE_INTERVAL;  
   }  
   return delayedTask.delayNanos(currentTimeNanos);  
}  
//ScheduledFutureTask  
public long delayNanos(long currentTimeNanos) {  
   return Math.max(0, deadlineNanos() - (currentTimeNanos - START_TIME));  
}  
public long deadlineNanos() {  
   return deadlineNanos;  
}  

2、如果延遲任務佇列中第一個任務的最晚還能延遲執行的時間小於500000納秒,且selectCnt == 0(selectCnt 用來記錄selector.select方法的執行次數和標識是否執行過selector.selectNow()),則執行selector.selectNow()方法並立即返回。
3、否則執行selector.select(timeoutMillis),這個方法已經在深入淺出NIO Socket分析過。
4、如果已經存在ready的selectionKey,或者selector被喚醒,或者taskQueue不為空,或則scheduledTaskQueue不為空,則退出迴圈。
5、如果 selectCnt 沒達到閾值SELECTOR_AUTO_REBUILD_THRESHOLD(預設512),則繼續進行for迴圈。其中 currentTimeNanos 在select操作之後會重新賦值當前時間,如果selector.select(timeoutMillis)行為真的阻塞了timeoutMillis,第二次的timeoutMillis肯定等於0,此時selectCnt 為1,所以會直接退出for迴圈。
6、如果觸發了epool cpu100%的bug,會發生什麼?
selector.select(timeoutMillis)操作會立即返回,不會阻塞timeoutMillis,導致 currentTimeNanos 幾乎不變,這種情況下,會反覆執行selector.select(timeoutMillis),變數selectCnt 會逐漸變大,當selectCnt 達到閾值,則執行rebuildSelector方法,進行selector重建,解決cpu佔用100%的bug。

public void rebuildSelector() {  
       if (!inEventLoop()) {  
           execute(new Runnable() {  
               @Override  
               public void run() {  
                   rebuildSelector();  
               }  
           });  
           return;  
       }  
       final Selector oldSelector = selector;  
       final Selector newSelector;  
       if (oldSelector == null) {  
           return;  
       }  
       try {  
           newSelector = openSelector();  
       } catch (Exception e) {  
           logger.warn("Failed to create a new Selector.", e);  
           return;  
       }  
       // Register all channels to the new Selector.  
       int nChannels = 0;  
       for (;;) {  
           try {  
               for (SelectionKey key: oldSelector.keys()) {  
                   Object a = key.attachment();  
                   try {  
                       if (key.channel().keyFor(newSelector) != null) {  
                           continue;  
                       }  
                       int interestOps = key.interestOps();  
                       key.cancel();  
                       key.channel().register(newSelector, interestOps, a);  
                       nChannels ++;  
                   } catch (Exception e) {  
                       logger.warn("Failed to re-register a Channel to the new Selector.", e);  
                       if (a instanceof AbstractNioChannel) {  
                           AbstractNioChannel ch = (AbstractNioChannel) a;  
                           ch.unsafe().close(ch.unsafe().voidPromise());  
                       } else {  
                           @SuppressWarnings("unchecked")  
                           NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;  
                           invokeChannelUnregistered(task, key, e);  
                       }  
                   }  
               }  
           } catch (ConcurrentModificationException e) {  
               // Probably due to concurrent modification of the key set.  
               continue;  
           }  
           break;  
       }    
       selector = newSelector;  
       try {  
           // time to close the old selector as everything else is registered to the new one  
           oldSelector.close();  
       } catch (Throwable t) {  
           if (logger.isWarnEnabled()) {  
               logger.warn("Failed to close the old Selector.", t);  
           }  
       }    
       logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");  
   }  

rebuildSelector過程:
1、通過方法openSelector建立一個新的selector。
2、將old selector的selectionKey執行cancel。
3、將old selector的channel重新註冊到新的selector中。

對selector進行rebuild後,需要重新執行方法selectNow,檢查是否有已ready的selectionKey。

方法selectNow()或select(oldWakenUp)返回後,執行方法processSelectedKeys和runAllTasks。
1、processSelectedKeys 用來處理有事件發生的selectkey,這裡對優化過的方法processSelectedKeysOptimized進行分析:

private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
   for (int i = 0;; i ++) {
       final SelectionKey k = selectedKeys[i];
       if (k == null) {
           break;
       }
       // null out entry in the array to allow to have it GC'ed once the Channel close
       // See https://github.com/netty/netty/issues/2363
       selectedKeys[i] = null;
       final Object a = k.attachment();
       if (a instanceof AbstractNioChannel) {
           processSelectedKey(k, (AbstractNioChannel) a);
       } else {
           @SuppressWarnings("unchecked")
           NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
           processSelectedKey(k, task);
       }
       if (needsToSelectAgain) {
           // null out entries in the array to allow to have it GC'ed once the Channel close
           // See https://github.com/netty/netty/issues/2363
           for (;;) {
               i++;
               if (selectedKeys[i] == null) {
                   break;
               }
               selectedKeys[i] = null;
           }
           selectAgain();
           // Need to flip the optimized selectedKeys to get the right reference to the array
           // and reset the index to -1 which will then set to 0 on the for loop
           // to start over again.
           //
           // See https://github.com/netty/netty/issues/1523
           selectedKeys = this.selectedKeys.flip();
           i = -1;
       }
   }
}

在優化過的方法中,有事件發生的selectkey存放在陣列selectedKeys中,通過遍歷selectedKeys,處理每一個selectkey,具體處理過程,會在後續進行分析。

2、runAllTasks 處理非I/O任務。
如果 ioRatio 不為100時,方法runAllTasks的執行時間只能為ioTime * (100 - ioRatio) / ioRatio,其中ioTime 是方法processSelectedKeys的執行時間。

protected boolean runAllTasks(long timeoutNanos) {
   fetchFromScheduledTaskQueue();
   Runnable task = pollTask();
   if (task == null) {
       return false;
   }
   final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
   long runTasks = 0;
   long lastExecutionTime;
   for (;;) {
       try {
           task.run();
       } catch (Throwable t) {
           logger.warn("A task raised an exception.", t);
       }
       runTasks ++;
       // Check timeout every 64 tasks because nanoTime() is relatively expensive.
       // XXX: Hard-coded value - will make it configurable if it is really a problem.
       if ((runTasks & 0x3F) == 0) {
           lastExecutionTime = ScheduledFutureTask.nanoTime();
           if (lastExecutionTime >= deadline) {
               

相關推薦

Netty 專欄Netty原始碼分析NioEventLoop

點選上方“芋道原始碼”,選擇“置頂公眾號”技術文章第一時間送達!原始碼精品專欄 上一章節中,我們

Netty 專欄Netty原始碼分析ChannelPipeline

點選上方“芋道原始碼”,選擇“置頂公眾號”技術文章第一時間送達!原始碼精品專欄 本章節分析Net

kubernetes/k8s原始碼分析kubelet原始碼分析cdvisor原始碼分析

  資料流 UnsecuredDependencies -> run   1. cadvisor.New初始化 if kubeDeps.CAdvisorInterface == nil { imageFsInfoProvider := cadv

kubernetes/k8s原始碼分析kubelet原始碼分析容器網路初始化原始碼分析

一. 網路基礎   1.1 網路名稱空間的操作 建立網路名稱空間: ip netns add 名稱空間內執行命令: ip netns exec 進入名稱空間: ip netns exec bash   1.2 bridge-nf-c

kubernetes/k8s原始碼分析kubelet原始碼分析資源上報

0. 資料流   路徑: pkg/kubelet/kubelet.go   Run函式() ->   syncNodeStatus ()  ->   registerWithAPIServer() ->

kubernetes/k8s原始碼分析kubelet原始碼分析啟動容器

主要是呼叫runtime,這裡預設為docker 0. 資料流 NewMainKubelet(cmd/kubelet/app/server.go) -> NewKubeGenericRuntimeManager(pkg/kubelet/kuberuntime/kuberuntime

NLPjieba原始碼分析關鍵字提取(TF-IDF/TextRank)

【一】綜述 利用jieba進行關鍵字提取時,有兩種介面。一個基於TF-IDF演算法,一個基於TextRank演算法。TF-IDF演算法,完全基於詞頻統計來計算詞的權重,然後排序,在返回TopK個詞作為關鍵字。TextRank相對於TF-IDF,基本思路一致,也是基於統計的思想,只不過其計算詞的權

NLPjieba原始碼分析分詞

【一】詞典載入 利用jieba進行分詞時,jieba會自動載入詞典,這裡jieba使用python中的字典資料結構進行字典資料的儲存,其中key為word,value為frequency即詞頻。 1. jieba中的詞典如下: jieba/dict.txt X光 3 n X光線 3

NLPjieba原始碼分析詞性標註

【一】詞性標註 詞性標註分為2部分,首先是分詞,然後基於分詞結果做詞性標註。 【二】jieba的詞性標註程式碼流程詳解 1. 程式碼位置 jieba/posseg/_init_.py 2. 流程分析 def cut(sentence, HMM=True): """

我的區塊鏈- golang原始碼分析select的實現

最近本人再找工作,恩,雖然本人使用go有2年左右了,但是其實還只是停留在語言使用的技巧位面,語言的很多底層實現機制還不是很清楚的,所以面試被問到很多底層,就很懵逼。這篇文章主要是自己對go學習的筆記。(本人還是一隻菜雞,各位海涵) 文章參考: 那麼se

nettynetty原始碼分析NioEventLoop和NioEventLoopGroup

NioEventLoop 繼承Executor介面. NioEventLoop持有Selector selector;  通過openSelector() 獲取Selector .   @Override protected void run() {

我的區塊鏈- golang原始碼分析協程排程器底層實現( G、M、P)

本人的原始碼是基於go 1.9.7 版本的哦! 緊接著之前寫的 【我的區塊鏈之路】- golang原始碼分析之select的底層實現 和 【我的區塊鏈之路】- golang原始碼分析之channel的底層實現 我們這一次需要對go的排程器做一番剖析。

我的區塊鏈- golang原始碼分析channel的底層實現

【轉載請標明出處】https://blog.csdn.net/qq_25870633/article/details/83388952 接上篇文章 【我的區塊鏈之路】- golang原始碼分析之select的底層實現 我這裡因為面試的時候也有被問到過 channel的底層實現

我的區塊鏈- golang原始碼分析select的底層實現

【轉載請標明出處】https://blog.csdn.net/qq_25870633/article/details/83339538 最近本人再找工作,恩,雖然本人使用go有2年左右了,但是其實還只是停留在語言使用的技巧位面,語言的很多底層實現機制還不是很清楚的,所以面試被問到很多底層,就很懵

專欄 - Netty3 原始碼分析

Netty3 原始碼分析 Netty是一個事件驅動的非同步網路框架,利用它可以很容易寫出具有可擴充套件性的客戶服務端程式,層次分明,只需要專注於業務邏輯的處理。通過閱讀Jboss Netty3的原始碼,可以深入理解這個架構的設計精

JVMJVM原始碼分析Metaspace解密

概述 metaspace,顧名思義,元資料空間,專門用來存元資料的,它是jdk8裡特有的資料結構用來替代perm,這塊空間很有自己的特點,前段時間公司這塊的問題太多了,主要是因為升級了中介軟體所致,看到大家討論來討論去,看得出很多人對metaspace還是模稜

netty(十)原始碼分析ByteBuf

通過對netty的API的學習,可以更加遊刃有餘的使用netty的相關類庫 對原始碼的學習不僅能夠從原始碼層面掌握netty框架,方便日後的維護,拓展和定製而且可以起到觸類旁通的作用,拓展讀者的知識面,提升程式設計技能。當我們進行資料傳輸的時候,往往需要使用到緩衝區,常用的

Netty原始碼分析NioEventLoop(三)—NioEventLoop的執行

前面兩篇文章Netty原始碼分析之NioEventLoop(一)—NioEventLoop的建立與Netty原始碼分析之NioEventLoop(二)—NioEventLoop的啟動中我們對NioEventLoop的建立與啟動做了具體的分析,本篇文章中我們會對NioEventLoop的

逆向工程原始碼分析網站反爬蟲措施

       從事網頁爬蟲工作有兩年了,從最開始的新聞,bbs論壇,論文網站,到現在的全國企業信用資訊公示系統,無論是PC網頁,到手機移動APP,還是現在的支付寶微信小程式一直採集別人家網站上的資料,也算得上也是身經百戰。如今,領導安排我注意收集整理歸納一下反

jdk1.8String原始碼分析

String 類的宣告 public final class String implements java.io.Serializable, Comparable<String>, CharSequence 首先可以看到String