Java執行緒池ThreadPoolExecutor詳解
1、執行緒池的工作原理?
- 執行緒池剛建立時,裡面沒有一個執行緒。任務佇列是作為引數傳進來的。不過,就算佇列裡面有任務,執行緒池也不會馬上執行它們。
-
當呼叫 execute() 方法新增一個任務時,執行緒池會做如下判斷:
- 如果正在執行的執行緒數量小於 corePoolSize,那麼馬上建立執行緒執行這個任務;
- 如果正在執行的執行緒數量大於或等於 corePoolSize,那麼將這個任務放入佇列。
- 如果這時候佇列滿了,而且正在執行的執行緒數量小於 maximumPoolSize,那麼還是要建立執行緒執行這個任務;
- 如果佇列滿了,而且正在執行的執行緒數量大於或等於 maximumPoolSize,那麼執行緒池會丟擲異常,告訴呼叫者“我不能再接受任務了”。
3,當一個執行緒完成任務時,它會從佇列中取下一個任務來執行。
4,當一個執行緒無事可做,超過一定的時間(keepAliveTime)時,執行緒池會判斷,如果當前執行的執行緒數大於 corePoolSize,那麼這個執行緒就被停掉。所以執行緒池的所有任務完成後,它最終會收縮到 corePoolSize 的大小。
這樣的過程說明,並不是先加入任務就一定會先執行。假設佇列大小為 10,corePoolSize 為 3,maximumPoolSize 為 6,那麼當加入 20 個任務時,執行的順序就是這樣的:首先執行任務 1、2、3,然後任務 4~13 被放入佇列。這時候佇列滿了,任務 14、15、16 會被馬上執行,而任務 17~20 則會丟擲異常。最終順序是:1、2、3、14、15、16、4、5、6、7、8、9、10、11、12、13。
2、執行緒池有哪些配置項?
執行緒池可以使用java.util.concurrent.ThreadPoolExecutor來建立,在該類中包含最全引數的建構函式如下:
相應的入參就是執行緒池可以配置的引數:
/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
corePoolSize :核心池的大小,如果呼叫了prestartAllCoreThreads()或者prestartCoreThread()方法,會直接預先建立corePoolSize的執行緒,否則當有任務來之後,就會建立一個執行緒去執行任務,當執行緒池中的執行緒數目達到corePoolSize後,就會把到達的任務放到快取隊列當中;這樣做的好處是,如果任務量很小,那麼甚至就不需要快取任務,corePoolSize的執行緒就可以應對;
maximumPoolSize:執行緒池最大執行緒數,表示線上程池中最多能建立多少個執行緒,如果執行中的執行緒超過了這個數字,那麼相當於執行緒池已滿,新來的任務會使用RejectedExecutionHandler 進行處理;
keepAliveTime:表示執行緒沒有任務執行時最多保持多久時間會終止,然後執行緒池的數目維持在corePoolSize 大小;
unit:引數keepAliveTime的時間單位;
workQueue:一個阻塞佇列,用來儲存等待執行的任務,如果當前對執行緒的需求超過了corePoolSize大小,才會放在這裡;
threadFactory:執行緒工廠,主要用來建立執行緒,比如可以指定執行緒的名字;
handler:如果執行緒池已滿,新的任務的處理方式
3、執行緒池的阻塞佇列包含哪幾種選擇?
如果執行緒數超過了corePoolSize,則開始把執行緒先放到阻塞佇列裡,相當於生產者消費者的一個數據通道,有以下一些阻塞佇列可供選擇:
- ArrayBlockingQueue
- DelayQueue
- LinkedBlockingQueue
- PriorityBlockingQueue
- SynchronousQueue
ArrayBlockingQueue是一個有邊界的阻塞佇列,它的內部實現是一個數組。有邊界的意思是它的容量是有限的,我們必須在其初始化的時候指定它的容量大小,容量大小一旦指定就不可改變。
DelayQueue阻塞的是其內部元素,DelayQueue中的元素必須實現 java.util.concurrent.Delayed介面,該介面只有一個方法就是long getDelay(TimeUnit unit),返回值就是佇列元素被釋放前的保持時間,如果返回0或者一個負值,就意味著該元素已經到期需要被釋放,此時DelayedQueue會通過其take()方法釋放此物件,DelayQueue可應用於定時關閉連線、快取物件,超時處理等各種場景;
LinkedBlockingQueue阻塞佇列大小的配置是可選的,如果我們初始化時指定一個大小,它就是有邊界的,如果不指定,它就是無邊界的。說是無邊界,其實是採用了預設大小為Integer.MAX_VALUE的容量 。它的內部實現是一個連結串列。
PriorityBlockingQueue是一個沒有邊界的佇列,它的排序規則和 java.util.PriorityQueue一樣。需要注意,PriorityBlockingQueue中允許插入null物件。所有插入PriorityBlockingQueue的物件必須實現 java.lang.Comparable介面,佇列優先順序的排序規則就是按照我們對這個介面的實現來定義的。
SynchronousQueue佇列內部僅允許容納一個元素。當一個執行緒插入一個元素後會被阻塞,除非這個元素被另一個執行緒消費。
使用的最多的應該是LinkedBlockingQueue,注意一般情況下要配置一下佇列大小,設定成有界佇列,否則JVM記憶體會被撐爆!
4、如果執行緒池已經滿了可是還有新的任務提交怎麼辦?
執行緒池已滿的定義,是指執行執行緒數==maximumPoolSize,並且workQueue是有界佇列並且已滿(如果是無界隊列當然永遠不會滿);
這時候再提交任務怎麼辦呢?執行緒池會將任務傳遞給最後一個引數RejectedExecutionHandler來處理,比如列印報錯日誌、丟擲異常、儲存到Mysql/redis用於後續處理等等,執行緒池預設也提供了幾種處理方式見第5條目;
5、有哪些飽和策略可以使用?
飽和策略指的就是執行緒池已滿情況下任務的處理策略,預設有以下幾種:
在預設的 ThreadPoolExecutor.AbortPolicy 中,處理程式遭到拒絕將丟擲執行時RejectedExecutionException。
在 ThreadPoolExecutor.CallerRunsPolicy 中,執行緒呼叫執行該任務的execute 本身。此策略提供簡單的反饋控制機制,能夠減緩新任務的提交速度。
在 ThreadPoolExecutor.DiscardPolicy 中,不能執行的任務將被刪除。
在 ThreadPoolExecutor.DiscardOldestPolicy 中,如果執行程式尚未關閉,則位於工作佇列頭部的任務將被刪除,然後重試執行程式(如果再次失敗,則重複此過程)
當然也可以自己實現處理策略類,繼承RejectedExecutionHandler介面即可,該介面只有一個方法:
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
6、怎樣優化執行緒池的配置?
如何合理配置執行緒池大小,僅供參考。
一般需要根據任務的型別來配置執行緒池大小:
如果是CPU密集型任務,就需要儘量壓榨CPU,參考值可以設為 NCPU+1
如果是IO密集型任務,參考值可以設定為2*NCPU
當然,這只是一個參考值,具體的設定還需要根據實際情況進行調整,比如可以先將執行緒池大小設定為參考值,
再觀察任務執行情況和系統負載、資源利用率來進行適當調整。
其中NCPU的指的是CPU的核心數,可以使用Runtime.getRuntime().availableProcessors()來獲取;
參考文章:
java多執行緒詳解(7)-執行緒池的使用 http://www.cnblogs.com/weiguo21/p/4813678.html
ThreadPoolExecutor中策略的選擇與工作佇列的選擇(java執行緒池) http://blog.csdn.net/longeremmy/article/details/8231184
【Java併發之】BlockingQueue http://blog.csdn.net/suifeng3051/article/details/48807423
。
7. ThreadPoolExecutor 構造方法
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
引數:
corePoolSize 核心執行緒池大小
maximumPoolSize 最大執行緒池大小
keepAliveTime 執行緒池中超過corePoolSize數目的空閒執行緒最大存活時間;可以
allowCoreThreadTimeOut(true)使得核心執行緒有效時間
unit 時間單位
workQueue 儲存任務的阻塞佇列
threadFactory 建立執行緒的工廠
handler 當提交任務數超過maxmumPoolSize+workQueue之和時,任務會交給RejectedExecutionHandler來處理
1.當執行緒池小於corePoolSize時,新提交任務將建立一個新執行緒執行任務,即使此時執行緒池中存在空閒執行緒。
2.當執行緒池達到corePoolSize時,新提交任務將被放入workQueue中,等待執行緒池中任務排程執行
3.當workQueue已滿,且maximumPoolSize>corePoolSize時,新提交任務會建立新執行緒執行任務
4.當提交任務數超過maximumPoolSize時,新提交任務由RejectedExecutionHandler處理
5.當執行緒池中超過corePoolSize執行緒,空閒時間達到keepAliveTime時,關閉空閒執行緒
6.當設定allowCoreThreadTimeOut(true)時,執行緒池中corePoolSize執行緒空閒時間達到keepAliveTime也將關閉
8. ThreadPoolExecutor 預設有四個拒絕策略:
1、ThreadPoolExecutor.AbortPolicy 直接丟擲異常RejectedExecutionException
/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
2、ThreadPoolExecutor.CallerRunsPolicy 直接呼叫run方法並且阻塞執行
/**
* A handler for rejected tasks that runs the rejected task
* directly in the calling thread of the {@code execute} method,
* unless the executor has been shut down, in which case the task
* is discarded.
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
3、ThreadPoolExecutor.DiscardPolicy 直接丟棄後來的任務
/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
4、ThreadPoolExecutor.DiscardOldestPolicy 丟棄在佇列中隊首的任務
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
3. Executors提供的執行緒池配置方案
/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue. At any point, at most
* {@code nThreads} threads will be active processing tasks.
* If additional tasks are submitted when all threads are active,
* they will wait in the queue until a thread is available.
* If any thread terminates due to a failure during execution
* prior to shutdown, a new one will take its place if needed to
* execute subsequent tasks. The threads in the pool will exist
* until it is explicitly {@link ExecutorService#shutdown shutdown}.
*
* @param nThreads the number of threads in the pool
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
構造一個固定執行緒數目的執行緒池,配置的corePoolSize與maximumPoolSize大小相同,同時使用了一個無界LinkedBlockingQueue存放阻塞任務,因此多餘的任務將存在再阻塞佇列,不會由RejectedExecutionHandler處理
/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available. These pools will typically improve the performance
* of programs that execute many short-lived asynchronous tasks.
* Calls to {@code execute} will reuse previously constructed
* threads if available. If no existing thread is available, a new
* thread will be created and added to the pool. Threads that have
* not been used for sixty seconds are terminated and removed from
* the cache. Thus, a pool that remains idle for long enough will
* not consume any resources. Note that pools with similar
* properties but different details (for example, timeout parameters)
* may be created using {@link ThreadPoolExecutor} constructors.
*
* @return the newly created thread pool
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
構造一個緩衝功能的執行緒池,配置corePoolSize=0,maximumPoolSize=Integer.MAX_VALUE,keepAliveTime=60s,以及一個無容量的阻塞佇列 SynchronousQueue,因此任務提交之後,將會建立新的執行緒執行;執行緒空閒超過60s將會銷燬
/**
* Creates an Executor that uses a single worker thread operating
* off an unbounded queue. (Note however that if this single
* thread terminates due to a failure during execution prior to
* shutdown, a new one will take its place if needed to execute
* subsequent tasks.) Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
* {@code newFixedThreadPool(1)} the returned executor is
* guaranteed not to be reconfigurable to use additional threads.
*
* @return the newly created single-threaded Executor
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
構造一個只支援一個執行緒的執行緒池,配置corePoolSize=maximumPoolSize=1,無界阻塞佇列LinkedBlockingQueue;保證任務由一個執行緒序列執行
/**
* Creates a thread pool that can schedule commands to run after a
* given delay, or to execute periodically.
* @param corePoolSize the number of threads to keep in the pool,
* even if they are idle
* @return a newly created scheduled thread pool
* @throws IllegalArgumentException if {@code corePoolSize < 0}
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
構造有定時功能的執行緒池,配置corePoolSize,無界延遲阻塞佇列DelayedWorkQueue;有意思的是:maximumPoolSize=Integer.MAX_VALUE,由於DelayedWorkQueue是無界佇列,所以這個值是沒有意義的