1. 程式人生 > >Fork/Join框架(三)加入任務的結果

Fork/Join框架(三)加入任務的結果

宣告:本文是《 Java 7 Concurrency Cookbook 》的第五章,作者: Javier Fernández González     譯者:許巧輝 校對:方騰飛

加入任務的結果

Fork/Join框架提供了執行返回一個結果的任務的能力。這些任務的型別是實現了RecursiveTask類。這個類繼承了ForkJoinTask類和實現了執行者框架提供的Future介面。

在任務中,你必須使用Java API方法推薦的結構:

If (problem size < size){
tasks=Divide(task);
execute(tasks);
groupResults()
return result;
} else {
resolve problem;
return result;
}

如果這個任務必須解決一個超過預定義大小的問題,你應該將這個任務分解成更多的子任務,並且用Fork/Join框架來執行這些子任務。當這些子任務完成執行,發起的任務將獲得所有子任務產生的結果 ,對這些結果進行分組,並返回最終的結果。最終,當在池中執行的發起的任務完成它的執行,你將獲取整個問題地最終結果。

在這個指南中,你將學習如何使用Fork/Join框架解決這種問題,開發一個在文件中查詢單詞的應用程式。你將會實現以下兩種任務型別:

  • 一個文件任務,將在文件中的行集合中查詢一個單詞。
  • 一個行任務,將在文件的一部分資料中查詢一個單詞。

所有任務將返回單詞在文件的一部分中或行中出現的次數。

如何做…

根據以下這些步驟來實現這個例子:

1.建立一個Document類,它將產生用來模擬文件的字串的二維陣列。

