1. 程式人生 > >一心多用多執行緒-執行緒池ThreadPoolExecutor-看這篇就夠了

一心多用多執行緒-執行緒池ThreadPoolExecutor-看這篇就夠了

首先先寫一下執行緒池的概念: 
執行緒池:執行緒池是一種多執行緒處理形式,處理過程中將任務新增到佇列,然後在建立執行緒後自動啟動這些任務。執行緒池執行緒都是後臺執行緒。每個執行緒都使用預設的堆疊大小,以預設的優先順序執行,並處於多執行緒單元中。如果某個執行緒在託管程式碼中空閒(如正在等待某個事件),則執行緒池將插入另一個輔助執行緒來使所有處理器保持繁忙。如果所有執行緒池執行緒都始終保持繁忙,但佇列中包含掛起的工作,則執行緒池將在一段時間後建立另一個輔助執行緒但執行緒的數目永遠不會超過最大值。超過最大值的執行緒可以排隊,但他們要等到其他執行緒完成後才啟動。

在java裡面,我們就是使用ThreadPoolExecutor來建立一個執行緒池,首先我們先來看看ThreadPoolExecutor的建構函式

,通過建構函式的引數來認識一下如何構造一個執行緒池

  • ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) 
    用給定的初始引數和預設的執行緒工廠及被拒絕的執行處理程式建立新的 ThreadPoolExecutor。

  • ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long 
    keepAliveTime, TimeUnit unit, BlockingQueue workQueue, 
    RejectedExecutionHandler handler) 
    用給定的初始引數和預設的執行緒工廠建立新的 ThreadPoolExecutor。

  • ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long 
    keepAliveTime, TimeUnit unit, BlockingQueue workQueue, 
    ThreadFactory threadFactory) 
    用給定的初始引數和預設被拒絕的執行處理程式建立新的 ThreadPoolExecutor。

  • ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long 
    keepAliveTime, TimeUnit unit, BlockingQueue workQueue, 
    ThreadFactory threadFactory, RejectedExecutionHandler handler) 
    用給定的初始引數建立新的 ThreadPoolExecutor。

其實前面三個建構函式在底層都是使用第四個構造方法,只是在呼叫的過程中添加了一些預設的條件而已,好,那現在我們根據第四個建構函式的引數來認識執行緒池

1. 核心和最大池大小

ThreadPoolExecutor 將根據 corePoolSize(參見 getCorePoolSize())和 maximumPoolSize(參見 getMaximumPoolSize())設定的邊界自動調整池大小。當新任務在方法 execute(java.lang.Runnable) 中提交時,如果執行的執行緒少於 corePoolSize,則建立新執行緒來處理請求,即使其他輔助執行緒是空閒的。如果執行的執行緒多於 corePoolSize 而少於 maximumPoolSize,則僅當佇列滿時才建立新執行緒。如果設定的 corePoolSize 和 maximumPoolSize 相同,則建立了固定大小的執行緒池。如果將 maximumPoolSize 設定為基本的無界值(如 Integer.MAX_VALUE),則允許池適應任意數量的併發任務。在大多數情況下,核心和最大池大小僅基於構造來設定,不過也可以使用 setCorePoolSize(int) 和 setMaximumPoolSize(int) 進行動態更改。

解決問題:執行緒池的大小定義

2.按需構造

預設情況下,即使核心執行緒最初只是在新任務到達時才建立和啟動的,也可以使用方法 prestartCoreThread() 或 prestartAllCoreThreads() 對其進行動態重寫。如果構造帶有非空佇列的池,則可能希望預先啟動執行緒。

3.建立新執行緒

使用 ThreadFactory 建立新執行緒。如果沒有另外說明,則在同一個 ThreadGroup 中一律使用 Executors.defaultThreadFactory() 建立執行緒,並且這些執行緒具有相同的 NORM_PRIORITY 優先順序和非守護程序狀態。通過提供不同的 ThreadFactory,可以改變執行緒的名稱、執行緒組、優先順序、守護程序狀態,等等。如果從 newThread 返回 null 時 ThreadFactory 未能建立執行緒,則執行程式將繼續執行,但不能執行任何任務。

解決問題:執行緒池中的執行緒從何而來

4.保持活動時間

如果池中當前有多於 corePoolSize 的執行緒,則這些多出的執行緒在空閒時間超過 keepAliveTime 時將會終止(參見 getKeepAliveTime(java.util.concurrent.TimeUnit))。這提供了當池處於非活動狀態時減少資源消耗的方法。如果池後來變得更為活動,則可以建立新的執行緒。也可以使用方法 setKeepAliveTime(long, java.util.concurrent.TimeUnit) 動態地更改此引數。使用 Long.MAX_VALUE TimeUnit.NANOSECONDS 的值在關閉前有效地從以前的終止狀態禁用空閒執行緒。預設情況下,保持活動策略只在有多於 corePoolSizeThreads 的執行緒時應用。但是隻要 keepAliveTime 值非 0,allowCoreThreadTimeOut(boolean) 方法也可將此超時策略應用於核心執行緒。

5.任務的排隊

所有 BlockingQueue 都可用於傳輸和保持提交的任務。可以使用此佇列與池大小進行互動:

  • 如果執行的執行緒少於 corePoolSize,則 Executor 始終首選新增新的執行緒,而不進行排隊。
  • 如果執行的執行緒等於或多於 corePoolSize,則 Executor 始終首選將請求加入佇列,而不新增新的執行緒。
  • 如果無法將請求加入佇列,則建立新的執行緒,除非建立此執行緒超出 maximumPoolSize,在這種情況下,任務將被拒絕(詳細可以檢視第6點)。

