1. 程式人生 > >Fork/Join 型執行緒池與 Work-Stealing 演算法

Fork/Join 型執行緒池與 Work-Stealing 演算法

JDK 1.7 時,標準類庫添加了 ForkJoinPool,作為對 Fork/Join 型執行緒池的實現。Fork 在英文中有 分叉 的意思,而 Join有 合併 的意思。ForkJoinPool 的功能也是如此:Fork 將大任務分叉為多個小任務,然後讓小任務執行,Join 是獲得小任務的結果,然後進行合併,將合併的結果作為大任務的結果 —— 並且這會是一個遞迴的過程 —— 因為任務如果足夠大,可以將任務多級分叉直到任務足夠小。

由此可見,ForkJoinPool 可以滿足 並行 地實現 分治演算法(Divide-and-Conquer) 的需要。

ForkJoinPool 的類圖如下:

可以看到 ForkJoinPool 實現了 ExecutorService 介面,所以首先 ForkJoinPool 也是一個 ExecutorService (執行緒池)。因而 Runnable 和 Callable 型別的任務,ForkJoinPool 也可以通過 submitinvokeAll 和 invokeAny 等方法來執行。但是標準類庫還為 ForkJoinPool 定義了一種新的任務,它就是 ForkJoinTask<V>

ForkJoinTask 類圖:

ForkJoinTask<V> 用來專門定義 Fork/Join 型任務 —— 完成將大任務分割為小任務以及合併結果的工作。一般我們不需要直接使用 ForkJoinTask<V>

,而是通過繼承它的子類 RecursiveAction 和 RecursiveTask 並實現對應的抽象方法 —— compute ,來定義我們自己的任務。其中,RecursiveAction 是不帶返回值的 Fork/Join 型任務,所以使用此類任務並不產生結果,也就不涉及到結果的合併;而 RecursiveTask 是帶返回值的 Fork/Join 型任務,使用此類任務的話,在任務結束前,我們需要進行結果的合併。其中,通過 ForkJoinTask<V> 的 fork 方法,我們可以產生子任務並執行;通過 join 方法,我們可以獲得子任務的結果。

ForkJoinPool 可以使用三種方法用來執行 ForkJoinTask

invoke 方法:

public <T> T invoke(ForkJoinTask<T> task) 

invoke 方法用來執行一個帶返回值的任務(通常繼承自RecursiveTask),並且該方法是阻塞的,直到任務執行完畢,該方法才會停止阻塞並返回任務的執行結果。

submit 方法:

public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) 

除了從 ExecutorService 繼承的 submit 方法外,ForkJoinPool 還定義了用來執行 ForkJoinTask 的 submit 方法 —— 一般該 submit 方法用來執行帶返回值的ForkJoinTask(通常繼承自RecursiveTask)。該方法是非阻塞的,呼叫之後將任務提交給 ForkJoinPool 去執行便立即返回,返回的便是已經提交到 ForkJoinPool 去執行的 task —— 由類圖可知 ForkJoinTask 實現了 Future 介面,所以可以直接通過 task 來和已經提交的任務進行互動。

execute 方法:

public void execute(ForkJoinTask<?> task)

除了從 Executor 獲得的 execute 方法外,ForkJoinPool 也定義了用來執行ForkJoinTask 的 execute 方法 —— 一般該 execute 方法用來執行不帶返回值的ForkJoinTask(通常繼承自RecursiveAction) ,該方法同樣是非阻塞的。

現在讓我們來實踐下 ForkJoinPool 的功能:計算 π 的值。計算 π 的值有一個通過多項式方法,即:π = 4 * (1 - 1/3 + 1/5 - 1/7 + 1/9 - ……),而且多項式的項數越多,計算出的 π 的值越精確。

首先我們定義用來估算 π 的 PiEstimateTask

class PiEstimateTask extends RecursiveTask<Double> {

    private final long begin;
    private final long end;
    private final long threshold; // 分割任務的臨界值

    public PiEstimateTask(long begin, long end, long threshold) {
        this.begin = begin;
        this.end = end;
        this.threshold = threshold;
    }

    @Override
    protected Double compute() {  // 實現 compute 方法
        if (end - begin <= threshold) {  // 臨界值之下,不再分割,直接計算

            int sign; // 符號,多項式中偶數位取 1,奇數位取 -1(位置從 0 開始)
            double result = 0.0;
            
            for (long i = begin; i < end; i++) {
                sign = (i & 1) == 0 ? 1 : -1;
                result += sign / (i * 2.0 + 1);
            }

            return result * 4;
        }

        // 分割任務
        long middle = (begin + end) / 2;
        PiEstimateTask leftTask = new PiEstimateTask(begin, middle, threshold);
        PiEstimateTask rightTask = new PiEstimateTask(middle, end, threshold);

        leftTask.fork();  // 非同步執行 leftTask
        rightTask.fork(); // 非同步執行 rightTask

        double leftResult = leftTask.join();   // 阻塞,直到 leftTask 執行完畢返回結果
        double rightResult = rightTask.join(); // 阻塞,直到 rightTask 執行完畢返回結果

        return leftResult + rightResult; // 合併結果
    }

}

