1. 程式人生 > >Android從零開始學習Rxjava2(三)

Android從零開始學習Rxjava2(三)

rxjava2變換運算子

rxjava一樣提供了很多變換運算子幫助我們更簡單的轉轉發出的Observable。這些變換運算子也是我們相對來說比較常用到的,所以對於每個變換運算子我們都單獨拿出來簡單記錄下。

Buffer

定期將Observable發出的專案收集到束中併發出這些束,而不是一次傳送一個專案。Buffer運算子將一個Observable轉換為另一個Observable,該Observable將發出這些項的緩衝集合。
上面這句話可能難以直接理解,其實buffer顧名思義就是緩衝,rxjava提供buffer可以讓我們按大小和時間來緩衝發出的資料,等緩衝條件滿足或者發射結束後再發出緩衝的資料。

buffer提供瞭如下幾種方法,我們來簡單說下
在這裡插入圖片描述

1.buffer(int count)

以列表的形式發出非重疊緩衝區,每個緩衝區為一組發出。
在這裡插入圖片描述
舉個例子,如果發射0到9共十個數,設定緩衝區size為3,則會每緩衝3個元素就緒後就將這幾個為一組直接發出,直到最後將剩餘的資料為一組發出。

private void doSomeWork() {
        Observable.just("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")
                .buffer(3)
                .subscribe(new
Consumer<List<String>>() { @Override public void accept(List<String> stringList) throws Exception { StringBuilder builder = new StringBuilder(); builder.append("["); for
(String value : stringList) { builder.append(value + ","); } builder.append("]"); Log.d(TAG, " accept : value : " + builder.toString()); } }); }

列印結果

accept : value : [0,1,2,]
accept : value : [3,4,5,]
accept : value : [6,7,8,]
accept : value : [9,]

2.buffer(count, skip)

從第一項開始,取count個項建立快取,從第一項開始skip個項之後再取count個項建立快取。根據count和skip的值,這些緩衝區可能重疊(多個緩衝區可能包含相同的項),也可能有間隙(源Observable發出的項不在任何緩衝區中)。
在這裡插入圖片描述
這個方法主要注意的引數是skip,skip故名思義是跳過,由於第一次快取後是每跳過skip個元素後再去取元素快取,所以有三種情況發生
1.count = skip 非重疊不遺漏緩衝,等同於buffer(int count)
2.count > skip 重疊不遺漏緩衝
3.count < skip 非重疊遺漏緩衝
為方便理解,我一樣寫下例子。先舉個count > skip的。

如果發射0到9共十個數,設定緩衝區size為3,skip為2,則每次緩衝3個元素為一組發出,第一次發出0,1,2,這時skip兩個元素,0和1,下一次快取則從元素2開始,快取3個元素,即2,3,4,依次類似。

private void doSomeWork() {
        Observable.just("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")
                .buffer(3, 2)
                .subscribe(new Consumer<List<String>>() {
                    @Override
                    public void accept(List<String> stringList) throws Exception {
                        StringBuilder builder = new StringBuilder();
                        builder.append("[");
                        for (String value : stringList) {
                            builder.append(value + ",");
                        }
                        builder.append("]");
                        Log.d(TAG, " accept : value : " + builder.toString());
                    }
                });
    }

列印結果,可發現count > skip時,緩衝區元素被重疊了一部分

accept : value : [0,1,2,]
accept : value : [2,3,4,]
accept : value : [4,5,6,]
accept : value : [6,7,8,]
accept : value : [8,9,]

再舉個count < skip的例子。如果發射0到9共十個數,設定緩衝區size為2,skip為3,則每次緩衝2個元素為一組發出,第一次發出0,1,這時skip三個元素,0、1和2,下一次快取則從元素3開始,快取2個元素,即3,4,依次類似。

private void doSomeWork() {
        Observable.just("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")
                .buffer(2, 3)
                .subscribe(new Consumer<List<String>>() {
                    @Override
                    public void accept(List<String> stringList) throws Exception {
                        StringBuilder builder = new StringBuilder();
                        builder.append("[");
                        for (String value : stringList) {
                            builder.append(value + ",");
                        }
                        builder.append("]");
                        Log.d(TAG, " accept : value : " + builder.toString());
                    }
                });
    }

列印結果,可發現count < skip時,緩衝區元素沒有重疊,但有一部分元素被遺漏

accept : value : [0,1,]
accept : value : [3,4,]
accept : value : [6,7,]
accept : value : [9,]

3.buffer(long timespan, TimeUnit unit)

按時間片段快取,等時傳送快取區的一組資料。
在這裡插入圖片描述
buffer不僅可以按元素個數來分組,也可以按時間片段來分組,與上面按元素個數分組類似,只是將分組的條件換成了時間值。
一樣舉個例子,我們每隔100ms傳送一個元素,每隔250ms緩衝一組,看看列印結果