排隊有三種通用策略:

  • 直接提交。工作佇列的預設選項是SynchronousQueue,它將任務直接提交給執行緒而不保持它們。在此,如果不存在可用於立即執行任務的執行緒,則試圖把任務加入佇列將失敗,因此會構造一個新的執行緒。此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖。直接提交通常要求無界maximumPoolSizes 以避免拒絕新提交的任務。當命令以超過佇列所能處理的平均數連續到達時,此策略允許無界執行緒具有增長的可能性。
  • 無界佇列。使用無界佇列(例如,不具有預定義容量的 LinkedBlockingQueue)將導致在所有 corePoolSize執行緒都忙時新任務在佇列中等待。這樣,建立的執行緒就不會超過 corePoolSize。(因此,maximumPoolSize的值也就無效了。)當每個任務完全獨立於其他任務,即任務執行互不影響時,適合於使用無界佇列;例如,在 Web頁伺服器中。這種排隊可用於處理瞬態突發請求,當命令以超過佇列所能處理的平均數連續到達時,此策略允許無界執行緒具有增長的可能性。
  • 有界佇列。當使用有限的 maximumPoolSizes 時,有界佇列(如ArrayBlockingQueue)有助於防止資源耗盡,但是可能較難調整和控制。佇列大小和最大池大小可能需要相互折衷:使用大型佇列和小型池可以最大限度地降低CPU 使用率、作業系統資源和上下文切換開銷,但是可能導致人工降低吞吐量。如果任務頻繁阻塞(例如,如果它們是 I/O邊界),則系統可能為超過您許可的更多執行緒安排時間。使用小型佇列通常要求較大的池大小,CPU使用率較高,但是可能遇到不可接受的排程開銷,這樣也會降低吞吐量。 
    解決問題:任務到了執行緒池後儲存在什麼地方

    這裡寫圖片描述

6.被拒絕的任務

當 Executor 已經關閉,並且 Executor 將有限邊界用於最大執行緒和工作佇列容量,且已經飽和時,在方法 execute(java.lang.Runnable) 中提交的新任務將被拒絕。在以上兩種情況下,execute 方法都將呼叫其 RejectedExecutionHandler 的 RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) 方法。 
下面提供了四種預定義的處理程式策略:

  • 在預設的 ThreadPoolExecutor.AbortPolicy 中,處理程式遭到拒絕將丟擲執行時 
    RejectedExecutionException。
  • 在 ThreadPoolExecutor.CallerRunsPolicy 中,執行緒呼叫執行該任務的 execute 
    本身。此策略提供簡單的反饋控制機制,能夠減緩新任務的提交速度。
  • 在 ThreadPoolExecutor.DiscardPolicy 中,不能執行的任務將被刪除。 在 
    ThreadPoolExecutor.DiscardOldestPolicy 
    中,如果執行程式尚未關閉,則位於工作佇列頭部的任務將被刪除,然後重試執行程式(如果再次失敗,則重複此過程)。
static class ThreadPoolExecutor.AbortPolicy 
          用於被拒絕任務的處理程式,它將丟擲 RejectedExecutionException. 
static class ThreadPoolExecutor.CallerRunsPolicy 
          用於被拒絕任務的處理程式,它直接在 execute 方法的呼叫執行緒中執行被拒絕的任務;如果執行程式已關閉,則會丟棄該任務。 
static class ThreadPoolExecutor.DiscardOldestPolicy 
          用於被拒絕任務的處理程式,它放棄最舊的未處理請求,然後重試 execute;如果執行程式已關閉,則會丟棄該任務。 
static class ThreadPoolExecutor.DiscardPolicy 
          用於被拒絕任務的處理程式,預設情況下它將丟棄被拒絕的任務。 
  •  

定義和使用其他種類的 RejectedExecutionHandler 類也是可能的,但這樣做需要非常小心,尤其是當策略僅用於特定容量或排隊策略時。

7.鉤子 (hook) 方法

此類提供 protected 可重寫的 beforeExecute(java.lang.Thread, java.lang.Runnable) 和 afterExecute(java.lang.Runnable, java.lang.Throwable) 方法,這兩種方法分別在執行每個任務之前和之後呼叫。它們可用於操縱執行環境;例如,重新初始化 ThreadLocal、蒐集統計資訊或新增日誌條目。此外,還可以重寫方法 terminated() 來執行 Executor 完全終止後需要完成的所有特殊處理。 
如果鉤子 (hook) 或回撥方法丟擲異常,則內部輔助執行緒將依次失敗並突然終止。

8.佇列維護

方法 getQueue() 允許出於監控和除錯目的而訪問工作佇列。強烈反對出於其他任何目的而使用此方法。remove(java.lang.Runnable) 和 purge() 這兩種方法可用於在取消大量已排隊任務時幫助進行儲存回收。

9.終止

程式 AND 不再引用的池沒有剩餘執行緒會自動 shutdown。如果希望確保回收取消引用的池(即使使用者忘記呼叫 shutdown()),則必須安排未使用的執行緒最終終止:設定適當保持活動時間,使用 0 核心執行緒的下邊界和/或設定 allowCoreThreadTimeOut(boolean)。

對執行緒池的補充: 
在java中我們可以使用Executors類直接建立一個已配置好執行緒池,Executors內部也是呼叫ThreadPoolExecutor去建立執行緒池的,詳細見api

 

有時候,系統需要處理非常多的執行時間很短的請求,如果每一個請求都開啟一個新執行緒的話,系統就要不斷的進行執行緒的建立和銷燬,有時花在建立和銷燬執行緒上的時間會比執行緒真正執行的時間還長。而且當執行緒數量太多時,系統不一定能受得了。

使用執行緒池主要為了解決一下幾個問題:

  • 通過重用執行緒池中的執行緒,來減少每個執行緒建立和銷燬的效能開銷。
  • 對執行緒進行一些維護和管理,比如定時開始,週期執行,併發數控制等等。

Executor

Executor是一個介面,跟執行緒池有關的基本都要跟他打交道。下面是常用的ThreadPoolExecutor的關係。

這裡寫圖片描述

Executor介面很簡單,只有一個execute方法。

