1. 程式人生 > >【Java沒基礎】Java 8 並行流 ParallelStream

【Java沒基礎】Java 8 並行流 ParallelStream

前言

在前兩篇的 Java 8 函數語言程式設計的 blog 中,我們聊了 Lambda 表示式,聊了一些常用的 Stream API 和一些收集器方法。這篇將是我們一起系統的瞭解學習 Java 8 函數語言程式設計特性的最後一篇正式的 blog,以後再遇到什麼問題或者學習到新的知識我會以填坑的形式以小篇 blog 來進行探討和填坑~

並行流

我們通過前兩篇瞭解了 Lambda 表示式的便捷、程式碼的優美,在今天,我們要考慮一下效率了。

在文章的最後我們可以瞭解到,使用 parallelStream 來對大資料量 List 進行處理是擁有著最快的速度。當然了,我們需要考慮到其中未知的效率影響,但是使用並行流 parallelStream 對程式碼效能的提升是不可忽略的。

資料並行化

資料並行化是指將資料分成塊,為每塊資料分配單獨的處理單元。它將問題分解為可在多塊資料處理器上求解的形式,然後對每塊資料執行運算,最後將各個資料處理器上得到的結果彙總,從而獲得最終答案。

對於資料平行計算之後的執行速度,我們首先要了解一個概念:阿姆達爾定律

阿姆達爾定律預測了搭載多核處理器的機器提升程式速度的理論最大值。以完全序列的程式為例,如果將其一半資料改為並行化處理,不管增加了多少處理器,其理論上的最大速度只是原來的二倍。

即:問題的求解時間將完全取決於它可以被分解為幾個部分。

使用並行流

將流進行並行化操作十分簡單,如果有一個 Stream 流物件,那麼呼叫他的parallel方法

stream.parallel()....

如果從一個集合類中建立一個具有並行處理能力的流,與建立一個流類似,只需要呼叫 parallelStream 方法,其他處理流的方法不變。

// 舉個栗子 :在汽車列表中 找出每個車主擁有的汽車數量
public int parallelCarNumOfUser(List<Car> cars){
  return cars.parallelStream()
    .flatMap(User :: getUsers)
    .count();
}

實際上,並行流的底層還是沿用了 fork/join 框架,fork 分解問題、解決問題,最後由 join 合併為結果。
《Java 8 函數語言程式設計》第六章 fork/join 分解合併問題


《Java 8 函數語言程式設計》第六章 fork/join 分解合併問題

並行流使用的是預設的 ForkJoinPool 來控制並行處理數量,這個預設的 ForkJoinPool 和 CPU 的核心數量相同,使用預設的 ForkJoinPool 適用於 全 CPU 佔用的密集計算

流操作分為有 狀態無狀態 兩種,
有狀態包括 sorted、distinct、 limit 等;
無狀態包括 map、 filter、faltMap等;
我們在操作並行流的時候,應該儘量的避開有狀態,選用無狀態

而應對於非密集計算的業務場景,可以採用自定義的 ForkJoinPool 來控制進行計算的 CPU 數量。

