1. 程式人生 > >Executor 之 執行緒池及定時器

Executor 之 執行緒池及定時器

1. Executor系列介面

Alt text

Executor用於解耦任務(Runnable)提交者和執行者,它只有一個方法void execute(Runnable command),通過呼叫它向執行者提交任務,但無法知道執行的結果/進度,也無法拿到任務返回值。

ExecutorService 繼承Executor,是一個更具體的介面。它額外提供了以下方法:

  1. 關閉執行者

    shutdown(); // 拒絕接受任務,但繼續執行舊任務

    shutdownNow(); // 拒絕接受任務,且返回所有未執行的舊任務

    awaitTermination(long,TimeUnit); // 等待執行者完全關閉

  2. 查詢執行者當前狀態

    isShutdown();

    isTerminated();

  3. submitinvokeall系列方法  接受Runnable/Callable型別的引數,返回Future,讓提交者可以瞭解任務執行的進度/拿到執行結果。

    Future是提交者和執行者之間的通訊手段,它代表任務的執行情況: Alt text  其中,get方法會阻塞直到任務被完成,常用於非同步轉同步的場景。

ScheduledExecutorService是一個定時器,它在ExecutorService的基礎上增加了若干定時/延遲/迴圈排程任務的方法:

// 延遲執行

public ScheduledFuture<?> schedule(Runnable command,

long delay,

TimeUnit unit);

public <V> ScheduledFuture<V> schedule(Callable<V> callable,

long delay,

TimeUnit unit);

// 固定頻率執行(不管單個任務執行時間,間隔到了就執行)

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,

long initialDelay,

long period,

TimeUnit unit);

// 固定延遲執行(任務1執行完和任務2開始執行之間的間隔是固定的)

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,

long initialDelay,

long delay,

TimeUnit unit);

2. 執行緒池ThreadPoolExecutor

ThreadPoolExecutor是 執行緒池 和 任務佇列(BlockingQueue) 實現的ExecutorService,和所有的一樣,它的主要功能有兩個:

  1. 降低頻繁建立/銷燬執行緒的開銷;
  2. 防止濫用執行緒資源

作為一個對外提供服務的容器,它很多方面的實現都值得參考。

2.1 服務能力的逐步降級 & 佔用資源的自動伸縮

提交任務時呼叫的是BlockingQueue#offer()方法,如果不成功則立即返回false,這意味著任務提交者不會因為佇列滿而被阻塞

submit(...)方法對傳入的Runnable/Callable包裝進一個FutureTask,再呼叫execute(...)處理,最後返回這個FutureTask物件。

FutureTask是一個實現了RunnableFuture的類,它本質上是一個任務,同時又是一個同步器,提供了具有阻塞/喚醒語義的查詢和執行方法。

execute(...)處理策略如下:

  1. 輕負載 (當前執行緒數 < corePoolSize):建立一個執行緒(內部類Worker),該任務作為其初始任務。
  2. 中負載 (當前執行緒數 = corePoolSize):任務入佇列(offer()方法,客戶不會阻塞)。
  3. 重負載 (當前執行緒數 ∈ [corePoolSizemaximumPoolSize],且任務佇列已滿):繼續建立執行緒。
  4. 超負載 (當前執行緒數 = maximumPoolSize,且任務佇列已滿): 拒絕新任務。

隨著負載增加,執行緒池的處理速度 逐漸下降;超負荷時拒絕新請求,保證容器不掛:

快 –> 部分快,部分慢 –> 慢 –> 慢,且拒絕新任務

Worker執行緒的邏輯

Worker不停地從任務佇列中拿任務執行,如果拿到 null,則退出。

WorkerBlockingQueue中取元素有兩種選擇:poll(timeout) 或 take(),如佇列空二者均會阻塞,但前者有超時時間。選擇策略如下:

  1. 當前執行緒數 <= corePoolSize(輕負載):使用take(),當佇列為空時執行緒一直阻塞而不退出,原因是執行緒池在大部分時間都會處在輕/中負載的狀態,避免此時執行緒的頻繁建立和銷燬。

    也可以更改這個預設行為。如果手動設定allowCoreThreadTimeOut,將用poll(timeout),超過指定時間還沒任務執行緒就退出。

  2. 當前執行緒數 > corePoolSize(重負載):使用poll(timeout),負載回落後多餘執行緒退出,避免不必要的資源佔用。