ExecutorService是Executor的子介面,增加了一些常用的對執行緒的控制方法,之後使用執行緒池主要也是使用這些方法。

AbstractExecutorService是一個抽象類。ThreadPoolExecutor就是實現了這個類。

ThreadPoolExecutor

構造方法

ThreadPoolExecutor是執行緒池的真正實現,他通過構造方法的一系列引數,來構成不同配置的執行緒池。常用的構造方法有下面四個:

這裡寫圖片描述

  • ThreadPoolExecutor(int corePoolSize,
                            int maximumPoolSize,
                            long keepAliveTime,
                            TimeUnit unit,
                            BlockingQueue<Runnable> workQueue) 
    • 1
    • 2
    • 3
    • 4
    • 5
  • ThreadPoolExecutor(int corePoolSize,
                            int maximumPoolSize,
                            long keepAliveTime,
                            TimeUnit unit,
                            BlockingQueue<Runnable> workQueue,
                            ThreadFactory threadFactory)
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
  • ThreadPoolExecutor(int corePoolSize,
                            int maximumPoolSize,
                            long keepAliveTime,
                            TimeUnit unit,
                            BlockingQueue<Runnable> workQueue,
                            RejectedExecutionHandler handler)
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
  • ThreadPoolExecutor(int corePoolSize,
                            int maximumPoolSize,
                            long keepAliveTime,
                            TimeUnit unit,
                            BlockingQueue<Runnable> workQueue,
                            ThreadFactory threadFactory,
                            RejectedExecutionHandler handler)
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

