1. 程式人生 > >RxJava----操作符:輔助操作符

RxJava----操作符:輔助操作符

Observable Utility Operators(輔助操作符)

delay

顧名思義,Delay操作符就是讓發射資料的時機延後一段時間,這樣所有的資料都會依次延後一段時間發射。
這裡寫圖片描述

        log("start subscrib:" + System.currentTimeMillis()/1000);
        Observable<Long> observable = Observable.create(new Observable.OnSubscribe<Long>() {
            @Override
            public
void call(Subscriber<? super Long> subscriber) { for (int i = 1; i <= 2; i++) { Long currentTime=System.currentTimeMillis()/1000; log("subscrib:" + currentTime); subscriber.onNext(currentTime); try { Thread.sleep(1000
); } catch (InterruptedException e) { e.printStackTrace(); } } } }).subscribeOn(Schedulers.newThread()); observable.delay(2000, TimeUnit.MILLISECONDS).subscribe(new Action1<Long>() { @Override
public void call(Long aLong) { log("delay:"+System.currentTimeMillis()/1000+"---"+(System.currentTimeMillis()/1000-aLong)); } });

結果:

start subscrib:1462519228
subscrib:1462519228
subscrib:1462519229
delay:1462519230---2
delay:1462519231---2

delaySubscription

不同之處在於Delay是延時資料的發射,而DelaySubscription是延時註冊Subscriber。
dealy是延遲發射,delaySubscription則是延遲收到。
這裡寫圖片描述

        log("start subscrib:" + System.currentTimeMillis()/1000);
        Observable<Long> observable = Observable.create(new Observable.OnSubscribe<Long>() {
            @Override
            public void call(Subscriber<? super Long> subscriber) {
                for (int i = 1; i <= 2; i++) {
                    Long currentTime=System.currentTimeMillis()/1000;
                    log("subscrib:" + currentTime);
                    subscriber.onNext(currentTime);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).subscribeOn(Schedulers.newThread());
        observable.delaySubscription(2000, TimeUnit.MILLISECONDS).subscribe(new Action1<Long>() {
            @Override
            public void call(Long aLong) {
                log("delaySubscription:"+System.currentTimeMillis()/1000+"---"+(System.currentTimeMillis()/1000-aLong));
            }
        });

結果:

start subscrib:1462519279
subscrib:1462519281
delaySubscription:1462519281---0
subscrib:1462519282
delaySubscription:1462519282---0

do

do操作符就是給Observable的生命週期的各個階段加上一系列的回撥監聽,當Observable執行到這個階段的時候,這些回撥就會被觸發。在Rxjava實現了很多的doxxx操作符。

doOnEach

doOnEach可以給Observable加上這樣的樣一個回撥:Observable每發射一個數據的時候就會觸發這個回撥,不僅包括onNext還包括onError和onCompleted。
這裡寫圖片描述

        Observable observable=Observable.just(1,2,3);
        observable.doOnEach(new Action1<Notification>() {
            @Override
            public void call(Notification notification) {
                log("doOnEach send " + notification.getValue() + " type:" + notification.getKind());
            }
        }).subscribe(new Action1() {
            @Override
            public void call(Object o) {
                log(o.toString());
            }
        });
        Subject<Integer, Integer> values = ReplaySubject.create();
        values.doOnEach(new Action1<Notification<? super Integer>>() {
            @Override
            public void call(Notification<? super Integer> notification) {
                log("doOnEach send " + notification.getValue() + " type:" + notification.getKind());
            }
        }).subscribe(new Action1() {
            @Override
            public void call(Object o) {
                log(o.toString());
            }
        });
        values.onNext(4);
        values.onNext(5);
        values.onNext(6);
        values.onError(new Exception("Oops"));

結果:

doOnEach send 1 type:OnNext
1
doOnEach send 2 type:OnNext
2
doOnEach send 3 type:OnNext
3
doOnEach send null type:OnCompleted

doOnEach send 4 type:OnNext
4
doOnEach send 5 type:OnNext
5
doOnEach send 6 type:OnNext
6
doOnEach send null type:OnError

doOnNext

doOnNext則只有onNext的時候才會被觸發。
這裡寫圖片描述

        Subject<Integer, Integer> values = ReplaySubject.create();
        values.doOnNext(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                log("doOnNext send :"+integer.toString());
            }
        }).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                log(integer.toString());
            }
        });
        values.onNext(4);
        values.onError(new Exception("Oops"));

