1. 程式人生 > >併發程式設計(十二)—— Java 執行緒池 實現原理與原始碼深度解析 之submit方法 (二)

併發程式設計(十二)—— Java 執行緒池 實現原理與原始碼深度解析 之submit方法 (二)

在上一篇《併發程式設計(十一)—— Java 執行緒池 實現原理與原始碼深度解析(一)》中提到了執行緒池ThreadPoolExecutor的原理以及它的execute方法。這篇文章是接著上一篇文章寫的,如果你沒有閱讀上一篇文章,建議你去讀讀。本文解析ThreadPoolExecutor#submit。

  對於一個任務的執行有時我們不需要它返回結果,但是有我們需要它的返回執行結果。對於執行緒來講,如果不需要它返回結果則實現Runnable,而如果需要執行結果的話則可以實現Callable。線上程池同樣execute提供一個不需要返回結果的任務執行,而對於需要結果返回的則可呼叫其submit方法。

AbstractExecutorService

我們把上一篇文章的程式碼貼過來

 1 public abstract class AbstractExecutorService implements ExecutorService {
 2 
 3     // RunnableFuture 是用於獲取執行結果的,我們常用它的子類 FutureTask
 4     // 下面兩個 newTaskFor 方法用於將我們的任務包裝成 FutureTask 提交到執行緒池中執行
 5     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
6 return new FutureTask<T>(runnable, value); 7 } 8 9 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { 10 return new FutureTask<T>(callable); 11 } 12 13 // 提交任務 14 public Future<?> submit(Runnable task) {
15 if (task == null) throw new NullPointerException(); 16 // 1. 將任務包裝成 FutureTask 17 RunnableFuture<Void> ftask = newTaskFor(task, null); 18 // 2. 交給執行器執行,execute 方法由具體的子類來實現 19 // 前面也說了,FutureTask 間接實現了Runnable 介面。 20 execute(ftask); 21 return ftask; 22 } 23 24 public <T> Future<T> submit(Runnable task, T result) { 25 if (task == null) throw new NullPointerException(); 26 // 1. 將任務包裝成 FutureTask 27 RunnableFuture<T> ftask = newTaskFor(task, result); 28 // 2. 交給執行器執行 29 execute(ftask); 30 return ftask; 31 } 32 33 public <T> Future<T> submit(Callable<T> task) { 34 if (task == null) throw new NullPointerException(); 35 // 1. 將任務包裝成 FutureTask 36 RunnableFuture<T> ftask = newTaskFor(task); 37 // 2. 交給執行器執行 38 execute(ftask); 39 return ftask; 40 } 41 }

 

儘管submit方法能提供執行緒執行的返回值,但只有實現了Callable才會有返回值,而實現Runnable的執行緒則是沒有返回值的,也就是說在上面的3個方法中,submit(Callable<T> task)能獲取到它的返回值,submit(Runnable task, T result)能通過傳入的載體result間接獲得執行緒的返回值或者準確來說交給執行緒處理一下,而最後一個方法submit(Runnable task)則是沒有返回值的,就算獲取它的返回值也是null。

使用示例

submit(Callable<T> task)

 1 /**
 2  * @author: ChenHao
 3  * @Date: Created in 14:54 2019/1/11
 4  */
 5 public class Test {
 6     public static void main(String[] args) throws ExecutionException, InterruptedException {
 7         Callable<String> callable = new Callable<String>() {
 8             public String call() throws Exception {
 9                 System.out.println("This is ThreadPoolExetor#submit(Callable<T> task) method.");
10                 return "result";
11             }
12         };
13 
14         ExecutorService executor = Executors.newSingleThreadExecutor();
15         Future<String> future = executor.submit(callable);
16         executor.shutdown();
17         System.out.println(future.get());
18     }
19 }

執行結果:

submit(Runnable task, T result)

 1 /**
 2  * @author: ChenHao
 3  * @Date: Created in 14:54 2019/1/11
 4  */
 5 public class Test {
 6     public static void main(String[] args) throws ExecutionException, InterruptedException {
 7 
 8         ExecutorService executor = Executors.newSingleThreadExecutor();
 9         Data data = new Data();
10         Future<Data> future = executor.submit(new Task(data), data);
11         executor.shutdown();
12         System.out.println(future.get().getName());
13     }
14 }
15 class Data {
16     String name;
17     public String getName() {
18         return name;
19     }
20     public void setName(String name) {
21         this.name = name;
22     }
23 }
24 
25 class Task implements Runnable {
26     Data data;
27     public Task(Data data) {
28         this.data = data;
29     }
30     @Override
31     public void run() {
32         System.out.println("This is ThreadPoolExetor#submit(Runnable task, T result) method.");
33         data.setName("陳浩");
34     }
35 }

 執行結果:

submit(Runnable task)

 1 /**
 2  * @author: ChenHao
 3  * @Date: Created in 14:54 2019/1/11
 4  */
 5 public class Test {
 6     public static void main(String[] args) throws ExecutionException, InterruptedException {
 7         Runnable runnable = new Runnable() {
 8             @Override
 9             public void run() {
10                 System.out.println("This is ThreadPoolExetor#submit(Runnable runnable) method.");
11             }
12         };
13 
14         ExecutorService executor = Executors.newSingleThreadExecutor();
15         Future future = executor.submit(runnable);
16         executor.shutdown();
17         System.out.println(future.get());
18     }
19 }

執行結果:

 

從上面的原始碼可以看到,這三者方法幾乎是一樣的,關鍵就在於:

1 RunnableFuture<T> ftask = newTaskFor(task);
2 execute(ftask);

是如何將一個任務作為引數傳遞給了newTaskFor,然後呼叫execute方法,最後進而返回ftask的呢?

1 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
2     return new FutureTask<T>(runnable, value);
3 }
4 
5 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
6     return new FutureTask<T>(callable);
7 }

 

原始碼分析

這裡我建議大家去看看我之前的一篇文章《Java 多執行緒(五)—— 執行緒池基礎 之 FutureTask原始碼解析

submit(Callable<T> task)

我們看上面的原始碼中知道實際上是呼叫瞭如下程式碼

1 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
2     return new FutureTask<T>(callable);
3 }

 

 我們看看 FutureTask 的結構

 1 public class FutureTask<V> implements RunnableFuture<V> { 
 2     private volatile int state; 
 3     private static final int NEW = 0; //初始狀態 
 4     private static final int COMPLETING = 1; //結果計算完成或響應中斷到賦值給返回值之間的狀態。 
 5     private static final int NORMAL = 2; //任務正常完成,結果被set 
 6     private static final int EXCEPTIONAL = 3; //任務丟擲異常 
 7     private static final int CANCELLED = 4; //任務已被取消 
 8     private static final int INTERRUPTING = 5; //執行緒中斷狀態被設定ture,但執行緒未響應中斷 
 9     private static final int INTERRUPTED = 6; //執行緒已被中斷 
10 
11     //將要執行的任務 
12     private Callable<V> callable; //用於get()返回的結果,也可能是用於get()方法丟擲的異常 
13     private Object outcome; // non-volatile, protected by state reads/writes //執行callable的執行緒,呼叫FutureTask.run()方法通過CAS設定 
14     private volatile Thread runner; //棧結構的等待佇列,該節點是棧中的最頂層節點。 
15     private volatile WaitNode waiters; 
16 
17     public FutureTask(Callable<V> callable) {
18         if (callable == null)
19             throw new NullPointerException();
20         this.callable = callable;
21         this.state = NEW;       // ensure visibility of callable
22     }
23     ....
24 }

 

1 public interface RunnableFuture<V> extends Runnable, Future<V> {
2     /**
3      * Sets this Future to the result of its computation
4      * unless it has been cancelled.
5      */
6     void run();
7 }

 

 

 我們知道 FutureTask 繼承了 Runnable,這裡將 Callable<T> callable 的例項封裝成 FutureTask 傳給 execute(ftask);我們再來看看上一篇文章中執行緒執行的程式碼

 1 // 此方法由 worker 執行緒啟動後呼叫,這裡用一個 while 迴圈來不斷地從等待佇列中獲取任務並執行
 2 // 前面說了,worker 在初始化的時候,可以指定 firstTask,那麼第一個任務也就可以不需要從佇列中獲取
 3 final void runWorker(Worker w) {
 4     Thread wt = Thread.currentThread();
 5     // 該執行緒的第一個任務(如果有的話)
 6     Runnable task = w.firstTask;
 7     w.firstTask = null;
 8     w.unlock(); // allow interrupts
 9     boolean completedAbruptly = true;
10     try {
11         // 迴圈呼叫 getTask 獲取任務
12         while (task != null || (task = getTask()) != null) {
13             w.lock();          
14             // 如果執行緒池狀態大於等於 STOP,那麼意味著該執行緒也要中斷
15             if ((runStateAtLeast(ctl.get(), STOP) ||
16                  (Thread.interrupted() &&
17                   runStateAtLeast(ctl.get(), STOP))) &&
18                 !wt.isInterrupted())
19                 wt.interrupt();
20             try {
21                 beforeExecute(wt, task);
22                 Throwable thrown = null;
23                 try {
24                     // 到這裡終於可以執行任務了,這裡是最重要的,task是什麼?是Worker 中的firstTask屬性
25                     
26                     task.run();
27                 } catch (RuntimeException x) {
28                     thrown = x; throw x;
29                 } catch (Error x) {
30                     thrown = x; throw x;
31                 } catch (Throwable x) {
32                     thrown = x; throw new Error(x);
33                 } finally {
34                     afterExecute(task, thrown);
35                 }
36             } finally {
37                 // 一個任務執行完了,這個執行緒還可以複用,接著去佇列中拉取任務執行
38                 // 置空 task,準備 getTask 獲取下一個任務
39                 task = null;
40                 // 累加完成的任務數
41                 w.completedTasks++;
42                 // 釋放掉 worker 的獨佔鎖
43                 w.unlock();
44             }
45         }
46         completedAbruptly = false;
47     } finally {
48         // 如果到這裡,需要執行執行緒關閉:
49         // 說明 getTask 返回 null,也就是超過corePoolSize的執行緒過了超時時間還沒有獲取到任務,也就是說,這個 worker 的使命結束了,執行關閉
50         processWorkerExit(w, completedAbruptly);
51     }
52 }

 

 由上面第6行程式碼 task 就是execute(ftask)傳入的任務,第26行 task.run(); 實際上就是 new FutureTask<T>(callable).run(),我們看看FutureTask中的run()方法

 1 public void run() {
 2     //保證callable任務只被執行一次
 3     if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
 4         return;
 5     try {
 6         Callable < V > c = callable;
 7         if (c != null && state == NEW) {
 8             V result;
 9             boolean ran;
10             try { 
11                 //執行任務,上面的例子我們可以看出,call()裡面可能是一個耗時的操作,不過這裡是同步的
12                 result = c.call();
13                 //上面的call()是同步的,只有上面的result有了結果才會繼續執行
14                 ran = true;
15             } catch (Throwable ex) {
16                 result = null;
17                 ran = false;
18                 setException(ex);
19             }
20             if (ran)
21                 //執行完了,設定result
22                 set(result);
23         }
24     }
25     finally {
26         runner = null;
27         int s = state;
28         //判斷該任務是否正在響應中斷,如果中斷沒有完成,則等待中斷操作完成
29         if (s >= INTERRUPTING)
30             handlePossibleCancellationInterrupt(s);
31     }
32 }

 