構造方法引數說明

  • corePoolSize

    核心執行緒數,預設情況下核心執行緒會一直存活,即使處於閒置狀態也不會受存keepAliveTime限制。除非將allowCoreThreadTimeOut設定為true

  • maximumPoolSize

    執行緒池所能容納的最大執行緒數。超過這個數的執行緒將被阻塞。當任務佇列為沒有設定大小的LinkedBlockingDeque時,這個值無效。

  • keepAliveTime

    非核心執行緒的閒置超時時間,超過這個時間就會被回收。

  • unit

    指定keepAliveTime的單位,如TimeUnit.SECONDS。當將allowCoreThreadTimeOut設定為true時對corePoolSize生效。

  • workQueue

    執行緒池中的任務佇列.

    常用的有三種佇列,SynchronousQueue,LinkedBlockingDeque,ArrayBlockingQueue

  • threadFactory

    執行緒工廠,提供建立新執行緒的功能。ThreadFactory是一個介面,只有一個方法

    public interface ThreadFactory {
      Thread newThread(Runnable r);
    }
    • 1
    • 2
    • 3

    通過執行緒工廠可以對執行緒的一些屬性進行定製。

    預設的工廠:

    static class DefaultThreadFactory implements ThreadFactory {
      private static final AtomicInteger poolNumber = new AtomicInteger(1);
      private final ThreadGroup group;
      private final AtomicInteger threadNumber = new AtomicInteger(1);
      private final String namePrefix;
    
      DefaultThreadFactory() {
          SecurityManager var1 = System.getSecurityManager();
          this.group = var1 != null?var1.getThreadGroup():Thread.currentThread().getThreadGroup();
          this.namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
      }
    
      public Thread newThread(Runnable var1) {
          Thread var2 = new Thread(this.group, var1, this.namePrefix + this.threadNumber.getAndIncrement(), 0L);
          if(var2.isDaemon()) {
              var2.setDaemon(false);
          }
    
          if(var2.getPriority() != 5) {
              var2.setPriority(5);
          }
    
          return var2;
      }
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
  • RejectedExecutionHandler

    RejectedExecutionHandler也是一個介面,只有一個方法

    public interface RejectedExecutionHandler {
      void rejectedExecution(Runnable var1, ThreadPoolExecutor var2);
    }
    • 1
    • 2
    • 3

    當執行緒池中的資源已經全部使用,新增新執行緒被拒絕時,會呼叫RejectedExecutionHandler的rejectedExecution方法。

執行緒池規則

執行緒池的執行緒執行規則跟任務佇列有很大的關係。

  • 下面都假設任務佇列沒有大小限制:

    1. 如果執行緒數量<=核心執行緒數量,那麼直接啟動一個核心執行緒來執行任務,不會放入佇列中。
    2. 如果執行緒數量>核心執行緒數,但<=最大執行緒數,並且任務佇列是LinkedBlockingDeque的時候,超過核心執行緒數量的任務會放在任務佇列中排隊。
    3. 如果執行緒數量>核心執行緒數,但<=最大執行緒數,並且任務佇列是SynchronousQueue的時候,執行緒池會建立新執行緒執行任務,這些任務也不會被放在任務佇列中。這些執行緒屬於非核心執行緒,在任務完成後,閒置時間達到了超時時間就會被清除。
    4. 如果執行緒數量>核心執行緒數,並且>最大執行緒數,當任務佇列是LinkedBlockingDeque,會將超過核心執行緒的任務放在任務佇列中排隊。也就是當任務佇列是LinkedBlockingDeque並且沒有大小限制時,執行緒池的最大執行緒數設定是無效的,他的執行緒數最多不會超過核心執行緒數。
    5. 如果執行緒數量>核心執行緒數,並且>最大執行緒數,當任務佇列是SynchronousQueue的時候,會因為執行緒池拒絕新增任務而丟擲異常。
  • 任務佇列大小有限時

    1. 當LinkedBlockingDeque塞滿時,新增的任務會直接建立新執行緒來執行,當建立的執行緒數量超過最大執行緒數量時會拋異常。
    2. SynchronousQueue沒有數量限制。因為他根本不保持這些任務,而是直接交給執行緒池去執行。當任務數量超過最大執行緒數時會直接拋異常。

規則驗證

前提

所有的任務都是下面這樣的,睡眠兩秒後列印一行日誌:

Runnable myRunnable = new Runnable() {
    @Override
    public void run() {
        try {
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getName() + " run");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

所有驗證過程都是下面這樣,先執行三個,再執行三個,8秒後,各看一次資訊

executor.execute(myRunnable);
executor.execute(myRunnable);
executor.execute(myRunnable);
System.out.println("---先開三個---");
System.out.println("核心執行緒數" + executor.getCorePoolSize());
System.out.println("執行緒池數" + executor.getPoolSize());
System.out.println("佇列任務數" + executor.getQueue().size());
executor.execute(myRunnable);
executor.execute(myRunnable);
executor.execute(myRunnable);
System.out.println("---再開三個---");
System.out.println("核心執行緒數" + executor.getCorePoolSize());
System.out.println("執行緒池數" + executor.getPoolSize());
System.out.println("佇列任務數" + executor.getQueue().size());
Thread.sleep(8000);
System.out.println("----8秒之後----");
System.out.println("核心執行緒數" + executor.getCorePoolSize());
System.out.println("執行緒池數" + executor.getPoolSize());
System.out.println("佇列任務數" + executor.getQueue().size());
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

驗證1

  1. 核心執行緒數為6,最大執行緒數為10。超時時間為5秒

    ThreadPoolExecutor executor = new ThreadPoolExecutor(6, 10, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
    • 1
    ---先開三個---
    核心執行緒數6
    執行緒池執行緒數3
    佇列任務數0
    ---再開三個---
    核心執行緒數6
    執行緒池執行緒數6
    佇列任務數0
    pool-1-thread-1 run
    pool-1-thread-6 run
    pool-1-thread-5 run
    pool-1-thread-3 run
    pool-1-thread-4 run
    pool-1-thread-2 run
    ----8秒之後----
    核心執行緒數6
    執行緒池執行緒數6
    佇列任務數0
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

可以看到每個任務都是是直接啟動一個核心執行緒來執行任務,一共建立了6個執行緒,不會放入佇列中。8秒後執行緒池還是6個執行緒,核心執行緒預設情況下不會被回收,不收超時時間限制。

驗證2

  1. 核心執行緒數為3,最大執行緒數為6。超時時間為5秒,佇列是LinkedBlockingDeque

    ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>());
    • 1
    ---先開三個---
    核心執行緒數3
    執行緒池執行緒數3
    佇列任務數0
    ---再開三個---
    核心執行緒數3
    執行緒池執行緒數3
    佇列任務數3
    pool-1-thread-3 run
    pool-1-thread-1 run
    pool-1-thread-2 run
    pool-1-thread-3 run
    pool-1-thread-1 run
    pool-1-thread-2 run
    ----8秒之後----
    核心執行緒數3
    執行緒池執行緒數3
    佇列任務數0
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    當任務數超過核心執行緒數時,會將超出的任務放在佇列中,只會建立3個執行緒重複利用。

驗證3

  1. 核心執行緒數為3,最大執行緒數為6。超時時間為5秒,佇列是SynchronousQueue
ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
  • 1
---先開三個---
核心執行緒數3
執行緒池執行緒數3
佇列任務數0
---再開三個---
核心執行緒數3
執行緒池執行緒數6
佇列任務數0
pool-1-thread-2 run
pool-1-thread-3 run
pool-1-thread-6 run
pool-1-thread-4 run
pool-1-thread-5 run
pool-1-thread-1 run
----8秒之後----
核心執行緒數3
執行緒池執行緒數3
佇列任務數0
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

當佇列是SynchronousQueue時,超出核心執行緒的任務會建立新的執行緒來執行,看到一共有6個執行緒。但是這些執行緒是費核心執行緒,收超時時間限制,在任務完成後限制超過5秒就會被回收。所以最後看到執行緒池還是隻有三個執行緒。

驗證4

  1. 核心執行緒數是3,最大執行緒數是4,佇列是LinkedBlockingDeque

    ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 4, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>());
    • 1
---先開三個---
核心執行緒數3
執行緒池執行緒數3
佇列任務數0
---再開三個---
核心執行緒數3
執行緒池執行緒數3
佇列任務數3
pool-1-thread-3 run
pool-1-thread-1 run
pool-1-thread-2 run
pool-1-thread-3 run
pool-1-thread-1 run
pool-1-thread-2 run
----8秒之後----
核心執行緒數3
執行緒池執行緒數3
佇列任務數0
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

LinkedBlockingDeque根本不受最大執行緒數影響。

但是當LinkedBlockingDeque有大小限制時就會受最大執行緒數影響了

4.1 比如下面,將佇列大小設定為2.

ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 4, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(2));
  • 1
---先開三個---
核心執行緒數3
執行緒池執行緒數3
佇列任務數0
---再開三個---
核心執行緒數3
執行緒池執行緒數4
佇列任務數2
pool-1-thread-2 run
pool-1-thread-1 run
pool-1-thread-4 run
pool-1-thread-3 run
pool-1-thread-1 run
pool-1-thread-2 run
----8秒之後----
核心執行緒數3
執行緒池執行緒數3
佇列任務數0
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

首先為三個任務開啟了三個核心執行緒1,2,3,然後第四個任務和第五個任務加入到佇列中,第六個任務因為佇列滿了,就直接建立一個新執行緒4,這是一共有四個執行緒,沒有超過最大執行緒數。8秒後,非核心執行緒收超時時間影響回收了,因此執行緒池只剩3個執行緒了。

4.2 將佇列大小設定為1

ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 4, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(1));
  • 1
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task [email protected] rejected from [email protected][Running, pool size = 4, active threads = 4, queued tasks = 1, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
    at com.sunlinlin.threaddemo.Main.main(Main.java:35)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
---先開三個---
核心執行緒數3
執行緒池執行緒數3
佇列任務數0
pool-1-thread-1 run
pool-1-thread-2 run
pool-1-thread-3 run
pool-1-thread-4 run
pool-1-thread-1 run
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

直接出錯在第6個execute方法上。因為核心執行緒是3個,當加入第四個任務的時候,就把第四個放在佇列中。加入第五個任務時,因為佇列滿了,就建立新執行緒執行,建立了執行緒4。當加入第六個執行緒時,也會嘗試建立執行緒,但是因為已經達到了執行緒池最大執行緒數,所以直接拋異常了。

驗證5

  1. 核心執行緒數是3 ,最大執行緒數是4,佇列是SynchronousQueue

    ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 4, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
    • 1
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task [email protected] rejected from [email protected][Running, pool size = 4, active threads = 4, queued tasks = 0, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
    at com.sunlinlin.threaddemo.Main.main(Main.java:34)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
---先開三個---
核心執行緒數3
執行緒池執行緒數3
佇列任務數0
pool-1-thread-2 run
pool-1-thread-3 run
pool-1-thread-4 run
pool-1-thread-1 run

這次在新增第五個任務時就報錯了,因為SynchronousQueue各奔不儲存任務,收到一個任務就去建立新執行緒。所以第五個就會拋異常了。

 

為什麼用執行緒池?

  • 1.建立/銷燬執行緒伴隨著系統開銷,過於頻繁的建立/銷燬執行緒,會很大程度上影響處-理效率
  • 2.執行緒併發數量過多,搶佔系統資源從而導致阻塞
  • 3.對執行緒進行一些簡單的管理

在Java中,執行緒池的概念是Executor這個介面,具體實現為ThreadPoolExecutor類,學習Java中的執行緒池,就可以直接學習他了對執行緒池的配置,就是對ThreadPoolExecutor建構函式的引數的配置

一、ThreadPoolExecutor提供了四個建構函式:

//五個引數的建構函式
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue)

//六個引數的建構函式-1
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory)