結果:

doOnNext send :4
4

doOnSubscribe

doOnSubscribe會在Subscriber進行訂閱的時候觸發回撥。
這裡寫圖片描述

        Observable observable=Observable.just(1,2);
        observable.subscribe(new Action1() {
            @Override
            public void call(Object o) {
                log("first:"+o.toString());
            }
        });
        observable.subscribe(new Action1() {
            @Override
            public void call(Object o) {
                log("second:"+o.toString());
            }
        });

結果:

I'm be subscribed!
first:1
first:2
I'm be subscribed!
second:1
second:2

doOnUnSubscribe

doOnUnSubscribe則會在Subscriber進行反訂閱的時候觸發回撥。
當一個Observable通過OnError或者OnCompleted結束的時候,會反訂閱所有的Subscriber。
這裡寫圖片描述

Observable observable = Observable.just(1, 2).doOnUnsubscribe(new Action0() {
            @Override
            public void call() {
                log("I'm be unSubscribed!");
            }
        });
        Subscription subscribe1 = observable.subscribe();
        Subscription subscribe2 = observable.subscribe();
        subscribe1.unsubscribe();
        subscribe2.unsubscribe();

結果:

I'm be unSubscribed!
I'm be unSubscribed!

doOnError

doOnError會在OnError發生的時候觸發回撥,並將Throwable物件作為引數傳進回撥函式裡;
這裡寫圖片描述

         try {
            Observable observable = Observable.error(new Throwable("呵呵噠")).doOnError(new Action1<Throwable>() {
                @Override
                public void call(Throwable throwable) {
                    log(throwable.getMessage().toString());
                }
            });
            observable.subscribe();
        }catch (Exception e){
            log("catch the exception");
        }

結果:

呵呵噠
catch the exception

doOnComplete

doOnComplete會在OnCompleted發生的時候觸發回撥。
這裡寫圖片描述

        Observable observable = Observable.empty().doOnCompleted(new Action0() {
            @Override
            public void call() {
                log("Complete!");
            }
        });
        observable.subscribe();

結果:

Complete!

doOnTerminate

DoOnTerminate會在Observable結束前觸發回撥,無論是正常還是異常終止;
這裡寫圖片描述

        Subject<Integer, Integer> values = ReplaySubject.create();
        values.doOnTerminate(new Action0() {
            @Override
            public void call() {
                log("order to terminate");
            }
        }).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                log(integer.toString());
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                log(throwable.getMessage().toString());
            }
        });
        values.onNext(4);
        values.onError(new Exception("Oops"));

結果:

4
order to terminate
Oops

finallyDo

finallyDo會在Observable結束後觸發回撥,無論是正常還是異常終止。
這裡寫圖片描述

        Observable observable = Observable.empty().finallyDo(new Action0() {
            @Override
            public void call() {
                log("already terminate");
            }
        });
        observable.subscribe(new Action1() {
            @Override
            public void call(Object o) {
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
            }
        }, new Action0() {
            @Override
            public void call() {
                log("Complete!");
            }
        });

結果:

Complete!
already terminate

materialize

materialize操作符將OnNext/OnError/OnComplete都轉化為一個Notification物件並按照原來的順序發射出來。

public final Observable<Notification<T>> materialize()

這裡寫圖片描述
元資料中包含了源 Observable 所發射的動作,是呼叫 onNext 還是 onComplete。注意上圖中,源 Observable 結束的時候, materialize 還會發射一個 onComplete 資料,然後才發射一個結束事件。

        Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
        values.take(3)
                .materialize()
                .subscribe(new Action1<Object>() {
                    @Override
                    public void call(Object o) {
                        log(o.toString());
                    }
                });

結果:

meterialize:0--type:OnNext
meterialize:1--type:OnNext
meterialize:2--type:OnNext
meterialize:null--type:OnCompleted

Notification 類包含了一些判斷每個資料發射型別的方法,如果出錯了還可以獲取錯誤資訊 Throwable 物件。

dematerialize

deMeterialize則是與materialize 執行相反的過程。

        Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
        values.take(3)
                .materialize()
                .dematerialize()
                .subscribe(new Action1<Object>() {
                    @Override
                    public void call(Object o) {
                        log(o.toString());
                    }
                });

結果:

0
1
2

注意:在呼叫dematerialize()之前必須先呼叫materialize(),否則會報錯。

serialize

強制Observable按次序發射資料並且功能是有效的

如果你無法確保自定義的操作符符合 Rx 的約定,例如從多個源非同步獲取資料,則可以使用 serialize 操作函式。 serialize 可以把一個不符合約定的 Observable 轉換為一個符合約定的 Observable。
這裡寫圖片描述
下面建立一個不符合約定的 Observable,並且訂閱到該 Observable上:

        Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onCompleted();
                subscriber.onNext(3);
                subscriber.onCompleted();
            }
        });

        observable.doOnUnsubscribe(new Action0() {
            @Override
            public void call() {
                log("Unsubscribed");
            }
        }) .subscribe(
                new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                    }
                }, new Action0() {
                    @Override
                    public void call() {
                        log("Complete!");
                    }
                });

結果:

1
2
Complete!
Unsubscribed

先不管上面的 Observable 發射的資料,訂閱結束的情況看起來符合 Rx 約定。 這是由於 subscribe 認為當前資料流結束的時候會主動結束這個 Subscription。但實際使用中我們可能並不想直接結束這個Subscription。還有一個函式為 unsafeSubscribe ,該函式不會自動取消訂閱。

        observable.doOnUnsubscribe(new Action0() {
            @Override
            public void call() {
                log("Unsubscribed");
            }
        })
                .unsafeSubscribe(new Subscriber<Integer>() {
                    @Override
                    public void onCompleted() {
                        log("Complete!");
                    }
                    @Override
                    public void onError(Throwable e) {
                    }
                    @Override
                    public void onNext(Integer integer) {
                    }
                });

結果:

1
2
Complete!
3
Complete!

上面的示例最後就沒有列印 Unsubscribed 字串。
unsafeSubscribe 也不能很好的處理錯誤情況。所以該函式幾乎沒用。在文件中說:該函式應該僅僅在自定義操作函式中處理巢狀訂閱的情況。 為了避免這種操作函式接受到不合法的資料流,我們可以在其上應用 serialize 操作函式:

        Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onCompleted();
                subscriber.onNext(3);
                subscriber.onCompleted();
            }
        })
                .cast(Integer.class)
                .serialize();                    
        observable.doOnUnsubscribe(new Action0() {
            @Override
            public void call() {
                log("Unsubscribed");
            }
        })
                .unsafeSubscribe(new Subscriber<Integer>() {
                    @Override
                    public void onCompleted() {
                        log("Complete!");
                    }
                    @Override
                    public void onError(Throwable e) {
                    }
                    @Override
                    public void onNext(Integer integer) {
                    }
                });

結果:

1
2
Complete!

儘管上面的程式碼中沒有呼叫unsubscribe, 但是資料流事件依然符合約定。最後也收到了完成事件。

timeout

新增超時機制,如果過了指定的一段時間沒有發射資料,就發射一個錯誤通知

  • 我們可以認為timeout()為一個Observable的限時的副本。
  • 如果在指定的時間間隔內Observable不發射值的話,它監聽的原始的Observable時就會觸發onError()函式。
    這裡寫圖片描述
        Observable<Long> values = Observable.interval(200, TimeUnit.MILLISECONDS);
        Subscription subscription = values
                .timeout(300,TimeUnit.MILLISECONDS)
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onCompleted() {
                        log("Complete!");
                    }
                    @Override
                    public void onError(Throwable e) {
                        log(e.getMessage().toString());
                    }
                    @Override
                    public void onNext(Long aLong) {
                        log(aLong+"");
                    }
                });

結果:

0
1
2
...

