1. 程式人生 > >Java8函數式編程(二):類比Spark RDD算子的Stream流操作

Java8函數式編程(二):類比Spark RDD算子的Stream流操作

編程方式 min 也有 ffffff 種類 spa 封裝 方法 都是

1 Stream流

對集合進行叠代時,可調用其iterator方法,返回一個iterator對象,之後便可以通過該iterator對象遍歷集合中的元素,這被稱為外部叠代(for循環本身正是封裝了其的語法糖),其示意圖如下:

技術分享圖片

除此之外,還有內部叠代方法,這正是這裏要說明的集合的stream()方法返回的Stream對象的一系列操作,比如,要統計一個數字列表的偶數元素個數,當使用Stream對象的操作時,如下:

List<Integer> list = new ArrayList<Integer>(){{
    add(1);
    add(2);
    add(3);
}};

long count = list.stream().filter(num -> num % 2 == 0).count();
System.out.println(count);  // 1

其示意圖如下:

技術分享圖片

上面提供的例子,比如filter,其參數為一個lambda表達式,所以Stream其實是用函數式編程方式在集合類上進行復雜操作的工具。

2 Stream流操作與Spark RDD算子

其實有Spark經驗的人開始使用Stream流操作時,會有似曾相識的感覺,好像一切都那麽熟悉。

參考Spark RDD算子介紹的文章:《Spark RDD算子實戰》https://blog.51cto.com/xpleaf/2108481

下面從操作對象(名詞)和對象操作(動詞)兩個角度來簡單對比一下。

2.1 操作對象

Spark RDD算子的操作對象是RDD,中文意思是彈性分布式數據集,對用戶而言,它就是類似集合一樣的對象,裏面存的是數據,只是底層它的數據可能分布於各個節點的各個partition,但不管怎樣,其本質還是數據集。

Stream流操作的操作對象是集合,集合本質也是一種數據集,只是相比RDD,它是單機的。

2.2 對象操作

Spark RDD算子有兩種類型,分別是Transformation算子和Action算子,前者是延遲計算的,它僅僅記住了數據的邏輯操作,並沒有真正執行,後者是真正觸發Transformation算子的計算。

Stream流操作也有兩種類型,分別是惰性求值和及早求值(個人覺得這翻譯不好),前者也只是記錄了惰性求值的邏輯操作,後者才是真正觸發操作。

可以看到其兩者是非常相似的,一個是對分布式數據進行的各種操作,一個是單機數據進行的各種操作,把計算分為延遲計算和觸發計算兩種,好處是顯而易見的:當對數據集進行多次邏輯操作時,有可能叠代只需要一次就可能完成,這樣真正觸發計算時,一次叠代帶來的性能提升是顯著的,比如對於過濾和計算這兩個操作(前面計算偶數的操作),在一次叠代中就能夠完成。

當然,不僅類型相似,其本身提供的操作的名稱而言,都是相似的,有些東西真的是通用的。

3 常用Stream流操作

每個操作都用一個通俗易懂的例子來進行說明。

3.1 及早求值操作

3.1.1 collect(toList())

其作用是將Stream流中的元素收集起來,形成List、Set或Map等。

List<Integer> list = Stream.of(1, 2, 3).collect(Collectors.toList());

System.out.println(list);   // [1, 2, 3]

1.Stream.of()方法用於方便地生成Stream流;

2.Collectors還有toSet()、toMap()等方法,詳見其API。

3.1.2 forEach(Consumer)

對集合中的每個元素進行操作,其參數是Consumer<T>函數接口。

Consumer<Integer> printNum = System.out::print;
Stream.of(1, 2, 3).forEach(printNum);   // 123

System.out::print表示使用System.out類中的print方法,相當於lambda表達式:element -> System.out.print(element);

上面的例子也可以一步到位:

Stream.of(1, 2, 3).forEach(System.out::print);  // 123

3.1.3 max和min

其參數為Comparator對象,返回一個Optional對象,Optional說明其結果可能有,也可能沒有(比如對空值的Stream流操作時)。

// 計算數值流中的最大值
Optional<Integer> maxOptional = Stream.of(1, 2, 3).max(Comparator.comparing(num -> num));
System.out.println(maxOptional.get());  // 3

// 找出字符串流中長度最小的字符串
Optional<String> minOptional = Stream.of("a", "ab", "abc").min(Comparator.comparing(String::length));
System.out.println(minOptional.get());  // a

另外,其確實是及早求值操作,可以驗證一下:

Stream.of(1, 2, 3).max(Comparator.comparing(num -> {
    System.out.println(num);
    return num;
}));

輸出:

1
2
2
3

3.2 惰性求值操作

3.2.1 map

其參數為Function&lt;T,R&gt;,用於將Stream流中的值轉換為另外一種流。

// 將字母轉換為大寫
Stream.of("a", "b", "hello")
    .map(String::toUpperCase)
    .forEach(element -> System.out.print(element + " "));  // A B HELLO 

3.2.2 filter

其參數為Predicate&lt;T&gt;,過濾Stream流中的元素。

// 找出偶數
List<Integer> list = Stream.of(1, 2, 3).filter(num -> num % 2 == 0).collect(Collectors.toList());

System.out.println(list);   // [2]

3.2.3 flatMap

其參數為Function&lt;T,R&gt;,只是此時R限定為Stream,將Stream流中的值轉換為更多的流。

// 找出字符串中的單詞
List<String> list = Stream.of("hello you", "hello me")
    .flatMap(line -> Arrays.stream(line.split(" "))).collect(Collectors.toList());

System.out.println(list);   // [hello, you, hello, me]

是不是感覺跟Spark的wordcount例子有點像。

3.2.4 reduce

其參數為BinaryOperator&lt;T&gt;,返回一個Optional對象,Optional說明其結果可能有,也可能沒有(比如對空值的Stream流操作時,並且沒有指定初始值),用於歸約操作。

// 求和
Integer res = Stream.of(1, 2, 3).reduce((acc, element) -> acc + element).get();

// 指定初始值6後,Stream的reduce操作結果肯定有值的,因此其返回的不是Optional,而直接是6所屬的類型,即Integer
Integer res2 = Stream.of(1, 2, 3).reduce(6, (acc, element) -> acc + element);

System.out.println(String.format("res: %s, res2: %s", res, res2));  // res: 6, res2: 12

4 參考

Java 8 Lambdas,Richard Warburton著(O’Reilly,2014)》

Java8函數式編程(二):類比Spark RDD算子的Stream流操作