//六個引數的建構函式-2
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler)

//七個引數的建構函式
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

下面來解釋下各個引數:

  • int corePoolSize:該執行緒池中核心執行緒數最大值

核心執行緒:執行緒池新建執行緒的時候,如果當前執行緒總數小於corePoolSize,則新建的是核心執行緒,如果超過corePoolSize,則新建的是非核心執行緒核心執行緒預設情況下會一直存活線上程池中,即使這個核心執行緒啥也不幹(閒置狀態)。
如果指定ThreadPoolExecutor的allowCoreThreadTimeOut這個屬性為true,那麼核心執行緒如果不幹活(閒置狀態)的話,超過一定時間(時長下面引數決定),就會被銷燬掉。

  • int maximumPoolSize: 該執行緒池中執行緒總數最大值

執行緒總數 = 核心執行緒數 + 非核心執行緒數。

  • long keepAliveTime:該執行緒池中非核心執行緒閒置超時時長

一個非核心執行緒,如果不幹活(閒置狀態)的時長超過這個引數所設定的時長,就會被銷燬掉,如果設定allowCoreThreadTimeOut = true,則會作用於核心執行緒。

  • TimeUnit unit:keepAliveTime的單位

TimeUnit是一個列舉型別,其包括:
NANOSECONDS : 1微毫秒 = 1微秒 / 1000
MICROSECONDS : 1微秒 = 1毫秒 / 1000
MILLISECONDS : 1毫秒 = 1秒 /1000
SECONDS : 秒
MINUTES : 分
HOURS : 小時
DAYS : 天

  • BlockingQueue workQueue:該執行緒池中的任務佇列:維護著等待執行的Runnable物件

當所有的核心執行緒都在幹活時,新新增的任務會被新增到這個佇列中等待處理,如果佇列滿了,則新建非核心執行緒執行任務。
常用的workQueue型別:

  • SynchronousQueue:這個佇列接收到任務的時候,會直接提交給執行緒處理,而不保留它,如果所有執行緒都在工作怎麼辦?那就新建一個執行緒來處理這個任務!所以為了保證不出現<執行緒數達到了maximumPoolSize而不能新建執行緒>的錯誤,使用這個型別佇列的時候,maximumPoolSize一般指定成Integer.MAX_VALUE,即無限大

  • LinkedBlockingQueue:這個佇列接收到任務的時候,如果當前執行緒數小於核心執行緒數,則新建執行緒(核心執行緒)處理任務;如果當前執行緒數等於核心執行緒數,則進入佇列等待。由於這個佇列沒有最大值限制,即所有超過核心執行緒數的任務都將被新增到佇列中,這也就導致了maximumPoolSize的設定失效,因為匯流排程數永遠不會超過corePoolSize

  • ArrayBlockingQueue:可以限定佇列的長度,接收到任務的時候,如果沒有達到corePoolSize的值,則新建執行緒(核心執行緒)執行任務,如果達到了,則入隊等候,如果佇列已滿,則新建執行緒(非核心執行緒)執行任務,又如果匯流排程數到了maximumPoolSize,並且佇列也滿了,則發生錯誤

  • DelayQueue:佇列內元素必須實現Delayed介面,這就意味著你傳進去的任務必須先實現Delayed介面。這個佇列接收到任務時,首先先入隊,只有達到了指定的延時時間,才會執行任務

  • ThreadFactory threadFactory:建立執行緒的方式,這是一個介面,你new他的時候需要實現他的Thread newThread(Runnable r)方法,一般用不上。

  • RejectedExecutionHandler handler:這玩意兒就是丟擲異常專用的,比如上面提到的兩個錯誤發生了,就會由這個handler丟擲異常,根本用不上。

二、向ThreadPoolExecutor新增任務

我們怎麼知道new一個ThreadPoolExecutor,大概知道各個引數是幹嘛的,可是我new完了,怎麼向執行緒池提交一個要執行的任務啊?

ThreadPoolExecutor.execute(Runnable command)

通過ThreadPoolExecutor.execute(Runnable command)方法即可向執行緒池內新增一個任務。

三、ThreadPoolExecutor的策略

