1. 程式人生 > >深入理解Java 8 Lambda(類庫篇——Streams API,Collectors和並行)

深入理解Java 8 Lambda(類庫篇——Streams API,Collectors和並行)

作者:Lucida

  • 微博:@peng_gong

本文謝絕轉載,如需轉載需徵得作者本人同意,謝謝。

本文是深入理解Java 8 Lambda系列的第二篇,主要介紹Java 8針對新增語言特性而新增的類庫(例如Streams API、Collectors和並行)。

關於

Java SE 8增加了新的語言特性(例如lambda表示式和預設方法),為此Java SE 8的類庫也進行了很多改進,本文簡要介紹了這些改進。在閱讀本文前,你應該先閱讀 深入淺出Java 8 Lambda(語言篇),以便對Java SE 8的新增特性有一個全面瞭解。

背景(Background)

自從lambda表示式成為Java語言的一部分之後,Java集合(Collections)API就面臨著大幅變化。而

JSR 355(規定了Java lambda表示式的標準)的正式啟用更是使得Java集合API變的過時不堪。儘管我們可以從頭實現一個新的集合框架(比如“Collection II”),但取代現有的集合框架是一項非常艱難的工作,因為集合介面滲透了Java生態系統的每個角落,將它們一一換成新類庫需要相當長的時間。因此,我們決定採取演化的策略(而非推倒重來)以改進集合API:

  • 為現有的介面(例如 Collection ListStream)增加擴充套件方法;
  • 在類庫中增加新的 (stream,即 java.util.stream.Stream)抽象以便進行聚集(aggregation)操作;
  • 改造現有的型別使之可以提供流檢視(stream view);
  • 改造現有的型別使之可以容易的使用新的程式設計模式,這樣使用者就不必拋棄使用以久的類庫,例如 ArrayListHashMap(當然這並不是說集合API會常駐永存,畢竟集合API在設計之初並沒有考慮到lambda表示式。我們可能會在未來的JDK中新增一個更現代的集合類庫)。

除了上面的改進,還有一項重要工作就是提供更加易用的並行(Parallelism)庫。儘管Java平臺已經對並行和併發提供了強有力的支援,然而開發者在實際工作(將序列程式碼並行化)中仍然會碰到很多問題。因此,我們希望Java類庫能夠既便於編寫序列程式碼也便於編寫並行程式碼,因此我們把程式設計的重點從具體執行細節(how computation should be formed)轉移到抽象執行步驟(what computation should be perfomed)。除此之外,我們還需要在將並行變的 容易

(easier)和將並行變的 不可見(invisible)之間做出抉擇,我們選擇了一個折中的路線:提供 顯式(explicit)但 非侵入(unobstrusive)的並行。(如果把並行變的透明,那麼很可能會引入不確定性(nondeterminism)以及各種資料競爭(data race)問題)

內部迭代和外部迭代(Internal vs external iteration)

集合類庫主要依賴於 外部迭代(external iteration)。 Collection實現 Iterable介面,從而使得使用者可以依次遍歷集合的元素。比如我們需要把一個集合中的形狀都設定成紅色,那麼可以這麼寫:

for (Shape shape: shapes) {
  shape.setColor(RED);
}

這個例子演示了外部迭代:for-each迴圈呼叫 shapes iterator()方法進行依次遍歷。外部迴圈的程式碼非常直接,但它有如下問題:

  • Java的for迴圈是序列的,而且必須按照集合中元素的順序進行依次處理;
  • 集合框架無法對控制流進行優化,例如通過排序、並行、短路(short-circuiting)求值以及惰性求值改善效能。

儘管有時for-each迴圈的這些特性(序列,依次)是我們所期待的,但它對改善效能造成了阻礙。

我們可以使用 內部迭代(internal iteration)替代外部迭代,使用者把對迭代的控制權交給類庫,並向類庫傳遞迭代時所需執行的程式碼。

下面是前例的內部迭代程式碼:

shapes.forEach(s -> s.setColor(RED));

儘管看起來只是一個小小的語法改動,但是它們的實際差別非常巨大。使用者把對操作的控制權交還給類庫,從而允許類庫進行各種各樣的優化(例如亂序執行、惰性求值和並行等等)。總的來說,內部迭代使得外部迭代中不可能實現的優化成為可能。

外部迭代同時承擔了 做什麼(把形狀設為紅色)和 怎麼做(得到 Iterator例項然後依次遍歷)兩項職責,而內部迭代只負責 做什麼,而把 怎麼做留給類庫。通過這樣的職責轉變:使用者的程式碼會變得更加清晰,而類庫則可以進行各種優化,從而使所有使用者都從中受益。

流(Stream)

是Java SE 8類庫中新增的關鍵抽象,它被定義於 java.util.stream(這個包裡有若干流型別: Stream<T>代表物件引用流,此外還有一系列特化(specialization)流,比如 IntStream代表整形數字流)。每個流代表一個值序列,流提供一系列常用的聚集操作,使得我們可以便捷的在它上面進行各種運算。集合類庫也提供了便捷的方式使我們可以以操作流的方式使用集合、陣列以及其它資料結構。

流的操作可以被組合成 流水線(Pipeline)。以前面的例子為例,如果我們只想把藍色改成紅色:

shapes.stream()
      .filter(s -> s.getColor() == BLUE)
      .forEach(s -> s.setColor(RED));

Collection上呼叫 stream()會生成該集合元素的流檢視(stream view),接下來 filter()操作會產生只包含藍色形狀的流,最後,這些藍色形狀會被 forEach操作設為紅色。

如果我們想把藍色的形狀提取到新的 List裡,則可以:

List<Shape> blue = shapes.stream()
                         .filter(s -> s.getColor() == BLUE)
                         .collect(Collectors.toList());

collect()操作會把其接收的元素聚集(aggregate)到一起(這裡是 List), collect()方法的引數則被用來指定如何進行聚集操作。在這裡我們使用 toList()以把元素輸出到 List中。(如需更多 collect()方法的細節,請閱讀Collectors一節)

如果每個形狀都被儲存在 Box裡,然後我們想知道哪個盒子至少包含一個藍色形狀,我們可以這麼寫:

Set<Box> hasBlueShape = shapes.stream()
                              .filter(s -> s.getColor() == BLUE)
                              .map(s -> s.getContainingBox())
                              .collect(Collectors.toSet());

map()操作通過對映函式(這裡的對映函式接收一個形狀,然後返回包含它的盒子)對輸入流裡面的元素進行依次轉換,然後產生新流。

如果我們需要得到藍色物體的總重量,我們可以這樣表達:

int sum = shapes.stream()
                .filter(s -> s.getColor() == BLUE)
                .mapToInt(s -> s.getWeight())
                .sum();

這些例子演示了流框架的設計,以及如何使用流框架解決實際問題。

流和集合(Streams vs Collections)

集合和流盡管在表面上看起來很相似,但它們的設計目標是不同的:集合主要用來對其元素進行有效(effective)的管理和訪問(access),而流並不支援對其元素進行直接操作或直接訪問,而只支援通過宣告式操作在其上進行運算然後得到結果。除此之外,流和集合還有一些其它不同:

  • 無儲存:流並不儲存值;流的元素源自資料來源(可能是某個資料結構、生成函式或I/O通道等等),通過一系列計算步驟得到;
  • 天然的函式式風格(Functional in nature):對流的操作會產生一個結果,但流的資料來源不會被修改;
  • 惰性求值:多數流操作(包括過濾、對映、排序以及去重)都可以以惰性方式實現。這使得我們可以用一遍遍歷完成整個流水線操作,並可以用短路操作提供更高效的實現;
  • 無需上界(Bounds optional):不少問題都可以被表達為無限流(infinite stream):使用者不停地讀取流直到滿意的結果出現為止(比如說,列舉 完美數這個操作可以被表達為在所有整數上進行過濾)。集合是有限的,但流不是(操作無限流時我們必需使用短路操作,以確保操作可以在有限時間內完成);

從API的角度來看,流和集合完全互相獨立,不過我們可以既把集合作為流的資料來源( Collection擁有 stream()parallelStream()方法),也可以通過流產生一個集合(使用前例的 collect()方法)。 Collection以外的型別也可以作為 stream的資料來源,比如JDK中的 BufferedReaderRandomBitSet已經被改造可以用做流的資料來源, Arrays.stream()則產生給定陣列的流檢視。事實上,任何可以用 Iterator描述的物件都可以成為流的資料來源,如果有額外的資訊(比如大小、是否有序等特性),庫還可以進行進一步的優化。

惰性(Laziness)

過濾和對映這樣的操作既可以被 急性求值(以 filter為例,急性求值需要在方法返回前完成對所有元素的過濾),也可以被 惰性求值(用 Stream代表過濾結果,當且僅當需要時才進行過濾操作)在實際中進行惰性運算可以帶來很多好處。比如說,如果我們進行惰性過濾,我們就可以把過濾和流水線裡的其它操作混合在一起,從而不需要對資料進行多遍遍歷。相類似的,如果我們在一個大型集合裡搜尋第一個滿足某個條件的元素,我們可以在找到後直接停止,而不是繼續處理整個集合。(這一點對無限資料來源是很重要,惰性求值對於有限資料來源起到的是優化作用,但對無限資料來源起到的是決定作用,沒有惰性求值,對無限資料來源的操作將無法終止)

對於過濾和對映這樣的操作,我們很自然的會把它當成是惰性求值操作,不過它們是否真的是惰性取決於它們的具體實現。另外,像 sum()這樣生成值的操作和 forEach()這樣產生副作用的操作都是“天然急性求值”,因為它們必須要產生具體的結果。

以下面的流水線為例:

int sum = shapes.stream()
                .filter(s -> s.getColor() == BLUE)
                .mapToInt(s -> s.getWeight())
                .sum();

這裡的過濾操作和對映操作是惰性的,這意味著在呼叫 sum()之前,我們不會從資料來源提取任何元素。在 sum操作開始之後,我們把過濾、對映以及求和混合在對資料來源的一遍遍歷之中。這樣可以大大減少維持中間結果所帶來的開銷。

大多數迴圈都可以用資料來源(陣列、集合、生成函式以及I/O管道)上的聚合操作來表示:進行一系列惰性操作(過濾和對映等操作),然後用一個急性求值操作( forEachtoArray collect等操作)得到最終結果——例如過濾—對映—累積,過濾—對映—排序—遍歷等組合操作。惰性操作一般被用來計算中間結果,這在Streams API設計中得到了很好的體現——與其讓 filtermap返回一個集合,我們選擇讓它們返回一個新的流。在Streams API中,返回流物件的操作都是惰性操作,而返回非流物件的操作(或者無返回值的操作,例如 forEach())都是急性操作。絕大多數情況下,潛在的惰性操作會被用於聚合,這正是我們想要的——流水線中的每一輪操作都會接收輸入流中的元素,進行轉換,然後把轉換結果傳給下一輪操作。

在使用這種 資料來源—惰性操作—惰性操作—急性操作流水線時,流水線中的惰性幾乎是不可見的,因為計算過程被夾在資料來源和最終結果(或副作用操作)之間。這使得API的可用性和效能得到了改善。

對於 anyMatch(Predicate) findFirst()這些急性求值操作,我們可以使用短路(short-circuiting)來終止不必要的運算。以下面的流水線為例:

Optional<Shape> firstBlue = shapes.stream()
                                  .filter(s -> s.getColor() == BLUE)
                                  .findFirst();

由於過濾這一步是惰性的, findFirst在從其上游得到一個元素之後就會終止,這意味著我們只會處理這個元素及其之前的元素,而不是所有元素。 findFirst()方法返回 Optional物件,因為集合中有可能不存在滿足條件的元素。 Optional是一種用於描述可缺失值的型別。

在這種設計下,使用者並不需要顯式進行惰性求值,甚至他們都不需要了解惰性求值。類庫自己會選擇最優化的計算方式。

並行(Parallelism)

流水線既可以序列執行也可以並行執行,並行或序列是流的屬性。除非你顯式要求使用並行流,否則JDK總會返回序列流。(序列流可以通過 parallel()方法被轉化為並行流)

儘管並行是顯式的,但它並不需要成為侵入式的。利用 parallelStream(),我們可以輕鬆的把之前重量求和的程式碼並行化:

int sum = shapes.parallelStream()
                .filter(s -> s.getColor = BLUE)
                .mapToInt(s -> s.getWeight())
                .sum();

並行化之後和之前的程式碼區別並不大,然而我們可以很容易看出它是並行的(此外我們並不需要自己去實現並行程式碼)。

因為流的資料來源可能是一個可變集合,如果在遍歷流時資料來源被修改,就會產生干擾(interference)。所以在進行流操作時,流的資料來源應保持不變(held constant)。這個條件並不難維持,如果集合只屬於當前執行緒,只要lambda表示式不修改流的資料來源就可以。(這個條件和遍歷集合時所需的條件相似,如果集合在遍歷時被修改,絕大多數的集合實現都會丟擲 ConcurrentModificationException)我們把這個條件稱為無干擾性(non-interference)。

我們應避免在傳遞給流方法的lambda產生副作用。一般來說,列印除錯語句這種輸出變數的操作是安全的,然而在lambda表示式裡訪問可變變數就有可能造成資料競爭或是其它意想不到的問題,因為lambda在執行時可能會同時執行在多個執行緒上,因而它們所看到的元素有可能和正常的順序不一致。無干擾性有兩層含義:

  1. 不要干擾資料來源;
  2. 不要干擾其它lambda表示式,當一個lambda在修改某個可變狀態而另一個lambda在讀取該狀態時就會產生這種干擾。

只要滿足無干擾性,我們就可以安全的進行並行操作並得到可預測的結果,即便對執行緒不安全的集合(例如 ArrayList)也是一樣。

例項(Examples)

下面的程式碼源自JDK中的 Class型別( getEnclosingMethod方法),這段程式碼會遍歷所有宣告的方法,然後根據方法名稱、返回型別以及引數的數量和型別進行匹配:

for (Method method : enclosingInfo.getEnclosingClass().getDeclaredMethods()) {
  if (method.getName().equals(enclosingInfo.getName())) {
    Class< ? >[] candidateParamClasses = method.getParameterTypes();
    if (candidateParamClasses.length == parameterClasses.length) {
      boolean matches = true;
      for (int i = 0; i < candidateParamClasses.length; i += 1) {
        if (!candidateParamClasses[i].equals(parameterClasses[i])) {
          matches = false;
          break;
        }
      }
      if (matches) { // finally, check return type
        if (method.getReturnType().equals(returnType)) {
          return method;
        }
      }
    }
  }
}
throw new InternalError("Enclosing method not found");

通過使用流,我們不但可以消除上面程式碼裡面所有的臨時變數,還可以把控制邏輯交給類庫處理。通過反射得到方法列表之後,我們利用 Arrays.stream將它轉化為 Stream,然後利用一系列過濾器去除型別不符、引數不符以及返回值不符的方法,然後通過呼叫 findFirst得到 Optional<Method>,最後利用 orElseThrow返回目標值或者丟擲異常。

return Arrays.stream(enclosingInfo.getEnclosingClass().getDeclaredMethods())
             .filter(m -> Objects.equal(m.getName(), enclosingInfo.getName()))
             .filter(m -> Arrays.equal(m.getParameterTypes(), parameterClasses))
             .filter(m -> Objects.equals(m.getReturnType(), returnType))
             .findFirst()
             .orElseThrow(() -> new InternalError("Enclosing method not found"));

相對於未使用流的程式碼,這段程式碼更加緊湊,可讀性更好,也不容易出錯。

流操作特別適合對集合進行查詢操作。假設有一個“音樂庫”應用,這個應用裡每個庫都有一個專輯列表,每張專輯都有其名稱和音軌列表,每首音軌表都有名稱、藝術家和評分。

假設我們需要得到一個按名字排序的專輯列表,專輯列表裡面的每張專輯都至少包含一首四星及四星以上的音軌,為了構建這個專輯列表,我們可以這麼寫:

List<Album> favs = new ArrayList<>();
for (Album album : albums) {
  boolean hasFavorite = false;
  for (Track track : album.tracks) {
    if (track.rating >= 4) {
      hasFavorite = true;
      break;
    }
  }
  if (hasFavorite)
    favs.add(album);
}
Collections.sort(favs, new Comparator<Album>() {
  public int compare(Album a1, Album a2) {
    return a1.name.compareTo(a2.name);
  }
});

我們可以用流操作來完成上面程式碼中的三個主要步驟——識別一張專輯是否包含一首評分大於等於四星的音軌(使用 anyMatch);按名字排序;以及把滿足條件的專輯放在一個 List中:

List<Album> sortedFavs =
    albums.stream()
          .filter(a -> a.tracks.anyMatch(t -> (t.rating >= 4)))
          .sorted(Comparator.comparing(a -> a.name))
          .collect(Collectors.toList());

Compartor.comparing方法接收一個函式(該函式返回一個實現了 Comparable介面的排序鍵值),然後返回一個利用該鍵值進行排序的 Comparator(請參考下面的比較器工廠一節)。

收集器(Collectors)

在之前的例子中,我們利用 collect()方法把流中的元素聚合到 ListSet中。 collect()接收一個型別為 Collector的引數,這個引數決定了如何把流中的元素聚合到其它資料結構中。 Collectors類包含了大量常用收集器的工廠方法, toList() toSet()就是其中最常見的兩個,除了它們還有很多收集器,用來對資料進行對複雜的轉換。

Collector的型別由其輸入型別和輸出型別決定。以 toList()收集器為例,它的輸入型別為 T,輸出型別為 List<T>toMap是另外一個較為複雜的 Collector,它有若干個版本。最簡單的版本接收一對函式作為輸入,其中一個函式用來生成鍵(key),另一個函式用來生成值(value)。 toMap的輸入型別是 T,輸出型別是 Map<K, V>,其中 K V分別是前面兩個函式所生成的鍵型別和值型別。(複雜版本的 toMap收集器則允許你指定目標 Map的型別或解決鍵衝突)。舉例來說,下面的程式碼以目錄數字為鍵值建立一個倒排索引:

Map<Integer, Album> albumsByCatalogNumber =
    albums.stream()
          .collect(Collectors.toMap(a -> a.getCatalogNumber(), a -> a));

groupingBy是一個與 toMap相類似的收集器,比如說我們想要把我們最喜歡的音樂按歌手列出來,這時我們就需要這樣的 Collector:它以 Track作為輸入,以 Map<Artist, List<Track>>作為輸出。 groupingBy收集器就可以勝任這個工作,它接收分類函式(classification function),然後根據這個函式生成 Map,該 Map的鍵是分類函式的返回結果,值是該分類下的元素列表。

Map<Artist, List<Track>>favsByArtist =
    tracks.stream()
          .filter(t -> t.rating >= 4)
          .collect(Collectors.groupingBy(t -> t.artist));

收集器可以通過組合和複用來生成更加複雜的收集器,簡單版本的 groupingBy收集器把元素按照分類函式為每個元素計算出分類鍵值,然後把輸入元素輸出到對應的分類列表中。除了這個版本,還有一個更加通用(general)的版本允許你使用 其它收集器來整理輸入元素:它接收一個分類函式以及一個下流(downstream)收集器(單引數版本的 groupingBy使用 toList()作為其預設下流收集器)。舉例來說,如果我們想把每首歌曲的演唱者收集到 Set而非 List中,我們可以使用 toSet收集器:

Map<Artist, Set<Track>>favsByArtist =
    tracks.stream()
          .filter(t -> t.rating >= 4)
          .collect(Collectors.groupingBy(t -> t.artist,
                                         Collectors.toSet()));

如果我們需要按照歌手和評分來管理歌曲,我們可以生成多級 Map

Map<Artist, Map<Integer, List<Track>>> byArtistAndRating =
    tracks.stream()
          .collect(groupingBy(t -> t.artist,
                              groupingBy(t -> t.rating)));

在最後的例子裡,我們建立了一個歌曲標題裡面的詞頻分佈。我們首先使用 Stream.flatMap()得到一個歌曲流,然後用 Pattern.splitAsStream把每首歌曲的標題打散成詞流;接下來我們用 groupingByString.toUpperCase對這些詞進行不區分大小寫的分組,最後使用 counting()收集器計算每個詞出現的次數(從而無需建立中間集合)。

Pattern pattern = Pattern.compile("\\s+");
Map<String, Integer> wordFreq =
    tracks.stream()
          .flatMap(t -> pattern.splitAsStream(t.name)) // Stream<String>
          .collect(groupingBy(s -> s.toUpperCase(),
                              counting()));

flatMap接收一個返回流(這裡是歌曲標題裡的詞)的函式。它利用這個函式將輸入流中的每個元素轉換為對應的流,然後把這些流拼接到一個流中。所以上面程式碼中的 flatMap會返回所有歌曲標題裡面的詞,接下來我們不區分大小寫的把這些詞分組,並把詞頻作為值(value)儲存。

Collectors類包含大量的方法,這些方法被用來創造各式各樣的收集器,以便進行查詢、列表(tabulation)和分組等工作,當然你也可以實現一個自定義 Collector

並行的實質(Parallelism under the hood)

Java SE 7引入了 Fork/Join模型,以便高效實現平行計算。不過,通過Fork/Join編寫的並行程式碼和同功能的序列程式碼的差別非常巨大,這使改寫序列程式碼變的非常困難。通過提供序列流和並行流,使用者可以在序列操作和並行操作之間進行便捷的切換(無需重寫程式碼),從而使得編寫正確的並行程式碼變的更加容易。

為了實現平行計算,我們一般要把計算過程遞迴分解(recursive decompose)為若干步:

  • 把問題分解為子問題;
  • 序列解決子問題從而得到部分結果(partial result);
  • 合併部分結果合為最終結果。

這也是Fork/Join的實現原理。

為了能夠並行化任意流上的所有操作,我們把流抽象為 Spliterator Spliterator是對傳統迭代器概念的一個泛化。分割迭代器(spliterator)既支援順序依次訪問資料,也支援分解資料:就像 Iterator允許你跳過一個元素然後保留剩下的元素, Spliterator允許你把輸入元素的一部分(一般來說是一半)轉移(carve off)到另一個新的 Spliterator中,而剩下的資料則會被儲存在原來的 Spliterator裡。(這兩個分割迭代器還可以被進一步分解)除此之外,分割迭代器還可以提供源的元資料(比如元素的數量,如果已知的話)和其它一系列布林值特徵(比如說“元素是否被排序”這樣的特徵),Streams框架可以利用這些資料來進行優化。

上面的分解方法也同樣適用於其它資料結構,資料結構的作者只需要提供分解邏輯,然後就可以直接享用並行流操作帶來的遍歷。

大多數使用者無需去實現 Spliterator介面,因為集合上的 stream()方法往往就足夠了。但如果你需要實現一個集合或一個流,那麼你可能需要手動實現 Spliterator介面。 Spliterator介面的API如下所示:

public interfaceSpliterator<T>{
  // Element access
  booleantryAdvance(Consumer<?superT>action);
  voidforEachRemaining(Consumer<?superT>action);
  // Decomposition
  Spliterator<T>trySplit();
  //Optional metadata
  longestimateSize();
  intcharacteristics();
  Comparator< ? super T> getComparator();
}

集合庫中的基礎介面 Collection Iterable都實現了正確但相對低效的 spliterator()實現,但派生介面(例如 Set)和具體實現類(例如 ArrayList)均提供了高效的分割迭代器實現。分割迭代器的實現質量會影響到流操作的執行效率;如果在 split()方法中進行良好(平衡)的劃分,CPU的利用率會得到改善;此外,提供正確的特性(characteristics)和大小(size)這些元資料有利於進一步優化。

出現順序(Encounter order)

多數資料結構(例如列表,陣列和I/O通道)都擁有 自然出現順序(natural encounter order),這意味著它們的元素出現順序是可預測的。其它的資料結構(例如 HashSet)則沒有一個明確定義的出現順序(這也是 HashSetIterator實現中不保證元素出現順序的原因)。

是否具有明確定義的出現順序是 Spliterator檢查的特性之一(這個特性也被流使用)。除了少數例外(比如 Stream.forEach()Stream.findAny()),並行操作一般都會受到出現順序的限制。這意味著下面的流水線:

List<String> names = people.parallelStream()
                           .map(Person::getName)
                           .collect(toList()