1. 程式人生 > >(拿來主義) 多線程 | Java的Fork/Join任務

(拿來主義) 多線程 | Java的Fork/Join任務

似的 sys 最大並發數 超過 result com 一個 出了 res

轉載自https://www.liaoxuefeng.com/article/001493522711597674607c7f4f346628a76145477e2ff82000

當我們需要執行大量的小任務時,有經驗的Java開發人員都會采用線程池來高效執行這些小任務。然而,有一種任務,例如,對超過1000萬個元素的數組進行排序,這種任務本身可以並發執行,但如何拆解成小任務需要在任務執行的過程中動態拆分。這樣,大任務可以拆成小任務,小任務還可以繼續拆成更小的任務,最後把任務的結果匯總合並,得到最終結果,這種模型就是Fork/Join模型。

Java7引入了Fork/Join框架,我們通過RecursiveTask這個類就可以方便地實現Fork/Join模式。

例如,對一個大數組進行並行求和的RecursiveTask,就可以這樣編寫:

class SumTask extends RecursiveTask<Long> {

    static final int THRESHOLD = 100;
    long[] array;
    int start;
    int end;

    SumTask(long[] array, int start, int end) {
    this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    
protected Long compute() { if (end - start <= THRESHOLD) { // 如果任務足夠小,直接計算: long sum = 0; for (int i = start; i < end; i++) { sum += array[i]; } try { Thread.sleep(1000); } catch (InterruptedException e) { } System.out.println(String.format(
"compute %d~%d = %d", start, end, sum)); return sum; } // 任務太大,一分為二: int middle = (end + start) / 2; System.out.println(String.format("split %d~%d ==> %d~%d, %d~%d", start, end, start, middle, middle, end)); SumTask subtask1 = new SumTask(this.array, start, middle); SumTask subtask2 = new SumTask(this.array, middle, end); invokeAll(subtask1, subtask2); Long subresult1 = subtask1.join(); Long subresult2 = subtask2.join(); Long result = subresult1 + subresult2; System.out.println("result = " + subresult1 + " + " + subresult2 + " ==> " + result); return result; } }

編寫這個Fork/Join任務的關鍵在於,在執行任務的compute()方法內部,先判斷任務是不是足夠小,如果足夠小,就直接計算並返回結果(註意模擬了1秒延時),否則,把自身任務一拆為二,分別計算兩個子任務,再返回兩個子任務的結果之和。

最後寫一個main()方法測試:

public static void main(String[] args) throws Exception {
    // 創建隨機數組成的數組:
    long[] array = new long[400];
    fillRandom(array);
    // fork/join task:
    ForkJoinPool fjp = new ForkJoinPool(4); // 最大並發數4
    ForkJoinTask<Long> task = new SumTask(array, 0, array.length);
    long startTime = System.currentTimeMillis();
    Long result = fjp.invoke(task);
    long endTime = System.currentTimeMillis();
    System.out.println("Fork/join sum: " + result + " in " + (endTime - startTime) + " ms.");
}

關鍵代碼是fjp.invoke(task)來提交一個Fork/Join任務並發執行,然後獲得異步執行的結果。

我們設置任務的最小閥值是100,當提交一個400大小的任務時,在4核CPU上執行,會一分為二,再二分為四,每個最小子任務的執行時間是1秒,由於是並發4個子任務執行,整個任務最終執行時間大約為1秒。

新手在編寫Fork/Join任務時,往往用搜索引擎搜到一個例子,然後就照著例子寫出了下面的代碼:

protected Long compute() {
    if (任務足夠小?) {
        return computeDirect();
    }
    // 任務太大,一分為二:
    SumTask subtask1 = new SumTask(...);
    SumTask subtask2 = new SumTask(...);
    // 分別對子任務調用fork():
    subtask1.fork();
    subtask2.fork();
    // 合並結果:
    Long subresult1 = subtask1.join();
    Long subresult2 = subtask2.join();
    return subresult1 + subresult2;
}

很遺憾,這種寫法是錯!誤!的!這樣寫沒有正確理解Fork/Join模型的任務執行邏輯。

JDK用來執行Fork/Join任務的工作線程池大小等於CPU核心數。在一個4核CPU上,最多可以同時執行4個子任務。對400個元素的數組求和,執行時間應該為1秒。但是,換成上面的代碼,執行時間卻是兩秒。

這是因為執行compute()方法的線程本身也是一個Worker線程,當對兩個子任務調用fork()時,這個Worker線程就會把任務分配給另外兩個Worker,但是它自己卻停下來等待不幹活了!這樣就白白浪費了Fork/Join線程池中的一個Worker線程,導致了4個子任務至少需要7個線程才能並發執行。

打個比方,假設一個酒店有400個房間,一共有4名清潔工,每個工人每天可以打掃100個房間,這樣,4個工人滿負荷工作時,400個房間全部打掃完正好需要1天。

Fork/Join的工作模式就像這樣:首先,工人甲被分配了400個房間的任務,他一看任務太多了自己一個人不行,所以先把400個房間拆成兩個200,然後叫來乙,把其中一個200分給乙。

緊接著,甲和乙再發現200也是個大任務,於是甲繼續把200分成兩個100,並把其中一個100分給丙,類似的,乙會把其中一個100分給丁,這樣,最終4個人每人分到100個房間,並發執行正好是1天。

如果換一種寫法:

// 分別對子任務調用fork():
subtask1.fork();
subtask2.fork();

這個任務就分!錯!了!

比如甲把400分成兩個200後,這種寫法相當於甲把一個200分給乙,把另一個200分給丙,然後,甲成了監工,不幹活,等乙和丙幹完了他直接匯報工作。乙和丙在把200分拆成兩個100的過程中,他倆又成了監工,這樣,本來只需要4個工人的活,現在需要7個工人才能1天內完成,其中有3個是不幹活的。

其實,我們查看JDK的invokeAll()方法的源碼就可以發現,invokeAll的N個任務中,其中N-1個任務會使用fork()交給其它線程執行,但是,它還會留一個任務自己執行,這樣,就充分利用了線程池,保證沒有空閑的不幹活的線程。

(拿來主義) 多線程 | Java的Fork/Join任務