1. 程式人生 > >Java函數式編程之Stream流編程

Java函數式編程之Stream流編程

mage nds adf iter ren amp prope c4c int()

Stream流編程-概念

概念:

這個Stream並非是I/O流裏的Stream,也不是集合元素,更不是數據結構,它是JDK1.8帶來的新特性,是一種用函數式編程在集合類上進行復雜操作的工具。Stream就像工廠裏的流水線一樣,有輸入和輸出。Stream不可以重復遍歷集合裏面的數據,數據在Stream裏面就像水在渠道裏面一樣,流過了就一去不復返。

簡而言之,Stream是以內部叠代的方式處理集合數據的操作,內部叠代可以將更多的控制權交給集合類。Stream 和 Iterator 的功能類似,只不過 Iterator 是以外部叠代的形式處理集合的數據。

在JDK1.8以前,對集合的操作需要寫出處理的過程,如在集合中篩選出滿足條件的數據,需要一 一遍歷集合中的每個元素,再把每個元素逐一判斷是否滿足條件,最後將滿足條件的元素保存返回。而Stream 對集合篩選的操作提供了一種更為便捷的操作,只需將實現函數接口的篩選條件作為參數傳遞進來,Stream會自行操作並將合適的元素同樣以 stream 的方式返回,最後進行接收即可。

內部叠代與外部叠代:

使用for循環等利用Iterator進行叠代操作的,我們都叫做外部叠代,而使用stream流進行叠代操作的叫做內部叠代。內部叠代最明顯的好處就是當數量很大的情況下,我們不需要對數據進行拆分,並且可以通過調用指定函數實現並行遍歷。

外部叠代示例代碼:

int[] nums = {1, 2, 3};
// 循環屬於外部叠代
int sum = 0;
for (int num : nums) {
    sum += num;
}
System.out.println("計算結果為:" + sum);  // 計算結果為:6

使用stream內部叠代示例代碼:

int[] nums = {1, 2, 3};
// 使用stream進行內部叠代
int sum = IntStream.of(nums).sum();
System.out.println("計算結果為:" + sum);  // 計算結果為:6

使用stream流操作時的一些概念:

  • 中間操作:中間操作的結果是刻畫、描述了一個Stream,並返回了這個Stream,但此操作並沒有產生一個新集合,這種操作也叫做惰性求值方法
  • 終止操作:從Stream中得到最終的結果
  • 惰性求值:終止沒有調用的情況下,中間操作不會執行

如何區分中間操作和終止操作呢?可以根據操作的返回值類型判斷,如果返回值是Stream,則該操作為中間操作。如果返回值不是Stream或者為空,則該操作是終止操作。

如下圖所示,前兩個操作是中間操作,只有最後一個操作是終止操作:
技術分享圖片

可以形象地理解Stream的操作是對一組粗糙的工藝品原型(即對應的 Stream 數據源)進行加工成顏色統一的工藝品(即最終得到的結果),第一步篩選出合適的原型(即對應Stream的 filter 的方法),第二步將這些篩選出來的原型工藝品上色(對應Stream的map方法),第三步取下這些上好色的工藝品(即對應Stream的 collect(toList())方法)。在取下工藝品之前進行的操作都是中間操作,可以有多個或者0個中間操作,但每個Stream數據源只能有一次終止操作,否則程序會報錯。

接下來,我們通過一個簡單的示例來演示以上所提到的幾個概念,代碼如下:

package org.zero01.example.demo;

import java.util.stream.IntStream;

public class StreamDemo {

    public static void main(String[] args) {
        int[] nums = {1, 2, 3};
        // IntStream創建數字流,of則是輸入一個數組,這裏的map就是中間操作(返回stream流的操作),sum則是終止操作
        int sum = IntStream.of(nums).map(i -> i * 2).sum();
        System.out.println("計算結果為:" + sum);

        System.out.println("惰性求值就是終止操作沒有調用的情況下,中間操作不會執行");
        IntStream.of(nums).map(StreamDemo::doubleNum);
    }

