1. 程式人生 > >Fork/Join框架(六)取消任務

Fork/Join框架(六)取消任務

宣告:本文是《 Java 7 Concurrency Cookbook 》的第五章,作者: Javier Fernández González     譯者:許巧輝 校對:方騰飛

取消任務

當你在一個ForkJoinPool類中執行ForkJoinTask物件,在它們開始執行之前,你可以取消執行它們。ForkJoinTask類提供cancel()方法用於這個目的。當你想要取消一個任務時,有一些點你必須考慮一下,這些點如下:

  • ForkJoinPool類並沒有提供任何方法來取消正在池中執行或等待的所有任務。
  • 當你取消一個任務時,你不能取消一個已經執行的任務。

在這個指南中,你將實現取消ForkJoinTask物件的例子。你將查詢數在陣列中的位置。第一個找到這個數的任務將取消剩下的任務(未找到這個數的任務)。由於Fork/Join框架並沒有提供這種功能,所以,你將實現一個輔助類來做這個取消功能。

準備工作…

這個指南的例子使用Eclipse IDE實現。如果你使用Eclipse或其他IDE,如NetBeans,開啟它並建立一個新的Java專案。

如何做…

按以下步驟來實現這個例子:

1.建立ArrayGenerator類。這個類將產生一組隨機的、指定大小的整數數字。實現generateArray()方法。它將產生一組數字,它接收陣列大小作為引數。

