1. 程式人生 > >netty原始碼解解析(4.0)-5 執行緒模型-EventExecutorGroup框架

netty原始碼解解析(4.0)-5 執行緒模型-EventExecutorGroup框架

上一章講了EventExecutorGroup的整體結構和原理,這一章我們來探究一下它的具體實現。 EventExecutorGroup和EventExecutor介面 io.netty.util.concurrent.EventExecutorGroup java.util.concurrent.ScheduledExecutorService EventExecutorGroup繼承了ScheduledExecutorService介面,它自己定義瞭如下的新方法
方法 說明
EventExecutor next() 取出一個EventExecutor, 這個方法要實現派發任務的策略。
Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit); 優雅地關閉這個executor, 一旦這個方法被呼叫,isShuttingDown()方法總是總是返回true。和 shutdown方法不同,這個方法需要確保在關閉的平靜期(由quietPeriod引數決定)沒有新的任務被提交,如果平靜期有新任務提交,它會接受這個任務,同時中止關閉動作,等任務執行完畢後從新開始關閉流程。
Future<?> shutdownGracefully() shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit)快捷呼叫方式。
boolean isShuttingDown() 檢查是否已經呼叫了shutdownGracefully或shutdown方法。
io.netty.util.concurrent.EventExecutor implement EventExecutorGroup EventExecutor定義的介面如下
方法 說明
boolean inEventLoop() 如果當前執行緒是這個Executor返回true
boolean inEventLoop(Thread thread) 如果thread是這個Executor的執行緒返回true
EventExecutorGroup parent() 返回持有這個Executor的EventExecutorGroup
<V> Promise<V> newPromise() 建立一個新的Promise例項
<V> ProgressivePromise<V> newProgressivePromise() 建立一個新的ProgressivePromise例項
<V> Future<V> newSucceededFuture(V result); 建立一個標記為success的Future例項,Future#isSuccess()返回true
<V> Future<V> newFailedFuture(Throwable cause) 建立一個標記為failed的Future例項,Future#isSuccess()返回false
抽象實現AbstractEventExecutorGroup和AbstractEventExecutor io.netty.util.concurrent.AbstractEventExecutorGroup implement EventExecutorGroup AbstractEventExecutorGroup實現了EventExecutorGroup介面,它實現方法的形式為: XXX(){ next().XXX() } 如:execute方法的實現為 public void execute(Runnable command) { next().execute(command); } 這裡實現了EventExecutorGroup派發任務的方式,使用next方法取出一EventExecutor, 然後把任務提交給這個executor。其他提交認任務的方法實submit, schedule, scheduleAtFixedRate, scheduleWithFixedDelay, invokeAll, invokeAny都和這個類似。 io.netty.util.concurrent.AbstractEventExecutor extends AbstractExecutorService implements EventExecutor 形如newXXX的方法,直接new一個JDK提供的型別的例項返回, 如: public <V> Promise<V> newPromise() { return new DefaultPromise<V>(this); } sumbit方法是呼叫AbstractExecutorService的實現。 不支援schedule, scheduleAtFixedRate, scheduleWithFixedDelay方法,這幾個方法都會丟擲UnsupportedOperationException異常。 多執行緒實現MultithreadEventExecutorGroup和SingleThreadEventExecutor io.netty.util.concurrent.MultithreadEventExecutorGroup extends AbstractEventExecutorGroup MultithreadEventExecutorGroup 主要實現了一下兩個方面的功能:
  1. EventExecutor管理: 建立, 結束SingleThreadEventExecutor,EventExecutor的資料是固定的,由傳入的引數決定。
  2. 任務派發策略: 實現了EventExecutor選擇器,next方法使選擇器選中一個Executor。
