1. 程式人生 > >Java8 Stream 平行計算實現的原理

Java8 Stream 平行計算實現的原理

轉自:http://lvheyang.com/?p=87

這兩天組內的小夥伴在學習Java8,推廣在新專案內使用新特性。正好看到了Stream 帶來的遍歷的多執行緒併發:
    
Arrays.asList(1, 2, 3, 4, 5, 6, 7, 9, 8, 0, 1)
        .stream()
        .parallel()
        .collect(Collectors.groupingBy(x -> x % 10))
        .forEach((x, y) -> System.out.println(x + ":" + y));

和小夥伴一起試著用各種玩法玩了一下Java8的函數語言程式設計特性之後,感嘆到這樣子併發計算真的是越來越簡單了的。

但是深入思考之後就會很自然的想到一個問題,這個過程中,我們並沒有顯示的告訴Stream,我們需要多少個執行緒進行平行計算?我們能否複用之前的執行緒進行計算?

帶著這個問題我們先打開了VisualVM,檢視一下我們執行這樣一個任務會啟動多少個執行緒?

threads


我們可以看到預設的parallel計算啟動了三個執行緒進行並行。這三個執行緒是怎麼來的呢?抱著這個問題,我們來參考一下Jdk8的原始碼,來看看它是如何設定這個值的。

我們知道Stream 是一個惰性求值的系統(如何進行惰性求值,我會在另一篇部落格中進行分析),那麼我們只需要找它最後求值的過程,看它是怎樣進行求值的就可以了。在AbstractPipeline 這個類裡面我們找到了Stream 計算的最終求值過程的預設實現:
/**
 * Evaluate the pipeline with a terminal operation to produce a result.
 *
 * @param <R> the type of result
 * @param terminalOp the terminal operation to be applied to the pipeline.
 * @return the result
 */
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
    assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;

    return isParallel()
           ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
           : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}

在15行,我們可以看到,在求值的時候會檢查平行計算的標誌位,如果標誌了平行計算的話,我們就會並行求值,反之則會序列求值。我們可以進一步進入並行求值的邏輯中,這是一個TerminalOp的預設介面方法,預設實現就是直接呼叫序列求值,在FindOp、ForEachOp、MatchOp 和 ReduceOp 中得到了覆蓋。

parallel
@Override
public <P_IN> O evaluateParallel(PipelineHelper<T> helper,
                                 Spliterator<P_IN> spliterator) {
    return new FindTask<>(this, helper, spliterator).invoke();
}
如FindOp的程式碼示例,這四個操作都是建立一個Task的示例,然後執行invoke方法。這些Task的繼承關係如圖:

Java的ForkJoin執行原理-Task繼承圖

可以看出所有的Task 都繼承自Jdk7 中引入的ForkJoin 並行框架的ForkJoinTask。所以我們可以看出Stream 的並行是依賴於ForkJoin 框架的。以AbstractTask 為例我們看看它是如何進行平行計算的:

@Override
public void compute() {
    Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators
    long sizeEstimate = rs.estimateSize();
    long sizeThreshold = getTargetSize(sizeEstimate);
    boolean forkRight = false;
    @SuppressWarnings("unchecked") K task = (K) this;
    while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {
        K leftChild, rightChild, taskToFork;
        task.leftChild  = leftChild = task.makeChild(ls);
        task.rightChild = rightChild = task.makeChild(rs);
        task.setPendingCount(1);
        if (forkRight) {
            forkRight = false;
            rs = ls;
            task = leftChild;
            taskToFork = rightChild;
        }
        else {
            forkRight = true;
            task = rightChild;
            taskToFork = leftChild;
        }
        taskToFork.fork();
        sizeEstimate = rs.estimateSize();
    }
    task.setLocalResult(task.doLeaf());
    task.tryComplete();
}

ReduceTask(ReduceTask<P_IN, P_OUT, R, S> parent,
           Spliterator spliterator) {
    super(parent, spliterator);
    this.op = parent.op;
}

@Override
protected ReduceTask<P_IN, P_OUT, R, S> makeChild(Spliterator spliterator) {
    return new ReduceTask<>(this, spliterator);
}