public class Document {

2.建立一個帶有一些單詞的字串陣列。這個陣列將被用來生成字串二維陣列。

private String words[]={"the","hello","goodbye","packt", "java","thread","pool","random","class","main"};

3.實現generateDocument()方法。它接收以下引數:行數、每行的單詞數。這個例子返回一個字串二維陣列,來表示將要查詢的單詞。

public String[][] generateDocument(int numLines, int numWords,String word){

4.首先,建立生成這個文件必需的物件:字串二維物件和生成隨機數的Random物件。

int counter=0;
String document[][]=new String[numLines][numWords];
Random random=new Random();

5.用字串填充這個陣列。儲存在每個位置的字串是單詞陣列的隨機位置,統計這個程式將要在生成的陣列中查詢的單詞出現的次數。你可以使用這個值來檢查程式是否執行正確。

for (int i=0; i<numLines; i++){
for (int j=0; j<numWords; j++) {
int index=random.nextInt(words.length);
document[i][j]=words[index];
if (document[i][j].equals(word)){
counter++;
}
}
}

6.將單詞出現的次數寫入控制檯,並返回生成的二維陣列。

System.out.println("DocumentMock: The word appears "+counter+" times in the document");

return document;

7.建立一個DocumentTask類,指定它繼承RecursiveTask類,並引數化為Integer型別。該類將實現統計單詞在一組行中出現的次數的任務。

public class DocumentTask extends RecursiveTask<Integer> {

8.宣告一個私有的String型別的二維陣列document,兩個私有的int型別的屬性名為start和end,一個私有的String型別的屬性名為word。

private String document[][];
private int start, end;
private String word;

9.實現這個類的構造器,用來初始化這些屬性。

public DocumentTask (String document[][], int start, int end, String word){
this.document=document;
this.start=start;
this.end=end;
this.word=word;
}

10.實現compute()方法。如果屬性end和start的差小於10,那麼這個任務統計單詞位於行在呼叫processLines()方法的這些位置中出現的次數。

@Override
protected Integer compute() {
int result;
if (end-start<10){
result=processLines(document, start, end, word);

11.否則,用兩個物件分解行組,建立兩個新的DocumentTask物件用來處理這兩個組,並且在池中使用invokeAll()方法來執行它們。

} else {
int mid=(start+end)/2;
DocumentTask task1=new DocumentTask(document,start,mid,word);
DocumentTask task2=new DocumentTask(document,mid,end,word);
invokeAll(task1,task2);

12.然後,使用groupResults()方法將這兩個任務返回的結果相加。最後,返回這個任務統計的結果。

try {
result=groupResults(task1.get(),task2.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
return result;

13.實現processLines()方法。它接收以下引數:字串二維陣列、start屬性、end屬性、任務將要查詢的word屬性。

private Integer processLines(String[][] document, int start, int
end,String word) {

14.對於任務要處理的每行,建立LineTask物件來處理整行,並且將它們儲存在任務數列中。

List&lt;LineTask&gt; tasks=new ArrayList&lt;LineTask&gt;();
for (int i=start; i&lt;end; i++){
LineTask task=new LineTask(document[i], 0, document[i].
length, word);
tasks.add(task);
}

15.在那個數列中使用invokeAll()執行所有任務。

invokeAll(tasks);

16.合計所有這些任務返回的值,並返回這個結果。

int result=0;
for (int i=0; i&lt;tasks.size(); i++) {
LineTask task=tasks.get(i);
try {
result=result+task.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
return new Integer(result);

17.實現groupResults()方法。它相加兩個數,並返回這個結果。

private Integer groupResults(Integer number1, Integer number2) {
Integer result;
result=number1+number2;
return result;
}

18.建立LineTask類,指定它繼承RecursiveTask類,並引數化為Integer型別。這個類將實現統計單詞在一行中出現的次數的任務。

public class LineTask extends RecursiveTask&lt;Integer&gt;{

19.宣告這個類的序列號版本UID。這個元素是必需的,因為RecursiveTask類的父類,ForkJoinTask類實現了Serializable介面。宣告一個私有的、String型別的屬性line,兩個私有的、int型別的屬性start和end,一個私有的、String型別的屬性word。

private static final long serialVersionUID = 1L;
private String line[];
private int start, end;
private String word;

20.實現這個類的構造器,初始化這些屬性。

public LineTask(String line[], int start, int end, String word)
{
this.line=line;
this.start=start;
this.end=end;
this.word=word;
}

21.實現這個類的compute()方法。如果屬性end和start之差小於100,這個任務在行中由start和end屬性使用count()方法決定的片斷中查詢單詞。

@Override
protected Integer compute() {
Integer result=null;
if (end-start&lt;100) {
result=count(line, start, end, word);

22.否則,將行中的單片語分成兩部分,建立兩個新的LineTask物件來處理這兩個組,在池中使用invokeAll()方法執行它們。

} else {
int mid=(start+end)/2;
LineTask task1=new LineTask(line, start, mid, word);
LineTask task2=new LineTask(line, mid, end, word);
invokeAll(task1, task2);

23.然後,使用groupResults()方法將這兩個任務返回的值相加。最後,返回這個任務計算的結果。

try {
result=groupResults(task1.get(),task2.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
return result;

24.實現count()方法。它接收以下引數:完整行的字串陣列、start屬性、end屬性、任務將要查詢的word屬性。

private Integer count(String[] line, int start, int end, String
word) {

25.比較這個任務將要查詢的word屬性中的在start和end屬性之間的位置的單詞,如果它們相等,則增加count變數。

int counter;
counter=0;
for (int i=start; i&lt;end; i++){
if (line[i].equals(word)){

counter++;
}
}

26.為了顯示示例的執行,令任務睡眠10毫秒。

try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}

27.返回counter變數的值。

return counter;

28.實現groupResults()方法。它合計兩個數的值,並返回這個結果。

private Integer groupResults(Integer number1, Integer number2) {
Integer result;
result=number1+number2;
return result;
}

29.實現示例的主類,通過建立Main類,並實現main()方法。

public class Main{
public static void main(String[] args) {

30.使用DocumentMock類,建立一個帶有100行,每行1000個單詞的Document。

DocumentMock mock=new DocumentMock();
String[][] document=mock.generateDocument(100, 1000, "the");

31.建立一個新的DocumentTask物件,用來更新整個文件的產品。引數start值為0,引數end值為100。

DocumentTask task=new DocumentTask(document, 0, 100, &quot;the&quot;);

32.使用無參構造器建立一個ForkJoinPool物件,在池中使用execute()方法執行這個任務。

ForkJoinPool pool=new ForkJoinPool();
pool.execute(task);

33.實現一個程式碼塊,用來顯示關於池變化的資訊。每秒向控制檯寫入池的某些引數的值,直到任務完成它的執行。

do {
System.out.printf("******************************************\n");
System.out.printf("Main: Parallelism: %d\n",pool.getParallelism());
System.out.printf("Main: Active Threads: %d\n",pool.getActiveThreadCount());
System.out.printf("Main: Task Count: %d\n",pool.getQueuedTaskCount());
System.out.printf("Main: Steal Count: %d\n",pool.getStealCount());
System.out.printf("******************************************\n");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
} while (!task.isDone());

34.使用shutdown()方法關閉這個池。

pool.shutdown();

35.使用awaitTermination()方法等待任務的結束。

try {
System.out.printf("Main: The word appears %d in the document",task.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}

36.列印單詞在文件中出現的次數。檢查這個數是否與DocumentMock類中寫入的數一樣。

try {
System.out.printf("Main: The word appears %d in the document",task.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}

它是如何工作的…

在這個示例中,你已經實現了兩種不同的任務:

  • DocumentTask類:這個類的任務將處理由start和end屬性決定的文件中的行組。如果這個行組的大小小於10,它為每行建立LineTask物件,並且當它們完成它們的執行時,它合計這些任務的結果,並返回這個合計值。如果這個任務要處理的行組大小不小於10,它將這個組分成兩個並建立兩個DocumentTask物件來處理這些新組。當這些任務完成它們的執行時,這個任務合計它們的結果,並返回這個合計值。
  • LineTask類:這個類的任務將處理文件中的一行的單片語。如果這個單片語小於10,這個任務直接在這個單片語中查詢單詞,並且返回這個單詞出現的次數。否則,它將這個單片語分成兩個並建立兩個LineTask物件來處理。當這些任務完成它們的執行,這個任務合計這些任務的結果並返回這個合計值。

在Main類中,你已經使用預設構造器一個ForkJoinPool物件,並且你在它裡面執行一個DocumentTask類,這個類將處理一個擁有100行,每行有1000個單詞的文件。這個任務將使用其他的DocumentTask物件和LineTask物件來分解這個問題,當所有任務完成它們的執行,你可以使用啟動任務來獲取單詞在整個文件中出現的總次數。由於任務返回一個結果,所以它們繼承RecursiveTask類。

為了獲取Task返回的結果,你已經使用了get()方法 。這個方法是在Future介面中宣告的,由RecursiveTask類實現的。

當你執行這個程式,你可以比較在控制檯中的第一行和最後一行。第一行是文件生成時計算的單詞出現的次數,最後一行是由Fork/Join任務計算的。

不止這些…

ForkJoinTask類提供其他的方法來完成一個任務的執行,並返回一個結果,這就是complete()方法。這個方法接收一個RecursiveTask類的引數化型別的物件,並且當join()方法被呼叫時,將這個物件作為任務的結果返回。 它被推薦使用在:提供非同步任務結果。

由於RecursiveTask類實現了Future介面,get()方法其他版本如下:

  • get(long timeout, TimeUnit unit):這個版本的get()方法,如果任務的結果不可用,在指定的時間內等待它。如果超時並且結果不可用,那麼這個方法返回null值。TimeUnit類是一個列舉類,它有以下常量:DAYS, HOURS,MICROSECONDS,MILLISECONDS, MINUTES, NANOSECONDS和SECONDS。

參見

  • 在第5章,Fork/Join框架中的建立一個Fork/Join池的指南
  • 在第8章,測試併發應用程式中的監控Fork/Join池的指南