定製併發類(七)實現ThreadFactory介面生成自定義的執行緒給Fork/Join框架
宣告:本文是《 Java 7 Concurrency Cookbook 》的第七章,作者: Javier Fernández González 譯者:許巧輝
實現ThreadFactory介面生成自定義的執行緒給Fork/Join框架
Fork/Join框架是Java7中最有趣的特徵之一。它是Executor和ExecutorService介面的一個實現,允許你執行Callable和Runnable任務而不用管理這些執行執行緒。
這個執行者面向執行能被拆分成更小部分的任務。主要元件如下:
- 一個特殊任務,實現ForkJoinTask類
- 兩種操作,將任務劃分成子任務的fork操作和等待這些子任務結束的join操作
- 一個演算法,優化池中執行緒的使用的work-stealing演算法。當一個任務正在等待它的子任務(結束)時,它的執行執行緒將執行其他任務(等待執行的任務)。
ForkJoinPool類是Fork/Join的主要類。在它的內部實現,有如下兩種元素:
- 一個儲存等待執行任務的列隊。
- 一個執行任務的執行緒池
在這個指南中,你將學習如何實現一個在ForkJoinPool類中使用的自定義的工作者執行緒,及如何使用一個工廠來使用它。
準備工作…
這個指南的例子使用Eclipse IDE實現。如果你使用Eclipse或其他IDE,如NetBeans,開啟它並建立一個新的Java專案。
如何做…
按以下步驟來實現的這個例子:
1.建立一個繼承ForkJoinWorkerThread類的MyWorkerThread類。
public class MyWorkerThread extends ForkJoinWorkerThread {
2.宣告和建立一個引數化為Integer類的ThreadLocal屬性,名為taskCounter。
private static ThreadLocal<Integer> taskCounter=new ThreadLocal<Integer>();
3.實現這個類的構造器。
protected MyWorkerThread(ForkJoinPool pool) { super(pool); }
4.重寫onStart()方法。呼叫父類的這個方法,寫入一條資訊到控制檯。設定當前執行緒的taskCounter屬性值為0。
@Override protected void onStart() { super.onStart(); System.out.printf("MyWorkerThread %d: Initializing task counter.\n",getId()); taskCounter.set(0); }
5.重寫onTermination()方法。寫入當前執行緒的taskCounter屬性值到控制檯。
@Override protected void onTermination(Throwable exception) { System.out.printf("MyWorkerThread %d: %d\n",getId(),taskCounter.get()); super.onTermination(exception); }
6.實現addTask()方法。遞增taskCounter屬性值。
public void addTask(){ int counter=taskCounter.get().intValue(); counter++; taskCounter.set(counter); }
7.建立一個實現ForkJoinWorkerThreadFactory介面的MyWorkerThreadFactory類。實現newThread()方法,建立和返回一個MyWorkerThread物件。
@Override public ForkJoinWorkerThread newThread(ForkJoinPool pool) { return new MyWorkerThread(pool); } }
8.建立MyRecursiveTask類,它繼承一個引數化為Integer類的RecursiveTask類。
public class MyRecursiveTask extends RecursiveTask<Integer> {
9.宣告一個私有的、int型別的屬性array。
private int array[];
10.宣告兩個私有的、int型別的屬性start和end。
private int start, end;
11.實現這個類的構造器,初始化它的屬性。
public MyRecursiveTask(int array[],int start, int end) { this.array=array; this.start=start; this.end=end; }
12.實現compute()方法,用來合計陣列中在start和end位置之間的所有元素。首先,將執行這個任務的執行緒轉換成一個MyWorkerThread物件,然後使用addTask()方法來增長這個執行緒的任務計數器。
@Override protected Integer compute() { Integer ret; MyWorkerThread thread=(MyWorkerThread)Thread.currentThread(); thread.addTask(); }
13.實現addResults()方法。計算和返回兩個任務(接收引數)的結果的總和。
private Integer addResults(Task task1, Task task2) { int value; try { value = task1.get().intValue()+task2.get().intValue(); } catch (InterruptedException e) { e.printStackTrace(); value=0; } catch (ExecutionException e) { e.printStackTrace(); value=0; }
14.令這個執行緒睡眠10毫秒,然後返回任務的結果。
try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } return value; }
15.實現這個例子的主類,通過建立Main類,並實現main()方法。
public class Main { public static void main(String[] args) throws Exception {
16.建立一個名為factory的MyWorkerThreadFactory物件。
MyWorkerThreadFactory factory=new MyWorkerThreadFactory();
17.建立一個名為pool的ForkJoinPool物件,將前面建立的factory物件作為引數傳給它的構造器。
ForkJoinPool pool=new ForkJoinPool(4, factory, null, false);
18.建立一個大小為100000的整數陣列,將所有元素初始化為值1。
int array[]=new int[100000]; for (int i=0; i<array.length; i++){ array[i]=1; }
19.建立一個新的Task物件,用來合計陣列中的所有元素。
MyRecursiveTask task=new MyRecursiveTask(array,0,array.length);
20.使用execute()方法,將這個任務提交給池。
pool.execute(task);
21.使用join()方法,等待這個任務的結束。
task.join();
22.使用shutdown()方法,關閉這個池。
pool.shutdown();
23.使用awaitTermination()方法,等待這個執行者的結束。
pool.awaitTermination(1, TimeUnit.DAYS);
24.使用get()方法,將任務的結束寫入到控制檯。
System.out.printf("Main: Result: %d\n",task.get());
25.寫入一條資訊到控制檯,表明程式的結束。
System.out.printf("Main: End of the program\n");
它是如何工作的…
Fork/Join框架使用的執行緒叫工作者執行緒。Java包含繼承Thread類的ForkJoinWorkerThread類和使用Fork/Join框架實現工作者執行緒。
在這個指南中,你已實現了繼承ForkJoinWorkerThread類的MyWorkerThread類,並重寫這個類的兩個方法。你的目標是實現每個工作者執行緒的任務計數器,以至於你可以知道每個工作者執行緒執行多少個任務。你已經通過一個ThreadLocal屬性實現計數器。這樣,每個執行緒都擁有它自己的計數器,對於來你說是透明的。
你已重寫ForkJoinWorkerThread類的onStart()方法來實現任務的計數器。當工作者執行緒開始它的執行時,這個方法將被呼叫。你也重寫了onTermination()方法,將任務計數器的值寫入到控制檯。當工作者執行緒結束它的執行時,這個方法將被呼叫。你也在MyWorkerThread類中實現addTask()方法,用來增加每個執行緒的任務計數器。
對於ForkJoinPool類,與Java併發API中的所有執行者一樣,使用工廠來建立它。所以,如果你想在ForkJoinPool類中使用MyWorkerThread執行緒,你必須實現自己的執行緒工廠。對於Fork/Join框架,這個工廠必須實現ForkJoinPool.ForkJoinWorkerThreadFactory類。為此,你已實現MyWorkerThreadFactory類。這個類只有一個用來建立一個新的MyWorkerThread物件的方法。
最後,你只要使用已建立的工廠來初始化ForkJoinPool類。你已在Main類中通過使用ForkJoinPool的構造器實現了。
以下截圖顯示了這個程式的部分輸出:
你可以看出ForkJoinPool物件如何執行4個工作者執行緒及每個工作者執行緒執行多少個任務。
不止這些…
考慮一下,當一個執行緒正常結束或丟擲一個Exception異常時,呼叫的ForkJoinWorkerThread提供的onTermination()方法。這個方法接收一個Throwable物件作為引數。如果這個引數值為null時,表明這個工作者執行緒正常結束。但是,如果這個引數的值不為null,表明這個執行緒丟擲一個異常。你必須包含必要的程式碼來處理這種情況。
參見