1. 程式人生 > >RxJava使用詳解--過濾操作符

RxJava使用詳解--過濾操作符

RxJava使用詳解系列文章

詳細的例子可以檢視文章末尾的原始碼


這篇文章主要講RxJava中常見的過濾操作符

1.debounce操作符

源Observable每發射一個數據項,如果在debounce規定的間隔時間內Observable沒有發射新的資料項,debounce就把這個結果提交給訂閱者處理,如果在規定的間隔時間內產生了其他結果,就忽略掉髮射的這個資料,通過制定的時間間隔來限流,可以過濾掉髮射速率過快的資料項,預設在computatiion排程器上執行,可以指定執行執行緒。

注意:如果源Observable發射最後一個數據後,在debounce規定的時間間隔內呼叫了onCompleted,那麼通過debounce操作符就把這個結果提交給訂閱者
 throttleWithTimeOut使用也是呼叫了debounce操作符來實現


Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
public void call(Subscriber<? super Integer> subscriber) {
        if (subscriber.isUnsubscribed()) return;//如果沒有訂閱者就直接返回try {
            //發射資料的時間間隔:100~900毫秒
for (int i = 0; i < 10; i++) {
                subscriber.onNext(i);
                Thread.sleep
(i * 100); } subscriber.onCompleted(); } catch (InterruptedException e) { subscriber.onError(e); } } }).subscribeOn(Schedulers.newThread()) .debounce(400, TimeUnit.MILLISECONDS)//超時時間為400毫秒,預期結果:時間間隔在400毫秒以上的資料都會提交給訂閱者,其他的不會。.subscribe(new Action1<Integer>() { @Override
public void call(Integer integer) { System.out.println("debounce onNext:" + integer + " 所線上程:" + Thread.currentThread().getName()); } });
輸出結果:

12-19 15:22:26.910 16572-16622/com.dingmouren.rxjavademo I/System.out: debounce onNext:4 所線上程:RxComputationScheduler-1
12-19 15:22:27.310 16572-16622/com.dingmouren.rxjavademo I/System.out: debounce onNext:5 所線上程:RxComputationScheduler-1
12-19 15:22:27.811 16572-16622/com.dingmouren.rxjavademo I/System.out: debounce onNext:6 所線上程:RxComputationScheduler-1
12-19 15:22:28.411 16572-16622/com.dingmouren.rxjavademo I/System.out: debounce onNext:7 所線上程:RxComputationScheduler-1
12-19 15:22:29.111 16572-16622/com.dingmouren.rxjavademo I/System.out: debounce onNext:8 所線上程:RxComputationScheduler-1
12-19 15:22:29.911 16572-16622/com.dingmouren.rxjavademo I/System.out: debounce onNext:9 所線上程:RxComputationScheduler-1

2.distinct()操作符:

過濾掉重複的資料項。過濾規則是隻允許沒有發射過的資料項通過。

變體distinct(Func1)根據返回的key值去過濾,不用資料本身.
distinctUntilChanged()只判斷這個資料項跟前一個數據項是否相同,distinctUnitilChanged(Func1)也是根據返回的key值去比較過濾。
預設不在任何特定的排程器上執行。

Observable.just(1,2,2,3,4,4,5).distinct().subscribe(new Action1<Integer>() {
    @Override
public void call(Integer integer) {
        System.out.println("distinct() onNext:" + integer + " 所線上程:" + Thread.currentThread().getName());
    }
});
輸出結果:

distinct() onNext:1 所線上程:main
distinct() onNext:2 所線上程:main
distinct() onNext:3 所線上程:main
distinct() onNext:4 所線上程:main
distinct() onNext:5 所線上程:main