在 FutureTask的構造方法中 this.callable = callable; ,因此我們可以知道上面run()方法中第6行 c 就是 程式碼示例中的 new Callable<String>(),c.call()就是呼叫 程式碼示例中new Callable 的call方法,並且這裡可以取到返回結果,第22行處設定FutureTask 中 outcome 的值,這樣執行緒就可以取到返回值了。

1 protected void set(V v) {
2     if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
3         outcome = v;
4         UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
5         finishCompletion();
6     }
7 }

 

取值我就不分析了,我之前的文章裡面有詳細分析。

submit(Runnable task, T result)

 1 public <T> Future<T> submit(Runnable task, T result) {
 2     if (task == null) throw new NullPointerException();
 3     RunnableFuture<T> ftask = newTaskFor(task, result);
 4     execute(ftask);
 5     return ftask;
 6 }
 7 
 8 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
 9     return new FutureTask<T>(runnable, value);
10 }

 

我們來看看FutureTask的另外一個構造方法

1 public FutureTask(Runnable runnable, V result) {
2     this.callable = Executors.callable(runnable, result);
3     this.state = NEW;       // ensure visibility of callable
4 }

 

 1 public static <T> Callable<T> callable(Runnable task, T result) {
 2     if (task == null)
 3         throw new NullPointerException();
 4     return new RunnableAdapter<T>(task, result);
 5 }
 6 
 7 static final class RunnableAdapter<T> implements Callable<T> {
 8     final Runnable task;
 9     final T result;
10     RunnableAdapter(Runnable task, T result) {
11         this.task = task;
12         this.result = result;
13     }
14     public T call() {
15         task.run();
16         return result;
17     }
18 }

 

上面將 runnable, result 封裝成了 RunnableAdapter 作為FutureTask的callable屬性,這和上面的submit(Callable<T> task) 是不同的,submit(Callable<T> task)是直接將 Callable<T> task作為FutureTask的callable屬性。我們看看FutureTask中的run()方法中第6行 c 就是FutureTask 構造方法中的new RunnableAdapter<T>(task, result) ,c.call()就是呼叫 RunnableAdapter<T>(task, result) 的call方法,call()中的task.run()就是上面程式碼示例中new Task(data) 中的 run(),run()方法中業務大程式碼改變了data物件的屬性,callable(Runnable task, T result)中也是傳的相同的物件data, 所以,result = c.call(); 就是把更改後的data返回,並且將data設定為設定FutureTask 中 outcome 的值,後面的邏輯就是一樣的了。

這裡可以看成將同一個data傳入執行緒進行處理,同時這個data也傳入FutureTask中,並且在RunnableAdapter通過屬性進行儲存data,等執行緒將data處理完了,由於是同一個物件,RunnableAdapter中的result也就是data指向的是同一個物件,然後把此result返回到FutureTask儲存在屬性outcome中,就可以通過FutureTask.get()取到執行結果了。

如果new FutureTask<T>(runnable, null),則result = c.call(); 返回的值也是null,最後從執行緒池中get的值也是null。