1. 程式人生 > >定製併發類(七)實現ThreadFactory介面生成自定義的執行緒給Fork/Join框架

定製併發類(七)實現ThreadFactory介面生成自定義的執行緒給Fork/Join框架

宣告:本文是《 Java 7 Concurrency Cookbook 》的第七章,作者: Javier Fernández González     譯者:許巧輝

實現ThreadFactory介面生成自定義的執行緒給Fork/Join框架

Fork/Join框架是Java7中最有趣的特徵之一。它是Executor和ExecutorService介面的一個實現,允許你執行Callable和Runnable任務而不用管理這些執行執行緒。

這個執行者面向執行能被拆分成更小部分的任務。主要元件如下:

  • 一個特殊任務,實現ForkJoinTask類
  • 兩種操作,將任務劃分成子任務的fork操作和等待這些子任務結束的join操作
  • 一個演算法,優化池中執行緒的使用的work-stealing演算法。當一個任務正在等待它的子任務(結束)時,它的執行執行緒將執行其他任務(等待執行的任務)。

ForkJoinPool類是Fork/Join的主要類。在它的內部實現,有如下兩種元素:

  • 一個儲存等待執行任務的列隊。
  • 一個執行任務的執行緒池

在這個指南中,你將學習如何實現一個在ForkJoinPool類中使用的自定義的工作者執行緒,及如何使用一個工廠來使用它。

準備工作…

這個指南的例子使用Eclipse IDE實現。如果你使用Eclipse或其他IDE,如NetBeans,開啟它並建立一個新的Java專案。

如何做…

按以下步驟來實現的這個例子:

1.建立一個繼承ForkJoinWorkerThread類的MyWorkerThread類。