然後我們使用 ForkJoinPool 的 invoke 執行 PiEstimateTask

public class ForkJoinPoolTest {

    public static void main(String[] args) throws Exception {
        ForkJoinPool forkJoinPool = new ForkJoinPool(4);
    
        // 計算 10 億項,分割任務的臨界值為 1 千萬
        PiEstimateTask task = new PiEstimateTask(0, 1_000_000_000, 10_000_000);
    
        double pi = forkJoinPool.invoke(task); // 阻塞,直到任務執行完畢返回結果
    
        System.out.println("π 的值:" + pi);
        
        forkJoinPool.shutdown(); // 向執行緒池傳送關閉的指令
    }
}

執行結果:

我們也可以使用 submit 方法非同步的執行任務(此處 submit 方法返回的 future 指向的物件即提交任務時的 task):

public static void main(String[] args) throws Exception {
    ForkJoinPool forkJoinPool = new ForkJoinPool(4);

    PiEstimateTask task = new PiEstimateTask(0, 1_000_000_000, 10_000_000);
    Future<Double> future = forkJoinPool.submit(task); // 不阻塞
    
    double pi = future.get();
    System.out.println("π 的值:" + pi);
    System.out.println("future 指向的物件是 task 嗎:" + (future == task));
    
    forkJoinPool.shutdown(); // 向執行緒池傳送關閉的指令
}

執行結果:

值得注意的是,選取一個合適的分割任務的臨界值,對 ForkJoinPool 執行任務的效率有著至關重要的影響。臨界值選取過大,任務分割的不夠細,則不能充分利用 CPU;臨界值選取過小,則任務分割過多,可能產生過多的子任務,導致過多的執行緒間的切換和加重 GC 的負擔從而影響了效率。所以,需要根據實際的應用場景選擇一個合適的分割任務的臨界值。

ForkJoinPool 相比於 ThreadPoolExecutor,還有一個非常重要的特點(優點)在於,ForkJoinPool具有 Work-Stealing (工作竊取)的能力。所謂 Work-Stealing,在 ForkJoinPool 中的實現為:執行緒池中每個執行緒都有一個互不影響的任務佇列(雙端佇列),執行緒每次都從自己的任務佇列的隊頭中取出一個任務來執行;如果某個執行緒對應的佇列已空並且處於空閒狀態,而其他執行緒的佇列中還有任務需要處理但是該執行緒處於工作狀態,那麼空閒的執行緒可以從其他執行緒的佇列的隊尾取一個任務來幫忙執行 —— 感覺就像是空閒的執行緒去偷人家的任務來執行一樣,所以叫 “工作竊取”。

Work-Stealing 的適用場景是不同的任務的耗時相差比較大,即某些任務需要執行較長時間,而某些任務會很快的執行完成,這種情況下用 Work-Stealing 很合適;但是如果任務的耗時很平均,則此時 Work-Stealing 並不適合,因為竊取任務時不同執行緒需要搶佔鎖,這可能會造成額外的時間消耗,而且每個執行緒維護雙端佇列也會造成更大的記憶體消耗。所以 ForkJoinPool 並不是 ThreadPoolExecutor 的替代品,而是作為對 ThreadPoolExecutor 的補充。

總結:  ForkJoinPool 和 ThreadPoolExecutor 都是 ExecutorService(執行緒池),但ForkJoinPool 的獨特點在於:

  1. ThreadPoolExecutor 只能執行 Runnable 和 Callable 任務,而 ForkJoinPool 不僅可以執行 Runnable 和 Callable 任務,還可以執行 Fork/Join 型任務 —— ForkJoinTask —— 從而滿足並行地實現分治演算法的需要;
  2. ThreadPoolExecutor 中任務的執行順序是按照其在共享佇列中的順序來執行的,所以後面的任務需要等待前面任務執行完畢後才能執行,而 ForkJoinPool 每個執行緒有自己的任務佇列,並在此基礎上實現了 Work-Stealing 的功能,使得在某些情況下 ForkJoinPool 能更大程度的提高併發效率。

參考文獻:

jdk1.8原始碼

宣告 部分圖片來源於網路,共同學習