Fork/Join框架與Java8 Stream API 之並行流的速度比較
Fork/Join 框架有特定的ExecutorService和執行緒池構成。ExecutorService可以執行任務,並且這個任務會被分解成較小的任務,它們從執行緒池中被fork(被不同的執行緒執行)出來,在join(即它的所有的子任務都完成了)之前會一直等待。
Fork/Join 使用了任務竊取來最小化執行緒的徵用和開銷。執行緒池中的每條工作執行緒都有自己的雙端工作佇列並且會將新任務放到這個佇列中去。它從佇列的頭部讀取任務。如果佇列是空的,工作執行緒就嘗試從另外一個佇列的末尾獲取一個任務。竊取操作不會很頻繁,因為工作執行緒會採用後進先出的順序將任務放入它們的佇列中,同時工作項的規模會隨著問題分割成子問題而變小。你一開始把任務交給一箇中心的工作執行緒,之後它會繼續將這個任務分解成更小的任務。最終所有的工作執行緒都只會設計很少量的同步操作。
Stream介紹(引)
Stream 作為 Java 8 的一大亮點,它與 java.io 包裡的 InputStream 和 OutputStream 是完全不同的概念。它也不同於 StAX 對 XML 解析的 Stream,也不是 Amazon Kinesis 對大資料實時處理的 Stream。Java 8 中的 Stream 是對集合(Collection)物件功能的增強,它專注於對集合物件進行各種非常便利、高效的聚合操作(aggregate operation),或者大批量資料操作 (bulk data operation)。Stream API 藉助於同樣新出現的 Lambda 表示式,極大的提高程式設計效率和程式可讀性。同時它提供序列和並行兩種模式進行匯聚操作,併發模式能夠充分利用多核處理器的優勢,使用 fork/join 並行方式來拆分任務和加速處理過程。通常編寫並行程式碼很難而且容易出錯, 但使用 Stream API 無需編寫一行多執行緒的程式碼,就可以很方便地寫出高效能的併發程式。所以說,Java 8 中首次出現的 java.util.stream 是一個函式式語言+多核時代綜合影響的產物。
Stream 不是集合元素,它不是資料結構並不儲存資料,它是有關演算法和計算的,它更像一個高階版本的 Iterator。原始版本的 Iterator,使用者只能顯式地一個一個遍歷元素並對其執行某些操作;高階版本的 Stream,使用者只要給出需要對其包含的元素執行什麼操作,比如 “過濾掉長度大於 10 的字串”、“獲取每個字串的首字母”等,Stream 會隱式地在內部進行遍歷,做出相應的資料轉換。
Stream 就如同一個迭代器(Iterator),單向,不可往復,資料只能遍歷一次,遍歷過一次後即用盡了,就好比流水從面前流過,一去不復返。
而和迭代器又不同的是,Stream 可以並行化操作,迭代器只能命令式地、序列化操作。顧名思義,當使用序列方式去遍歷時,每個 item 讀完後再讀下一個 item。而使用並行去遍歷時,資料會被分成多個段,其中每一個都在不同的執行緒中處理,然後將結果一起輸出。Stream 的並行操作依賴於 Java7 中引入的 Fork/Join 框架(JSR166y)來拆分任務和加速處理過程。
所以說,實際上Stream並行流實際上就是一個幫你fork/join 後的API,為了驗證效率,我編寫了一個對1000_000個數進行排序的程式
import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; public class ParallelMergeSort { public static void main(String[] args) { final int SIZE = 10000000; int[] list1 = new int[SIZE]; int[] list2 = new int[SIZE]; Integer[] list3 = new Integer[SIZE]; for (int i = 0; i < list1.length; i++) { list1[i] = list2[i] = (int)(Math.random() * 10000000); list3[i] = list1[i]; } long startTime = System.currentTimeMillis(); parallelMergeSort(list1); long endTime = System.currentTimeMillis(); System.out.println("Parallel time with " + Runtime.getRuntime().availableProcessors() + " processors is " + (endTime - startTime) + " milliseconds"); startTime = System.currentTimeMillis(); MergeSort.mergeSort(list2); endTime = System.currentTimeMillis(); System.out.println("Sequent time is " + (endTime - startTime) + " milliseconds"); List<Integer> tmp = new ArrayList<Integer>(); Collections.addAll(tmp, list3); startTime = System.currentTimeMillis(); IntStream tmp1 = tmp.stream().parallel().mapToInt(Integer::intValue).sorted(); endTime = System.currentTimeMillis(); System.out.println("ParallelStream time is " + (endTime - startTime) + " milliseconds"); tmp1.limit(100).forEachOrdered(System.out::println); /* for(int i = 0; i < 100; i++) { System.out.println(tmp2.get(i)); }*/ } public static void parallelMergeSort(int[] list) { RecursiveAction mainTask = new SortTask(list); ForkJoinPool pool = new ForkJoinPool(); pool.invoke(mainTask); } public static class SortTask extends RecursiveAction{ /** * */ private static final long serialVersionUID = 1L; private final int THRESHOLD = 500; private int[] list; SortTask(int[] list){ this.list = list; } @Override protected void compute() { if (list.length < THRESHOLD) java.util.Arrays.sort(list); else { //Obtain the first half int[] firstHalf = new int[list.length / 2]; System.arraycopy(list, 0, firstHalf, 0, list.length / 2); //Obtain the second half int secondHalfLength = list.length - list.length / 2; int[] secondHalf = new int[secondHalfLength]; System.arraycopy(list, list.length /2, secondHalf, 0, secondHalfLength); //Recursively sort the two halves invokeAll(new SortTask(firstHalf), new SortTask(secondHalf)); //Merge firstHalf with second MergeSort.merge(firstHalf, secondHalf, list); } } } public static class MergeSort { /** The method for sorting the numbers */ public static void mergeSort(int[] list) { if (list.length > 1) { // Merge sort the first half int[] firstHalf = new int[list.length / 2]; System.arraycopy(list, 0, firstHalf, 0, list.length / 2); mergeSort(firstHalf); // Merge sort the second half int secondHalfLength = list.length - list.length / 2; int[] secondHalf = new int[secondHalfLength]; System.arraycopy(list, list.length / 2, secondHalf, 0, secondHalfLength); mergeSort(secondHalf); // Merge firstHalf with secondHalf into list merge(firstHalf, secondHalf, list); } } /** Merge two sorted lists */ public static void merge(int[] list1, int[] list2, int[] temp) { int current1 = 0; // Current index in list1 int current2 = 0; // Current index in list2 int current3 = 0; // Current index in temp while (current1 < list1.length && current2 < list2.length) { if (list1[current1] < list2[current2]) temp[current3++] = list1[current1++]; else temp[current3++] = list2[current2++]; } while (current1 < list1.length) temp[current3++] = list1[current1++]; while (current2 < list2.length) temp[current3++] = list2[current2++]; } } }
程式碼可以看到,利用三種方法,對隨機生成的 int 資料排序
第一種是自己編寫的fork/join利用二分法排序
第二種是單執行緒下的二分法排序
第三種是並行流的排序
為了驗證並行流是否排序正確,輸出流前100個數
結果如圖:
但是這是為沒有收集器的情況,並行流很快的完成並且得到IntStream,加上收集器後:
可以看出,排序很快完成,在最後的型別轉換上花費了大量的時間,
而根據Stream 的介紹,實驗fork/join方法完成的時間應該不會與並行流差距太大,實際上,實驗中編寫的程式碼在fork分解階段和join階段花費了大量時間,遠不如直接使用API快速
但是如果正確使用fork/join框架的話也不會很慢
但是相比單執行緒已經遠遠提升了效率
&n