執行緒池ThreadPoolExecutor配置、引數詳解及例子
對於執行緒池,我僅限於簡單的使用,對其原理和引數並不是很理解。以前只是在網上找個執行緒池的例子,然後模仿寫一下而已,最近專案中又再次用的執行緒池,做了一些研究,現記錄下,以備以後使用。
我以前利用執行緒池只會這樣用:
ExecutorService pool = Executors.newFixedThreadPool(5); pool.execute(new Runnable() { @Override public void run() { //具體執行緒要乾的活 } });
這個只是建立一個固定執行緒數為5的執行緒池(當然還有其他3種)。
這種寫法有一定的弊端,當系統併發的增長量遠遠高於執行緒池的消費量(比如系統是每秒中增加20個併發,而執行緒池是每秒鐘執行5個執行緒)。這就有可能造成伺服器記憶體溢位的風險。原因是newFixedThreadPool採用了無邊界佇列。可以從原始碼中一窺究竟。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
這裡LinkedBlockingQueue是無邊界佇列。ThreadPoolExecutor構造中的引數文章下面會逐一講解。總之上面的寫法不靈活,對於一些併發高的系統是不滿足的。
下面我們開始步入正題,開始研究下ThreadPoolExecutor執行緒池。
第一步,研究ThreadPoolExecutor執行緒池有幾種構造方法
第一個構造: public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
第二個構造:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); }
第三個構造:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); }
第四個構造:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
總結:從上面四個構造可以看出,其實前三個構造都是呼叫的第四個構造方法。所以我們研究第四個構造即可。
第二步,研究第四個構造的每個引數
corePoolSize: 執行緒池核心執行緒數量 int型別
maximumPoolSize:執行緒池允許執行緒的最大數量 int型別
keepAliveTime: 執行緒池中執行緒所允許的空閒時間。long型別
JDK解釋:當執行緒數大於核心時,此為終止前多餘的空閒執行緒等待新任務的最長時間。
也就是說啊,執行緒池中當前的空閒執行緒服務完某任務後的存活時間。如果時間足夠長,那麼可能會服務其它任務。
這些空閒執行緒指的是新建執行緒還是核心執行緒,還是兩者都包含。這個不是很清楚。(哪位大神知道可以告訴我下)
unit: 執行緒池維護執行緒所允許的空閒時間的單位
MICROSECONDS 微秒 一百萬分之一秒(就是毫秒/1000)
MILLISECONDS 毫秒 千分之一秒
NANOSECONDS 毫微秒 十億分之一秒(就是微秒/1000)
SECONDS 秒
MINUTES 分鐘
HOURS 小時
DAYS 天
workQueue: 執行緒池所使用的緩衝佇列(關於佇列的詳解在後面第四步)
直接提交 SynchronousQueue
無界佇列 如LinkedBlockingQueue
有界佇列 如ArrayBlockingQueue
handler: 執行緒池對拒絕任務的處理策略
ThreadPoolExecutor.AbortPolicy() 丟擲java.util.concurrent.RejectedExecutionException異常。
ThreadPoolExecutor.CallerRunsPolicy() 重試添加當前的任務,他會自動重複呼叫execute()方法。
ThreadPoolExecutor.DiscardOldestPolicy() 拋棄舊的任務
ThreadPoolExecutor.DiscardPolicy() 拋棄當前的任務。
threadFactory:執行緒工廠,主要用來建立執行緒:預設值 DefaultThreadFactory
第三步,有一個例子,對執行緒池描述的非常貼切。
有一個工廠,核定工人數量為10,每個工人同時只做一件事,
因此只要當10個工人中有工人是空閒的,來了任務就分配給空閒的工人做;
當10個工人都有任務在做時,如果還來了任務,就把任務進行排隊等待
如果佇列workQueue的容量為1,且現在10個工人都在幹活,
又來一個任務A,則把A放到到佇列中等待有空閒工人來從佇列中拿任務。
但是有這麼一種情況,來的不是一個任務,是5個任務,且10個工人都沒有空閒,且佇列也放滿了(1個任務就滿了哦)
這時候,老闆一看來的活多,幹不過來了,怎麼辦,老闆就想招了,再招5個臨時工吧,這時候工廠就是15個工人。
總結:corePoolSize=10;maximumPoolSize=15。當任務數大於corePoolSize時,是先向佇列裡放,當佇列裡也滿了,還有任務不斷進來,才新建執行緒哦。且最大不會超過maximumPoolSize。
問題來了,如果佇列滿了,且執行緒數到最大執行緒數了,還進來任務怎麼辦呢。這就用到了handler異常策略(或者叫飽和策略)。
第四步,緩衝佇列詳解。
4.1、直接提交 SynchronousQueue
該佇列是將任務直接提交給執行緒而不儲存它們。在此,如果不存在空閒的執行緒,則試圖把任務加入佇列將失敗,因此會構造一個新的執行緒。此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務。當命令以超過佇列所能處理的平均數連續到達時,此策略允許無界執行緒具有增長的可能性。 SynchronousQueue執行緒安全的Queue,可以存放若干任務(但當前只允許有且只有一個任務在等待),其中每個插入操作必須等待另一個執行緒的對應移除操作,也就是說A任務進入佇列,B任務必須等A任務被移除之後才能進入佇列,否則執行異常策略。你來一個我扔一個,所以說SynchronousQueue沒有任何內部容量。
比如:核心執行緒數為2,最大執行緒數為3;使用SynchronousQueue。
當前有2個核心執行緒在執行,又來了個A任務,兩個核心執行緒沒有執行完當前任務,根據如果執行的執行緒等於或多於 corePoolSize,
則 Executor 始終首選將請求加入佇列,而不新增新的執行緒。所以A任務被新增到佇列,此時的佇列是SynchronousQueue,
當前不存在可用於立即執行任務的執行緒,因此會構造一個新的執行緒,此時又來了個B任務,兩個核心執行緒還沒有執行完。
新建立的執行緒正在執行A任務,所以B任務進入Queue後,最大執行緒數為3,發現沒地方仍了。就只能執行異常策略(RejectedExecutionException)。
4.2、無界佇列 LinkedBlockingQueue
使用無界佇列(例如,不具有預定義容量的 LinkedBlockingQueue)將導致在所有核心執行緒都在忙時新任務在佇列中等待。這樣,建立的執行緒就不會超過 corePoolSize。(因此,maximumPoolSize 的值也就沒意義了。)也就不會有新執行緒被建立,都在那等著排隊呢。如果未指定容量,則它等於 Integer.MAX_VALUE。如果設定了Queue預定義容量,則當核心執行緒忙碌時,新任務會在佇列中等待,直到超過預定義容量(新任務沒地方放了),才會執行異常策略。你來一個我接一個,直到我容不下你了。FIFO,先進先出。
比如:核心執行緒數為2,最大執行緒數為3;使用LinkedBlockingQueue(1),設定容量為1。
當前有2個核心執行緒在執行,又來了個A任務,兩個核心執行緒沒有執行完當前任務,根據如果執行的執行緒等於或多於 corePoolSize,
則 Executor 始終首選將請求加入佇列,而不新增新的執行緒。所以A任務被新增到佇列,此時的佇列是LinkedBlockingQueue,
此時又來了個B任務,兩個核心執行緒沒有執行完當前任務,A任務在佇列中等待,佇列已滿。所以根據如果無法將請求加入佇列,則建立新的執行緒,
B任務被新建立的執行緒所執行,此時又來個C任務,此時maximumPoolSize已滿,佇列已滿,只能執行異常策略(RejectedExecutionException)。
4.3 有界佇列 ArrayBlockingQueue
操作模式跟LinkedBlockingQueue查不多,只不過必須為其設定容量。所以叫有界佇列。new ArrayBlockingQueue<Runnable>(Integer.MAX_VALUE) 跟 new LinkedBlockingQueue(Integer.MAX_VALUE)效果一樣。LinkedBlockingQueue 底層是連結串列結構,ArrayBlockingQueue 底層是陣列結構。你來一個我接一個,直到我容不下你了。FIFO,先進先出。
總結下:
使用無界佇列,要防止任務增長的速度遠遠超過處理任務的速度,控制不好可能導致的結果就是記憶體溢位。
使用有界佇列,關鍵在於調節執行緒數和Queue大小 ,執行緒數多,佇列容量少,資源浪費。執行緒數少,佇列容量多,效能低,還可能導致記憶體溢位。
第五步,我的一個實際應用例子程式碼如下(我採用的是xml配置式,更加靈活)
5.1、xml中配置執行緒池ThreadPoolExecutor
<!--執行緒池配置開始 --> <bean id="linkQueue" class="java.util.concurrent.LinkedBlockingQueue"> <constructor-arg name="capacity" value="${thread.pool.linkQueue.size}"/> </bean> <bean id="pool" class="java.util.concurrent.ThreadPoolExecutor"> <constructor-arg name="corePoolSize" value="${thread.pool.core.size}"/> <constructor-arg name="maximumPoolSize" value="${thread.pool.max.size}"/> <constructor-arg name="keepAliveTime" value="${thread.pool.keep.alive.time}"/> <constructor-arg name="unit" value="${thread.pool.time.unit}"/> <constructor-arg name="workQueue" ref="linkQueue"/> </bean> <!--執行緒池配置結束 -->
5.2、properties檔案中配置如下:
#執行緒池相關配置 #LinkedBlockingQueue佇列容量 thread.pool.linkQueue.size=10 #執行緒池核心執行緒數 thread.pool.core.size=5 #執行緒池最大執行緒數 thread.pool.max.size=10 #空閒執行緒最大存活時間 thread.pool.keep.alive.time=2 /#時間單位 SECONDS(秒) MILLISECONDS(毫秒) MICROSECONDS(微秒) thread.pool.time.unit=SECONDS
5.3、後臺java程式碼
public class SaveEventAdviceHolder { @Autowired private ThreadPoolExecutor pool; public void saveSerializeObjEvent(Object[] objects,SaveEventBO bo){ //設定執行緒池的異常策略為CallerRunsPolicy() RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy(); pool.setRejectedExecutionHandler(handler); pool.execute(new Runnable() { public void run() { //執行緒具體執行任務..... }); } }