1. 程式人生 > >Java8新特性學習-Stream的Reduce及Collect方法詳解

Java8新特性學習-Stream的Reduce及Collect方法詳解

Stream的使用方法在http://blog.csdn.net/icarusliu/article/details/79495534一文中已經做了初步的介紹,但它的Reduce及Collect方法由於較為複雜未進行總結,現單獨對這兩個方法進行學習。
為簡化理解,部分可以採用Lambda語法的地方採用了原始的語法;

0. 涉及知識

0.1 BiFunction

它是一個函式式介面,包含的函式式方法定義如下:

R apply(T t, U u);

可見它與Function不同點在於它接收兩個輸入返回一個輸出; 而Function接收一個輸入返回一個輸出。
注意它的兩個輸入、一個輸出的型別可以是不一樣的。

0.2 BinaryOperator

它實際上就是繼承自BiFunction的一個介面;我們看下它的定義:

public interface BinaryOperator<T> extends BiFunction<T,T,T>

上面已經分析了,BiFunction的三個引數可以是一樣的也可以不一樣;而BinaryOperator就直接限定了其三個引數必須是一樣的;
因此BinaryOperator與BiFunction的區別就在這。
它表示的就是兩個相同型別的輸入經過計算後產生一個同類型的輸出。

0.3 BiConsumer

也是一個函式式介面,它的定義如下:

public interface BiConsumer<T, U> {

    /**
     * Performs this operation on the given arguments.
     *
     * @param t the first input argument
     * @param u the second input argument
     */
    void accept(T t, U u);
}

可見它就是一個兩個輸入引數的Consumer的變種。計算沒有返回值。

1. Reduce

Reduce中文含義為:減少、縮小;而Stream中的Reduce方法乾的正是這樣的活:根據一定的規則將Stream中的元素進行計算後返回一個唯一的值。
它有三個變種,輸入引數分別是一個引數、二個引數以及三個引數;

1.1 一個引數的Reduce

定義如下:

Optional<T> reduce(BinaryOperator<T> accumulator)

假設Stream中的元素a[0]/a[1]/a[2]…a[n - 1],它表達的計算含義,使用Java程式碼來表述如下:

T result = a[0];  
for (int i = 1; i < n; i++) {
    result = accumulator.apply(result, a[i]);  
}
return result;  

也就是說,a[0]與a[1]進行二合運算,結果與a[2]做二合運算,一直到最後與a[n-1]做二合運算。

可見,reduce在求和、求最大最小值等方面都可以很方便的實現,程式碼如下,注意其返回的結果是一個Optional物件:

Stream<Integer> s = Stream.of(1, 2, 3, 4, 5, 6);
/**
 * 求和,也可以寫成Lambda語法:
 * Integer sum = s.reduce((a, b) -> a + b).get();
 */
Integer sum = s.reduce(new BinaryOperator<Integer>() {
    @Override
    public Integer apply(Integer integer, Integer integer2) {
        return integer + integer2;
    }
}).get();

/**
 * 求最大值,也可以寫成Lambda語法:
 * Integer max = s.reduce((a, b) -> a >= b ? a : b).get();
 */
Integer max = s.reduce(new BinaryOperator<Integer>() {
    @Override
    public Integer apply(Integer integer, Integer integer2) {
        return integer >= integer2 ? integer : integer2;
    }
}).get(); 

當然可做的事情更多,如將一系列數中的正數求和、將序列中滿足某個條件的數一起做某些計算等。

1.2 兩個引數的Reduce

其定義如下:

T reduce(T identity, BinaryOperator<T> accumulator)

相對於一個引數的方法來說,它多了一個T型別的引數;實際上就相當於需要計算的值在Stream的基礎上多了一個初始化的值。
同理,當對n個元素的陣列進行運算時,其表達的含義如下:

T result = identity; 
for (int i = 0; i < n; i++) {
    result = accumulator.apply(result, a[i]);  
}
return result;  

注意區分與一個引數的Reduce方法的不同:它多了一個初始化的值,因此計算的順序是identity與a[0]進行二合運算,結果與a[1]再進行二合運算,最終與a[n-1]進行二合運算。
因此它與一引數時的應用場景類似,不同點是它使用在可能需要某些初始化值的場景中。

使用示例,如要將一個String型別的Stream中的所有元素連線到一起並在最前面新增[value]後返回:

