1. 程式人生 > >《Java 7併發程式設計實戰手冊》第四章執行緒執行器

《Java 7併發程式設計實戰手冊》第四章執行緒執行器

感謝人民郵電大學授權併發網釋出此書樣章,新書購買傳送門=》噹噹網Snip20140120_1

本章將介紹下列內容:

  • 建立執行緒執行器
  • 建立固定大小的執行緒執行器
  • 在執行器中執行任務並返回結果
  • 執行多個任務並處理第一個結果
  • 執行多個任務並處理所有結果
  • 在執行器中延時執行任務
  • 在執行器中週期性執行任務
  • 在執行器中取消任務
  • 在執行器中控制任務的完成
  • 在執行器中分離任務的啟動與結果的處理
  • 處理在執行器中被拒絕的任務

4.1 簡介

通常,使用Java來開發一個簡單的併發應用程式時,會建立一些 Runnable 物件,然後建立對應的 Thread 物件來執行它們。但是,如果需要開發一個程式來執行大量的併發任務,這個方法將突顯以下劣勢:

  • 必須實現所有與 Thread 物件管理相關的程式碼,比如執行緒的建立、結束以及結果獲取;
  • 需要為每一個任務建立一個 Thread 物件。如果需要執行大量的任務,這將大大地影響應用程式的處理能力;
  • 計算機的資源需要高效地進行控制和管理,如果建立過多的執行緒,將會導致系統負荷過重。