Observable.just(1,2,2,3,4,4,5).distinct(new Func1<Integer, String>() {
    @Override
public String call(Integer integer) {
        return 3 < integer ? "第一組" :"第二組";//這裡返回key值,小於3的key值是第一組,也就是說1和2的key值都是第一組,只會將1提交給訂閱者,2的key值與1相同就直接被過濾掉了,這個變體是根據key值進行過濾的
    }
}).subscribe(new Action1<Integer>() {
    @Override
public void call(Integer integer) {
        System.out.println("distinct(Func1) onNext:" + integer + " 所線上程:" + Thread.currentThread().getName());
    }
});
輸出結果:

distinct(Func1) onNext:1 所線上程:main
distinct(Func1) onNext:4 所線上程:main

Observable.just(1,2,3,2,4,5,4).distinctUntilChanged().subscribe(new Action1<Integer>() {
    @Override
public void call(Integer integer) {
        System.out.println("distinctUntilChanged() onNext:" + integer + " 所線上程:" + Thread.currentThread().getName());
    }
});
輸出結果:

distinctUntilChanged() onNext:1 所線上程:main
distinctUntilChanged() onNext:2 所線上程:main
distinctUntilChanged() onNext:3 所線上程:main
distinctUntilChanged() onNext:2 所線上程:main
distinctUntilChanged() onNext:4 所線上程:main
distinctUntilChanged() onNext:5 所線上程:main
distinctUntilChanged() onNext:4 所線上程:main

3.elementAt(index)

將指定索引的資料項提交給訂閱者,索引是從0開始,當沒有這個索引或者索引為負數會拋異常。

elementAtOrDefault(index,default):這個會設定一個預設值,當沒有指定的索引就提交預設值給訂閱者,為負數就拋異常。
Observable.just(1,2,3,4).elementAt(3).subscribe(new Action1<Integer>() {
    @Override
public void call(Integer integer) {
        System.out.println("elementAt onNext:" + integer + " 所線上程:" + Thread.currentThread().getName());
    }
});
Observable.just(1,2,3,4).elementAt(3).subscribe(new Action1<Integer>() {
    @Override
public void call(Integer integer) {
        System.out.println("elementAt onNext:" + integer + " 所線上程:" + Thread.currentThread().getName());
    }
});
輸出結果:elementAt onNext:4 所線上程:main

Observable.just(1,2,3,4).elementAtOrDefault(6,6).subscribe(new Action1<Integer>() {
    @Override
public void call(Integer integer) {
        System.out.println("elementAtOrDefault onNext:" + integer + " 所線上程:" + Thread.currentThread().getName());
    }
});
輸出結果:elementAtOrDefault onNext:6 所線上程:main

4. filter操作符對源Observable發射的資料項按照指定的條件進行過濾,滿足的條件的才會調給訂閱者。預設不在任何特定的排程器上執行


Observable.just(1,2,3,4,5).filter(new Func1<Integer, Boolean>() {
    @Override
public Boolean call(Integer integer) {
        return integer > 3;
    }
}).subscribe(new Action1<Integer>() {
    @Override
public void call(Integer integer) {
        System.out.println("filter(Func1) onNext:" + integer + " 所線上程:" + Thread.currentThread().getName());
    }
});
輸出結果:

filter(Func1) onNext:4 所線上程:main
filter(Func1) onNext:5 所線上程:main

5.first()操作符提交源Observable發射的第一項資料,如果只是想要一個過濾符,最好使用take(2)或者elementAt(0)

first(Func1)操作符是提交第一項符合自定義條件的資料
firstOrDefault(T)操作符是在Observable沒有發射任何資料時提交一個指定的預設值
takeFirst(Func1)操作符提交符合自定義條件的的第一項資料,
與first(Func1)不同的是,takeFirst(Func1)在沒有符合條件的時候,會呼叫onCompleted,而first(Func1)會拋一個NoSuchElementException的異常
Observable.just(1,2,3).first().subscribe(new Action1<Integer>() {
    @Override
public void call(Integer integer) {
        System.out.println("first() onNext:" + integer + " 所線上程:" + Thread.currentThread().getName());
    }
});
輸出結果:first() onNext:1 所線上程:main

