1. 程式人生 > >JAVA執行緒池精華總結

JAVA執行緒池精華總結

一. 為什麼用執行緒池(執行緒池的作用

1. 建立/銷燬執行緒伴隨著系統開銷,過於頻繁的建立/銷燬執行緒,會很大程度上影響處理效率

例如:

記建立執行緒消耗時間T1,執行任務消耗時間T2,銷燬執行緒消耗時間T3

如果T1+T3>T2,那麼是不是說開啟一個執行緒來執行這個任務太不划算了!

正好,執行緒池快取執行緒,可用已有的閒置執行緒來執行新任務,避免了T1+T3帶來的系統開銷

2. 執行緒併發數量過多,搶佔系統資源從而導致阻塞

我們知道執行緒能共享系統資源,如果同時執行的執行緒過多,就有可能導致系統資源不足而產生阻塞的情況

運用執行緒池能有效的控制執行緒最大併發數,避免以上的問題

3. 對執行緒進行一些簡單的管理

比如:延時執行、定時迴圈執行的策略等

運用執行緒池都能進行很好的實現

二. 執行緒池ThreadPoolExecutor

Java裡面的執行緒池介面是ExecutorService。具體實現為ThreadPoolExecutor類。

比較重要的幾個類:

Executor 執行器介面(執行一個給定的Runnable實現)

ExecutorService

執行緒池介面(繼承Executor介面)

ThreadPoolExecutor

ExecutorService的預設實現

ScheduledExecutorService

排程執行緒池介面

ScheduledThreadPoolExecutor

ScheduledExecutorService的預設實現,和Timer/TimerTask類似,解決那些需要任務進行延遲或重複執行的問題

對執行緒池的配置,就是對ThreadPoolExecutor建構函式的引數的配置,既然這些引數這麼重要,就來看看建構函式的各個引數吧

ThreadPoolExecutor提供了四個建構函式:

//五個引數的建構函式
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue)

//六個引數的建構函式-1
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory)

//六個引數的建構函式-2
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler)

//七個引數的建構函式
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

int corePoolSize => 該執行緒池中核心執行緒數最大值

核心執行緒:

執行緒池新建執行緒的時候,如果當前執行緒總數小於corePoolSize,則新建的是核心執行緒,如果超過corePoolSize,則新建的是非核心執行緒

核心執行緒預設情況下會一直存活線上程池中,即使這個核心執行緒啥也不幹(閒置狀態)。

如果指定ThreadPoolExecutor的allowCoreThreadTimeOut這個屬性為true,那麼核心執行緒如果不幹活(閒置狀態)的話,超過一定時間(時長下面引數決定),就會被銷燬掉

很好理解吧,正常情況下你不幹活我也養你,因為我總有用到你的時候,但有時候特殊情況(比如我自己都養不起了),那你不幹活我就要把你幹掉了

int maximumPoolSize

該執行緒池中執行緒總數最大值

執行緒總數 = 核心執行緒數 + 非核心執行緒數。

long keepAliveTime

該執行緒池中非核心執行緒閒置超時時長

一個非核心執行緒,如果不幹活(閒置狀態)的時長超過這個引數所設定的時長,就會被銷燬掉

如果設定allowCoreThreadTimeOut = true,則會作用於核心執行緒

TimeUnit unit

keepAliveTime的單位,TimeUnit是一個列舉型別,其包括:

  1. NANOSECONDS : 1微毫秒 = 1微秒 / 1000
  2. MICROSECONDS : 1微秒 = 1毫秒 / 1000
  3. MILLISECONDS : 1毫秒 = 1秒 /1000
  4. SECONDS : 秒
  5. MINUTES : 分
  6. HOURS : 小時
  7. DAYS : 天

BlockingQueue<Runnable> workQueue

執行前用於保持任務的佇列。此佇列僅保持由 execute方法提交的 Runnable 任務。

