1. 程式人生 > >Java多線程:CurrentHashMap

Java多線程:CurrentHashMap

出現 .get 核心 規模 邊界 應用 sql ash his

一、 背景

線程是稀缺資源,如果無限制的創建,不僅會消耗系統資源,還會降低系統的穩定性,合理的使用線程池可以對線程進行統一的分配、調優和監控,並有以下好處: 第一:降低資源消耗。通過重復利用已創建的線程降低線程創建和銷毀造成的消耗。 第二:提高響應速度。當任務到達時,任務可以不需要等到線程創建就能立即執行。 第三:提高線程的可管理性。

二、線程池的架構

技術分享

三、Executors

用於創建線程池

newFixedThreadPool(固定大小線程池)

初始化一個指定線程數的線程池,其中corePoolSize == maximumPoolSize,使用LinkedBlockingQuene作為阻塞隊列,不過當線程池沒有可執行任務時,也不會釋放線程。
public static ExecutorService newFixedThreadPool(int nThreads) {  
      return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());  
}

newCachedThreadPool(無界線程池,可以進行自動線程回收)

1、初始化一個可以緩存線程的線程池,默認緩存60s,使用SynchronousQueue作為阻塞隊列;
2、和newFixedThreadPool創建的線程池不同,newCachedThreadPool在沒有任務執行時,當線程的空閑時間超過keepAliveTime,會自動釋放線程資源,當提交新任務時,如果沒有空閑線程,則創建新線程執行任務,會導致一定的系統開銷; 所以,使用該線程池時,一定要註意控制並發的任務數,否則創建大量的線程可能導致嚴重的性能問題。
public static ExecutorService newCachedThreadPool() {  
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L
, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }

newSingleThreadExecutor(單個後臺線程)

初始化的線程池中只有一個線程,如果該線程異常結束,會重新創建一個新的線程繼續執行任務,唯一的線程可以保證所提交任務的順序執行,內部使用LinkedBlockingQueue作為阻塞隊列。
public static ExecutorService newSingleThreadExecutor() {  
        return new FinalizableDelegatedExecutorService  
            (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));  
    } 

newScheduledThreadPool

  創建一個固定長度的線程池,而且以延遲或定時的方式來執行任務。 通過如上配置的線程池的創建方法源代碼,我們可以發現:   1> 除了CachedThreadPool使用的是直接提交策略的緩沖隊列以外,其余兩個采用的都是無界緩沖隊列   2> 三個線程池采用的ThreadPoolExecutor構造方法都是同一個,使用的都是默認的ThreadFactory和handler:
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();  
   
 public ThreadPoolExecutor(int corePoolSize,  
                     int maximumPoolSize,  
                     long keepAliveTime,  
                     TimeUnit unit,  
                     BlockingQueue<Runnable> workQueue) {  
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,  
        Executors.defaultThreadFactory(), defaultHandler);  
}
提交任務用submit(),關閉線程池用shutdown()。

四、ExecutorService任務周期管理接口

Executor的實現通常都會創建線程來執行任務,但是使用異步方式來執行任務時,由於之前提交任務的狀態不是立即可見的,所以如果要關閉應用程序時,就需要將受影響的任務狀態反饋給應用程序。 為了解決執行服務的生命周期問題,Executor擴展了EecutorService接口,添加了一些用於生命周期管理的方法。如下:
public interface ExecutorService extends Executor {  
    void shutdown();  
    List<Runnable> shutdownNow();  
    boolean isShutdown();  
    boolean isTerminated();  
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;  
    // 省略部分方法  
}

五、ThreadPoolExecutor

線程池的主要工作流程如下圖: 技術分享 類中定義的重要變量,如下:
  1. private final BlockingQueue<Runnable> workQueue;              // 阻塞隊列  
  2. private final ReentrantLock mainLock = new ReentrantLock();   // 互斥鎖  
  3. private final HashSet<Worker> workers = new HashSet<Worker>();// 線程集合.一個Worker對應一個線程  
  4. private final Condition termination = mainLock.newCondition();// 終止條件  
  5. private int largestPoolSize;           // 線程池中線程數量曾經達到過的最大值。  
  6. private long completedTaskCount;       // 已完成任務數量  
  7. private volatile ThreadFactory threadFactory;     // ThreadFactory對象,用於創建線程。  
  8. private volatile RejectedExecutionHandler handler;// 拒絕策略的處理句柄  
  9. private volatile long keepAliveTime;   // 線程池維護線程所允許的空閑時間  
  10. private volatile boolean allowCoreThreadTimeOut;  
  11. private volatile int corePoolSize;     // 線程池維護線程的最小數量,哪怕是空閑的  
  12. private volatile int maximumPoolSize;  // 線程池維護的最大線程數量

其中有幾個重要的規則需要說明一下:

1> corePoolSize與maximumPoolSize

線程池將根據 corePoolSize和 maximumPoolSize設置的邊界自動調整池大小,當新任務在方法 execute() 中提交時:
  • 如果當前線程池中的線程數目小於corePoolSize,則每來一個任務,就會創建一個線程去執行這個任務;
  • 如果當前線程池中的線程數目>=corePoolSize,則每來一個任務,會嘗試將其添加到任務緩存隊列當中,若添加成功,則該任務會等待空閑線程將其取出去執行;當隊列滿時才創建新線程去處理請求;
  • 如果當前線程池中的線程數目達到maximumPoolSize,即隊列已經滿了,則通過handler所指定的任務拒絕策略來處理新請求;
  • 如果線程池中的線程數量大於corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止,直至線程池中的線程數目不大於corePoolSize;