Observable.just(1,2,3).first(new Func1<Integer, Boolean>() {
    @Override
public Boolean call(Integer integer) {
        return 1 < integer;
    }
}).subscribe(new Action1<Integer>() {
    @Override
public void call(Integer integer) {
        System.out.println("first(Func1) onNext:" + integer + " 所線上程:" + Thread.currentThread().getName());
    }
});
輸出結果:first(Func1) onNext:2 所線上程:main

6.ignoreElements()操作符不提交任何資料給訂閱者,只提交終止通知(onError或者onCompeleted)給訂閱者,預設不在任何特定的排程器上執行


7.last()操作符與first()操作符相反,只提交最後一個數據項給訂閱者,如果只是作為過濾操作符,最好使用takeLast(1),

官方文件解釋說:first()操作符和last()操作符在某些實現中會返回一個阻塞函式。
與first()操作符系列對應,也有last(Func1)、lastOrDefault(T)、lastOrDefault(T,Func1)

8.ofType操作符類似於filter操作符,區別在於ofType按照資料項的型別進行過濾,預設不在任何特定的排程器上執行


Observable.just(1,"String型別",true).ofType(String.class).subscribe(new Action1<String>() {
    @Override
public void call(String s) {
        System.out.println("ofType(class) onNext:" + s + " 所線上程:" + Thread.currentThread().getName());
    }
});
輸出結果:ofType(class) onNext:String型別 所線上程:main

9.sample操作符對Observable發射的資料定時進行取樣,採的是自從上一次取樣以來,Observable最近發射的一項資料,也就是這段時間間隔中最後一個數據項。如果自上一次取樣以來,源Observable沒有發射任何資料,sample操作符返回的Observable在此段時間也不會發射任何資料

預設在computation排程器上執行,但是可以指定它執行的執行緒 sample(long,TimeUnit,Scheduler)
與之對應的操作符是throttleFirst,它取樣的是取樣時間間隔中第一項資料,在最後一個時間段會發射最後一個數據項,看下面例子
* △time = 2.2s
* Data :    1------2------3------4------5------6------7
* Time(s) : 0------1------2------3------4------5------6------7................
* △time點: 0------1------2-。----3------4-。---5------6----。-7................
* SampleResults:  3,5,7
* ThrottleFirst:  1,4,7

Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
public void call(Subscriber<? super Integer> subscriber) {
        if (subscriber.isUnsubscribed()) return;//如果沒有訂閱者直接返回
try {
            //前3個數字的時間間隔設定1秒,最後一個設定2秒
for (int i = 1; i <8 ; i++) {
                subscriber.onNext(i);
                Thread.sleep(1000);
            }
            subscriber.onCompleted();
        } catch (InterruptedException e) {
           subscriber.onError(e);
        }
    }
}).sample(2200, TimeUnit.MILLISECONDS)//取樣間隔時間為2200毫秒
.subscribe(new Action1<Integer>() {
    @Override
public void call(Integer integer) {
        System.out.println("sample  onNext:" + integer + " 所線上程:" + Thread.currentThread().getName());
    }
});
輸出結果:

sample  onNext:3 所線上程:RxComputationScheduler-1
sample  onNext:5 所線上程:RxComputationScheduler-1
sample  onNext:7 所線上程:RxComputationScheduler-1


Observable.create(new Observable.OnSubscribe<Integer>() {

    @Override
public void call(Subscriber<? super Integer> subscriber) {
        if (subscriber.isUnsubscribed()) return;
        try {
            for (int i = 1; i < 9 ; i++) {
                subscriber.onNext(i);
                Thread.sleep(1000);
            }
            subscriber.onCompleted();
        } catch (InterruptedException e) {
            subscriber.onError(e);
        }
    }
}).throttleFirst(2200,TimeUnit.MILLISECONDS)
        .subscribe(new Action1<Integer>() {
            @Override
public void call(Integer integer) {
                System.out.println("throttleFirst  onNext:" + integer + " 所線上程:" + Thread.currentThread().getName());
            }
        });