一. 排隊的三種通用策略

直接提交:工作佇列的預設選項是 SynchronousQueue,它將任務直接提交給執行緒而不保持它們。在此,如果不存在可用於立即執行任務的執行緒,則試圖把任務加入佇列將失敗,因此會構造一個新的執行緒。此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務

無界佇列:使用無界佇列(例如,不具有初始容量的 LinkedBlockingQueue)將導致在所有 corePoolSize 執行緒都忙時新任務在佇列中等待。這樣,建立的執行緒就不會超過 corePoolSize。(因此,maximumPoolSize的值也就無效了。)當每個任務完全獨立於其他任務,即任務執行互不影響時,適合於使用無界佇列;例如,在 Web頁伺服器中。這種排隊可用於處理瞬態突發請求。

有界佇列:當使用有限的 maximumPoolSizes時,有界佇列(如 ArrayBlockingQueue)有助於防止資源耗盡,但是可能較難調整和控制。佇列大小和最大池大小可能需要相互折衷:使用大型佇列和小型池可以最大限度地降低 CPU 使用率、作業系統資源和上下文切換開銷,但是可能導致人工降低吞吐量。使用小型佇列和大型池,CPU使用率較高,可能遇到不可接受的排程開銷,這樣也會降低吞吐量。  

二. 常用的幾種Queue

SynchronousQueue:這個佇列接收到任務的時候,會直接提交給執行緒處理,而不保留它,如果所有執行緒都在工作怎麼辦?那就新建一個執行緒來處理這個任務!所以為了保證不出現<執行緒數達到了maximumPoolSize而不能新建執行緒>的錯誤,使用這個型別佇列的時候,maximumPoolSize一般指定成Integer.MAX_VALUE,即無限大。

LinkedBlockingQueue:這個佇列接收到任務的時候,如果當前執行緒數小於核心執行緒數,則新建執行緒(核心執行緒)處理任務;如果當前執行緒數等於核心執行緒數,則進入佇列等待。由於這個佇列沒有最大值限制,即所有超過核心執行緒數的任務都將被新增到佇列中,這也就導致了maximumPoolSize的設定失效,因為匯流排程數永遠不會超過corePoolSize。

ArrayBlockingQueue:可以限定佇列的長度,接收到任務的時候,如果沒有達到corePoolSize的值,則新建執行緒(核心執行緒)執行任務,如果達到了,則入隊等候,如果佇列已滿,則新建執行緒(非核心執行緒)執行任務,又如果匯流排程數到了maximumPoolSize,則採用拒絕策略處理。

DelayQueue:佇列內元素必須實現Delayed介面,這就意味著你傳進去的任務必須先實現Delayed介面。這個佇列接收到任務時,首先先入隊,只有達到了指定的延時時間,才會執行任務。

ThreadFactory threadFactory

執行程式建立新執行緒時使用的工廠類,實現類需要實現Thread newThread(Runnable r)方法

RejectedExecutionHandler

拒絕策略介面。在ThreadPoolExecutor中已經預設包含了4中策略,因為原始碼非常簡單,這裡直接貼出來。

CallerRunsPolicy:呼叫 execute 方法的執行緒自身執行。此策略提供簡單的反饋控制機制,能夠減緩新任務的提交速度。

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        r.run();
    }
}

這個策略顯然不想放棄執行任務。但是由於池中已經沒有任何資源了,那麼就直接使用呼叫該execute的執行緒本身來執行。

AbortPolicy:為JAVA執行緒池預設的阻塞策略,不執行此任務,直接丟擲一個執行時異常rejectedExecution,切記

ThreadPoolExecutor.execute需要try...catch,否則程式會直接退出。

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException();
}

這種策略直接丟擲異常,丟棄任務。

DiscardPolicy:這種策略和AbortPolicy幾乎一樣,也是丟棄任務,只不過他不丟擲異常。空方法!

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

}