自從Java 5開始,Java併發API提供了一套意在解決這些問題的機制。這套機制稱之為執行器框架(Executor Framework,圍繞著 Executor 介面和它的子介面 ExecutorService,以及實現這兩個介面的 ThreadPoolExecutor 類展開。

這套機制分離了任務的建立和執行。通過使用執行器,僅需要實現 Runnable

介面的物件,然後將這些物件傳送給執行器即可。執行器通過建立所需的執行緒,來負責這些 Runnable 物件的建立、例項化以及執行。但是執行器功能不限於此,它使用了執行緒池來提高應用程式的效能。當傳送一個任務給執行器時,執行器會嘗試使用執行緒池中的執行緒來執行這個任務,避免了不斷地建立和銷燬執行緒而導致系統性能下降。

執行器框架另一個重要的優勢是 Callable 介面。它類似於 Runnable 介面,但是卻提供了兩方面的增強。

  • 這個介面的主方法名稱為 call() ,可以返回結果。
  • 當傳送一個 Callable 物件給執行器時,將獲得一個實現了 Future 介面的物件。可以使用這個物件來控制 Callable
    物件的狀態和結果。

本章接下來將使用上述由Java併發API提供的類及其變體來展示如何使用執行器框架。

4.2 建立執行緒執行器

使用執行器框架(Executor Framework)的第一步是建立 ThreadPoolExecutor 物件。可以 ThreadPoolExecutor類提供的四個構造器或者使用Executors工廠類來建立 ThreadPoolExecutor 物件。一旦有了執行器,就可以將RunnableCallable物件傳送給它去執行了。

在本節,我們將學習如何使用兩種操作來實現一個範例,這個範列將模擬一個Web伺服器來應對來自不同客戶端的請求。

準備工作

請先行閱讀1.2節來學習用Java建立執行緒的基本機制。然後比較這兩種機制,並根據不同的問題來選擇最佳的一種。

本節的範例是在Eclipse IDE裡完成的。無論你使用Eclipse還是其他的IDE(比如NetBeans),都可以開啟這個IDE並且建立一個新的Java工程。

範例實現

按照接下來的步驟實現本節的範例。

1.實現將被Web伺服器執行的任務。建立一個名為 Task 的類,並實現 Runnable 介面。

public class Task implements Runnable {

2.宣告一個名為 initDate 的私有 Date 屬性,用來儲存任務的建立時間,然後建立一個名為 name 的私有 String 屬性,用來儲存任務的名稱。

private Date initDate;

private String name;

3.實現類的構造器,用來初始化這兩個屬性。

public Task(String name){

initDate=new Date();

this.name=name;

}

4.實現 run() 方法。

@Override

public void run() {

5.在控制檯上輸出 initDate 屬性和實際時間,即任務的開始時間。

System.out.printf("%s: Task %s: Created on: %s\n",Thread.

currentThread().getName(),name,initDate);

System.out.printf("%s: Task %s: Started on: %s\n",Thread.

currentThread().getName(),name,new Date());

6.將任務休眠一段隨機時間。

try {

Long duration=(long)(Math.random()*10);

System.out.printf("%s: Task %s: Doing a task during %d

seconds\n",Thread.currentThread().getName(),name,duration);

TimeUnit.SECONDS.sleep(duration);

} catch (InterruptedException e) {

e.printStackTrace();

}

7.在控制檯輸入任務的完成時間。

System.out.printf("%s: Task %s: Finished on: %s\n",Thread.

currentThread().getName(),name,new Date());

8.建立一個名為 Server的類,它將執行通過執行器接收到的每一個任務。

public class Server {

9.宣告一個名為executorThreadPoolExecutor屬性。

private ThreadPoolExecutor executor;

10.實現類的構造器,通過 Executors 類來初始化 ThreadPoolExecutor 物件。

public Server(){

executor=(ThreadPoolExecutor)Executors.newCachedThreadPool();

}

11.實現 executeTask() 方法。它接收一個 Task 物件作為引數,並將 Task 物件傳送給執行器。在控制檯輸出一條資訊表示新的任務已經到達。

public void executeTask(Task task){

System.out.printf("Server: A new task has arrived\n");

12.呼叫執行器的 execute() 方法將任務傳送給Task

executor.execute(task);

13.在控制檯輸出一些執行器相關的資料來觀察執行器的狀態。

System.out.printf("Server: Pool Size: %d\n",executor.

getPoolSize());

System.out.printf("Server: Active Count: %d\n",executor.

getActiveCount());

System.out.printf("Server: Completed Tasks: %d\n",executor.

getCompletedTaskCount());

14.實現 endServer() 方法。在這個方法裡,呼叫執行器的 shutdown() 方法來結束它的執行。

public void endServer() {

executor.shutdown();

}

15.實現範例的主類,建立 Main 主類,並實現 main() 方法。

public class Main {

public static void main(String[] args) {

Server server=new Server();

for (int i=0; i<100; i++){

Task task=new Task("Task "+i);

server.executeTask(task);

}

server.endServer();

}

}

工作原理

這個範例的核心在於 Server 類,這個類建立和使用 ThreadPoolExecutor 執行器來執行任務。

第一個關鍵點是在 Server 類的構造器中建立 ThreadPoolExecutor 物件。ThreadPoolExecutor 類有4個不同的構造器,但是,由於這些構造器在使用上的複雜性,Java併發API提供 Executors 工廠類來構造執行器和其他相關的物件。雖然可以直接通過 ThreadPoolExecutor 其中之一的構造器來建立 ThreadPoolExecutor 物件,但是推薦使用 Executors 工廠類來建立它。

在這個示例中,通過使用 Executors 工廠類的 newCachedThreadPool() 方法建立了一個快取執行緒池。這個方法返回一個 ExecutorService 物件,因此它將被強制轉換為 ThreadPoolExecutor 型別,並擁有所有的方法。如果需要執行新任務,快取執行緒池就會建立新執行緒;如果執行緒所執行的任務執行完成後並且這個執行緒可用,那麼快取執行緒池將會重用這些執行緒。執行緒重用的優點是減少了建立新執行緒所花費的時間。然而,新任務固定會依賴執行緒來執行,因此快取執行緒池也有缺點,如果傳送過多的任務給執行器,系統的負荷將會過載。

備註:僅當執行緒的數量是合理的或者執行緒只會執行很短的時間時,適合採用 Executors 工廠類的 newCachedThreadPool() 方法來建立執行器。

一旦建立了執行器,就可以使用執行器的 execute() 方法來發送 RunnableCallable 型別的任務。這個範例傳送實現了 Runnable 介面的 Task 型別的物件給執行器。

範例中也列印了一些執行器相關的日誌資訊,專門使用瞭如下方法。

  • getPoolSize() :返回執行器執行緒池中實際的執行緒數。
  • getActiveCount() :返回執行器中正在執行任務的執行緒數。
  • getCompletedTaskCount() :返回執行器已經完成的任務數。

執行器以及 ThreadPoolExecutor 類一個重要的特性是,通常需要顯示地去結束它。如果不這樣做,那麼執行器將繼續執行,程式也不會結束。如果執行器沒有任務可執行了,它將繼續等待新任務的到來,而不會結束執行。Java應用程式不會結束直到所有非守護執行緒結束它們的執行,因此,如果有終止執行器,應用程式將永遠不會結束。

為了完成執行器的執行,可以使用 ThreadPoolExecutor 類的 shutdown() 方法。當執行器執行完成所有待執行的任務後,它將結束執行。呼叫 shutdown() 方法之後,如果嘗試再發送另一個任務給執行器,任務將被拒絕,並且執行器也將丟擲 RejectedExecutionException 異常。

下面的截圖展示了範例執行的部分結果。

Java Concurrency Cook Book 4.1

當最後一個任務到達伺服器時,執行器擁有由100項任務和90個活動執行緒組成的池。

更多資訊

ThreadPoolExecutor 類提供了許多方法來獲取自身狀態的資訊。在範例中,已經使用了 getPoolSize() 方法來獲取執行緒池的大小,用 getActiveCount() 方法來獲取執行緒池中活動執行緒的數量,用 getCompletedTaskCount() 方法來獲取執行器完成的任務數量。也可以使用 getLargestPoolSize() 方法來返回曾經同時位於執行緒池中的最大執行緒數。

ThreadPoolExecutor 類也提供了結束執行器的相關方法。

  • shutdownNow() :這個方法會立即關閉執行器。執行器將不再執行那些正在等待執行的任務。這個方法將返回等待執行的任務列表。呼叫時,正在執行的任務將繼續執行,但是這個方法並不等待這些任務完成。
  • isTerminated():如果呼叫了shutdown()shutdownNow()方法,並且執行器完成了關閉的過程,那麼這個方法將返回 true
  • isShutdown():如果呼叫了shutdown()方法,那麼這個方法將返回true
  • awaitTermination(long timeout, TimeUnit unit):這個方法將阻塞所呼叫的執行緒,直到執行器完成任務或者達到所指定的 timeout值。

TimeUnit是一個列舉類,有如下的常量:DAYS、HOURS、MICROSECONDS、MILLISECONDS、MINUTES、NANOSECONDS和SECONDS。

備註:如果想等待任務的結束,而不管任務的持續時間,可以使用一個大的超時時間,比如DAYS

參見

  • 參見4.12節。
  • 參見8.4節。

4.3 建立固定大小的執行緒執行器

當使用Executors類的newCachedThreadPool()方法建立基本的 ThreadPoolExecutor 時,執行器執行過程中將碰到執行緒數量的問題。如果執行緒池裡沒有空閒的執行緒可用,那麼執行器將為接收到的每一個任務建立一個新執行緒,當傳送大量的任務給執行器並且任務需要持續較長的時間時,系統將會超負荷,應用程式也將隨之效能不佳。

為了避免這個問題,Executors 工廠類提供了一個方法來建立一個固定大小的執行緒執行器。這個執行器有一個執行緒數的最大值,如果傳送超過這個最大值的任務給執行器,執行器將不再建立額外的執行緒,剩下的任務將被阻塞直到執行器有空閒的執行緒可用。這個特性可以保證執行器不會給應用程式帶來效能不佳的問題。

在本節,我們將通過修改本章4.2節的範例來學習如何建立固定大小的執行緒執行器。

準備工作

請先行閱讀本章的4.2節,並實現其中所闡述的範例,因為本節將對其繼續修改。

本節的範例是在Eclipse IDE裡完成的。無論你使用Eclipse還是其他的IDE(比如NetBeans),都可以開啟這個IDE並且建立一個新的Java工程。

範例實現

按照接下來的步驟實現本節的範例。

1.實現本章4.2節所描述的範例。開啟 Server 類並修改它的構造器,使用 newFixedThreadPool() 方法來建立執行器,並傳遞數字 5 作為它的引數。

public Server(){

executor=(ThreadPoolExecutor)Executors.newFixedThreadPool(5);

}

2.修改 executeTask() 方法,增加一行列印日誌資訊。呼叫 getTaskCount() 方法來獲取已傳送到執行器上的任務數。

System.out.printf("Server: Task Count: %d\n",executor.

getTaskCount());

工作原理

在這個示例中,使用 Executors 工廠類的 newFixedThreadPool() 方法來建立執行器。這個方法建立了具有執行緒最大數量值的執行器。如果傳送超過執行緒數的任務給執行器,剩餘的任務將被阻塞直到執行緒池裡有空閒的執行緒來處理它們。newFixedThreadPool() 方法接收執行器將擁有的執行緒數量的最大值作為引數。這個例子建立了一個執行緒數量的最大值為 5 的執行器。

下面的截圖展示了範例執行的部分結果。

Java Concurrency Cook Book 4.2

為了在程式中輸出相關資訊,已經使用的 ThreadPoolExecutor 類的一些方法如下。

  • getPoolSize():返回執行器中執行緒的實際數量。
  • getActiveCount():返回執行器正在執行任務的執行緒數量。

將看到,控制檯輸出的資訊是 5,表示執行器擁有 5 個執行緒,並且執行器不會超過這個最大的執行緒連線數。

當傳送最後一個任務給執行器時,由於執行器只有 5 個活動的執行緒,所以剩餘的 95 個任務只能等待空閒執行緒。getTaskCount() 方法可以用來顯示有多少個任務已經發送給執行器。

更多資訊

Executors 工廠類也提供 newSingleThreadExecutor() 方法。這是一個建立固定大小執行緒執行器的極端場景,它將建立一個只有單個執行緒的執行器。因此,這個執行器只能在同一時間執行一個任務。

參見

  • 參見4.12節。
  • 參見8.4節。

4.4 在執行器中執行任務並返回結果

執行器框架(Executor Framework的優勢之一是,可以執行併發任務並返回結果。Java併發API通過以下兩個介面來實現這個功能。

Callable:這個介面聲明瞭 call() 方法。可以在這個方法裡實現任務的具體邏輯操作。Callable 介面是一個泛型介面,這就意味著必須宣告 call() 方法返回的資料型別。

Future:這個介面聲明瞭一些方法來獲取由 Callable 物件產生的結果,並管理它們的狀態。

在本節,我們將學習如何實現任務的返回結果,並在執行器中執行任務。

準備工作

本節的範例是在Eclipse IDE裡完成的。無論你使用Eclipse還是其他的IDE(比如NetBeans),都可以開啟這個IDE並且建立一個新的Java工程。

範例實現

按照接下來的步驟實現本節的範例。

1.建立名為FactorialCalculator的類,並實現Callable介面,介面的泛型引數為Integer 型別。

public class FactorialCalculator implements Callable<Integer> {

2.宣告一個名為 number 的私有 Integer 屬性,儲存任務即將用來計算的數字。

private Integer number;

3.實現類的構造器,用來初始化類的屬性。

public FactorialCalculator(Integer number){

this.number=number;

}

4.實現call()方法。這個方法返回FactorialCalculator類的 number 屬性的階乘(Factorial)。

@Override

public Integer call() throws Exception {

5.建立並初始化在call()方法內使用的內部變數。

int result = 1;

6.如果number值是0或1,則返回1;否則計算number的階乘。為了演示效果,在兩個乘法之間,將任務休眠20毫秒。

if ((num==0)||(num==1)) {

result=1;

} else {

for (int i=2; i<=number; i++) {

result*=i;

TimeUnit.MILLISECONDS.sleep(20);

}

}

7.在控制檯輸出操作的結果。

System.out.printf("%s: %d\n",Thread.currentThread().

getName(),result);

8.返回操作的結果。

return result;

9.實現範例的主類,建立 Main 主類,並實現 main() 方法。

public class Main {

public static void main(String[] args) {

10.通過Executors工廠類的newFixedThreadPool()方法建立ThreadPoolExecutor執行器來執行任務。傳遞引數2給newFixedThreadPool()方法表示執行器將最多建立兩個執行緒。

ThreadPoolExecutor executor=(ThreadPoolExecutor)Executors.

newFixedThreadPool(2);

11.建立一個 Future<Integer> 型別的列表物件 resultList

List<Future<Integer>> resultList=new ArrayList<>();

12.通過 Random 類建立一個名 random 的隨機數字生成器。

Random random=new Random();

13.生成 10 個介於 0~10 之間的隨機整數。

for (int i=0; i<10; i++){

Integer number= random.nextInt(10);

14.建立 FactorialCaculator 物件,並將隨機數 number 傳遞給它作為引數。

FactorialCalculator calculator=new

FactorialCalculator(number);

15.呼叫執行器的 submit() 方法傳送 FactorialCalculator 任務給執行器。這個方法返回一個 Future<Integer> 物件來管理任務和得到的最終結果。

Future<Integer> result=executor.submit(calculator);

16.將 Future 物件新增到前面建立的 resultList 列表中。

resultList.add(result);

}

17.建立一個 do 迴圈來監控執行器的狀態。

do {

18.通過執行器的 getCompletedTaskNumber() 方法,在控制檯輸出資訊表示任務完成的數量。

System.out.printf("Main: Number of Completed Tasks:

%d\n",executor.getCompletedTaskCount());

19.遍歷 resultList 列表中的 10 個 Future 物件,通過呼叫 isDone() 方法來輸出表示任務是否完成的資訊。

for (int i=0; i<resultList.size(); i++) {

Future<Integer> result=resultList.get(i);

System.out.printf("Main: Task %d: %s\n",i,result.

isDone());

}

20.將執行緒休眠 50 毫秒。

try {

TimeUnit.MILLISECONDS.sleep(50);

} catch (InterruptedException e) {

e.printStackTrace();

}

21.若執行器中完成的任務數量小於 10 ,則一直重複執行這個迴圈。

} while (executor.getCompletedTaskCount()<resultList.size());

22.在控制檯上輸出每一個任務得到的結果。對於每一個 Future 物件來講,通過呼叫 get() 方法將得到由任務返回的 Integer 物件。

System.out.printf("Main: Results\n");

for (int i=0; i<resultList.size(); i++) {

Future<Integer> result=resultList.get(i);

Integer number=null;

try {

number=result.get();

} catch (InterruptedException e) {

e.printStackTrace();

} catch (ExecutionException e) {

e.printStackTrace();

}

23.在控制檯上打印出數字number

System.out.printf("Main: Task %d: %d\n",i,number);

}

24.呼叫執行器的 shutdown() 方法結束執行。

executor.shutdown();

工作原理

在本節,我們學習瞭如何使用 Callable 介面來啟動併發任務並返回結果。我們編寫了 FactorialCalculator 類,它實現了帶有泛型引數 Integer 型別的 Callable 介面。因此,這個 Integer 型別將作為在呼叫 call() 方法時返回的型別。

範例的另一個關鍵點在 Main 主類中。我們通過 submit() 方法傳送一個 Callable 物件給執行器去執行,這個 submit() 方法接收 Callable 物件作為引數,並返回 Future 物件。Future 物件可以用於以下兩個主要目的。

  • 控制任務的狀態:可以取消任務和檢查任務是否已經完成。為了達到這個目的,可使用 isDone() 方法來檢查任務是否已經完成。
  • 通過 call() 方法獲取返回的結果。為了達到這個目的,可使用 get() 方法。這個方法一直等待直到 Callable 物件的 call() 方法執行完成並返回結果。如果 get() 方法在等待結果時執行緒中斷了,則將丟擲一個 InterruptedException異常。如果 call() 方法丟擲異常那麼 get() 方法將隨之丟擲 ExecutionException 異常。

更多資訊

在呼叫Future物件的get()方法時,如果Future物件所控制的任務並未完成,那麼這個方法將一直阻塞到任務完成。Future 介面也提供了get()方法的其他呼叫方式。

  • get(long timeout,TimeUnit unit):如果呼叫這個方法時,任務的結果並未準備好,則方法等待所指定的timeout時間。如果等待超過了指定的時間而任務的結果還沒有準備好,那麼這個方法將返回null

TimeUnit是一個列舉類,有如下的常量:DAYS、HOURS、MICROSECONDS、MILLISECONDS、MINUTES、NANOSECONDSSECONDS

參見

  • 參見4.2節。
  • 參見4.5節。
  • 參見4.6節。

4.5 執行多個任務並處理第一個結果

併發程式設計比較常見的一個問題是,當採用多個併發任務來解決一個問題時,往往只關心這些任務中的第一個結果。比如,對一個數組進行排序有很多種演算法,可以併發啟動所有演算法,但是對於一個給定的陣列,第一個得到排序結果的演算法就是最快的排序演算法。

在本節,我們將學習如何使用 ThreadPoolExecutor 類來實現這個場景。範例允許使用者可以通過兩種驗證機制進行驗證,但是,只要有一種機制驗證成功,那麼這個使用者就被驗證通過了。

準備工作

本節的範例是在Eclipse IDE裡完成的。無論你使用Eclipse還是其他的IDE(比如NetBeans),都可以開啟這個IDE並且建立一個新的Java工程。

範例實現

按照接下來的步驟實現本節的範例。

1.建立一個名為 UserValidator 的類,它將實現使用者驗證的過程。

public class UserValidator {

2.宣告一個名為 name 的私有 String 屬性,用來儲存使用者驗證系統的名稱。

private String name;

3.實現類的構造器,用來初始化類的屬性。

public UserValidator(String name) {

this.name=name;

}

4.實現 validate() 方法。它接收兩個 String 引數,分別取名為使用者名稱name 和密碼 password,這兩個引數也將被用來進行使用者驗證。

public boolean validate(String name, String password) {

5.建立一個名為 randomRandom  型別的隨機物件。

Random random=new Random();

6.等待一段隨機時間來模擬使用者驗證的過程。

try {

long duration=(long)(Math.random()*10);

System.out.printf("Validator %s: Validating a user during %d

seconds\n",this.name,duration);

TimeUnit.SECONDS.sleep(duration);

} catch (InterruptedException e) {

return false;

}

7.返回隨機的 boolean 值。當用戶通過驗證時,這個方法返回 true 值,如果使用者沒有通過驗證則返回 false 值。

return random.nextBoolean();

}

8.實現 getName() 方法。這個方法返回 name 屬性值。

public String getName(){

return name;

}

9.建立一個名為 TaskValidator 的類,它將通過 UserValidation 物件作為併發任務來執行使用者驗證的過程。這個類實現了帶有 String 泛型引數的 Callable 介面。

public class TaskValidator implements Callable<String> {

10.宣告一個名為 validator 的私有 UserValidator 屬性。

private UserValidator validator;

11.宣告兩個私有的 String 屬性,分別為使用者名稱 user 和密碼 password

private String user;

private String password;

12.實現類的構造器,用來初始化類的屬性。

public TaskValidator(UserValidator validator, String user,

String password){

this.validator=validator;

this.user=user;

this.password=password;

}

13.實現call()方法,並返回String物件。

@Override

public String call() throws Exception {

14.如果使用者沒有通過 UserValidator 物件的驗證,就在控制檯輸出沒有找到這個使用者,表明該使用者未通過驗證,並丟擲 Exception 型別的異常。

if (!validator.validate(user, password)) {

System.out.printf("%s: The user has not been found\

n",validator.getName());

throw new Exception("Error validating user");

}

15.否則,就在控制檯輸出使用者已經找到,表明該使用者已經通過驗證,然後返回 UserValidator 物件的名稱。

System.out.printf("%s: The user has been found\n",validator.

getName());

return validator.getName();

16.實現範例的主類,建立 Main 主類,並實現 main() 方法。

public class Main {

public static void main(String[] args) {

17.建立兩個 String 物件,分別取名為 usernamepassword,並初始化這兩個屬性值為test。

String username="test";

String password="test";

18.建立兩個 UserValidator 物件,分別取名為 ldapValidatordbValidator

UserValidator ldapValidator=new UserValidator("LDAP");

UserValidator dbValidator=new UserValidator("DataBase");

19.建立兩個TaskValidator物件,分別取名為ldapTask和dbTask,並分別用ldapValidator 和dbValidator來初始化他們。

TaskValidator ldapTask=new TaskValidator(ldapValidator,

username, password);

TaskValidator dbTask=new TaskValidator(dbValidator,

username,password);

20.建立一個名為 taksList 的 TaskValidator 型別列表,並將 ldapTask 和 dbTask 新增到列表中。

List<TaskValidator> taskList=new ArrayList<>();

taskList.add(ldapTask);

taskList.add(dbTask);

21.通過Executors工廠類的newCachedThreadPool()方法建立一個新的 ThreadPoolExecutor 執行器物件,並建立一個名為 result 的 String 物件。

ExecutorService executor=(ExecutorService)Executors.

newCachedThreadPool();

String result;

22.呼叫執行器的 invokeAny() 方法。這個方法接收 taskList 作為引數,並返回String 物件。然後,在控制檯上輸出這個方法返回的 String 物件。

try {

result = executor.invokeAny(taskList);

System.out.printf("Main: Result: %s\n",result);

} catch (InterruptedException e) {

e.printStackTrace();

} catch (ExecutionException e) {

e.printStackTrace();

}

23.通過shutdown()方法來終止執行器,並在控制檯輸出資訊表示程式已經執行結束。

executor.shutdown();

System.out.printf("Main: End of the Execution\n");

工作原理

這個範例的關鍵點在 Main 主類中。ThreadPoolExecutor 類的 invokeAny() 方法接收到一個任務列表,然後執行任務,並返回第一個完成任務並且沒有丟擲異常的任務的執行結果。這個方法返回的型別與任務裡的 call() 方法返回的型別相同,在這個範例中,它將返回 String 型別值。

下面的截圖展示了當範例執行後,有一個任務成功地驗證了使用者後的執行結果。

Java Concurrency Cook Book 4.3

範例中有兩個UserValidator物件,它們返回隨機的boolean值。每一個UserValidator物件被TaskValidator物件使用,TaskValidator物件實現了Callable介面。如果 UserValidator類的validate()方法返回false值,那麼TaskValidator類將丟擲Exception異常。否則,返回true值。

因此,我們有兩個任務可以返回true值或丟擲Exception異常。從而,可以有如下4種可能性。

  • 如果兩個任務都返回true值,那麼invokeAny()方法的結果就是首先完成任務的名稱。
  • 如果第一個任務返回true值,第二個任務丟擲Exception異常,那麼invokeAny() 方法的結果就是第一個任務的名稱。
  • 如果第一個任務丟擲Exception異常,第二個任務返回true值,那麼invokeAny() 方法的結果就是第二個任務的名稱。
  • 如果兩個任務都丟擲Exception異常,那麼invokeAny()方法將丟擲 ExecutionException異常。

將這個範例多執行幾次,那麼將得到如上所述的四種可能的結果。以下截圖則顯示當兩個任務同時丟擲異常時,應用程式得到的結果。

Java Concurrency Cook Book 4.4

更多資訊

ThreadPoolExecutor 類還提供了 invokeAny() 方法的其他版本:

invokeAny(Collection<? extends Callable<T>> tasks, long timeout,TimeUnit unit):這個方法執行所有的任務,如果在給定的超時期滿之前某個任務已經成功完成(也就是未丟擲異常),則返回其結果。

TimeUnit是一個列舉類,有如下的常量:DAYS、HOURS、MICROSECONDS、MILLISECONDS、MINUTES、NANOSECONDS和SECONDS。

參見

  • 參見4.6節。

4.6 執行多個任務並處理所有結果

執行器框架(Executor Framework允許執行併發任務而不需要去考慮執行緒建立和執行。它還提供了可以用來控制在執行器中執行任務的狀態和獲取任務執行結果的 Future 類。

如果想要等待任務結束,可以使用如下兩種方法。

  • 如果任務執行結束,那麼Future介面的isDone()方法將返回true
  • 在呼叫shutdown()方法後,ThreadPoolExecutor類的awaitTermination()方法會將執行緒休眠,直到所有的任務執行結束。

這兩個方法有一些缺點:第一個方法,僅可以控制任務的完成與否;第二個方法,必須關閉執行器來等待一個執行緒,否則呼叫這個方法執行緒將立即返回。

ThreadPoolExecutor 類還提供一個方法,它允許傳送一個任務列表給執行器,並等待列表中所有任務執行完成。在本節,我們將編寫範例,執行三個任務,當它們全部執行結束後打印出結果資訊,用來學習如何使用這個特性。

準備工作

本節的範例是在Eclipse IDE裡完成的。無論你使用Eclipse還是其他的IDE(比如NetBeans),都可以開啟這個IDE並且建立一個新的Java工程。

範例實現

按照接下來的步驟實現本節的範例。

1.建立一個名為 Result 的類,用來儲存範例中併發任務產生的結果。

public class Result {

2.宣告兩個私有屬性。一個名為 nameString 屬性,一個名為 valueint 屬性。

private String name;

private int value;

3.實現對應的 get()set() 方法來設定和返回 namevalue 屬性。

public String getName() {

return name;

}

public void setName(String name) {

this.name = name;

}

public int getValue() {

return value;

}

public void setValue(int value) {

this.value = value;

}

4.建立一個名為Task的類,並實現Callable介面,介面的泛型引數為Result型別。

public class Task implements Callable<Result> {

5.宣告一個名為 name 的私有 String 屬性。

private String name;

6.實現類的構造器,用來初始化類的屬性。

public Task(String name) {

this.name=name;

}

7.實現call()方法。在這個範例中,這個方法將返回一個Result型別的物件。

@Override

public Result call() throws Exception {

8.在控制檯輸出表示任務開始的資訊。

System.out.printf("%s: Staring\n",this.name);

9.等待一段隨機時間。

try {

long duration=(long)(Math.random()*10);

System.out.printf("%s: Waiting %d seconds for results.\

n",this.name,duration);

TimeUnit.SECONDS.sleep(duration);

} catch (InterruptedException e) {

e.printStackTrace();

}

10.生成一個int值,準備作為返回Result物件中的int屬性,這個int值為5個隨機數的總和。

int value=0;

for (int i=0; i<5; i++){

value+=(int)(Math.random()*100);

}

11.建立一個Result物件,並用任務的名稱和上一步計算的int值來對其進行初始化。

Result result=new Result();

result.setName(this.name);

result.setValue(value);

12.在控制檯輸出資訊表示任務執行結束。

System.out.println(this.name+": Ends");

13.返回Result物件。

return result;

}

14.實現範例的主類,建立Main主類,並實現main()方法。

public class Main {

public static void main(String[] args) {

15.通過Executors工廠類的newCachedThreadPool()方法建立一個ThreadPoolExecutor 執行器物件。

ExecutorService executor=(ExecutorService)Executors.

newCachedThreadPool();

16.建立一個Task型別的任務列表taskList。建立3個Task任務並將它們新增到任務列表taskList中。

List<Task> taskList=new ArrayList<>();

for (int i=0; i<3; i++){

Task task=new Task(i);

taskList.add(task);

}

17.建立一個 Future 型別的結果列表 resultList。這些物件泛型引數為 Result 型別。

List<Future<Result>>resultList=null;

18.呼叫 ThreadPoolExecutor 類的 invokeAll() 方法。這個方法將返回上一步所建立的 Future 型別的列表。

try {

resultList=executor.invokeAll(taskList);

} catch (InterruptedException e) {

e.printStackTrace();

}

19.呼叫shutdown()方法結束執行器。

executor.shutdown();

20.在控制檯輸出任務處理的結果,即Future型別列表中的Result結果。

System.out.println("Main: Printing the results");

for (int i=0; i<resultList.size(); i++){

Future<Result> future=resultList.get(i);

try {

Result result=future.get();

System.out.println(result.getName()+": "+result.

getValue());

} catch (InterruptedException | ExecutionException e) {

e.printStackTrace();

}

}

工作原理

在本節,我們學習瞭如何傳送任務列表給執行器,並且通過invokeAll()方法等待所有任務的完成。這個方法接收一個Callable物件列表,並返回一個Future物件列表。在這個列表中,每一個任務對應一個Future物件。Future物件列表中的第一個物件控制Callable列表中第一個任務,以此類推。

需要注意的一點是,在儲存結果的列表宣告中,用在Future介面中的泛型引數的資料型別必須與Callable介面的泛型資料型別相相容。在這個例子中,我們使用的是相同的資料型別:Result類。

另一個關於invokeAll()方法重要的地方是,使用Future物件僅用來獲取任務的結果。當所有的任務執行結束時這個方法也執行結束了,如果在返回的Future物件上呼叫isDone()方法,那麼所有的呼叫將返回true值。

更多資訊

ExecutorService 介面還提供了 invokeAll() 方法的另一個版本:

  • invokeAll(Collection<? extends Callable<T>> tasks, long timeout,TimeUnit unit):當所有任務執行完成,或者超時的時候(無論哪個首先發生),這個方法將返回保持任務狀態和結果的Future列表。

TimeUnit是一個列舉類,有如下的常量:DAYS、HOURS、MICROSECONDS、MILLISECONDS、MINUTES、NANOSECONDS和SECONDS。

參見

  • 參見4.4節。
  • 參見4.5節。

4.7 在執行器中延時執行任務

執行器框架(Executor Framework提供了 ThreadPoolExecutor 類並採用執行緒池來執行 CallableRunnable 型別的任務,採用執行緒池可以避免所有執行緒的建立操作而提高應用程式的效能。當傳送一個任務給執行器時,根據執行器的相應配置,任務將盡可能快地被執行。但是,如果並不想讓任務馬上被執行,而是想讓任務在過一段時間後才被執行,或者任務能夠被週期性地執行。為了達到這個目的,執行器框架提供了 ScheduledThreadPoolExecutor 類。

在本節,我們將學習如何建立 ScheduledThreadPoolExecutor 執行器,以及如何使用它在經過一個給定的時間後開始執行任務。

準備工作

本節的範例是在Eclipse IDE裡完成的。無論你使用Eclipse還是其他的IDE(比如NetBeans),都可以開啟這個IDE並且建立一個新的Java工程。

範例實現

按照接下來的步驟實現本節的範例。

1.建立一個名為Task的類,並實現Callable介面,介面的泛型引數為String型別。

public class Task implements Callable<String> {

2.宣告一個名為name的私有String屬性,用來儲存任務的名稱。

private String name;

3.實現類的構造器,並初始化 name 屬性。

public Task(String name) {

this.name=name;

}

4.實現call()方法。在控制檯輸出實際的時間,並返回一個文字資訊,比如“Hello,world”。

public String call() throws Exception {

System.out.printf("%s: Starting at : %s\n",name,new Date());

return "Hello, world";

}

5.實現範例的主類,建立 Main 主類,並實現 main() 方法。

public class Main {

public static void main(String[] args) {

6.通過Executors工廠類的newScheduledThreadPool()方法建立一個 ScheduledThreadPoolExecutor 執行器,並傳遞 1 作為引數。

ScheduledThreadPoolExecutor executor=(ScheduledThreadPoolExecu

tor)Executors.newScheduledThreadPool(1);

7.初始化一些任務(在我們的示例中是 5 個),然後通過ScheduledThreadPoolExecutor 例項的 schedule() 方法來啟動這些任務。

System.out.printf("Main: Starting at: %s\n",new Date());

for (int i=0; i<5; i++) {

Task task=new Task("Task "+i);

executor.schedule(task,i+1 , TimeUnit.SECONDS);

}

8.呼叫執行器的 shutdown() 方法來結束執行器。

executor.shutdown();

9.呼叫執行器的 awaitTermination() 方法等待所有任務結束。

try {

executor.awaitTermination(1, TimeUnit.DAYS);

} catch (InterruptedException e) {

e.printStackTrace();

}

10.在控制檯輸出資訊表示程式執行結束的時間。

System.out.printf("Main: Ends at: %s\n",new Date());

工作原理

這個範例的關鍵點在於 Main 主類和 ScheduledThreadPoolExecutor 執行器的管理。雖然可以通過 ThreadPoolExecutor 類來建立定時執行器,但是在Java併發API中則推薦利用 Executors 工廠類來建立。在這個範例中,必須使用 newScheduledThreadPool() 方法,並且傳遞數字 1 作為方法的引數,這個引數就是執行緒池裡擁有的執行緒數。

為了在定時執行器中等待一段給定的時間後執行一個任務,需要使用 schedule() 方法。這個方法接收如下的引數:

  • 即將執行的任務;
  • 任務執行前所要等待的時間;
  • 等待時間的單位,由 TimeUnit 類的一個常量來指定。

在這個示例中,每個任務將等待 N 秒(TimeUnit.SECONDS),這個 N 值則等於任務在陣列中的位置加 1。

備註:如果想在一個給定的時間點來定時執行任務,那就需要計算這個給定時間點和當前時間的差異值,然後用這個差異值作為任務的延遲值。

通過下面的截圖,可以看到範例執行的部分結果。

Java Concurrency Cook Book 4.5

從結果可知,每隔 1 秒鐘就有一個任務開始執行;這是因為所有的任務被同時傳送到執行器,但每個任務都比前一個任務延遲了 1 秒鐘。

更多資訊

也可以使用Runnable介面來實現任務,因為ScheduledThreadPoolExecutor類的 schedule()方法可以同時接受這兩種型別的任務。

雖然ScheduledThreadPoolExecutor 類是 ThreadPoolExecutor 類的子類,因而繼承了 ThreadPoolExecutor 類所有的特性。但是,Java推薦僅在開發定時任務程式時採用 ScheduledThreadPoolExecutor 類。

最後,在呼叫shutdown()方法而仍有待處理的任務需要執行時,可以配置 ScheduledThreadPoolExecutor的行為。預設的行為是不論執行器是否結束,待處理的任務仍將被執行。但是,通過呼叫ScheduledThreadPoolExecutor類的 setExecuteExisting
DelayedTasksAfterShutdownPolicy()方法則可以改變這個行為。傳遞false引數給這個方法,執行shutdown()方法後,待處理的任務將不會被執行。

參見

  • 參見4.4節。

4.8 在執行器中週期性執行任務

執行器框架(Executor Framework提供了 ThreadPoolExecutor 類,通過執行緒池來執行併發任務從而避免了所有執行緒的建立操作。當傳送一個任務給執行器後,根據執行器的配置,它將盡快地執行這個任務。當任務執行結束後,這個任務就會從執行器中刪除;如果想再次執行這個任務,則需要再次傳送這個任務到執行器。

但是,執行器框架提供了 ScheduledThreadPoolExecutor 類來執行週期性的任務。在本節,我們將學習如何使用這個類的功能來計劃執行週期性的任務。

準備工作

本節的範例是在Eclipse IDE裡完成的。無論你使用Eclipse還是其他的IDE(比如NetBeans),都可以開啟這個IDE並且建立一個新的Java工程。

範例實現

按照接下來的步驟實現本節的範例。

1.建立一個名為 Task 的類,並實現 Runnable 介面。

public class Task implements Runnable {

2.宣告一個名為 name 的私有 String 屬性,用來儲存任務的名稱。

private String name;

3.實現類的構造器,用來初始化類的屬性。

public Task(String name) {

this.name=name;

}

4.實現 run() 方法。在控制檯輸出實際的時間,用來檢驗任務將在指定的一段時間內執行。

@Override

public String call() throws Exception {

System.out.printf("%s: Starting at : %s\n",name,new Date());

return "Hello, world";

}

5.實現範例的主類,建立 Main 主類,並實現 main() 方法。

public class Main {

public static void main(String[] args) {

6.通過呼叫Executors工廠類的newScheduledThreadPool()方法建立ScheduledThreadPoolExecutor 執行器物件,傳遞 1 作為這個方法的引數。

ScheduledExecutorService executor=Executors.

newScheduledThreadPool(1);

7.在控制檯輸出實際時間。

System.out.printf("Main: Starting at: %s\n",new Date());

8.建立一個新的Task物件。

Task task=new Task("Task");

9.呼叫scheduledAtFixRate()方法將這個任務傳送給執行器。傳遞給這個方法的引數分別為上一步建立的task物件、數字1、數字2,以及TimeUnit.SECONDS常量。這個方法返回一個用來控制任務狀態的ScheduledFuture物件。

ScheduledFuture<?> result=executor.scheduleAtFixedRate(task,

1, 2, TimeUnit.SECONDS);

10.建立一個10步的迴圈,在控制檯輸出任務下一次將要執行的剩餘時間。在迴圈體內,用ScheduledFuture類的getDelay()方法來獲取任務下一次將要執行的毫秒數,然後將執行緒休眠500毫秒。

for (int i=0; i<10; i++){

System.out.printf("Main: Delay: %d\n",result.

getDelay(TimeUnit.MILLISECONDS));

Sleep the thread during 500 milliseconds.

try {

TimeUnit.MILLISECONDS.sleep(500);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

11.呼叫 shutdown() 方法結束執行器。

executor.shutdown();

12.將執行緒休眠 5 秒,等待週期性的任務全部執行完成。

try {

TimeUnit.SECONDS.sleep(5);

} catch (InterruptedException e) {

e.printStackTrace();

}

13.在控制檯輸出資訊表示程式結束。

System.out.printf("Main: Finished at: %s\n",new Date());

工作原理

想要通過執行器框架來執行一個週期性任務時,需要一個ScheduledExecutorService 物件。同建立執行器一樣,在Java中推薦使用Executors工廠類來建立 ScheduledExecutorService物件。Executors類就是執行器物件的工廠。在這個例子中,可以使用newScheduledThreadPool()方法來建立一個ScheduledExecutorService物件。這個方法接收一個表示執行緒池中的執行緒數來作為引數。在這個範例中,因為僅有一個任務,所以只需要傳遞數字 1 作為引數即可。

一旦有了可以執行週期性任務的執行器,就可以傳送任務給這個執行器。在範例中,我們使用scheduledAtFixedRate()方法傳送任務。這個方法接收4個引數,分別為將被週期性執行的任務,任務第一次執行後的延時時間,兩次執行的時間週期,以及第2個和第3個引數的時間單位。這個單位是TimeUnit列舉的常量。TimeUnit是一個列舉類,有如下的常量:DAYSHOURSMICROSECONDSMILLISECONDSMINUTESNANOSECONDSSECONDS

另一個需要注意的是,兩次執行之間的週期是指任務在兩次執行開始時的時間間隔。如果有一個週期性的任務需要執行 5 秒鐘,但是卻讓它每 3 秒鐘執行一次,那麼,在任務執行的過程中將會有兩個任務例項同時存在。

scheduleAtFixedRate()方法返回一個ScheduledFuture物件,ScheduledFuture介面則擴充套件了Future介面,於是它帶有了定時任務的相關操作方法。ScheduledFuture是一個泛型引數化的介面。在這個示例中,任務是Runnable物件,並沒有泛型引數化,必須通過 ? 符號作為引數來泛型化它們。

我們已經使用過 ScheduledFuture 介面中的一個方法。getDelay()方法返回任務到下一次執行時所要等待的剩餘時間。這個方法接收一個 TimeUnit 常量作為時間單位。

下面的截圖顯示了範例的部分執行結果。

Java Concurrency Cook Book 4.6

通過控制上面的資訊,可以看到任務是每 2 秒執行一次;剩餘的延遲時間會每隔 500 毫秒在控制檯上輸出,這個 500 毫秒則是主執行緒將被休眠的時間。當關閉執行器時,定時任務將結束執行,然後在控制檯上也看不到更多的資訊了。

更多資訊

ScheduledThreadPoolExecutor 類還提供了其他方法來安排週期性任務的執行,比如,scheduleWithFixedRate()方法。這個方法與 scheduledAtFixedRate() 方法具有相同的引數,但是略有一些不同需要引起注意。在 scheduledAtFixedRate() 方法中,第 3 個引數表示任務兩次執行開始時間的間隔,而在 schedulledWithFixedDelay () 方法中,第 3 個引數則是表示任務上一次執行結束的時間與任務下一次開始執行的時間的間隔。

也可以配置ScheduledThreadPoolExecutor實現shutdown()方法的行為,預設行為是當呼叫shutdown()方法後,定時任務就結束了。可以通過 ScheduledThreadPoolExecutor類的setContinueExistingPeriodicTasksAfterShutdownPolicy() 方法來改變這個行為,傳遞引數true給這個方法,這樣呼叫shutdown()方法後,週期性任務仍將繼續執行。

參見

  • 參見4.2節。
  • 參見4.7節。

4.9 在執行器中取消任務

使用執行器時,不需要管理執行緒,只需要實現 RunnableCallable 任務併發送任務給執行器即可。執行器負責建立執行緒,管理執行緒池中的執行緒,當執行緒不再需要時就銷燬它們。有時候,我們可能需要取消已經發送給執行器的任務。在這種情況下,可以使用 Future 介面的 cancel() 方法來執行取消操作。在本節,我們將學習如何使用這個方法來取消已經發送給執行器的任務。

準備工作

本節的範例是在Eclipse IDE裡完成的。無論你使用Eclipse還是其他的IDE(比如NetBeans),都可以開啟這個IDE並且建立一個新的Java工程。

範例實現

按照接下來的步驟實現本節的範例。

1.建立一個名為Task的類,並實現Callable介面,介面的泛型引數為String型別。接著實現call()方法,構造一個無限迴圈,先在控制檯輸出資訊,然後休眠100毫秒。

public class Task implements Callable<String> {

@Override

public String call() throws Exception {

while (true){

System.out.printf("Task: Test\n");

Thread.sleep(100);

}

}

2.實現範例主類,建立 Main 主類,並實現 main() 方法。

public class Main {

public static void main(String[] args) {

3.通過Executors工廠類的newCachedThreadPool()方法建立一個ThreadPoolExecutor執行器物件。

ThreadPoolExecutor executor=(ThreadPoolExecutor)Executors.

newCachedThreadPool();

4.建立一個新的Task物件。

Task task=new Task();

5.呼叫submit()方法將任務傳送給執行器。

System.out.printf("Main: Executing the Task\n");

Future<String> result=executor.submit(task);

6.讓主執行緒休眠2秒。

try {

TimeUnit.SECONDS.sleep(2);

} catch (InterruptedException e) {

e.printStackTrace();

}

7.執行器的submit()方法返回名為resultFuture物件,呼叫Future物件的cancel() 方法來取消任務的執行。傳遞引數true給這個cancel()方法。

System.out.printf("Main: Canceling the Task\n");

result.cancel(true);

8.在控制檯輸出呼叫isCancelled() 方法和isDone()方法的結果,來驗證任務已經被取消和已完成。

System.out.printf("Main: Cancelled: %s\n",result.isCancelled());

System.out.printf("Main: Done: %s\n",result.isDone());

9.呼叫shutdown()方法結束執行器,然後在控制檯輸出資訊表示程式執行結束。