Java執行緒池ThreadPoolExecutor原理詳解
目錄
前言
最近java面試,基本都會考察多執行緒的,多執行緒就一定要問執行緒池的,然而我卻在同一個問題上栽跟頭兩次,也是醉醉的。在懊悔之餘所以專門花了一個下午的時間把它詳細總結整理了一遍,也以此告誡自己學東西切不可浮躁,要靜心專研,打紮實基礎。
問題:
問:新建執行緒池有哪幾個引數,具體含義是什麼呢?
問:假如我設定corePoolSize為2,maximumPoolSize為5,現線上程池裡已經有1個了,我再往裡面新增第2到6個,具體的執行邏輯是什麼呢?
問:常用新建執行緒池的方法有哪幾個,他們與ThreadPoolExecutor有關係嗎?
執行緒池的作用
執行緒雖然提供了並行處理速度,但是執行緒的新建銷燬帶來的系統開銷是很大滴,為了能夠更科學的利用執行緒,才有了大名鼎鼎的執行緒池:ThreadPoolExecutor類。作用大致如下:
1、提高資源利用率
執行緒池可以重複利用已經建立了的執行緒,執行緒任務完成後,可以不銷燬而是被安排去做別的任務。
2、具有可管理性
程式設計師可以設定執行緒個數,執行緒池會根據系統的使用情況調整執行的執行緒,降低系統開銷。
簡單例項
package thread.threadpool; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * 執行緒池的使用 * @author shu jun * */ public class ThreadPoolDemo { public static void main(String[] args) { //1. 新建五個執行緒 Thread ta = new MyThread("thread-a"); Thread tb = new MyThread("thread-b"); Thread tc = new MyThread("thread-c"); Thread td = new MyThread("thread-d"); Thread te = new MyThread("thread-e"); //2. 建立一個可重用固定執行緒數的執行緒池 ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.AbortPolicy()); //3. 將執行緒放入池中進行執行 pool.execute(ta); pool.execute(tb); pool.execute(tc); pool.execute(td); pool.execute(te); //4. 關閉執行緒池 pool.shutdown(); } } class MyThread extends Thread { public MyThread(String threadName) { this.setName(threadName); } @Override public void run() { System.out.println(Thread.currentThread().getName() + ", " + this.getName()+ " is running."); } }
輸出如下:
引數與原理
ThreadPoolExecutor有4個建構函式,最全的就是上面程式碼中的七個構造引數,其它建構函式只是預設給出引數而已。
corePoolSize |
核心執行緒池的執行緒數量 |
maximumPoolSize |
最大的執行緒池執行緒數量 |
keepAliveTime |
空閒執行緒活動保持時間,要當 執行緒數>corePoolSize時才起作用。 |
unit |
執行緒活動保持時間的單位。 |
workQueue |
指定任務佇列所使用的阻塞佇列 |
ThreadFactory |
執行緒工廠,用來建立執行緒 |
RejectedExecutionHandler |
佇列已滿,而且任務量大於最大執行緒的異常處理策略 |
執行緒往執行緒池裡面丟,最後執行呼叫pool.execute(Thread); excute函式的核心邏輯如下:
Jdk excute原始碼如下:
public void execute(Runnable command) {
// 如果任務為null,則丟擲異常。
if (command == null)
throw new NullPointerException();
// 獲取ctl對應的int值。該int值儲存了"執行緒池中任務的數量"和"執行緒池狀態"資訊
int c = ctl.get();
// 1. 當執行緒池中的任務數量 < "核心池大小"時,即執行緒池中少於corePoolSize個任務。
// 則通過addWorker(command, true)新建一個執行緒,並將任務(command)新增到該執行緒中;然後,啟動該執行緒從而執行任務。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2. 當執行緒池中的任務數量 >= "核心池大小"時,
// 2.1 而且,"執行緒池處於允許狀態"時,則嘗試將任務新增到阻塞佇列中。
if (isRunning(c) && workQueue.offer(command)) {
// 再次確認“執行緒池狀態”,若執行緒池異常終止了,則刪除任務;然後通過reject()執行相應的拒絕策略的內容。
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
// 否則,如果"執行緒池中任務數量"為0,則通過addWorker(null, false)嘗試新建一個執行緒,新建執行緒對應的任務為null。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//通過addWorker(command, false)新建一個執行緒,並將任務(command)新增到該執行緒中;然後,啟動該執行緒從而執行任務。
// 3. 如果addWorker(command, false)執行失敗,則通過reject()執行相應的拒絕策略的內容。
else if (!addWorker(command, false))
reject(command);
}
- 這一步很好理解的,如果"執行緒池中任務數量" < "核心池大小"時,即執行緒池中少於corePoolSize個任務;此時就新建一個執行緒,並將該任務新增到執行緒中進行執行。
- 當執行緒池中的任務數量 >= "核心池大小"時,那麼就跟阻塞佇列workQueue相關了。來一個執行緒就丟到workQueue(這個阻塞佇列大小可以自由設定)裡面去,如果佇列容量很大很大,那就沒maximumPoolSize啥事了,一直往裡面放就可以了,但是這樣是很消耗系統資源滴,所以阻塞佇列經常是有界的,如果滿了,那就繼續第3步。
- 佇列滿了就去和maximumPoolSize判斷,小於等於則建立新執行緒,大於則不完了,按照拒絕策略RejectedExecutionHandler,執行reject。具體邏輯在這個比較複雜的addWorker函式中,作用可以理解為新增任務到阻塞佇列;
private boolean addWorker(Runnable firstTask, boolean core) {
… 此處省略若干字…
for (;;) {
// 獲取執行緒池中任務的數量。
int wc = workerCountOf(c);
// 如果"執行緒池中任務的數量"超過限制,則返回false。
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
…
}
原理基本上就是以上三步了。
剩下的三個引數中keepAliveTime和unit比較好理解,那麼ThreadFactory呢?
ThreadFactory是一個執行緒工廠,應該說的是執行緒的建立方式吧,預設是用defaultThreadFactory()方法返回DefaultThreadFactory物件,裡面用newThread()來建立的執行緒,這麼搞出來的執行緒對應的任務是Runnable物件,它建立的執行緒都是“非守護執行緒”而且“執行緒優先順序都是Thread.NORM_PRIORITY”。
拒絕策略RejectedExecutionHandler具體又分為:
ThreadPoolExecutor.AbortPolicy:丟棄任務並丟擲RejectedExecutionException異常。
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不丟擲異常。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄佇列最前面的任務,然後重新嘗試執行任務(重複此過程)
ThreadPoolExecutor.CallerRunsPolicy:由呼叫執行緒處理該任務
阻塞佇列BlockingQueue還是很有內容的,要總結拓展的話可以挖掘出蠻多好玩的,大致分為三類:
1. 直接提交, SynchronousQueue。
它將任務直接提交給執行緒而不保持它們。在此,如果不存在可用於立即執行任務的執行緒,則試圖把任務加入佇列將失敗,因此會構造一個新的執行緒。
2. 無界佇列, LinkedBlockingQueue。
LinkedBlockingQueue預設為空,則是無界的了,當然也可以傳入引數指定執行緒大小的。
3.有界佇列,ArrayBlockingQueue
這又點像LinkedList和ArrayList的感覺,資料儲存結構不一樣。
執行緒池的常用建立方式
常見三種方式:
- Executors.newCachedThreadPool():無限執行緒池。
- Executors.newFixedThreadPool(nThreads):建立固定大小的執行緒池。
- Executors.newSingleThreadExecutor():建立單個執行緒的執行緒池。
檢視原始碼發現其實它們呼叫的都是ThreadPoolExecutor建構函式。
類之間的繼承關係如下:
那三個常用方法都是Executors的static方法,而又去呼叫ThreadPoolExcutor;
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
可以把上面第2小節的例子修改下 , 可以測試不同的執行緒池新建方式,
//2. 建立一個可重用固定執行緒數的執行緒池
/*ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 0L,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
new ThreadPoolExecutor.AbortPolicy());*/
// 無限執行緒池
//ExecutorService pool = Executors.newCachedThreadPool();
// 建立固定大小的執行緒池,與上面一開始面建立ThreadPoolExecutor是等效滴
ExecutorService pool = Executors.newFixedThreadPool(2);
// 建立單個執行緒的執行緒池
//ExecutorService pool = Executors.newSingleThreadExecutor();
參考連結
執行緒池真是博大精神,還有不少可拓展研究的,不過主體脈絡應該就這些了。