這裡面的主要邏輯就是

    先呼叫當前splititerator 方法的estimateSize 方法,預估這個分片中的資料量
    根據預估的資料量獲取最小處理單元的大小閾值,即當資料量已經小於這個閾值的時候進行計算,否則進行fork 將任務劃分成更小的資料塊,進行求解。這裡值得注意的是,getTargetSize 在第一次呼叫的時候會設定:
        預測資料量大小 / (預設併發度 * 4) 的結果作為最小執行單元的數量(配置的預設值是cpu 數 – 1,可以通過java.util.concurrent.ForkJoinPool.common.parallelism設定)
    如果當前分片大小仍然大於處理資料單元的閾值,且分片繼續嘗試切分成功,那麼就繼續切分,分別將左右分片的任務建立為新的Task,並且將當前的任務關聯為兩個新任務的父級任務(邏輯在makeChild 裡面)
    先後對左右子節點的任務進行fork,對另外的分割槽進行分解。同時設定pending 為1,這代表一個task 實際上只會有一個等待的子節點(被fork)。
    當任務已經分解到足夠小的時候退出迴圈,嘗試進行結束。呼叫子類實現的doLeaf方法,完成最小計算單元的計算任務,並設定到當前任務的localResult中
    呼叫tryComplete 方法進行最終任務的掃尾工作,如果該任務pending 值不等於0,則原子的減1,如果已經等於0,說明任務都已經完成,則呼叫onCompletion 回撥,如果該任務是葉子任務,則直接銷燬中間資料結束;如果是中間節點會將左右子節點的結果進行合併
    檢查如果這個任務已經沒有父級任務了,則將該任務置為正常結束,如果還有則嘗試遞迴的去呼叫父級節點的onCompletion回撥,逐級進行任務的合併。

public final void tryComplete() {
    CountedCompleter<?> a = this, s = a;
    for (int c;;) {
        if ((c = a.pending) == 0) {
            a.onCompletion(s);
            if ((a = (s = a).completer) == null) {
                s.quietlyComplete();
                return;
            }
        }
        else if (U.compareAndSwapInt(a, PENDING, c, c - 1))
            return;
    }
}

說了這麼多,大家也基本理解了Stream 的實現原理了。其實本質上就是在ForkJoin上進行了一層封裝,將Stream 不斷嘗試分解成更小的split,然後使用fork/join 框架分而治之。

所以我們以往關於Fork/Join 的經驗也都可以派上用場,可以解答之前我們的幾個疑問:

    我在visualvm 中看到的 parallize 的3個執行緒是怎麼來的?
        答:由於 taskToFork.fork() 呼叫,parallize使用了預設的ForkJoinPool.common 預設的一個靜態執行緒池,這個執行緒池的預設執行緒個數是cpu 數量-1。由於我的程式碼是執行在四個邏輯核心的MacBook 上,所以這裡的執行緒個數為3。如下面程式碼和註釋所示:

