Java多線程:CurrentHashMap
阿新 • • 發佈:2017-09-29
出現 .get 核心 規模 邊界 應用 sql ash his
2、和newFixedThreadPool創建的線程池不同,newCachedThreadPool在沒有任務執行時,當線程的空閑時間超過keepAliveTime,會自動釋放線程資源,當提交新任務時,如果沒有空閑線程,則創建新線程執行任務,會導致一定的系統開銷;
所以,使用該線程池時,一定要註意控制並發的任務數,否則創建大量的線程可能導致嚴重的性能問題。
一、 背景
線程是稀缺資源,如果無限制的創建,不僅會消耗系統資源,還會降低系統的穩定性,合理的使用線程池可以對線程進行統一的分配、調優和監控,並有以下好處: 第一:降低資源消耗。通過重復利用已創建的線程降低線程創建和銷毀造成的消耗。 第二:提高響應速度。當任務到達時,任務可以不需要等到線程創建就能立即執行。 第三:提高線程的可管理性。二、線程池的架構
三、Executors
用於創建線程池newFixedThreadPool(固定大小線程池)
初始化一個指定線程數的線程池,其中corePoolSize == maximumPoolSize,使用LinkedBlockingQuene作為阻塞隊列,不過當線程池沒有可執行任務時,也不會釋放線程。public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
newCachedThreadPool(無界線程池,可以進行自動線程回收)
1、初始化一個可以緩存線程的線程池,默認緩存60s,使用SynchronousQueue作為阻塞隊列;public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L , TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
newSingleThreadExecutor(單個後臺線程)
初始化的線程池中只有一個線程,如果該線程異常結束,會重新創建一個新的線程繼續執行任務,唯一的線程可以保證所提交任務的順序執行,內部使用LinkedBlockingQueue作為阻塞隊列。public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}
newScheduledThreadPool
創建一個固定長度的線程池,而且以延遲或定時的方式來執行任務。 通過如上配置的線程池的創建方法源代碼,我們可以發現: 1> 除了CachedThreadPool使用的是直接提交策略的緩沖隊列以外,其余兩個采用的都是無界緩沖隊列 2> 三個線程池采用的ThreadPoolExecutor構造方法都是同一個,使用的都是默認的ThreadFactory和handler:private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
提交任務用submit(),關閉線程池用shutdown()。
四、ExecutorService任務周期管理接口
Executor的實現通常都會創建線程來執行任務,但是使用異步方式來執行任務時,由於之前提交任務的狀態不是立即可見的,所以如果要關閉應用程序時,就需要將受影響的任務狀態反饋給應用程序。 為了解決執行服務的生命周期問題,Executor擴展了EecutorService接口,添加了一些用於生命周期管理的方法。如下:public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
// 省略部分方法
}
五、ThreadPoolExecutor
線程池的主要工作流程如下圖: 類中定義的重要變量,如下: 1. private final BlockingQueue<Runnable> workQueue; // 阻塞隊列
2. private final ReentrantLock mainLock = new ReentrantLock(); // 互斥鎖
3. private final HashSet<Worker> workers = new HashSet<Worker>();// 線程集合.一個Worker對應一個線程
4. private final Condition termination = mainLock.newCondition();// 終止條件
5. private int largestPoolSize; // 線程池中線程數量曾經達到過的最大值。
6. private long completedTaskCount; // 已完成任務數量
7. private volatile ThreadFactory threadFactory; // ThreadFactory對象,用於創建線程。
8. private volatile RejectedExecutionHandler handler;// 拒絕策略的處理句柄
9. private volatile long keepAliveTime; // 線程池維護線程所允許的空閑時間
10. private volatile boolean allowCoreThreadTimeOut;
11. private volatile int corePoolSize; // 線程池維護線程的最小數量,哪怕是空閑的
12. private volatile int maximumPoolSize; // 線程池維護的最大線程數量
其中有幾個重要的規則需要說明一下:
1> corePoolSize與maximumPoolSize
線程池將根據 corePoolSize和 maximumPoolSize設置的邊界自動調整池大小,當新任務在方法 execute() 中提交時:- 如果當前線程池中的線程數目小於corePoolSize,則每來一個任務,就會創建一個線程去執行這個任務;
- 如果當前線程池中的線程數目>=corePoolSize,則每來一個任務,會嘗試將其添加到任務緩存隊列當中,若添加成功,則該任務會等待空閑線程將其取出去執行;當隊列滿時才創建新線程去處理請求;
- 如果當前線程池中的線程數目達到maximumPoolSize,即隊列已經滿了,則通過handler所指定的任務拒絕策略來處理新請求;
- 如果線程池中的線程數量大於corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止,直至線程池中的線程數目不大於corePoolSize;
- 1. 核心線程corePoolSize > 任務隊列workQueue > 最大線程maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務。
- 2. 當池中的線程數大於corePoolSize的時候,多余的線程會等待keepAliveTime長的時間,如果無請求可處理就自行銷毀。
2> workQueue
線程池所使用的緩沖隊列,該緩沖隊列的長度決定了能夠緩沖的最大數量,緩沖隊列有三種通用策略: 1) 直接提交。SynchronousQueue,它將任務直接提交給線程執行而不保存它們。在此,如果不存在可用於立即運行任務的線程,則試圖把任務加入隊列將失敗,因此會構造一個新的線程。此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務。 2) 無界隊列。使用無界隊列將導致在所有 corePoolSize 線程都忙時新任務在隊列中等待。這樣,創建的線程就不會超過 corePoolSize。(因此,maximumPoolSize 的值也就無效了。)當每個任務完全獨立於其他任務,即任務執行互不影響時,適合於使用無界隊列; 3) 有界隊列。當使用有限的 maximumPoolSizes 時,有界隊列(如 ArrayBlockingQueue)有助於防止資源耗盡,但是可能較難調整和控制。隊列大小和最大池大小可能需要相互折衷:使用大型隊列和小型池可以最大限度地降低 CPU 使用率、操作系統資源和上下文切換開銷,但是可能導致人工降低吞吐量。如果任務頻繁阻塞(例如,如果它們是 I/O 邊界),則系統可能為超過您許可的更多線程安排時間。使用小型隊列通常要求較大的池大小,CPU 使用率較高,但是可能遇到不可接受的調度開銷,這樣也會降低吞吐量.3>ThreadFactory
使用 ThreadFactory 創建新線程。通過提供不同的 ThreadFactory,可以改變線程的名稱、線程組、優先級、守護進程狀態等等。如果從 newThread 返回 null 時 ThreadFactory 未能創建線程,則執行程序將繼續運行,但不能執行任何任務。- public interface ThreadFactory {
- Thread newThread(Runnable r);
- }
4>RejectedExecutionHandler
當Executor已經關閉(即執行了executorService.shutdown()方法後),並且Executor將有限邊界用於最大線程和工作隊列容量,且已經飽和時,在方法execute()中提交的新任務將被拒絕. 在以上述情況下,execute 方法將調用RejectedExecutionHandler.rejectedExecution() 方法。 下面提供了四種預定義的處理程序策略: 1) AbortPolicy 直接拋出異常 RejectedExecutionException; 2) CallerRunsPolicy 用調用者所在的線程來執行任務 3) DiscardPolicy 不能執行的任務將被刪除; 4) DiscardOldestPolicy 如果執行程序尚未關閉,則位於工作隊列頭部的任務將被刪除,然後重試執行程序(如果再次失敗,則重復此過程)。5>keepAliveTime
線程空閑時的存活時間,即當線程沒有任務執行時,繼續存活的時間;默認情況下,該參數只在線程數大於corePoolSize時才有用;六、線程池的關閉
通過調用線程池的shutdown或shutdownNow方法來關閉線程池,但是它們的實現原理不同,shutdown只是將線程池的狀態設置成SHUTDOWN狀態,然後中斷所有沒有正在執行任務的線程。shutdownNow是遍歷線程池中的工作線程,然後逐個調用線程的interrupt方法來中斷線程,所以無法響應中斷的任務可能永遠無法終止。shutdownNow會首先將線程池的狀態設置成STOP,然後嘗試停止所有的正在執行或暫停任務的線程,並返回等待執行任務的列表。 只要調用了這兩個關閉方法的其中一個,isShutdown方法就會返回true。當所有的任務都已關閉後,才表示線程池關閉成功,這時調用isTerminaed方法會返回true。至於我們應該調用哪一種方法來關閉線程池,應該由提交到線程池的任務特性決定,通常調用shutdown來關閉線程池,如果任務不一定需要執行完,則可以調用shutdownNow。七、線程池的配置
可以從以下幾個角度來進行分析: 1. 任務的性質:CPU密集型任務,IO密集型任務和混合型任務。 2. 任務的優先級:高,中和低。 3. 任務的執行時間:長,中和短。 4. 任務的依賴性:是否依賴其他系統資源,如數據庫連接。 CPU密集型任務配置盡可能少的線程數量,如配置Ncpu+1個線程的線程池。 IO密集型任務則由於需要等待IO操作,線程並不是一直在執行任務,則配置盡可能多的線程,如2*Ncpu。 混合型的任務,如果可以拆分,則將其拆分成一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那麽分解後執行的吞吐率要高於串行執行的吞吐率,如果這兩個任務執行時間相差太大,則沒必要進行分解。 我們可以通過Runtime.getRuntime().availableProcessors()方法獲得當前設備的CPU個數。 優先級不同的任務可以使用優先級隊列PriorityBlockingQueue來處理。它可以讓優先級高的任務先得到執行,需要註意的是如果一直有優先級高的任務提交到隊列裏,那麽優先級低的任務可能永遠不能執行。 執行時間不同的任務可以交給不同規模的線程池來處理,或者也可以使用優先級隊列,讓執行時間短的任務先執行。 依賴數據庫連接池的任務,因為線程提交SQL後需要等待數據庫返回結果,如果等待的時間越長CPU空閑時間就越長,那麽線程數應該設置越大,這樣才能更好的利用CPU。 建議使用有界隊列,有界隊列能增加系統的穩定性和預警能力,可以根據需要設大一點Java多線程:CurrentHashMap