Java併發程式設計的藝術筆記(八)——執行緒池
一.執行緒池的主要處理流程
ThreadPoolExecutor執行execute方法分下面4種情況。
1)如果當前執行的執行緒少於corePoolSize,則建立新執行緒來執行任務(注意,執行這一步需要獲取全域性鎖)。
2)如果執行的執行緒等於或多於corePoolSize,則將任務加入BlockingQueue。
3)如果無法將任務加入BlockingQueue(佇列已滿),則建立新的執行緒來處理任務(注意,執行這一步驟需要獲取全域性鎖)。
4)如果建立新執行緒將使當前執行的執行緒超出maximumPoolSize,任務將被拒絕,並呼叫RejectedExecutionHandler.rejectedExecution()方法。T
二.原始碼分析
1.執行緒池任務執行的方法
public void execute(Runnable command) { if (command == null) throw new NullPointerException();// 如果執行緒數小於核心執行緒數,則建立執行緒並執行當前任務 if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { // 如執行緒數大於等於核心執行緒數或執行緒建立失敗,則將當前任務放到工作佇列中。 if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); }// 如果執行緒池不處於執行中或任務無法放入佇列,並且當前執行緒數量小於最大允許的執行緒數量, // 則建立一個執行緒執行任務。 else if (!addIfUnderMaximumPoolSize(command)) // 丟擲RejectedExecutionException異常 reject(command); // is shutdown or saturated } }
2.工作執行緒:執行緒池建立執行緒時,會將執行緒封裝成工作執行緒Worker,Worker在執行完任務
後,還會迴圈獲取工作佇列裡的任務來執行。我們可以從Worker類的run()方法裡看到這點。
public void run() { try { Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this); } }
三.執行緒池的使用
3.1 執行緒池的建立
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, milliseconds,runnableTaskQueue, handler);
corePoolSize 核心執行緒池
runnableTaskQueue 任務佇列,可選ArrayBlockingQueue、LinkedBlockingQueue(吞吐量高)
maximumPoolSize(執行緒池最大數量)
RejectedExecutionHandler(飽和策略):當佇列和執行緒池都滿了,說明執行緒池處於飽和狀態,那麼必須採取一種策略處理提交的新任務。這個策略預設情況下是AbortPolicy,表示無法處理新任務時丟擲異常
keepAliveTime(執行緒活動保持時間):執行緒池的工作執行緒空閒後,保持存活的時間。
TimeUnit(執行緒活動保持時間的單位)
3.2 拒絕策略
.AbortPolicy:直接丟擲異常。
·CallerRunsPolicy:只用呼叫者所線上程來執行任務。
·DiscardOldestPolicy:丟棄佇列裡最近的一個任務,並執行當前任務。
·DiscardPolicy:不處理,丟棄掉
3.3 向執行緒池提交任務
execute()和submit()
//execute 無返回 threadsPool.execute(new Runnable() { @Override public void run() { // TODO Auto-generated method stub } }); //submit有返回 Future<Object> future = executor.submit(harReturnValuetask); try { Object s = future.get(); } catch (InterruptedException e) { // 處理中斷異常 } catch (ExecutionException e) { // 處理無法執行任務異常 } finally { // 關閉執行緒池 executor.shutdown(); }
3.4 關閉執行緒池
shutdown或shutdownNow方法來關閉執行緒池。
原理:遍歷線程池中的工作執行緒,然後逐個呼叫執行緒的interrupt方法來中斷執行緒,所以無法響應中斷的任務可能永遠無法終止。
區別:shutdownNow首先將執行緒池的狀態設定成STOP,然後嘗試停止所有的正在執行或暫停任務的執行緒,並返回等待執行任務的列表,而shutdown只是將執行緒池的狀態設定成SHUTDOWN狀態,然後中斷所有沒有正在執行任務的執行緒。只要呼叫了這兩個關閉方法中的任意一個,isShutdown方法就會返回true。當所有的任務都已關閉後,才表示執行緒池關閉成功,這時呼叫isTerminaed方法會返回true。至於應該呼叫哪一種方法來關閉執行緒池,應該由提交到執行緒池的任務特性決定,通常呼叫shutdown方法來關閉執行緒池,如果任務不一定要執行完,則可以呼叫shutdownNow方法。
四.合理配置執行緒池
分析任務特性:
·任務的性質:CPU密集型任務、IO密集型任務和混合型任務。
·任務的優先順序:高、中和低。
·任務的執行時間:長、中和短。
·任務的依賴性:是否依賴其他系統資源,如資料庫連線。
CPU密集型任務應配置儘可能小的執行緒,如配置N cpu +1個執行緒的執行緒池。
由於IO密集型任務執行緒並不是一直在執行任務,則應配置儘可能多的執行緒,如2*N cpu 。
依賴資料庫連線池的任務,因為執行緒提交SQL後需要等待資料庫返回結果,等待的時間越長,則CPU空閒時間就越長,那麼執行緒數應該設定得越大,這樣才能更好地利用CPU。
建議使用有界佇列。建議值幾千。如資料庫查詢出了問題,佇列就會一直增加。
五.監控執行緒池
通過擴充套件執行緒池進行監控。可以通過繼承執行緒池來自定義執行緒池,重寫執行緒池的beforeExecute、afterExecute和terminated方法,也可以在任務執行前、執行後和執行緒池關閉前執行一些程式碼來進行監控。例如,監控任務的平均執行時間、最大執行時間和最小執行時間等。
這幾個方法線上程池裡是空方法。
protected void beforeExecute(Thread t, Runnable r) { }
六.幾種常見的執行緒池
(1)ThreadPoolExecutor
1)FixedThreadPool 固定執行緒數
FixedThreadPool適用於為了滿足資源管理的需求,而需要限制當前執行緒數量的應用場
景,它適用於負載比較重的伺服器。
2)SingleThreadExecutor 單個執行緒
適用於需要保證順序地執行各個任務;並且在任意時間點,不會有多個執行緒是活動的應用場景。
3)CachedThreadPool 根據需要建立新執行緒
是大小無界的執行緒池,適用於執行很多的短期非同步任務的小程式,或者是負載較輕的伺服器。
七.擴充套件:Executor框架
7.1 Executor框架成員
主要成員ThreadPoolExecutor、ScheduledThreadPoolExecutor、Future介面、Runnable介面、Callable介面和Executors。
(1)ThreadPoolExecutor 固定執行緒數
ThreadPoolExecutor通常使用工廠類Executors來建立。Executors可以建立3種類型的ThreadPoolExecutor:SingleThreadExecutor、FixedThreadPool和CachedThreadPool。
下面分別介紹這3種ThreadPoolExecutor。
1)FixedThreadPool。Executors提供的建立使用固定執行緒數的FixedThreadPool的API。
public static ExecutorService newFixedThreadPool(int nThreads) public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactorythreadFactory)
使用無界佇列LinkedBlockingQueue作為執行緒池的工作佇列
FixedThreadPool適用於為了滿足資源管理的需求,而需要限制當前執行緒數量的應用場景,它適用於負載比較重的伺服器。
2)SingleThreadExecutor 單個執行緒
下面是Executors提供的,建立使用單個執行緒的SingleThread�Executor的API。
public static ExecutorService newSingleThreadExecutor() public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)
使用無界佇列LinkedBlockingQueue作為執行緒池的工作佇列
SingleThreadExecutor適用於需要保證順序地執行各個任務;並且在任意時間點,不會有多個執行緒是活動的應用場景。
3)CachedThreadPool。下面是Executors提供的,建立一個會根據需要建立新執行緒的CachedThreadPool的API。
public static ExecutorService newCachedThreadPool() public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)
使用沒有容量的SynchronousQueue作為執行緒池的工作佇列
CachedThreadPool是大小無界的執行緒池,適用於執行很多的短期非同步任務的小程式,或者是負載較輕的伺服器。
(2)ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor通常使用工廠類Executors來建立。Executors可以建立2種類型的ScheduledThreadPoolExecutor,如下。
·ScheduledThreadPoolExecutor。包含若干個執行緒的ScheduledThreadPoolExecutor。
·SingleThreadScheduledExecutor。只包含一個執行緒的ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor適用於需要多個後臺執行緒執行週期任務,同時為了滿足資源管理的需求而需要限制後臺執行緒的數量的應用場景。
SingleThreadScheduledExecutor適用於需要單個後臺執行緒執行週期任務,同時需要保證順序地執行各個任務的應用場景。
(3)Future介面
Future介面和實現Future介面的FutureTask類用來表示非同步計算的結果。當我們把Runnable介面或Callable介面的實現類提交(submit)給ThreadPoolExecutor或
ScheduledThreadPoolExecutor時,ThreadPoolExecutor或ScheduledThreadPoolExecutor會向我們返回一個FutureTask物件。
(4)Runnable介面和Callable介面
Runnable介面和Callable介面的實現類,都可以被ThreadPoolExecutor或Scheduled�ThreadPoolExecutor執行。它們之間的區別是Runnable不會返回結果,而Callable可以返回結
果。
除了可以自己建立實現Callable介面的物件外,還可以使用工廠類Executors來把一個Runnable包裝成一個Callable。