    public static int doubleNum(int i) {
        System.out.println("doubleNum 方法執行了");
        return i * 2;
    }
}

運行以上代碼,控制臺輸出結果如下,可以看到由於惰性求值的原因,doubleNum方法沒有被調用:

計算結果為:12
惰性求值就是終止操作沒有調用的情況下,中間操作不會執行

流的創建

對一個流完整的操作過程:流的創建 -> 中間操作 -> 終止操作。流的創建是第一步,而流的常見創建方式如下表:
技術分享圖片

代碼示例如下:

public static void main(String[] args) {
    List<String> list = new ArrayList<>();

    // 從集合創建流
    list.stream();
    // 從集合創建並行流
    list.parallelStream();

    // 從數組創建流
    Arrays.stream(new int[]{1, 2, 3, 4, 5});

    // 創建數字流
    IntStream.of(1, 2, 3, 4, 5);
    // 創建1-10的數字流
    IntStream.rangeClosed(1, 10);

    // 使用random創建一個無限流,需要調用limit來限制大小,否則會報錯
    new Random().ints().limit(10);

    // 自己創建流
    Random random = new Random();
    Stream.generate(() -> random.nextInt()).limit(20);
}

流的中間操作

然後我們來看看流的中間操作,中間操作分類兩類,一類是無狀態操作,一類則是有狀態操作。如下表:
技術分享圖片

無狀態操作:

  • 當前操作與其他操作沒有依賴關系。

有狀態操作:

  • 當前操作與其他操作有依賴關系 (一般而言有狀態操作都是有2個參數傳入) 。例如排序操作,就需要等待其他操作計算完成後才能進行一個最終的排序。

共同點:

  • 不論是有狀態操作還是無狀態操作,最終都會返回一個Stream流,可以繼續使用鏈式的操作調用下去

代碼示例如下:

public static void main(String[] args) {
    String str = "my name is zero";

    // 把每個單詞的長度打印出來
    System.out.println("---------------map---------------");
    Stream.of(str.split(" ")).map(String::length).forEach(System.out::println);

    // 只打印長度大於2的單詞
    System.out.println("---------------filter---------------");
    Stream.of(str.split(" ")).filter(s -> s.length() > 2)
            .map(String::length).forEach(System.out::println);

    // flatMap 適合用於A元素下有B屬性,並且這個B屬性是個集合,最終得到所有的A元素裏面的所有B屬性集合
    // 這裏調用了 boxed() 方法的原因是intStream\LongStream等數字流並非是Stream的子類,所以需要裝箱
    System.out.println("---------------flatMap---------------");
    Stream.of(str.split(" ")).flatMap(s -> s.chars().boxed())
            .forEach(integer -> System.out.println((char) integer.intValue()));

    // peek 一般用於debug,類似於forEach,不同的是peek是中間操作而forEach是終止操作
    System.out.println("---------------peek---------------");
    Stream.of(str.split(" ")).peek(System.out::println).forEach(System.out::println);

    // limit 主要用於限制無限流,我們可以結合filter來產生特定區間的隨機數
    System.out.println("---------------limit---------------");
    new Random().ints().filter(i -> i > 100 && i < 1000).limit(10).forEach(System.out::println);
}

流的終止操作

接下來我們看看流的終止操作,同樣的,終止操作也分類兩類,分別是短路操作和非短路操作。如下表:
技術分享圖片

短路操作:

  • 短路操作是一種無需等待所有的結果計算完,就可以結束流的操作。例如從一組數據中,得到指定的某個數據就結束流,這種就是短路操作

非短路操作:

  • 非短路操作則反之,需等待所有的結果計算完成後才結束流。例如遍歷一個集合中的所有元素,或將一組數據轉換成集合、數組等操作就是非短路操作。

具體代碼及註釋,請參考如下示例:

public static void main(String[] args) {
    String str = "my name is zero";

    // 通常會在使用並行流的時候使用forEachOrdered,forEachOrdered可以在並行的情況下保證元素順序的一致
    str.chars().parallel().forEachOrdered(i -> System.out.print((char) i));
    System.out.println();
    // 而forEach則無法在並行的情況下保證元素順序的一致
    str.chars().parallel().forEach(i -> System.out.print((char) i));
    System.out.println();

    // collect屬於收集器,使用可以將放入流裏面的數據收集成集合類型
    List<String> list = Stream.of(str.split(" ")).collect(Collectors.toList());
    System.out.println(list);

    // reduce用於縮減、歸約數據集,我們可以使用reduce來將數組拼接成一個字符串
    Optional<String> letters = Stream.of(str.split(" ")).reduce((s1, s2) -> s1 + "|" + s2);
    System.out.println(letters.orElse(""));

    // 帶初始化值的reduce,這樣我們就無需通過Optional去判斷空值了
    String reduce = Stream.of(str.split(" ")).reduce("", (s1, s2) -> s1 + "|" + s2);
    System.out.println(reduce);

    // 使用reduce計算字符串長度的總和
    Integer lengthCount = Stream.of(str.split(" ")).map(String::length).reduce(0, (s1, s2) -> s1 + s2);
    System.out.println(lengthCount);

    // 使用max可以通過傳入比較器在一組數據中取出最大值
    Optional<String> max = Stream.of(str.split(" ")).max(Comparator.comparingInt(String::length));
    System.out.println(max.orElse(""));

    // 使用findFirst拿出第一個元素
    OptionalInt first = new Random().ints().findFirst();
    System.out.println(first.orElse(0));

    // 使用findAny隨機拿出一個元素
    OptionalInt any = new Random().ints().findAny();
    System.out.println(any.orElse(0));

    // 使用allMatch匹配所有的元素是否都為zero
    boolean is = Stream.of(str.split(" ")).allMatch("zero"::equals);
    System.out.println(is);
}

並行流

以上的例子中大多數創建的都是單線程流,其實我們可以創建多線程並行的Stream流,即並行流。使用並行流時,我們並不需關心多線程執行以及任務拆分等問題,因為Stream都已經幫我們管理好了,所以用起來也是很方便的。

我們先來看一個不使用並行流的示例,以下代碼會每隔3秒打印一行信息:

public static void main(String[] args) {
    // 不使用並行流
    IntStream.range(1, 100).peek(ParallelStreamDemo::debug).sum();
}