Stream<String> s = Stream.of("test", "t1", "t2", "teeeee", "aaaa", "taaa");
/**
 * 以下結果將會是: [value]testt1t2teeeeeaaaataaa
 * 也可以使用Lambda語法:
 * System.out.println(s.reduce("[value]", (s1, s2) -> s1.concat(s2)));
 */
System.out.println(s.reduce("[value]", new BinaryOperator<String>() {
    @Override
    public String apply(String s, String s2) {
        return s.concat(s2);
    }
})); 

1.3 三個引數的Reduce

三個引數時是最難以理解的。
先來看其定義:

<U> U reduce(U identity,
                 BiFunction<U, ? super T, U> accumulator,
                 BinaryOperator<U> combiner)

分析下它的三個引數:

  • identity: 一個初始化的值;這個初始化的值其型別是泛型U,與Reduce方法返回的型別一致;注意此時Stream中元素的型別是T,與U可以不一樣也可以一樣,這樣的話操作空間就大了;不管Stream中儲存的元素是什麼型別,U都可以是任何型別,如U可以是一些基本資料型別的包裝型別Integer、Long等;或者是String,又或者是一些集合型別ArrayList等;後面會說到這些用法。
  • accumulator: 其型別是BiFunction,輸入是U與T兩個型別的資料,而返回的是U型別;也就是說返回的型別與輸入的第一個引數型別是一樣的,而輸入的第二個引數型別與Stream中元素型別是一樣的。
  • combiner: 其型別是BinaryOperator,支援的是對U型別的物件進行操作;

第三個引數combiner主要是使用在平行計算的場景下;如果Stream是非並行時,第三個引數實際上是不生效的。
因此針對這個方法的分析需要分並行與非並行兩個場景。

1.3.1 非並行

如果Stream是非並行的,combiner不生效;
其計算過程與兩個引數時的Reduce基本是一致的。
如Stream中包含了N個元素,其計算過程使用Java程式碼表述如下:

U result = identity;  
for (T element:a) {
    result = accumulator.apply(result, element);  
}
return result; 

這個含義與1.2中的含義基本是一樣的——除了型別上,Result的型別是U,而Element的型別是T!如果U與T一樣,那麼與1.2就是完全一樣的;
就是因為不一樣,就存在很多種用法了。如假設U的型別是ArrayList,那麼可以將Stream中所有元素新增到ArrayList中再返回了,如下示例:

/**
 * 以下reduce生成的List將會是[aa, ab, c, ad]
 * Lambda語法:
 *  System.out.println(s1.reduce(new ArrayList<String>(), (r, t) -> {r.add(t); return r; }, (r1, r2) -> r1));
 */
Stream<String> s1 = Stream.of("aa", "ab", "c", "ad");
System.out.println(s1.reduce(new ArrayList<String>(),
        new BiFunction<ArrayList<String>, String, ArrayList<String>>() {
            @Override
            public ArrayList<String> apply(ArrayList<String> u, String s) {
                u.add(s);
                return u;
            }
        }, new BinaryOperator<ArrayList<String>>() {
            @Override
            public ArrayList<String> apply(ArrayList<String> strings, ArrayList<String> strings2) {
                return strings;
            }
        }));

也可以進行元素過濾,即模擬Stream中的Filter函式:

/**
 * 模擬Filter查詢其中含有字母a的所有元素,列印結果將是aa ab ad
 * lambda語法:
 * s1.reduce(new ArrayList<String>(), (r, t) -> {if (predicate.test(t)) r.add(t);  return r; },
        (r1, r2) -> r1).stream().forEach(System.out::println);
 */
Stream<String> s1 = Stream.of("aa", "ab", "c", "ad");
Predicate<String> predicate = t -> t.contains("a");
s1.reduce(new ArrayList<String>(), new BiFunction<ArrayList<String>, String, ArrayList<String>>() {
            @Override
            public ArrayList<String> apply(ArrayList<String> strings, String s) {
                if (predicate.test(s)) strings.add(s);
                return strings;
            }
        },
        new BinaryOperator<ArrayList<String>>() {
            @Override
            public ArrayList<String> apply(ArrayList<String> strings, ArrayList<String> strings2) {
                return strings;  
            }
        }).stream().forEach(System.out::println);

注意由於是非並行的,第三個引數實際上沒有什麼意義,可以指定r1或者r2為其返回值,甚至可以指定null為返回值。

1.3.2 並行