if (parallelism < 0 && // default 1 less than #cores
    (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
    parallelism = 1;
if (parallelism > MAX_CAP)
    parallelism = MAX_CAP;
    如何控制parallize 的執行緒數?
        答:我們可以自己構建一個ForkJoinPool,向其中提交一個parallize 任務,可以做到控制併發度。如以下示例程式碼:我們將之前的stream 的過程構造成一個runnable 的lambda 匿名函式 ()-> {…}。提交至執行緒池中,就可以按照我們想要的併發度進行計算了。

ForkJoinPool pool = new ForkJoinPool(2);
ret = pool.submit(() -> {
    return LongStream.range(1, 50 * 1024 * 1024).boxed().collect(Collectors.toList())
            .stream()
            .parallel()
            .map(x -> x * 2)
            .filter(x -> x < 1500)
            .reduce((x,y) -> x+y)
            .get();
}).get();
接下來打算繼續深入這兩個很有意思的問題:

    深入介紹一下Stream的惰性求值過程,最好能跟Scala 的Stream 實現進行比較:Java8 Stream 惰性求值實現分析 – 驢和羊

    深入介紹ForkJoin 的底層實現,包括它是如何進行執行緒排程和cache line sharing 優化的

參考文獻:



相關推薦

Java8 Stream 平行計算實現原理

轉自:http://lvheyang.com/?p=87 這兩天組內的小夥伴在學習Java8,推廣在新專案內使用新特性。正好看到了Stream 帶來的遍歷的多執行緒併發:     Arrays.asList(1, 2, 3, 4, 5, 6, 7, 9, 8, 0, 1)

PCB 加投率計算實現基本原理--K最近鄰算法(KNN)

最近鄰 plist 控制 str 驗收 階段 分享圖片 數據量 出現 PCB行業中,客戶訂購5000pcs,在投料時不會直接投5000pcs,因為實際在生產過程不可避免的造成PCB報廢, 所以在生產前需計劃多投一定比例的板板, 例:訂單 量是5000pcs,加投3%,那就

CUDA實現矩陣相加的平行計算

(一)目的 熟悉基本的CUDA程式架構以及如何呼叫相應的API進行CUDA程式設計   (二)內容 完成矩陣相加的並行程式的實現(不用share memory實現) 要求: 實現2個矩陣(32*32)的相加,M矩陣的初始值全為2,N矩陣的初始值全為5。同時用C

利用CountDownLatch實現平行計算

import java.util.concurrent.CountDownLatch; /** * @Author pipi * @Date 2018/10/15 13:56 **/ public class ParallelComputing { private int[] nums;

利用java8 stream api 實現List集合分頁獲取工具

package com.test.paging; import java.util.Arrays; import java.util.List; /** * @author kevin.chen * Date 2017/11/9 * Time 18:05

Java8之ScheduledThreadPoolExecutor實現原理

ScheduledThreadPoolExecutor是一個可實現定時任務的執行緒池,ScheduledThreadPoolExecutor內的任務既可以在設定的時間到達時執行一次,也可以相隔固定時間週期執行。 ScheduledThreadPoolExecutor繼承自T

java8 Stream 一行實現列表去重

通常情況下,實現列表去重的方式有: 建立一個新的result列表,迴圈原列表,如果元素不在result列表中,則放入 建立一個HashSet,將列表作為構造引數傳入 下面介紹一種簡單、高效的去重方式,藉助於java8新特性引入的Stream類。程式碼如下: 實體類Per

hashmap實現原理(雜湊值計算,put方法,擴容) jdk1.8帶來的優化 hashmap併發安全 ConcurrentHashMap

HashMap的原始碼,實現原理,JDK8中對HashMap做了怎樣的優化。 ArrayList和LinkedList的優缺點——陣列的特點是:定址容易,插入和刪除困難;而連結串列的特點是:定址困難,插入和刪除容易。 hashmap底層

23. matlab平行計算原理以及parpool函式

 宣告: 因電腦問題, 只做記錄,以後嘗試。 出處: Matlab並行運算 - wenyusuran的專欄 - CSDN部落格 https://blog.csdn.net/wenyusuran/article/details/28901727  

雲端計算學習筆記002---雲端計算的理解及介紹,google雲端計算平臺實現原理

什麼是雲端計算: l  說的明白一點: •    雲端計算其實就更大限度的發揮網路的資源。 •    那為什麼叫雲,為什麼不到網際網路計算?Cloud l 大多數計算的網路拓撲圖都用一塊“雲”來表示網際網路。於是就形成了雲端計算的說法 l  狹義:是指IT基礎設施的交付和

並行程式設計報告(MPI平行計算π,實現mandelbrot集)

一.熟悉MPI並行程式設計環境 1.硬體 電腦:HP暗夜精靈 記憶體:4G 處理器:ntel® Core™ i5-6300HQ CPU @ 2.30GHz × 4 顯示卡:NVIDIA 960M 2.軟體 系統:Ubuntu 16.04

平行計算作業補充(Python實現

前情提要 之前由於平行計算的作業被視為類同或抄襲網路,正好最近學習Python,於是剛好在這裡作一個補充,新方法採用Python實現並行,一來學習用,二來為了完成作業。 Python平行計算 Python在平行計算方面使用的是GIL(Global Interp

Java 8-stream實現原理分析(一)

背景介紹 Java 8中引入了lambda和stream,極大的簡化了程式碼編寫工作,但是簡單的程式碼為何能實現如何豐富的功能,其背後又是如何實現的呢? Spliterator和Iterator Iterator Iterator是Ja

巧用zookeeper實現分散式平行計算

雲端計算的技術話題中少不了“分散式”,“平行計算” 這些個關鍵詞,我們知道硬體擴充套件的條件(​Scale-up)始終是有限制的,將計算分散到網路中更多機器的CPU上提供更高的計算效能(Scale-out),並在這基礎上能將計算同時進行,那麼總體計算瓶頸會減小,計算的效能會

python--利用concurrent.futures 來實現真正的平行計算

    由於python的全域性解釋鎖(GIL)使得我們無法使用執行緒進行真正的平行計算,因此,我們把總計算量分配到多個獨立的任務中,並在多個CPU和欣賞同時執行任務是很難實現的。    為解決該類問題,提高程式碼執行速率,我們嘗試可以引入concurrent.futures

win7之後的系統的CPU佔用計算原理實現

        經過比對,發現procexp和工作管理員在計算程序cpu佔用上面存在很大的差異,經過研究發現,procexp顯示的是正確的,而工作管理員顯示的是錯誤的,工作管理員是用以前老的方式計算的。         新的cpu計算原理應該是: 程序CPU佔用率 =

平行計算--Java--求π並行實現

 publicvoid run()    {     int i;     step=1.0/(double)num_steps_wy;     for(i=start_wy;i<num_steps_wy;i+=2)     {      x=(i+0.5)*st

Linux虛擬機器中配置多節點MPI實現平行計算完整版

虛擬機器CentOS中配置MPI多節點平行計算完整版,這是一個新手教程,從一個空白的CentOS到能正常執行的基礎環境,儘量詳細寫了每個步驟 我的環境如下: 宿主機:Windows 10 Pro 虛擬機器:VMware 12 + CentOS 6.6 一、 虛擬機器網路配置

Scala中Stream的應用場景及其實現原理

假設一個場景 需要在50個隨機數中找到前兩個可以被3整除的數字。 聽起來很簡單,我們可以這樣來寫: 1 2 3 4 5 6 7 8 9 def randomList = (1 to 50).map(_ =>