Android從零開始學習Rxjava2(三)
rxjava2變換運算子
rxjava一樣提供了很多變換運算子幫助我們更簡單的轉轉發出的Observable。這些變換運算子也是我們相對來說比較常用到的,所以對於每個變換運算子我們都單獨拿出來簡單記錄下。
Buffer
定期將Observable發出的專案收集到束中併發出這些束,而不是一次傳送一個專案。Buffer運算子將一個Observable轉換為另一個Observable,該Observable將發出這些項的緩衝集合。
上面這句話可能難以直接理解,其實buffer顧名思義就是緩衝,rxjava提供buffer可以讓我們按大小和時間來緩衝發出的資料,等緩衝條件滿足或者發射結束後再發出緩衝的資料。
buffer提供瞭如下幾種方法,我們來簡單說下
1.buffer(int count)
以列表的形式發出非重疊緩衝區,每個緩衝區為一組發出。
舉個例子,如果發射0到9共十個數,設定緩衝區size為3,則會每緩衝3個元素就緒後就將這幾個為一組直接發出,直到最後將剩餘的資料為一組發出。
private void doSomeWork() {
Observable.just("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")
.buffer(3)
.subscribe(new Consumer<List<String>>() {
@Override
public void accept(List<String> stringList) throws Exception {
StringBuilder builder = new StringBuilder();
builder.append("[");
for (String value : stringList) {
builder.append(value + ",");
}
builder.append("]");
Log.d(TAG, " accept : value : " + builder.toString());
}
});
}
列印結果
accept : value : [0,1,2,]
accept : value : [3,4,5,]
accept : value : [6,7,8,]
accept : value : [9,]
2.buffer(count, skip)
從第一項開始,取count個項建立快取,從第一項開始skip個項之後再取count個項建立快取。根據count和skip的值,這些緩衝區可能重疊(多個緩衝區可能包含相同的項),也可能有間隙(源Observable發出的項不在任何緩衝區中)。
這個方法主要注意的引數是skip,skip故名思義是跳過,由於第一次快取後是每跳過skip個元素後再去取元素快取,所以有三種情況發生
1.count = skip 非重疊不遺漏緩衝,等同於buffer(int count)
2.count > skip 重疊不遺漏緩衝
3.count < skip 非重疊遺漏緩衝
為方便理解,我一樣寫下例子。先舉個count > skip的。
如果發射0到9共十個數,設定緩衝區size為3,skip為2,則每次緩衝3個元素為一組發出,第一次發出0,1,2,這時skip兩個元素,0和1,下一次快取則從元素2開始,快取3個元素,即2,3,4,依次類似。
private void doSomeWork() {
Observable.just("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")
.buffer(3, 2)
.subscribe(new Consumer<List<String>>() {
@Override
public void accept(List<String> stringList) throws Exception {
StringBuilder builder = new StringBuilder();
builder.append("[");
for (String value : stringList) {
builder.append(value + ",");
}
builder.append("]");
Log.d(TAG, " accept : value : " + builder.toString());
}
});
}
列印結果,可發現count > skip時,緩衝區元素被重疊了一部分
accept : value : [0,1,2,]
accept : value : [2,3,4,]
accept : value : [4,5,6,]
accept : value : [6,7,8,]
accept : value : [8,9,]
再舉個count < skip的例子。如果發射0到9共十個數,設定緩衝區size為2,skip為3,則每次緩衝2個元素為一組發出,第一次發出0,1,這時skip三個元素,0、1和2,下一次快取則從元素3開始,快取2個元素,即3,4,依次類似。
private void doSomeWork() {
Observable.just("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")
.buffer(2, 3)
.subscribe(new Consumer<List<String>>() {
@Override
public void accept(List<String> stringList) throws Exception {
StringBuilder builder = new StringBuilder();
builder.append("[");
for (String value : stringList) {
builder.append(value + ",");
}
builder.append("]");
Log.d(TAG, " accept : value : " + builder.toString());
}
});
}
列印結果,可發現count < skip時,緩衝區元素沒有重疊,但有一部分元素被遺漏
accept : value : [0,1,]
accept : value : [3,4,]
accept : value : [6,7,]
accept : value : [9,]
3.buffer(long timespan, TimeUnit unit)
按時間片段快取,等時傳送快取區的一組資料。
buffer不僅可以按元素個數來分組,也可以按時間片段來分組,與上面按元素個數分組類似,只是將分組的條件換成了時間值。
一樣舉個例子,我們每隔100ms傳送一個元素,每隔250ms緩衝一組,看看列印結果
private void doSomeWork() {
Observable.interval(100, TimeUnit.MILLISECONDS)
.take(10)
.buffer(250, TimeUnit.MILLISECONDS)
.subscribe(new Consumer<List<Long>>() {
@Override
public void accept(List<Long> longs) throws Exception {
StringBuilder builder = new StringBuilder();
builder.append("[");
for (Long value : longs) {
builder.append(value + ",");
}
builder.append("]");
Log.d(TAG, " accept : value : " + builder.toString());
}
});
}
列印結果
accept : value : [0,1,]
accept : value : [2,3,]
accept : value : [4,5,6,]
accept : value : [7,8,9,]
按時間快取,元素的個數取決於這段時間裡面總共發出的個數,所以快取集合的size是不固定的,可能為空,也可能很多。
4.buffer(boundary[, initialCapacity])
buffer(boundary)監視一個Observable邊界。 每次Observable發出一個專案時,它都會建立一個新的List來開始收集源Observable發出的專案併發出前一個List。
buffer(boundary[, initialCapacity])這個方法分組快取的條件是以另外一個Observable邊界為條件,也就是說當另外一個Observable開始發射訊號時,會觸發快取的條件。我們把上面按時間分組的例子更改下,用邊界分組的方式實現。
private void doSomeWork() {
Observable.interval(100, TimeUnit.MILLISECONDS)
.take(10)
.buffer(Observable.interval(250,TimeUnit.MILLISECONDS))
.subscribe(new Consumer<List<Long>>() {
@Override
public void accept(List<Long> longs) throws Exception {
StringBuilder builder = new StringBuilder();
builder.append("[");
for (Long value : longs) {
builder.append(value + ",");
}
builder.append("]");
Log.d(TAG, " accept : value : " + builder.toString());
}
});
}
列印結果。監聽到Observable.interval(250,TimeUnit.MILLISECONDS)的訊號時產生分組。
accept : value : [0,1,]
accept : value : [2,3,4,]
accept : value : [5,6,]
accept : value : [7,8,]
accept : value : [9,]
5.buffer(bufferOpenings, bufferClosingSelector)
bufferOpenings,它發出BufferOpening物件。 每次觀察到這樣一個發射的專案時,bufferOpenings會建立一個新的List來開始收集source Observable發出的專案,並將bufferOpenings Observable傳遞給closingSelector函式。 closingSelector函式也返回一個Observable。 緩衝區監視closingSelector函式返回的Observable,當檢測到它的發射項時,它會關閉List並將其作為自己的發射發出。
這個方法看起來有點麻煩,其實簡單來說就是監聽了兩個Observable發射時間作為緩衝區開始和結束的節點。
我們一樣來舉個例子,
private void doSomeWork() {
Observable.interval(100, TimeUnit.MILLISECONDS)
.take(10)
.buffer(Observable.interval(250, TimeUnit.MILLISECONDS).flatMap(new Function<Long, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Long aLong) throws Exception {
String str = "this aLong is " + aLong;
Log.d(TAG, "openingIndicator apply :" + str);
return Observable.just(str);
}
}), new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(String s) throws Exception {
return Observable.interval(200, TimeUnit.MILLISECONDS).flatMap(new Function<Long, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Long aLong) throws Exception {
String str = "this aLong is " + aLong;
Log.d(TAG, "closingIndicator apply :" + str);
return Observable.just(str);
}
});
}
})
.subscribe(new Consumer<List<Long>>() {
@Override
public void accept(List<Long> longs) throws Exception {
StringBuilder builder = new StringBuilder();
builder.append("[");
for (Long value : longs) {
builder.append(value + ",");
}
builder.append("]");
Log.d(TAG, " accept : value : " + builder.toString());
}
});
}
列印結果,由於interval第一個緩衝區開始是在250ms的時刻,前面source Observable每隔100ms發射一個數據,已經發射出去了0和1,所以列印accept第一組是[2,3]。
openingIndicator apply :this aLong is 0
closingIndicator apply :this aLong is 0
accept : value : [2,3,]
openingIndicator apply :this aLong is 1
closingIndicator apply :this aLong is 0
accept : value : [5,6,]
openingIndicator apply :this aLong is 2
closingIndicator apply :this aLong is 0
accept : value : [7,8,]
openingIndicator apply :this aLong is 3
accept : value : [9,]
上面的例子我們沒有使用bufferOpenings和bufferClosingSelector裡的Observable發射的資料,他們對source Observable發射的資料來說沒有關係,只是當bufferOpenings和bufferClosingSelector發射資料時,控制buffer開始和結束的緩衝時間節點。
好了,buffer的部分就只寫這麼多,其他的方法其實萬變不離其中,只是不同的方法控制快取區開啟結束的條件不同而已。