當Stream是並行時,第三個引數就有意義了,它會將不同執行緒計算的結果呼叫combiner做彙總後返回。
注意由於採用了平行計算,前兩個引數與非並行時也有了差異!
舉個簡單點的例子,計算4+1+2+3的結果,其中4是初始值:

/**
 * lambda語法:
 * System.out.println(Stream.of(1, 2, 3).parallel().reduce(4, (s1, s2) -> s1 + s2
 , (s1, s2) -> s1 + s2));
 **/
System.out.println(Stream.of(1, 2, 3).parallel().reduce(4, new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer integer, Integer integer2) {
                return integer + integer2;
            }
        }
        , new BinaryOperator<Integer>() {
            @Override
            public Integer apply(Integer integer, Integer integer2) {
                return integer + integer2;
            }
        }));

並行時的計算結果是18,而非並行時的計算結果是10!
為什麼會這樣?
先分析下非並行時的計算過程;第一步計算4 + 1 = 5,第二步是5 + 2 = 7,第三步是7 + 3 = 10。按1.3.1中所述來理解沒有什麼疑問。
那問題就是非並行的情況與理解有不一致的地方了!
先分析下它可能是通過什麼方式來並行的?按非並行的方式來看它是分了三步的,每一步都要依賴前一步的運算結果!那應該是沒有辦法進行平行計算的啊!可實際上現在平行計算出了結果並且關鍵其結果與非並行時是不一致的!
那要不就是理解上有問題,要不就是這種方式在平行計算上存在BUG。
暫且認為其不存在BUG,先來看下它是怎麼樣出這個結果的。猜測初始值4是儲存在一個變數result中的;平行計算時,執行緒之間沒有影響,因此每個執行緒在呼叫第二個引數BiFunction進行計算時,直接都是使用result值當其第一個引數(由於Stream計算的延遲性,在呼叫最終方法前,都不會進行實際的運算,因此每個執行緒取到的result值都是原始的4),因此計算過程現在是這樣的:執行緒1:1 + 4 = 5;執行緒2:2 + 4 = 6;執行緒3:3 + 4 = 7;Combiner函式: 5 + 6 + 7 = 18!
通過多種情況的測試,其結果都符合上述推測!

如以下示例:

/**
 * lambda語法:
 * System.out.println(Stream.of(1, 2, 3).parallel().reduce(4, (s1, s2) -> s1 + s2
 , (s1, s2) -> s1 * s2));
 */
System.out.println(Stream.of(1, 2, 3).parallel().reduce(4, new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer integer, Integer integer2) {
                return integer + integer2;
            }
        }
        , new BinaryOperator<Integer>() {
            @Override
            public Integer apply(Integer integer, Integer integer2) {
                return integer * integer2;
            }
        }));

以上示例輸出的結果是210!
它表示的是,使用4與1、2、3中的所有元素按(s1,s2) -> s1 + s2(accumulator)的方式進行第一次計算,得到結果序列4+1, 4+2, 4+3,即5、6、7;然後將5、6、7按combiner即(s1, s2) -> s1 * s2的方式進行彙總,也就是5 * 6 * 7 = 210。
使用函式表示就是:(4+1) * (4+2) * (4+3) = 210;

reduce的這種寫法可以與以下寫法結果相等(但過程是不一樣的,三個引數時會進行並行處理):

System.out.println(Stream.of(1, 2, 3).map(n -> n + 4).reduce((s1, s2) -> s1 * s2));

這種方式有助於理解並行三個引數時的場景,實際上就是第一步使用accumulator進行轉換(它的兩個輸入引數一個是identity, 一個是序列中的每一個元素),由N個元素得到N個結果;第二步是使用combiner對第一步的N個結果做彙總。

但這裡需要注意的是,如果第一個引數的型別是ArrayList等物件而非基本資料型別的包裝類或者String,第三個函式的處理上可能容易引起誤解,如以下示例:

/**
 * 模擬Filter查詢其中含有字母a的所有元素,列印結果將是aa ab ad
 * lambda語法:
 * s1.parallel().reduce(new ArrayList<String>(), (r, t) -> {if (predicate.test(t)) r.add(t);  return r; },
 (r1, r2) -> {System.out.println(r1==r2); return r2; }).stream().forEach(System.out::println);
 */
