1. 程式人生 > >java8中stream中的任務拆分

java8中stream中的任務拆分

在java8中流操作中,只有到終止符才會進行真正的求值。在設定了並行操作的時候,會對任務進行分解。

任務的分解用到了joinfork框架。

@Override
public void compute() {
    Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators
    long sizeEstimate = rs.estimateSize();
    long sizeThreshold = getTargetSize(sizeEstimate);
    boolean forkRight = false;
    @SuppressWarnings("unchecked") K task = (K) this;
    while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {
        K leftChild, rightChild, taskToFork;
        task.leftChild  = leftChild = task.makeChild(ls);
        task.rightChild = rightChild = task.makeChild(rs);
        task.setPendingCount(1);
        if (forkRight) {
            forkRight = false;
            rs = ls;
            task = leftChild;
            taskToFork = rightChild;
        }
        else {
            forkRight = true;
            task = rightChild;
            taskToFork = leftChild;
        }
        taskToFork.fork();
        sizeEstimate = rs.estimateSize();
    }
    task.setLocalResult(task.doLeaf());
    task.tryComplete();
}

以AbstractTask為例子,當流操作遇到終止符並設定了並行的時候,大部分情況都會執行AbstractTask的compute()方法。

首先會獲取目標流的spliterator,可分割的迭代器,對於任務中的所要被分割操作的資料也將在這個迭代器中被產生子迭代器在子任務中進行相應的操作。

首先會通過當前迭代器的estimateSize()方法得到當前迭代器中所要處理的資料量,以ArrayList為例子,也就是將要迭代的資料最大下標減去最小下標。而後會根據這個資料量,計算每個任務的處理的最大資料量。

public static long suggestTargetSize(long sizeEstimate) {
    long est = sizeEstimate / LEAF_TARGET;
    return est > 0L ? est : 1L;
}
static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2;

可以看到這裡的每個分片的最大資料處理量為要處理的資料總量除以LEAF_TARGET,而LEAF_TARGET的大小為forkjoin執行緒池大小左移兩位也就是乘4,而forkjoin執行緒池的大小如下所設定。

if (parallelism < 0 && // default 1 less than #cores
    (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
    parallelism = 1;
if (parallelism > MAX_CAP)
    parallelism = MAX_CAP;
return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                        "ForkJoinPool.commonPool-worker-");

可以看到該執行緒池數量為cpu數量-1,也就是說每個任務的最大資料量為要處理的資料總量除以cpu數量-1的四倍。

在確定了每個任務的最大處理資料量,就是開始分割任務。

如果當前迭代器中要處理的資料總量大於最大分片量,那麼就呼叫迭代器的trySplit()方法進行分割,得到子迭代器,並且原來的迭代器的資料也進行了分割。

在分割完畢之後,給當前任務生成左右兩個子任務,左任務的迭代器則是經過分割得到的子迭代器,而右任務的迭代器則是經過分割後得到的原迭代器,這樣的分配,也符合兩個迭代器的下標內容,之後給當前的任務設定pending為1。

接下來會交替根據左右子任務繼續進行分割,而並沒有在下一次迴圈中被分割的子任務則是丟入forkjoin執行緒池中準備執行,並在其compute()方法中會併發繼續進行分割。這裡也就可以看到pending為1的目的,由於左右任務總有一個將繼續在本執行緒中進行執行分割或者流操作,所以只需要等待一個子任務完成。

在完成任務的分割,保證當前任務的資料量不會再大於最大所允許的資料量之後,呼叫doleaf()方法得到當前任務的運算結果。

在完成當前任務的計算之後,呼叫tryCompletion()方法表示當前任務的結束。

public final void tryComplete() {
    CountedCompleter<?> a = this, s = a;
    for (int c;;) {
        if ((c = a.pending) == 0) {
            a.onCompletion(s);
            if ((a = (s = a).completer) == null) {
                s.quietlyComplete();
                return;
            }
        }
        else if (U.compareAndSwapInt(a, PENDING, c, c - 1))
            return;
    }
}

首先會得到當前任務的pending,如果不為0則會直接減去1,如果為0,說明此時該任務需要進行關閉,則會先呼叫自己的onCompletion(),方法,之後不斷向上嘗試給自己父任務的pending減一。

如果為0,說明此時該任務的所有子任務已經完畢,或者當前任務是葉子任務並且已經完成。那麼呼叫當前的onComplection()方法,準備任務的結束。