RxJava 事件流之過濾資料
現在你可以用 Rx 來處理大批量實時資料,但是如果把所有大批量資料整個打包發給你的話,使用 Rx 還有啥優勢呢? 本節 我們將介紹一些操作函式(operators )來過濾資料、或者把所有資料變成一個需要的資料。
如果你瞭解過函數語言程式設計(functional programming)或者 Java 中的 Stream,則本節介紹的操作函式是非常眼熟的。本節中所有的操作符都返回一個不影響前一個 Observable 的新 Observable。 整個 Rx 框架都遵守該原則。通過建立新的 Observable 來轉換之前的 Observable而不會對之前的 Observable 造成干擾。訂閱到初始 Observable 的 Subscribers 不會受到任何影響,但是在後面的章節中也會看到,開發者也需要當心該原則。
Marble diagrams(彈子圖)
你可以想象一個機器,不停的發射彈子出來,發射出來的彈子可以被其他模組再次加工(比如 上色、把不合格的彈子給回收了),加工完成後再次發射出來 … 彈子圖就是對這個機器的抽象描述。在 Rx 中流行使用這種方式來描述操作符,畢竟圖片看起來直觀多了。 Marble diagrams(彈子圖)基本元素如下:
時間從左往右流動,每個圖形代表一個數據,豎線代表發射完成了,而 X 代表出現錯誤了。 操作函式把上面的 Observable 轉換下面的新的 Observable , 裡面的每個資料都被操作函式給處理了並返回一個新的資料。
Filter(過濾資料)
filter 函式使用一個 predicate 函式介面來判斷每個發射的值是否能通過這個判斷。如果返回 true,則該資料繼續往下一個(過濾後的) Observable 發射。
比如下面示例建立了一個發射 0 到 9 十個數字的 源Observable。在該 Observable 使用一個 filter 操作來過濾掉奇數,最後只保留偶數。
Observable<Integer> values = Observable.range(0,10);
Subscription oddNumbers = values
.filter(v -> v % 2 == 0)
.subscribe(
v -> System.out.println(v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);
輸出結果如下:
0
2
4
6
8
Completed
distinct 和 distinctUntilChanged
distinct 函式用來過濾掉已經出現過的資料了。
Observable<Integer> values = Observable.create(o -> {
o.onNext(1);
o.onNext(1);
o.onNext(2);
o.onNext(3);
o.onNext(2);
o.onCompleted();
});
Subscription subscription = values
.distinct()
.subscribe(
v -> System.out.println(v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);
輸出結果如下:
1
2
3
Completed
distinct 還有一個過載函式,該函式有個生成 key 的引數。每個發射的資料都使用該引數生成一個 key,然後使用該key 來判斷資料是否一樣。
public final <U> Observable<T> distinct(Func1<? super T,? extends U> keySelector)
下面示例中使用字串的第一個字母作為 key 來比較。
Observable<String> values = Observable.create(o -> {
o.onNext("First");
o.onNext("Second");
o.onNext("Third");
o.onNext("Fourth");
o.onNext("Fifth");
o.onCompleted();
});
Subscription subscription = values
.distinct(v -> v.charAt(0))
.subscribe(
v -> System.out.println(v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);
輸出結果:
First
Second
Third
Completed
“Fourth” 和 “Fifth” 字串被過濾掉了,應為他們的 key (首字母)和 First 一樣。已經發射過的資料將被過濾掉。
有經驗的碼農知道,該函式在內部維護一個 key 集合來儲存所有已經發射資料的 key,當有新的資料發射的時候,在集合中查詢該 資料的key 是否存在。 在使用 Rx 操作函式的時把內部細節給封裝起來了,但是我們應該注意該問題來避免效能問題。(如果有大量的資料,維護一個內部的集合來儲存 key 可能會佔用很多記憶體。)
distinct 還有個變體是 distinctUntilChanged。區別是 distinctUntilChanged 只過濾相鄰的 key 一樣的資料。
public final Observable<T> distinctUntilChanged()
public final <U> Observable<T> distinctUntilChanged(Func1<? super T,? extends U> keySelector)
Observable<Integer> values = Observable.create(o -> {
o.onNext(1);
o.onNext(1);
o.onNext(2);
o.onNext(3);
o.onNext(2);
o.onCompleted();
});
Subscription subscription = values
.distinctUntilChanged()
.subscribe(
v -> System.out.println(v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);
輸出結果如下:
1
2
3
2
Completed
同樣 distinctUntilChanged 也可以使用一個生成 key 的引數:
Observable<String> values = Observable.create(o -> {
o.onNext("First");
o.onNext("Second");
o.onNext("Third");
o.onNext("Fourth");
o.onNext("Fifth");
o.onCompleted();
});
Subscription subscription = values
.distinctUntilChanged(v -> v.charAt(0))
.subscribe(
v -> System.out.println(v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);
輸出結果:
First
Second
Third
Fourth
Completed
ignoreElements
ignoreElements 會忽略所有發射的資料,只讓 onCompleted 和 onError 可以通過。
Observable<Integer> values = Observable.range(0, 10);
Subscription subscription = values
.ignoreElements()
.subscribe(
v -> System.out.println(v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);
輸出結果如下:
Completed
ignoreElements() 和使用 filter(v -> false) 是一樣的效果。
skip 和 take
下面兩個操作函式依據發射資料的索引來在特定的位置切斷資料流,可以從頭開始切斷也可以從末尾開始切斷。 take 從頭開始獲取前 N 個數據,而 skip 則是從頭開始 跳過 N 個數據。注意,如果發射的資料比 N 小,則這兩個函式都會發射一個 error。
Observable<T> take(int num)
Observable<Integer> values = Observable.range(0, 5);
Subscription first2 = values
.take(2)
.subscribe(
v -> System.out.println(v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);
輸出結果如下:
0
1
Completed
熟悉 Java 8 Stream 的同學知道 take 函式和 limit 類似。 limit 函式在 Rx 中也有,和 take 是一樣的。只是為了方便熟悉 limit 的同學使用而已。
只要第 N 個數據可用, take 操作就結束了。 如果在 N 個數據發射之前發生了 error, error 資訊會繼續傳遞到下一個 Observable。 如果 第 N 個數據發射後, take 就不再關心源 Observable 的狀態了。
Observable<Integer> values = Observable.create(o -> {
o.onNext(1);
o.onError(new Exception("Oops"));
});
Subscription subscription = values
.take(1)
.subscribe(
v -> System.out.println(v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);
輸出結果如下:
1
Completed
skip 返回 take 操作忽略的另外一部分資料。也就是跳過前面 N 個數據。
Observable<T> skip(int num)
Observable<Integer> values = Observable.range(0, 5);
Subscription subscription = values
.skip(2)
.subscribe(
v -> System.out.println(v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);
輸出結果如下:
2
3
4
Completed
除了根據發射資料的索引來過濾資料以外,還可以使用資料流發射的時間來過濾。比如過濾掉前五秒發射的資料。
Observable<T> take(long time, java.util.concurrent.TimeUnit unit)
Observable<T> skip(long time, java.util.concurrent.TimeUnit unit)
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
Subscription subscription = values
.take(250, TimeUnit.MILLISECONDS)
.subscribe(
v -> System.out.println(v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);
輸出結果如下:
0
1
Completed
上面示例中只獲取前 250 毫秒發射的資料。 第 300 毫秒才開始發射資料 3, 所以這裡只獲取 0 和1 兩個資料。
skipWhile 和 takeWhile
這兩個函式是使用一個 predicate 引數來當做判斷條件。 如果判斷條件返回為 ture, 則 takeWhile 保留該資料。
Observable<T> takeWhile(Func1<? super T,java.lang.Boolean> predicate)
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
Subscription subscription = values
.takeWhile(v -> v < 2)
.subscribe(
v -> System.out.println(v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);
輸出結果如下:
0
1
Completed
不出意料, skipWhile 跳過過濾條件為 true 的資料。
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
Subscription subscription = values
.skipWhile(v -> v < 2)
.subscribe(
v -> System.out.println(v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);
輸出結果:
2
3
4
...
skipLast 和 takeLast
skip 和 take 是從頭開始索引資料,而 skipLast 和 takeLast 和他們相反,是從末尾開始索引資料。
Observable<Integer> values = Observable.range(0,5);
Subscription subscription = values
.skipLast(2)
.subscribe(
v -> System.out.println(v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);
輸出結果:
0
1
2
Completed
同樣這兩個函式也有依時間為條件的過載函式。
takeUntil 和 skipUntil
takeUntil 和 skipUntil 這兩個函式和 takeWhile 、skipWhile 剛好相反。 當判斷條件為 false 的時候, takeUntil 保留該資料。
takeUntil 和 skipUntil 還有另外一種不一樣的過載函式。切斷的條件為 另外一個 Observable 發射資料的時刻。
// 獲取源Observable的資料直到 other Observable 發射第一個資料時停止
public final <E> Observable<T> takeUntil(Observable<? extends E> other)
Observable<Long> values = Observable.interval(100,TimeUnit.MILLISECONDS);
Observable<Long> cutoff = Observable.timer(250, TimeUnit.MILLISECONDS);
Subscription subscription = values
.takeUntil(cutoff)
.subscribe(
v -> System.out.println(v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);
輸出結果:
0
1
Completed
你應該還記得,這個 timer 函式會等待 250 毫秒然後發射一個數據。當 takeUntil 收到 這個資料的時候就停止繼續接受 values 發射的資料。 cutoff 這個充當訊號的 Observable 可以是任意資料型別的,這裡不關心資料只關心何時發射了資料。
skipUntil 也是一樣,當收到另外一個 Observable 發射資料的時候,就開始接收 源 Observable 的資料。
Observable<Long> values = Observable.interval(100,TimeUnit.MILLISECONDS);
Observable<Long> cutoff = Observable.timer(250, TimeUnit.MILLISECONDS);
Subscription subscription = values
.skipUntil(cutoff)
.subscribe(
v -> System.out.println(v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);
輸出結果:
2
3
4
...