這裡給總結一下,當一個任務被新增進執行緒池時,執行策略:

  • 1.執行緒數量未達到corePoolSize,則新建一個執行緒(核心執行緒)執行任務
  • 2.執行緒數量達到了corePools,則將任務移入佇列等待
  • 3.佇列已滿,新建執行緒(非核心執行緒)執行任務
  • 4.佇列已滿,匯流排程數又達到了maximumPoolSize,就會由(RejectedExecutionHandler)丟擲異常

+++++++++++++++++++++++++++我是分割線++++++++++++++++++++++++++++


常見四種執行緒池:

如果你不想自己寫一個執行緒池,Java通過Executors提供了四種執行緒池,這四種執行緒池都是直接或間接配置ThreadPoolExecutor的引數實現的。

1.可快取執行緒池CachedThreadPool()

原始碼:

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

根據原始碼可以看出:

  1. 這種執行緒池內部沒有核心執行緒,執行緒的數量是有沒限制的。
  2. 在建立任務時,若有空閒的執行緒時則複用空閒的執行緒,若沒有則新建執行緒。
  3. 沒有工作的執行緒(閒置狀態)在超過了60S還不做事,就會銷燬。

建立方法:

ExecutorService mCachedThreadPool = Executors.newCachedThreadPool();

用法:

//開始下載
private void startDownload(final ProgressBar progressBar, final int i) {
        mCachedThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                int p = 0;
                progressBar.setMax(10);//每個下載任務10秒
                while (p < 10) {
                    p++;
                    progressBar.setProgress(p);
                    Bundle bundle = new Bundle();
                    Message message = new Message();
                    bundle.putInt("p", p);
                    //把當前執行緒的名字用handler讓textview顯示出來
                    bundle.putString("ThreadName", Thread.currentThread().getName());
                    message.what = i;
                    message.setData(bundle);
                    mHandler.sendMessage(message);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
    }

2.FixedThreadPool 定長執行緒池

原始碼:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

根據原始碼可以看出:

  1. 該執行緒池的最大執行緒數等於核心執行緒數,所以在預設情況下,該執行緒池的執行緒不會因為閒置狀態超時而被銷燬。
  2. 如果當前執行緒數小於核心執行緒數,並且也有閒置執行緒的時候提交了任務,這時也不會去複用之前的閒置執行緒,會建立新的執行緒去執行任務。如果當前執行任務數大於了核心執行緒數,大於的部分就會進入佇列等待。等著有閒置的執行緒來執行這個任務。

建立方法:

//nThreads => 最大執行緒數即maximumPoolSize
ExecutorService mFixedThreadPool= Executors.newFixedThreadPool(int nThreads);

//threadFactory => 建立執行緒的方法,用得少
ExecutorService mFixedThreadPool= Executors.newFixedThreadPool(int nThreads, ThreadFactory threadFactory);

用法:

private void startDownload(final ProgressBar progressBar, final int i) {
        mFixedThreadPool.execute(new Runnable() {
            @Override
            public void run() {
               //....邏輯程式碼自己控制
            }
        });
    }

3.SingleThreadPool

原始碼:

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

根據原始碼可以看出:

  1. 有且僅有一個工作執行緒執行任務
  2. 所有任務按照指定順序執行,即遵循佇列的入隊出隊規則

建立方法:
ExecutorService mSingleThreadPool = Executors.newSingleThreadPool();

用法同上。

4.ScheduledThreadPool

原始碼:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

//ScheduledThreadPoolExecutor():
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue());
}

根據原始碼可以看出:
DEFAULT_KEEPALIVE_MILLIS就是預設10L,這裡就是10秒。這個執行緒池有點像是吧CachedThreadPool和FixedThreadPool 結合了一下。

  1. 不僅設定了核心執行緒數,最大執行緒數也是Integer.MAX_VALUE。
  2. 這個執行緒池是上述4箇中為唯一個有延遲執行和週期執行任務的執行緒池。

建立:

//nThreads => 最大執行緒數即maximumPoolSize
ExecutorService mScheduledThreadPool = Executors.newScheduledThreadPool(int corePoolSize);

一般的執行任務方法和上面的都大同小異,我們主要看看延時執行任務和週期執行任務的方法。

//表示在3秒之後開始執行我們的任務。
mScheduledThreadPool.schedule(new Runnable() {
            @Override
            public void run() {
            //....
            }
        }, 3, TimeUnit.SECONDS);
//延遲3秒後執行任務,從開始執行任務這個時候開始計時,每7秒執行一次不管執行任務需要多長的時間。 
mScheduledThreadPool.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
             //....
            }
        },3, 7, TimeUnit.SECONDS);
/**延遲3秒後執行任務,從任務完成時這個時候開始計時,7秒後再執行,
*再等完成後計時7秒再執行也就是說這裡的迴圈執行任務的時間點是
*從上一個任務完成的時候。
*/
mScheduledThreadPool.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
             //....
            }
        },3, 7, TimeUnit.SECONDS);

以上就是常用的四個執行緒池以及他們的實現原理。


 

執行緒池介紹

在web開發中,伺服器需要接受並處理請求,所以會為一個請求來分配一個執行緒來進行處理。如果每次請求都新建立一個執行緒的話實現起來非常簡便,但是存在一個問題:

如果併發的請求數量非常多,但每個執行緒執行的時間很短,這樣就會頻繁的建立和銷燬執行緒,如此一來會大大降低系統的效率。可能出現伺服器在為每個請求建立新執行緒和銷燬執行緒上花費的時間和消耗的系統資源要比處理實際的使用者請求的時間和資源更多。

那麼有沒有一種辦法使執行完一個任務,並不被銷燬,而是可以繼續執行其他的任務呢?

這就是執行緒池的目的了。執行緒池為執行緒生命週期的開銷和資源不足問題提供瞭解決方案。通過對多個任務重用執行緒,執行緒建立的開銷被分攤到了多個任務上。

