1. 程式人生 > >【搞定Java併發程式設計】第29篇:Executor 框架詳解

【搞定Java併發程式設計】第29篇:Executor 框架詳解

上一篇:Java中的執行緒池詳解

本文目錄

1、Executor 框架簡介

1.1、Executor 框架的兩級排程模型

1.2、Executor 框架的結構與成員

2、ThreadPoolExecutor 詳解

2.1、FixedThreadPool

2.2、SingleThreadExecutor  

2. 3、CachedThreadPool

2.4、具體應用案例

3、ScheduledThreadPoolExecutor 詳解

3.1、ScheduledThreadPoolExecutor 的執行機制

3.2、ScheduledThreadPoolExecutor 的實現

4、FutureTask 詳解

4.1、FutureTask 簡介

4.2、FutureTask 的實現

本文的內容來自於學習《Java併發程式設計的藝術》一書的學習筆記。


Java中的執行緒即是工作單元也是執行機制,從JDK 5後,工作單元與執行機制被分離。工作單元包括Runnable和Callable,執行機制由JDK 5中增加的java.util.concurrent包中Executor框架提供。

1、Executor 框架簡介

1.1、Executor 框架的兩級排程模型

在 HotSpot VM 的執行緒模型中,Java執行緒(java.lang.Thread)被一對一對映為本地作業系統執行緒。Java執行緒啟動時會建立一個本地作業系統執行緒;當該Java執行緒終止時,這個作業系統執行緒也被回收。作業系統會排程所有執行緒並將它們分配給可用的CPU。

在上層,Java多執行緒程式通常把應用分解為若干個任務,然後使用使用者級的排程器(Executor框架)將這些任務對映為固定的數量的執行緒;在底層,作業系統核心將這些執行緒對映到硬體處理器上。這兩級的排程模型的示意圖如下所示:

任務的兩級排程模型

從圖中可以看出,應用程式通過 Executor 框架控制上層的排程;而下層的排程由作業系統核心控制,下層的排程不受應用程式的控制。

1.2、Executor 框架的結構與成員

1.2.1、Executor 框架的結構

Executor 框架主要由3大部分組成如下:

1、任務

:包括被執行任務需要實現的介面:Runnable 介面或 Callable 介面;

2、任務的執行:包括任務執行機制的核心介面Executor,以及繼承自 Executor 的 ExecutorService 介面。Executor 框架有兩個關鍵類實現了 ExecutorService 介面,即:ThreadPoolExecutor 和 ScheduledThreadPoolExecutor;

3、非同步計算的結果:包括介面 Future 和實現 Future 介面的 FutureTask 類。

Executor 框架包含的主要類和介面如下圖所示:

1、Executor:是一個介面,它是 Executor 框架的基礎,它將任務的提交與任務的執行分離開來;

2、ThreadPoolExecutor:是執行緒池的核心實現類,用來執行被提交的任務;

3、ScheduledThreadPoolExecutor:是一個實現類,可以在給定的延遲後執行命令,或者定期執行命令。ScheduledThreadPoolExecutor 比 Timer 更加靈活,功能更強大;

4、Future 介面和實現 Future 介面的 FutureTask 類,代表非同步計算的結果;

5、Runnable 介面和 Callable 介面的實現類,都可以被 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 執行。

Executor 框架的使用示意圖如下圖所示:

Executor 框架的使用示意圖

主執行緒首先要建立實現Runnable或者Callable介面的任務物件。工具類Executors可以把一個Runnable物件封裝為一個Callable物件(Executors.callable(Runnable  task) 或 Executors.callable(Runnable  task, Object  result))。

然後可以把Runnbale物件直接交給 ExecutorService 執行(ExecutorService.execute(Runnable  command));或者也可以把Runnable或者Callable物件提交給 ExecutorService 執行(ExecutorService.submit(Runnable  task) 或  ExcutorService.submit(Callable<T>  task))。

如果執行ExecutorService.submit( ... ),ExecutorService 將返回一個實現 Future 介面的物件。由於 FutureTask 實現了 Runnable,程式設計師也可以建立 FutureTask,然後直接交給 ExecutorService 執行。

最後,主執行緒可以執行 FutureTask.get() 方法來等待任務執行完成。主執行緒也可以執行 FutureTask.cancel(boolean  mayInterruptIfRunning) 來取消此任務的執行。