private void doSomeWork() {
        Observable.interval(100, TimeUnit.MILLISECONDS)
                .take(10)
                .buffer(250, TimeUnit.MILLISECONDS)
                .subscribe(new Consumer<List<Long>>() {
                    @Override
                    public void accept(List<Long> longs) throws Exception {
                        StringBuilder builder = new StringBuilder();
                        builder.append("[");
                        for (Long value : longs) {
                            builder.append(value + ",");
                        }
                        builder.append("]");
                        Log.d(TAG, " accept : value : " + builder.toString());
                    }
                });
    }

列印結果

accept : value : [0,1,]
accept : value : [2,3,]
accept : value : [4,5,6,]
accept : value : [7,8,9,]

按時間快取,元素的個數取決於這段時間裡面總共發出的個數,所以快取集合的size是不固定的,可能為空,也可能很多。

4.buffer(boundary[, initialCapacity])

buffer(boundary)監視一個Observable邊界。 每次Observable發出一個專案時,它都會建立一個新的List來開始收集源Observable發出的專案併發出前一個List。
在這裡插入圖片描述buffer(boundary[, initialCapacity])這個方法分組快取的條件是以另外一個Observable邊界為條件,也就是說當另外一個Observable開始發射訊號時,會觸發快取的條件。我們把上面按時間分組的例子更改下,用邊界分組的方式實現。

private void doSomeWork() {
        Observable.interval(100, TimeUnit.MILLISECONDS)
                .take(10)
                .buffer(Observable.interval(250,TimeUnit.MILLISECONDS))
                .subscribe(new Consumer<List<Long>>() {
                    @Override
                    public void accept(List<Long> longs) throws Exception {
                        StringBuilder builder = new StringBuilder();
                        builder.append("[");
                        for (Long value : longs) {
                            builder.append(value + ",");
                        }
                        builder.append("]");
                        Log.d(TAG, " accept : value : " + builder.toString());
                    }
                });
    }

列印結果。監聽到Observable.interval(250,TimeUnit.MILLISECONDS)的訊號時產生分組。

accept : value : [0,1,]
accept : value : [2,3,4,]
accept : value : [5,6,]
accept : value : [7,8,]
accept : value : [9,]

5.buffer(bufferOpenings, bufferClosingSelector)

bufferOpenings,它發出BufferOpening物件。 每次觀察到這樣一個發射的專案時,bufferOpenings會建立一個新的List來開始收集source Observable發出的專案,並將bufferOpenings Observable傳遞給closingSelector函式。 closingSelector函式也返回一個Observable。 緩衝區監視closingSelector函式返回的Observable,當檢測到它的發射項時,它會關閉List並將其作為自己的發射發出。
在這裡插入圖片描述
這個方法看起來有點麻煩,其實簡單來說就是監聽了兩個Observable發射時間作為緩衝區開始和結束的節點。
我們一樣來舉個例子,

private void doSomeWork() {
        Observable.interval(100, TimeUnit.MILLISECONDS)
                .take(10)
                .buffer(Observable.interval(250, TimeUnit.MILLISECONDS).flatMap(new Function<Long, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(Long aLong) throws Exception {
                        String str = "this aLong is " + aLong;
                        Log.d(TAG, "openingIndicator apply :" + str);
                        return Observable.just(str);
                    }
                }), new Function<String, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(String s) throws Exception {
                        return Observable.interval(200, TimeUnit.MILLISECONDS).flatMap(new Function<Long, ObservableSource<String>>() {
                            @Override
                            public ObservableSource<String> apply(Long aLong) throws Exception {
                                String str = "this aLong is " + aLong;
                                Log.d(TAG, "closingIndicator apply :" + str);
                                return Observable.just(str);
                            }
                        });
                    }
                })
                .subscribe(new Consumer<List<Long>>() {
                    @Override
                    public void accept(List<Long> longs) throws Exception {
                        StringBuilder builder = new StringBuilder();
                        builder.append("[");
                        for (Long value : longs) {
                            builder.append(value + ",");
                        }
                        builder.append("]");
                        Log.d(TAG, " accept : value : " + builder.toString());
                    }
                });
    }

列印結果,由於interval第一個緩衝區開始是在250ms的時刻,前面source Observable每隔100ms發射一個數據,已經發射出去了0和1,所以列印accept第一組是[2,3]。

openingIndicator apply :this aLong is 0
closingIndicator apply :this aLong is 0
accept : value : [2,3,]
openingIndicator apply :this aLong is 1
closingIndicator apply :this aLong is 0
accept : value : [5,6,]
openingIndicator apply :this aLong is 2
closingIndicator apply :this aLong is 0
accept : value : [7,8,]
openingIndicator apply :this aLong is 3
accept : value : [9,]

上面的例子我們沒有使用bufferOpenings和bufferClosingSelector裡的Observable發射的資料,他們對source Observable發射的資料來說沒有關係,只是當bufferOpenings和bufferClosingSelector發射資料時,控制buffer開始和結束的緩衝時間節點。

好了,buffer的部分就只寫這麼多,其他的方法其實萬變不離其中,只是不同的方法控制快取區開啟結束的條件不同而已。