什麼時候使用執行緒池?

  • 單個任務處理時間比較短
  • 需要處理的任務數量很大

使用執行緒池的好處

引用自 http://ifeve.com/java-threadpool/ 的說明:

  • 降低資源消耗。通過重複利用已建立的執行緒降低執行緒建立和銷燬造成的消耗。
  • 提高響應速度。當任務到達時,任務可以不需要的等到執行緒建立就能立即執行。
  • 提高執行緒的可管理性。執行緒是稀缺資源,如果無限制的建立,不僅會消耗系統資源,還會降低系統的穩定性,使用執行緒池可以進行統一的分配,調優和監控。

Java中的執行緒池是用ThreadPoolExecutor類來實現的. 本文就結合JDK 1.8對該類的原始碼來分析一下這個類內部對於執行緒的建立, 管理以及後臺任務的排程等方面的執行原理。

先看一下執行緒池的類圖:

QQ20170331-004227.png

Executor框架介面

Executor框架是一個根據一組執行策略呼叫,排程,執行和控制的非同步任務的框架,目的是提供一種將”任務提交”與”任務如何執行”分離開來的機制。

J.U.C中有三個Executor介面:

  • Executor:一個執行新任務的簡單介面;
  • ExecutorService:擴充套件了Executor介面。添加了一些用來管理執行器生命週期和任務生命週期的方法;
  • ScheduledExecutorService:擴充套件了ExecutorService。支援Future和定期執行任務。

Executor介面

 

1

2

3

 

public interface Executor {

void execute(Runnable command);

}

Executor介面只有一個execute方法,用來替代通常建立或啟動執行緒的方法。例如,使用Thread來建立並啟動執行緒的程式碼如下:

 

1

2

 

Thread t = new Thread();

t.start();

使用Executor來啟動執行緒執行任務的程式碼如下:

 

1

2

 

Thread t = new Thread();

executor.execute(t);

對於不同的Executor實現,execute()方法可能是建立一個新執行緒並立即啟動,也有可能是使用已有的工作執行緒來執行傳入的任務,也可能是根據設定執行緒池的容量或者阻塞佇列的容量來決定是否要將傳入的執行緒放入阻塞佇列中或者拒絕接收傳入的執行緒。

ExecutorService介面

ExecutorService介面繼承自Executor介面,提供了管理終止的方法,以及可為跟蹤一個或多個非同步任務執行狀況而生成 Future 的方法。增加了shutDown(),shutDownNow(),invokeAll(),invokeAny()和submit()等方法。如果需要支援即時關閉,也就是shutDownNow()方法,則任務需要正確處理中斷。

ScheduledExecutorService介面

ScheduledExecutorService擴充套件ExecutorService介面並增加了schedule方法。呼叫schedule方法可以在指定的延時後執行一個Runnable或者Callable任務。ScheduledExecutorService介面還定義了按照指定時間間隔定期執行任務的scheduleAtFixedRate()方法和scheduleWithFixedDelay()方法。

ThreadPoolExecutor分析

ThreadPoolExecutor繼承自AbstractExecutorService,也是實現了ExecutorService介面。

幾個重要的欄位

 

1

2

3

4

5

6

7

8

9

10

 

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static final int COUNT_BITS = Integer.SIZE - 3;

private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits

private static final int RUNNING = -1 << COUNT_BITS;

private static final int SHUTDOWN = 0 << COUNT_BITS;

private static final int STOP = 1 << COUNT_BITS;

private static final int TIDYING = 2 << COUNT_BITS;

private static final int TERMINATED = 3 << COUNT_BITS;

ctl是對執行緒池的執行狀態和執行緒池中有效執行緒的數量進行控制的一個欄位, 它包含兩部分的資訊: 執行緒池的執行狀態 (runState) 和執行緒池內有效執行緒的數量 (workerCount),這裡可以看到,使用了Integer型別來儲存,高3位儲存runState,低29位儲存workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位減1(29個1),這個常量表示workerCount的上限值,大約是5億。

下面再介紹下執行緒池的執行狀態. 執行緒池一共有五種狀態, 分別是:

  1. RUNNING :能接受新提交的任務,並且也能處理阻塞佇列中的任務;
  2. SHUTDOWN:關閉狀態,不再接受新提交的任務,但卻可以繼續處理阻塞佇列中已儲存的任務。線上程池處於 RUNNING 狀態時,呼叫 shutdown()方法會使執行緒池進入到該狀態。(finalize() 方法在執行過程中也會呼叫shutdown()方法進入該狀態);
  3. STOP:不能接受新任務,也不處理佇列中的任務,會中斷正在處理任務的執行緒。線上程池處於 RUNNING 或 SHUTDOWN 狀態時,呼叫 shutdownNow() 方法會使執行緒池進入到該狀態;
  4. TIDYING:如果所有的任務都已終止了,workerCount (有效執行緒數) 為0,執行緒池進入該狀態後會呼叫 terminated() 方法進入TERMINATED 狀態。
  5. TERMINATED:在terminated() 方法執行完後進入該狀態,預設terminated()方法中什麼也沒有做。
    進入TERMINATED的條件如下:
    • 執行緒池不是RUNNING狀態;
    • 執行緒池狀態不是TIDYING狀態或TERMINATED狀態;
    • 如果執行緒池狀態是SHUTDOWN並且workerQueue為空;
    • workerCount為0;
    • 設定TIDYING狀態成功。

下圖為執行緒池的狀態轉換過程:

threadpool-status.png

ctl相關方法

這裡還有幾個對ctl進行計算的方法:

 

1

2

3

 

private static int runStateOf(int c) { return c & ~CAPACITY; }

private static int workerCountOf(int c) { return c & CAPACITY; }

private static int ctlOf(int rs, int wc) { return rs | wc; }

  • runStateOf:獲取執行狀態;
  • workerCountOf:獲取活動執行緒數;
  • ctlOf:獲取執行狀態和活動執行緒數的值。

