Java執行緒池---Executor框架原始碼深度解析
(文末有公眾號二維碼,Java腦洞世界,深度好文等你來讀)
1:為什麼會需要執行緒池技術?
- (1)Thread是一個重量級的資源,它的建立,啟動以及銷燬都是比較耗費效能的;重複利用執行緒,減少執行緒建立,銷燬的開銷,是一種好的程式設計習慣。
- (2)通過new Thread的方法建立執行緒難以管理,並且難以控制數量,執行緒的數量通常和系統的效能呈拋物線關係,合理控制執行緒數量才能發揮出系統最強的效能。
- (3)使用new Thread的方式建立的執行緒不利於擴充套件,比如定時,定期執行任務實現起來相對麻煩,但執行緒池提供了相應的穩定並且高效的解決方案。
2:執行緒池的原理
所謂執行緒池,我們可以簡單地把它理解成一個池子,只不過裡面裝的都是已經建立好的執行緒,當我們向這個池子提交任務時,池子中的某個執行緒就會主動地執行這個任務。當我們提交的任務數量大於池子中的執行緒時,執行緒池會自動建立執行緒加入池子中,但是會有數量的控制,就像游泳池裡的水一樣,當水達到一定的量時,就會溢位了。當任務比較少的時候,執行緒池會在系統空閒的時候自動地回收資源。為了能夠非同步地提交任務和快取未處理的任務,需要有一個任務快取佇列。
小插曲: 執行緒池是如何提升效能的? 解:比如你們的伺服器執行一項任務的時候為T, 建立一個執行緒的時間為T1, 執行任務花了T2的時間, 執行緒銷燬用了T3的時間。 那麼顯然T = T1 + T2 + T3 因為我們是從池子裡面拿出執行緒出來執行任務的, 用完之後,又把執行緒放回池子裡, 所以T1和T3就基本可以忽略不計了。 因此T = T2
3:執行緒池的組成要素
- 任務列隊: 用來快取提交的任務
- 工作執行緒佇列: 一個執行緒池要想很好地管理和控制執行緒數量,可以通過後面的兩個引數實現,執行緒池需要維護的核心數量corePoolSize,執行緒池自動擴充執行緒時最大的數量:maximumPoolSize。
- 任務拒絕策略: 如果執行緒數量已達上限並且任務佇列也滿了,就需要相應的拒絕策略來告訴提交任務的人。
- 執行緒工廠: 用於建立執行緒的,按需定製執行緒,執行我們的提交的任務。
4:JDK提供的執行緒池實現方案---Executor框架
前面三個小節主要是給大家簡單回憶一下執行緒池的一些基礎知識,接下來就正式進入我們這次的主題,看看Doug Lea大神是如何實現整個執行緒池框架的。
Executor是一個強大且非常靈活的非同步執行框架,它以Runnable為任務物件,並且提供了一套將任務提交和任務執行分離開來的機制,讓提交任務的過程和執行任務的過程得到充分的解耦。Executor還提供了對執行緒池生命週期的管理,以及統計資訊收集,和應用程式管理機制和效能監控,以及任務排程功能。隨後,我們就開始看看這個強大的Executor框架是怎麼樣一步一步實現的。
4.1:Executor UML
我們先通過Executor的UML整體瞭解一下Executor框架:

