Android 響應式程式設計 RxJava2 解析
使用了 RxJava2 有一段時間了,深深感受到了其“牛逼”之處。
RxJava2 基礎
RxJava可以濃縮為非同步兩個字,其核心的東西不外乎兩個, Observables(被觀察者) 和 Observable(觀察者)。Observables可以發出一系列的 事件(例如網路請求、複雜計算、資料庫操作、檔案讀取等),事件執行結束後交給Observable 的回撥處理。
RxJava2 的觀察者模式
觀察者模式是物件的行為模式,也叫做釋出-訂閱(Publish/Subscribe)模式、模型-檢視(Model/View)模式、源-監聽器(Source/Listener)模式或從屬者(Dependents)模式。
什麼是觀察者模式?舉個栗子,Android中View的點選監聽器的實現,View是被觀察者,OnClickListener物件是觀察者,Activity要如何知道View被點選了?那就是派一個OnClickListener物件,入駐View,與View達成一個訂閱關係,一旦View被點選了,就通過OnClickListener物件的OnClick方法傳達給Activity。採用觀察者模式可以避免去輪詢檢查,節約有限的cpu資源。
RxJava2 響應式程式設計結構
什麼是響應式程式設計?舉個栗子,a = b + c; 這句程式碼將b+c的值賦給a,而之後如果b和c的值改變了不會影響到a,然而,對於響應式程式設計,之後b和c的值的改變也動態影響著a,意味著a會隨著b和c的變化而變化。
響應式程式設計的組成為Observable/Operator/Subscriber,RxJava在響應式程式設計中的基本流程如下:
這個流程,可以簡單的理解為:Observable -> Operator1 -> Operator2 -> Operator3 -> Subscriber
1. Observable發出一系列事件,他是事件的產生者; 2. Subscriber負責處理事件,他是事件的消費者; 3. Operator是對Observable發出的事件進行修改和變換; 4. 若事件從產生到消費不需要其他處理,則可以省略掉中間的Operator,從而流程變為Obsevable -> Subscriber; 5. Subscriber通常在主執行緒執行,所以原則上不要去處理太多的事務,而這些複雜的處理則交給Operator;
建立一個完整的 RxJava2 呼叫
首先需要新增 RxJava2 在 Android 中的 Gradle 依賴:
compile 'com.squareup.retrofit2:retrofit:2.3.0' compile 'com.squareup.retrofit2:converter-gson:2.3.0' compile 'com.squareup.retrofit2:adapter-rxjava2:2.3.0' compile "io.reactivex.rxjava2:rxjava:2.1.6" compile 'io.reactivex.rxjava2:rxandroid:2.0.1'RxJava2 可以通過下面這幾種方法建立被觀察者:
// 傳送對應的方法Observable.create(newObservableOnSubscribe<String>(){// 預設在主執行緒裡執行該方法@Overridepublicvoidsubscribe(@NonNullObservableEmitter<String> e)throwsException{
e.onNext("Hello");
e.onNext("World");// 結束標識
e.onComplete();}});// 傳送多個數據Observable.just("Hello","World");// 傳送陣列Observable.fromArray("Hello","World");// 傳送一個數據Observable.fromCallable(newCallable<String>(){@OverridepublicStringcall()throwsException{return"Hello";}});
RxJava2 支援鏈式程式設計,下來我們建立被觀察者,然後建立觀察者並訂閱:// 建立被觀察者Observable.just("Hello","World")// 將被觀察者切換到子執行緒.subscribeOn(Schedulers.io())// 將觀察者切換到主執行緒.observeOn(AndroidSchedulers.mainThread())// 建立觀察者並訂閱.subscribe(newObserver<String>(){// Disposable 相當於RxJava1.x中的 Subscription,用於解除訂閱privateDisposable disposable;@OverridepublicvoidonSubscribe(Disposable d){
disposable = d;}@OverridepublicvoidonNext(String s){Log.i("JAVA","被觀察者向觀察者傳送的資料:"+ s);if(s =="-1"){// "-1" 時為異常資料,解除訂閱
disposable.dispose();}}@OverridepublicvoidonError(Throwable e){}@OverridepublicvoidonComplete(){}});
一旦 Observer 訂閱了 Observable,Observable 就會呼叫 Observer 的 onNext()、onCompleted()、onError() 等方法。至此一個完整的 RxJava 呼叫就完成了。看一下輸出的Log:I/JAVA: 被觀察者向觀察者傳送的資料:Hello
I/JAVA: 被觀察者向觀察者傳送的資料:World
若喜歡簡潔、定製服務,那麼可以實現的方法跟上面的實現方法是對應起來的,大家看引數就知道哪個對應哪個了,你可以通過new Consumer(不需要實現的方法你可以不寫,看上去更簡潔),Consumer就是消費者的意思,可以理解為消費了 onNext 等事件:
Observable.just("Hello","World").subscribe(newConsumer<String>(){@Overridepublicvoidaccept(@NonNullString s)throwsException{Log.i("JAVA","被觀察者向觀察者傳送的資料:"+ s);}},newConsumer<Throwable>(){@Overridepublicvoidaccept(@NonNullThrowable throwable)throwsException{}},newAction(){@Overridepublicvoidrun()throwsException{}},newConsumer<Disposable>(){@Overridepublicvoidaccept(@NonNullDisposable disposable)throwsException{}});
RxJava2 的操作符
RxJava中提供了大量不同種類,不同場景的Operators(操作符),RxJava的強大性就來自於它所定義的操作符。主要分類:
RxJava 的操作符 | 說明 | 例如 |
---|---|---|
建立操作 | 用於建立Observable的操作符 | create、defer、from、just、start、repeat、range |
變換操作 | 用於對Observable發射的資料進行變換 | buffer、window、map、flatMap、groupBy、scan |
過濾操作 | 用於從Observable發射的資料中進行選擇 | debounce、distinct、filter、sample、skip、take |
組合操作 | 用於將多個Observable組合成一個單一的Observable | and、startwith、join、merge、switch、zip |
異常處理 | 用於從錯誤通知中恢復 | catch、retry |
輔助操作 | 用於處理Observable的操作符 | delay、do、observeOn、subscribeOn、subscribe |
條件和布林操作 | all、amb、contains、skipUntil、takeUntil | |
演算法和聚合操作 | average、concat、count、max、min、sum、reduce | |
非同步操作 | start、toAsync、startFuture、FromAction、FromCallable、runAsync | |
連線操作 | connect、publish、refcount、replay | |
轉換操作 | toFuture、toList、toIterable、toMap、toMultiMap | |
阻塞操作 | forEach、first、last、mostRecent、next、single | |
字串操作 | byLine、decode、encode、from、join、split、stringConcat |
其中有一些高頻使用的操作符如下:
常用操作符 | 說明 |
---|---|
interval | 延時幾秒,每隔幾秒開始執行 |
take | 超過多少秒停止執行 |
map | 型別轉換 |
observeOn | 在主執行緒執行 |
doOnSubscribe | 在執行的過程中 |
subscribe | 訂閱 |
RxJava2 執行緒排程器
排程器 Scheduler 用於控制操作符和被觀察者事件所執行的執行緒,不同的排程器對應不同的執行緒。RxJava提供了5種排程器:
RxJava 執行緒排程器 | 說明 |
---|---|
Schedulers.immediate() | 預設執行緒,允許立即在當前執行緒執行所指定的工作。 |
Schedulers.newThread() | 新建執行緒,總是啟用新執行緒,並在新執行緒執行操作。 |
Schedulers.io() | 適用於I/O操作,根據需要增長或縮減來自適應的執行緒池。多數情況下 io() 比 newThread() 更有效率。不要把計算工作放在 io() 中,可以避免建立不必要的執行緒。 |
Schedulers.computation() | 適用於計算工作(CPU 密集型計算),即不會被 I/O 等操作限制性能的操作。這個 Scheduler 使用的固定的執行緒池,大小為 CPU 核數。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU。 |
Schedulers.trampoline() | 當我們想在當前執行緒執行一個任務時,並不是立即,我們可以用.trampoline()將它入隊。這個排程器將會處理它的佇列並且按序執行佇列中每一個任務。 |
AndroidSchedulers.mainThread() | RxAndroid 提供的,它指定的操作將在 Android 主執行緒執行。 |
可以使用 subscribeOn() 和 ObserveOn() 操作符進行執行緒排程,讓 Observable 在一個特定的排程器上執行。subscribeOn() 指定 subscribe() 所發生的執行緒,事件產生的執行緒。ObserveOn() 指定 Observer 所執行在的執行緒,事件消費的執行緒。
RxJava2 模擬傳送驗證碼倒計時功能
publicvoidonCodeClick(){finallong count =60;// 設定60秒Observable.interval(0,1,TimeUnit.SECONDS).take(count +1).map(newFunction<Long,Long>(){@OverridepublicLongapply(@NonNullLong aLong)throwsException{return