ThreadPoolExecutor構造方法

 

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

 

public ThreadPoolExecutor(int corePoolSize,

int maximumPoolSize,

long keepAliveTime,

TimeUnit unit,

BlockingQueue<Runnable> workQueue,

ThreadFactory threadFactory,

RejectedExecutionHandler handler) {

if (corePoolSize < 0 ||

maximumPoolSize <= 0 ||

maximumPoolSize < corePoolSize ||

keepAliveTime < 0)

throw new IllegalArgumentException();

if (workQueue == null || threadFactory == null || handler == null)

throw new NullPointerException();

this.corePoolSize = corePoolSize;

this.maximumPoolSize = maximumPoolSize;

this.workQueue = workQueue;

this.keepAliveTime = unit.toNanos(keepAliveTime);

this.threadFactory = threadFactory;

this.handler = handler;

}

構造方法中的欄位含義如下:

  • corePoolSize:核心執行緒數量,當有新任務在execute()方法提交時,會執行以下判斷:

    1. 如果執行的執行緒少於 corePoolSize,則建立新執行緒來處理任務,即使執行緒池中的其他執行緒是空閒的;
    2. 如果執行緒池中的執行緒數量大於等於 corePoolSize 且小於 maximumPoolSize,則只有當workQueue滿時才建立新的執行緒去處理任務;
    3. 如果設定的corePoolSize 和 maximumPoolSize相同,則建立的執行緒池的大小是固定的,這時如果有新任務提交,若workQueue未滿,則將請求放入workQueue中,等待有空閒的執行緒去從workQueue中取任務並處理;
    4. 如果執行的執行緒數量大於等於maximumPoolSize,這時如果workQueue已經滿了,則通過handler所指定的策略來處理任務;

    所以,任務提交時,判斷的順序為 corePoolSize –> workQueue –> maximumPoolSize。

  • maximumPoolSize:最大執行緒數量;
  • workQueue:等待佇列,當任務提交時,如果執行緒池中的執行緒數量大於等於corePoolSize的時候,把該任務封裝成一個Worker物件放入等待佇列;
  • workQueue:儲存等待執行的任務的阻塞佇列,當提交一個新的任務到執行緒池以後, 執行緒池會根據當前執行緒池中正在執行著的執行緒的數量來決定對該任務的處理方式,主要有以下幾種處理方式:
    1. 直接切換:這種方式常用的佇列是SynchronousQueue,但現在還沒有研究過該佇列,這裡暫時還沒法介紹;
    2. 使用無界佇列:一般使用基於連結串列的阻塞佇列LinkedBlockingQueue。如果使用這種方式,那麼執行緒池中能夠建立的最大執行緒數就是corePoolSize,而maximumPoolSize就不會起作用了(後面也會說到)。當執行緒池中所有的核心執行緒都是RUNNING狀態時,這時一個新的任務提交就會放入等待佇列中。
    3. 使用有界佇列:一般使用ArrayBlockingQueue。使用該方式可以將執行緒池的最大執行緒數量限制為maximumPoolSize,這樣能夠降低資源的消耗,但同時這種方式也使得執行緒池對執行緒的排程變得更困難,因為執行緒池和佇列的容量都是有限的值,所以要想使執行緒池處理任務的吞吐率達到一個相對合理的範圍,又想使執行緒排程相對簡單,並且還要儘可能的降低執行緒池對資源的消耗,就需要合理的設定這兩個數量。
      • 如果要想降低系統資源的消耗(包括CPU的使用率,作業系統資源的消耗,上下文環境切換的開銷等), 可以設定較大的佇列容量和較小的執行緒池容量, 但這樣也會降低執行緒處理任務的吞吐量。
      • 如果提交的任務經常發生阻塞,那麼可以考慮通過呼叫 setMaximumPoolSize() 方法來重新設定執行緒池的容量。
      • 如果佇列的容量設定的較小,通常需要將執行緒池的容量設定大一點,這樣CPU的使用率會相對的高一些。但如果執行緒池的容量設定的過大,則在提交的任務數量太多的情況下,併發量會增加,那麼執行緒之間的排程就是一個要考慮的問題,因為這樣反而有可能降低處理任務的吞吐量。
  • keepAliveTime:執行緒池維護執行緒所允許的空閒時間。當執行緒池中的執行緒數量大於corePoolSize的時候,如果這時沒有新的任務提交,核心執行緒外的執行緒不會立即銷燬,而是會等待,直到等待的時間超過了keepAliveTime;
  • threadFactory:它是ThreadFactory型別的變數,用來建立新執行緒。預設使用Executors.defaultThreadFactory() 來建立執行緒。使用預設的ThreadFactory來建立執行緒時,會使新建立的執行緒具有相同的NORM_PRIORITY優先順序並且是非守護執行緒,同時也設定了執行緒的名稱。
  • handler:它是RejectedExecutionHandler型別的變數,表示執行緒池的飽和策略。如果阻塞佇列滿了並且沒有空閒的執行緒,這時如果繼續提交任務,就需要採取一種策略處理該任務。執行緒池提供了4種策略:
    1. AbortPolicy:直接丟擲異常,這是預設策略;
    2. CallerRunsPolicy:用呼叫者所在的執行緒來執行任務;
    3. DiscardOldestPolicy:丟棄阻塞佇列中靠最前的任務,並執行當前任務;
    4. DiscardPolicy:直接丟棄任務;

execute方法

execute()方法用來提交任務,程式碼如下:

 

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

 

public void execute(Runnable command) {

if (command == null)

throw new NullPointerException();

/*

* clt記錄著runState和workerCount

*/

int c = ctl.get();

/*

* workerCountOf方法取出低29位的值,表示當前活動的執行緒數;

* 如果當前活動執行緒數小於corePoolSize,則新建一個執行緒放入執行緒池中;

* 並把任務新增到該執行緒中。

*/

if (workerCountOf(c) < corePoolSize) {

/*