Fork/Join框架(六)取消任務
宣告:本文是《 Java 7 Concurrency Cookbook 》的第五章,作者: Javier Fernández González 譯者:許巧輝 校對:方騰飛
取消任務
當你在一個ForkJoinPool類中執行ForkJoinTask物件,在它們開始執行之前,你可以取消執行它們。ForkJoinTask類提供cancel()方法用於這個目的。當你想要取消一個任務時,有一些點你必須考慮一下,這些點如下:
- ForkJoinPool類並沒有提供任何方法來取消正在池中執行或等待的所有任務。
- 當你取消一個任務時,你不能取消一個已經執行的任務。
在這個指南中,你將實現取消ForkJoinTask物件的例子。你將查詢數在陣列中的位置。第一個找到這個數的任務將取消剩下的任務(未找到這個數的任務)。由於Fork/Join框架並沒有提供這種功能,所以,你將實現一個輔助類來做這個取消功能。
準備工作…
這個指南的例子使用Eclipse IDE實現。如果你使用Eclipse或其他IDE,如NetBeans,開啟它並建立一個新的Java專案。
如何做…
按以下步驟來實現這個例子:
1.建立ArrayGenerator類。這個類將產生一組隨機的、指定大小的整數數字。實現generateArray()方法。它將產生一組數字,它接收陣列大小作為引數。
public class ArrayGenerator { public int[] generateArray(int size) { int array[]=new int[size]; Random random=new Random(); for (int i=0; i<size; i++){ array[i]=random.nextInt(10); } return array; }
2.建立一個TaskManager類。我們將使用這個類來儲存在ForkJoinPool中執行的所有任務。由於ForkJoinPool和ForkJoinTask類的侷限性,你將使用這個類來取消ForkJoinPool類的所有任務。
public class TaskManager {
3.宣告一個物件引數化為ForkJoinTask型別的數列,其中ForkJoinTask類引數化為Integer型別。
private List<ForkJoinTask<Integer>> tasks;
4.實現這個類的構造器,它初始化任務數列。
public TaskManager(){ tasks=new ArrayList<>(); }
5.實現addTask()方法。它新增ForkJoinTask物件到任務數列。
public void addTask(ForkJoinTask<Integer> task){ tasks.add(task); }
6.實現cancelTasks()方法。它將使用cancel()方法取消在數列中的所有ForkJoinTask物件。它接收一個想要取消剩餘任務的ForkJoinTask物件作為引數。這個方法取消所有任務。
public void cancelTasks(ForkJoinTask<Integer> cancelTask){ for (ForkJoinTask<Integer> task :tasks) { if (task!=cancelTask) { task.cancel(true); ((SearchNumberTask)task).writeCancelMessage(); } } }
7.實現SearchNumberTask類,指定它繼承引數化為Integer型別的RecursiveTask類。這個類將查詢在整數陣列的元素塊中的數。
public class SearchNumberTask extends RecursiveTask<Integer> {
8.宣告一個私有的、int型別的數字陣列。
private int numbers[];
9.宣告兩個私有的、int型別的屬性start和end。這些屬性將決定任務要處理的陣列的元素。
private int start, end;
10.宣告一個私有的、int型別的屬性number,它將儲存你將要查詢的數。
private int number;
11.宣告一個私有的、TaskManager型別的屬性manager。你將使用這個物件來取消所有任務。
private TaskManager manager;
12.宣告一個私有的、int型別的常量並初始化它為值-1。當任務沒有找到這個數時,它將作為任務的返回值。
private final static int NOT_FOUND=-1;
13.實現這個類的構造器來初始化它的屬性。
public Task(int numbers[], int start, int end, int number, TaskManager manager){ this.numbers=numbers; this.start=start; this.end=end; this.number=number; this.manager=manager; }
14.實現compute()方法。寫入一條資訊(start和end屬性值)到控制檯表明這個方法的開始。
@Override protected Integer compute() { System.out.println("Task: "+start+":"+end);
15.如果start和end之差大於10(這個任務將處理超過10個元素的陣列),呼叫launchTasks()方法,將這個任務的工作拆分成兩個任務。
int ret; if (end-start>10) { ret=launchTasks();
16.否則,這個任務呼叫lookForNumber()方法來查詢在陣列塊中的數。
} else { ret=lookForNumber(); }
17.返回任務的結果。
return ret;
18.實現lookForNumber()方法。
private int lookForNumber() {
19.對於任務要處理的元素塊中的所有元素,將你想要查詢的數與儲存在元素中的值進行比較。如果他們相等,寫入一條資訊到控制檯表明這種情形,使用TaskManager物件的cancelTasks()方法來取消所有任務,並返回你已經找到的這個數對應元素的位置。
for (int i=start; i<end; i++){ if (array[i]==number) { System.out.printf("Task: Number %d found in position %d\n",number,i); manager.cancelTasks(this); return i; }
20.在迴圈的內部,令任務睡眠1秒。
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } }
21.最後,返回值-1。
return NOT_FOUND; }
22.實現launchTasks()方法。首先,將這個任務要處理的數塊分成兩個部分,然後,建立兩個Task物件來處理它們。
private int launchTasks() { int mid=(start+end)/2; Task task1=new Task(array,start,mid,number,manager); Task task2=new Task(array,mid,end,number,manager);
23.新增這個任務到TaskManager物件中。
manager.addTask(task1); manager.addTask(task2);
24.使用fork()方法非同步執行這兩個任務。
task1.fork(); task2.fork();
25.等待這個任務的結束,返回第一個任務的結果(如果它不等於1),或第二個任務的結果。
int returnValue; returnValue=task1.join(); if (returnValue!=-1) { return returnValue; } returnValue=task2.join(); return returnValue;
26.實現writeCancelMessage()方法,當任務取消時,寫一條資訊到控制檯。
public void writeCancelMessage(){ System.out.printf("Task: Canceled task from %d to %d",start,end); }
27.實現這個例子的主類,通過建立Main類,並實現main()方法。
public class Main { public static void main(String[] args) {
28.使用ArrayGenerator類,建立一個有1000個數字的陣列。
ArrayGenerator generator=new ArrayGenerator(); int array[]=generator.generateArray(1000);
29.建立一個TaskManager物件。
TaskManager manager=new TaskManager();
30.使用預設的構造器建立一個ForkJoinPool物件。
ForkJoinPool pool=new ForkJoinPool();
31.建立一個Task物件來處理前面生成的陣列。
Task task=new Task(array,0,1000,5,manager);
32.使用execute()方法,在池中非同步執行任務。
pool.execute(task);
33.使用shutdown()方法關閉這個池。
pool.shutdown();
34.使用ForkJoinPool類的awaitTermination()方法,等待任務的結束。
try { pool.awaitTermination(1, TimeUnit.DAYS); } catch (InterruptedException e) { e.printStackTrace(); }
35.寫入一條資訊到控制檯,表明程式的結束。
System.out.printf("Main: The program has finished\n");
它是如何工作的…
ForkJoinTask提供cancel()方法,允許你取消一個還未執行的任務。這是一個非常重要的點。如果任務已經開始它的執行,那麼呼叫cancel()方法對它沒有影響。這個方法接收一個Boolean值,名為mayInterruptIfRunning的引數。這個名字可能讓你覺得,如果你傳入一個true值給這個方法,這個任務將被取消,即使它正在執行。
Java API文件指出,在ForkJoinTask類的預設實現中,這個屬性不起作用。任務只能在它們還未開始執行時被取消。一個任務的取消不會影響到已經提到到池的(其他)任務。它們繼續它們的執行。 Fork/Join框架的一個侷限性是,它不允許取消在ForkJoinPool中的所有任務。為了克服這個限制,你實現了TaskManager類。它儲存被提到池中的所有任務。它有一個方法取消它儲存的所有任務。如果一個任務由於它正在執行或已經完成而不能被取消,cancel()方法返回false值,所以,你可以嘗試取消所有任務,而不用擔心可能有間接的影響。 在這個例子中,你已經實現一個任務,用來在一個數字陣列中查詢一個數。如Fork/Join框架所推薦的,你將問題分解成更小的子問題。你只關心這個數的出現,所以當你找到它,你取消了其他任務。 以下截圖顯示這個例子執行的一部分: 參見
- 在第5章,Fork/Join框架中的建立一個Fork/Join池指南