實現為fork/join框架生成自定義執行緒的ThreadFactory介面
實現為fork/join框架生成自定義執行緒的ThreadFactory介面
fork/join框架是Java9中最有趣的特性之一,它是Executor和ExecutorService介面的實現,能夠直接執行Callable和Runnable任務,無需管理執行它們的執行緒。
此執行器目標是執行可以劃分為更小部分的任務,其主要組成部分如下:
- 由ForkJoinTask類實現的特殊任務。
- 提供兩個操作,用於將任務劃分為子任務(fork操作)和等待這些子任務結束(join操作)。
- 它是一種命名為“工作竊取”的演算法,優化執行緒池的使用。當任務等待其子任務時,正在執行此任務的執行緒會去執行其它任務。
fork/join框架的主類是ForkJoinPool類。在內部包括如下兩個元素:
- 等待被執行的任務佇列
- 執行任務的執行緒池
ForkJoinWorkerThread向Thread類新增新方法,比如建立執行緒時執行的onStart()方法和清理執行緒使用資源時呼叫的onTermination()方法。ForkJoinPool類使用ForkJoinWorkerThreadFactory介面的實現來建立其使用的工作執行緒。
本節將學習如何實現在ForkJoinPool類中使用的定製工作執行緒,以及在繼承ForkJoinPool類和實現ForkJoinWorkerThreadFactory介面的工廠中如何使用此執行緒。
準備工作
本範例通過Eclipse開發工具實現。如果使用諸如NetBeans的開發工具,開啟並建立一個新的Java專案。
實現過程
通過如下步驟實現範例:
-
建立名為MyWorkerThread的類,繼承ForkJoinWorkerThread類:
public class MyWorkerThread extends ForkJoinWorkerThread{
-
宣告和建立ThreadLocal屬性,由名為taskCounter的Integer類引數化:
private final static ThreadLocal<Integer> taskCounter=
-
實現類建構函式:
protected MyWorkerThread(ForkJoinPool pool) { super(pool); }
-
重寫onStart()方法,在其父類上呼叫此方法輸出資訊到控制條,且設定這個執行緒的taskCounter屬性值為零:
@Override protected void onStart() { super.onStart(); System.out.printf("MyWorkerThread %d: Initializing task counter.\n", getId()); taskCounter.set(0); }
-
重寫onTermination()方法,輸出此執行緒的taskCounter屬性值到控制檯:
@Override protected void onTermination(Throwable exception) { System.out.printf("MyWorkerThread %d: %d\n", getId(),taskCounter.get()); super.onTermination(exception); }
-
實現addTask()方法,遞增taskCounter屬性值:
public void addTask(){ taskCounter.set(taskCounter.get() + 1);; } }
-
建立名為MyWorkerThreadFactory的類,實現ForkJoinWorkerThreadFactory介面。實現newThread()方法,建立和返回MyWorkerThread物件:
public class MyWorkerThreadFactory implements ForkJoinWorkerThreadFactory{ @Override public ForkJoinWorkerThread newThread(ForkJoinPool pool) { return new MyWorkerThread(pool); } }
-
建立名為MyRecursiveTask的類,繼承Integer類引數化的RecursiveTask類:
public class MyRecursiveTask extends RecursiveTask<Integer> {
-
定義名為array的私有int陣列:
private int array[];
-
定義名為start和end的兩個私有int屬性:
private int start, end;
-
實現類建構函式,初始化這些屬性:
public MyRecursiveTask(int array[],int start, int end) { this.array=array; this.start=start; this.end=end; }
-
實現compute()方法,將陣列中開始和結束位置之間所有元素相加。首先將正在執行任務的執行緒轉換成MyWorkerThread物件,並且使用addTask()為此執行緒遞增任務的計數:
@Override protected Integer compute() { Integer ret; MyWorkerThread thread=(MyWorkerThread)Thread.currentThread(); thread.addTask();
-
如果陣列中開始和結束位置區間的元素數大於100,計算中間位置且建立兩個新的MyRecursiveTask任務分別處理前後兩部分。如果區間等於或小於100,計算開始和結束位置區間所有元素的和:
if (end-start>100) { int mid=(start+end)/2; MyRecursiveTask task1=new MyRecursiveTask(array,start,mid); MyRecursiveTask task2=new MyRecursiveTask(array,mid,end); invokeAll(task1,task2); ret=addResults(task1,task2); } else { int add=0; for (int i=start; i<end; i++) { add+=array[i]; } ret=add; }
-
設定執行緒休眠10毫秒,返回任務結果:
try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } return ret; }
-
實現addResults()介面,計算並返回作為引數接收的兩個任務的結果之和:
private Integer addResults(MyRecursiveTask task1, MyRecursiveTask task2) { int value; try { value = task1.get().intValue()+task2.get().intValue(); } catch (InterruptedException e) { e.printStackTrace(); value=0; } catch (Exception e) { e.printStackTrace(); value=0; } return value; } }
-
通過建立名為Main的類,新增main()方法,實現本範例主類:
public class Main { public static void main(String[] args) throws Exception{
-
建立名為factory的MyWorkerThreadFactory物件:
MyWorkerThreadFactory factory=new MyWorkerThreadFactory();
-
建立名為pool的ForkJoinPool物件,傳遞之前建立的factory物件到建構函式:
ForkJoinPool pool=new ForkJoinPool(4, factory, null, false);
-
建立100000個整數的陣列,初始化所有元素為1:
int array[]=new int[100000]; for (int i=0; i<array.length; i++){ array[i]=1; }
-
建立新的task物件,計算陣列中所有元素的和:
MyRecursiveTask task=new MyRecursiveTask(array,0,array.length);
-
使用execute()方法傳送任務到執行緒池:
pool.execute(task);
-
使用join()方法等待任務結束:
task.join();
-
使用shutdown()方法關閉執行緒池:
pool.shutdown();
-
使用awaitTermination()方法等待執行器結束:
pool.awaitTermination(1, TimeUnit.DAYS);
-
使用get()方法,輸出任務結果到控制檯:
System.out.printf("Main: Result: %d\n",task.get());
-
輸出指明程式結束的資訊到控制檯:
System.out.printf("Main: End of the program\n"); } }
工作原理
fork/join框架使用的執行緒稱為工作執行緒。 Java提供ForkJoinWorkerThread類,此類繼承了Thread類並實現fork/join框架使用的工作執行緒。
本節實現了繼承ForkJoinWorkerThread類的MyWorkerThread類,並重寫ForkJoinWorkerThread類的兩個方法。範例目標是在每個工作執行緒中實現任務計數器,以便了解一個工作執行緒已經執行多少任務。通過ThreadLocal屬性實現計數器,這樣每個執行緒都將以透明的方式擁有自己的計數器。
重寫ForkJoinWorkerThread類的onStart()方法初始化任務計數器,當工作執行緒開始執行時呼叫此方法。還重寫了onTermination()方法輸出任務計數器的值到控制檯,當工作執行緒結束執行時呼叫此方法。此外,在MyWorkerThread類中實現了addTask()方法,用來遞增每個執行緒的任務計數器。
ForkJoinPool類像Java併發API中所有執行器一樣,使用工廠來建立執行緒。所以如果在ForkJoinPool類中使用MyWorkerThread執行緒,需要實現自定義的執行緒工廠。對於fork/join框架框架,這個工廠需要實現ForkJoinPool.ForkJoinWorkerThreadFactory類,此類只有一個方法來建立新的MyWorkerThread物件。
最後,只要使用已經建立的工廠初始化ForkJoinPool類,也就是在Main類中所做的,使用ForkJoinPool類建構函式。
下圖顯示本範例在控制檯輸出的部分執行資訊:
可以看到ForkJoinPool物件如何執行四個工作執行緒,以及每個執行緒執行多少個任務。
擴充套件學習
需要注意當執行緒正常結束或者丟擲異常時,才會呼叫ForkJoinWorkerThread類提供的onTermination()方法。此方法將Throwable物件作為引數接收,如果引數值為null,工作執行緒正常結束,但如果引數值不為空,執行緒丟擲異常,所以需要編寫必要的程式來處理這種情形。
更多關注
- 第五章“Fork/Join框架”中的“建立fork/join池”小節
- 第一章“執行緒管理”中的“工廠模式建立執行緒”小節