1. 程式人生 > >fork/join 分支/合併框架和自動機制拆分流Spliterator

fork/join 分支/合併框架和自動機制拆分流Spliterator

利用fork/join求和程式碼分析

// 整合RecursiveTask用來建立可以用於分支/合併框架的任務
public class ForkJoinSumCalculator extends RecursiveTask<Long> {
    // 不再將任務分解為子任務的陣列大小
    public static final long THRESHOLD = 10_000;
    // 需要求和的陣列
    private final long[] numbers;
    // 子任務處理陣列的起始和終止位置
    private final int start;
    private final int end;
    // 建構函式用於建立主任務
    public ForkJoinSumCalculator(long[] numbers) {
        this(numbers, 0, numbers.length);
    }
    // 用於遞迴方式為主任務建立子任務
    private ForkJoinSumCalculator(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    // 這裡舉一個用分支/合併 框架的實際例子,用這個框架為一個數字範圍(這裡用一個 long[]陣列表示)求和
    @Override
    protected Long compute() {
        // 該任務用於求和的部分 的大小
        int length = end - start;
        // 小於閾值順序計算結果
        if (length <= THRESHOLD) {
            return computeSequentially();
        }
        // 建立子任務為陣列的前一半求和
        ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length/2);
        // 利用forkjoinpool執行緒非同步執行新建立的子任務
        leftTask.fork();
        // 建立一個子任務用於為陣列後半部分求和
        ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length/2, end);
        // 同步執行第二個子任務,有可能進一步進行遞迴劃分
        Long rightResult = rightTask.compute();
        // 讀取第一個子任務的執行結果,如果沒有執行完成就等待
        Long leftResult = leftTask.join();
        // 該任務的結果是兩個子任務的結果的組合
        return leftResult + rightResult;
    }

    private long computeSequentially() {
        long sum = 0;
        for (int i = start; i < end; i++) {
            sum += numbers[i];
        }
        return sum;
    }

    // 現在編寫一個方法來並行對前n個自然數求和就很簡單了。你只需把想要的數字陣列傳給 ForkJoinSumCalculator的建構函式
    public static long forkJoinSum(long n) {
        // 這裡用了一個LongStream來生成包含前n個自然數的陣列
        long[] numbers = LongStream.rangeClosed(1, n).toArray();
        // 然後建立一個ForkJoinTask (RecursiveTask的父類),並把陣列傳遞給ForkJoinSumCalculator的公共建構函式
        ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
        // 建立了一個新的ForkJoinPool,並把任務傳給它的呼叫方法 。在 ForkJoinPool中執行時,最後一個方法返回的值就是ForkJoinSumCalculator類定義的任務結果
        return FORK_JOIN_POOL.invoke(task);
    }
}

有幾點需要注意:

1.一個任務可以分解成多個獨立的子任務,才能讓效能在並行化時 有所提升

2.不應該在RecursiveTask內部使用ForkJoinPool的invoke方法。相反,你應該始終直 接呼叫compute或fork方法,只有順序程式碼才應該用invoke來啟動平行計算。 

3.每個子任務都必須等待另一個子任務完成才能啟動

自動機制拆分流Spliterator

public interface Spliterator<T> { // T是Spliterator遍歷的元素的型別   
    // tryAdvance方法的行為類似於普通的 Iterator 
    boolean tryAdvance(Consumer<? super T> action);    
    //  trySplit是專為Spliterator介面設計的,因為它可以把一些元素劃出去分 給第二個Spliterator(由該方法返回),讓它們兩個並行處理。 
    Spliterator<T> trySplit();   
    // estimateSize方法估計還剩下多少元素要遍歷,因為即使不那麼確切,能快速算出來是一個值 也有助於讓拆分均勻一點  
    long estimateSize();    
    // characteristics方法,它將返回一個int,代表Spliterator本身特性集的編碼。使用Spliterator的客戶可以用這些特性來更好地控制和 優化它的使用。 
    int characteristics(); }

遞迴拆分過程

Spliterator的特性 :

ORDERED 元素有既定的順序(例如List),因此Spliterator在遍歷和劃分時也會遵循這一順序

DISTINCT 對於任意一對遍歷過的元素x和y,x.equals(y)返回false

SORTED 遍歷的元素按照一個預定義的順序排序

SIZED 該Spliterator由一個已知大小的源建立(例如Set),因此estimatedSize()返回的是準確值

NONNULL 保證遍歷的元素不會為null IMMUTABL E Spliterator的資料來源不能修改。這意味著在遍歷時不能新增、刪除或修改任何元素

CONCURRE NT 該Spliterator的資料來源可以被其他執行緒同時修改而無需同步

SUBSIZED 該Spliterator和所有從它拆分出來的Spliterator都是SIZED