1. 程式人生 > >Java8 In Action-2.函式式資料處理(四)

Java8 In Action-2.函式式資料處理(四)

4.並行資料處理與效能 4.1並行流 Stream.parallelStream:把集合轉換為並行流 並行流就是一個把內容分成多個數據塊,並用不同的執行緒分別處理每個資料塊的流。這樣一來,你就可以自動把給定操作的工作負荷分配給多核處理器的所有核心,讓它們都忙起來。

public static void main(String[] args) throws IOException {
        long sum = getSum(10000000);
        long sum2 = getSum2(10000000);
        System.out.println(sum + " " + sum2);
/** * 並行流內部使用了預設的ForkJoinPool(7.2節會進一步講到分支/合併框架),它預設的 * 執行緒數量就是你的處理器數量,這個值是由Runtime.getRuntime().availableProcessors()得到的。 * 可以通過以下設定來改變執行緒池大小,但一般不建議這樣做 * System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12"); */ //使用並行流高效求和,注意要避免共享可變狀態,確保並行Stream得到正確的結果
long total = LongStream.rangeClosed(1, 10000000).parallel().reduce(0L, Long::sum); System.out.println(total); } //求和:傳統的java程式碼 public static long getSum(long n){ long result = 0; for (int i=1;i<=n;i++){ result += i; } return result; }
//求和:java8流 public static long getSum2(long n){ return Stream.iterate(1L,i -> i + 1) //生成自然數無限流 .limit(n) //限制到前n個數 .reduce(0L,Long::sum); //對所有數字求和來歸納流 }

使用並行流的建議:

  • 測試,檢驗當把順序流變為並行流時的效能提升;
  • 留意裝箱.。自動裝箱和拆箱操作會大大降低效能。 Java 8中有原始型別流(IntStream、LongStream、 DoubleStream)來避免這種操作,但凡有可能都應該用這些流。
  • 有些操作本身在並行流上的效能就比順序流差。特別是limit和findFirst等依賴於元素順序的操作,它們在並行流上執行的代價非常大。
  • 還要考慮流的操作流水線的總計算成本。
  • 對於較小的資料量,選擇並行流幾乎從來都不是一個好的決定。並行處理少數幾個元素的好處還抵不上並行化造成的額外開銷。
  • 要考慮流背後的資料結構是否易於分解。例如, ArrayList的拆分效率比LinkedList高得多,因為前者用不著遍歷就可以平均拆分,而後者則必須遍歷。另外,用range工廠方法建立的原始型別流也可以快速分解。可以實現Spliterator來完全掌控分解過程.
  • 流自身的特點,以及流水線中的中間操作修改流的方式,都可能會改變分解過程的效能。
  • 還要考慮終端操作中合併步驟的代價是大是小(例如Collector中的combiner方法) 。如果這一步代價很大,那麼組合每個子流產生的部分結果所付出的代價就可能會超出通過並行流得到的效能提升。 在這裡插入圖片描述 最後,我們還要強調並行流背後使用的基礎架構是Java 7中引入的分支/合併框架。並行彙總的示例證明了要想正確使用並行流,瞭解它的內部原理至關重要.

4.2分支/合併框架 分支/合併框架的目的是以遞迴方式將可以並行的任務拆分成更小的任務,然後將每個子任務的結果合併起來生成整體結果。它是ExecutorService介面的一個實現,它把子任務分配給執行緒池(稱為ForkJoinPool)中的工作執行緒。

使用 RecursiveTask 要把任務提交到這個池,必須建立RecursiveTask<V>的一個子類,其中V是並行化任務 (以及所有子任務)產生的結果型別,或者如果任務不返回結果,則是RecursiveAction型別(當然它可能會更新其他非區域性機構)。要定義RecursiveTask, 只需實現它唯一的抽象方法compute:

public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
    private static final long serialVersionUID = 5232453952276485270L;

    /**
     * The result of the computation.
     */
    V result;

    /**
     * The main computation performed by this task.
     * @return the result of the computation
     */
    protected abstract V compute();

    public final V getRawResult() {
        return result;
    }

    protected final void setRawResult(V value) {
        result = value;
    }

    /**
     * Implements execution conventions for RecursiveTask.
     */
    protected final boolean exec() {
        result = compute();
        return true;
    }

}

這個方法同時定義了將任務拆分成子任務的邏輯,以及無法再拆分或不方便再拆分時,生成單個子任務結果的邏輯。正由於此,這個方法的實現類似於下面的虛擬碼:

if (任務足夠小或不可分) {
	順序計算該任務
} else {
	將任務分成兩個子任務
	遞迴呼叫本方法,拆分每個子任務,等待所有子任務完成
	合併每個子任務的結果
}

分而治之

package com.h.java8;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;

/**
 * Created by John on 2018/9/30.
 */
public class ForkJoinSumCalculator extends RecursiveTask<Long>{

    private final long[] numbers;

    private final int start;

    private final int end;

    public static final long THRESHOLD = 10_000;

    public ForkJoinSumCalculator(long[] numbers) {
        this(numbers, 0, numbers.length);
    }

    private ForkJoinSumCalculator(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        int length = end - start;
        if (length <= THRESHOLD){
            return computeSequentially();
        }
        ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers,start,start + length/2);
        leftTask.fork();
        ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers,start + length/2,end);
        Long rightResult = rightTask.compute();
        Long leftResult = leftTask.join();
        return leftResult + rightResult;
    }

    private long computeSequentially(){
        long sum = 0;
        for (int i = start; i < end; i++) {
            sum += numbers[i];
        }
        return sum;
    }

    public static long forkJoinSum(long n) {
        long[] numbers = LongStream.rangeClosed(1, n).toArray();
        ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
        return new ForkJoinPool().invoke(task);
    }
}

工作竊取

4.3Spliterator