Java - 並行資料處理和效能
Java - 並行資料處理和效能
並行流
並行流是一個把元素分成多個塊的流,每個塊用不同的執行緒處理。可以自動分割槽,讓所有的處理器都忙起來。
假設要寫一個方法,接受一個數量n做引數,計算1-n的和。可以這樣實現:
public long sequentialSum(long n) {
return Stream.iterate(1L, i -> i + 1)
.limit(n)
.reduce(0L, Long::sum);
}
也許可以使用parallel方法,簡單地使用平行計算,提高程式效能:
public long sequentialSum(long n) {
return Stream.iterate(1L, i -> i + 1)
. limit(n)
.parallel()
.reduce(0L, Long::sum);
}
這樣,流可能在內部被分成多個塊,導致reduction操作可以在不同的塊上互不依賴地並行地各自工作。最後,reduction操作組合每個子流的並行reductions的返回值,返回的結果就是整個流的結果。見下面的示意圖
實際上,呼叫parallel方法,流自身不會有任何變化。在內部,設定一個布林型別的標記,標明你想在並行模式執行操作,接下來的操作都是並行的。
類似地,你也可以使用sequential方法,把並行流轉成序列的。你也許認為可以組合這兩個方法:
stream.parallel()
.filter(...)
.sequential()
.map(...)
.parallel()
.reduce();
但是,最後一次呼叫parallel或者sequential才會全域性地影響管道。上面的例子,管道將被並行地執行。
配置並行流使用的執行緒池
並行流內部使用ForkJoinPool。預設地,執行緒數量等於處理器數量(Runtime.getRuntime().availableProcessors())。但是,可以修改系統屬性java.util.concurrent.ForkJoinPool.common.parallelism,配置執行緒數量。
這是全域性配置,所以,除非你認為對效能有幫助,否則不要修改。
測量流的效能
我們聲稱並行加法應該比序列的或者自己的迭代方法快。我們可以使用JMH測量一下。這是一個工具,使用基於註解的方法,可以為JVM程式增加
可靠的microbenchmarks。如果使用maven,可以這樣引入:
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<version>1.21</version>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>1.21</version>
</dependency>
第一個庫是核心實現,第二個包含一個註解處理器,幫助生成JAR檔案,通過它可以方便地執行你的benchmark。maven配置裡還應該有下面的plugin:
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<finalName>benchmarks</finalName>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.openjdk.jmh.Main</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
程式程式碼如下
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
//測量平均時間
@BenchmarkMode(Mode.AverageTime)
//以毫秒為單位,列印benchmark結果
@OutputTimeUnit(TimeUnit.MILLISECONDS)
//執行兩次,增加可靠性。堆空間是4Gb
@Fork(value = 2, jvmArgs = {"-Xms4G", "-Xmx4G"})
@State(Scope.Benchmark)
public class ParallelStreamBenchmark {
private static final long N = 10_000_000L;
@Benchmark
public long sequentialSum() {
return Stream.iterate(1L, i -> i + 1).limit(N)
.reduce(0L, Long::sum);
}
//每次執行benchmark後,執行GC
@TearDown(Level.Invocation)
public void tearDown() {
System.gc();
}
}
使用大記憶體,和每次迭代以後試著GC都是為了儘量減少GC的影響。儘管如此,結果應該再加一些鹽。很多因素會影響執行時間,比如你的機器有多少核。
預設地,JMH一般先執行5次熱身迭代,這樣可以讓HotSpot優化程式碼,然後再執行5次迭代用來計算最終的結果。你可以使用-w和-i命令列引數修改這些配置。
在我的機器上,使用JDK 1.8.0_121, Java HotSpot™ 64-Bit Server VM,執行結果是
Benchmark Mode Cnt Score Error Units
ParallelStreamBenchmark.sequentialSum avgt 10 83.565 ± 1.841 ms/op
你應該期望,使用經典的for迴圈的迭代版本執行得更快,因為它在更低層(level)工作,而且,更重要的是,它不需要執行原始型別的裝箱和拆箱操作。我們測試一下這個方法:
@Benchmark
public long iterativeSum() {
long result = 0;
for (long i = 1L; i <= N; i++) {
result += i;
}
return result;
}
執行結果是
Benchmark Mode Cnt Score Error Units
ParallelStreamBenchmark.iterativeSum avgt 10 6.877 ± 0.068 ms/op
證實了我們的期望:迭代版本比序列流快了10倍。讓我們使用並行流試一試:
Benchmark Mode Cnt Score Error Units
ParallelStreamBenchmark.parallelSum avgt 10 110.157 ± 1.882 ms/op
非常令人失望:並行版本的求和一點都沒有發揮多核的優勢,比序列版還要慢。為什麼會這樣?有兩個問題混在一起:
- 迭代生成了裝箱物件,它們在做加法前,必須拆箱成數字
- 迭代很難劃分獨立的塊來並行地執行
第二點是特別有趣的,不是所有的流都是適合並行處理的。特別是,迭代的流就很難,這是因為,函式的輸入依賴上一個函式的結果。見下圖:
這意味著,reduction過程並沒有像第一張圖裡所表示的那樣執行。reduction開始的時候,還沒有整個數字列表,所以沒法分塊。把流標記為並行的,反而增加了在不同執行緒上執行的求和要被序列處理的負擔。
使用更專業的方法
LongStream.rangeClosed方法使用的是原始long型別,所以不用裝箱和拆箱。而且,它生產的數的範圍,可以很容易地分成不依賴的塊。比如,範圍1-20可以被分成1-5、6-10、11-15和16-20。
@Benchmark
public long rangedSum() {
return LongStream.rangeClosed(1, N)
.reduce(0L, Long::sum);
}
輸出是
Benchmark Mode Cnt Score Error Units
ParallelStreamBenchmark.rangedSum avgt 10 7.660 ± 1.643 ms/op
可以看出來,比並行流快了很多,僅比經典的for迴圈慢了一點。LongStream支援並行:
@Benchmark
public long parallelRangedSum() {
return LongStream.rangeClosed(1, N)
.parallel()
.reduce(0L, Long::sum);
}
輸出是
Benchmark Mode Cnt Score Error Units
ParallelStreamBenchmark.parallelRangedSum avgt 10 4.790 ± 5.142 ms/op
可以發現,並行生效了。甚至比for迴圈還快了1/3。
正確使用並行流
濫用並行流產生錯誤的主要原因是使用了改變共享狀態的演算法。下面是一個通過改變共享的累加器來實現前n個自然數求和的例子:
public long sideEffectSum(long n) {
Accumulator accumulator = new Accumulator();
LongStream.rangeClosed(1, n).forEach(accumulator::add);
return accumulator.total;
}
public class Accumulator {
public long total = 0;
public void add(long value) {
total += value;
}
}
這種程式碼很常見,特別對熟悉指令式程式設計正規化的開發者而言。當你迭代數字列表時,經常這樣做:初始化一個累加器,遍歷元素,使用累加器相加。
這程式碼有什麼錯?它是序列的,失去了並行性。讓我們試著使用並行流:
public long sideEffectParallelSum(long n) {
Accumulator accumulator = new Accumulator();
LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);
return accumulator.total;
}
多執行幾次,你會發現,每次返回的結果都不一樣,而且都不是正確的50000005000000。這是因為多執行緒累加的時候,total += value並不是原子操作。那麼怎樣才能寫出並行情況下,正確的程式碼呢?
- 如果有懷疑,就做測試
- 注意裝箱問題。Java提供的原始型別流(IntStream、LongStream和DoubleStream)可以避免類似的問題,儘量使用他們
- 有些操作使用並行流效能更差。尤其是像limit和findFirst這種依賴元素順序的操作,使用並行是非常昂貴的。比如,findAny就比findFirst效能好,因為它跟順序無關。呼叫unordered方法,可以把一個有順序的流變成無順序的流。比如,如果你需要流的N個元素,而你對前M個感興趣,在一個無順序的流上呼叫limit比有順序的高效
- 如果資料量不大,不要選擇並行流
- 要考慮流的底層資料結構的可分解程度。比如,ArrayList比LinkedList分解起來更高效,因為不遍歷就可以分割。使用range工廠增加的原始型別流也很容易分割。可以通過實現自己的Spliterator分割流
- 流的特徵,以及中間操作如何修改流的元素,會改變分解過程的效能。比如,一個SIZED流可以被分解成兩個相等的部分,並且每個部分可以高效得並行處理,但是,filter會過濾掉任何不滿足條件的元素,導致流的size成了未知的
- 考慮結束操作是廉價的還是昂貴的merge步驟(比如,Collector的combiner方法)。如果是昂貴的,組合並行結果的代價會比並行流帶來的好處還要高
下面的表格,總結一些流在可分解性方面的並行友好性
源 | 可分解性 |
---|---|
ArrayList | 優秀 |
LinkedList | 差 |
IntStream.range | 優秀 |
Stream.iterate | 差 |
HashSet | 好 |
TreeSet | 好 |
fork/join框架
fork/join框架用來遞迴地把可並行的任務分解成小任務,然後組合每個子任務的結果,以生成總的結果。它實現了ExecutorService介面,這樣所有的子任務都在一個執行緒池(ForkJoinPool)內工作。
RecursiveTask
要向ForkJoinPool提交任務,你不得不增加RecursiveTask的子類-R是並行任務(以及每個子任務)的返回型別,或者
增加RecursiveAction的子類-當沒有返回值的時候。要定義RecursiveTask,需要實現它唯一的抽象方法:
protected abstract R compute();
該方法定義分割任務和不能繼續被分割時處理一個子任務的演算法的邏輯。該方法的實現,經常像下面的虛擬碼:
if (任務足夠小,不再被分) {
順序執行任務
} else {
把任務分成兩個子任務
遞迴地呼叫本方法,儘量分割每個子任務
等待所有子任務的完成
組合每個子任務的結果
}
可以發現,這是分治演算法的並行實現。我們繼續求和的例子,演示怎麼使用fork/join框架。首先需要擴充套件RecursiveTask類:
import java.util.concurrent.RecursiveTask;
/**
* Created by leishu on 18-12-11.
*/
public class ForkJoinSumCalculator extends RecursiveTask<Long> {
//分割任務的閾值
public static final long THRESHOLD = 10_000;
//要被求和的陣列
private final long[] numbers;
private final int start;
private final int end;
public ForkJoinSumCalculator(long[] numbers) {
this(numbers, 0, numbers.length);
}
//生成子任務的私有構造器
private ForkJoinSumCalculator(long[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
//子任務的大小
int length = end - start;
if (length <= THRESHOLD) {
return computeSequentially();//小於閾值,不分割
}
//增加第一個子任務
ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2);
//非同步執行,新的子任務使用ForkJoinPool的另一個執行緒
leftTask.fork();
ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length / 2, end);
//同步執行第二個子任務,允許遞迴
Long rightResult = rightTask.compute();
//讀取第一個子任務的結果,如果沒完成就等待
Long leftResult = leftTask.join();
//組合
return leftResult + rightResult;
}
//順序執行
private long computeSequentially() {
long sum = 0;
for (int i = start; i < end; i++) {
sum += numbers[i];
}
return sum;
}
}
然後寫一個方法,執行並行求和:
public static long forkJoinSum(long n) {
long[] numbers = LongStream.rangeClosed(1, n).toArray();
ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
return new ForkJoinPool().invoke(task);
}
執行一下,輸出如下
Benchmark Mode Cnt Score Error Units
ParallelStreamBenchmark.forkJoinSumB avgt 4 28.458 ± 0.602 ms/op
效能不夠好,這是因為在ForkJoinSumCalculator使用的是一個long[]。
使用fork/join的最佳實踐
- 呼叫任務的join方法,會阻塞呼叫者,直到返回結果。所以,要在兩個子任務都啟動以後在呼叫它
- 不要在RecursiveTask內使用ForkJoinPool的invoke方法
- 子任務的fork方法是用來做排程的。在兩個子任務上直接呼叫它似乎是很自然的,但是,在其中一個上呼叫compute效率更高,因為這樣能重用相同的執行緒
偷工作
任務被分給ForkJoinPool裡的執行緒。每個執行緒有一個儲存任務的雙端連結串列,順序地執行連結串列中的任務。如果由於某種原因(比如I/O),一個執行緒完成了分配給他的全部任務,它會隨機地從其他執行緒選擇一個佇列,從佇列的尾部偷一個任務。這個過程會持續,直到所有的佇列都空了為止。所以,要有大量的小任務,而不是幾個大任務,這樣可以更好地平衡執行緒的負荷。
Spliterator
Spliterator是Java 8 提供的新介面,意思是“splitable iterator”,用來並行地迭代源中的元素。也許你不用開發自己的Spliterator,但是,理解了它,也就明白了並行流是如何工作的。Java 8已經在Collections框架內提供了Spliterator的預設實現。Collection介面有一個default方法spliterator(),它就返回一個Spliterator物件。我們先看看Spliterator介面的定義:
public interface Spliterator<T> {
//用來按順序消費Spliterator的元素,如果還有元素就返回true
boolean tryAdvance(Consumer<? super T> action);
//把一些元素分到一個新的Spliterator,以允許他們並行處理
Spliterator<T> trySplit();
//剩餘的可被遍歷的元素數量估值
long estimateSize();
int characteristics();
}
tryAdvance方法的行為類似於迭代器,用來按順序消費Spliterator的元素,如果還有元素就返回true。trySplit方法
用來把一些元素分到一個新的Spliterator,以允許他們並行處理。
分割過程
把一個流分割成多個部分是一個遞迴過程,如下圖所示。首先,在第一個Spliterator上呼叫trySplit生成一個新的。然後,在這兩個Spliterator上呼叫trySplit,這樣產生四個。一直進行下去,直到該方法返回null,標誌著不能再被分割。最後,當所有的trySplit都返回null時,遞迴過程結束。
分割過程也會受到Spliterator的特徵(由characteristics方法宣告)的影響。
Spliterator特徵
characteristics方法返回一個整數,用來更好地控制和優化Spliterator的用法。
Characteristic | 描述 |
---|---|
ORDERED | 元素是有順序的(比如List),所以Spliterator使用該順序做遍歷和分割槽 |
DISTINCT | 對於每對遍歷的元素x和y,x.equals(y)返回false |
SORTED | 遍歷的元素遵循預定義的排序順序 |
SIZED | 源的size是已知的(比如set),所以estimatedSize()返回的值是精確的 |
NON-NULL | 元素不會為空 |
IMMUTABLE | 源是不可變的,說明遍歷的時候,元素不會被增加、修改和刪除 |
CONCURRENT | 源是併發安全的,併發修改的時候,不用任何同步 |
SUBSIZED | Spliterator和接下來產生的Spliterator都是SIZED |
實現自己的Spliterator
我們開發一個簡單的方法,用來計算字串中的單詞數。
public int countWordsIteratively(String s) {
int counter = 0;
boolean lastSpace = true;
for (char c : s.toCharArray()) {
if (Character.isWhitespace(c)) {
lastSpace = true;
} else {
if (lastSpace) counter++;
lastSpace = false;
}
}
return counter;
}
要計算的字串是但丁的“地域”的第一句
public static final String SENTENCE =
" Nel mezzo del cammin di nostra vita "
+ "mi ritrovai in una selva oscura"
+ " che la dritta via era smarrita ";
System.out.println("Found " + countWordsIteratively(SENTENCE) + " words");
注意,兩個單詞間的空格數是隨機的。執行結果
Found 19 words
使用函式式實現
首先需要把字串轉換成一個流。原始型別int、long和double才有原始的的流,所以,我們使用Stream:
Stream<Character> stream = IntStream.range(0, SENTENCE.length())
.mapToObj(SENTENCE::charAt);
可以使用reduction計算單詞數量。當reduce的時候,你不得不攜帶由兩個變數組成的狀態:整數型的總數和布林型的字元是否是空格。因為Java沒有tuples,你得增加一個新類-WordCounter-封裝狀態:
class WordCounter {
private final int counter;
private final boolean lastSpace;
public WordCounter(int counter, boolean lastSpace) {
this.counter = counter;
this.lastSpace = lastSpace;
}
//遍歷,累加
public WordCounter accumulate(Character c) {
if (Character.isWhitespace(c)) {
return lastSpace ? this : new WordCounter(counter, true);
} else {
//如果上一個字元是空格,而當前的不是,就加1
return lastSpace ? new WordCounter(counter + 1, false) : this;
}
}
//組合,求和
public WordCounter combine(WordCounter wordCounter) {
return new WordCounter(counter + wordCounter.counter, wordCounter.lastSpace);
}
public int getCounter() {
return counter;
}
}
下面是遍歷一個新字元時,WordCounter的狀態圖