Rxjava 2學習筆記(一)
Rxjava 在這些年在android開發中非常的火爆,它和Retrofit 的結合堪稱完美,他們可以把我們從以前我們進行網路請求中各種執行緒切換,各種介面回掉中解放出來了。可以讓我們的邏輯變的非常清晰,便於程式碼維護。我們公司的專案目前使用的Rxjava 版本還是1.0版本的,準備打算升級到使用Rx2.0。所以打算學習一下Rxjava 2
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("hello world"); emitter.onComplete(); } }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.e(TAG, "onSubscribe"); } @Override public void onNext(String s) { Log.e(TAG, "onNext "+s); } @Override public void onError(Throwable e) { Log.e(TAG, "onError"); } @Override public void onComplete() { Log.e(TAG, "onComplete"); } }); 10-13 16:24:39.176 11578-11578/? E/huangjie: onSubscribe 10-13 16:24:39.176 11578-11578/? E/huangjie: onNext hello world 10-13 16:24:39.176 11578-11578/? E/huangjie: onComplete
上面程式碼中我們看到了兩個陌生的物件,ObservableEmitter和Disposable.
ObservableEmitter:
Emitter是發射器的意思,那就很好猜了,這個就是用來發出事件的,它可以發出三種類型的事件,通過呼叫emitter的onNext(T value)、onComplete()和onError(Throwable error)就可以分別發出next事件、complete事件和error事件
Disposable:
這個單詞的字面意思是一次性用品,用完即可丟棄的. 那麼在RxJava中怎麼去理解它呢, 可以想象成一條河流 當呼叫它的dispose()方法時, 它就會放下柵欄, 從而導致下游收不到事件.
在Rxjava 2 中,Observable不再支援訂閱Subscriber了,而是需要使用Observer作為觀察者。但是有一點是沒有變化的,不管是在Rxjava 1 還是在Rxjava 2中,被觀察者,觀察者,訂閱 三者缺一不可。只有在使用了subscribe()方法過後(就是被觀察者訂閱觀察者),被觀察者才會傳送資料。
在Rxjava 2中有五種觀察者模式

1.png
型別 | 描述 |
---|---|
Observale | 能夠發射0或者n 個數據,並以成功或者錯誤事件終止 |
Flowable | 能夠發射0或n個數據,並以成功或者錯誤事件終止,支援背壓,可以控制資料來源發射的速度 |
Single | 只能發射單個數據或錯誤事件 |
Completable | 從來不發射資料,只處理onComplete 和onError事件,可以看成Rxjava的Runnable |
Maybe | 能夠發射0或者n個數據,要麼成功,要麼失敗。有點類似於Optional |
do操作符
do操作符可以給Observable的生命週期的各個階段加上一系列的監聽,當Observable執行到各個階段的時,這些回掉就會觸發。
Observable.just("hello") .doOnNext(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.e(TAG, "doOnNext" + s); } }) .doAfterNext(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.e(TAG, "doAfterNext" + s); } }) .doOnComplete(new Action() { @Override public void run() throws Exception { Log.e(TAG, "doOnComplete"); } }) .doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { Log.e(TAG, "doOnSubscribe"); } }) .doAfterTerminate(new Action() { @Override public void run() throws Exception { Log.e(TAG, "doAfterTerminate"); } }) .doFinally(new Action() { @Override public void run() throws Exception { Log.e(TAG, "doFinally"); } }) .doOnEach(new Consumer<Notification<String>>() { @Override public void accept(Notification<String> stringNotification) throws Exception { Log.e(TAG, "doOnEach" + (stringNotification.isOnNext() ? "onNext" : stringNotification.isOnComplete() ? "onComplete" : "onError")); } }) .doOnLifecycle(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { Log.e(TAG, "doOnLifecycle"); } }, new Action() { @Override public void run() throws Exception { Log.e(TAG, "doOnLifecycle run"); } }) .subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(String s) { Log.e(TAG, " onNext" + s); } @Override public void onError(Throwable e) { Log.e(TAG, " onError"); } @Override public void onComplete() { } }); E/huangjie: doOnSubscribe E/huangjie: doOnLifecycle E/huangjie: doOnNexthello E/huangjie: doOnEachonNext E/huangjie:onNexthello E/huangjie: doAfterNexthello E/huangjie: doOnComplete E/huangjie: doOnEachonComplete E/huangjie: doFinally E/huangjie: doAfterTerminate
根據列印的日誌我們看出Rxjava 內部資料的流向
操作符 | 用途 |
---|---|
doOnSubscribe | 一旦觀察者訂閱了Observable,它就會呼叫 |
doOnLifecycle | 可以在觀察者訂閱之後,設定是否取消訂閱 |
doOnNext | 它產生的Observable 每發射一項就會呼叫它一次,它的Consumer接受發射的資料項,一般用於在subscrible之前的資料進行處理 |
doOnEach | 它產生的Observable每發射一項資料就會呼叫它一次,不僅包括onNext,還包括onError和onCompleted |
doAfterNext | 在onNext之後執行,而doOnNext()是在onNext之前執行 |
doOnComplete | 當它產生的Observable在正常終止呼叫onComplete時會被呼叫 |
doFinally | 在當它產生的Observable終止之後會被呼叫,無論是正常終止,還是異常終止,doFinally 優於doAfterTerminate的呼叫 |
doAfterTerminate | 註冊一個Action,當Observable呼叫onComplete或onError時觸發 |
上面的程式碼中我們發現了Consumer 和Action,我可以認為其實Consumer 就是Rxjava 1.x 的Action1, Consumer是單一引數,Action是無引數型別。