1. 程式人生 > >RxJava 事件流之過濾資料

RxJava 事件流之過濾資料

現在你可以用 Rx 來處理大批量實時資料,但是如果把所有大批量資料整個打包發給你的話,使用 Rx 還有啥優勢呢? 本節 我們將介紹一些操作函式(operators )來過濾資料、或者把所有資料變成一個需要的資料。

如果你瞭解過函數語言程式設計(functional programming)或者 Java 中的 Stream,則本節介紹的操作函式是非常眼熟的。本節中所有的操作符都返回一個不影響前一個 Observable 的新 Observable。 整個 Rx 框架都遵守該原則。通過建立新的 Observable 來轉換之前的 Observable而不會對之前的 Observable 造成干擾。訂閱到初始 Observable 的 Subscribers 不會受到任何影響,但是在後面的章節中也會看到,開發者也需要當心該原則。
Marble diagrams(彈子圖)

你可以想象一個機器,不停的發射彈子出來,發射出來的彈子可以被其他模組再次加工(比如 上色、把不合格的彈子給回收了),加工完成後再次發射出來 … 彈子圖就是對這個機器的抽象描述。在 Rx 中流行使用這種方式來描述操作符,畢竟圖片看起來直觀多了。 Marble diagrams(彈子圖)基本元素如下:

這裡寫圖片描述

時間從左往右流動,每個圖形代表一個數據,豎線代表發射完成了,而 X 代表出現錯誤了。 操作函式把上面的 Observable 轉換下面的新的 Observable , 裡面的每個資料都被操作函式給處理了並返回一個新的資料。

Filter(過濾資料)

filter 函式使用一個 predicate 函式介面來判斷每個發射的值是否能通過這個判斷。如果返回 true,則該資料繼續往下一個(過濾後的) Observable 發射。

這裡寫圖片描述

比如下面示例建立了一個發射 0 到 9 十個數字的 源Observable。在該 Observable 使用一個 filter 操作來過濾掉奇數,最後只保留偶數。

Observable<Integer> values = Observable.range(0,10);
Subscription oddNumbers = values
    .filter(v -> v % 2 == 0)
    .subscribe(
        v -> System.out.println(v),
        e -> System.out.println("Error: "
+ e), () -> System.out.println("Completed") );

輸出結果如下:

0
2
4
6
8
Completed

distinct 和 distinctUntilChanged

distinct 函式用來過濾掉已經出現過的資料了。

這裡寫圖片描述

Observable<Integer> values = Observable.create(o -> {
    o.onNext(1);
    o.onNext(1);
    o.onNext(2);
    o.onNext(3);
    o.onNext(2);
    o.onCompleted();
});

Subscription subscription = values
    .distinct()
    .subscribe(
        v -> System.out.println(v),
        e -> System.out.println("Error: " + e),
        () -> System.out.println("Completed")
    );

輸出結果如下:

1
2
3
Completed

distinct 還有一個過載函式,該函式有個生成 key 的引數。每個發射的資料都使用該引數生成一個 key,然後使用該key 來判斷資料是否一樣。

public final <U> Observable<T> distinct(Func1<? super T,? extends U> keySelector)

這裡寫圖片描述

下面示例中使用字串的第一個字母作為 key 來比較。

Observable<String> values = Observable.create(o -> {
    o.onNext("First");
    o.onNext("Second");
    o.onNext("Third");
    o.onNext("Fourth");
    o.onNext("Fifth");
    o.onCompleted();
});

Subscription subscription = values
    .distinct(v -> v.charAt(0))
    .subscribe(
        v -> System.out.println(v),
        e -> System.out.println("Error: " + e),
        () -> System.out.println("Completed")
    );

輸出結果:

First
Second
Third
Completed

“Fourth” 和 “Fifth” 字串被過濾掉了,應為他們的 key (首字母)和 First 一樣。已經發射過的資料將被過濾掉。

有經驗的碼農知道,該函式在內部維護一個 key 集合來儲存所有已經發射資料的 key,當有新的資料發射的時候,在集合中查詢該 資料的key 是否存在。 在使用 Rx 操作函式的時把內部細節給封裝起來了,但是我們應該注意該問題來避免效能問題。(如果有大量的資料,維護一個內部的集合來儲存 key 可能會佔用很多記憶體。)

distinct 還有個變體是 distinctUntilChanged。區別是 distinctUntilChanged 只過濾相鄰的 key 一樣的資料。

public final Observable<T> distinctUntilChanged()
public final <U> Observable<T> distinctUntilChanged(Func1<? super T,? extends U> keySelector)

這裡寫圖片描述

Observable<Integer> values = Observable.create(o -> {
    o.onNext(1);
    o.onNext(1);
    o.onNext(2);
    o.onNext(3);
    o.onNext(2);
    o.onCompleted();
});

Subscription subscription = values
    .distinctUntilChanged()
    .subscribe(
        v -> System.out.println(v),
        e -> System.out.println("Error: " + e),
        () -> System.out.println("Completed")
    );

輸出結果如下:

1
2
3
2
Completed

同樣 distinctUntilChanged 也可以使用一個生成 key 的引數:

Observable<String> values = Observable.create(o -> {
    o.onNext("First");
    o.onNext("Second");
    o.onNext("Third");
    o.onNext("Fourth");
    o.onNext("Fifth");
    o.onCompleted();
});

Subscription subscription = values
    .distinctUntilChanged(v -> v.charAt(0))
    .subscribe(
            v -> System.out.println(v),
            e -> System.out.println("Error: " + e),
            () -> System.out.println("Completed")
        );

輸出結果:

First
Second
Third
Fourth
Completed

ignoreElements

ignoreElements 會忽略所有發射的資料,只讓 onCompleted 和 onError 可以通過。

Observable<Integer> values = Observable.range(0, 10);

Subscription subscription = values
    .ignoreElements()
    .subscribe(
        v -> System.out.println(v),
        e -> System.out.println("Error: " + e),
        () -> System.out.println("Completed")
    );

輸出結果如下:

Completed

ignoreElements() 和使用 filter(v -> false) 是一樣的效果。

skip 和 take

下面兩個操作函式依據發射資料的索引來在特定的位置切斷資料流,可以從頭開始切斷也可以從末尾開始切斷。 take 從頭開始獲取前 N 個數據,而 skip 則是從頭開始 跳過 N 個數據。注意,如果發射的資料比 N 小,則這兩個函式都會發射一個 error。

Observable<T>   take(int num)

這裡寫圖片描述

Observable<Integer> values = Observable.range(0, 5);

Subscription first2 = values
    .take(2)
    .subscribe(
        v -> System.out.println(v),
        e -> System.out.println("Error: " + e),
        () -> System.out.println("Completed")
    );

輸出結果如下:

0
1
Completed

熟悉 Java 8 Stream 的同學知道 take 函式和 limit 類似。 limit 函式在 Rx 中也有,和 take 是一樣的。只是為了方便熟悉 limit 的同學使用而已。

只要第 N 個數據可用, take 操作就結束了。 如果在 N 個數據發射之前發生了 error, error 資訊會繼續傳遞到下一個 Observable。 如果 第 N 個數據發射後, take 就不再關心源 Observable 的狀態了。

Observable<Integer> values = Observable.create(o -> {
    o.onNext(1);
    o.onError(new Exception("Oops"));
});

Subscription subscription = values
    .take(1)
    .subscribe(
        v -> System.out.println(v),
        e -> System.out.println("Error: " + e),
        () -> System.out.println("Completed")
    );

輸出結果如下:

1
Completed

skip 返回 take 操作忽略的另外一部分資料。也就是跳過前面 N 個數據。

Observable<T>   skip(int num)

這裡寫圖片描述

Observable<Integer> values = Observable.range(0, 5);

Subscription subscription = values
    .skip(2)
    .subscribe(
        v -> System.out.println(v),
        e -> System.out.println("Error: " + e),
        () -> System.out.println("Completed")
    );

