1. 程式人生 > >使用stream操作表達更高階的資料處理請求, Part 1

使用stream操作表達更高階的資料處理請求, Part 1

使用stream操作表達更高階的資料處理請求,Part 1

原文連結 作者:Raoul-Gabriel Urma 譯者:石頭獅子([email protected]) 校對:吳京潤

沒有了集合你會怎麼做?幾乎每一個Java應用都建立和處理集合。對於許多程式設計任務而言,這是基礎的技術:集合分組和處理資料。例如,你可能想要建立一個銀行交易集合來代表使用者的賬戶記錄。然後,你想要處理所有的集合找出使用者花費了多少金額。儘管集合如此重要,但是Java的實現遠非完美。

首先,典型的集合處理模式有點像SQL操作,例如”查詢”(查詢最大值的交易)或”分組”(編組所有與雜貨購買有關的交易)。大部分的資料庫可以允許我們宣告式地指定這些操作。例如,後面的SQL查詢可以讓我們找出最高值的交易ID:”SELECT id, MAX(value) from transactions”。

正如所見,我們並不需要去實現如何計算最大值(例如,使用迴圈,一個變數跟蹤最大的值)。我僅需要表達我們需要的。這個原則意味著,你並不需要擔憂如何明確地實現這些查詢–這完全不需要你處理。為什麼我們不能讓集合做相同的事情呢?想想你使用迴圈一次又一次的重新實現了這些操作幾次?

其次,我們怎樣才能有效率的處理大型的集合?理論上講,需要加快處理的速度,可能要使用多核架構。然而,寫出並行處理的程式碼並不容易,而且也容易出錯。

Java SE 8 解決了這個問題。 Java API的設計者使用新的稱為Stream的抽象更新了API,使得可以宣告式的處理資料。此外,streams可以使用多核心架構,而你並不需要寫任何一行關於多核心處理的程式碼。聽起來很美,確實是這樣嗎?這就是本系列文章要表述的內容。

在我們詳細表述使用streams可以做什麼之前,先讓我們看看一個例子。以便有一個使用Java SE 8 streams新的程式設計方式的概念。假設我們需要找出所有型別為grocery的交易,返回以交易金額為降序的交易ID列表。Java SE 7中,我們所做的如Listing 1Java SE 8中,我們所做的如Listing 2

List<Transaction> groceryTransactions = new Arraylist<>();
for(Transaction t: transactions){
  if(t.getType() == Transaction.GROCERY){
    groceryTransactions.add(t);
  }
}
Collections.sort(groceryTransactions, new Comparator(){
  public int compare(Transaction t1, Transaction t2){
    return t2.getValue().compareTo(t1.getValue());
  }
});
List<Integer> transactionIds = new ArrayList<>();
for(Transaction t: groceryTransactions){
  transactionsIds.add(t.getId());
}

Listing 1

List<Integer> transactionsIds =
      transactions.stream()
                  .filter(t -> t.getType() == Transaction.GROCERY)
                  .sorted(comparing(Transaction::getValue).reversed())
                  .map(Transaction::getId)
                  .collect(toList());

Listing 2

Figure 1 描述了Java SE 8的程式碼。首先,我們使用List上可用的stream()方法從transactions(資料來源)列表上取到stream。隨後,幾個操作(filter,sorted,map,collect)串聯起來形成pipeline(管道),pipeline可以看成是對資料查詢的一種形式。

Figure 1

可是,如何才能並行執行程式碼呢?對於Java SE 8來說,這是否容易做到:只要使用parallelStream()替換stream()方法,正如Listing 3所示Streams API內部會分解你的查詢,使用你電腦上的多個核心。

</pre>
List<Integer> transactionsIds = transactions.parallelStream()
    .filter(t -> t.getType() == Transaction.GROCERY)
    .sorted(comparing(Transaction::getValue).reversed())
    .map(Transaction::getId)
    .collect(toList());

Listing 3

不必擔憂這段程式碼是否無法理解。我們會在下一章中繼續探究程式碼是如何工作的。注意到lambda 表示式(例如, t-> t.getCategory() == Transaction.GROCERY),和方法引用(例如,Transaction::getId)的使用。這些概念目前你應該是熟悉的。

現在,已經看到stream作為有效表達的抽象,就像集合資料上的SQL操作。此外,這些操作可以簡潔的使用lambda 表示式引數化。

在學習Java SE 8 streams系列文章之後,你應該能夠使用Streams API寫出類似Listing 3上的程式碼,表達出強有力的查詢。

使用Streams基礎
我們先從一些理論開始。一個stream的定義是什麼?簡短的定義是”從一個支援聚集操作的源上獲取的一序列元素”。讓我們逐個解釋:

序列元素:stream為特定元素型別值集合提供了一個介面。但是,stream並不實際儲存元素;元素只在需要的時候被計算。
:Stream從資料提供源上消費資料,源可以是集合、陣列、I/O資源等。
聚集操作,Stream支援類SQL的操作,和函數語言程式設計語言的共通操作,例如 filter, map, reduce, find, match, sorted等等。

此外,stream操作有兩個基本的特徵,使得其和集合操作有極大的不同。

管道:許多stream 操作返回stream自身。這可以讓操作串聯成一個大的管道。這也使得某些優化技術,例如惰性(laziness)和短路(short-circuiting)得以實現,這些概念我們都會在後面闡釋。
內部迭代:與集合相比,集合的迭代是明確地(外部迭代),而stream操作執行的迭代你無法感知到。

讓我們重新看看之前的程式碼來闡述這個概念。Figure 2表述了Listing 2的更多細節。

Figure 2
首先,我們從transactions list上呼叫stream()獲取到stream。資料來源是transaction list,並且提供元素序列給stream。接下來,我們使用一系列stream上的聚合操作:filter (使用給定的predicate過濾元素), sorted (使用給定的comparator排序元素), and map (抽取資訊)。所有這些操作除了collect之外,都返回stream。所以,這些操作可以串聯形成一個管道,管道可以看成是對源查詢的檢視。

所有的操作只有在呼叫collect的時候才會執行。collect操作會開始處理管道,返回結果(一些不是stream;例子上是List)。不要太關心collect;我們會在之後的文章中詳細闡述。現在,你可以把collect看成一個需要指定如何聚集stream元素彙總成結果的操作。例子中,toList()則描述了需要從Stream轉換為List。

在我們闡述stream的方法之前,暫停並回顧一下stream 和collection之間的不同。

Streams Versus Collections

集合與stream在序列元素上所提供介面的新概念,都同時在java上存在。所以,不同的是什麼?簡而言之,集合是關於資料的,stream是關於計算的。想想儲存在DVD上的電影。這就是集合(可能是位元組,又可能是幀–這裡,我們並不關心),因為其包含所有的資料結構。現在我們想想相同的視訊,當視訊是網際網路上的流的情況。則這個時候就是stream(位元或幀)。視訊流播放器只需要下載使用者現在觀看位置之前的幾幀,所以你才可以從流的起始開始播放,在這之前,流裡面的資料已經是被計算過了(想象下足球直播流)。

粗略的講,集合和stream之間的不同則是在處理計算的事情時。集合是一個記憶體上的資料結構,持有所有的這個資料結構的值–集合上的每個元素在要新增進集合之前都需要被計算。相反,stream概念上是固定的資料結構,流內的每個元素只在需要的時候計算。

使用Collection介面則需要使用者來完成迭代(例如,使用稱為foreach的增強for迴圈);這個被叫做外部迭代。

相反,Streams庫使用內部迭代–為你執行迭代操作並且在某處維護執行結果;你僅僅只要提供一個函式說我要完成這個。Listing 4裡面的的程式碼(使用集合的外部迭代)和Listing 5(使用stream的內部迭代)則闡述了這點不同。

List<String> transactionIds = new ArrayList<>();
    for(Transaction t: transactions){
        transactionIds.add(t.getId());
    }

Listing 4

List<Integer> transactionIds =
    transactions.stream()
    .map(Transaction::getId)
    .collect(toList());

Listing 5

Listing 4上,我們明確地順序迭代transactions list,抽取出每個交易ID並新增給聚集器。相反,當使用stream,並沒有明確地迭代。Listing 5上的程式碼建立一個查詢,其中map操作引數化為抽取交易ID,collect操作轉換結果Stream到List。

到目前為止,你應該明確知道stream是什麼,並且你可以使用它。現在,讓我們看看stream提供的其他操作,這些操作可以讓你表達你自己的資料處理查詢。

Stream Operations: Exploiting Streams to Process Data

java.util .stream.Stream中的Stream介面定義了許多操作,主要可以分成兩類。正如Figure 1裡面的例子,可以看到如下的操作:

filter, sorted, 和map, 這些可以從管道上連線在一起的。
collect 關閉管道並放回結果。

Stream 上可以連線的操作稱為中間操作。因為其返回的型別是Stream。關閉stream管道的操作稱為結束操作。其從管道上產生結果,例如List,一個整數,甚至是void(任何非stream型別)。

你也許會疑惑這些物質的重要性。當然,中間操作在stream管道上執行結束之前是不會執行;中間操作是惰性的(Lazy),主要是因為中間操作通常是合併的,並且被結束操作處理進通道。

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
List<Integer> twoEvenSquares =
    numbers.stream()
            .filter(n -> {
            System.out.println("filtering " + n);
            return n % 2 == 0;
            })
        .map(n -> {
        System.out.println("mapping " + n);
        return n * n;
    })
    .limit(2)
    .collect(toList());

Listing 6

例如,看看Listing 6上的程式碼,計算給定number list上兩個偶數的平方:

filtering 1
filtering 2
mapping 2
filtering 3
filtering 4
mapping 4

因為limit(2)使用短路特性;我們需要只處理stream的部分,並非全部地返回結果。這和計算用and串聯操作的布林表示式有點類似:只要一個表示式返回false,我們可以推斷出整個表示式返回false,而不用全部計算。這裡,limit操作返回大小為2的stream。

當然,filter和map操作合併到相同的通道中。

總結下我們目前學習到的,巨集觀上處理stream包括這三件事:

一個數據源(例如集合),在資料來源上執行的查詢
串聯的中間操作,這些操作形成stream管道
一個結束操作, 執行stream管道,並且產生結果。

現在,先看看stream上可用的一些操作。查閱java.util .stream.Stream介面獲取全部的列表,同樣也是這篇文章後面引用的資源。

Filtering. 有幾個操作可以用來從stream中過濾元素:
filter(Predicate): 使用predicate (java.util.function.Predicate)作為引數,並返回包含所有匹配給定predict元素的stream。

distinct: 返回一個有唯一元素的stream(根據stream中元素的equals實現)。
limit(n): 返回一個不長於給定大小n的stream。
skip(n): 返回一個丟棄了前面n個元素的stream。

Finding and matching. 一個通常的資料處理模式是決定是否某些元素匹配給定的屬性。你可以使用anyMatch,allMatch和noneMatch操作來幫助你完成這些操作。所有這些操作使用Predicate作為引數,返回一個布林值作為結果(因此,這些是決定式的操作)。例如,你可以使用allMatch檢查transaction stream中所有交易額大於100的元素,如 Listing 7所示的。

boolean expensive = transactions.stream()
    .allMatch(t -> t.getValue() > 100);

Listing 7

Stream介面提供 findFirst 和findAny操作,用於從stream中取回任意的元素。主要可以用於連線其他的stream操作,例如filter。
findFirst 和findAny返回Optional物件,如Listing 8所示。

Optional<Transaction> = transactions.stream()
    .filter(t -> t.getType() == Transaction.GROCERY)
    .findAny();

Listing 8

Optional<T>類(java.util .Optional)是一個容器類,用於代表一個值存在或不存在。Listing 8中,findAny可能並不會返回任何grocery型別的交易。

Optional類有一些方法用於測試元素是否存在。例如,如果有交易存在,我們可以選擇使用ifPresent方法選擇對optional物件上應用操作,如Listing 9(我們只是列印交易)。

transactions.stream()
    .filter(t -> t.getType() == Transaction.GROCERY)
    .findAny()
    .ifPresent(System.out::println);

Listing 9

Mapping. Stream支援map方法,使用function(java.util.function.Function)作為引數用於對映stream中的元素到另外一種形式。function會應用到每一個元素,對映元素到新的元素。

例如,你可能想要從stream的每個元素中抽出資訊。Listing 10的例子中,我們從一個list上返回每個詞長度的list。Reducing. 目前,我們所見的結束操作返回boolean(allMatch等),void(forEach),或一個Optional物件(findAny等)。並且同樣已經使用collect組合所有stream中的元素為List。

List<String> words = Arrays.asList("Oracle", "Java", "Magazine");
List<Integer> wordLengths =
    words.stream()
    .map(String::length)
    .collect(toList());

Listing 10

當然,你同樣可以組合stream中的所有元素表述成更復雜的處理請求,例如,最高ID的交易是什麼?或計算所有交易額的總數。

這可以使用stream上的reduce操作,這個操作重複地為每個元素應用操作(例如,新增兩個數字),直到產生結果。函式式程式中一般稱這操作為摺疊操作(fold),你可以把這個操作看成是重複地摺疊紙張的一部分(你的stream),直到形成一個小正方形,這就是摺疊操作的結果。

先看下我們如何使用for迴圈計算list的和:

int sum = 0;
for (int x : numbers) {
    sum += x;
}

Numbers list上的每個元素重複地使用新增操作來產生一個結果。實際上,我們縮小numbers list到一個數值。程式碼中則有兩個引數:sum變數的初始值,例子上為0,和組合所有list元素的操作,例子上為+。

使用stream的reduce方法,我們可以累加所有的stream元素。如 Listing 11所示的。

reduce方法使用兩個引數:

int sum = numbers.stream().reduce(0, (a, b) -> a + b);

Listing 11

一個初始值,0

BinaryOperator<T>,用於組合兩個元素併產生一個新的值。

reduce方法本質上抽象了重複的應用模式。其他查詢例如”計算產品”或”計算最大值(見Listing 12)”則是成為reduce方法的特定例子。

    int product = numbers.stream().reduce(1, (a, b) -> a * b);
    int product = numbers.stream().reduce(1, Integer::max);

Listing 12

Numeric Streams

現在,已經看過了使用reduce方法用於計算整數stream和的例子。但是,這其中還是有一定的開銷:我們執行多次裝箱(boxing)操作,重複的在integer物件上求和。如果可以呼叫一個sum方法,可能會更好一點,正如Listing 13所示,是否更明確我們程式碼的目的?

int statement = transactions.stream()
    .map(Transaction::getValue)
    .sum(); // error since Stream has no sum method

Listing 13

Java SE 8 引入3個特定的primitive stream介面用於處理這個問題–IntStream,DoubleStream和LongStream–各自代表stream中的元素是int,double和long。

通常要轉換stream到特定版本的stream所執行的方法是mapToInt,mapToDouble和mapToLong。這些方法工作起來完全像是我們之前見到的map方法,不同的是這些方法返回特定的stream而不是Stream<T>。例如,我們可以改進Listing 13的程式碼,如Listing 14所展示的。你同樣可以通過裝箱(boxed)操作從primitive stream轉換為某個物件stream。

int statementSum =
    transactions.stream()
    .mapToInt(Transaction::getValue)
    .sum(); // works!

Listing 14

最後,另一個numeric streams有用的形式是數字範圍(numeric ranges)。例如,你可能想要產生所有1到100之間的數值。Java SE 8則引入了 IntStream, DoubleStream, 和LongStream上可用的2個靜態方法輔助產生這樣的範圍:range和rangeClosed。

這兩個方法都使用範圍的起始作為首個引數,範圍的結束作為第二個引數。range方法是開區間,而rangeClosed是閉區間的。 Listing 15則是一個使用rangeClose方法的例子,返回10到30之間數值的stream。

IntStream oddNumbers =
    IntStream.rangeClosed(10, 30)
    .filter(n -> n % 2 == 1);

Listing 15

Building Streams

有幾種方式用於構建stream。我們已經看到如何從集合上獲取到stream。同樣,我也使用了number stream。你同樣可以從值、陣列或檔案上建立stream。此外甚至可以從一個函式上獲取stream 來產生無限的stream。

從值或從陣列上建立stream十分簡單:只要為值呼叫Stream.of的靜態方法和為陣列呼叫Arrays.stream生成。如 Listing 16所示。

    Stream<Integer> numbersFromValues = Stream.of(1, 2, 3, 4);
    int[] numbers = {1, 2, 3, 4};
    IntStream numbersFromArray = Arrays.stream(numbers);

Listing 16

同樣也可以使用Files.lines靜態方法將檔案轉換為一個stream。例如,Listing 17計算檔案中的行數。

long numberOfLines =
     Files.lines(Paths.get(“yourFile.txt”), Charset.defaultCharset())
         .count();

Listing 17

Infinite streams. 最後,在我們結束關於stream的這篇文章之前,還有一個令人興奮的概念。到目前為止,應該理解stream內的元素是按需產生的。這裡有兩個靜態方法–Stream.iterate 和 Stream.generate可以從函式上建立stream。然而,由於元素是按需計算的,這兩個操作可以一直產生元素。這就是為什麼稱為 infinite stream:沒有固定大小的stream,與我們從固定集合建立的流相比。

Listing 18 是使用iterate的例子,建立一個所有10倍數的數字stream。Iterate方法使用一個初始值(例子上是,0)和一個用於連續地產生每個新值的lambda(型別為UnaryOperator<T>)。

Stream<Integer> numbers = Stream.iterate(0, n -> n + 10);

Listing 18
我們可以把這個無限的stream轉換成固定大小的stream,通過使用limit操作。例如,我們可以限制stream的大小為5,如Listing 19所示。

numbers.limit(5).forEach(System.out::println); // 0, 10, 20, 30, 40

Listing 19

Conclusion

Java SE 8 引入的stream API,可以讓我們表達更復雜的資料處理邏輯。本文中,你已經看到stream支援許多方法,例如filter,map,reduce和iterate,這些方法組合可以寫出簡潔的程式碼並表達資料處理查詢。這種新的程式碼編寫方式與Java SE8 之前你要處理的集合十分的不同。顯然,這有許多好處。首先,Stream API使用了許多技術,例如惰性和短路來優化資料處理查詢。其次,stream可以是並行自動地使用多核心架構。本系列的下一章節中,我們會表述更高階的操作,例如flatMap和collect。