Netty原始碼分析第2章(NioEventLoop)---->第1節: NioEventLoopGroup之建立執行緒執行器
第二章: NioEventLoop
概述:
通過上一章的學習, 我們瞭解了Server啟動的大致流程, 有很多元件與模組並沒有細講, 從這個章開始, 我們開始詳細剖析netty的各個元件, 並結合啟動流程, 將這些元件的使用場景及流程進行一個詳細的說明
這一章主要學習NioEventLoop相關的知識, 何為NioEventLoop? NioEventLoop是netty的一個執行緒, 在上一節我們建立兩個NioEventLoopGroup:
EventLoopGroup bossGroup = newNioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup();
這裡建立了兩個group, 我們提過這是boss執行緒組和worker執行緒組, 其實這兩個執行緒組就相當於兩個NioEventLoop的集合, 預設每個NioEventLoopGroup建立時, 如果不傳入執行緒數, 會建立cpu核數*2個NioEventLoop執行緒, 其中boss執行緒通過輪詢處理Server的accept事件, 而完成accept事件之後, 就會建立客戶端channel, 通過一定的策略, 分發到worker執行緒進行處理, 而worker執行緒, 則主要用於處理客戶端的讀寫事件
除了輪詢事件, EventLoop執行緒還維護了兩個佇列, 一個是延遲任務佇列, 另一個是普通任務佇列, 在進行事件輪詢的同時, 如果佇列中有任務需要執行則會去執行佇列中的任務
一個NioEventLoop繫結一個selector用於處理多個客戶端channel, 但是一個客戶端channel只能被一個NioEventLoop處理, 具體關係如圖2-0-1所示:
2-0-1
圖中我們看到, 一個NioEventLoopGroup下有多個NioEventLoop執行緒, 而一個執行緒可以處理多個channel, 其中有個叫pipeline和handler的東西, 同學們可能比較陌生, 這是netty的事件傳輸機制, 每個pipeline和channel唯一繫結, 這裡只需要稍作了解, 之後章節會帶大家詳細剖析
瞭解了這些概念, 我們繼續以小節的形式對NioEventLoop進行剖析
第一節: NioEventLoopGroup之建立執行緒執行器
首先回到第一章最開始的demo, 我們最初建立了兩個執行緒組
EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup();
這裡, 我們跟隨建立EventLoopGroup的構造方法, 來繼續學習NioEventLoopGroup的建立過程
以workerGroup為例我們跟進其構造方法:
public NioEventLoopGroup() { this(0); }
繼續跟進this(0):
public NioEventLoopGroup(int nThreads) { this(nThreads, (Executor) null); }
這裡的nThreads就是剛傳入的0, 繼續跟進:
public NioEventLoopGroup(int nThreads, Executor executor) { this(nThreads, executor, SelectorProvider.provider()); }
這裡nThreads仍然為0, executor為null, 這個execute是用於開啟NioEventLoop執行緒所需要的執行緒執行器, SelectorProvider.provider()是用於建立selector, 這個之後會講到, 我們一直跟到構造方法最後:
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory, final RejectedExecutionHandler rejectedExecutionHandler) { super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler); }
這裡呼叫了父類的構造方法, 跟進super, 進入了其父類MultithreadEventExecutorGroup的構造方法中:
protected MultithreadEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, chooserFactory, args); }
這裡我們看到, 如果傳入的執行緒數量引數為0, 則會給一個預設值, 這個預設值就是兩倍的CPU核數, chooserFactory是用於建立執行緒選擇器, 之後會講到, 繼續跟程式碼之後, 我們就看到了建立NioEventLoop的真正邏輯, 在MultithreadEventExecutorGroup類的構造方法中:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { //程式碼省略 if (executor == null) { //建立一個新的執行緒執行器(1) executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } //構造NioEventLoop(2) children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { boolean success = false; try { children[i] = newChild(executor, args); success = true; } catch (Exception e) { throw new IllegalStateException("failed to create a child event loop", e); } finally { //程式碼省略 } } //建立執行緒選擇器(3) chooser = chooserFactory.newChooser(children); //程式碼省略 }
這邊將程式碼主要分為三個步驟:
1.建立執行緒執行器
2.建立EventLoop
3.建立執行緒選擇器
這一小節我們主要剖析第1步, 建立執行緒執行器:
這裡有個new DefaultThreadFactory()建立一個DefaultThreadFactory物件, 這個物件作為引數傳入ThreadPerTaskExecutor的建構函式, DefaultThreadFactory顧名思義, 是一個執行緒工廠, 用於建立執行緒的, 簡單看下這個類的繼承關係:
public class DefaultThreadFactory implements ThreadFactory{//類體}
這裡繼承了jdk底層ThreadFactory類, 用於建立執行緒
我們繼續跟進該類的構造方法:
protected ThreadFactory newDefaultThreadFactory() { return new DefaultThreadFactory(getClass()); }
其中getClass()就是當前類的class物件, 而當前類是NioEventLoopGroup, 繼續跟進DefaultThreadFactory的構造方法中:
public DefaultThreadFactory(Class<?> poolType) { this(poolType, false, Thread.NORM_PRIORITY); }
poolType是NioEventLoop的class物件, Thread.NORM_PRIORITY是設定預設的優先順序為5, 繼續跟進去:
public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) { this(toPoolName(poolType), daemon, priority); }
這裡的toPoolName(poolType)是將執行緒組命名, 這裡返回後結果是"nioEventLoopGroup"(開n頭小寫), daemon為false, priority為5, 繼續跟進去:
public DefaultThreadFactory(String poolName, boolean daemon, int priority) { this(poolName, daemon, priority, System.getSecurityManager() == null ? Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup()); }
System.getSecurityManager() == null ? Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup()這段程式碼是通過三目運算建立jdk底層的執行緒組, 繼續跟this():
public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) { //省略驗證程式碼 //執行緒名字字首 prefix = poolName + '-' + poolId.incrementAndGet() + '-'; this.daemon = daemon; //優先順序 this.priority = priority; //初始化執行緒組 this.threadGroup = threadGroup; }
這裡初始化了DefaultThreadFactory的屬性, prefix為poolName(也就是nioEventLoopGroup)+'-'+執行緒組id(原子自增)+'-'
以及初始化了優先順序和jdk底層的執行緒組等屬性
回到最初MultithreadEventExecutorGroup類的構造方法中, 我們看繼續看第一步:
//建立一個新的執行緒執行器(1) executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
我們繼續跟進ThreadPerTaskExecutor的類中:
public final class ThreadPerTaskExecutor implements Executor { private final ThreadFactory threadFactory; public ThreadPerTaskExecutor(ThreadFactory threadFactory) { if (threadFactory == null) { throw new NullPointerException("threadFactory"); } this.threadFactory = threadFactory; } @Override public void execute(Runnable command) { //起一個執行緒 threadFactory.newThread(command).start(); } }
我們發現這個類非常簡單, 繼承了jdk的Executor類, 從繼承關係中我就能猜想到, 而這個類就是用於開啟執行緒的執行緒執行器
構造方法傳入ThreadFactory型別的引數, 這個ThreadFactory就是我們剛才剖析的DefaultThreadFactory, 這個類繼承了ThreadFactory, 所以在構造方法中初始化了ThreadFactory型別的屬性
我們再看重寫的execute(Runnable command)方法, 傳入一個任務, 然後由threadFactory物件建立一個執行緒執行該任務
這個execute(Runnable command)方法, 其實就是用開開啟NioEventLoop執行緒用的, 那麼NioEventLoop什麼時候開啟的, 後面章節會進行剖析
這樣, 通過executor = new ThreadPerTaskExecutor(newDefaultThreadFactory())這種方式就返回了一個執行緒執行器Executor, 用於開啟NioEventLoop執行緒