DiscardOldestPolicy:如果執行緒池還未關閉,丟棄掉佇列頭部的(最老的)任務,然後重試執行程式(如果再次失敗,則重複此過程)。

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        e.getQueue().poll();
    }
}

該策略就稍微複雜一些,在pool沒有關閉的前提下首先丟掉快取在佇列中的最早的任務,然後重新嘗試執行該任務。這個策略需要適當小心。

設想:如果其他執行緒都還在執行,那麼新來任務踢掉舊任務,快取在queue中,再來一個任務又會踢掉queue中最老任務。

使用者自定義拒絕策略:實現RejectedExecutionHandler,並自己定義策略模式

新建一個執行緒池的時候,一般只用5個引數的建構函式。

三. 向ThreadPoolExecutor新增任務

有幾種不同的方式來將任務委託給 ExecutorService 去執行:

  • execute(Runnable)
  • submit(Runnable)
  • submit(Callable)
  • invokeAny(...)
  • invokeAll(...)

四. ThreadPoolExecutor的執行策略

上面介紹引數的時候其實已經說到了ThreadPoolExecutor執行的策略,這裡給總結一下,當一個任務被新增進執行緒池時:

  1. 執行緒數量未達到corePoolSize,則新建一個執行緒(核心執行緒)執行任務。
  2. 執行緒數量達到了corePools,則將任務移入佇列等待。
  3. 佇列已滿,新建執行緒(非核心執行緒)執行任務。
  4. 佇列已滿,匯流排程數又達到了maximumPoolSize,執行拒絕策略。

五. 常見四種執行緒池

如果你不想自己寫一個執行緒池,那麼你可以從下面看看有沒有符合你要求的(一般都夠用了),如果有,那麼很好你直接用就行了,如果沒有,那你就老老實實自己去寫一個吧

Java通過Executors提供了四種執行緒池,這四種執行緒池都是直接或間接配置ThreadPoolExecutor的引數實現的,下面我都會貼出這四種執行緒池建構函式的原始碼,各位大佬們一看便知!

來,走起:

1. CachedThreadPool()

可快取執行緒池,如果執行緒池的大小超過了處理任務所需要的執行緒,那麼就會回收部分空閒(60秒不執行任務)的執行緒,當任務數增加時,此執行緒池又可以智慧的新增新執行緒來處理任務。

  1. 執行緒池大小無限制
  2. 有空閒執行緒則複用空閒執行緒,若無空閒執行緒則新建執行緒
  3. 一定程式減少頻繁建立/銷燬執行緒,減少系統開銷

建立方法:

ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

原始碼:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

2. FixedThreadPool()

定長執行緒池:

  1. 可控制執行緒最大併發數(同時執行的執行緒數)。
  2. 超出的執行緒會在佇列中等待。
  3. 如果某個執行緒因為執行異常而結束,那麼執行緒池會補充一個新執行緒。

建立方法:

//nThreads => 最大執行緒數即maximumPoolSize
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(int nThreads);

原始碼:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

2個引數的構造方法原始碼,不用我貼你也知道他把星期六放在了哪個位置!所以我就不貼了,省下篇幅給我扯皮

3. ScheduledThreadPool()

定長執行緒池:

  1. 支援定時及週期性任務執行。
  2. 執行緒池大小無限制

建立方法:

//nThreads => 最大執行緒數即maximumPoolSize
ExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(int corePoolSize);

原始碼:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

//ScheduledThreadPoolExecutor():
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue());
}

4. SingleThreadExecutor()

單執行緒執行緒池

  1. 有且僅有一個工作執行緒執行任務。
  2. 所有任務按照任務的提交順序執行,即遵循佇列的入隊出隊規則。
  3. 如果這個唯一的執行緒因為異常結束,那麼會有一個新的執行緒來替代它。

建立方法:

ExecutorService singleThreadPool = Executors.newSingleThreadPool();

原始碼:

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

參考: