JDK 8 中的 Streams API 詳解
Stream API介紹
Java 8引入了全新的Stream API,此Stream與java I/O包裡的InputStream和OutputStream是完全不同的概念,它不同於StAX對XML解析的Stream,也不同於Amazon Kinesis對大資料實時處理的Stream。Stream API更像具有Iterable的集合類,但行為和集合類又有所不同,它是對集合物件功能的增強,專注於對集合物件進行各種非常便捷、高效的聚合操作或大批量資料操作。
Stream API引入的目的在於彌補Java函數語言程式設計的缺陷。對於很多支援函數語言程式設計的語言,map()、reduce()基本上都內建到語言的標準庫中了,不過,Java 8的Stream API總體來講仍然是非常完善和強大,足以用很少的程式碼完成許多複雜的功能。
Java 8的Stream API充分利用Lambda表示式的特性,極大的提高程式設計效率和程式可讀性。同時它提供序列和並行兩種模式進行匯聚操作,併發模式能夠充分利用多核處理器的優勢,使用fork/join並行方式來拆分任務和加速處理過程。通常編寫並行程式碼很難而且容易出錯,但使用Stream API無需編寫一行多執行緒的程式碼,就可以很方便地寫出高效能的併發程式。
在Stream API中,一個流基本上代表一個元素序列,Stream API提供了豐富的操作函式來計算這些元素。以前我們在開發業務應用時,通常很多操作的實現是這樣做的:我們使用迴圈對集合做遍歷,針對集合中的元素實現各種操作,定義各種變數來實現目的,這樣我們就得到了一大堆醜陋的順序程式碼。
如果我們使用Stream API做同樣的事情,使用Lambda表示式和其它函式進行抽象,可以使得程式碼更易於理解、更為乾淨。有了這些抽象,還可以做一些優化,比如實現並行等。
什麼是聚合操作
在傳統的 J2EE 應用中,Java 程式碼經常不得不依賴於關係型資料庫的聚合操作來完成諸如:
- 客戶每月平均消費金額
- 最昂貴的在售商品
- 本週完成的有效訂單(排除了無效的)
- 取十個資料樣本作為首頁推薦
這類的操作。
但在當今這個資料大爆炸的時代,在資料來源多樣化、資料海量化的今天,很多時候不得不脫離 RDBMS,或者以底層返回的資料為基礎進行更上層的資料統計。而 Java 的集合 API 中,僅僅有極少量的輔助型方法,更多的時候是程式設計師需要用 Iterator 來遍歷集合,完成相關的聚合應用邏輯。這是一種遠不夠高效、笨拙的方法。在 Java 7 中,如果要發現 type 為 grocery 的所有交易,然後返回以交易值降序排序好的交易 ID 集合,我們需要這樣寫:
清單 1. Java 7 的排序、取值實現
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
List< Transaction >
groceryTransactions = new Arraylist<>();
for(Transaction
t: transactions){
if(t.getType()
== Transaction.GROCERY){
groceryTransactions.add(t);
}
}
Collections.sort(groceryTransactions,
new Comparator(){
public
int compare(Transaction t1, Transaction t2){
return
t2.getValue().compareTo(t1.getValue());
}
});
List< Integer >
transactionIds = new ArrayList<>();
for(Transaction
t: groceryTransactions){
transactionsIds.add(t.getId());
}
|
而在 Java 8 使用 Stream,程式碼更加簡潔易讀;而且使用併發模式,程式執行速度更快。
清單 2. Java 8 的排序、取值實現
1 2 3 4 5 |
List< Integer >
transactionsIds = transactions.parallelStream().
filter(t
-> t.getType() == Transaction.GROCERY).
sorted(comparing(Transaction::getValue).reversed()).
map(Transaction::getId).
collect(toList());
|
Stream 總覽
什麼是流
Stream 不是集合元素,它不是資料結構並不儲存資料,它是有關演算法和計算的,它更像一個高階版本的 Iterator。原始版本的 Iterator,使用者只能顯式地一個一個遍歷元素並對其執行某些操作;高階版本的 Stream,使用者只要給出需要對其包含的元素執行什麼操作,比如 “過濾掉長度大於 10 的字串”、“獲取每個字串的首字母”等,Stream 會隱式地在內部進行遍歷,做出相應的資料轉換。
Stream 就如同一個迭代器(Iterator),單向,不可往復,資料只能遍歷一次,遍歷過一次後即用盡了,就好比流水從面前流過,一去不復返。
而和迭代器又不同的是,Stream 可以並行化操作,迭代器只能命令式地、序列化操作。顧名思義,當使用序列方式去遍歷時,每個 item 讀完後再讀下一個 item。而使用並行去遍歷時,資料會被分成多個段,其中每一個都在不同的執行緒中處理,然後將結果一起輸出。Stream 的並行操作依賴於 Java7 中引入的 Fork/Join 框架(JSR166y)來拆分任務和加速處理過程。Java 的並行 API 演變歷程基本如下:
- 1.0-1.4 中的 java.lang.Thread
- 5.0 中的 java.util.concurrent
- 6.0 中的 Phasers 等
- 7.0 中的 Fork/Join 框架
- 8.0 中的 Lambda
Stream 的另外一大特點是,資料來源本身可以是無限的。
流的構成
當我們使用一個流的時候,通常包括三個基本步驟:
獲取一個數據源(source)→ 資料轉換→執行操作獲取想要的結果,每次轉換原有 Stream 物件不改變,返回一個新的 Stream 物件(可以有多次轉換),這就允許對其操作可以像鏈條一樣排列,變成一個管道,如下圖所示。
圖 1. 流管道 (Stream Pipeline) 的構成
有多種方式生成 Stream Source:
- 從 Collection 和陣列
-
- Collection.stream()
- Collection.parallelStream()
- Arrays.stream(T array) or Stream.of()
- java.io.BufferedReader.lines()
- 靜態工廠
- java.util.stream.IntStream.range()
- java.nio.file.Files.walk()
- 自己構建
-
- java.util.Spliterator
- Random.ints()
- BitSet.stream()
- Pattern.splitAsStream(java.lang.CharSequence)
- JarFile.stream()
流的操作型別分為兩種:
- Intermediate:一個流可以後面跟隨零個或多個 intermediate 操作。其目的主要是開啟流,做出某種程度的資料對映/過濾,然後返回一個新的流,交給下一個操作使用。這類操作都是惰性化的(lazy),就是說,僅僅呼叫到這類方法,並沒有真正開始流的遍歷。
- Terminal:一個流只能有一個 terminal 操作,當這個操作執行後,流就被使用“光”了,無法再被操作。所以這必定是流的最後一個操作。Terminal 操作的執行,才會真正開始流的遍歷,並且會生成一個結果,或者一個 side effect。
在對於一個 Stream 進行多次轉換操作 (Intermediate