定製在fork/join框架中執行的任務
定製在fork/join框架中執行的任務
Executor框架將任務建立和執行分開,使用此框架,只需要實現Runnable物件和使用Executor物件。只要把Runnable任務傳送給執行器,它就會建立、管理和結束必要的執行緒來執行這些任務。
Java 9在fork/join框架(Java 7中引入)中提供一種特殊的執行器,這個框架是用來解決如何使用分治技術將問題分解成更小的任務而設計的。在任務中,檢查想要解決的問題的規模,如果問題規模大於設定的大小,則將問題劃分為兩個或多個任務,並使用此框架執行它們。如果問題規模小於設定的大小,則在任務中直接解決問題,且可選擇的返回結果。fork/join框架實現了工作竊取演算法,用來改善這類問題的整體效能。
fork/join框架的主類是ForkJoinPool類,在內部有如下兩個元素:
- 等待被執行的任務佇列
- 執行任務的執行緒池
預設情況下,ForkJoinPool類執行的任務是ForkJoinTask類的物件。也可以傳送Runnable和Callable物件到ForkJoinPool類,但它們不能充分利用fork/join框架的所有優點。通常,把forkjoinask類的兩個子類中的一個傳送到ForkJoinPool物件:
- RecursiveAction:如果任務不返回結果
- RecursiveTask:如果任務返回結果
本節將通過實現繼承ForkJoinTask類的任務為fork/join框架實現自定義的任務。將要實現的任務度量其執行時間並將結果輸出到控制檯,以便控制其執行進度。還可以實現自定義fork/join任務,輸出日誌資訊來獲取任務中使用的資源,或者後處理任務結果。
實現過程
通過如下步驟實現範例:
-
建立名為MyWorkerTask的類,指定其繼承Void型別引數化的ForkJoinTask類:
public class MyWorkerThread extends ForkJoinWorkerThread{
-
定義名為name的私有String屬性,儲存任務的名稱:
private final static ThreadLocal<Integer> taskCounter= new ThreadLocal<Integer>();
-
實現類建構函式,初始化屬性:
protected MyWorkerThread
-
實現getRawResult()方法,這是ForkJoinTask類的一個抽象方法。由於MyWorkerTask任務不會返回任何結果,所以此方法必須返回null:
@Override public Void getRawResult() { return null; }
-
實現setRawResult()方法,這是ForkJoinTask類的另一個抽象方法。由於MyWorkerTask任務不會返回任何結果,所以不用寫程式:
@Override protected void setRawResult(Void value) { }
-
實現exec()方法,這是任務的main方法。這裡將任務的邏輯委託給compute()方法,計算此方法的執行時間,並輸出到控制檯:
@Override protected boolean exec() { Date startDate=new Date(); compute(); Date finishDate=new Date(); long diff=finishDate.getTime()-startDate.getTime(); System.out.printf("MyWorkerTask: %s : %d Milliseconds to complete.\n",name,diff); return true; }
-
實現getName()方法,返回任務名稱:
public String getName(){ return name; }
-
宣告抽象方法compute(),如前所述,此方法將實現任務邏輯,且必須通過MyWorkerTask類的子類來實現:
protected abstract void compute(); }
-
建立名為Task的類,繼承MyWorkerTask類:
public class Task extends MyWorkerTask{
-
宣告名為array的私有int型別陣列:
private int array[]; private int start, end;
-
實現類建構函式,初始化屬性:
public Task(String name, int array[], int start, int end){ super(name); this.array=array; this.start=start; this.end=end; }
-
實現compute()方法,此方法增加陣列中由開始和結束屬性決定的元素塊。如果元素塊中超過100個元素,則將元素塊分成兩個部分,並建立兩個Task物件來處理每個部分。是一個invokeAll()方法傳送任務到執行緒池:
@Override protected void compute() { if (end-start>100){ int mid=(end+start)/2; Task task1=new Task(this.getName()+"1",array,start,mid); Task task2=new Task(this.getName()+"2",array,mid,end); invokeAll(task1,task2);
-
如果元素塊中少於100個元素,使用for迴圈遞增所有元素:
} else { for (int i=start; i<end; i++) { array[i]++; }
-
最後,設定執行任務的執行緒休眠50毫秒:
try { Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } } } }
-
接下來,通過建立名為Main的類,新增main()方法,實現本範例主類:
public class Main { public static void main(String[] args) throws Exception {
-
建立包含10000個元素的int陣列:
int array[]=new int[10000];
-
建立名為pool的ForkJoinPool物件:
ForkJoinPool pool=new ForkJoinPool();
-
建立Task物件增加陣列中所有元素,建構函式的引數被指定為任務的名稱、陣列物件以及0和10000的值,表示此任務需要處理整個陣列:
Task task=new Task("Task",array,0,array.length);
-
使用execute()方法傳送任務到執行緒池:
pool.invoke(task);
-
使用shutdown()方法關閉執行緒池:
pool.shutdown();
-
輸出指明程式結束的資訊到控制檯:
System.out.printf("Main: End of the program.\n"); } }
工作原理
本節實現了繼承ForkJoinTask類的MyWorkerTask類,這是自定義的基類,用來實現能夠在ForkJoinPool 執行器中執行的任務,並且由於這個執行器是工作竊取演算法實現,能夠利用executor的所有優點。此類相當於RecursiveAction和RecursiveTask類。
當繼承ForkJoinTask類時,需要實現如下三個方法:
- setRawResult():此方法用來確定任務結果,由於任務沒有返回任何結果,此方法保持為空。
- getRawResult():此方法用來返回任務結果,由於任務沒有返回任務結果,此方法返回null。
- exec():此方法實現任務邏輯,本範例中,將邏輯委託給抽象compute()方法(作為RecursiveAction和RecursiveTask類)。但在exec()方法中,計算此方法的執行時間,並輸出到控制檯:
最後,在範例主類中,建立10000個元素的陣列、ForkJoinPool執行器和Task物件來處理整個陣列。執行程式,將看到執行不同的任務是如何輸出它們的執行時間到控制檯。
更多關注
- 第五章“Fork/Join框架”中的“建立fork/join池”小節