這個類的核心功能都在它的構造方法中實現, 構造方法有三個引數: protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args). nThreads: 執行緒數,即SingleThreadEventExecutor的數量, threadFactory: 執行緒工程,傳遞給SingleThreadEventExecutor例項,SingleThreadEventExecutor使用它建立一個工作執行緒。 args: 傳遞給SingleThreadEventExecutor工作執行緒的引數。 構造方法主要乾了兩件事: 1. 建立SingleThreadEventExecutor children = new SingleThreadEventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { children[i] = newChild(threadFactory, args); } 它把建立的SingleThreadEventExecutor例項放在children屬性中維護。 newChild是個抽象方法,需要子類實現。 2. 建立選擇器 if (isPowerOfTwo(children.length)) { chooser = new PowerOfTwoEventExecutorChooser(); } else { chooser = new GenericEventExecutorChooser(); } MultithreadEventExecutorGroup內部實現兩種型別的選擇器,PowerOfTwoEventExecutorChooser--chooserA, GenericEventExecutorChooser--chooserB, 當執行緒數是2^n時使用chooserA, 否則使用chooserB。選擇器的實現使用了一點小技巧,從本質上講,這兩種選擇器都是使用取模輪詢的方式選擇下一個executor, 不同的是當執行緒數(children的長度)為2^n時可以把取模運算優化成位運算,效能比位運算要好一些。下面是兩個選擇器的演算法: chooserA: children[childIndex.getAndIncrement() & children.length - 1], 當children.length == 2^n時,它等價於 children[Math.abs(childIndex.getAndIncrement() % children.length) chooserB: children[Math.abs(childIndex.getAndIncrement() % children.length) 這裡我們可以得出結論, nThreads儘量設定成2^n(2, 4, 8, 16, 32 ....), 這樣效能會好一些。 io.netty.util.concurrent.SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor 派生關係 SingleThreadEventExecutor AbstractScheduledEventExecutor AbstractEventExecutor SingleThreadEventExecutor實現了一個單執行緒的Executor, 它使用外部傳進來的ThreadFactory例項建立一個唯一的執行緒,executor方法把任務放進taskQueue中,執行緒消費taskQueue中排隊的任務。這個executor不僅要執行由executor提交的任務,還要執行由schedule方法提交定時任務和由invokeAll, invokeAny提交的批量任務。 除了任務呢排隊,這類還實現了一個重要的功能--gracefulShutdown, 優雅地關閉。 下面來詳細分析這些功能的實現。 狀態: ST_NOT_STARTED = 1: 初始狀態,SingleThreadEventExecutor 例項被建立時處於這個狀態,這個時候只是建立了一個執行緒,這個執行緒還沒有執行。 ST_STARTED = 2: 執行狀態,ST_NOT_STARTED時,提交的第一個任務會把它變成這個狀態,執行緒已經開發執行。 ST_SHUTTING_DOWN = 3: 正在執行關閉操作。執行緒主迴圈run方法返回或丟擲異常,或呼叫shutdownGracefully 都會變成這個狀態。 ST_SHUTDOWN = 4: 已經關閉。呼叫shutdown會變成這個狀態。 ST_TERMINATED = 5: 已經結束。這個是最終狀態,ST_SHUTTING_DOWN和ST_SHUTDOWN 狀態的過程執行完畢後會變成這個狀態。 狀態判定方法 是否處於SHUTTING_DOWN狀態 public boolean isShuttingDown() {return state >= ST_SHUTTING_DOWN;} 是否處於SHUTDOWN狀態 public boolean isShutDown() {return state >= ST_SHUT_DOWN;} 實時任務排隊: public方法execute, 是提供給使用者提交實時任務的方法,它的呼叫棧如下: execute addTask offerTask taskQueue.offer execute最終會呼叫taskQueue的offer方法把任務放到佇列中排隊,在此之前,如果檢測到處於SHUTDOWN狀態,就拒絕這個任務,或offer失敗也會拒絕任務。 定時任務排隊: 使用者呼叫schedule把定時任務到scheduledTaskQueue佇列中,這個佇列是PriorityQueue型別的例項,他是一個優先順序佇列。線上程的主迴圈run中,會呼叫takeTask,taskTask會優先呼叫peekScheduledTask,看一看scheduledTaskQueue有沒有定時任務,如果有就嘗試把所有已經到時間的定時任務放到taskQueue中排隊。 批量任務排隊: 批量任務排隊比較簡單,只是簡單地對invokeAll或invokeAny的tasks引數中的所有任務呼叫一次execute。 取出任務: takeTask的主要功能是從taskQueue中取出任務,同時它還確保到期的定時任務能夠及時地進入taskQueue中排隊。這是一個比較重要的方法,我們來詳細分析它的實現: BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue; for (;;) { ScheduledFutureTask<?> scheduledTask = peekScheduledTask(); // 先看看優先順序佇列中是否存在定時任務 if (scheduledTask == null) { // 如果沒有定時任務,直接從taskQueue中取出一個任務返回 Runnable task = null; try { task = taskQueue.take(); if (task == WAKEUP_TASK) { task = null; } } catch (InterruptedException e) { // Ignore } return task; } else { // 執行到這裡表示有定時任務 long delayNanos = scheduledTask.delayNanos(); Runnable task = null; if (delayNanos > 0) { // 沒有到期的定時任務, try { task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS); } catch (InterruptedException e) { return null; } } if (task == null) { // 有到期的定時任務,把所有優先順序佇列中到期的定時任務放入taskQueue中排隊 fetchFromScheduledTaskQueue(); task = taskQueue.poll(); } if (task != null) { // 在有定時任務但taskQueue為空的時候,for迴圈會一直空轉,直到有定時任務到期才會跳出 return task; } } } 優雅地關閉: 優雅地關閉是這個類的重要的功能,所謂優雅是指在正在關閉之前要確保已經在taskQueue中排隊的任務都能被執行,在關閉過程中,如果使用者提交了一個任務,是否提交成功要有明確的反饋,如果一個任務被成功提交,就要確保他最終一定會被執行。 執行緒的主迴圈run方法返回的時候,就會觸發優雅關閉的過程。run方法返回肯由多種原因引起:使用者主動呼叫了shutdown或shutdownGracefully,run方法丟擲異常。執行優雅關閉的過程在confirmShutdown方法中實現,執行這個過程的前提是: 確保當前處於SHUTTINGDOWN狀態即狀態值>=ST_SHUTTING_DOWN if (!isShuttingDown()) { return false; } 確保這個方法在eventLoop執行緒中執行 if (!inEventLoop()) { throw new IllegalStateException("must be invoked from an event loop"); } 然後才是優雅關閉的過程: 清除掉定時任務 cancelScheduledTasks(); 如果是第一次嘗試關閉,設定gracefulShutdownStartTime我當前時間 if (gracefulShutdownStartTime == 0) { gracefulShutdownStartTime = ScheduledFutureTask.nanoTime(); } 把已在佇列中排隊的任務都執行掉。 if (runAllTasks() || runShutdownHooks()) { 檢查當前狀態,如果是關閉狀態:>= ST_SHUTDOWN,已經關閉完成。 if (isShutdown()) { return true; } 如果gracefulShutdownQuietPeriod==0表示, 關閉過程沒有安靜期,現在可以立即結束。 if (gracefulShutdownQuietPeriod == 0) { return true; } 執行到這裡,表示關閉過程還沒結束,如果當前狀態是SHUTTINGDOWN向taskQueue中新增一個WAKEUP_TASK, 喚醒在taskQueue阻塞的執行緒。 wakeup(true); return false;

相關推薦

netty原始碼解析(4.0)-5 執行模型-EventExecutorGroup框架

上一章講了EventExecutorGroup的整體結構和原理,這一章我們來探究一下它的具體實現。 EventExecutorGroup和EventExecutor介面 io.netty.util.concurrent.EventExecutorGroup j

netty原始碼解析(4.0)-6 執行模型-IO執行EventLoopGroup和NIO實現(一)

介面定義 io.netty.channel.EventLoopGroup extends EventExecutorGroup 方法 說明

netty原始碼解析(4.0)-7 執行模型-IO執行EventLoopGroup和NIO實現(二)

把NIO事件轉換成對channel unsafe的呼叫或NioTask的呼叫 processSelectedKeys()方法是處理NIO事件的入口: private void processSelectedKeys() { if (selectedKeys != null) {

netty原始碼解析(4.0)-18 ChannelHandler: codec--編解碼框架

  編解碼框架和一些常用的實現位於io.netty.handler.codec包中。   編解碼框架包含兩部分:Byte流和特定型別資料之間的編解碼,也叫序列化和反序列化。不型別資料之間的轉換。   下圖是編解碼框架的類繼承體系:   其中MessageToByteEncoder和ByteToMessage

netty原始碼解析(4.0)-1 核心架構

netty是java開源社群的一個優秀的網路框架。使用netty,我們可以迅速地開發出穩定,高效能,安全的,擴充套件性良好的伺服器應用程式。netty封裝簡化了在伺服器開發領域的一些有挑戰性的問題:jdk nio的使用;多執行緒併發;擴充套件性。它還提供了多種應用層協議的支援:http/https/web

netty原始碼解析(4.0)-3 Channel的抽象實現

AbstractChannel和AbstractUnsafe抽象類 io.netty.channel.AbstractChannel 從本章開始,會有大量的篇幅涉及到程式碼分析。為了能夠清晰簡潔的地說明程式碼的結構和功能,我會用程式碼註釋+獨立段落的方式加以呈現。 所以,為你能更好地理解程

netty原始碼解析(4.0)-8 ChannelPipeline的設計

io.netty.channel.ChannelPipeline   設計原理      上圖中,為了更直觀地展示事件處理順序, 故意有規律地放置兩種handler的順序,實際上ChannelInboundHandler和ChanneOutbo

netty原始碼解析(4.0)-10 ChannelPipleline的預設實現--事件傳遞及處理 netty原始碼解析(4.0)-2 Chanel的介面設計 netty原始碼解析(4.0)-8 ChannelPipeline的設計

  事件觸發、傳遞、處理是DefaultChannelPipleline實現的另一個核心能力。在前面在章節中粗略地講過了事件的處理流程,本章將會詳細地分析其中的所有關鍵細節。這些關鍵點包括: 事件觸發介面和對應的ChannelHandler處理方法。 inbound事件的傳遞。  outbou

netty原始碼解析(4.0)-17 ChannelHandler: IdleStateHandler實現

   io.netty.handler.timeout.IdleStateHandler功能是監測Channel上read, write或者這兩者的空閒狀態。當Channel超過了指定的空閒時間時,這個Handler會觸發一個IdleStateEvent事件。   在第一次檢測到Channel變成a

netty原始碼解析(4.0)-19 ChannelHandler: codec--常用編解碼實現

  資料包編解碼過程中主要的工作就是:在編碼過程中進行序列化,在解碼過程中從Byte流中分離出資料包然後反序列化。在MessageToByteEncoder中,已經解決了序列化之後的問題,ByteToMessageDecoder中已經部分第解決了從Byte流中分離出資料包的問題。實現具體的資料包編解碼,只需要

netty原始碼解析(4.0)-20 ChannelHandler: 自己實現一個自定義協議的伺服器和客戶端

  本章不會直接分析Netty原始碼,而是通過使用Netty的能力實現一個自定義協議的伺服器和客戶端。通過這樣的實踐,可以更深刻地理解Netty的相關程式碼,同時可以瞭解,在設計實現自定義協議的過程中需要解決的一些關鍵問題。   本週章涉及到的程式碼可以從github上下載: https://git

netty原始碼解析(4.0)-22 ByteBuf的I/O

    ByteBuf的I/O主要解決的問題有兩個: 管理readerIndex和writerIndex。這個在在AbstractByteBuf中解決。 從記憶體中讀寫資料。ByteBuf的不同實現主要使用兩種記憶體:堆記憶體表示為byte[];直接內,可能是Dire

netty原始碼解析(4.0)-23 ByteBuf記憶體管理:分配和釋放

  ByteBuf記憶體分配和釋放由具體實現負責,抽象型別只定義的記憶體分配和釋放的時機。     記憶體分配分兩個階段: 第一階段,初始化時分配記憶體。第二階段: 記憶體不夠用時分配新的記憶體。ByteBuf抽象層沒有定義第一階段的行為,但定義了第二階段的方法:   public abstract

netty原始碼解析(4.0)-24 ByteBuf基於記憶體池的記憶體管理

 io.netty.buffer.PooledByteBuf<T>使用記憶體池中的一塊記憶體作為自己的資料記憶體,這個塊記憶體是PoolChunk<T>的一部分。PooledByteBuf<T>是一個抽象型別,它有4個派生類: PooledHeapByteBuf,&nbs

netty原始碼解析(4.0)-25 ByteBuf記憶體池:PoolArena-PoolChunk

  PoolArena實現了用於高效分配和釋放記憶體,並儘可能減少記憶體碎片的記憶體池,這個記憶體管理實現使用PageRun/PoolSubpage演算法。分析程式碼之前,先熟悉一些重要的概念: page: 頁,一個頁是可分配的最小的記憶體塊單元,頁的大小:pageSize = 1 << n (

netty源碼解析(4.0)-1 核心架構

coder style out ava 默認 mage 網絡框架 分享圖片 輸入 netty是java開源社區的一個優秀的網絡框架。使用netty,我們可以迅速地開發出穩定,高性能,安全的,擴展性良好的服務器應用程序。netty封裝簡化了在服務器開發領域的一些有挑戰性的問題

netty源碼解析(4.0)-8 ChannelPipeline的設計

rst 分配 active ace 時間 rev remove 設置 bject io.netty.channel.ChannelPipeline 設計原理      上圖中,為了更直觀地展示事件處理順序, 故意有規律地放置兩種handler的順序,實際上Chann

netty源碼解析(4.0)-14 Channel NIO實現:讀取數據

isa index 消息 soc 發現 接收數據 boolean 是的 cte    本章分析Nio Channel的數據讀取功能的實現。   Channel讀取數據需要Channel和ChannelHandler配合使用,netty設計數據讀取功能包括三個要素:Chan

EventBus原始碼分析(四):執行模型分析(2.4版本)

EventBus有四種執行緒模型 PostThread模式不需執行緒切換,直接在釋出者執行緒進行事件處理。 MainThread模式分類討論:釋出者執行緒是主執行緒則直接呼叫事件處理方法,否則通過Handler進行執行緒切換,切換到主執行緒處理事件,該模

android 4.0執行訪問網路異常

    android出現如下異常資訊,是因為4.0以上不允許在主執行緒中訪問網路。     1.可以再Activity的onCreate()方法中加入這樣一段程式碼,如下: if (Build.VERSION.SDK_INT >= 11) {StrictMode.s