1. 程式人生 > >Android 響應式程式設計 RxJava2 解析

Android 響應式程式設計 RxJava2 解析

使用了 RxJava2 有一段時間了,深深感受到了其“牛逼”之處。

RxJava2 基礎

RxJava可以濃縮為非同步兩個字,其核心的東西不外乎兩個, Observables(被觀察者) 和 Observable(觀察者)。Observables可以發出一系列的 事件(例如網路請求、複雜計算、資料庫操作、檔案讀取等),事件執行結束後交給Observable 的回撥處理。

RxJava2 的觀察者模式

觀察者模式是物件的行為模式,也叫做釋出-訂閱(Publish/Subscribe)模式、模型-檢視(Model/View)模式、源-監聽器(Source/Listener)模式或從屬者(Dependents)模式。

什麼是觀察者模式?舉個栗子,Android中View的點選監聽器的實現,View是被觀察者,OnClickListener物件是觀察者,Activity要如何知道View被點選了?那就是派一個OnClickListener物件,入駐View,與View達成一個訂閱關係,一旦View被點選了,就通過OnClickListener物件的OnClick方法傳達給Activity。採用觀察者模式可以避免去輪詢檢查,節約有限的cpu資源。

RxJava2 響應式程式設計結構

什麼是響應式程式設計?舉個栗子,a = b + c; 這句程式碼將b+c的值賦給a,而之後如果b和c的值改變了不會影響到a,然而,對於響應式程式設計,之後b和c的值的改變也動態影響著a,意味著a會隨著b和c的變化而變化。

響應式程式設計的組成為Observable/Operator/Subscriber,RxJava在響應式程式設計中的基本流程如下:

這個流程,可以簡單的理解為:Observable -> Operator1 -> Operator2 -> Operator3 -> Subscriber

1. Observable發出一系列事件,他是事件的產生者;
2. Subscriber負責處理事件,他是事件的消費者;
3. Operator是對Observable發出的事件進行修改和變換;
4. 若事件從產生到消費不需要其他處理,則可以省略掉中間的Operator,從而流程變為Obsevable -> Subscriber;
5. Subscriber通常在主執行緒執行,所以原則上不要去處理太多的事務,而這些複雜的處理則交給Operator;

建立一個完整的 RxJava2 呼叫

首先需要新增 RxJava2 在 Android 中的 Gradle 依賴:

compile 'com.squareup.retrofit2:retrofit:2.3.0'
compile 'com.squareup.retrofit2:converter-gson:2.3.0'
compile 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'
compile "io.reactivex.rxjava2:rxjava:2.1.6"
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
RxJava2 可以通過下面這幾種方法建立被觀察者:
// 傳送對應的方法Observable.create(newObservableOnSubscribe<String>(){// 預設在主執行緒裡執行該方法@Overridepublicvoidsubscribe(@NonNullObservableEmitter<String> e)throwsException{
        e.onNext("Hello");
        e.onNext("World");// 結束標識
        e.onComplete();}});// 傳送多個數據Observable.just("Hello","World");// 傳送陣列Observable.fromArray("Hello","World");// 傳送一個數據Observable.fromCallable(newCallable<String>(){@OverridepublicStringcall()throwsException{return"Hello";}});
RxJava2 支援鏈式程式設計,下來我們建立被觀察者,然後建立觀察者並訂閱:
// 建立被觀察者Observable.just("Hello","World")// 將被觀察者切換到子執行緒.subscribeOn(Schedulers.io())// 將觀察者切換到主執行緒.observeOn(AndroidSchedulers.mainThread())// 建立觀察者並訂閱.subscribe(newObserver<String>(){// Disposable 相當於RxJava1.x中的 Subscription,用於解除訂閱privateDisposable disposable;@OverridepublicvoidonSubscribe(Disposable d){
        disposable = d;}@OverridepublicvoidonNext(String s){Log.i("JAVA","被觀察者向觀察者傳送的資料:"+ s);if(s =="-1"){// "-1" 時為異常資料,解除訂閱
            disposable.dispose();}}@OverridepublicvoidonError(Throwable e){}@OverridepublicvoidonComplete(){}});
一旦 Observer 訂閱了 Observable,Observable 就會呼叫 Observer 的 onNext()、onCompleted()、onError() 等方法。至此一個完整的 RxJava 呼叫就完成了。看一下輸出的Log:
I/JAVA: 被觀察者向觀察者傳送的資料:Hello
I/JAVA: 被觀察者向觀察者傳送的資料:World