1.2.2、Executor 框架的成員

Executor 框架的主要成員包括:ThreadPoolExecutor、ScheduledThreadPoolExecutor、Future介面、Runnable介面、Callable介面和Executors。

  • 1、ThreadPoolExecutor

ThreadPoolExecutor 通常使用工廠類 Executors 來建立。Executors 可以建立3種類型的 ThreadPoolExecutor:SingleThreadExecutor、FixedThreadPool 和 CachedThreadPool。

  • 2、ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor 通常使用工廠類 Executors 來建立。Executors可以建立2種類型的 ScheduledThreadPoolExecutor,如下:

  1. ScheduledThreadPoolExecutor:包含若干個執行緒的ScheduledThreadPoolExecutor;
  2. SingleThreadScheduledExecutor:包含一個執行緒的ScheduledThreadPoolExecutor。
  • 3、Future介面

Future 介面和實現 Future 介面的 FutureTask 類用來表示非同步計算的結果。當我們把 Runnable 介面或 Callable 介面的實現類提交(submit)給 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 時,ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 會向我們返回一個 FutureTask 物件。

  • 4、Runnable 介面和 Callable 介面

Runnable 介面和 Callable 介面的實現類,都可以被 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 執行。它們之間的區別是 Runnable 不會返回結果,而 Callable 可以返回結果。


2、ThreadPoolExecutor 詳解

其實關於 ThreadPoolExecutor 在上一篇文章:Java中的執行緒池詳解已經進行了原始碼講解,這裡僅介紹下它的3種類型:SingleThreadExecutor、FixedThreadPool 和 CachedThreadPool。

2.1、FixedThreadPool

建立固定長度的執行緒池,每次提交任務建立一個執行緒,直到達到執行緒池的最大數量,執行緒池的大小不再變化。

這個執行緒池可以建立固定執行緒數的執行緒池。特點就是可以重用固定數量執行緒的執行緒池。它的構造原始碼如下:

ublic static ExecutorService newFixedThreadPool(int nThreads) { 
        return new ThreadPoolExecutor(nThreads, 
                                      nThreads, 
                                      0L,
                                      TimeUnit.MILLISECONDS, 
                                      new LinkedBlockingQueue<Runnable>()); 
} 
  • FixedThreadPool的corePoolSize和maxiumPoolSize都被設定為建立FixedThreadPool時指定的引數nThreads;
  • 0L則表示當執行緒池中的執行緒數量操作核心執行緒的數量時,多餘的執行緒將被立即停止;
  • 最後一個引數表示FixedThreadPool使用了無界佇列LinkedBlockingQueue作為執行緒池的做工佇列,由於是無界的,當執行緒池的執行緒數達到corePoolSize後,新任務將在無界佇列中等待,因此執行緒池的執行緒數量不會超過corePoolSize,同時maxiumPoolSize也就變成了一個無效的引數,並且執行中的執行緒池並不會拒絕任務。

FixedThreadPool執行圖如下:

執行過程如下:

1.如果當前工作中的執行緒數量少於corePool的數量,就建立新的執行緒來執行任務。

2.當執行緒池的工作中的執行緒數量達到了corePool,則將任務加入LinkedBlockingQueue。

3.執行緒執行完1中的任務後會從佇列中去任務。

注意:LinkedBlockingQueue是無界佇列,所以可以一直新增新任務到執行緒池。

2.2、SingleThreadExecutor  

SingleThreadExecutor是使用單個worker執行緒的Executor。特點是使用單個工作執行緒執行任務。它的構造原始碼如下:

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
                    (new ThreadPoolExecutor(1, 
                                            1,
                                            0L, 
                                            TimeUnit.MILLISECONDS,
                                            new LinkedBlockingQueue<Runnable>()));
}

SingleThreadExecutor 的 corePoolSize 和 maxiumPoolSize 都被設定1。其他引數均與 FixedThreadPool 相同,其執行圖如下:

執行過程如下:

1.如果當前工作中的執行緒數量少於corePool的數量,就建立一個新的執行緒來執行任務。

2.當執行緒池的工作中的執行緒數量達到了corePool,則將任務加入LinkedBlockingQueue。

3.執行緒執行完1中的任務後會從佇列中去任務。