Rxjava將Timeout實現為很多不同功能的操作符,比如說超時後用一個備用的Observable繼續發射資料等。
這裡寫圖片描述

Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                for (int i = 0; i <= 3; i++) {
                    try {
                        Thread.sleep(i * 100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    subscriber.onNext(i);
                }
                subscriber.onCompleted();
            }
        }).timeout(200, TimeUnit.MILLISECONDS, Observable.just(5, 6)).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                log(integer.toString());
            }
        });

結果:

0
1
2
5
6

timestamp

給Observable發射的每個資料項新增一個時間戳

timestamp 把資料轉換為 Timestamped 型別,裡面包含了原始的資料和一個原始資料是何時發射的時間戳。
這裡寫圖片描述

public final Observable<Timestamped<T>> timestamp()
        Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
        values.take(3)
                .timestamp()
                .subscribe(new Action1<Timestamped>() {
                    @Override
                    public void call(Timestamped mTimestamped) {
                        log(mTimestamped.toString());
                    }
                });

結果:

Timestamped(timestampMillis = 1461758360570, value = 0)
Timestamped(timestampMillis = 1461758360670, value = 1)
Timestamped(timestampMillis = 1461758360771, value = 2)

從結果可以看到,上面的資料大概每隔100毫秒發射一個。

timeInterval

將一個Observable轉換為發射兩個資料之間所耗費時間的Observable

如果你想知道前一個數據和當前資料發射直接的時間間隔,則可以使用 timeInterval 函式。

這裡寫圖片描述

public final Observable<TimeInterval<T>> timeInterval()
         Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
        values.take(3)
                .timeInterval()
                .subscribe(new Action1<TimeInterval>() {
                    @Override
                    public void call(TimeInterval mTimeInterval) {
                        log(mTimeInterval.toString());
                    }
                });

結果:

TimeInterval [intervalInMilliseconds=101, value=0]
TimeInterval [intervalInMilliseconds=99, value=1]
TimeInterval [intervalInMilliseconds=100, value=2]

using

建立一個只在Observable的生命週期記憶體在的一次性資源

Using操作符建立一個在Observable生命週期記憶體活的資源,也可以這樣理解:我們建立一個資源並使用它,用一個Observable來限制這個資源的使用時間,當這個Observable終止的時候,這個資源就會被銷燬。

public static final <T,Resource> Observable<T> using(
    Func0<Resource> resourceFactory,
    Func1<? super Resource,? extends Observable<? extends T>> observableFactory,
    Action1<? super Resource> disposeAction)

using 有三個引數,分別是:

  • 1.建立這個一次性資源的函式
  • 2.建立Observable的函式
  • 3.釋放資源的函式

當 Observable 被訂閱的時候,resourceFactory 用來獲取到需要的資源;observableFactory 用這個資源來發射資料;當 Observable 完成的時候,disposeAction 來釋放資源。
這裡寫圖片描述

        Observable observable = Observable.using(new Func0<Animal>() {
            @Override
            public Animal call() {
                return new Animal();
            }
        }, new Func1<Animal, Observable<?>>() {
            @Override
            public Observable<?> call(Animal animal) {
                return Observable.timer(3, TimeUnit.SECONDS);//三秒後發射一次就completed
//                return Observable.timer(4, 2, TimeUnit.SECONDS);//沒有completed,不停的發射資料
//                return Observable.range(1,3);//一次發射三個資料,馬上結束
//                return Observable.just(1,2,3);//一次發射三個資料,馬上結束
            }
        }, new Action1<Animal>() {
            @Override
            public void call(Animal animal) {
                animal.relase();
            }
        });
        Subscriber subscriber = new Subscriber() {
            @Override
            public void onCompleted() {
                log("subscriber---onCompleted");
            }
            @Override
            public void onError(Throwable e) {
                log("subscriber---onError");
            }
            @Override
            public void onNext(Object o) {
                log("subscriber---onNext"+o.toString());//o是發射的次數統計,可以用timer(4, 2, TimeUnit.SECONDS)測試
            }
        };
        observable.count().subscribe(subscriber);

結果:

create animal
animal eat
animal eat
animal eat
subscriber---onNext1
subscriber---onCompleted
animal released