public class MyWorkerThread extends ForkJoinWorkerThread {

2.宣告和建立一個引數化為Integer類的ThreadLocal屬性,名為taskCounter。

private static ThreadLocal<Integer> taskCounter=new ThreadLocal<Integer>();

3.實現這個類的構造器。

protected MyWorkerThread(ForkJoinPool pool) {
super(pool);
}

4.重寫onStart()方法。呼叫父類的這個方法,寫入一條資訊到控制檯。設定當前執行緒的taskCounter屬性值為0。

@Override
protected void onStart() {
super.onStart();
System.out.printf("MyWorkerThread %d: Initializing task
counter.\n",getId());
taskCounter.set(0);
}

5.重寫onTermination()方法。寫入當前執行緒的taskCounter屬性值到控制檯。

@Override
protected void onTermination(Throwable exception) {
System.out.printf("MyWorkerThread %d:
%d\n",getId(),taskCounter.get());
super.onTermination(exception);
}

6.實現addTask()方法。遞增taskCounter屬性值。

public void addTask(){
int counter=taskCounter.get().intValue();
counter++;
taskCounter.set(counter);
}

7.建立一個實現ForkJoinWorkerThreadFactory介面的MyWorkerThreadFactory類。實現newThread()方法,建立和返回一個MyWorkerThread物件。

@Override
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
return new MyWorkerThread(pool);
}
}

8.建立MyRecursiveTask類,它繼承一個引數化為Integer類的RecursiveTask類。

public class MyRecursiveTask extends RecursiveTask<Integer> {

9.宣告一個私有的、int型別的屬性array。

private int array[];

10.宣告兩個私有的、int型別的屬性start和end。

private int start, end;

11.實現這個類的構造器,初始化它的屬性。

public MyRecursiveTask(int array[],int start, int end) {
this.array=array;
this.start=start;
this.end=end;
}

12.實現compute()方法,用來合計陣列中在start和end位置之間的所有元素。首先,將執行這個任務的執行緒轉換成一個MyWorkerThread物件,然後使用addTask()方法來增長這個執行緒的任務計數器。

@Override
protected Integer compute() {
Integer ret;
MyWorkerThread thread=(MyWorkerThread)Thread.currentThread();
thread.addTask();
}

13.實現addResults()方法。計算和返回兩個任務(接收引數)的結果的總和。

private Integer addResults(Task task1, Task task2) {
int value;
try {
value = task1.get().intValue()+task2.get().intValue();
} catch (InterruptedException e) {
e.printStackTrace();
value=0;
} catch (ExecutionException e) {
e.printStackTrace();
value=0;
}

14.令這個執行緒睡眠10毫秒,然後返回任務的結果。

try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
return value;
}

15.實現這個例子的主類,通過建立Main類,並實現main()方法。

public class Main {
public static void main(String[] args) throws Exception {

16.建立一個名為factory的MyWorkerThreadFactory物件。

MyWorkerThreadFactory factory=new MyWorkerThreadFactory();

17.建立一個名為pool的ForkJoinPool物件,將前面建立的factory物件作為引數傳給它的構造器。

ForkJoinPool pool=new ForkJoinPool(4, factory, null, false);

18.建立一個大小為100000的整數陣列,將所有元素初始化為值1。

int array[]=new int[100000];
for (int i=0; i<array.length; i++){
array[i]=1;
}

19.建立一個新的Task物件,用來合計陣列中的所有元素。

MyRecursiveTask task=new MyRecursiveTask(array,0,array.length);

20.使用execute()方法,將這個任務提交給池。

pool.execute(task);

21.使用join()方法,等待這個任務的結束。

task.join();

22.使用shutdown()方法,關閉這個池。

pool.shutdown();

23.使用awaitTermination()方法,等待這個執行者的結束。

pool.awaitTermination(1, TimeUnit.DAYS);

24.使用get()方法,將任務的結束寫入到控制檯。

System.out.printf("Main: Result: %d\n",task.get());

25.寫入一條資訊到控制檯,表明程式的結束。

System.out.printf("Main: End of the program\n");

它是如何工作的…

Fork/Join框架使用的執行緒叫工作者執行緒。Java包含繼承Thread類的ForkJoinWorkerThread類和使用Fork/Join框架實現工作者執行緒。

在這個指南中,你已實現了繼承ForkJoinWorkerThread類的MyWorkerThread類,並重寫這個類的兩個方法。你的目標是實現每個工作者執行緒的任務計數器,以至於你可以知道每個工作者執行緒執行多少個任務。你已經通過一個ThreadLocal屬性實現計數器。這樣,每個執行緒都擁有它自己的計數器,對於來你說是透明的。

你已重寫ForkJoinWorkerThread類的onStart()方法來實現任務的計數器。當工作者執行緒開始它的執行時,這個方法將被呼叫。你也重寫了onTermination()方法,將任務計數器的值寫入到控制檯。當工作者執行緒結束它的執行時,這個方法將被呼叫。你也在MyWorkerThread類中實現addTask()方法,用來增加每個執行緒的任務計數器。

對於ForkJoinPool類,與Java併發API中的所有執行者一樣,使用工廠來建立它。所以,如果你想在ForkJoinPool類中使用MyWorkerThread執行緒,你必須實現自己的執行緒工廠。對於Fork/Join框架,這個工廠必須實現ForkJoinPool.ForkJoinWorkerThreadFactory類。為此,你已實現MyWorkerThreadFactory類。這個類只有一個用來建立一個新的MyWorkerThread物件的方法。

最後,你只要使用已建立的工廠來初始化ForkJoinPool類。你已在Main類中通過使用ForkJoinPool的構造器實現了。

以下截圖顯示了這個程式的部分輸出:

4

你可以看出ForkJoinPool物件如何執行4個工作者執行緒及每個工作者執行緒執行多少個任務。

不止這些…

考慮一下,當一個執行緒正常結束或丟擲一個Exception異常時,呼叫的ForkJoinWorkerThread提供的onTermination()方法。這個方法接收一個Throwable物件作為引數。如果這個引數值為null時,表明這個工作者執行緒正常結束。但是,如果這個引數的值不為null,表明這個執行緒丟擲一個異常。你必須包含必要的程式碼來處理這種情況。

參見