public class ArrayGenerator {
public int[] generateArray(int size) {
int array[]=new int[size];
Random random=new Random();
for (int i=0; i<size; i++){
array[i]=random.nextInt(10);
}
return array;
}

2.建立一個TaskManager類。我們將使用這個類來儲存在ForkJoinPool中執行的所有任務。由於ForkJoinPool和ForkJoinTask類的侷限性,你將使用這個類來取消ForkJoinPool類的所有任務。

public class TaskManager {

3.宣告一個物件引數化為ForkJoinTask型別的數列,其中ForkJoinTask類引數化為Integer型別。

private List<ForkJoinTask<Integer>> tasks;

4.實現這個類的構造器,它初始化任務數列。

public TaskManager(){
tasks=new ArrayList<>();
}

5.實現addTask()方法。它新增ForkJoinTask物件到任務數列。

public void addTask(ForkJoinTask<Integer> task){
tasks.add(task);
}

6.實現cancelTasks()方法。它將使用cancel()方法取消在數列中的所有ForkJoinTask物件。它接收一個想要取消剩餘任務的ForkJoinTask物件作為引數。這個方法取消所有任務。

public void cancelTasks(ForkJoinTask<Integer> cancelTask){
for (ForkJoinTask<Integer> task :tasks) {
if (task!=cancelTask) {
task.cancel(true);
((SearchNumberTask)task).writeCancelMessage();
}
}
}

7.實現SearchNumberTask類,指定它繼承引數化為Integer型別的RecursiveTask類。這個類將查詢在整數陣列的元素塊中的數。

public class SearchNumberTask extends RecursiveTask<Integer> {

8.宣告一個私有的、int型別的數字陣列。

private int numbers[];

9.宣告兩個私有的、int型別的屬性start和end。這些屬性將決定任務要處理的陣列的元素。

private int start, end;

10.宣告一個私有的、int型別的屬性number,它將儲存你將要查詢的數。

private int number;

11.宣告一個私有的、TaskManager型別的屬性manager。你將使用這個物件來取消所有任務。

private TaskManager manager;

12.宣告一個私有的、int型別的常量並初始化它為值-1。當任務沒有找到這個數時,它將作為任務的返回值。

private final static int NOT_FOUND=-1;

13.實現這個類的構造器來初始化它的屬性。

public Task(int numbers[], int start, int end, int number,
TaskManager manager){
this.numbers=numbers;
this.start=start;
this.end=end;
this.number=number;
this.manager=manager;
}

14.實現compute()方法。寫入一條資訊(start和end屬性值)到控制檯表明這個方法的開始。

@Override
protected Integer compute() {
System.out.println("Task: "+start+":"+end);

15.如果start和end之差大於10(這個任務將處理超過10個元素的陣列),呼叫launchTasks()方法,將這個任務的工作拆分成兩個任務。

int ret;
if (end-start>10) {
ret=launchTasks();

16.否則,這個任務呼叫lookForNumber()方法來查詢在陣列塊中的數。

} else {
ret=lookForNumber();
}

17.返回任務的結果。

return ret;

18.實現lookForNumber()方法。

private int lookForNumber() {

19.對於任務要處理的元素塊中的所有元素,將你想要查詢的數與儲存在元素中的值進行比較。如果他們相等,寫入一條資訊到控制檯表明這種情形,使用TaskManager物件的cancelTasks()方法來取消所有任務,並返回你已經找到的這個數對應元素的位置。

for (int i=start; i<end; i++){
if (array[i]==number) {
System.out.printf("Task: Number %d found in position
%d\n",number,i);
manager.cancelTasks(this);
return i;
}

20.在迴圈的內部,令任務睡眠1秒。

try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

21.最後,返回值-1。

return NOT_FOUND;
}

22.實現launchTasks()方法。首先,將這個任務要處理的數塊分成兩個部分,然後,建立兩個Task物件來處理它們。

private int launchTasks() {
int mid=(start+end)/2;
Task task1=new Task(array,start,mid,number,manager);
Task task2=new Task(array,mid,end,number,manager);

23.新增這個任務到TaskManager物件中。

manager.addTask(task1);
manager.addTask(task2);

24.使用fork()方法非同步執行這兩個任務。

task1.fork();
task2.fork();

25.等待這個任務的結束,返回第一個任務的結果(如果它不等於1),或第二個任務的結果。

int returnValue;
returnValue=task1.join();
if (returnValue!=-1) {
return returnValue;
}
returnValue=task2.join();
return returnValue;

26.實現writeCancelMessage()方法,當任務取消時,寫一條資訊到控制檯。

public void writeCancelMessage(){
System.out.printf("Task: Canceled task from %d to
%d",start,end);
}

27.實現這個例子的主類,通過建立Main類,並實現main()方法。

public class Main {
public static void main(String[] args) {

28.使用ArrayGenerator類,建立一個有1000個數字的陣列。

ArrayGenerator generator=new ArrayGenerator();
int array[]=generator.generateArray(1000);

29.建立一個TaskManager物件。

TaskManager manager=new TaskManager();

30.使用預設的構造器建立一個ForkJoinPool物件。

ForkJoinPool pool=new ForkJoinPool();

31.建立一個Task物件來處理前面生成的陣列。

Task task=new Task(array,0,1000,5,manager);

32.使用execute()方法,在池中非同步執行任務。

pool.execute(task);

33.使用shutdown()方法關閉這個池。

pool.shutdown();

34.使用ForkJoinPool類的awaitTermination()方法,等待任務的結束。

try {
pool.awaitTermination(1, TimeUnit.DAYS);
} catch (InterruptedException e) {
e.printStackTrace();
}

35.寫入一條資訊到控制檯,表明程式的結束。

System.out.printf("Main: The program has finished\n");

它是如何工作的…

ForkJoinTask提供cancel()方法,允許你取消一個還未執行的任務。這是一個非常重要的點。如果任務已經開始它的執行,那麼呼叫cancel()方法對它沒有影響。這個方法接收一個Boolean值,名為mayInterruptIfRunning的引數。這個名字可能讓你覺得,如果你傳入一個true值給這個方法,這個任務將被取消,即使它正在執行。

Java API文件指出,在ForkJoinTask類的預設實現中,這個屬性不起作用。任務只能在它們還未開始執行時被取消。一個任務的取消不會影響到已經提到到池的(其他)任務。它們繼續它們的執行。 Fork/Join框架的一個侷限性是,它不允許取消在ForkJoinPool中的所有任務。為了克服這個限制,你實現了TaskManager類。它儲存被提到池中的所有任務。它有一個方法取消它儲存的所有任務。如果一個任務由於它正在執行或已經完成而不能被取消,cancel()方法返回false值,所以,你可以嘗試取消所有任務,而不用擔心可能有間接的影響。 在這個例子中,你已經實現一個任務,用來在一個數字陣列中查詢一個數。如Fork/Join框架所推薦的,你將問題分解成更小的子問題。你只關心這個數的出現,所以當你找到它,你取消了其他任務。 以下截圖顯示這個例子執行的一部分: 5 參見

  • 在第5章,Fork/Join框架中的建立一個Fork/Join池指南