1. 程式人生 > >擁抱並行流,提高程式執行速度

擁抱並行流,提高程式執行速度

## 前言 在 Java7 之前,如果想要並行處理一個集合,我們需要以下幾步 1. 手動分成幾部分 2. 為每部分建立執行緒 3. 在適當的時候合併。並且還需要關注多個執行緒之間共享變數的修改問題。而 Java8 為我們提供了並行流,可以一鍵開啟並行模式。是不是很酷呢?讓我們來看看吧 宣告:本文首發於部落格園,作者:後青春期的Keats;地址:https://www.cnblogs.com/keatsCoder/ 轉載請註明,謝謝! ## 並行流 ### 認識和開啟並行流 **什麼是並行流:**並行流就是將一個流的內容分成多個數據塊,並用不同的執行緒分別處理每個不同資料塊的流。例如有這麼一個需求: 有一個 List 集合,而 list 中每個 apple 物件只有重量,我們也知道 apple 的單價是 5元/kg,現在需要計算出每個 apple 的單價,傳統的方式是這樣: ```java List appleList = new ArrayList<>(); // 假裝資料是從庫裡查出來的 for (Apple apple : appleList) { apple.setPrice(5.0 * apple.getWeight() / 1000); } ``` 我們通過迭代器遍歷 list 中的 apple 物件,完成了每個 apple 價格的計算。而這個演算法的時間複雜度是 O(list.size()) 隨著 list 大小的增加,耗時也會跟著線性增加。並行流 可以大大縮短這個時間。並行流處理該集合的方法如下: ```java appleList.parallelStream().forEach(apple -> apple.setPrice(5.0 * apple.getWeight() / 1000)); ``` 和普通流的區別是這裡呼叫的 `parallelStream()` 方法。當然也可以通過 stream.parallel() 將普通流轉換成並行流。並行流也能通過 sequential() 方法轉換為順序流,但要注意:**流的並行和順序轉換不會對流本身做任何實際的變化,僅僅是打了個標記而已。並且在一條流水線上對流進行多次並行 / 順序的轉換,生效的是最後一次的方法呼叫** 並行流如此方便,它的執行緒從那裡來呢?有多少個?怎麼配置呢? 並行流內部使用了預設的 ForkJoinPool 執行緒池。**預設的執行緒數量就是處理器的核心數**,而配置系統核心屬性: java.util.concurrent.ForkJoinPool.common.parallelism 可以改變執行緒池大小。不過該值是全域性變數。改變他會影響所有並行流。目前還無法為每個流配置專屬的執行緒數。一般來說採用處理器核心數是不錯的選擇 ### 測試並行流的效能 為了更容易的測試效能,我們在每次計算完蘋果價格後,讓執行緒睡 1s,表示在這期間執行了其他 IO 相關的操作,並輸出程式執行耗時,順序執行的耗時: ```java public static void main(String[] args) throws InterruptedException { List appleList = initAppleList(); Date begin = new Date(); for (Apple apple : appleList) { apple.setPrice(5.0 * apple.getWeight() / 1000); Thread.sleep(1000); } Date end = new Date(); log.info("蘋果數量:{}個, 耗時:{}s", appleList.size(), (end.getTime() - begin.getTime()) /1000); } ``` ![Snipaste_2020-05-21_21-49-44](https://img2020.cnblogs.com/blog/1654189/202005/1654189-20200521232706449-1060282129.jpg) 並行版本 ```java List appleList = initAppleList(); Date begin = new Date(); appleList.parallelStream().forEach(apple -> { apple.setPrice(5.0 * apple.getWeight() / 1000); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } ); Date end = new Date(); log.info("蘋果數量:{}個, 耗時:{}s", appleList.size(), (end.getTime() - begin.getTime()) /1000); ``` 耗時情況 ![Snipaste_2020-05-21_22-16-08](https://img2020.cnblogs.com/blog/1654189/202005/1654189-20200521232706020-1662215086.jpg) 跟我們的預測一致,我的電腦是 四核I5 處理器,開啟並行後四個處理器每人執行一個執行緒,最後 1s 完成了任務! ### 並行流可以隨便用嗎? #### 可拆分性影響流的速度 通過上面的測試,有的人會輕易得到一個結論:並行流很快,我們可以完全放棄 foreach/fori/iter 外部迭代,使用 Stream 提供的內部迭代來實現了。事實真的是這樣嗎?並行流真的如此完美嗎?答案當然是否定的。大家可以複製下面的程式碼,在自己的電腦上測試。測試完後可以發現,並行流並不總是最快的處理方式。 1. 對於 iterate 方法來處理的前 n 個數字來說,不管並行與否,它總是慢於迴圈的,非並行版本可以理解為流化操作沒有迴圈更偏向底層導致的慢。可並行版本是為什麼慢呢?這裡有兩個需要注意的點: 1. iterate 生成的是裝箱的物件,必須拆箱成數字才能求和 2. 我們很難把 iterate 分成多個獨立的塊來並行執行 這個問題很有意思,我們必須意識到某些流操作比其他操作更容易並行化。對於 iterate 來說,每次應用這個函式都要依賴於前一次應用的結果。因此在這種情況下,我們不僅不能有效的將流劃分成小塊處理。反而還因為並行化再次增加了開支。 2. 而對於 LongStream.rangeClosed() 方法來說,就不存在 iterate 的第兩個痛點了。它生成的是基本型別的值,不用拆裝箱操作,另外它可以直接將要生成的數字 1 - n 拆分成 1 - n/4, 1n/4 - 2n/4, ... 3n/4 - n 這樣四部分。因此並行狀態下的 rangeClosed() 是快於 for 迴圈外部迭代的 ```java package lambdasinaction.chap7; import java.util.stream.*; public class ParallelStreams { public static long iterativeSum(long n) { long result = 0; for (long i = 0; i <= n; i++) { result += i; } return result; } public static long sequentialSum(long n) { return Stream.iterate(1L, i -> i + 1).limit(n).reduce(Long::sum).get(); } public static long parallelSum(long n) { return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(Long::sum).get(); } public static long rangedSum(long n) { return LongStream.rangeClosed(1, n).reduce(Long::sum).getAsLong(); } public static long parallelRangedSum(long n) { return LongStream.rangeClosed(1, n).parallel().reduce(Long::sum).getAsLong(); } } ``` ```java package lambdasinaction.chap7; import java.util.concurrent.*; import java.util.function.*; public class ParallelStreamsHarness { public static final ForkJoinPool FORK_JOIN_POOL = new ForkJoinPool(); public static void main(String[] args) { System.out.println("Iterative Sum done in: " + measurePerf(ParallelStreams::iterativeSum, 10_000_000L) + " msecs"); System.out.println("Sequential Sum done in: " + measurePerf(ParallelStreams::sequentialSum, 10_000_000L) + " msecs"); System.out.println("Parallel forkJoinSum done in: " + measurePerf(ParallelStreams::parallelSum, 10_000_000L) + " msecs" ); System.out.println("Range forkJoinSum done in: " + measurePerf(ParallelStreams::rangedSum, 10_000_000L) + " msecs"); System.out.println("Parallel range forkJoinSum done in: " + measurePerf(ParallelStreams::parallelRangedSum, 10_000_000L) + " msecs" ); } public static long measurePerf(Function f, T input) { long fastest = Long.MAX_VALUE; for (int i = 0; i < 10; i++) { long start = System.nanoTime(); R result = f.apply(input); long duration = (System.nanoTime() - start) / 1_000_000; System.out.println("Result: " + result); if (duration < fastest) fastest = duration; } return fastest; } } ``` #### 共享變數修改的問題 並行流雖然輕易的實現了多執行緒,但是仍未解決多執行緒中共享變數的修改問題。下面程式碼中存在共享變數 total,分別使用順序流和並行流計算前n個自然數的和 ```java public static long sideEffectSum(long n) { Accumulator accumulator = new Accumulator(); LongStream.rangeClosed(1, n).forEach(accumulator::add); return accumulator.total; } public static long sideEffectParallelSum(long n) { Accumulator accumulator = new Accumulator(); LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add); return accumulator.total; } public static class Accumulator { private long total = 0; public void add(long value) { total += value; } } ``` 順序執行每次輸出的結果都是:50000005000000,而並行執行的結果卻五花八門了。這是因為每次訪問 totle 都會存在資料競爭,關於資料競爭的原因,大家可以看看關於 volatile 的部落格。因此當代碼中存在修改共享變數的操作時,是不建議使用並行流的。 #### 並行流的使用注意 在並行流的使用上有下面幾點需要注意: - 儘量使用 LongStream / IntStream / DoubleStream 等原始資料流代替 Stream 來處理數字,以避免頻繁拆裝箱帶來的額外開銷 - 要考慮流的操作流水線的總計算成本,假設 N 是要操作的任務總數,Q 是每次操作的時間。N * Q 就是操作的總時間,Q 值越大就意味著使用並行流帶來收益的可能性越大 例如:前端傳來幾種型別的資源,需要儲存到資料庫。每種資源對應不同的表。我們可以視作型別數為 N,儲存資料庫的網路耗時 + 插入操作耗時為 Q。一般情況下網路耗時都是比較大的。因此該操作就比較適合並行處理。當然當型別數目大於核心數時,該操作的效能提升就會打一定的折扣了。更好的優化方法在日後的部落格會為大家奉上 - 對於較少的資料量,不建議使用並行流 - 容易拆分成塊的流資料,建議使用並行流 以下是一些常見的集合框架對應流的可拆分效能表 | 源 | 可拆分性 | | --------------- | -------- | | ArrayList | 極佳 | | LinkedList | 差 | | IntStream.range | 極佳 | | Stream.iterate | 差 | | HashSet | 好 | | TreeSet | 好 | 碼字不易,如果你覺得讀完以後有收穫,不妨點個推薦讓更多的人看