注意:由於線上程池中只有一個工作執行緒,所以任務可以按照新增順序執行。

2. 3、CachedThreadPool

 CachedThreadPool是一個”無限“容量的執行緒池,它會根據需要建立新執行緒。特點是可以根據需要來建立新的執行緒執行任務,沒有特定的corePool。下面是它的構造方法:

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, 
                                      Integer.MAX_VALUE,
                                      60L, 
                                      TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}

CachedThreadPool的corePoolSize被設定為0,即corePool為空;maximumPoolSize被設定為Integer.MAX_VALUE,即maximum是無界的。這裡keepAliveTime設定為60秒,意味著空閒的執行緒最多可以等待任務60秒,否則將被回收。

CachedThreadPool使用沒有容量的SynchronousQueue作為主執行緒池的工作佇列,它是一個沒有容量的阻塞佇列。每個插入操作必須等待另一個執行緒的對應移除操作。這意味著,如果主執行緒提交任務的速度高於執行緒池中處理任務的速度時,CachedThreadPool會不斷建立新執行緒。極端情況下,CachedThreadPool會因為建立過多執行緒而耗盡CPU資源。其執行圖如下:

執行過程如下:

1.首先執行SynchronousQueue.offer(Runnable task)。如果在當前的執行緒池中有空閒的執行緒正在執行SynchronousQueue.poll(),那麼主執行緒執行的offer操作與空閒執行緒執行的poll操作配對成功,主執行緒把任務交給空閒執行緒執行。,execute()方法執行成功,否則執行步驟2;

2.當執行緒池為空(初始maximumPool為空)或沒有空閒執行緒時,配對失敗,將沒有執行緒執行SynchronousQueue.poll操作。這種情況下,執行緒池會建立一個新的執行緒執行任務;

3.在建立完新的執行緒以後,將會執行poll操作。當步驟2的執行緒執行完成後,將等待60秒,如果此時主執行緒提交了一個新任務,那麼這個空閒執行緒將執行新任務,否則被回收。因此長時間不提交任務的CachedThreadPool不會佔用系統資源。

SynchronousQueue是一個不儲存元素阻塞佇列,每次要進行offer操作時必須等待poll操作,否則不能繼續新增元素。

2.4、具體應用案例

1、newCachedThreadPool

建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒。示例程式碼如下:

public class Demo1 {