輸出結果:

throttleFirst  onNext:1 所線上程:main
throttleFirst  onNext:4 所線上程:main
throttleFirst  onNext:7 所線上程:main

10.single()操作符在源Observable只發射一個數據項的時候,single()操作符會將這個資料提交給訂閱者,大於1個就拋Sequence contains too many elements的異常,不是正好是一個數據項就會拋異常

single(Func1)操作符是對源Observable發射的資料項進行判斷,如果符合條件的資料數量大於1就會拋異常。不是正好是一個數據項就會拋異常
也有設定預設值得api,預設不在任何特定的排程器上執行
11.skip操作符
skip(count) 對於源Observable發射的資料項,跳過前count項,將後面的資料項提交給訂閱者
skip(long,TimeUnit)對於原Observalbe發射的資料項,跳過long前的資料項,將之後的資料提交給訂閱者,可以指定執行執行緒

 
Observable.just(1,2,3,4,5,6).skip(3).subscribe(new Action1<Integer>() {
    @Override
public void call(Integer integer) {
        System.out.println("skip(count) onNext:" + integer + " 所線上程:" + Thread.currentThread().getName());
    }
});
輸出結果:

skip(count) onNext:4 所線上程:main
skip(count) onNext:5 所線上程:main
skip(count) onNext:6 所線上程:main

Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
public void call(Subscriber<? super Integer> subscriber) {
        if (subscriber.isUnsubscribed()) return;
        try {
            //每隔一秒發射一項資料
for (int i = 1; i < 5 ; i++) {
                subscriber.onNext(i);
                Thread.sleep(1000);
            }
            subscriber.onCompleted();
        } catch (InterruptedException e) {
            subscriber.onError(e);
        }
    }
}).skip(2, TimeUnit.SECONDS, Schedulers.newThread()).subscribe(new Action1<Integer>() {//發射2秒之後的資料項
@Override
public void call(Integer integer) {
        System.out.println("skip(long,TimeUnit) onNext:" + integer + " 所線上程:" + Thread.currentThread().getName());
    }
});
輸出結果:

skip(long,TimeUnit) onNext:3 所線上程:main
skip(long,TimeUnit) onNext:4 所線上程:main

12.skipLast操作符

skipLast(count) 對於源Observable發射的資料項,省略最後count項,將前面的資料項提交給訂閱者
skipLast(long,TimeUnit)對於原Observalbe發射的資料項,省略最後long時間段的資料項,將之前的資料提交給訂閱者,可以指定執行執行緒


13.take操作符

take(count)操作符對於源Observable發射的資料項,提取前面的count項資料提交給訂閱者,忽略後面的
take(long,TimeUnit)操作符對於源Obsrvable發射的資料項,提取前面long時間段裡的資料項提交給訂閱者,忽略後面的,可以指定執行緒

14.takeLast操作符

takeLast(count)操作符對於源Observable發射的資料項,提取前面的count項資料提交給訂閱者,忽略後面的
takeLast(long,TimeUnit)操作符對於源Obsrvable發射的資料項,提取前面long時間段裡的資料項提交給訂閱者,忽略後面的,可以指定執行緒



15.takeLastBuffer操作符

takeLastBuffer(count)操作符與takeLast(count)操作符類似,唯一不同就是takeLastBuffer(count)將最後的那些資料項收集到一個list集合中,提交這個集合給訂閱者
takeLastBuffer(long,TimeUnit)操作符與takeLast(long,TimeUnit)操作符類似,唯一不同就是將在最後時間段Long中資料項收集到一個list集合中,
將這個集合提交給了訂閱者,可以指定執行的執行緒。


更多詳細內容和例子,可以檢視原始碼