public static void debug(int i) {
    System.out.println("debug" + i);
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

運行結果如下:
技術分享圖片

而使用並行流後,會發現同時打印了多行信息。代碼如下:

public static void main(String[] args) {
    // 不使用並行流
    IntStream.range(1, 100).parallel().peek(ParallelStreamDemo::debug).sum();
}

public static void debug(int i) {
    System.out.println("debug" + i);
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

至於同時會打印多少行,默認取決於cpu的核心數量,例如我電腦cpu有4個核心,所以會同時打印四行,也就是說開啟了四個線程。運行結果如下:
技術分享圖片

通過以上的例子,我們得知可以調用parallel方法來創建並行流。那麽同樣的,也可以調用類似的方法創建串行流,這個方法就是sequential。如果現在有一個需求:當進行第一步操作時需使用並行流,而第二步操作則需使用串行流。那麽我們可以通過結合這兩個方法來實現這個需求嗎?我們來看一個簡單的例子就知道了,代碼如下:

public static void main(String[] args) {
    IntStream.range(1, 100)
            // 1.調用parallel產生並行流
            .parallel().peek(ParallelStreamDemo::debug)
            // 2.調用sequential產生串行流
            .sequential().peek(ParallelStreamDemo::debug2).sum();
}

public static void debug(int i) {
    System.out.println("debug" + i);
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

public static void debug2(int i) {
    System.err.println("debug2" + i);
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

運行結果如下:
技術分享圖片

從運行結果可以看到,運行過程始終是串行的,是一行行打印的。所以可以得出一個結論:多次調用 parallel/sequential方法,會以最後一次調用的為準,自然就無法實現以上所提到的需求了。


接下來我們看看並行流裏線程相關的東西,在上文中,我們提到了默認情況下,並行流開啟的線程數量取決於cpu的核心數量。那麽並行流使用的是哪個線程池?如何設置開啟的線程數量?

先來回答第一個問題,並行流裏使用的線程池是java.util.concurrent.ForkJoinPool,這一點可以直接在方法裏打印線程名稱得知,所以這裏就不演示了。對ForkJoinPool感興趣的話,可以查閱fork/join相關的概念。

關於第二個設置線程數量的問題,則是需要在創建並行流之前,設置ForkJoinPool裏parallelism屬性的值,例如我要開啟20個線程,具體代碼如下:

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");

還有一點需要註意的就是,所有的並行流默認都將會使用同一個ForkJoinPool線程池,若我們的並行任務比較多的話,可能會出現任務阻塞的情況。如果想要防止一些比較關鍵的任務出現阻塞的情況,則需要自行創建線程池去處理這些任務。如下示例:

public static void main(String[] args) {
    // 使用自己創建的線程池,不使用默認線程池,防止任務阻塞
    ForkJoinPool forkJoinPool = new ForkJoinPool(20);
    forkJoinPool.submit(() -> IntStream.range(1, 100).parallel().peek(ParallelStreamDemo::debug).sum());
    forkJoinPool.shutdown();  // 關閉線程池

    // 防止主線程提前結束
    synchronized (forkJoinPool) {
        try {
            forkJoinPool.wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

收集器

本小節我們來看一下收集器相關的東西,收集器就是將流處理完後的數據收集起來,例如將數據收集到一個集合裏,或者對數據求和、拼接成字符串等行為都屬於收集器。

以下使用一組例子來演示一下收集器的常見使用方式。首先定義一個Student類以及相關的枚舉類,代碼如下:

// ...省略getter/setter以及全參/無參構造函數...
public class Student {
    private String name;
    private int age;
    private Gender gender;
    private Grade grade;
}

/**
 * 性別
 */
enum Gender {
    MALE, FEMALE
}

/**
 * 班級
 */
enum Grade {
    ONE, TWO, THREE, FOUR;
}

使用收集器的示例代碼如下:

public static void main(String[] args) {
    // 測試數據
    List<Student> students = Arrays.asList(
            new Student("小明", 10, Gender.MALE, Grade.ONE),
            new Student("大明", 9, Gender.MALE, Grade.THREE),
            new Student("小白", 8, Gender.FEMALE, Grade.TWO),
            new Student("小黑", 13, Gender.FEMALE, Grade.FOUR),
            new Student("小紅", 7, Gender.FEMALE, Grade.THREE),
            new Student("小黃", 13, Gender.MALE, Grade.ONE),
            new Student("小青", 13, Gender.FEMALE, Grade.THREE),
            new Student("小紫", 9, Gender.FEMALE, Grade.TWO),
            new Student("小王", 6, Gender.MALE, Grade.ONE),
            new Student("小李", 6, Gender.MALE, Grade.ONE),
            new Student("小馬", 14, Gender.FEMALE, Grade.FOUR),
            new Student("小劉", 13, Gender.MALE, Grade.FOUR));

    // 得到所有學生的年齡列表
    List<Integer> ages = students.stream().map(Student::getAge).collect(Collectors.toList());
    // 可以通過toCollection指定集合實現類型
    // List<Integer> ages = students.stream().map(Student::getAge).collect(Collectors.toCollection(LinkedList::new));
    System.out.println("所有學生的年齡列表: " + ages);

    // 統計匯總信息
    IntSummaryStatistics ageSummaryStatistics = students.stream().collect(Collectors.summarizingInt(Student::getAge));
    System.out.println("年齡匯總信息: " + ageSummaryStatistics);

    // 分塊-按照規則把數據分成兩塊,這裏按性別將學生數據分為兩塊
    Map<Boolean, List<Student>> genders = students.stream().collect(Collectors.partitioningBy(s -> s.getGender() == Gender.MALE));
    // 這裏使用了Apache集合工具類進行打印
    MapUtils.verbosePrint(System.out, "男女學生列表: ", genders);

    // 分組-按照規則把數據分為多組數據,這裏按班級將學生數據進行分組
    Map<Grade, List<Student>> grades = students.stream().collect(Collectors.groupingBy(Student::getGrade));
    MapUtils.verbosePrint(System.out, "學生班級列表: ", grades);

    // 統計每個分組裏的數據-統計所有班級裏學生的數量
    Map<Grade, Long> gradesCount = students.stream().collect(Collectors.groupingBy(Student::getGrade, Collectors.counting()));
    MapUtils.verbosePrint(System.out, "所有班級學生人數列表: ", gradesCount);
}

運行結果如下:

所有學生的年齡列表: [10, 9, 8, 13, 7, 13, 13, 9, 6, 6, 14, 13]
年齡匯總信息: IntSummaryStatistics{count=12, sum=121, min=6, average=10.083333, max=14}
男女學生列表:  = 
{
    false = [Student(name=小白, age=8, gender=FEMALE, grade=TWO), Student(name=小黑, age=13, gender=FEMALE, grade=FOUR), Student(name=小紅, age=7, gender=FEMALE, grade=THREE), Student(name=小青, age=13, gender=FEMALE, grade=THREE), Student(name=小紫, age=9, gender=FEMALE, grade=TWO), Student(name=小馬, age=14, gender=FEMALE, grade=FOUR)]
    true = [Student(name=小明, age=10, gender=MALE, grade=ONE), Student(name=大明, age=9, gender=MALE, grade=THREE), Student(name=小黃, age=13, gender=MALE, grade=ONE), Student(name=小王, age=6, gender=MALE, grade=ONE), Student(name=小李, age=6, gender=MALE, grade=ONE), Student(name=小劉, age=13, gender=MALE, grade=FOUR)]
}
學生班級列表:  = 
{
    TWO = [Student(name=小白, age=8, gender=FEMALE, grade=TWO), Student(name=小紫, age=9, gender=FEMALE, grade=TWO)]
    FOUR = [Student(name=小黑, age=13, gender=FEMALE, grade=FOUR), Student(name=小馬, age=14, gender=FEMALE, grade=FOUR), Student(name=小劉, age=13, gender=MALE, grade=FOUR)]
    ONE = [Student(name=小明, age=10, gender=MALE, grade=ONE), Student(name=小黃, age=13, gender=MALE, grade=ONE), Student(name=小王, age=6, gender=MALE, grade=ONE), Student(name=小李, age=6, gender=MALE, grade=ONE)]
    THREE = [Student(name=大明, age=9, gender=MALE, grade=THREE), Student(name=小紅, age=7, gender=FEMALE, grade=THREE), Student(name=小青, age=13, gender=FEMALE, grade=THREE)]
}
所有班級學生人數列表:  = 
{
    TWO = 2
    FOUR = 3
    ONE = 4
    THREE = 3
}

Stream運行機制

通過以上幾個小節的內容,我們已經掌握了流的基本操作。但是我們對流的運行機制還不太清楚,所以本小節我們將簡單認識一下Stream的運行機制。

同樣的,我們首先來編寫一段簡單的Stream操作代碼,如下:

public static void main(String[] args) {
    Random random = new Random();
    // 隨機產生數據
    Stream<Integer> stream = Stream.generate(random::nextInt)
            // 產生500個 ( 無限流需要短路操作. )
            .limit(500)
            // 第1個無狀態操作
            .peek(s -> print("peek: " + s))
            // 第2個無狀態操作
            .filter(s -> {
                print("filter: " + s);
                return s > 1000000;
            });

    // 終止操作
    stream.count();
}

public static void print(String s) {
    // 5毫秒打印一次日誌
    System.out.println(Thread.currentThread().getName() + " > " + s);
    try {
        TimeUnit.MILLISECONDS.sleep(5);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

運行以上代碼,控制臺輸出如下:
技術分享圖片

1.從運行結果中可以看到,peek和filter是交替執行的,也就是說所有操作都是鏈式調用,一個元素只叠代一次

2.既然是一個鏈式的調用,那麽這條鏈是怎麽樣的呢?是如何維護的呢?我們在終止操作上打上斷點,通過debug運行。如下圖,可以看到每一個中間操作返回一個新的流,而流裏面有一個屬性sourceStage,它都指向同一個地方,就是鏈表的頭Head:
技術分享圖片

3.而Head裏指向了nextStage,nextStage裏又指向了nextStage,一直指向到鏈尾的null值。就如:Head -&gt; nextStage -&gt; nextStage -&gt; ... -&gt; null
技術分享圖片

這就是Stream裏實現鏈式調用所需的一個鏈表結構,是一條單鏈


以上的例子只有無狀態操作,如果加入有狀態操作,會發生什麽變化呢?示例代碼如下:

public static void main(String[] args) {
    Random random = new Random();
    // 隨機產生數據
    Stream<Integer> stream = Stream.generate(random::nextInt)
            // 產生500個 ( 無限流需要短路操作. )
            .limit(500)
            // 第1個無狀態操作
            .peek(s -> print("peek: " + s))
            // 第2個無狀態操作
            .filter(s -> {
                print("filter: " + s);
                return s > 1000000;
            })
            // 有狀態操作
            .sorted((i1, i2) -> {
                print("排序: " + i1 + ", " + i2);
                return i1.compareTo(i2);
            })
            // 又一個無狀態操作
            .peek(s -> print("peek2: " + s)).parallel();

    // 終止操作
    stream.count();
}

運行以上代碼,控制臺輸出如下:

main > peek: -1564323985
main > filter: -1564323985
main > peek: -779802182
main > filter: -779802182
main > peek: -498652682
main > filter: -498652682
main > 排序: 78555310, 50589406
main > 排序: 74439402, 50589406
main > 排序: 56492454, 50589406
main > 排序: 39808935, 50589406
main > 排序: 39808935, 39002482
main > peek2: 25284397
main > peek2: 29672249
main > peek2: 29800626
main > peek2: 32299397

從輸出的日誌信息可以發現,用於排序的中間操作截斷了流,並沒有像無狀態操作那樣交替執行。所以我們就可以得知有狀態操作會把無狀態操作截斷,會單獨進行處理而不會交替執行。

然後我們再加上並行流,看看並行情況下又是怎樣的,輸出的日誌信息如下:

ForkJoinPool.commonPool-worker-3 > peek: -332079048
ForkJoinPool.commonPool-worker-3 > filter: -332079048
ForkJoinPool.commonPool-worker-1 > filter: 1974510987
ForkJoinPool.commonPool-worker-4 > peek: -1727742841
ForkJoinPool.commonPool-worker-4 > filter: -1727742841
main > 排序: 58979900, 74247464
main > 排序: 58979900, 57671811
main > 排序: 53543451, 57671811
main > 排序: 53543451, 42862261
main > 排序: 43624983, 42862261
ForkJoinPool.commonPool-worker-0 > peek2: 1152454167
ForkJoinPool.commonPool-worker-2 > peek2: 1468420859
ForkJoinPool.commonPool-worker-5 > peek2: 736525554
ForkJoinPool.commonPool-worker-6 > peek2: 1×××50615

從日誌打印可以看到,排序操作依舊是main線程執行的,而其他的操作則是線程池裏的線程執行的。所以我們通過這個例子可以得知即便在並行的環境下,有狀態的中間操作不一定能並行操作。

順帶說明一下 parallel/ sequetial 這2個操作也是中間操作 (也是返回stream) ,但是區別在於它們不會創建流,,它們只修改 Head 的並行標誌,因為這兩個方法修改的是同一個地方,所以才會以最後一次調用的為準:
技術分享圖片

Java函數式編程之Stream流編程