1. 程式人生 > >Fork/Join框架簡單介紹與使用

Fork/Join框架簡單介紹與使用

為了偷個懶,我就直接將我自己寫程式的註釋拿過來貼上去了。

/**  * @author   * Fork/join框架是Java7提供的一個用於並行任務執行的框架,將一個大任務分解為若干個小任務,再彙總每個小任務  * 通常我們使用fork/join框架,需先建立一個任務,這裡涉及的類是:ForkJoinTask,但實際我們都採用繼承該類的子類來  * 實現建立任務這個功能。  * 該框架提供的子類:  * ①RecursiveAction  用於沒有返回結果的任務  * ②RecursiveTask    用於有返回結果的任務  * 當然這些子任務必須由ForkJionPool來執行,這個框架池由ForkJoinTask陣列和ForkJoinWorkerThread陣列  * 組成,ForkJoinTask負責將存放程式交給框架池,ForkJoinWorkerThread則負責執行這些任務  * 當我們呼叫ForkJoinTask的fork方法時,程式會將任務放在ForkJoinWorkerThread的pushTask的workQueue  * 中,非同步地執行這個任務,然後立即返回結果。  *   *直接上個例子,需求是:1+2+3+4+5+6+7+8+9+10的結果  *使用每個任務的首先要考慮的是如何分割,分割成幾個任務,上述例子我們希望每個子任務最多兩個數相加,那麼我們設定的閾值  *應該就是2,由於是4個數相加,所以ForkJoin框架會把這個大任務fork成兩個子任務,最後再將結果join  *  *首先明確因為是有結果的任務,所以必須繼承RecursiveTask  *  *另外我在網上看到有人這樣解釋Fork/Join框架:Fork/Join是Java7原生的多執行緒並行處理框架,類似hadoop提供的  *mapreduce框架,只是mapreduce的任務可以針對叢集內的所有計算節點,可以充分利用叢集的能力完成計算任務。  *Fork/Join框架更加類似於單機版的MapReduce(這裡提到下AKKA基於Actor模型的框架,大家有時間可以瞭解下)。  *Fork/Join針對具有明顯可以進行任務分割特性需求的實現。  *  *Fork/Join框架使用了工作竊取的思想(work-stealing)演算法從其他佇列中竊取任務來執行。通過遞迴把問題劃分為子任務  *然後並行執行這些子任務,等所有的子任務都結束的時候再合併這種方式來支援平行計算。  *  */

public class CountTask extends RecursiveTask<Integer>{

    private int start;     private int end;     private static final int THREAD_HOLD = 5;          public CountTask(int start,int end){         this.start = start;         this.end = end;     }

    @Override     protected Integer compute() {         int sum = 0;         boolean canCompute = (end -start) <= THREAD_HOLD;         if(canCompute){             for(int i=start; i<=end; i++){                 sum += i;             }         }         else{             int middle = (start +end)/2;             CountTask left = new CountTask(start,middle);             CountTask right = new CountTask(middle+1,end);                          //執行子任務             left.fork();             right.fork();             //獲取子任務的結果             int lResult = left.join();             int rResule = right.join();                          sum = lResult + rResule;         }         return sum;     }      //測試程式入口     public static void main(String[] args){         ForkJoinPool pool = new ForkJoinPool();         CountTask task = new CountTask(1,10);         Future<Integer> result = pool.submit(task);     //    System.out.printf("執行緒池",pool.getActiveThreadCount());         try{             System.out.println(result.get());         }catch(Exception e){             e.printStackTrace();         }     }           }

該例子匯入的類都是java.util.concurrent 這個包下的。

另外推薦一個更全的介紹該框架的部落格地址,個人覺得推薦的地址部落格寫得算是目前最全的。ifeve.com/talk-concurrency-forkjoin/  網上好的文章還是要積極去推薦的。O(∩_∩)O