若喜歡簡潔、定製服務,那麼可以實現的方法跟上面的實現方法是對應起來的,大家看引數就知道哪個對應哪個了,你可以通過new Consumer(不需要實現的方法你可以不寫,看上去更簡潔),Consumer就是消費者的意思,可以理解為消費了 onNext 等事件:

Observable.just("Hello","World").subscribe(newConsumer<String>(){@Overridepublicvoidaccept(@NonNullString s)throwsException{Log.i("JAVA","被觀察者向觀察者傳送的資料:"+ s);}},newConsumer<Throwable>(){@Overridepublicvoidaccept(@NonNullThrowable throwable)throwsException{}},newAction(){@Overridepublicvoidrun()throwsException{}},newConsumer<Disposable>(){@Overridepublicvoidaccept(@NonNullDisposable disposable)throwsException{}});

RxJava2 的操作符

RxJava中提供了大量不同種類,不同場景的Operators(操作符),RxJava的強大性就來自於它所定義的操作符。主要分類:

RxJava 的操作符 說明 例如
建立操作 用於建立Observable的操作符 create、defer、from、just、start、repeat、range
變換操作 用於對Observable發射的資料進行變換 buffer、window、map、flatMap、groupBy、scan
過濾操作 用於從Observable發射的資料中進行選擇 debounce、distinct、filter、sample、skip、take
組合操作 用於將多個Observable組合成一個單一的Observable and、startwith、join、merge、switch、zip
異常處理 用於從錯誤通知中恢復 catch、retry
輔助操作 用於處理Observable的操作符 delay、do、observeOn、subscribeOn、subscribe
條件和布林操作 all、amb、contains、skipUntil、takeUntil
演算法和聚合操作 average、concat、count、max、min、sum、reduce
非同步操作 start、toAsync、startFuture、FromAction、FromCallable、runAsync
連線操作 connect、publish、refcount、replay
轉換操作 toFuture、toList、toIterable、toMap、toMultiMap
阻塞操作 forEach、first、last、mostRecent、next、single
字串操作 byLine、decode、encode、from、join、split、stringConcat

其中有一些高頻使用的操作符如下:

常用操作符 說明
interval 延時幾秒,每隔幾秒開始執行
take 超過多少秒停止執行
map 型別轉換
observeOn 在主執行緒執行
doOnSubscribe 在執行的過程中
subscribe 訂閱

RxJava2 執行緒排程器

排程器 Scheduler 用於控制操作符和被觀察者事件所執行的執行緒,不同的排程器對應不同的執行緒。RxJava提供了5種排程器:

RxJava 執行緒排程器 說明
Schedulers.immediate() 預設執行緒,允許立即在當前執行緒執行所指定的工作。
Schedulers.newThread() 新建執行緒,總是啟用新執行緒,並在新執行緒執行操作。
Schedulers.io() 適用於I/O操作,根據需要增長或縮減來自適應的執行緒池。多數情況下 io() 比 newThread() 更有效率。不要把計算工作放在 io() 中,可以避免建立不必要的執行緒。
Schedulers.computation() 適用於計算工作(CPU 密集型計算),即不會被 I/O 等操作限制性能的操作。這個 Scheduler 使用的固定的執行緒池,大小為 CPU 核數。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU。
Schedulers.trampoline() 當我們想在當前執行緒執行一個任務時,並不是立即,我們可以用.trampoline()將它入隊。這個排程器將會處理它的佇列並且按序執行佇列中每一個任務。
AndroidSchedulers.mainThread() RxAndroid 提供的,它指定的操作將在 Android 主執行緒執行。

可以使用 subscribeOn() 和 ObserveOn() 操作符進行執行緒排程,讓 Observable 在一個特定的排程器上執行。subscribeOn() 指定 subscribe() 所發生的執行緒,事件產生的執行緒。ObserveOn() 指定 Observer 所執行在的執行緒,事件消費的執行緒。

RxJava2 模擬傳送驗證碼倒計時功能

publicvoidonCodeClick(){finallong count =60;// 設定60秒Observable.interval(0,1,TimeUnit.SECONDS).take(count +1).map(newFunction<Long,Long>(){@OverridepublicLongapply(@NonNullLong aLong)throwsException{return