1. 程式人生 > >Fork/Join框架(四)非同步執行任務

Fork/Join框架(四)非同步執行任務

宣告:本文是《 Java 7 Concurrency Cookbook 》的第五章,作者: Javier Fernández González     譯者:許巧輝 校對:方騰飛

非同步執行任務

當你在ForkJoinPool中執行ForkJoinTask時,你可以使用同步或非同步方式來實現。當你使用同步方式時,提交任務給池的方法直到提交的任務完成它的執行,才會返回結果。當你使用非同步方式時,提交任務給執行者的方法將立即返回,所以這個任務可以繼續執行。

你應該意識到這兩個方法有很大的區別,當你使用同步方法,呼叫這些方法(比如:invokeAll()方法)的任務將被阻塞,直到提交給池的任務完成它的執行。這允許ForkJoinPool類使用work-stealing演算法,分配一個新的任務給正在執行睡眠任務的工作執行緒。反之,當你使用非同步方法(比如:fork()方法),這個任務將繼續它的執行,所以ForkJoinPool類不能使用work-stealing演算法來提高應用程式的效能。在這種情況下,只有當你呼叫join()或get()方法來等待任務的完成時,ForkJoinPool才能使用work-stealing演算法。

在這個指南中,你將學習如何使用ForkJoinPool和ForkJoinTask類提供的非同步方法來管理任務。你將實現一個程式,在一個資料夾及其子資料夾內查詢確定副檔名的檔案。你將實現ForkJoinTask類來處理資料夾的內容。對於資料夾裡的每個子資料夾,它將以非同步的方式提交一個新的任務給ForkJoinPool類。對於資料夾裡的每個檔案,任務將檢查檔案的副檔名,如果它被處理,並把它新增到結果列表。

如何做…

按以下步驟來實現這個例子:

1.建立FolderProcessor類,指定它繼承RecursiveTask類,並引數化為List<String>型別。

