RxJava使用詳解--過濾操作符
RxJava使用詳解系列文章
詳細的例子可以檢視文章末尾的原始碼
這篇文章主要講RxJava中常見的過濾操作符
1.debounce操作符
源Observable每發射一個數據項,如果在debounce規定的間隔時間內Observable沒有發射新的資料項,debounce就把這個結果提交給訂閱者處理,如果在規定的間隔時間內產生了其他結果,就忽略掉髮射的這個資料,通過制定的時間間隔來限流,可以過濾掉髮射速率過快的資料項,預設在computatiion排程器上執行,可以指定執行執行緒。
注意:如果源Observable發射最後一個數據後,在debounce規定的時間間隔內呼叫了onCompleted,那麼通過debounce操作符就把這個結果提交給訂閱者 throttleWithTimeOut使用也是呼叫了debounce操作符來實現
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { if (subscriber.isUnsubscribed()) return;//如果沒有訂閱者就直接返回try { //發射資料的時間間隔:100~900毫秒, for (int i = 0; i < 10; i++) { subscriber.onNext(i); Thread.sleep輸出結果:(i * 100); } subscriber.onCompleted(); } catch (InterruptedException e) { subscriber.onError(e); } } }).subscribeOn(Schedulers.newThread()) .debounce(400, TimeUnit.MILLISECONDS)//超時時間為400毫秒,預期結果:時間間隔在400毫秒以上的資料都會提交給訂閱者,其他的不會。.subscribe(new Action1<Integer>() { @Overridepublic void call(Integer integer) { System.out.println("debounce onNext:" + integer + " 所線上程:" + Thread.currentThread().getName()); } });12-19 15:22:26.910 16572-16622/com.dingmouren.rxjavademo I/System.out: debounce onNext:4 所線上程:RxComputationScheduler-1
12-19 15:22:27.310 16572-16622/com.dingmouren.rxjavademo I/System.out: debounce onNext:5 所線上程:RxComputationScheduler-1
12-19 15:22:27.811 16572-16622/com.dingmouren.rxjavademo I/System.out: debounce onNext:6 所線上程:RxComputationScheduler-1
12-19 15:22:28.411 16572-16622/com.dingmouren.rxjavademo I/System.out: debounce onNext:7 所線上程:RxComputationScheduler-1
12-19 15:22:29.111 16572-16622/com.dingmouren.rxjavademo I/System.out: debounce onNext:8 所線上程:RxComputationScheduler-1
12-19 15:22:29.911 16572-16622/com.dingmouren.rxjavademo I/System.out: debounce onNext:9 所線上程:RxComputationScheduler-12.distinct()操作符:
過濾掉重複的資料項。過濾規則是隻允許沒有發射過的資料項通過。
變體distinct(Func1)根據返回的key值去過濾,不用資料本身. distinctUntilChanged()只判斷這個資料項跟前一個數據項是否相同,distinctUnitilChanged(Func1)也是根據返回的key值去比較過濾。 預設不在任何特定的排程器上執行。
Observable.just(1,2,2,3,4,4,5).distinct().subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println("distinct() onNext:" + integer + " 所線上程:" + Thread.currentThread().getName()); } });輸出結果:distinct() onNext:1 所線上程:main
distinct() onNext:2 所線上程:main
distinct() onNext:3 所線上程:main
distinct() onNext:4 所線上程:main
distinct() onNext:5 所線上程:mainObservable.just(1,2,2,3,4,4,5).distinct(new Func1<Integer, String>() { @Override public String call(Integer integer) { return 3 < integer ? "第一組" :"第二組";//這裡返回key值,小於3的key值是第一組,也就是說1和2的key值都是第一組,只會將1提交給訂閱者,2的key值與1相同就直接被過濾掉了,這個變體是根據key值進行過濾的 } }).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println("distinct(Func1) onNext:" + integer + " 所線上程:" + Thread.currentThread().getName()); } });輸出結果:distinct(Func1) onNext:1 所線上程:main
distinct(Func1) onNext:4 所線上程:mainObservable.just(1,2,3,2,4,5,4).distinctUntilChanged().subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println("distinctUntilChanged() onNext:" + integer + " 所線上程:" + Thread.currentThread().getName()); } });輸出結果:distinctUntilChanged() onNext:1 所線上程:main
distinctUntilChanged() onNext:2 所線上程:main
distinctUntilChanged() onNext:3 所線上程:main
distinctUntilChanged() onNext:2 所線上程:main
distinctUntilChanged() onNext:4 所線上程:main
distinctUntilChanged() onNext:5 所線上程:main
distinctUntilChanged() onNext:4 所線上程:main3.elementAt(index)
將指定索引的資料項提交給訂閱者,索引是從0開始,當沒有這個索引或者索引為負數會拋異常。
elementAtOrDefault(index,default):這個會設定一個預設值,當沒有指定的索引就提交預設值給訂閱者,為負數就拋異常。
Observable.just(1,2,3,4).elementAt(3).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println("elementAt onNext:" + integer + " 所線上程:" + Thread.currentThread().getName()); } });Observable.just(1,2,3,4).elementAt(3).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println("elementAt onNext:" + integer + " 所線上程:" + Thread.currentThread().getName()); } });輸出結果:elementAt onNext:4 所線上程:mainObservable.just(1,2,3,4).elementAtOrDefault(6,6).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println("elementAtOrDefault onNext:" + integer + " 所線上程:" + Thread.currentThread().getName()); } });輸出結果:elementAtOrDefault onNext:6 所線上程:main4. filter操作符對源Observable發射的資料項按照指定的條件進行過濾,滿足的條件的才會調給訂閱者。預設不在任何特定的排程器上執行
Observable.just(1,2,3,4,5).filter(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return integer > 3; } }).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println("filter(Func1) onNext:" + integer + " 所線上程:" + Thread.currentThread().getName()); } });輸出結果:filter(Func1) onNext:4 所線上程:main
filter(Func1) onNext:5 所線上程:main5.first()操作符提交源Observable發射的第一項資料,如果只是想要一個過濾符,最好使用take(2)或者elementAt(0)
first(Func1)操作符是提交第一項符合自定義條件的資料 firstOrDefault(T)操作符是在Observable沒有發射任何資料時提交一個指定的預設值 takeFirst(Func1)操作符提交符合自定義條件的的第一項資料, 與first(Func1)不同的是,takeFirst(Func1)在沒有符合條件的時候,會呼叫onCompleted,而first(Func1)會拋一個NoSuchElementException的異常
Observable.just(1,2,3).first().subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println("first() onNext:" + integer + " 所線上程:" + Thread.currentThread().getName()); } });輸出結果:first() onNext:1 所線上程:main
Observable.just(1,2,3).first(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return 1 < integer; } }).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println("first(Func1) onNext:" + integer + " 所線上程:" + Thread.currentThread().getName()); } });輸出結果:first(Func1) onNext:2 所線上程:main
6.ignoreElements()操作符不提交任何資料給訂閱者,只提交終止通知(onError或者onCompeleted)給訂閱者,預設不在任何特定的排程器上執行
7.last()操作符與first()操作符相反,只提交最後一個數據項給訂閱者,如果只是作為過濾操作符,最好使用takeLast(1),
官方文件解釋說:first()操作符和last()操作符在某些實現中會返回一個阻塞函式。 與first()操作符系列對應,也有last(Func1)、lastOrDefault(T)、lastOrDefault(T,Func1)
8.ofType操作符類似於filter操作符,區別在於ofType按照資料項的型別進行過濾,預設不在任何特定的排程器上執行
Observable.just(1,"String型別",true).ofType(String.class).subscribe(new Action1<String>() { @Override public void call(String s) { System.out.println("ofType(class) onNext:" + s + " 所線上程:" + Thread.currentThread().getName()); } });輸出結果:ofType(class) onNext:String型別 所線上程:main
9.sample操作符對Observable發射的資料定時進行取樣,採的是自從上一次取樣以來,Observable最近發射的一項資料,也就是這段時間間隔中最後一個數據項。如果自上一次取樣以來,源Observable沒有發射任何資料,sample操作符返回的Observable在此段時間也不會發射任何資料
預設在computation排程器上執行,但是可以指定它執行的執行緒 sample(long,TimeUnit,Scheduler) 與之對應的操作符是throttleFirst,它取樣的是取樣時間間隔中第一項資料,在最後一個時間段會發射最後一個數據項,看下面例子 * △time = 2.2s * Data : 1------2------3------4------5------6------7 * Time(s) : 0------1------2------3------4------5------6------7................ * △time點: 0------1------2-。----3------4-。---5------6----。-7................ * SampleResults: 3,5,7 * ThrottleFirst: 1,4,7
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { if (subscriber.isUnsubscribed()) return;//如果沒有訂閱者直接返回 try { //前3個數字的時間間隔設定1秒,最後一個設定2秒 for (int i = 1; i <8 ; i++) { subscriber.onNext(i); Thread.sleep(1000); } subscriber.onCompleted(); } catch (InterruptedException e) { subscriber.onError(e); } } }).sample(2200, TimeUnit.MILLISECONDS)//取樣間隔時間為2200毫秒 .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println("sample onNext:" + integer + " 所線上程:" + Thread.currentThread().getName()); } });輸出結果:sample onNext:3 所線上程:RxComputationScheduler-1
sample onNext:5 所線上程:RxComputationScheduler-1
sample onNext:7 所線上程:RxComputationScheduler-1
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { if (subscriber.isUnsubscribed()) return; try { for (int i = 1; i < 9 ; i++) { subscriber.onNext(i); Thread.sleep(1000); } subscriber.onCompleted(); } catch (InterruptedException e) { subscriber.onError(e); } } }).throttleFirst(2200,TimeUnit.MILLISECONDS) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println("throttleFirst onNext:" + integer + " 所線上程:" + Thread.currentThread().getName()); } });輸出結果:throttleFirst onNext:1 所線上程:main
throttleFirst onNext:4 所線上程:main
throttleFirst onNext:7 所線上程:main10.single()操作符:在源Observable只發射一個數據項的時候,single()操作符會將這個資料提交給訂閱者,大於1個就拋Sequence contains too many elements的異常,不是正好是一個數據項就會拋異常
single(Func1)操作符是對源Observable發射的資料項進行判斷,如果符合條件的資料數量大於1就會拋異常。不是正好是一個數據項就會拋異常 也有設定預設值得api,預設不在任何特定的排程器上執行 11.skip操作符skip(count) 對於源Observable發射的資料項,跳過前count項,將後面的資料項提交給訂閱者 skip(long,TimeUnit)對於原Observalbe發射的資料項,跳過long前的資料項,將之後的資料提交給訂閱者,可以指定執行執行緒
Observable.just(1,2,3,4,5,6).skip(3).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println("skip(count) onNext:" + integer + " 所線上程:" + Thread.currentThread().getName()); } });輸出結果:skip(count) onNext:4 所線上程:main
skip(count) onNext:5 所線上程:main
skip(count) onNext:6 所線上程:mainObservable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { if (subscriber.isUnsubscribed()) return; try { //每隔一秒發射一項資料 for (int i = 1; i < 5 ; i++) { subscriber.onNext(i); Thread.sleep(1000); } subscriber.onCompleted(); } catch (InterruptedException e) { subscriber.onError(e); } } }).skip(2, TimeUnit.SECONDS, Schedulers.newThread()).subscribe(new Action1<Integer>() {//發射2秒之後的資料項 @Override public void call(Integer integer) { System.out.println("skip(long,TimeUnit) onNext:" + integer + " 所線上程:" + Thread.currentThread().getName()); } });輸出結果:skip(long,TimeUnit) onNext:3 所線上程:main
skip(long,TimeUnit) onNext:4 所線上程:main12.skipLast操作符
skipLast(count) 對於源Observable發射的資料項,省略最後count項,將前面的資料項提交給訂閱者 skipLast(long,TimeUnit)對於原Observalbe發射的資料項,省略最後long時間段的資料項,將之前的資料提交給訂閱者,可以指定執行執行緒
13.take操作符
take(count)操作符對於源Observable發射的資料項,提取前面的count項資料提交給訂閱者,忽略後面的 take(long,TimeUnit)操作符對於源Obsrvable發射的資料項,提取前面long時間段裡的資料項提交給訂閱者,忽略後面的,可以指定執行緒
14.takeLast操作符
takeLast(count)操作符對於源Observable發射的資料項,提取前面的count項資料提交給訂閱者,忽略後面的 takeLast(long,TimeUnit)操作符對於源Obsrvable發射的資料項,提取前面long時間段裡的資料項提交給訂閱者,忽略後面的,可以指定執行緒
15.takeLastBuffer操作符
takeLastBuffer(count)操作符與takeLast(count)操作符類似,唯一不同就是takeLastBuffer(count)將最後的那些資料項收集到一個list集合中,提交這個集合給訂閱者 takeLastBuffer(long,TimeUnit)操作符與takeLast(long,TimeUnit)操作符類似,唯一不同就是將在最後時間段Long中資料項收集到一個list集合中, 將這個集合提交給了訂閱者,可以指定執行的執行緒。 更多詳細內容和例子,可以檢視原始碼