// 舉個栗子,來段虛擬碼,自定義一個使用一定數量 CPU 的ForkJoinPool
ForkJoinPool forkJoinPool = new ForkJoinPool(int numbersOfCPU);
forkJoinPool.submit(() ->{
  ***.parallelStream()
    .***(* -> ***)
    .***()
}
並行流限制

並行流的加入和使用帶來了效率,同時也帶來了一些問題,比如我們想到:如果運算順序的變化會導致結果的變化的時候,這樣我們應該怎麼辦?

所以,使用並行流程式碼必須遵守一些規則和限制。

先舉個簡單的例子,我們使用 reduce 來進行求和操作。在序列流中,我們可以設定任何初始值

比如 : reduce(100, (a, b) -> a + b);

而當我們使用並行流來編寫如上程式碼的時候就要小心了,因為有幾路並行,程式碼的實現中就會加上幾個 100,這樣的話就會影響最終結果。
還有,如果運算順序影響最終結果怎麼辦呢?

所以:1、在使用並行流的時候,初值必須為組合函式的恆等值, 一般情況下,加法的初始值都為 0 ,乘法的初始值都為1。(至於為什麼?問一下小學老師就知道了。) 2、reduce 操作的另一個限制是組合要滿足結合律。這樣的話,交換運算順序不會對最終結果產生影響。

除了上面比較具有針對性的兩點,還有一點適用於並行流的通用情況,即 避免持有鎖。流框架會在需要的時候,自己處理同步操作,因此我們不需要為自己的資料結構加鎖,這會產生不必要的開銷。

並行與序列流

如果我們希望有個流不進行並行操作,只進行並行顯示進行序列操作的時候,我們可以使用 sequential 方法來將這個流顯示定義為序列的。

使用 parallel 可以將流轉換為並行流
使用 sequential 可以將流轉換為序列流

在對流進行求值時,流不能同時處於兩種模式,要麼並行要麼序列,如果同時呼叫了 parallel 和 sequential 方法,那麼最後的那個起作用。

效能要素

影響並行流效能的主要因素有幾點

1、資料大小
我們知道,將問題分解後進行處理再進行合併會帶來額外的開銷,那麼在小資料量的時候,使用並行流的執行時間有可能超過單核序列執行時間數。所以只有在 資料量足夠大 的時候,採用並行化處理才有意義。

2、源資料結構
每個管道(即並行之後的流) 的操作都基於初始資料來源,通常是集合。而不同資料結構的集合在分解的時候有著不同的開銷,這些開銷同樣影響了並行流處理資料的能力。

一般情況下
ArrayList > HashSer ≈ TreeSet > LinkedList

3、裝箱
處理基本資料型別肯定要比處理裝箱型別要快

4、核心數
根據阿姆達爾定律,能夠使用的核心數越多,程式能獲得的潛在效能提升的幅度就越大。而且,同時執行的其他程序(包括但不限於:作業系統、IDE、其他應用程式比如音樂或者其他 Java 程序),或者執行緒關聯性都會影響到系統性能,影響到你的可用核心數量。

5、單元處理開銷
單個元素的大小或者複雜程度同時也是影響並行流效能的一大主因,花在流中的每個元素的時間越長,並行操作所帶來的效能提升就越明顯

並行流運算元組

Java 8 引入 了一些針對陣列的並行操作,可以脫離流框架單獨使用。這些操作被封裝在工具類 java.util.Arrays 中,並且提供了很多的過載方法供我們挑選使用。在 Arrays 類中,同樣還包括 Java 以前提供的與陣列相關的有用方法。

方法名 說明
parallelPrefix 任意給定一個函式,計算資料的和(資料累加)
parallelSetAll 使用 Lambda 表示式更新陣列元素
parallelSort 並行化對陣列排序

測試

Lambda 表示式的測試

對 Lambda 表示式的測試有些困難,因為 Lambda 表示式沒有方法名,無法在測試程式碼中進行呼叫。然而程式碼測試是不可少的,我們來一起研究一下測試方法

方法一:複製 Lambda 表示式來進行測試
當你修改了實現程式碼,可是測試程式碼依然會通過,那麼這個測試也就沒有什麼意義。

方式二:將 Lambda 表示式放到另一個方法中進行測試
這個看起來是可行的,但是我們仔細思考一下,這麼做的話被測試的主體只是那個方法,而不是我們想要測試的 Lambda 表示式本身。

方式三:將Lambda表示式改寫為一個普通方法,對這個方法進行測試
這個方法是應該被推薦的。一個 Lambda 表示式必然可以被改寫成為一個有名字的普通方法,我們對這個方法進行測試就是對 Lambda 表示式本身進行測試,並且可以在測試用例中覆蓋所有的邊界情況。

我們也可以使用一個非常強大的測試框架:Mockito 框架來對 Lambda 表示式進行測試。
Mockito 的 Answer 介面允許使用者提供其他行為,這裡 Answer 之所以可以使用 Lambda 表示式,是因為 Answer 本事就是一個函式介面。

Stream 流的測試

當我們對一個流進行單步除錯,我們會發現除錯過程中出現了一些問題,因為流的一些操作是惰性求值的,我們無法通過列印值或者堆疊的方法來觀察他是否執行了我們希望的操作。

忽然,我們想到,我們或許可以通過 ForEach 來逐步列印啊?

然而操作的時候,我們會發現有個問題出現了,ForEach語句會觸發求值操作,而且流只能使用一次,接下來的行為我們就無法觀察,如果我們還想繼續除錯下去,就需要重新建立一個流。

幸運的是,Java工程師們想到了這一點,為我們提供了 peek 方法。
這個方法可以讓我們檢視每個值,並且可以繼續操作流

// 嗯,來段虛擬碼
***.stream()
  .peek(name -> System.out.println(name))
  .collection(***)
  .***

在 peek 方法中,我們可以使用列印方法來進行除錯,也可以使用 IDE 的除錯功能,還可以通過同樣的方式將輸出定向到日誌系統中。

結語

到這,我們對於 Java 8 新特性函數語言程式設計的系統性的學習就告一段落了。當然了,學習不能放下,坑還是要踩的,我會把未來日子中學到的新東西和新坑一點點的整理、總結。

這有個二維碼,不嫌棄的話,就關注下吧

姜某人的微信公眾號