Java併發程式設計筆記——J.U.C之executors框架:executors框架設計理念
一、executors框架簡介
juc-executors
框架是整個J.U.C包中類/介面關係最複雜的框架,真正理解executors框架的前提是理清楚各個模組之間的關係,高屋建瓴,從整體到區域性才能透徹理解其中各個模組的功能和背後的設計思路。
網上有太多文章講executors框架,要麼泛泛而談,要麼一葉障目不見泰山,缺乏整體視角,很多根本沒有理解整個框架的設計思想和模組關係。本文將對整個executors框架做綜述,介紹各個模組的功能和聯絡,後續再深入探討每個模組,包括模組中的各個工具類。
從Executor談起
Executor
是JDK1.5時,隨著J.U.C引入的一個介面,引入該介面的主要目的是解耦任務本身和任務的執行
start
方法來執行任務:
new Thread(new(RunnableTask())).start();
上述RunnableTask是實現了Runnable介面的任務類。而Executor介面解耦了任務和任務的執行,該介面只有一個方法,入參為待執行的任務:
public interface Executor { /** * 執行給定的Runnable任務. * 根據Executor的實現不同, 具體執行方式也不相同. * * @param command the runnable task * @throws RejectedExecutionException if this task cannot be accepted for execution * @throws NullPointerException if command is null */ void execute(Runnable command); }
我們可以像下面這樣執行任務,而不必關心執行緒的建立:
Executor executor = someExecutor; // 建立具體的Executor物件
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
...
由於Executor僅僅是一個介面,所以根據其實現的不同,執行任務的具體方式也不盡相同,比如:
①同步執行任務
class DirectExecutor implements Executor { public void execute(Runnable r) { r.run(); } }
DirectExecutor是一個同步任務執行器,對於傳入的任務,只有執行完成後execute才會返回。
②非同步執行任務
/** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
static final class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) { new Thread(r).start(); }
}
ThreadPerTaskExecutor是一個非同步任務執行器,對於每個任務,執行器都會建立一個新的執行緒去執行任務。
注意:Java執行緒與本地作業系統的執行緒是一一對映的。Java執行緒啟動時會建立一個本地作業系統執行緒;當該Java執行緒終止時,對應作業系統執行緒會被回收。由於CPU資源是有限的,所以執行緒數量有上限,所以一般由執行緒池來管理執行緒的建立/回收,而上面這種方式其實是執行緒池的雛形。
③對任務進行排隊執行
class SerialExecutor implements Executor {
final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
final Executor executor;
Runnable active;
SerialExecutor(Executor executor) {
this.executor = executor;
}
public synchronized void execute(final Runnable r) {
tasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (active == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((active = tasks.poll()) != null) {
executor.execute(active);
}
}
}
SerialExecutor 會對傳入的任務進行排隊(FIFO順序),然後從隊首取出一個任務執行。
以上這些示例僅僅是給出了一些可能的Executor實現,J.U.C包中提供了很多Executor的具體實現類,我們以後會具體講到,這裡關鍵是理解Executor的設計思想——對任務和任務的執行解耦。
增強的Executor——ExecutorService
Executor介面提供的功能很簡單,為了對它進行增強,J.U.C又提供了一個名為ExecutorService
介面,ExecutorService也是在JDK1.5時,隨著J.U.C引入的:
public interface ExecutorService extends Executor
可以看到,ExecutorService繼承了Executor,它在Executor的基礎上增強了對任務的控制,同時包括對自身生命週期的管理,主要有四類:
- 關閉執行器,禁止任務的提交;
- 監視執行器的狀態;
- 提供對非同步任務的支援;
- 提供對批處理任務的支援。
public interface ExecutorService extends Executor {
/**
* 關閉執行器, 主要有以下特點:
* 1. 已經提交給該執行器的任務將會繼續執行, 但是不再接受新任務的提交;
* 2. 如果執行器已經關閉了, 則再次呼叫沒有副作用.
*/
void shutdown();
/**
* 立即關閉執行器, 主要有以下特點:
* 1. 嘗試停止所有正在執行的任務, 無法保證能夠停止成功, 但會盡力嘗試(例如, 通過 Thread.interrupt中斷任務, 但是不響應中斷的任務可能無法終止);
* 2. 暫停處理已經提交但未執行的任務;
*
* @return 返回已經提交但未執行的任務列表
*/
List<Runnable> shutdownNow();
/**
* 如果該執行器已經關閉, 則返回true.
*/
boolean isShutdown();
/**
* 判斷執行器是否已經【終止】.
* <p>
* 僅當執行器已關閉且所有任務都已經執行完成, 才返回true.
* 注意: 除非首先呼叫 shutdown 或 shutdownNow, 否則該方法永遠返回false.
*/
boolean isTerminated();
/**
* 阻塞呼叫執行緒, 等待執行器到達【終止】狀態.
*
* @return {@code true} 如果執行器最終到達終止狀態, 則返回true; 否則返回false
* @throws InterruptedException if interrupted while waiting
*/
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
/**
* 提交一個具有返回值的任務用於執行.
* 注意: Future的get方法在成功完成時將會返回task的返回值.
*
* @param task 待提交的任務
* @param <T> 任務的返回值型別
* @return 返回該任務的Future物件
* @throws RejectedExecutionException 如果任務無法安排執行
* @throws NullPointerException if the task is null
*/
<T> Future<T> submit(Callable<T> task);
/**
* 提交一個 Runnable 任務用於執行.
* 注意: Future的get方法在成功完成時將會返回給定的結果(入參時指定).
*
* @param task 待提交的任務
* @param result 返回的結果
* @param <T> 返回的結果型別
* @return 返回該任務的Future物件
* @throws RejectedExecutionException 如果任務無法安排執行
* @throws NullPointerException if the task is null
*/
<T> Future<T> submit(Runnable task, T result);
/**
* 提交一個 Runnable 任務用於執行.
* 注意: Future的get方法在成功完成時將會返回null.
*
* @param task 待提交的任務
* @return 返回該任務的Future物件
* @throws RejectedExecutionException 如果任務無法安排執行
* @throws NullPointerException if the task is null
*/
Future<?> submit(Runnable task);
/**
* 執行給定集合中的所有任務, 當所有任務都執行完成後, 返回保持任務狀態和結果的 Future 列表.
* <p>
* 注意: 該方法為同步方法. 返回列表中的所有元素的Future.isDone() 為 true.
*
* @param tasks 任務集合
* @param <T> 任務的返回結果型別
* @return 任務的Future物件列表,列表順序與集合中的迭代器所生成的順序相同,
* @throws InterruptedException 如果等待時發生中斷, 會將所有未完成的任務取消.
* @throws NullPointerException 任一任務為 null
* @throws RejectedExecutionException 如果任一任務無法安排執行
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
/**
* 執行給定集合中的所有任務, 當所有任務都執行完成後或超時期滿時(無論哪個首先發生), 返回保持任務狀態和結果的 Future 列表.
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
/**
* 執行給定集合中的任務, 只有其中某個任務率先成功完成(未丟擲異常), 則返回其結果.
* 一旦正常或異常返回後, 則取消尚未完成的任務.
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
/**
* 執行給定集合中的任務, 如果在給定的超時期滿前, 某個任務已成功完成(未丟擲異常), 則返回其結果.
* 一旦正常或異常返回後, 則取消尚未完成的任務.
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
關於Future,其實就是Java多執行緒設計模式中 Future模式,後面我們會專門講解J.U.C中的Future框架。
Future
物件提供了對任務非同步執行的支援,也就是說呼叫執行緒無需等待任務執行完成,提交待執行的任務後,就會立即返回往下執行。然後,可以在需要時檢查Future是否有結果了,如果任務已執行完畢,通過Future.get()
方法可以獲取到執行結果——Future.get()是阻塞方法。
週期任務的排程——ScheduledExecutorService
在工業環境中,我們可能希望提交給執行器的某些任務能夠定時執行或週期性地執行,這時我們可以自己實現Executor介面來建立符合我們需要的類,Doug Lea已經考慮到了這類需求,所以在ExecutorService的基礎上,又提供了一個介面——ScheduledExecutorService
,該介面也是在JDK1.5時,隨著J.U.C引入的:
public interface ScheduledExecutorService extends ExecutorService
ScheduledExecutorService提供了一系列schedule方法,可以在給定的延遲後執行提交的任務,或者每個指定的週期執行一次提交的任務,我們來看下面這個示例:
public class TestSche {
public static void main(String[] args) {
//建立了一個ScheduledExecutorService 例項
ScheduledExecutorService executorService=new ScheduledThreadPoolExecutor(1);
final ScheduledFuture<?>scheduledFuture=executorService.scheduleAtFixedRate(new BeepTask(),10,10
,TimeUnit.SECONDS);
executorService.schedule(new Runnable() {
@Override
public void run() {
scheduledFuture.cancel(true);
}
},1,TimeUnit.HOURS); //1小時取消任務
}
private static class BeepTask implements Runnable{
@Override
public void run() {
System.out.println("beep!...");
}
}
}
上述示例先建立一個ScheduledExecutorService型別的執行器,然後利用scheduleAtFixedRate方法提交了一個“蜂鳴”任務,每隔10s該任務會執行一次。
注意:scheduleAtFixedRate
方法返回一個ScheduledFuture物件,ScheduledFuture其實就是在Future的基礎上增加了延遲的功能。通過ScheduledFuture,可以取消一個任務的執行,本例中我們利用schedule方法,設定在1小時後,執行任務的取消。
ScheduledExecutorService完整的介面宣告如下:
public interface ScheduledExecutorService extends ExecutorService {
/**
* 提交一個待執行的任務, 並在給定的延遲後執行該任務.
*
* @param command 待執行的任務
* @param delay 延遲時間
* @param unit 延遲時間的單位
*/
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
/**
* 提交一個待執行的任務(具有返回值), 並在給定的延遲後執行該任務.
*
* @param command 待執行的任務
* @param delay 延遲時間
* @param unit 延遲時間的單位
* @param <V> 返回值型別
*/
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
/**
* 提交一個待執行的任務.
* 該任務在 initialDelay 後開始執行, 然後在 initialDelay+period 後執行, 接著在 initialDelay + 2 * period 後執行, 依此類推.
*
* @param command 待執行的任務
* @param initialDelay 首次執行的延遲時間
* @param period 連續執行之間的週期
* @param unit 延遲時間的單位
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
/**
* 提交一個待執行的任務.
* 該任務在 initialDelay 後開始執行, 隨後在每一次執行終止和下一次執行開始之間都存在給定的延遲.
* 如果任務的任一執行遇到異常, 就會取消後續執行. 否則, 只能通過執行程式的取消或終止方法來終止該任務.
*
* @param command 待執行的任務
* @param initialDelay 首次執行的延遲時間
* @param delay 一次執行終止和下一次執行開始之間的延遲
* @param unit 延遲時間的單位
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}
至此,Executors框架中的三個最核心的介面介紹完畢,這三個介面的關係如下圖:
二、生產executor的工廠
通過第一部分的學習,讀者應該對Executors框架有了一個初步的認識,Executors框架就是用來解耦任務本身與任務的執行,並提供了三個核心介面來滿足使用者的需求:
- Executor:提交普通的可執行任務
- ExecutorService:提供對執行緒池生命週期的管理、非同步任務的支援
- ScheduledExecutorService:提供對任務的週期性執行支援
既然上面三種執行器只是介面,那麼就一定存在具體的實現類,J.U.C提供了許多預設的介面實現,如果要使用者自己去建立這些類的例項,就需要了解這些類的細節,有沒有一種直接的方式,僅僅根據一些需要的特性(引數)就建立這些例項呢?因為對於使用者來說,其實使用的只是這三個介面。
JDK1.5時,J.U.C中還提供了一個Executors
類,專門用於建立上述介面的實現類物件。Executors其實就是一個簡單工廠,它的所有方法都是static的,使用者可以根據需要,選擇需要建立的執行器例項,Executors一共提供了五類可供建立的Executor執行器例項。
固定執行緒數的執行緒池
Executors提供了兩種建立具有固定執行緒數的Executor的方法,固定執行緒池在初始化時確定其中的執行緒總數,執行過程中會始終維持執行緒數量不變。
可以看到下面的兩種建立方法其實都返回了一個ThreadPoolExecutor
例項。ThreadPoolExecutor是一個ExecutorService介面的實現類,我們會在後面用專門章節講解,現在只需要瞭解這是一種Executor,用來排程其中的執行緒的執行即可。
/**
* 建立一個具有固定執行緒數的Executor.
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
/**
* 建立一個具有固定執行緒數的Executor.
* 在需要時使用提供的 ThreadFactory 建立新執行緒.
*/
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), threadFactory);
}
上面需要注意的是ThreadFactory
這個介面:
public interface ThreadFactory {
Thread newThread(Runnable r);
}
既然返回的是一個執行緒池,那麼就涉及執行緒的建立,一般我們需要通過 new Thread ()
這種方法建立一個新執行緒,但是我們可能希望設定一些執行緒屬性,比如
名稱、守護程式狀態、ThreadGroup 等等,執行緒池中的執行緒非常多,如果每個執行緒都這樣手動配置勢必非常繁瑣,而ThreadFactory 作為一個執行緒工廠可以讓我們從這些繁瑣的執行緒狀態設定的工作中解放出來,還可以由外部指定ThreadFactory例項,以決定執行緒的具體建立方式。
Executors提供了靜態內部類,實現了ThreadFactory介面,最簡單且常用的就是下面這個DefaultThreadFactory :
/**
* 預設的執行緒工廠.
*/
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
可以看到,DefaultThreadFactory 初始化的時候定義了執行緒組、執行緒名稱等資訊,每建立一個執行緒,都給執行緒統一分配這些資訊,避免了一個個手工通過new的方式建立執行緒,又可進行工廠的複用。
單個執行緒的執行緒池
除了固定執行緒數的執行緒池,Executors還提供了兩種建立只有單個執行緒Executor的方法:
/**
* 建立一個使用單個 worker 執行緒的 Executor.
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
/**
* 建立一個使用單個 worker 執行緒的 Executor.
* 在需要時使用提供的 ThreadFactory 建立新執行緒.
*/
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), threadFactory));
}
可以看到,只有單個執行緒的執行緒池其實就是指定執行緒數為1的固定執行緒池,主要區別就是,返回的Executor例項用了一個FinalizableDelegatedExecutorService
物件進行包裝。
我們來看下FinalizableDelegatedExecutorService,該類 只定義了一個finalize方法:
static class FinalizableDelegatedExecutorService
extends DelegatedExecutorService {
FinalizableDelegatedExecutorService(ExecutorService executor) {
super(executor);
}
protected void finalize() {
super.shutdown();
}
}
核心是其繼承的DelegatedExecutorService ,這是一個包裝類,實現了ExecutorService的所有方法,但是內部實現其實都委託給了傳入的ExecutorService 例項:
/**
* ExecutorService實現類的包裝類.
*/
/**
* A wrapper class that exposes only the ExecutorService methods
* of an ExecutorService implementation.
*/
static class DelegatedExecutorService extends AbstractExecutorService {
private final ExecutorService e;
DelegatedExecutorService(ExecutorService executor) { e = executor; }
public void execute(Runnable command) { e.execute(command); }
public void shutdown() { e.shutdown(); }
public List<Runnable> shutdownNow() { return e.shutdownNow(); }
public boolean isShutdown() { return e.isShutdown(); }
public boolean isTerminated() { return e.isTerminated(); }
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
return e.awaitTermination(timeout, unit);
}
public Future<?> submit(Runnable task) {
return e.submit(task);
}
public <T> Future<T> submit(Callable<T> task) {
return e.submit(task);
}
public <T> Future<T> submit(Runnable task, T result) {
return e.submit(task, result);
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
return e.invokeAll(tasks);
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
return e.invokeAll(tasks, timeout, unit);
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
return e.invokeAny(tasks);
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return e.invokeAny(tasks, timeout, unit);
}
}
為什麼要多此一舉,加上這樣一個委託層?因為返回的ThreadPoolExecutor包含一些設定執行緒池大小的方法——比如setCorePoolSize,對於只有單個執行緒的執行緒池來說,我們是不希望使用者通過強轉的方式使用這些方法的,所以需要一個包裝類,只暴露ExecutorService本身的方法。
可快取的執行緒池
有些情況下,我們雖然建立了具有一定執行緒數的執行緒池,但出於資源利用率的考慮,可能希望在特定的時候對執行緒進行回收(比如執行緒超過指定時間沒有被使用),Executors就提供了這種型別的執行緒池:
/**
* 建立一個可快取執行緒的Execotor.
* 如果執行緒池中沒有執行緒可用, 則建立一個新執行緒並新增到池中;
* 如果有執行緒長時間未被使用(預設60s, 可通過threadFactory配置), 則從快取中移除.
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
/**
* 建立一個可快取執行緒的Execotor.
* 如果執行緒池中沒有執行緒可用, 則建立一個新執行緒並新增到池中;
* 如果有執行緒長時間未被使用(預設60s, 可通過threadFactory配置), 則從快取中移除.
* 在需要時使用提供的 ThreadFactory 建立新執行緒.
*/
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), threadFactory);
}
可以看到,返回的還是ThreadPoolExecutor物件,只是指定了超時時間,另外執行緒池中執行緒的數量在[0, Integer.MAX_VALUE]
之間。
可延時/週期排程的執行緒池
如果有任務需要延遲/週期呼叫,就需要返回ScheduledExecutorService介面的例項,ScheduledThreadPoolExecutor
就是實現了ScheduledExecutorService介面的一種Executor,和ThreadPoolExecutor一樣,這個我們後面會專門講解。
/**
* 建立一個具有固定執行緒數的 可排程Executor.
* 它可安排任務在指定延遲後或週期性地執行.
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
/**
* 建立一個具有固定執行緒數的 可排程Executor.
* 它可安排任務在指定延遲後或週期性地執行.
* 在需要時使用提供的 ThreadFactory 建立新執行緒.
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
Fork/Join執行緒池
Fork/Join執行緒池是比較特殊的一類執行緒池,在JDK1.7時才引入,其核心實現就是ForkJoinPool
類。關於Fork/Join框架,我們後面會專題講解,現在只需要知道,Executors框架提供了一種建立該類執行緒池的便捷方法。
/**
* 建立具有指定並行級別的ForkJoin執行緒池.
*/
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool(parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);
}
/**
* 建立並行級別等於CPU核心數的ForkJoin執行緒池.
*/
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool(Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
三、總結
至此,Executors框架的整體結構基本就講解完了,此時我們的腦海中應有大致如下的一幅類繼承圖:
下面來回顧一下,上面的各個介面/類的關係和作用:
- Executor
執行器介面,也是最頂層的抽象核心介面, 分離了任務和任務的執行。- ExecutorService
在Executor的基礎上提供了執行器生命週期管理,任務非同步執行等功能。- ScheduledExecutorService
在ExecutorService基礎上提供了任務的延遲執行/週期執行的功能。- Executors
生產具體的執行器的靜態工廠- ThreadFactory
執行緒工廠,用於建立單個執行緒,減少手工建立執行緒的繁瑣工作,同時能夠複用工廠的特性。- AbstractExecutorService
ExecutorService的抽象實現,為各類執行器類的實現提供基礎。- ThreadPoolExecutor
執行緒池Executor,也是最常用的Executor,可以以執行緒池的方式管理執行緒。- ScheduledThreadPoolExecutor
在ThreadPoolExecutor基礎上,增加了對週期任務排程的支援。- ForkJoinPool
Fork/Join執行緒池,在JDK1.7時引入,時實現Fork/Join框架的核心類。
關於ThreadPoolExecutor和ScheduledThreadPoolExecutor,我們會在下一章詳細講解,幫助讀者理解執行緒池的實現原理。至於ForkJoinPool,涉及Fork/Join這個並行框架的講解,我們後面會專題介紹。
本文參考