1. 程式人生 > >java8新特性:Stream多執行緒並行資料處理

java8新特性:Stream多執行緒並行資料處理

將一個順序執行的流轉變成一個併發的流只要呼叫 parallel()方法 public static long parallelSum(long n){
    return Stream.iterate(1L, i -> i +1).limit(n).parallel().reduce(0L,Long::sum); } 並行流就是一個把內容分成多個數據塊,並用不不同的執行緒分別處理每個資料塊的流。最後合併每個資料塊的計算結果。 將一個併發流轉成順序的流只要呼叫sequential()方法 stream.parallel() .filter(...) .sequential() .map(...) .parallel() .reduce(); 這兩個方法可以多次呼叫, 只有最後一個呼叫決定這個流是順序的還是併發的。 併發流使用的預設執行緒數等於你機器的處理器核心數。 通過這個方法可以修改這個值,這是全域性屬性。 System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "12"); 並非使用多執行緒並行流處理資料的效能一定高於單執行緒順序流的效能,因為效能受到多種因素的影響。 如何高效使用併發流的一些建議: 1. 如果不確定, 就自己測試。 2. 儘量使用基本型別的流  IntStream, LongStream, and DoubleStream 3. 有些操作使用併發流的效能會比順序流的效能更差,比如limit,findFirst , 依賴元素順序的操作在併發流中是極其消耗效能的 。findAny的效能就會好很多,應為不依賴順序。 4. 考慮流中計算的效能(Q)和操作的效能(N)的對比, Q表示單個處理所需的時間, N表示需要處理的數量,如果Q的值越大, 使用併發流的效能就會越高。 5. 資料量不大時使用併發流,效能得不到提升。 6.考慮資料結構:併發流需要對資料進行分解,不同的資料結構被分解的效能時不一樣的。 流的資料來源和可分解性
可分解性
ArrayList 非常好
LinkedList
IntStream.range 非常好
Stream.iterate
HashSet
TreeSet
7. 流的特性以及中間操作對流的修改都會對資料對分解效能造成影響。 比如固定大小的流在任務分解的時候就可以平均分配,但是如果有filter操作,那麼流就不能預先知道在這個操作後還會剩餘多少元素。 8. 考慮最終操作的效能:如果最終操作在合併併發流的計算結果時的效能消耗太大,那麼使用併發流提升的效能就會得不償失。 9.需要理解併發流實現機制: fork/join 框架 fork/join框架是jdk1.7引入的,java8的stream多執行緒並非流的正是以這個框架為基礎的,所以想要深入理解併發流就要學習fork/join框架。 fork/join框架的目的是以遞迴方式將可以並行的任務拆分成更小的任務,然後將每個子任務的結果合併起來生成整體結果。它是ExecutorService介面的一個實現,它把子任務分配執行緒池(ForkJoinPool)中的工作執行緒。要把任務提交到這個執行緒池,必須建立RecursiveTask<R>的一個子類,如果任務不返回結果則是RecursiveAction的子類。 fork/join框架流程示意圖:
廢話不多說,上程式碼: import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;

/**
 * Created by sunjin on 2016/7/5.
 * 繼承RecursiveTask來建立可以用於分支/合併的框架任務
 */
public class ForkJoinSumCalculator extends RecursiveTask<Long> {
    //要求和的陣列
   
 private final long[] numbers;
    //子任務處理的陣列開始和終止的位置
   
 private final int start;
    private final int end;
    //不在將任務分解成子任務的閥值大小
   
 public static final int THRESHOLD = 10000;

    //用於建立組任務的建構函式
   
 public ForkJoinSumCalculator(long[] numbers){
        this(numbers, 0, numbers.length);
    }

    //用於遞迴建立子任務的建構函式
   
 public ForkJoinSumCalculator(long[] numbers,int start,int end){
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    //重寫介面的方法
   
 @Override
    protected Long compute() {
        //當前任務負責求和的部分的大小
       
 int length = end start;
        //如果小於等於閥值就順序執行計算結果
       
 if(length <= THRESHOLD){
            return computeSequentially();
        }
        //建立子任務來為陣列的前一半求和
       
 ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbersstartstart + length/2);
        //將子任務拆分出去,丟到ForkJoinPool執行緒池非同步執行。
       
 leftTask.fork();
        //建立子任務來為陣列的後一半求和
       
 ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbersstart + length/2, end);
        //第二個任務直接使用當前執行緒計算而不再開啟新的執行緒。
       
 long rightResult = rightTask.compute();
        //讀取第一個子任務的結果,如果沒有完成則等待。
       
 long leftResult = leftTask.join();
        //合併兩個子任務的計算結果
       
 return rightResult + leftResult;
    }

    //順序執行計算的簡單演算法
   
 private long computeSequentially(){
        long sum = 0;
        for(int i =start; i< end; i++){
            sum += numbers[i];
        }
        return sum;
    }
    //提供給外部使用的入口方法
   
 public static long forkJoinSum(long n) {
        long[] numbers = LongStream.rangeClosed(1, n).toArray();
        ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
        return new ForkJoinPool().invoke(task);
    }
} 注意事項: 1. 呼叫join 方法要等到呼叫這個方法的執行緒的自己的任務完成之後。 2. 不要直接去呼叫ForkJoinPool的invoke方法 ,只需要呼叫RecursiveTask的fork或者compute。 3. 拆解任務時只需要呼叫一次fork執行其中一個子任務, 另一個子任務直接利用當前執行緒計算。應為fork方法只是在ForkJoinPool中計劃一個任務。 4.任務拆分的粒度不宜太細,不否得不償失。 工作盜取 由於各種因素,即便任務拆分是平均的,也不能保證所有子任務能同時執行結束, 大部分情況是某些子任務已經結束, 其他子任務還有很多, 在這個時候就會有很多資源空閒, 所以fork/join框架通過工作盜取機制來保證資源利用最大化, 讓空閒的執行緒去偷取正在忙碌的執行緒的任務。 在沒有任務執行緒中的任務存在一個隊列當中, 執行緒每次會從頭部獲取一個任務執行,執行完了再從queue的頭部獲取一個任務,直到佇列中的所有任務執行完,這個執行緒偷取別的執行緒佇列中的任務時會從佇列到尾部獲取任務,並且執行,直到所有任務執行結束。 從這個角度分析,任務的粒度越小, 資源利用越充分。 工作盜取示意圖 可拆分迭代器Spliterator 它和Iterator一樣也是用於遍歷資料來源中的元素,但是他是為並行執行而設計的。 java8 所有資料結構都實現了 這個介面, 一般情況不需要自己寫實現程式碼。但是瞭解它的實現方式會讓你對並行流的工作原理有更深的瞭解。(未完待續)