輸出結果如下:

2
3
4
Completed

除了根據發射資料的索引來過濾資料以外,還可以使用資料流發射的時間來過濾。比如過濾掉前五秒發射的資料。

Observable<T>   take(long time, java.util.concurrent.TimeUnit unit)
Observable<T>   skip(long time, java.util.concurrent.TimeUnit unit)
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);

Subscription subscription = values
    .take(250, TimeUnit.MILLISECONDS)
    .subscribe(
        v -> System.out.println(v),
        e -> System.out.println("Error: " + e),
        () -> System.out.println("Completed")
    );

輸出結果如下:

0
1
Completed

上面示例中只獲取前 250 毫秒發射的資料。 第 300 毫秒才開始發射資料 3, 所以這裡只獲取 0 和1 兩個資料。

skipWhile 和 takeWhile

這兩個函式是使用一個 predicate 引數來當做判斷條件。 如果判斷條件返回為 ture, 則 takeWhile 保留該資料。

Observable<T>   takeWhile(Func1<? super T,java.lang.Boolean> predicate)
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);

Subscription subscription = values
    .takeWhile(v -> v < 2)
    .subscribe(
        v -> System.out.println(v),
        e -> System.out.println("Error: " + e),
        () -> System.out.println("Completed")
    );

輸出結果如下:

0
1
Completed

不出意料, skipWhile 跳過過濾條件為 true 的資料。

Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);

Subscription subscription = values
    .skipWhile(v -> v < 2)
    .subscribe(
        v -> System.out.println(v),
        e -> System.out.println("Error: " + e),
        () -> System.out.println("Completed")
    );

輸出結果:

2
3
4
...

skipLast 和 takeLast

skip 和 take 是從頭開始索引資料,而 skipLast 和 takeLast 和他們相反,是從末尾開始索引資料。

Observable<Integer> values = Observable.range(0,5);

Subscription subscription = values
    .skipLast(2)
    .subscribe(
        v -> System.out.println(v),
        e -> System.out.println("Error: " + e),
        () -> System.out.println("Completed")
    );

輸出結果:

0
1
2
Completed

同樣這兩個函式也有依時間為條件的過載函式。

takeUntil 和 skipUntil

takeUntil 和 skipUntil 這兩個函式和 takeWhile 、skipWhile 剛好相反。 當判斷條件為 false 的時候, takeUntil 保留該資料。

takeUntil 和 skipUntil 還有另外一種不一樣的過載函式。切斷的條件為 另外一個 Observable 發射資料的時刻。

// 獲取源Observable的資料直到 other Observable 發射第一個資料時停止
public final <E> Observable<T> takeUntil(Observable<? extends E> other)

這裡寫圖片描述

Observable<Long> values = Observable.interval(100,TimeUnit.MILLISECONDS);
Observable<Long> cutoff = Observable.timer(250, TimeUnit.MILLISECONDS);

Subscription subscription = values
    .takeUntil(cutoff)
    .subscribe(
        v -> System.out.println(v),
        e -> System.out.println("Error: " + e),
        () -> System.out.println("Completed")
    );

輸出結果:

0
1
Completed

你應該還記得,這個 timer 函式會等待 250 毫秒然後發射一個數據。當 takeUntil 收到 這個資料的時候就停止繼續接受 values 發射的資料。 cutoff 這個充當訊號的 Observable 可以是任意資料型別的,這裡不關心資料只關心何時發射了資料。

skipUntil 也是一樣,當收到另外一個 Observable 發射資料的時候,就開始接收 源 Observable 的資料。

Observable<Long> values = Observable.interval(100,TimeUnit.MILLISECONDS);
Observable<Long> cutoff = Observable.timer(250, TimeUnit.MILLISECONDS);

Subscription subscription = values
    .skipUntil(cutoff)
    .subscribe(
        v -> System.out.println(v),
        e -> System.out.println("Error: " + e),
        () -> System.out.println("Completed")
    );

輸出結果:

2
3
4
...