也就是說,處理任務的優先級為:
  • 1. 核心線程corePoolSize > 任務隊列workQueue > 最大線程maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務。
  • 2. 當池中的線程數大於corePoolSize的時候,多余的線程會等待keepAliveTime長的時間,如果無請求可處理就自行銷毀。

2> workQueue

線程池所使用的緩沖隊列,該緩沖隊列的長度決定了能夠緩沖的最大數量,緩沖隊列有三種通用策略:   1) 直接提交。SynchronousQueue,它將任務直接提交給線程執行而不保存它們。在此,如果不存在可用於立即運行任務的線程,則試圖把任務加入隊列將失敗,因此會構造一個新的線程。此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務。   2) 無界隊列。使用無界隊列將導致在所有 corePoolSize 線程都忙時新任務在隊列中等待。這樣,創建的線程就不會超過 corePoolSize。(因此,maximumPoolSize 的值也就無效了。)當每個任務完全獨立於其他任務,即任務執行互不影響時,適合於使用無界隊列;   3) 有界隊列。當使用有限的 maximumPoolSizes 時,有界隊列(如 ArrayBlockingQueue)有助於防止資源耗盡,但是可能較難調整和控制。隊列大小和最大池大小可能需要相互折衷:使用大型隊列和小型池可以最大限度地降低 CPU 使用率、操作系統資源和上下文切換開銷,但是可能導致人工降低吞吐量。如果任務頻繁阻塞(例如,如果它們是 I/O 邊界),則系統可能為超過您許可的更多線程安排時間。使用小型隊列通常要求較大的池大小,CPU 使用率較高,但是可能遇到不可接受的調度開銷,這樣也會降低吞吐量.

3>ThreadFactory

使用 ThreadFactory 創建新線程。通過提供不同的 ThreadFactory,可以改變線程的名稱、線程組、優先級、守護進程狀態等等。如果從 newThread 返回 null 時 ThreadFactory 未能創建線程,則執行程序將繼續運行,但不能執行任何任務。
  1. public interface ThreadFactory {
  2. Thread newThread(Runnable r);
  3. }
而構造方法中的threadFactory對象,是通過 Executors.defaultThreadFactory()返回的。

4>RejectedExecutionHandler

當Executor已經關閉(即執行了executorService.shutdown()方法後),並且Executor將有限邊界用於最大線程和工作隊列容量,且已經飽和時,在方法execute()中提交的新任務將被拒絕.   在以上述情況下,execute 方法將調用RejectedExecutionHandler.rejectedExecution() 方法。 下面提供了四種預定義的處理程序策略: 1) AbortPolicy 直接拋出異常 RejectedExecutionException;     2) CallerRunsPolicy 用調用者所在的線程來執行任務     3) DiscardPolicy 不能執行的任務將被刪除;     4) DiscardOldestPolicy 如果執行程序尚未關閉,則位於工作隊列頭部的任務將被刪除,然後重試執行程序(如果再次失敗,則重復此過程)。

5>keepAliveTime

線程空閑時的存活時間,即當線程沒有任務執行時,繼續存活的時間;默認情況下,該參數只在線程數大於corePoolSize時才有用;

六、線程池的關閉

通過調用線程池的shutdown或shutdownNow方法來關閉線程池,但是它們的實現原理不同,shutdown只是將線程池的狀態設置成SHUTDOWN狀態,然後中斷所有沒有正在執行任務的線程。shutdownNow是遍歷線程池中的工作線程,然後逐個調用線程的interrupt方法來中斷線程,所以無法響應中斷的任務可能永遠無法終止。shutdownNow會首先將線程池的狀態設置成STOP,然後嘗試停止所有的正在執行或暫停任務的線程,並返回等待執行任務的列表。 只要調用了這兩個關閉方法的其中一個,isShutdown方法就會返回true。當所有的任務都已關閉後,才表示線程池關閉成功,這時調用isTerminaed方法會返回true。至於我們應該調用哪一種方法來關閉線程池,應該由提交到線程池的任務特性決定,通常調用shutdown來關閉線程池,如果任務不一定需要執行完,則可以調用shutdownNow。

七、線程池的配置

可以從以下幾個角度來進行分析: 1. 任務的性質:CPU密集型任務,IO密集型任務和混合型任務。 2. 任務的優先級:高,中和低。 3. 任務的執行時間:長,中和短。 4. 任務的依賴性:是否依賴其他系統資源,如數據庫連接。 CPU密集型任務配置盡可能少的線程數量,如配置Ncpu+1個線程的線程池。 IO密集型任務則由於需要等待IO操作,線程並不是一直在執行任務,則配置盡可能多的線程,如2*Ncpu。 混合型的任務,如果可以拆分,則將其拆分成一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那麽分解後執行的吞吐率要高於串行執行的吞吐率,如果這兩個任務執行時間相差太大,則沒必要進行分解。 我們可以通過Runtime.getRuntime().availableProcessors()方法獲得當前設備的CPU個數。 優先級不同的任務可以使用優先級隊列PriorityBlockingQueue來處理。它可以讓優先級高的任務先得到執行,需要註意的是如果一直有優先級高的任務提交到隊列裏,那麽優先級低的任務可能永遠不能執行。 執行時間不同的任務可以交給不同規模的線程池來處理,或者也可以使用優先級隊列,讓執行時間短的任務先執行。 依賴數據庫連接池的任務,因為線程提交SQL後需要等待數據庫返回結果,如果等待的時間越長CPU空閑時間就越長,那麽線程數應該設置越大,這樣才能更好的利用CPU。 建議使用有界隊列,有界隊列能增加系統的穩定性和預警能力,可以根據需要設大一點

Java多線程:CurrentHashMap