public class FolderProcessor extends RecursiveTask<List<String>> {

2.宣告這個類的序列號版本UID。這個元素是必需的,因為RecursiveTask類的父類,ForkJoinTask類實現了Serializable介面。

private static final long serialVersionUID = 1L;

3.宣告一個私有的、String型別的屬性path。這個屬性將儲存任務將要處理的資料夾的全路徑。

private String path;

4.宣告一個私有的、String型別的屬性extension。這個屬性將儲存任務將要查詢的檔案的副檔名。

private String extension;

5.實現這個類的構造器,初始化它的屬性。

public FolderProcessor (String path, String extension) {
this.path=path;
this.extension=extension;
}

6.實現compute()方法。正如你用List<String>型別引數化RecursiveTask類,這個方法將返回這個型別的一個物件。

@Override
protected List<String> compute() {

7.宣告一個String物件的數列,用來儲存儲存在資料夾中的檔案。

List<String> list=new ArrayList<>();

8.宣告一個FolderProcessor任務的數列,用來儲存將要處理儲存在資料夾內的子資料夾的子任務。

List<FolderProcessor> tasks=new ArrayList<>();

9.獲取資料夾的內容。

File file=new File(path);
File content[] = file.listFiles();

10.對於資料夾裡的每個元素,如果是子資料夾,則建立一個新的FolderProcessor物件,並使用fork()方法非同步地執行它。

if (content != null) {
for (int i = 0; i < content.length; i++) {
if (content[i].isDirectory()) {
FolderProcessor task=new FolderProcessor(content[i].
getAbsolutePath(), extension);
task.fork();
tasks.add(task);

11.否則,使用checkFile()方法比較這個檔案的副檔名和你想要查詢的副檔名,如果它們相等,在前面宣告的字串數列中儲存這個檔案的全路徑。

} else {
if (checkFile(content[i].getName())){
list.add(content[i].getAbsolutePath());
}
}
}

12.如果FolderProcessor子任務的數列超過50個元素,寫入一條資訊到控制檯表明這種情況。

if (tasks.size()>50) {
System.out.printf("%s: %d tasks ran.\n",file.
getAbsolutePath(),tasks.size());
}

13.呼叫輔助方法addResultsFromTask(),將由這個任務發起的子任務返回的結果新增到檔案數列中。傳入引數:字串數列和FolderProcessor子任務數列。

addResultsFromTasks(list,tasks);

14.返回字串數列。

return list;

15.實現addResultsFromTasks()方法。對於儲存在tasks數列中的每個任務,呼叫join()方法,這將等待任務執行的完成,並且返回任務的結果。使用addAll()方法將這個結果新增到字串數列。

private void addResultsFromTasks(List<String> list,
List<FolderProcessor> tasks) {
for (FolderProcessor item: tasks) {
list.addAll(item.join());
}
}

16.實現checkFile()方法。這個方法將比較傳入引數的檔名的結束擴充套件是否是你想要查詢的。如果是,這個方法返回true,否則,返回false。

private boolean checkFile(String name) {
return name.endsWith(extension);
}

17.實現這個例子的主類,通過建立Main類,並實現main()方法。

public class Main {
public static void main(String[] args) {

18.使用預設構造器建立ForkJoinPool。

ForkJoinPool pool=new ForkJoinPool();

19.建立3個FolderProcessor任務。用不同的資料夾路徑初始化每個任務。

FolderProcessor system=new FolderProcessor("C:\\Windows",
"log");
FolderProcessor apps=new
FolderProcessor("C:\\Program Files","log");
FolderProcessor documents=new FolderProcessor("C:\\Documents
And Settings","log");

20.在池中使用execute()方法執行這3個任務。

pool.execute(system);
pool.execute(apps);
pool.execute(documents);

21.將關於池每秒的狀態資訊寫入到控制檯,直到這3個任務完成它們的執行。

do {
System.out.printf("******************************************\n");
System.out.printf("Main: Parallelism: %d\n",pool.
getParallelism());
System.out.printf("Main: Active Threads: %d\n",pool.
getActiveThreadCount());
System.out.printf("Main: Task Count: %d\n",pool.
getQueuedTaskCount());
System.out.printf("Main: Steal Count: %d\n",pool.
getStealCount());
System.out.printf("*****************************************
*\n");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
} while((!system.isDone())||(!apps.isDone())||(!documents.
isDone()));

22.使用shutdown()方法關閉ForkJoinPool。

pool.shutdown();

23.將每個任務產生的結果數量寫入到控制檯。

List<String> results;
results=system.join();
System.out.printf("System: %d files found.\n",results.size());
results=apps.join();
System.out.printf("Apps: %d files found.\n",results.size());
results=documents.join();
System.out.printf("Documents: %d files found.\n",results.
size());

它是如何工作的…

以下截圖顯示了這個例子執行的一部分:

3

這個例子的關鍵是FolderProcessor類。每個任務處理資料夾的內容。如你所知,這個內容有以下兩種元素:

  • 檔案
  • 其他資料夾

如果任務找到一個資料夾,它建立另一個Task物件來處理這個資料夾,並使用fork()方法把它(Task物件)提交給池。這個方法提交給池的任務將被執行,如果池中有空閒的工作執行緒或池可以建立一個新的工作執行緒。這個方法會立即返回,所以這個任務可以繼續處理資料夾的內容。對於每個檔案,任務將它的擴充套件與所想要查詢的(擴充套件)進行比較,如果它們相等,將檔名新增到結果數列。

一旦這個任務處理完指定資料夾的所有內容,它將使用join()方法等待已提交到池的所有任務的結束。這個方法在一個任務等待其執行結束時呼叫,並返回compute()方法返回的值。這個任務將它自己傳送的所有任務的結果和它自己的結果分組,並返回作為compute()方法的一個返回值的陣列。

ForkJoinPool類同時允許任務的執行以非同步的方式。你已經使用execute()方法,提交3個初始任務給池。在Main類中,你也使用shutdown()方法結束池,並列印關於正在池中執行任務的狀態和變化的資訊。ForkJoinPool類包含更多方法,可用於這個目的(非同步執行任務)。參見監控一個Fork/Join池指南,看這些方法完整的列表。

不止這些…

在這個示例中,你已經使用了join()方法來等待任務的結束,並獲得它們的結果。對於這個目的,你也可以使用get()方法的兩個版本之一:

  • get():這個版本的get()方法,如果ForkJoinTask已經結束它的執行,則返回compute()方法的返回值,否則,等待直到它完成。
  • get(long timeout, TimeUnit unit):這個版本的get()方法,如果任務的結果不可用,則在指定的時間內等待它。如果超時並且任務的結果仍不可用,這個方法返回null值。TimeUnit類是一個列舉類,包含以下常量:DAYS,HOURS,MICROSECONDS, MILLISECONDS,MINUTES, NANOSECONDS 和 SECONDS。

get()和join()有兩個主要的區別:

  • join()方法不能被中斷。如果你中斷呼叫join()方法的執行緒,這個方法將丟擲InterruptedException異常。
  • 如果任務丟擲任何未受檢異常,get()方法將返回一個ExecutionException異常,而join()方法將返回一個RuntimeException異常。

參見

  • 在第5章,Fork/Join框架中的建立一個Fork/Join池指南
  • 在第8章,測試併發應用程式中的監控一個Fork/Join池指南