不管哪種情況,呼叫poll的超時時間由引數keepAliveTime指定。

Worker通過以上行為保證了容器佔用的資源隨負載程度而自動彈性伸縮。

2.2 資源的懶載入和預載入

預設執行緒是懶載入的,即只有當新任務提交時才會建立Worker。可以呼叫prestartAllCoreThreads()prestartCoreThread()方法預啟動 corePoolSize 個或1個執行緒,一來避免任務佇列初始就有元素時沒有執行緒執行;二來避免接受任務時線上程建立上消耗時間。

2.3 拒絕服務的方式

當執行緒數和佇列容量都達到上限時,容器將拒絕新來的服務請求,這是RejectedExecutionHandler介面負責的。

ThreadPoolExecutor內部提供了以下 4 個實現:

  1. CallerRunsPolicy:由客戶執行緒自己執行任務;
  2. AbortPolicy(預設):丟擲RejectedExecutionException
  3. DiscardPolicy:忽略,什麼也不做;
  4. DiscardOldestPolicy:移除任務佇列裡呆的最久的任務(poll()),重新提交。

2.4 狀態管理

執行緒池的狀態圖如下: Alt text

初始狀態為 running;

進入 shutDown 和 stop 區別是:

  • shutDown: 不接受新任務,但會繼續執行已有任務
  • stop: 不接受新任務,也不執行已有任務;佇列中的剩餘任務被返回給客戶

當佇列為空且執行緒數為0時,進入 tidying 狀態。該狀態下會呼叫一個預留給子類實現的terminated方法進行額外的資源清理工作。結束後進入 terminated 狀態。

2.4 任務佇列的選擇

ThreadPoolExecutor 的任務佇列是可配置的。

如果要求任務立即得到處理,可以使用SynchronousQueue

SynchronousQueue是一個容量為0的阻塞佇列,當 offer 元素時,如果當前沒有執行緒阻塞在take / poll(timeout)上就立即失敗;否則將元素交給其中的一個執行緒。因此它更像是執行緒之間傳遞物件的工具,而不是一個佇列。

使用SynchronousQueue時,執行緒池將無法快取任務,每個任務都會立即建立一個執行緒來執行它,任務可以得到快速處理。這種策略下執行緒池最多隻能處理maximumPoolSize個任務。

如果可以接受任務執行的延遲,但需要儘可能多的處理任務,可以使用無界的LinkedBlockingQueue或有界的ArrayBlockingQueue。若是前者,執行緒數將不超過corePoolSize,引數maximumPoolSize無效;若是後者,則需要線上程數和佇列長度間權衡:短佇列需要更多的執行緒,但這會帶來更多的資源佔用和 context switch;少執行緒則需要更長的佇列,但此時吞吐量較低。

2.5 定時器ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutorScheduledExecutorService定時器介面的實現,它繼承自ThreadPoolExecutor,因此底層的工作原理和執行緒池是一樣的,但其任務佇列預設是DelayBlockingQueue的變種(不可配置)。Worker依次從中獲取最快到期的任務執行。對於需要重複執行的任務,第一次執行完畢後計算其下次執行時間,再重新入隊。

這個不太瞭解,以後看。

3. 工廠類 Executors

提供了建立執行緒池、定時器的工廠方法。

執行緒池:

  1. newCachedThreadPool()

    corePoolSize = 0, maximumPoolSize = int 最大值,keepAliveTime = 1分鐘,快取佇列為SynchronousQueue

    每個任務都會重用已有執行緒或建立一個新執行緒執行,直到執行緒數達到上限;沒有任務時所有執行緒都會自然退出。

  2. newFixedThreadPool

    corePoolSize = n, maximumPoolSize = n,keepAliveTime = 0,快取佇列為無界的LinkedBlockingQueue

  3. newSingleThreadExecutor

    corePoolSize = 1, maximumPoolSize = 1,keepAliveTime = 0,快取佇列為無界的LinkedBlockingQueue

    效果和newFixedThreadPool(1)一樣。

  4. newScheduledThreadPool 定時器  建立一個定時器ScheduledThreadPoolExecutor,corePoolSize = n, maximumPoolSize = int 最大值,keepAliveTime = 0,快取佇列為DelayedWorkQueue