Stream<String> s1 = Stream.of("aa", "ab", "c", "ad");
Predicate<String> predicate = t -> t.contains("a");
s1.parallel().reduce(new ArrayList<String>(), new BiFunction<ArrayList<String>, String, ArrayList<String>>() {
            @Override
            public ArrayList<String> apply(ArrayList<String> strings, String s) {
                if (predicate.test(s)) {
                    strings.add(s);
                }

                return strings;
            }
        },
        new BinaryOperator<ArrayList<String>>() {
            @Override
            public ArrayList<String> apply(ArrayList<String> strings, ArrayList<String> strings2) {
                System.out.println(strings == strings2);
                return strings;
            }
        }).stream().forEach(System.out::println);

其中System.out.println(r1==r2)這句列印的結果是什麼呢?經過執行後發現是True!
為什麼會這樣?這是因為每次第二個引數也就是accumulator返回的都是第一個引數中New的ArrayList物件!因此combiner中傳入的永遠都會是這個物件,這樣r1與r2就必然是同一樣物件!
因此如果按理解的,combiner是將不同執行緒操作的結果彙總起來,那麼一般情況下上述程式碼就會這樣寫(lambda): 

Stream<String> s1 = Stream.of("aa", "ab", "c", "ad");

//模擬Filter查詢其中含有字母a的所有元素,由於使用了r1.addAll(r2),其列印結果將不會是預期的aa ab ad
Predicate<String> predicate = t -> t.contains("a");
s1.parallel().reduce(new ArrayList<String>(), (r, t) -> {if (predicate.test(t)) r.add(t);  return r; },
        (r1, r2) -> {r1.addAll(r2); return r1; }).stream().forEach(System.out::println);

這個時候出來的結果與預期的結果就完全不一樣了,要多了很多元素!

3.1.2.7 collect

collect含義與Reduce有點相似;
先看其定義:

<R> R collect(Supplier<R> supplier,
              BiConsumer<R, ? super T> accumulator,
              BiConsumer<R, R> combiner);

仍舊先分析其引數(參考其JavaDoc):

  • supplier:動態的提供初始化的值;建立一個可變的結果容器(JAVADOC);對於平行計算,這個方法可能被呼叫多次,每次返回一個新的物件;
  • accumulator:型別為BiConsumer,注意這個介面是沒有返回值的;它必須將一個元素放入結果容器中(JAVADOC)。
  • combiner:型別也是BiConsumer,因此也沒有返回值。它與三引數的Reduce型別,只是在平行計算時彙總不同執行緒計算的結果。它的輸入是兩個結果容器,必須將第二個結果容器中的值全部放入第一個結果容器中(JAVADOC)。

可見Collect與分並行與非並行兩種情況。
下面對並行情況進行分析。
直接使用上面Reduce模擬Filter的示例進行演示(使用lambda語法):

/**
 * 模擬Filter查詢其中含有字母a的所有元素,列印結果將是aa ab ad
 */
Stream<String> s1 = Stream.of("aa", "ab", "c", "ad");
Predicate<String> predicate = t -> t.contains("a");
System.out.println(s1.parallel().collect(() -> new ArrayList<String>(),
        (array, s) -> {if (predicate.test(s)) array.add(s); },
        (array1, array2) -> array1.addAll(array2)));

根據以上分析,這邊理解起來就很容易了:每個執行緒都建立了一個結果容器ArrayList,假設每個執行緒處理一個元素,那麼處理的結果將會是[aa],[ab],[],[ad]四個結果容器(ArrayList);最終再呼叫第三個BiConsumer引數將結果全部Put到第一個List中,因此返回結果就是列印的結果了。

JAVADOC中也在強調結果容器(result container)這個,那是否除集合型別,其結果R也可以是其它型別呢?
先看基本型別,由於BiConsumer不會有返回值,如果是基本資料型別或者String,在BiConsumer中加工後的結果都無法在這個函式外體現,因此是沒有意義的。
那其它非集合型別的Java物件呢?如果物件中包含有集合型別的屬性,也是可以處理的;否則,處理上也沒有任何意義,combiner物件使用一個Java物件來更新另外一個物件?至少目前我沒有想到這個有哪些應用場景。它不同Reduce,Reduce在Java物件上是有應用場景的,就因為Reduce即使是並行情況下,也不會建立多個初始化物件,combiner接收的兩個引數永遠是同一個物件,如假設人有很多條參加會議的記錄,這些記錄沒有在人本身物件裡面儲存而在另外一個物件中;人本身物件中只有一個屬性是最早參加會議時間,那就可以使用reduce來對這個屬性進行更新。當然這個示例不夠完美,它能使用其它更快的方式實現,但至少通過Reduce是能夠實現這一型別的功能的。