Executor UML
- Executor(解耦利器): Executor採用了 命令設計模式 ,讓任務的提交和任務的執行之間得到了充分的解耦。
- ExecutorSevice: 它擴充套件了Executor介面,我們常用的執行緒池基本都是實現了ExecutorService,它提供了豐富的操作。
- ScheduledExecutorService: 可定時排程的介面,它可以對提交了的任務進行延遲執行或者週期性執行。
- AbstractExecutorService: ExecutorService介面的預設實現,他只實現了Executor中的部分方法,並提供了一個newTaskFor()方法,返回了一個RunnableFuture物件。
- ForkJoinPool: ForkJoinPool可以充分利用CPU,多核CPU的優勢,它支援將一個任務拆分成多個小任務的,然後把這些小任務放到CPU的多個核心並行處理,最後再把結果合並起成,得到最終的結果的特殊執行緒池。
- ThreadPoolExecutor: 執行緒池,所有的核心實現都在這裡了!
- ScheduledThreadPoolExecutor: 可排程的執行緒池。
4.2:細說Executor原始碼
從整體看完Executor框架後,現在我們要開始,我們就採取自頂向下的方式詳細深入地聊聊Executor了。
4.2.1 Executor介面原始碼分析
public interface Executor { /** * 在未來的那個時間點執行command任務, * 具體是在新的執行緒中執行呢,還是線上程池, * 又或者是呼叫執行緒中執行,則由Executor的實現者決定 */ void execute(Runnable command); }
像我們前面說的一樣,Executor介面的主要實現的功能是,讓任務的提交和任務的執行得到解耦,通常,Executor是用來代替顯式建立執行緒的。比如:相比通過
//eg1: new Thread(new RunnableTask()).start().
建立一組執行緒,更好的方式是:
//eg2: Executor executor = anExecutor; executor.execute(new RunnableTask1()); executor.execute(new RunnableTask2()); ...
為什麼會更好?這個問題就充分體現了我們這篇文章一開始所講的,為什麼需要執行緒池技術這一個論點了。
但需要注意的一個點是: 我們通過eg1方式一定是非同步執行的。 而eg2這種方式並不嚴格要求非同步執行某個,比如下面的例子中,他會在呼叫執行緒中立即執行任務: //eg3: class DirectExecutor implements Executor { public void execute(Runnable r) { r.run(); } } 但更具代表性的還是在其它執行緒中執行任務: //eg4: class DirectExecutor implements Executor { public void execute(Runnable r) { new Thread(r).start(); } }
4.2.2 ExecutorService介面原始碼分析
public interface ExecutorService extends Executor { void shutdown(); List<Runnable> shutdownNow(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
ExecutorService提供了終止執行緒池的管理方法,並且還提供了一些返回一個Future物件的方法,通過Future物件,我們就可以跟蹤到非同步任務的程序了。
一個ExecutorService是可以被關閉的,如果ExecutorService被關閉了,它將會拒絕接收新的任務。有兩個不同的方法可以關閉ExecutorService
- shutdown() 允許先前提交的任務在終止之前執行。
- shutdownNow() 會阻止開啟新的任務並且嘗試停止當前正在執行的任務。
當一個執行緒池被終止時,沒有正在執行的任務,也沒有等待執行的任務,也沒有新的任務可以提交,一個沒有被使用的池程池應該關閉以允許回收它的資源。
submit()方法擴充套件了Executor#execute(Runnable)方法,建立被返回一個Future物件,這個物件可以用於取消任務的執行或者等待任務完成並取出返回值,至於如何取消任務,或者取值,大家可以參考一些對Future介面的使用案例,這裡就不擴充套件了。
invokeAny()和invokeAll()方法是以批量的形式執行一組任務,然後等待至少一個或者全部的任務完成。
ExecutorCompletionService類可以用來定義於這些方法,包括submit(),invokeAll()和invokeAny().
4.2.3 AbstractExecutorService抽象類原始碼分析
AbstractExecutorService類提供了ExecutorService介面部分方法的預設實現,這個類使用了newTaskFor()方法返回的RunnableFuture物件實現了submit(),invokeAny()和invokeAll()方法,如果你熟悉Future體系,那在看原始碼就是水到渠成的事情了。如果你還不熟悉,我之前也寫了一篇關於Future體系,深入分析的文章。
ofollow,noindex">Future體系原始碼深度解析public abstract class AbstractExecutorService implements ExecutorService { //通過runnable和一個value引數,建立一個FutureTask protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); } //通過一個callable建立一個FutureTask protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); } }
第一個方法中的value其實就是當你的任務執行完成時,你想要通過Future#get()拿到的預期的結果,而第二個方法就是藉助了Callable介面,拿到了任務的執行結果。上面我提到的那篇文章有深入詳細的分析。如果你感興趣這整個過程,可以看看,都是乾貨。
接下來,我們就開始分析 submit()方法 :
//通過Future#get()獲取出來的值,永遠都為null public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } //通過Future#get()獲取出來的值為result。 public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } //通過Future#get()獲取出來的值為任務執行的結果。 public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
sumbit()方法有三個過載,主要是提交的引數不同。然後由這些引數來建立一個FutureTask.然後交給Executor#execute()方法執行,最後再通過他的get()方法就可以獲取到一個執行結行的結果了,但是需要注意的一點就是,這三個方法的返回值是不一樣的,前面兩個方法的返回情況還是比較簡單的,但是第三個方法,再獲取返回值的時候,不一定能獲取到執行結果的,需要結合任務執行的情況一起合析,還有一個點就是,再呼叫get()方法使,會讓當前執行緒阻塞,直到任務執行完成。
invokeAll()方法
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { //如果任務集後為空,則丟擲一個NullPointerException if (tasks == null) throw new NullPointerException(); //建立一個和任務數量等大的ArrayList. ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); //標誌任務是否完成 boolean done = false; try { //將tasks裡面的每個Callable轉化成Future,新增到futures裡面,並交給Executor#execute()方法執行 for (Callable<T> t : tasks) { RunnableFuture<T> f = newTaskFor(t); futures.add(f); execute(f); } //判斷futures裡面的Future是否執行結束,如果還沒有完成,通過get()阻塞直到任務完成 //也是就說執行完這一段程式碼,futures裡面的每一個任務都是執行完成的情況 for (int i = 0, size = futures.size(); i < size; i++) { Future<T> f = futures.get(i); if (!f.isDone()) { try { f.get(); } catch (CancellationException ignore) { } catch (ExecutionException ignore) { } } } done = true; //返回ArrayList return futures; } finally { if (!done)//如果任務沒有完成的,就全部都取消,並釋放記憶體 for (int i = 0, size = futures.size(); i < size; i++) futures.get(i).cancel(true);//採用中斷執行緒的方式取消任務 } }
invokeAll()方法整體分析下面,也是很簡單的,可以批量提交併執行任務,當所有任務都執行完成時,返回一個儲存任務狀態和執行結果的Future列表。它的過載方法也是類似的,就是加入了一個超時時間,不管是所有的任務都執行完,還是已經到達超時的時間,只有兩個有中滿足其中一個,就會返回一個儲存任務狀態和執行結果的Future列表。下面簡單分析一下:
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { //如果任務集後為空,則丟擲一個NullPointerException if (tasks == null) throw new NullPointerException(); //將超時時間轉化成納秒 long nanos = unit.toNanos(timeout); //建立一個和任務數量等大的ArrayList. ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); //標誌任務是否完成 boolean done = false; try { //將tasks裡面的每個Callable轉化成Future,新增到futures裡面 for (Callable<T> t : tasks) futures.add(newTaskFor(t)); //超時時間點 final long deadline = System.nanoTime() + nanos; //記錄一下futures的大小, final int size = futures.size(); //執行任務,如果超時就直接返回futures for (int i = 0; i < size; i++) { execute((Runnable)futures.get(i)); nanos = deadline - System.nanoTime(); if (nanos <= 0L) return futures; } //判斷futures裡面的Future是否執行結束,如果還沒有完成,通過get()阻塞直到任務完成 //也是就說執行完這一段程式碼,要麼futures裡面的每一個任務都是執行完成了,要麼就是超時了 for (int i = 0; i < size; i++) { Future<T> f = futures.get(i); if (!f.isDone()) { if (nanos <= 0L) return futures; try { //這裡捕捉了get()方法可能出現了所有的異常 f.get(nanos, TimeUnit.NANOSECONDS); } catch (CancellationException ignore) { } catch (ExecutionException ignore) { } catch (TimeoutException toe) { return futures; } nanos = deadline - System.nanoTime(); } } //標記任務完成 done = true; return futures; } finally { if (!done)//如果任務沒有完成的,就全部都取消,並釋放記憶體 for (int i = 0, size = futures.size(); i < size; i++) futures.get(i).cancel(true);//採用中斷執行緒的方式取消任務 } }
invokeAny()方法也是執行給定的一批量任務,通是通過內部的doInvokeAny()方法完成了,與invokeAll()方法不同的是這一批任務中的某個任務完成了,就返回他的結果,所以他返回的是最快執行完成的那個任務的結果,他的過載方法,加入了超時機制,如果超過了限制的時間也沒有一個任務完成,那麼就會丟擲超時異常了。
前方高能!
經過前面這麼多知識的鋪墊,是時候要開始核心 ThreadPoolExecutor執行緒池 的學習了,我們先來認識它的成員變數,認清了他們有助於更好地進行原始碼閱讀。
public class ThreadPoolExecutor extends AbstractExecutorService { //預設為false,用於控制核心執行緒在空閒狀態是否會被回收。 private volatile boolean allowCoreThreadTimeOut; //存活的時間,當非核心執行緒閒置的時間超過它時,將會被回收 private volatile long keepAliveTime; //核心執行緒的數量,預設情況下即使核心執行緒處於空閒狀態也不會被回收, //除非把allowCoreThreadTimeOut設定為true. private volatile int corePoolSize; //存放任務的佇列,如果 當前執行緒數>核心執行緒數新提交的任務將會被放入到該任務對列。 //它是一個阻塞佇列,通常有SynchronousQueue(直接提交),LinkedBlockingQueue(無界佇列), //ArrayBlockingQueue (有界佇列),三種 private final BlockingQueue<Runnable> workQueue; //執行緒池能容納最大的執行緒數 private volatile int maximumPoolSize; //建立新執行緒使使用的執行緒工廠 private volatile ThreadFactory threadFactory; //當任務佇列滿並且當前執行緒等於執行緒池能容納的最大執行緒數時所採用的拒絕策略 private volatile RejectedExecutionHandler handler; //預設的拒絕策略 private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); //統計執行緒池完成的任務數量 private long completedTaskCount; //工作執行緒池,存放工作執行緒的地方,Worker包含一個執行緒和對應的任務 private final HashSet<Worker> workers = new HashSet<Worker>(); //用於記錄最大的工作執行緒池的工作執行緒的大小. private int largestPoolSize; }
以上是執行緒池的核心引數,整個執行緒池主要就是圍繞著這些引數開展的,所以牢牢記住這些引數,對閱讀執行緒池的原始碼的幫助是非常大的。接下來我們瞭解一下執行緒池的狀態,他和FutureTask的狀態一樣,也是由一組int型別變數也標識的
//ctl是控制執行緒的狀態的,裡面包含兩個狀態,執行緒的數量和執行緒池執行的狀態 //它是一AtomicInteger型別,由後面的操作可以得知, //高3位是用來儲存執行緒池執行的狀態的,低29位用於儲存執行緒的數量,這裡的限制是2^29-1,大約有5億3千6百萬左右。 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //COUNT_BITS =29,用於位於操作 private static final int COUNT_BITS = Integer.SIZE - 3; //2^29-1 =536,870,911 執行緒的容量 private static final int CAPACITY= (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits //高3位:111:接受新任務並且繼續處理阻塞佇列中的任務 private static final int RUNNING= -1 << COUNT_BITS; //高3位:000:不接受新任務但是會繼續處理阻塞佇列中的任務 private static final int SHUTDOWN=0 << COUNT_BITS; //高3位:001:不接受新任務,不在執行阻塞佇列中的任務,中斷正在執行的任務 private static final int STOP=1 << COUNT_BITS; //高3位:010:所有任務都已經完成,執行緒數都被回收,執行緒會轉到TIDYING狀態會繼續執行鉤子方法 private static final int TIDYING=2 << COUNT_BITS; //高3位:110:鉤子方法執行完畢 private static final int TERMINATED =3 << COUNT_BITS; // 用於打包ctl或者拆包ctl的,如果你原始碼看得比較多,這種操作應該是經常看到的 private static int runStateOf(int c){ return c & ~CAPACITY; } private static int workerCountOf(int c){ return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
執行緒池之間的狀態轉化情況:
- RUNNING -> SHUTDOWN: 呼叫了shutdown()方法,也可能隱含在finalize()方法中
- (RUNNING or SHUTDOWN) -> STOP: 呼叫了shutdownNow()方法
- SHUTDOWN -> TIDYING: 佇列和執行緒池都是空的時候
- STOP -> TIDYING: 執行緒池為空的時候
- TIDYING -> TERMINATED: terminated()鉤子方法執行完成
在瞭解了執行緒池的這些基本引數之後,就可以開始正式看實現過程了。但是閱讀了這麼久,先休息一下再繼續往下面讀吧,不著急,慢慢來。
如果休息夠了,我們就繼續吧,先從建構函式入手,ThreadPoolExecutor有四個過載的建構函式,但是我們看引數最多的一個就知道這個建構函式怎麼用了,
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { //這裡主要是對執行緒池的基本引數的判斷, //其中keepAliveTimen必須大於0, //並且maximumPoolSize>corePoolSize>0 //否則就丟擲 IllegalArgumentException異常 if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); //建構函式中傳入的workQueue,threadFactory,handler均不能為null //否則就會丟擲NullPointerException if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); //然後就開始賦值操作。 this.acc = System.getSecurityManager() == null ?null :AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; //unit是控制keepAliveTime的時間單位的 this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
在有了前面的對執行緒池重要引數講解的鋪墊,ThreadPoolExecutor的建構函式就變得特別的清淅和簡單了,其中keepAliveTime的時間單位由unit引數控制,必須>0,然後maximumPoolSize>corePoolSize>0,任務佇列,執行緒工廠,拒絕策略均不能為null。如果在使用了其它的建構函式,可以會使用預設的的執行緒工廠和預設的拒絕策略。
接下來我們先來看一個案例
public static void main(String[] args) throws Exception { ExecutorService executor = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3)); for (int i = 1; i <= 10; ++i) { final int index = i; Runnable runnable = () -> { try { System.out.println(Thread.currentThread().getName() + "我是任務" + index); TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } executor.submit(runnable); //這裡確認id小的任務先提交到執行緒池 TimeUnit.MILLISECONDS.sleep(30); } executor.shutdown(); } //輸入 pool-1-thread-1 我是任務1 Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@6d03e736 rejected from java.util.concurrent.ThreadPoolExecutor@568db2f2 [Running, pool size = 4, active threads = 4, queued tasks = 3, completed tasks = 0] pool-1-thread-2 我是任務2 pool-1-thread-3 我是任務6 pool-1-thread-4 我是任務7 pool-1-thread-1 我是任務3 pool-1-thread-2 我是任務4 pool-1-thread-3 我是任務5
我們建立了一個核心執行緒為:2,最大執行緒數為:4,空閒執行緒存活的時間為:10秒,有界佇列的容量為:3的執行緒池。然後我們模擬提交10個任務,為了讓id小的任務先有機會執行,我們提交一個任務後先休眠30ms,然後模擬每個任務需要執行1秒,確認10個任務都是先提交了,才有任務執行完,我們先分析一下執行的結程:
任務1:被核心執行緒1執行 任務2:被核心執行緒2執行 任務3:此時已經沒有核心執行緒了,所以被放到有介面佇列裡面,任務個數:1 任務4:此時已經沒有核心執行緒了,所以被放到有介面佇列裡面,任務個數:2 任務5:此時已經沒有核心執行緒了,所以被放到有介面佇列裡面,任務個數:3 (任務佇列滿) 任務6:核心執行緒都被佔用了,佇列也已經滿了,所有建立新執行緒3來執行 任務7:核心執行緒都被佔用了,佇列也已經滿了,所有建立新執行緒4來執行(達到最大執行緒數) 任務8:佇列滿了,也達到最大執行緒數了,被拒絕掉(執行任務拒絕策略) 任務9:佇列滿了,也達到最大執行緒數了,被拒絕掉(執行任務拒絕策略) 任務10:佇列滿了,也達到最大執行緒數了,被拒絕掉(執行任務拒絕策略) 核心執行緒1把任務1執行完畢,從任務佇列中拿出任務3執行 核心執行緒2把任務2執行完畢,從任務佇列中拿出任務4執行 執行緒3把任務6執行完畢,從任務佇列中拿出任務5執行, ... 然後回收空閒執行緒
這個是它的大概過程,但是至於那個執行緒執行那個任務,這個就由執行緒搶到CPU資源來決定了。多次執行後,結果不一致。
大家看到我在這個案例中並沒有使用 Executors 來建立執行緒池,為什麼?這裡有一個特別主要的原因就是,我們通過ThreadPoolExecutor的建構函式來建立一個執行緒池的時候,我對整個過程是非常清淅,但是如果採有Executors來建立的時候,可能這種感覺就會慢慢下降! 然後還有另外一個重要的依據是來自阿里巴巴的Java開發規範,我全力呼籲大家都遵守裡面的條例,讓我們的編碼風格規範起來!

執行緒池使用規範
接下來我們就開始分析execute()方法了:
public void execute(Runnable command) { //提交的任務command不能為null,不然會丟擲NullPointerException if (command == null) throw new NullPointerException(); /** * 緊接著會進行如下三個步驟: * * 1:如果當前執行的執行緒數小於 corePoolSize,則馬上嘗試使用command物件建立一個新執行緒。 * 呼叫addWorker()方法進行原子性檢查runState和workerCount,然後通過返回false來防止在不應該 * 新增執行緒時添加了執行緒產生的錯誤警告。 * * 2:如果一個任務能成功新增到任務佇列,在我們新增一個新的執行緒時仍然需要進行雙重檢查 * (因為自 上一次檢查後,可能執行緒池中的其它執行緒全部都被回收了) 或者在進入此方法後, * 執行緒池已經 shutdown了。所以我們必須重新檢查狀態,如果有必要,就線上程池shutdown時採取 * 回滾入隊操作移除任務,如果執行緒池的工作執行緒數為0,就啟動新的執行緒。 * * 3:如果任務不能入隊,那麼需要嘗試新增一個新的執行緒,但如果這個操作失敗了,那麼我們知道執行緒 * 池可能已經shutdown了或者已經飽和了,從而拒絕任務。 */ //獲取執行緒池控制狀態 int c = ctl.get(); if (workerCountOf(c) < corePoolSize) {//如果工作執行緒數<核心執行緒數 if (addWorker(command, true))//新增一個工作執行緒來執行任務,如果成功了,則直接返回 //if 還有要有大括號比較好,至少讓閱讀的人看起來更清楚,這裡我要小小地批評一下小Lea return; //如果新增執行緒失敗了,就再次獲取執行緒池控制狀態 c = ctl.get(); } //如果執行緒池處理RUNNING狀態,則嘗試把任務新增到任務佇列 if (isRunning(c) && workQueue.offer(command)) { // // 再次檢查,獲取執行緒池控制狀態 int recheck = ctl.get(); //如果執行緒池已經不是RUNNING狀態了,把任務從佇列中移除,並執行拒絕任務策略 //「可能執行緒池已經被關閉了」 if (! isRunning(recheck) && remove(command)) reject(command); //如果工作執行緒數為0,就新增一個新的工作執行緒 //「因為舊執行緒可能已經被回收了,所以工作執行緒數可能為0」 else if (workerCountOf(recheck) == 0) addWorker(null, false); } //再次嘗試建立一個新執行緒來執行任務,如果還是不行,並執行拒絕任務策略 else if (!addWorker(command, false)) reject(command); }
我們可以從execute()方法看出ThreadPoolExecutor執行緒池的三個原則:
- 如果執行的執行緒少於 corePoolSize,則 Executor 始終首選新增新的執行緒,而不進行排隊。
- 如果執行的執行緒等於或多於 corePoolSize,則 Executor 始終首選將請求加入佇列,而不新增新的執行緒。
- 如果無法將請求加入佇列,則建立新的執行緒,除非建立此執行緒超出 maximumPoolSize,在這種情況下,任務將被拒絕。
很明顯要開始分析addWorker()方法了,在這個方法裡面有一個語法,估計有小部分同學還沒見過呢。
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) {//一上來就是一個無限迴圈,不過外面有一個標籤噢 int c = ctl.get();//先獲取執行緒池控制狀態 int rs = runStateOf(c);//獲取執行緒池的狀態 //如果執行緒池的狀態為RUNNING狀態,直接跳過這個檢查 //如果執行緒池狀態不為RUNNING,那麼當滿足 // 狀態為SHUTDOWN並且任務為null,任務佇列不為空的時候也跳過這個檢查 //「也就是說:如果執行緒池的狀態SHUTDOWN時,它不接收新任務,但是會繼續執行任務佇列中的任務」 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))//worker佇列不為空 return false;//返回不能新增執行緒 for (;;) {//如果執行緒池繼續工作! int wc = workerCountOf(c);//獲取工作執行緒的數量 if (wc >= CAPACITY ||//如果工作執行緒的數量>=最大容量 //據據core來控制工作執行緒數量是>=corePoolSize 還是 >=maximumPoolSize wc >= (core ? corePoolSize : maximumPoolSize)) return false;//返回不能新增執行緒 if (compareAndIncrementWorkerCount(c))//如果原子性增加工作執行緒數(+1)成功,跳出大迴圈 break retry; c = ctl.get();// Re-read ctl//否則,再次獲取執行緒池控制狀態 if (runStateOf(c) != rs)//如果執行緒池的狀態發生改變了,再次重複大迴圈 continue retry; // else CAS failed due to workerCount change; retry inner loop } } //工作執行緒開始的標記 boolean workerStarted = false; //工作執行緒被新增的標記 boolean workerAdded = false; //Worker包裝了執行緒和任務 Worker w = null; try { w = new Worker(firstTask);//初始化worker final Thread t = w.thread;//獲取worker對應的執行緒 if (t != null) {//如果執行緒不為null final ReentrantLock mainLock = this.mainLock;// 執行緒池鎖 mainLock.lock();//獲取鎖 try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get());//拿著鎖重新檢查池程池的狀 //執行緒池的狀態為RUNNING或者(執行緒池的狀態SHUTDOWN並且提交的任務為null時) if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { //如果執行緒已經運行了或者還沒有死掉,丟擲一個IllegalThreadStateException異常 if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); //把worker加入到工作執行緒Set裡面 workers.add(w); int s = workers.size(); if (s > largestPoolSize)//如果工作執行緒池的大小大於largestPoolSize largestPoolSize = s;//讓largestPoolSize記錄工作執行緒池的最大的大小 workerAdded = true;//工作執行緒被新增的標記置為true } } finally { mainLock.unlock();//釋放鎖 } if (workerAdded) {//如果工作執行緒已經被新增到工作執行緒池了 t.start();//開始執行任務 workerStarted = true;//把工作執行緒開始的標記置為true } } } finally { if (! workerStarted)//如果沒有新增,那麼移除任務,並減少工作執行緒的數量(-1) addWorkerFailed(w); } return workerStarted; }
我們先在總結一下,addWorker()方法都做了什麼:
- 池程的狀態為RUNNING
- 1.採用原子性增加工作執行緒的數量(+1)
- 2.將提交的任務封裝成一個worker,並將此worker新增到workers中
- 3.啟動worker對應執行緒,然後開始執行提交的任務
- 4.如果沒有把工作執行緒新增到工作執行緒池中,那麼會移除任務,並原子性減少工作執行緒的數量(-1)
- 如果池程池的狀態是SHUTDOWN
- 它不接收新任務,但是會繼續執行任務佇列中的任務
真正執行任務的runWorker()方法:
final void runWorker(Worker w) { Thread wt = Thread.currentThread();//獲取當前執行緒(和worker繫結的執行緒) Runnable task = w.firstTask;//用task儲存在worker中的任務 w.firstTask = null;//把worker中的任務置為null w.unlock(); //釋放鎖 boolean completedAbruptly = true; try { //這個while迴圈,保證瞭如果任務佇列中還有任務就繼續拿出來執行,注意這裡的短路情況 while (task != null || (task = getTask()) != null) {//如果任務不為空,或者任務佇列中還有任務 w.lock();//獲取鎖 // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted.This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) ||//如果執行緒池的狀態>STOP,直接中斷 (Thread.interrupted() &&//呼叫者執行緒被中斷 runStateAtLeast(ctl.get(), STOP))) &&//再次檢查執行緒池的狀態如果>STOP !wt.isInterrupted())//當前執行緒還沒有被中斷 wt.interrupt();//中斷當前執行緒 try { beforeExecute(wt, task);//在任務執行前的鉤子方法 Throwable thrown = null;//用於記錄執行任務時可能出現的異常 try { //開始正式執行任務 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //任務執行完成後的鉤子方法 afterExecute(task, thrown); } } finally { task = null;//把任務置為null w.completedTasks++;//把任務完成的數量+1 w.unlock();//釋放鎖 } } completedAbruptly = false; } finally { //當所有任務完成之後的一個鉤子方法 processWorkerExit(w, completedAbruptly); } }
runWorker()方法除了會執行我們提交的任務外,還會自動取出從任務佇列中的任務,與此同時還提供了任務執行之前和任務執行之後的鉤子方法,並且提供了所有任務都執行完的鉤子方法。通過這些鉤子方法,我們就可以做我們的一些處理操作了。
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) {//又是一個無限迴圈 int c = ctl.get();//先獲取執行緒池的控制狀態 int rs = runStateOf(c);//獲取執行緒池的當前狀態 // Check if queue empty only if necessary. //如果池程的狀態>= STOP或者 任務佇列為空 //這裡rs>=SHUTDOWN這個判斷其實體現的是一種優化的策略 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount();//交工作執行緒數原子性減少1 return null;//返回null } int wc = workerCountOf(c);//獲取工作執行緒的數量 // Are workers subject to culling? // 是否允許核心執行緒超時或者當前工作執行緒數是否大於核心執行緒數 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //如果當前工作執行緒數>maximumPoolSize>(1或者任務佇列為空),那麼減少工作執行緒數,並返回null //如果當前工作執行緒數>corePoolSize>(1或者任務佇列為空),且超過keepAliveTime還沒有拿到任務,那麼減少工作執行緒數,並返回null //如果允許核心執行緒超時,且超過keepAliveTime還沒有拿到任務,且當前執行緒數(>1或者任務佇列為空),那麼減少工作執行緒數,並返回null if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ://這裡就是控制一個工作執行緒能閒置的時間 workQueue.take();//一直等下面,直到拿到任務 if (r != null)//如果拿的任務不為null,直到返回任務 return r; timedOut = true;//等待指定的時間後,還沒有獲取到任務,則超時 } catch (InterruptedException retry) { timedOut = false;//如果被中斷了,將超時置為false } } }
小小地總結一下getTask()方法,首先他會不斷地檢查執行緒池的狀態,如果執行緒池的狀態為SHUTDOWN或者STOP時,它會直接返回null,同時他在特定情況下會減少工作執行緒數,也是返回null,比較重要的一點是,因為getTask()方法是從阻塞佇列中獲取任務的,所以他支援有限時間的等待poll(),和無限時間的等待take().
接下來就是我們的shutdown()方法了
public void shutdown() { //重入鎖 final ReentrantLock mainLock = this.mainLock; //獲取鎖 mainLock.lock(); try { //檢查shutdown許可權 checkShutdownAccess(); //設定執行緒池控制狀態為SHUTDOWN advanceRunState(SHUTDOWN); //中斷所有空閒的工作執行緒 interruptIdleWorkers(); //shutdown的鉤子方法 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { //釋放鎖 mainLock.unlock(); } //嘗試終止 tryTerminate(); }
整個shutdown()方法的流程是很簡單明瞭的,他會執行所有已經提交到執行緒池的任務,但是不會再接收新任務了.還有一個shutdownNow()方法,他和shutdown()方法是非常相似的,不過shutdownNow()會嘗試停止所有的任務,暫停正在任務佇列中等待的任務等等,如果有興趣的同學就要自行去閱讀他的原始碼了,這裡就不帶著大家讀了.
接下來我們就分析我們的本篇文章的最後一個方法吧. tryTerminate()
final void tryTerminate() { for (;;) {//執行緒池裡面用了大量的這種無限迴圈 int c = ctl.get();//獲取執行緒池的控制狀態 if (isRunning(c) ||// 執行緒池的執行狀態為RUNNING runStateAtLeast(c, TIDYING) || // 執行緒池的執行狀態最小要大於TIDYING (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))// 執行緒池的執行狀態為SHUTDOWN並且workQueue佇列不為null return;// 不能終止,直接返回 // 執行緒池正在執行的worker數量不為0 if (workerCountOf(c) != 0) { // Eligible to terminate // 僅僅中斷一個空閒的worker interruptIdleWorkers(ONLY_ONE); return; } // 獲取執行緒池的鎖 final ReentrantLock mainLock = this.mainLock; // 獲取鎖 mainLock.lock(); try { // 比較並設定執行緒池控制狀態為TIDYING if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { //終止執行緒池的鉤子方法 terminated(); } finally { // 設定執行緒池控制狀態為TERMINATED ctl.set(ctlOf(TERMINATED, 0)); // 釋放在termination條件上等待的所有執行緒 termination.signalAll(); } return; } } finally { // 釋放鎖 mainLock.unlock(); } // else retry on failed CAS } }
最後和大家簡單介紹一下ThreadPoolExecutor給我們提供的四種拒絕策略:
ThreadPoolExecutor.AbortPolicy:預設的拒絕策略,處理程式遭到拒絕將丟擲執行時RejectedExecutionException ThreadPoolExecutor.CallerRunsPolicy:它直接在 execute 方法的呼叫執行緒中執行被拒絕的任務;如果執行程式已關閉,則會丟棄該任務 ThreadPoolExecutor.DiscardPolicy:不能執行的任務將被刪除。 ThreadPoolExecutor.DiscardOldestPolicy:如果執行程式尚未關閉,則位於工作佇列頭部的任務將被刪除,然後重試執行程式(如果再次失敗,則重複此過程)
到這裡,執行緒池的核心程式碼,我們都已經閱讀了一篇了,如果你現在頭腦裡面,有一幅ThreadPoolExecutor的流程圖,那這一次你的閱讀是非常成功的.執行緒池在多執行緒開發的環境下的作用是非常大的,只有深刻理解了執行緒池的基本原理,瞭解執行緒池細緻的實現過程,你才能更好地使用它,再出現問題之後,也更容易排查。與此同時,將大大增加你的戰鬥力。
文章寫得比較長了,有一萬多字,能夠堅持讀完的同學,你們也是非常非常不得了了,至少你是一位非常有毅力的人,相信在你肯定能走得更遠!當然,執行緒池裡面的技術點,技術細節非常多,一篇文章難以全部講完,有一些知識點只能分開來一個一個地講,比如:ReentrantLock ,阻塞列隊等等。最後建議大家要多讀幾遍,執行緒池的程式碼有很多細節寫得都是有點繞的,很明顯是經過了精心的優化的,如果有什麼感想想法,歡迎你們留言,我們交流一下。有你讚揚,我們可以一起走得更遠!歡迎大家關注我的公眾號,那樣有什麼好文你就可以第一時間收到啦,謝謝你的閱讀。

掃一掃加入公眾號,第一次時間獲取新知識