1. 程式人生 > >JDK 8 中的 Streams API 詳解

JDK 8 中的 Streams API 詳解

Stream API介紹

Java 8引入了全新的Stream API,此Stream與java I/O包裡的InputStream和OutputStream是完全不同的概念,它不同於StAX對XML解析的Stream,也不同於Amazon Kinesis對大資料實時處理的Stream。Stream API更像具有Iterable的集合類,但行為和集合類又有所不同,它是對集合物件功能的增強,專注於對集合物件進行各種非常便捷、高效的聚合操作或大批量資料操作。

Stream API引入的目的在於彌補Java函數語言程式設計的缺陷。對於很多支援函數語言程式設計的語言,map()、reduce()基本上都內建到語言的標準庫中了,不過,Java 8的Stream API總體來講仍然是非常完善和強大,足以用很少的程式碼完成許多複雜的功能。

Java 8的Stream API充分利用Lambda表示式的特性,極大的提高程式設計效率和程式可讀性。同時它提供序列和並行兩種模式進行匯聚操作,併發模式能夠充分利用多核處理器的優勢,使用fork/join並行方式來拆分任務和加速處理過程。通常編寫並行程式碼很難而且容易出錯,但使用Stream API無需編寫一行多執行緒的程式碼,就可以很方便地寫出高效能的併發程式。

在Stream API中,一個流基本上代表一個元素序列,Stream API提供了豐富的操作函式來計算這些元素。以前我們在開發業務應用時,通常很多操作的實現是這樣做的:我們使用迴圈對集合做遍歷,針對集合中的元素實現各種操作,定義各種變數來實現目的,這樣我們就得到了一大堆醜陋的順序程式碼。

如果我們使用Stream API做同樣的事情,使用Lambda表示式和其它函式進行抽象,可以使得程式碼更易於理解、更為乾淨。有了這些抽象,還可以做一些優化,比如實現並行等。

什麼是聚合操作

在傳統的 J2EE 應用中,Java 程式碼經常不得不依賴於關係型資料庫的聚合操作來完成諸如:

  • 客戶每月平均消費金額
  • 最昂貴的在售商品
  • 本週完成的有效訂單(排除了無效的)
  • 取十個資料樣本作為首頁推薦

這類的操作。

但在當今這個資料大爆炸的時代,在資料來源多樣化、資料海量化的今天,很多時候不得不脫離 RDBMS,或者以底層返回的資料為基礎進行更上層的資料統計。而 Java 的集合 API 中,僅僅有極少量的輔助型方法,更多的時候是程式設計師需要用 Iterator 來遍歷集合,完成相關的聚合應用邏輯。這是一種遠不夠高效、笨拙的方法。在 Java 7 中,如果要發現 type 為 grocery 的所有交易,然後返回以交易值降序排序好的交易 ID 集合,我們需要這樣寫:

清單 1. Java 7 的排序、取值實
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 List<Transaction> groceryTransactions = new Arraylist<>(); for(Transaction t: transactions){ if(t.getType() == Transaction.GROCERY){ groceryTransactions.add(t); } } Collections.sort(groceryTransactions, new Comparator(){ public int compare(Transaction t1, Transaction t2){ return t2.getValue().compareTo(t1.getValue()); } }); List<Integer> transactionIds = new ArrayList<>(); for(Transaction t: groceryTransactions){ transactionsIds.add(t.getId()); }

而在 Java 8 使用 Stream,程式碼更加簡潔易讀;而且使用併發模式,程式執行速度更快。

清單 2. Java 8 的排序、取值實現
1 2 3 4 5 List<Integer> transactionsIds = transactions.parallelStream(). filter(t -> t.getType() == Transaction.GROCERY). sorted(comparing(Transaction::getValue).reversed()). map(Transaction::getId). collect(toList());

Stream 總覽

什麼是流

Stream 不是集合元素,它不是資料結構並不儲存資料,它是有關演算法和計算的,它更像一個高階版本的 Iterator。原始版本的 Iterator,使用者只能顯式地一個一個遍歷元素並對其執行某些操作;高階版本的 Stream,使用者只要給出需要對其包含的元素執行什麼操作,比如 “過濾掉長度大於 10 的字串”、“獲取每個字串的首字母”等,Stream 會隱式地在內部進行遍歷,做出相應的資料轉換。

Stream 就如同一個迭代器(Iterator),單向,不可往復,資料只能遍歷一次,遍歷過一次後即用盡了,就好比流水從面前流過,一去不復返。

而和迭代器又不同的是,Stream 可以並行化操作,迭代器只能命令式地、序列化操作。顧名思義,當使用序列方式去遍歷時,每個 item 讀完後再讀下一個 item。而使用並行去遍歷時,資料會被分成多個段,其中每一個都在不同的執行緒中處理,然後將結果一起輸出。Stream 的並行操作依賴於 Java7 中引入的 Fork/Join 框架(JSR166y)來拆分任務和加速處理過程。Java 的並行 API 演變歷程基本如下:

  1. 1.0-1.4 中的 java.lang.Thread
  2. 5.0 中的 java.util.concurrent
  3. 6.0 中的 Phasers 等
  4. 7.0 中的 Fork/Join 框架
  5. 8.0 中的 Lambda

Stream 的另外一大特點是,資料來源本身可以是無限的。

流的構成

當我們使用一個流的時候,通常包括三個基本步驟:

獲取一個數據源(source)→ 資料轉換→執行操作獲取想要的結果,每次轉換原有 Stream 物件不改變,返回一個新的 Stream 物件(可以有多次轉換),這就允許對其操作可以像鏈條一樣排列,變成一個管道,如下圖所示。

圖 1. 流管道 (Stream Pipeline) 的構成
圖 1.  流管道 (Stream Pipeline) 的構成

有多種方式生成 Stream Source:

  • 從 Collection 和陣列
    • Collection.stream()
    • Collection.parallelStream()
    • Arrays.stream(T array) or Stream.of()
    從 BufferedReader
    • java.io.BufferedReader.lines()
  • 靜態工廠
  • java.util.stream.IntStream.range()
  • java.nio.file.Files.walk()
  • 自己構建
    • java.util.Spliterator
    其它
    • Random.ints()
    • BitSet.stream()
    • Pattern.splitAsStream(java.lang.CharSequence)
    • JarFile.stream()

流的操作型別分為兩種:

  • Intermediate:一個流可以後面跟隨零個或多個 intermediate 操作。其目的主要是開啟流,做出某種程度的資料對映/過濾,然後返回一個新的流,交給下一個操作使用。這類操作都是惰性化的(lazy),就是說,僅僅呼叫到這類方法,並沒有真正開始流的遍歷。
  • Terminal:一個流只能有一個 terminal 操作,當這個操作執行後,流就被使用“光”了,無法再被操作。所以這必定是流的最後一個操作。Terminal 操作的執行,才會真正開始流的遍歷,並且會生成一個結果,或者一個 side effect。

在對於一個 Stream 進行多次轉換操作 (Intermediate