(2.1.2.4)Java多執行緒(四)、執行緒池
系統啟動一個新執行緒的成本是比較高的,因為它涉及到與作業系統的互動。在這種情況下,使用執行緒池可以很好的提供效能,尤其是當程式中需要建立大量生存期很短暫的執行緒時,更應該考慮使用執行緒池。
與資料庫連線池類似的是,執行緒池在系統啟動時即建立大量空閒的執行緒,程式將一個Runnable物件傳給執行緒池,執行緒池就會啟動一條執行緒來執行該物件的run方法,當run方法執行結束後,該執行緒並不會死亡,而是再次返回執行緒池中成為空閒狀態,等待執行下一個Runnable物件的run方法。
除此之外,使用執行緒池可以有效地控制系統中併發執行緒的數量,但系統中包含大量併發執行緒時,會導致系統性能劇烈下降,甚至導致JVM崩潰。而執行緒池的最大執行緒數引數可以控制系統中併發的執行緒不超過此數目。
在JDK1.5之前,開發者必須手動的實現自己的執行緒池,從JDK1.5之後,Java內建支援執行緒池。
與多執行緒併發的所有支援的類都在java.lang.concurrent包中。我們可以使用裡面的類更加的控制多執行緒的執行。
一、new Thread的弊端
-
每次new Thread新建物件,效能差
-
執行緒缺乏統一管理,可能無限制的新建執行緒,相互競爭,有可能佔用過多系統資源導致宕機或者OOM(OutOfMemory)
-
缺少更多功能,如更多執行、定期執行、執行緒中斷
二、 執行緒池的優勢
- 重用存在的執行緒,減少物件建立、消亡的開銷,效能好
- 可有效控制最大併發執行緒數,提高系統資源利用率,同時可以避免過多資源競爭,避免阻塞
- 提供定時執行、定期執行、單執行緒、併發數控制等功能
三、ThreadPoolExecutor
ThreadPoolExecutor是執行緒池的核心實現類,我們來看下他的構造方法
ThreadPoolExecutor(int corePoolSize
, int maximumPoolSize, long keepAliveTime
, TimeUnit unit
, BlockingQueue<Runnable> workQueue
, ThreadFactory threadFactory
, RejectedExecutionHandler handler)
- corePoolSize :核心執行緒數量
- 預設情況下(可預建立執行緒)執行緒池後執行緒池中的執行緒數為0,當有任務提交時才會建立執行緒;
- 如果當前執行的執行緒數小於 corePoolSize, 則直接建立一個新執行緒來執行任務;
- 如果多於或者等於 corePoolSize, 則不再建立;執行後續步驟
- 如果呼叫 prestartAllcoreThread方法,執行緒池會提前建立並啟動所有的核心執行緒來等待任務
- 如果呼叫public boolean prestartCoreThread(),執行緒池會提前建立並啟動一個核心執行緒來等待任務
- 預設情況下(可預建立執行緒)執行緒池後執行緒池中的執行緒數為0,當有任務提交時才會建立執行緒;
- workQueue :阻塞任務佇列,儲存等待執行的任務
- 如果當前執行緒數大於corePoolSize,則將任務新增到該阻塞佇列;
- BlockingQueue只是一個介面,它所表達的是當佇列為空或者已滿的時候,需要阻塞以等待生產者/消費者協同操作並喚醒執行緒。其有很多不同的具體實現類,各有特點。有的可以規定佇列的長度,也有一些則是無界的。
- 有三種取值,ArrayBlockQueue(基於陣列的先進先出佇列,建立時必須指定大小)、LinkedBlockingQueue(基於連結串列的先進先出佇列,如果沒有指定此佇列大小,預設為Integer.MAX_VALUE)、SynchronousQueue(不會儲存提交的任務,直接新建一個執行緒來執行新的任務)
- ArrayBlockingQueue :可以限定佇列的長度,接收到任務的時候,如果沒有達到corePoolSize的值,則新建執行緒(核心執行緒)執行任務,如果達到了,則入隊等候,如果佇列已滿,則新建執行緒(非核心執行緒)執行任務,又如果匯流排程數到了maximumPoolSize,並且佇列也滿了,則發生錯誤
- LinkedBlockingQueue:
- 這個佇列接收到任務的時候,如果當前執行緒數小於核心執行緒數,則新建執行緒(核心執行緒)處理任務;如果當前執行緒數等於核心執行緒數,則進入佇列等待。
- 由於這個佇列沒有最大值限制,即所有超過核心執行緒數的任務都將被新增到佇列中,這也就導致了maximumPoolSize的設定失效,因為匯流排程數永遠不會超過corePoolSize
- SynchronousQueue:這個佇列接收到任務的時候,會直接提交給執行緒處理,而不保留它,如果所有執行緒都在工作怎麼辦?那就新建一個執行緒來處理這個任務!所以為了保證不出現<執行緒數達到了maximumPoolSize而不能新建執行緒>的錯誤,使用這個型別佇列的時候,maximumPoolSize一般指定成Integer.MAX_VALUE,即無限大
- DelayQueue:佇列內元素必須實現Delayed介面,這就意味著你傳進去的任務必須先實現Delayed介面。這個佇列接收到任務時,首先先入隊,只有達到了指定的延時時間,才會執行任務
- maximumPoolSize :執行緒允許建立的最大執行緒數
- 如果任務佇列滿了並且執行緒數小於 maximumPoolSize 則執行緒池仍舊會建立新的執行緒來完成任務
- keepAliveTime :非核心執行緒閒置的超時時間,超過時回收該執行緒
- 預設情況只有當執行緒池中的執行緒數大於corePoolSize時,keepAliveTIme才會起作用。
- 當執行緒池中的執行緒數大於corePoolSize,如果一個執行緒的空閒時間達到keepAliveTime,則會被終止
- 如果任務很多,並且每個任務的執行時間很短,則可以調大 keepAliveTime來提高非核心執行緒的存活時間來提高利用率
- 如果呼叫allowCoreThreadTimeOut(boolean)方法,線上程池中的執行緒數不大於corePoolSize時keepAliveTime引數也會起作用,直到執行緒池的執行緒數為0
- unit:keepAliveTime的時間單位,有7中取值,如:TimeUnit.DAYS; 天,可具體到納秒
- threadFactory:執行緒工廠,用來建立執行緒
- 可以用執行緒工廠給每個創建出來的執行緒設定名字
- rejectHandler:飽和策略,當拒絕處理任務時的策略
- 當任務佇列和執行緒池個數都滿了時候,採取的策略,通常有四種取值
- ThreadPoolExecutor.AbortPolicy:表示無法處理新任務,丟棄任務並丟擲RejectedExecutionException異常;
- ThreadPoolExecutor.DiscardPolicy:丟棄任務,但不丟擲異常;
- ThreadPoolExecutor.DiscardOldestPolicy:丟棄佇列最前面的任務,重新嘗試執行任務(重複此過程);
- ThreadPoolExecutor.CallerRunsPolicy:由呼叫執行緒處理該任務
執行緒池的對新任務的處理流程可以如下圖所示:
【執行緒池的對新任務的處理流程】
- 如果當前poolsize小於corePoolSize,建立新執行緒執行任務
- 如果當前poolsize大於corePoolsize,且等待佇列未滿,進入等待佇列
- 如果當前poolsize大於corePoolsize且小於maximumPoolSize,且等待佇列已滿,建立新執行緒執行任務
- 如果當前poolsize大於corePoolSize且大於maximumPoolSize,且等待佇列已滿則用拒絕策略來處理該任務
- 執行緒池中的執行緒執行完任務後不會立刻退出,而是去檢查等待佇列是否有新的執行緒去執行,如果在keepAliveTime裡等不到新任務,執行緒就會退出
3.1 ThreadPoolExecutor方法
- execute():提交任務,交給執行緒池執行
- submit():提交任務,能夠返回執行結果 execute+Future
- shutdown():關閉執行緒池,等待任務都執行完
- shutdownNow():關閉執行緒池,不等待任務執行完
- getTaskCount():執行緒池已執行的和未執行的任務總數
- getCompletedTaskCount():已完成的任務數量
- getPoolSize():執行緒池當前執行緒數量
- getActiveCount():當前執行緒池正在執行任務的執行緒數量
3.2 執行緒池的種類
通過直接或著間接的配置ThreadPoolExecutor的引數可以建立不同的執行緒池物件,Java通過Executors(一個工具類,類似於TextUtils)提供了四種執行緒池:
3.2.1 FixedThreadPool 可重用固定執行緒數
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- corePoolSize 和 maximumPoolSize都被設定成指定的引數nThreads,這就意味著 FixedThreadPool只有核心執行緒,並且數量是固定的,沒有非核心執行緒
- keepAliveTime設定為0意味著多餘執行緒會被立即終止。
- 因為不會有多餘執行緒,其實這個引數沒啥用
- 阻塞佇列採用了 LinkedBlockingQueue 無界阻塞佇列
- 由於這個佇列沒有最大值限制,即所有超過核心執行緒數的任務都將被新增到佇列中,這也就導致了maximumPoolSize的設定失效,因為匯流排程數永遠不會超過corePoolSize
【圖】
當執行execute()方法時:
- 如果當前執行的執行緒數未達到 corePoolSize時,就建立新的執行緒來處理任務
- 否則,就將任務新增到 LinkedBlockingQueue。
- 當執行緒池有空閒執行緒時,則從任務佇列中取任務執行
示例:
publicclass TestFixedThreadPool {
publicstaticvoid main(String[] args) {
//建立一個可重用固定執行緒數的執行緒池
ExecutorService pool = Executors.newFixedThreadPool(2);
//建立實現了Runnable介面物件,Thread物件當然也實現了Runnable介面
Thread t1 = new MyThread();
Thread t2 = new MyThread();
Thread t3 = new MyThread();
Thread t4 = new MyThread();
Thread t5 = new MyThread();
//將執行緒放入池中進行執行
pool.execute(t1);
pool.execute(t2);
pool.execute(t3);
pool.execute(t4);
pool.execute(t5);
//關閉執行緒池
pool.shutdown();
}
}
輸出結果
pool-1-thread-1正在執行。。。
pool-1-thread-2正在執行。。。
pool-1-thread-1正在執行。。。
pool-1-thread-2正在執行。。。
pool-1-thread-1正在執行。。。
3.2.2 CachedThreadPool 無限執行緒數
每次提交的任務都會立即被執行
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- corePoolSize為 0 和 maximumPoolSize被設定成Max,這就意味著 CachedThreadPool 沒有核心執行緒,非核心執行緒的數量無限大
- keepAliveTime設定為60L意味著 空閒的執行緒最多等待60s
- 阻塞佇列採用了 SynchronousQueue 不儲存元素的阻塞佇列,每一個插入操作必須等待另一個執行緒的移除操作
【圖】
當執行execute()方法時:
- 執行 SynchronousQueue#offer 來提交任務,並查詢是否有空閒執行緒執行 poll來移除任務
- 有則交給該執行緒執行
- 無則建立一個新執行緒執行
- 執行緒池的執行緒空閒時,會執行 SynchronousQueue#pool, 阻塞式等待新的提交
- 如果超過60s無新任務,則關閉該執行緒
示例:
publicclass TestCachedThreadPool {
publicstaticvoid main(String[] args) {
//建立一個可重用固定執行緒數的執行緒池
ExecutorService pool = Executors.newCachedThreadPool();
//建立實現了Runnable介面物件,Thread物件當然也實現了Runnable介面
Thread t1 = new MyThread();
Thread t2 = new MyThread();
Thread t3 = new MyThread();
Thread t4 = new MyThread();
Thread t5 = new MyThread();
//將執行緒放入池中進行執行
pool.execute(t1);
pool.execute(t2);
pool.execute(t3);
pool.execute(t4);
pool.execute(t5);
//關閉執行緒池
pool.shutdown();
}
}
輸出結果:
pool-1-thread-2正在執行。。。
pool-1-thread-4正在執行。。。
pool-1-thread-3正在執行。。。
pool-1-thread-1正在執行。。。
pool-1-thread-5正在執行。。。
3.1.3 SingleThreadExecutor 單執行緒化
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- corePoolSize 和 maximumPoolSize都被設定成1,這就意味著 SingleThreadExecutor 只有1個核心執行緒,沒有非核心執行緒
- keepAliveTime設定為0意味著多餘執行緒會被立即終止。
- 因為不會有多餘執行緒,其實這個引數沒啥用
- 阻塞佇列採用了 LinkedBlockingQueue 無界阻塞佇列
- 由於這個佇列沒有最大值限制,即所有超過核心執行緒數的任務都將被新增到佇列中,這也就導致了maximumPoolSize的設定失效,因為匯流排程數永遠不會超過corePoolSize
【圖】
當執行execute()方法時:
- 如果當前執行的執行緒數未達到 corePoolSize時,也就是沒有一個,就建立新的執行緒來處理任務
- 否則,就將任務新增到 LinkedBlockingQueue。
- 當執行緒池有空閒執行緒時,則從任務佇列中取任務執行
示例:
publicclassMyThread extends Thread {
@Override
publicvoid run() {
System.out.println(Thread.currentThread().getName() + "正在執行。。。");
}
}
publicclassTestSingleThreadExecutor {
publicstaticvoid main(String[] args) {
//建立一個可重用固定執行緒數的執行緒池
ExecutorService pool = Executors. newSingleThreadExecutor();
//建立實現了Runnable介面物件,Thread物件當然也實現了Runnable介面
Thread t1 = new MyThread();
Thread t2 = new MyThread();
Thread t3 = new MyThread();
Thread t4 = new MyThread();
Thread t5 = new MyThread();
//將執行緒放入池中進行執行
pool.execute(t1);
pool.execute(t2);
pool.execute(t3);
pool.execute(t4);
pool.execute(t5);
//關閉執行緒池
pool.shutdown();
}
}
輸出結果
pool-1-thread-1正在執行。。。
pool-1-thread-1正在執行。。。
pool-1-thread-1正在執行。。。
pool-1-thread-1正在執行。。。
pool-1-thread-1正在執行。。。
3.2.4 ScheduledThreadPool 定時週期性任務
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
- corePoolSize為 固定值 和 maximumPoolSize被設定成Max,這就意味著 CachedThreadPool 有固定個核心執行緒,非核心執行緒的數量無限大
- keepAliveTime設定為60L意味著 空閒的執行緒最多等待DEFAULT_KEEPALIVE_MILLIS
- 阻塞佇列採用了 DelayedWorkQueue 一個支援延時獲取元素的無界佇列。 建立元素式,可以指定元素的到達時間,只有到期才能被取走
【圖】
示例:
publicclass TestScheduledThreadPoolExecutor {
publicstaticvoid main(String[] args) {
ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(1);
exec.scheduleAtFixedRate(new Runnable() {//每隔一段時間就觸發異常
@Override
publicvoid run() {
//throw new RuntimeException();
System.out.println("================");
}
}, 1000, 5000, TimeUnit.MILLISECONDS);
exec.scheduleAtFixedRate(new Runnable() {//每隔一段時間列印系統時間,證明兩者是互不影響的
@Override
publicvoid run() {
System.out.println(System.nanoTime());
}
}, 1000, 2000, TimeUnit.MILLISECONDS);
}
}
================
8384644549516
8386643829034
8388643830710
================
8390643851383
8392643879319
8400643939383
四、阻塞佇列BlockingQueue
阻塞佇列常用於生產者和消費者場景,生產者是往佇列裡新增元素的執行緒,消費者是從佇列裡拿元素的執行緒
- 當佇列沒有資料時,消費者端的所有執行緒會被自動阻塞(掛起),直到有資料放入佇列
- 當佇列資料滿額時,生產者端的所有執行緒會被自動阻塞(掛起),直到有資料被取出佇列
阻塞佇列其實就是一個容器,盛放了這些元素,但是提供了一些特殊的API去訪問這個容器,譬如實現阻塞
這也是我們在多執行緒環境下,為什麼需要BlockingQueue的原因。作為BlockingQueue的使用者,我們再也不需要關心什麼時候需要阻塞執行緒,什麼時候需要喚醒執行緒,因為這一切BlockingQueue都給你一手包辦了。
BlockingQueue不光實現了一個完整佇列所具有的基本功能,同時在多執行緒環境下,他還自動管理了多線間的自動等待於喚醒功能,從而使得程式設計師可以忽略這些細節,關注更高階的功能。
4.1 BlockingQueue的核心方法
- 放入資料
- offer(anObject):表示如果可能的話,將anObject加到BlockingQueue裡,即如果BlockingQueue可以容納,則返回true,否則返回false.(本方法不阻塞當前執行方法的執行緒);
- offer(E o, long timeout, TimeUnit unit):可以設定等待的時間,如果在指定的時間內,還不能往佇列中加入BlockingQueue,則返回失敗。
- put(anObject):把anObject加到BlockingQueue裡,如果BlockQueue沒有空間,則呼叫此方法的執行緒被阻斷直到BlockingQueue裡面有空間再繼續.
- 獲取資料
- poll(time):取走BlockingQueue裡排在首位的物件,若不能立即取出,則可以等time引數規定的時間,取不到時返回null;
- poll(long timeout, TimeUnit unit):從BlockingQueue取出一個隊首的物件,如果在指定時間內,佇列一旦有資料可取,則立即返回佇列中的資料。否則知道時間超時還沒有資料可取,返回失敗。
- take():取走BlockingQueue裡排在首位的物件,若BlockingQueue為空,阻斷進入等待狀態直到BlockingQueue有新的資料被加入;
- drainTo():一次性從BlockingQueue獲取所有可用的資料物件(還可以指定獲取資料的個數),通過該方法,可以提升獲取資料效率;不需要多次分批加鎖或釋放鎖。
4.2 常見BlockingQueue
【圖】
4.2.1 ArrayBlockingQueue有界
基於陣列的阻塞佇列實現,在ArrayBlockingQueue內部,維護了一個定長陣列,以便快取佇列中的資料物件,這是一個常用的阻塞佇列,除了一個定長陣列外,ArrayBlockingQueue內部還儲存著兩個整形變數,分別標識著佇列的頭部和尾部在陣列中的位置。
ArrayBlockingQueue在生產者放入資料和消費者獲取資料,都是共用同一個鎖物件,由此也意味著兩者無法真正並行執行,這點尤其不同於LinkedBlockingQueue;按照實現原理來分析,ArrayBlockingQueue完全可以採用分離鎖,從而實現生產者和消費者操作的完全並行執行。Doug Lea之所以沒這樣去做,也許是因為ArrayBlockingQueue的資料寫入和獲取操作已經足夠輕巧,以至於引入獨立的鎖機制,除了給程式碼帶來額外的複雜性外,其在效能上完全佔不到任何便宜。
ArrayBlockingQueue和LinkedBlockingQueue間還有一個明顯的不同之處在於,前者在插入或刪除元素時不會產生或銷燬任何額外的物件例項,而後者則會生成一個額外的Node物件。這在長時間內需要高效併發地處理大批量資料的系統中,其對於GC的影響還是存在一定的區別。
而在建立ArrayBlockingQueue時,我們還可以控制物件的內部鎖是否採用公平鎖,預設採用非公平鎖。
4.2.2 LinkedBlockingQueue無界
基於連結串列的阻塞佇列,同ArrayListBlockingQueue類似,其內部也維持著一個數據緩衝佇列(該佇列由一個連結串列構成)
當生產者往佇列中放入一個數據時,佇列會從生產者手中獲取資料,並快取在佇列內部,而生產者立即返回;只有當佇列緩衝區達到最大值快取容量時(LinkedBlockingQueue可以通過建構函式指定該值),才會阻塞生產者佇列,直到消費者從佇列中消費掉一份資料,生產者執行緒會被喚醒,反之對於消費者這端的處理也基於同樣的原理
而LinkedBlockingQueue之所以能夠高效的處理併發資料,還因為其對於生產者端和消費者端分別採用了獨立的鎖來控制資料同步,這也意味著在高併發的情況下生產者和消費者可以並行地操作佇列中的資料,以此來提高整個佇列的併發效能。
作為開發者,我們需要注意的是,如果構造一個LinkedBlockingQueue物件,而沒有指定其容量大小,LinkedBlockingQueue會預設一個類似無限大小的容量(Integer.MAX_VALUE),這樣的話,如果生產者的速度一旦大於消費者的速度,也許還沒有等到佇列滿阻塞產生,系統記憶體就有可能已被消耗殆盡了。
4.2.3 DelayQueue無界延時
是一個支援延時獲取元素的無界阻塞佇列。佇列使用PriorityQueue來實現。佇列中的元素必須實現Delayed介面,在建立元素時可以指定多久才能從佇列中獲取當前元素。只有在延遲期滿時才能從佇列中提取元素。我們可以將DelayQueue運用在以下應用場景:
- 快取系統的設計:可以用DelayQueue儲存快取元素的有效期,使用一個執行緒迴圈查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示快取有效期到了。
- 定時任務排程:使用DelayQueue儲存當天將會執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行,從比如TimerQueue就是使用DelayQueue實現的。
4.2.4 PriorityBlockingQueue
基於優先順序的阻塞佇列(優先順序的判斷通過建構函式傳入的Compator物件來決定),但需要注意的是PriorityBlockingQueue並不會阻塞資料生產者,而只會在沒有可消費的資料時,阻塞資料的消費者。因此使用的時候要特別注意,生產者生產資料的速度絕對不能快於消費者消費資料的速度,否則時間一長,會最終耗盡所有的可用堆記憶體空間。在實現PriorityBlockingQueue時,內部控制執行緒同步的鎖採用的是公平鎖。
4.2.5 SynchronousQueue無元素
- 是一個不儲存元素的阻塞佇列。每一個put操作必須等待一個take操作,否則不能繼續新增元素。
- 一種無緩衝的等待佇列,類似於無中介的直接交易,有點像原始社會中的生產者和消費者,生產者拿著產品去集市銷售給產品的最終消費者,而消費者必須親自去集市找到所要商品的直接生產者,如果一方沒有找到合適的目標,那麼對不起,大家都在集市等待。相對於有緩衝的BlockingQueue來說,少了一箇中間經銷商的環節(緩衝區),如果有經銷商,生產者直接把產品批發給經銷商,而無需在意經銷商最終會將這些產品賣給那些消費者,由於經銷商可以庫存一部分商品,因此相對於直接交易模式,總體來說採用中間經銷商的模式會吞吐量高一些(可以批量買賣);但另一方面,又因為經銷商的引入,使得產品從生產者到消費者中間增加了額外的交易環節,單個產品的及時響應效能可能會降低。
佇列本身並不儲存任何元素,非常適合於傳遞性場景,比如在一個執行緒中使用的資料,傳遞給另外一個執行緒使用,SynchronousQueue的吞吐量高於LinkedBlockingQueue 和 ArrayBlockingQueue。
宣告一個SynchronousQueue有兩種不同的方式,它們之間有著不太一樣的行為。公平模式和非公平模式的區別:
如果採用公平模式:SynchronousQueue會採用公平鎖,並配合一個FIFO佇列來阻塞多餘的生產者和消費者,從而體系整體的公平策略;
但如果是非公平模式(SynchronousQueue預設):SynchronousQueue採用非公平鎖,同時配合一個LIFO佇列來管理多餘的生產者和消費者,而後一種模式,如果生產者和消費者的處理速度有差距,則很容易出現飢渴的情況,即可能有某些生產者或者是消費者的資料永遠都得不到處理。
4.2.6 LinkedBlockingDeque
是一個由連結串列結構組成的雙向阻塞佇列。所謂雙向佇列指的你可以從佇列的兩端插入和移出元素。
雙端佇列因為多了一個操作佇列的入口,在多執行緒同時入隊時,也就減少了一半的競爭。
相比其他的阻塞佇列,LinkedBlockingDeque多了addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast等方法
以First單詞結尾的方法,表示插入,獲取(peek)或移除雙端佇列的第一個元素。以Last單詞結尾的方法,表示插入,獲取或移除雙端佇列的最後一個元素。
另外插入方法add等同於addLast,移除方法remove等效於removeFirst。但是take方法卻等同於takeFirst,不知道是不是Jdk的bug,使用時還是用帶有First和Last字尾的方法更清楚。
4.3 阻塞佇列的實現原理
其實阻塞佇列實現阻塞同步的方式很簡單,使用的就是是lock鎖的多條件(condition)阻塞控制
下面是Jdk 1.7中ArrayBlockingQueue部分程式碼
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
//建立陣列
this.items = new Object[capacity];
//建立鎖和阻塞條件
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
//新增元素的方法
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
//如果佇列不滿就入隊
enqueue(e);
} finally {
lock.unlock();
}
}
//入隊的方法
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
//移除元素的方法
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
//出隊的方法
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
五、Java提供的執行緒池類
【圖Java提供的執行緒池類】
- Executor是一個頂層介面,在它裡面只聲明瞭一個方法execute(Runnable),返回值為void,引數為Runnable型別,從字面意思可以理解,就是用來執行傳進去的任務的;
- ExecutorService介面繼承了Executor介面,並聲明瞭一些方法:submit、invokeAll、invokeAny以及shutDown等;
- 抽象類AbstractExecutorService實現了ExecutorService介面,基本實現了ExecutorService中宣告的所有方法;
- ThreadPoolExecutor繼承了類AbstractExecutorService。
- execute()方法實際上是Executor中宣告的方法,在ThreadPoolExecutor進行了具體的實現,這個方法是ThreadPoolExecutor的核心方法,通過這個方法可以向執行緒池提交一個任務,交由執行緒池去執行。
- submit()在ExecutorService中宣告的方法,在AbstractExecutorService就已經有了具體的實現,在ThreadPoolExecutor中並沒有對其進行重寫,這個方法也是用來向執行緒池提交任務的,但是它和execute()方法不同,它能夠返回任務執行的結果
- 去看submit()方法的實現,會發現它實際上還是呼叫的execute()方法,只不過它利用了Future來獲取任務執行結果(Future相關內容將在下一篇講述)。
- shutdown()
- shutdownNow()
5.1 Executors類
JDK1.5中提供Executors工廠類來產生連線池,該工廠類中包含如下的幾個靜態工程方法來建立連線池:
- public static ExecutorService newFixedThreadPool(int nThreads):建立一個可重用的、具有固定執行緒數的執行緒池。
- public static ExecutorService newSingleThreadExecutor():建立一個只有單執行緒的執行緒池,它相當於newFixedThreadPool方法是傳入的引數為1
- public static ExecutorService newCachedThreadPool():建立一個具有快取功能的執行緒池,系統根據需要建立執行緒,這些執行緒將會被快取線上程池中。
- public static ScheduledExecutorService newSingleThreadScheduledExecutor:建立只有一條執行緒的執行緒池,他可以在指定延遲後執行執行緒任務
- public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize):建立具有指定執行緒數的執行緒池,它可以再指定延遲後執行執行緒任務,corePoolSize指池中所儲存的執行緒數,即使執行緒是空閒的也被儲存線上程池內。
上面的幾個方法都有一個過載的方法,多傳入一個ThreadFactory引數的過載方法,使用的比較少。
5.2 ExecutorService類
可以看到上面的5個方法中,前面3個方法的返回值都是一個ExecutorService物件。該ExecutorService物件就代表著一個儘快執行執行緒的執行緒池(只要執行緒池中有空閒執行緒立即執行執行緒任務),程式只要將一個Runnable物件或Callable物件提交給該執行緒池即可,該執行緒就會盡快的執行該任務。
ExecutorService有幾個重要的方法:
- boolean isShutdown()
- 如果此執行程式已關閉,則返回 true。
- boolean isTerminated()
- 如果關閉後所有任務都已完成,則返回 true。
- void shutdown()
- 啟動一次順序關閉,執行以前提交的任務,但不接受新任務。
- List shutdownNow()
- 試圖停止所有正在執行的活動任務,暫停處理正在等待的任務,並返回等待執行的任務列表。
- Future submit(Callable task)
- 提交一個返回值的任務用於執行,返回一個表示任務的未決結果的 Future。
- Future<?> submit(Runnable task)
- 提交一個 Runnable 任務用於執行,並返回一個表示該任務的 Future。
- Future submit(Runnable task, T result)
- 提交一個 Runnable 任務用於執行,並返回一個表示該任務的 Future。
ScheduleExecutorService類是ExecutorService類的子類。所以,它裡面也有直接提交任務的submit方法,並且新增了一些延遲任務處理的方法:
- ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit)
- 建立並執行在給定延遲後啟用的 ScheduledFuture。
- ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
- 建立並執行在給定延遲後啟用的一次性操作。
- ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
- 建立並執行一個在給定初始延遲後首次啟用的定期操作,後續操作具有給定的週期;也就是將在 initialDelay 後開始執行,然後在 initialDelay+period 後執行,接著在 initialDelay + 2 * period 後執行,依此類推。
- ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
- 建立並執行一個在給定初始延遲後首次啟用的定期操作,隨後,在每一次執行終止和下一次執行開始之間都存在給定的延遲。
六、原始碼分析
下面我們來深入解析一下執行緒池的具體實現原理,將從下面幾個方面講解:
1.執行緒池狀態 2.任務的執行 3.執行緒池中的執行緒初始化 4.任務快取佇列及排隊策略 5.任務拒絕策略 6.執行緒池的關閉 7.執行緒池容量的動態調整
6.1 執行緒池狀態
在 ThreadPoolExecutor 中定義了一個volatile變數,另外定義了幾個static final變量表示執行緒池的各個狀態:
volatile int runState;
static final int RUNNING = 0;
static final int SHUTDOWN = 1;
static final int STOP = 2;
static final int TERMINATED = 3;
runState表示當前執行緒池的狀態,它是一個volatile變數用來保證執行緒之間的可見性;
下面的幾個static final變量表示runState可能的幾個取值。
- 當建立執行緒池後,初始時,執行緒池處於RUNNING狀態;
- 如果呼叫了shutdown()方法,則執行緒池處於SHUTDOWN狀態,此時執行緒池不能夠接受新的任務,它會等待所有任務執行完畢;
- 如果呼叫了shutdownNow()方法,則執行緒池處於STOP狀態,此時執行緒池不能接受新的任務,並且會去嘗試終止正在執行的任務;
- 當執行緒池處於SHUTDOWN或STOP狀態,並且所有工作執行緒已經銷燬,任務快取佇列已經清空或執行結束後,執行緒池被設定為TERMINATED狀態。
6.2 任務的執行
6.2.1 重要成員變數
一個執行緒池包括以下四個基本組成部分:
- 執行緒池管理器(ThreadPool):用於建立並管理執行緒池,包括 建立執行緒池,銷燬執行緒池,新增新任務;
- 工作執行緒(PoolWorker):執行緒池中執行緒,在沒有任務時處於等待狀態,可以迴圈的執行任務;
- 任務介面(Task):每個任務必須實現的介面,以供工作執行緒排程任務的執行,它主要規定了任務的入口,任務執行完後的收尾工作,任務的執行狀態等;
- 任務佇列(taskQueue):用於存放沒有處理的任務。提供一種緩衝機制。
private volatile int corePoolSize; //核心池的大小(即執行緒池中的執行緒數目大於這個引數時,提交的任務會被放進任務快取佇列)
private final BlockingQueue<Runnable> workQueue; //任務快取佇列,用來存放等待執行的任務
private volatile int maximumPoolSize; //執行緒池最大能容忍的執行緒數
private volatile long keepAliveTime; //執行緒存活時間
private volatile boolean allowCoreThreadTimeOut; //是否允許為核心執行緒設定存活時間
private volatile RejectedExecutionHandler handler; //任務拒絕策略
private volatile ThreadFactory threadFactory; //執行緒工廠,用來建立執行緒
private int largestPoolSize; //用來記錄執行緒池中曾經出現過的最大執行緒數
private long completedTaskCount; //用來記錄已經執行完畢的任務個數
private volatile int poolSize; //執行緒池中當前的執行緒數
private final HashSet<Worker> workers = new HashSet<Worker>(); //用來存放工作集
private final ReentrantLock mainLock = new ReentrantLock(); //執行緒池的主要狀態鎖,對執行緒池狀態(比如執行緒池大小、runState等)的改變都要使用這個鎖
Worker工作執行緒
它既實現了Runnable,同時也是一個AQS ( AbstractQueuedSynchronizer )
封裝了3樣東西,Runnable類的首個任務物件,執行的執行緒thread和完成的任務數(volatile)completedTasks。
privatefinal class Worker extends AbstractQueuedSynchronizer implements Runnable{
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();// allow interrupts
booleancompletedAbruptly = true;
try{
while(task != null|| (task = getTask()) != null) {//是否是第一次執行任務,或者從佇列中可以獲取到任務。
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try{
beforeExecute(wt, task);//獲取到任務後,執行任務開始前操作鉤子。
Throwable thrown = null;
try{
task.run();//執行任務。
}catch(RuntimeException x) {
thrown = x; throwx;
}catch(Error x) {
thrown = x; throwx;
}catch(Throwable x) {
thrown = x; thrownew Error(x);
}finally{
afterExecute(task, thrown);//執行任務後鉤子。
}
}finally{
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
}finally{
processWorkerExit(w, completedAbruptly);
}
}
}
這兩個鉤子(beforeExecute,afterExecute)允許我們自己繼承執行緒池,做任務執行前後處理。
runWorker這段程式碼實際上就是執行提交給執行緒池執行的Runnable任務的實際內容。其中,值得注意的有以下幾點:
- 執行緒開始執行前,需要對worker加鎖,完成一個任務後執行unlock()
- 在任務執行前後,執行beforeExecute()和afterExecute()方法
- 記錄任務執行中的異常後,繼續丟擲
- 每個任務完成後,會記錄當前執行緒完成的任務數
- 當worker執行完一個任務的時候,包括初始任務firstTask,會呼叫getTask()繼續獲取任務,這個方法呼叫是可以阻塞的
- 執行緒退出,執行processWorkerExit(w, completedAbruptly)處理
Worker執行緒的複用和任務的獲取getTask()
在上一段程式碼中,也就是runWorker()方法,任務的執行過程是巢狀在while迴圈語句塊中的。每當一個任務執行完畢,會從頭開始做下一次迴圈執行,實現了空閒執行緒的複用。而要執行的任務則是來自於getTask()方法:
public class ThreadPoolExecutor extends AbstractExecutorService {
private Runnable getTask() {
booleantimedOut = false;// Did the last poll() time out?
retry:
for(;;) {
intc = ctl.get();
intrs = runStateOf(c);
// Check if queue empty only if necessary.
if(rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
returnnull;
}
booleantimed; // Are workers subject to culling?
for(;;) {
intwc = workerCountOf(c);
timed = allowCoreThreadTimeOut || wc > corePoolSize;
if(wc <= maximumPoolSize && ! (timedOut && timed))
break;
if(compareAndDecrementWorkerCount(c))
returnnull;
c = ctl.get();
// Re-read ctl
if(runStateOf(c) != rs)
continueretry;
// else CAS failed due to workerCount change; retry inner loop
}
try{
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if(r != null)
returnr;
timedOut = true;
}catch(InterruptedException retry) {
timedOut = false;
}
}
}
}
getTask()實際上是從工作佇列(workQueue)中取提交進來的任務。這個workQueue是一個BlockingQueue,通常當佇列中沒有新任務的時候,則getTask()會阻塞
- 另外,還有定時阻塞這樣一段邏輯:如果從佇列中取任務是計時的,則用poll()方法,並設定等待時間為keepAlive,否則呼叫阻塞方法take()。當poll()超時,則獲取到的任務為null,timeOut設定為 true。
- 這段程式碼也是放在一個for(;;)迴圈中,前面有判斷超時的語句,如果超時,則return null。這意味著runWorker()方法的while迴圈結束,執行緒將退出,執行processWorkerExit()方法。
processWorkerExit執行緒池執行緒數的維護和執行緒的退出處理
這個方法最主要就是從workers的Set中remove掉一個多餘的執行緒。
private void processWorkerExit(Worker w, booleancompletedAbruptly) {
if(completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
finalReentrantLock mainLock = this.mainLock;
mainLock.lock();
try{
completedTaskCount += w.completedTasks;
workers.remove(w);
}finally{
mainLock.unlock();
}
tryTerminate();
intc = ctl.get();
if(runStateLessThan(c, STOP)) {
if(!completedAbruptly) {
intmin = allowCoreThreadTimeOut ? 0: corePoolSize;
if(min == 0&& ! workQueue.isEmpty())
min = 1;
if(workerCountOf(c) >= min)
return;// replacement not needed
}
addWorker(null,false);
}
}
這個方法的第二個引數是判斷是否在runWorker()中正常退出了迴圈向下執行,如果不是,說明在執行任務的過程中出現了異常,completedAbruptly為true,執行緒直接退出,需要直接對活動執行緒數減1 。
之後,加鎖統計完成的任務數,並從workers這個集合中移除當前worker。
執行tryTerminate(),這個方法後面會詳細說,主要就是嘗試將執行緒池推向TERMINATED狀態。
最後比較當前執行緒數是不是已經低於應有的執行緒數,如果這個情況發生,則新增無任務的空Worker到執行緒池中待命。
6.2.2 execute()方法
在ThreadPoolExecutor類中,最核心的任務提交方法是execute()方法,雖然通過submit也可以提交任務,但是實際上submit方法裡面最終呼叫的還是execute()方法,所以我們只需要研究execute()方法的實現原理即可:
public void execute(Runnable command) {
if(command == null)
thrownew NullPointerException();
intc = ctl.get();
if(workerCountOf(c) < corePoolSize) {//1
if(addWorker(command, true))
return;
c = ctl.get();
}
if(isRunning(c) && workQueue.offer(command)) {//2
intrecheck = ctl.get();
if(! isRunning(recheck) && remove(command))
reject(command);
elseif (workerCountOf(recheck) == 0)
addWorker(null,false);
}
elseif (!addWorker(command, false))//3
reject(command);
- workerCountOf方法根據ctl的低29位,得到執行緒池的當前執行緒數,如果執行緒數小於corePoolSize,則執行addWorker方法建立新的執行緒執行任務;
- 判斷執行緒池是否在執行,如果在,任務佇列是否允許插入,插入成功再次驗證執行緒池是否執行,如果不在執行,移除插入的任務,然後丟擲拒絕策略。如果在執行,沒有執行緒了,就啟用一個執行緒。
- 如果新增非核心執行緒失敗,就直接拒絕了。
【圖execute()方法】
addWorker()的實現
在上面提交任務的時候,會出現開闢新的執行緒來執行,這會呼叫addWorker()方法。
private boolean addWorker(Runnable firstTask, booleancore) {
retry:
for(;;) {
intc = ctl.get();
intrs = runStateOf(c);
// Check if queue empty only if necessary.
if(rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null&&
! workQueue.isEmpty()))
returnfalse;
for(;;) {
intwc = workerCountOf(c);
if(wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
returnfalse;
if(compareAndIncrementWorkerCount(c))
breakretry;
c = ctl.get(); // Re-read ctl
if(runStateOf(c) != rs)
continueretry;
// else CAS failed due to workerCount change; retry inner loop
}
}
booleanworkerStarted = false;
booleanworkerAdded = false;
Worker w = null;
try{
finalReentrantLock mainLock = this.mainLock;
w = newWorker(firstTask);
finalThread t = w.thread;
if(t != null) {
mainLock.lock();
try{
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
intc = ctl.get();
intrs = runStateOf(c);
if(rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if(t.isAlive()) // precheck that t is startable
thrownew IllegalThreadStateException();
workers.add(w);
ints = workers.size();
if(s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
}finally{
mainLock.unlock();
}
if(workerAdded) {
t.start();
workerStarted = true;
}
}
}finally{
if(! workerStarted)
addWorkerFailed(w);
}
returnworkerStarted;
}
第一段從第3行到第26行,是雙層無限迴圈,嘗試增加執行緒數到ctl變數,並且做一些比較判斷,如果超出執行緒數限定或者ThreadPoolExecutor的狀態不符合要求,則直接返回false,增加worker失敗。
第二段從第28行開始到結尾,把firstTask這個Runnable物件傳給Worker構造方法,賦值給Worker物件的task屬性。Worker物件把自身(也是一個Runnable)封裝成一個Thread物件賦予Worker物件的thread屬性。鎖住整個執行緒池並實際增加worker到workers的HashSet物件當中。成功增加後開始執行t.start(),就是worker的thread屬性開始執行,實際上就是執行Worker物件的run方法。Worker的run()方法實際上呼叫了ThreadPoolExecutor的runWorker()方法。