	public static void main(String[] args) {
		
		ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
		
		for(int i = 0; i < 10; i++){
			final int index = i;
			try {
				Thread.sleep(index * 1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			
			cachedThreadPool.execute(new Runnable(){
				@Override
				public void run() {
					System.out.println(index);
				}
			});
		}
	}
}

執行緒池為無限大,當執行第二個任務時第一個任務已經完成,會複用執行第一個任務的執行緒,而不用每次新建執行緒。

2、newFixedThreadPool

建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。示例程式碼如下:

public class Demo2 {

	public static void main(String[] args) {
		
		ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
		
		for (int i = 0; i < 10; i++) {
			final int index = i;
			fixedThreadPool.execute(new Runnable(){

				@Override
				public void run() {
					try {
						System.out.println(index);
						Thread.sleep(2000);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			});
		}
	}
}

因為執行緒池大小為3,每個任務輸出index後sleep 2秒,所以每兩秒列印3個數字。定長執行緒池的大小最好根據系統資源進行設定。如Runtime.getRuntime().availableProcessors()。可參考PreloadDataCache

3、newScheduledThreadPool

建立一個定長執行緒池,支援定時及週期性任務執行。延遲執行示例程式碼如下:

public class Demo3 {

	public static void main(String[] args) {
		
		ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
		
		scheduledThreadPool.schedule(new Runnable(){

			@Override
			public void run() {
				System.out.println("延遲3秒");
			}
		}, 3, TimeUnit.SECONDS);
	}
}

表示延遲3秒執行。定期執行示例程式碼如下:

public class Demo4 {

	public static void main(String[] args) {

		ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
		
		scheduledThreadPool.scheduleAtFixedRate(new Runnable(){

			@Override
			public void run() {
				System.out.println("延遲1秒,每3秒執行1次");
			}
			
		}, 1, 3, TimeUnit.SECONDS);
	}
}

表示延遲1秒後每3秒執行一次。ScheduledExecutorService比Timer更安全,功能更強大,後面會有一篇單獨進行對比。

4、newSingleThreadExecutor

建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。示例程式碼如下:

public class Demo5 {

	public static void main(String[] args) {
		
		ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
		
		for (int i = 0; i < 10; i++) {
			final int index = i;
			singleThreadExecutor.execute(new Runnable(){

				@Override
				public void run() {
					try {
						System.out.println(index);
						Thread.sleep(2000);
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
			});
		}
	}
}

結果依次輸出,相當於順序執行各個任務。現行大多數GUI程式都是單執行緒的。Android中單執行緒可用於資料庫操作,檔案操作,應用批量安裝,應用批量刪除等不適合併發但可能IO阻塞性及影響UI執行緒響應的操作。


3、ScheduledThreadPoolExecutor 詳解

我們知道Timer與TimerTask雖然可以實現執行緒的週期和延遲排程,但是Timer與TimerTask存在一些缺陷,所以對於這種定期、週期執行任務的排程策略,我們一般都是推薦ScheduledThreadPoolExecutor來實現。下面就深入分析ScheduledThreadPoolExecutor是如何來實現執行緒的週期、延遲排程的。

ScheduledThreadPoolExecutor,繼承ThreadPoolExecutor且實現了ScheduledExecutorService介面,它就相當於提供了“延遲”和“週期執行”功能的ThreadPoolExecutor。在JDK API中是這樣定義它的:ScheduledThreadPoolExecutor,它可另行安排在給定的延遲後執行命令,或者定期執行命令。需要多個輔助執行緒時,或者要求 ScheduledThreadPoolExecutor具有額外的靈活性或功能時,此類要優於 Timer。 一旦啟用已延遲的任務就執行它,但是有關何時啟用,啟用後何時執行則沒有任何實時保證。按照提交的先進先出 (FIFO) 順序來啟用那些被安排在同一執行時間的任務。

3.1、ScheduledThreadPoolExecutor 的執行機制

ScheduledThreadPoolExecutor 的執行示意圖如下圖所示:

ScheduledThreadPoolExecutor的任務傳遞示意圖

DelayQueue 是一個無界佇列,所以 ThreadPoolExecutor 的 maximumPoolSize 在 ScheduledThreadPoolExecutor 中沒有什麼意義。

ScheduledThreadPoolExecutor 的執行主要分為兩大部分:

1、當呼叫 ScheduledThreadPoolExecutor 的 scheduleAtFixedRate() 方法或者 scheduledWithFixedDelay() 方法時,會向 ScheduledThreadPoolExecutor 的 DelayQueue 新增一個實現了 RunnableScheduledFuture 介面的 ScheduledFutureTask。

2、執行緒池中的執行緒從 DelayQueue 中獲取 ScheduledFutureTask,然後執行任務。

ScheduledThreadPoolExecutor 為了實現週期性的執行任務,對 ThreadPoolExecutor 做了如下的修改:

1、使用 DelayQueue 作為任務佇列;

2、獲取任務的方式不同(後文會講解到);

3、執行週期任務後,增加了額外的處理(後文會講解到)。

3.2、ScheduledThreadPoolExecutor 的實現

先來看下ScheduledThreadPoolExecutor類中的主要結構:

public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor
                                         implements ScheduledExecutorService {
		
	private static final AtomicLong sequencer = new AtomicLong(0);
	
	private class ScheduledFutureTask<V>
            extends FutureTask<V> implements RunnableScheduledFuture<V> {
        ...
	}

	private void delayedExecute(RunnableScheduledFuture<?> task) { ... }
	
	void reExecutePeriodic(RunnableScheduledFuture<?> task) { ... }
	
	protected <V> RunnableScheduledFuture<V> decorateTask(...){...}
		
        public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {...}

	public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {...}
    
        public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {...}	
	
	public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {...}
	
	public void execute(Runnable command) {...}	
	
	public Future<?> submit(Runnable task) {...}
	 
	public <T> Future<T> submit(Runnable task, T result) {...}

	public <T> Future<T> submit(Callable<T> task) {...}
		
	public void shutdown() {...}

	public List<Runnable> shutdownNow() {...}
	
	public BlockingQueue<Runnable> getQueue() {...}
	
	static class DelayedWorkQueue extends AbstractQueue<Runnable>
                                      implements BlockingQueue<Runnable> {...}
			
	......
}

ScheduledThreadPoolExecutor 會把排程的任務(ScheduledFutureTask)放到一個DelayQueue中。下面來看下ScheduledFutureTask主要包含的3個成員變數:

private class ScheduledFutureTask<V>
            extends FutureTask<V> implements RunnableScheduledFuture<V> {

    private final long sequenceNumber;

    private long time;

    private final long period;

    ...
}

1、long型成員變數time:表示這個任務將要執行的具體時間;

2、long型成員變數sequenceNumber,表示這個任務被新增到ScheduledThreadPoolExecutor中的序號;

3、long型成員變數period,表示任務執行的間隔週期。

DelayQueue 封裝了一個 PriorityQueue,這個 PriorityQueue 會對佇列中的 ScheduledEutureTask 進行排序。排序時,time 小的排在前面(時間早的任務將被先執行)。如果兩個 ScheduledFutureTask 的 time 相同,就比較 sequenceNumber,sequenceNumber 小的排在前面,也就是說,如果兩個任務的執行時間相同,那麼先執行提交早的任務。

下圖所示的是:ScheduledThreadPoolExecutor 中的執行緒1執行某個週期任務的4個步驟:

ScheduledThreadPoolExecutor的任務執行步驟

1、執行緒1從 DelayQueue 中獲取已到期的 ScheduledFutureTask(DealyQueue.take())。到其任務是指 ScheduledFutureTask 的 time 大於等於當前時間;

2、執行緒1執行這個 ScheduledFutureTask;

3、執行緒1修改 ScheduledFutureTask 的 time 變數為下次將要被執行的時間;

4、執行緒1把這個修改 time 之後的 ScheduledFutureTask 放回 DelayQueue 中(DelayQueue.add())。

下面就看下DelayQueue.take()方法的原始碼實現:【在原始碼中:DelayQueue 就是 DelayedWorkQueue】

static class DelayedWorkQueue extends AbstractQueue<Runnable>
                              implements BlockingQueue<Runnable> {

    public RunnableScheduledFuture take() throws InterruptedException {
            // 獲取lock
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    RunnableScheduledFuture first = queue[0];  // 獲取任務
                    if (first == null)
                        available.await();  // 如果佇列為空,則等待
                    else {
                        long delay = first.getDelay(TimeUnit.NANOSECONDS);
                        if (delay <= 0)  
                            return finishPoll(first);
                        else if (leader != null)
                            available.await();
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }
}

DelayQueue.take()的執行示意圖如下所示:

ScheduledThreadFutureTask 獲取任務的過程

如上圖所示的過程,大致可以分為3個步驟:

1、獲取Lock;

2、獲取週期任務;

  • 2.1、如果 PriorityQueue 為空,當前執行緒到 Condition 中等待,否則執行下面的2.2;
  • 2.2、如果 PriorityQueue 的頭元素的 time 時間比當前時間大,到 Condition 中等待到 time 時間,否則執行2.3;
  • 2.3、獲取 PriorityQueue 的頭元素,如果 PriorityQueue 不為空,則喚醒在 Condition 中等待的所有執行緒。

3、釋放Lock。

ScheduledThreadFutureTask 在一個迴圈中執行步驟2,直到執行緒從 PriorityQueue 獲取到一個元素之後,才會退出無限迴圈。

下面看下 ScheduledThreadFutureTask 中的執行緒把 ScheduledFutureTask 放入 DelayQueue 中的過程。下面是 DelayQueue.add() 的原始碼實現:

static class DelayedWorkQueue extends AbstractQueue<Runnable>
                              implements BlockingQueue<Runnable> {

    public boolean offer(Runnable x) {
            if (x == null)
                throw new NullPointerException();
            RunnableScheduledFuture e = (RunnableScheduledFuture)x;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int i = size;
                if (i >= queue.length)
                    grow();
                size = i + 1;
                if (i == 0) {
                    queue[0] = e;
                    setIndex(e, 0);
                } else {
                    siftUp(i, e);
                }
                if (queue[0] == e) {
                    leader = null;
                    available.signal();
                }
            } finally {
                lock.unlock();
            }
            return true;
    }
    ......
}

下圖是 DelayQueue.add() 的執行示意圖:

ScheduledThreadPoolExecutor新增任務的過程

如上圖所示,新增任務分為3大步驟:

1、獲取 Lock;

2、新增任務;

  • 2.1、向 PriorityQueue 新增任務;
  • 2.2、如果在上面2.1 中新增的任務是 PriorityQueue 的頭元素,則喚醒在 Conditon 中等待的所有執行緒;

3、釋放 Lock。


4、FutureTask 詳解

Future 介面和實現 Future 介面的 FutureTask 類,代表非同步計算的結果。

4.1、FutureTask 簡介

FutureTask 除了實現了 Future 介面外,還實現了 Runnable介面。那麼我們就先看下這兩個介面的內部結構。

  • Future 介面的內部結構
public interface Future<V> {

    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
  • Runnable 介面的內部結構
public interface Runnable {
    public abstract void run();
}

由於FutureTask 除了實現了 Future 介面外,還實現了 Runnable介面。因此,FutureTask 可以交給 Executor 執行,也可以呼叫執行緒直接執行(FutureTask.run())。

根據 FutureTask.run()方法被執行的時機,FutureTask可以處於下面3種狀態:

1、未啟動:FutureTask.run()方法還沒有被執行之前,FutureTask 處於未啟動狀態。當建立一個 FutureTask,且沒有執行 FutureTask.run() 方法之前,這個 FutureTask 處於未啟動狀態;

2、已啟動:FutureTask.run()方法被執行的過程中,FutureTask 處於已啟動狀態;

3、已完成:FutureTask.run()方法執行完成後正常結束,或被取消(FutureTask.cancel(...)),或執行FutureTask.run()方法時丟擲異常而異常結束,FutureTask 處於已完成狀態。

FutureTask 的狀態遷移的示意圖如下所示:

FutureTask 的 get 和 cancel 的執行示意圖如下所示:

4.2、FutureTask 的實現

先看下 FutureTask 的內部結構:

public class FutureTask<V> implements RunnableFuture<V> {
    
    private final Sync sync;
	
	// 建構函式1 Callable
	public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        sync = new Sync(callable);
    }
	
	// 建構函式2 Runnable
	public FutureTask(Runnable runnable, V result) {
        sync = new Sync(Executors.callable(runnable, result));
    }
	// 呼叫的是sync中的innerCancel方法
	public boolean cancel(boolean mayInterruptIfRunning) {
		return sync.innerCancel(mayInterruptIfRunning);
	}
	// 呼叫的是sync中的innerGet方法
	public V get() throws InterruptedException, ExecutionException {
		return sync.innerGet();
	}
	// 呼叫的是sync中的innerGet方法
	public V get(long timeout, TimeUnit unit){
		throws InterruptedException, ExecutionException, TimeoutException {
        return sync.innerGet(unit.toNanos(timeout));
	}
    // 呼叫的是sync中的innerRun方法
    public void run() {
        sync.innerRun();
    }

	private final class Sync extends AbstractQueuedSynchronizer {...}
	
	// .......
}

從 FutureTask 的原始碼中可以看出來,它的實現是基於 AbstractQueuedSynchronizer 。AQS 是一個同步框架,它提供通用機制來原子性管理同步狀態、阻塞和喚醒執行緒,以及維護被阻塞執行緒的佇列。基於 AQS 實現的同步器包括:ReentrantLock、Semaphore、ReentrantReadWriteLock、CountDownLatch 和 FutureTask。

每一個基於 AQS 實現的同步器都會包含兩種型別的操作,如下:

1、至少一個 acquire 操作:這個操作阻塞呼叫執行緒,除非 / 直到 AQS 的狀態允許這個執行緒繼續執行。 FutureTask 的 acquire 操作為 get() / get(long  timeout, TimeUnit  unit)方法呼叫;

2、至少一個 release 操作:這個操作改變 AQS 的狀態,改變後的狀態可允許一個或多個阻塞執行緒被解除阻塞。FutureTask 的 release 操作包括 run() 方法和 cancel(...) 方法。

基於“複合優先繼承”的原則,FutureTask 聲明瞭一個內部私有的繼承於 AQS 的子類 Sync,對 FutureTask 所有公有方法的呼叫都會委託給這個內部子類。

AQS 被作為“模板方法模式”的基礎類提供給 FutureTask 的內部子類 Sync,這個內部子類只需要實現狀態檢測和狀態更新的方法即可,這些方法將控制 FutureTask 的獲取和釋放操作。具體來說,Sync實現了 AQS 的 tryAcquireShared(int)方法和 tryReleaseShared(int)方法,Sync 通過這兩個方法來檢查和更新同步狀態。

FutureTask 的設計示意圖如下圖所示:

FutureTask的設計示意圖

如圖所示,Sync 是 FutureTask 的內部私有類,它繼承自 AQS。建立 FutureTask 時會建立內部私有的成員物件 Sync,FutureTask 所有的公有方法都直接委託給了內部私有的 Sync。

下面對 FutureTask 中主要的幾個方法進行呼叫過程分析:

4.2.1、FutureTask.get() 方法 

  • 第1步:呼叫 FutureTask 中的 get() 方法
public V get() throws InterruptedException, ExecutionException {
        return sync.innerGet();
}

從原始碼中很清楚的看到 get() 方法內部是由 sync 的 innerGet()方法實現的。 

  •  第2步:呼叫 Sync 中的 innerGet()方法
V innerGet() throws InterruptedException, ExecutionException {
    acquireSharedInterruptibly(0);
    if (getState() == CANCELLED)
        throw new CancellationException();
    if (exception != null)
        throw new ExecutionException(exception);
    return result;
}
  • 第3步:呼叫 AQS.acquireSharedInterruptibly(int  args)方法。
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
  • 第4步:呼叫Sync.tryAcquireShared方法
  • 第5步:呼叫 AQS.doAcquireSharedIntrruptibly方法

這個方法首先會在子類 Sync 中實現的 tryAcquireShared()方法來判斷 acquire 操作是否可以成功,acquire 操作可以成功的條件為:state 為執行完成狀態RAN 或取消狀態 CANCELLED,且 runner 不為null。

【至於tryAcquireShared和doAcquireSharedIntrruptibly方法,這裡不再做原始碼分析了,前面文章已經分析過多次了】

如果成功則立即返回,如果失敗則到執行緒等待佇列中去等待其他執行緒執行 release 操作。

當其他執行緒執行 release 操作(比如:FutureTask.run() 或 FutureTask.cancel(...))喚醒當前執行緒後,當前執行緒再次執行 tryAcquiredShared() 將返回正值 1,當前執行緒將離開執行緒等待佇列,並喚醒它的後繼節點執行緒。

最後返回計算的結果或者丟擲異常。

4.2.2、FutureTask.run() 方法

  • 第1步:呼叫了 FutureTask.run() 方法
public void run() {
    sync.innerRun();
}

可以看到 run() 方法內部仍然是呼叫了 Sync.innerRun() 方法。

  • 第2步:呼叫 Sync.innerRun() 方法
void innerRun() {
    if (!compareAndSetState(READY, RUNNING))
        return;

    runner = Thread.currentThread();
    if (getState() == RUNNING) { // recheck after setting thread
        V result;
        try {
            result = callable.call();
        } catch (Throwable ex) {
            setException(ex);
            return;
        }
        set(result);
    } else {
        releaseShared(0); // cancel
    }
}

Sync.innerRun() 方法中以原子的方式更新同步狀態(呼叫AQS.compareAndSetState(READY, RUNNING),將 state 值設定為 RUNNING 狀態)。如果這個原子操作成功,就設定代表計算結果的變數 result 的值為 Callable.call() 的返回值,然後呼叫AQS.releaseShared(int  args)方法。

  • 第3步:呼叫AQS.releaseShared(int  args)方法
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

AQS.releaseShared(int  args)首先會回撥子類 Sync 中實現的 tryReleaseShared(int  args)方法來執行 release操作。

  • 第4步:呼叫 Sync.tryReleaseShared(int  args) 方法
protected boolean tryReleaseShared(int ignore) {
    runner = null;
    return true;
}

設定允許任務執行緒 runner 為 null,然後返回 true。

  • 第5步:呼叫AQS.doReleaseShared() 方法
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
            }
        if (h == head)                   // loop if head changed
            break;
    }
}

喚醒執行緒等待佇列中的第一個執行緒。


全文完!

上一篇:Java中的執行緒池詳解