上車RxJava2(一)
RxJava 是一個在 Java VM 上使用可觀測的序列來組成非同步的、基於事件的程式的庫。
RxJava的觀察者模式
如果不知道觀察者設計模式的話,建議先傳送到這裡:
RxJava它有四個概念
- Observer(觀察者)
- Observable(被觀察者)
- subscribe(訂閱)
- 事件
以上的幾個概念和普通觀察者模式基本一樣,但是RxJava 的事件回撥方法除了普通事件 onNext() 之外,還定義了兩個特殊的事件:onCompleted() 和 onError()。
這兩個事件都是在事件結束的時候一種標記,並且必須有且只有一個,
onCompleted() 標記事件流的完成,onError() 標記事件的異常結束,終止其他事件的發出。
RxJava的基本使用
根據以上的概念,RxJava的基本使用如下:
1.建立觀察者 -Observer
觀察者會對被觀察者的事件觸發做出響應。比如被觀察者 發起onNext 事件就會回撥觀察者的 onNext 方法。但在發起onNext事件之前,被觀察者需要訂閱觀察者,這個時候就會回撥 onSubscribe 方法。而 onError 和 onComplete 方法分別是在被觀察中發起 onComplete 和 onError 事件時呼叫,關於這兩者的區別,剛剛提過。
public Observer<String> getObserver(){ return new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe: "); } @Override public void onNext(String s) { Log.d(TAG, "onNext: ->"+s); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: "); } @Override public void onComplete() { Log.d(TAG, "onComplete: Complete"); } }; }
2.被觀察者 -Observable
public Observable<String> getObservable(){ return Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("吃飯"); emitter.onNext("睡覺"); emitter.onNext("打豆豆"); emitter.onComplete(); } }); }
這個方法裡的內容也可以這麼寫
Observable<String> observable = Observable.just("吃飯","睡覺","打豆豆"); Observable<String> observable = Observable.fromArray("吃飯","睡覺","打豆豆");
3.訂閱-subscribe
建立好了 Observer 和 Observable,需要用訂閱關係也就是 subScribe方法 將它們建立連線,
Observable<String> observable = getObservable();//建立被觀察者 Observer<String> observer = getObserver(); //建立觀察者 observable.subscribe(observer);//訂閱
4.一步式寫法
如果感覺上面的寫法有點麻煩也可以用下面的方法,一步寫成
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("吃飯"); emitter.onNext("睡覺"); emitter.onNext("打豆豆"); emitter.onComplete(); } }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(String s) { Log.d(TAG, "onNext: ->"+s); sb.append(s); contentText.setText(sb+"->"); } @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onComplete() { } });
在 RxJava 的預設規則中,事件的發出和消費都是在同一個執行緒的。如果不指定執行緒的話,在哪個執行緒呼叫 subscribe(),就在哪個執行緒生產事件。上面的例子它就是在主執行緒中,所以在 onNext 方法裡,我對UI進行了更新是沒有拋異常的。這樣它實現出來的只是一個同步的觀察者模式。
但但但,,,但是非同步對於 RxJava 是及其重要的
RxJava 的執行緒控制
RxJava的非同步,比如進行網路請求(在io執行緒中)後更新ui(主執行緒)操作,這就要對 RxJava 的執行緒進行控制。RxJava 的執行緒控制需要用到排程器-Scheduler
RxJava 內建了幾個排程器:
- Schedulers.immediate() :這是預設的 Scheduler。它直接在當前執行緒執行,不指定執行緒。
- Schedulers.newThread() : 總是啟用新執行緒,並在新執行緒執行操作。
- Schedulers.io() :I/O 操作(讀寫檔案、讀寫資料庫、網路資訊互動等)所使用的 Scheduler。它和
- newThread() 差不多,區別在於 io() 的內部實現是是用一個無數量上限的執行緒池,可以重用空閒的執行緒,因此多數情況下 io() 比 newThread() 更有效率。不要把計算工作放在 io() 中,可以避免建立不必要的執行緒。
- Schedulers.computation() : 計算所使用的Scheduler。這個計算指的是CPU密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的執行緒池,大小為 CPU 核數。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU。
- AndroidSchedulers.mainThread( ),它指定的操作將在 Android 主執行緒執行,由於更新ui。
下面我們用一個栗子來學習排程器的用法:
進行一個網路請求(需要切換到io執行緒中),獲得返回的請求資料並更新UI(切斷到主執行緒)
新增以下依賴庫
implementation 'io.reactivex.rxjava2:rxjava:2.2.5' implementation 'com.squareup.retrofit2:retrofit:2.5.0' implementation 'io.reactivex:rxandroid:1.2.1' implementation 'com.squareup.retrofit2:converter-gson:2.5.0' implementation 'com.squareup.retrofit2:adapter-rxjava:2.5.0'
這裡的網路請求我們藉助Retrofit框架
關於Retrofit這裡就不介紹了,在後面筆者會關於它單獨寫一篇
定義一個介面
public interface Books { @GET("chaxunyuyue/") Call<List<BookInfo>> getBookInfo(@Query("username") String username); }
針對返回的資料寫一個javaBean
public class BookInfo { private String yuyue_address; private String yuyue_reason; private String yuyue_time; //....省略所以getter和setter方法 }
開始網路請求並更新ui
private void queryBookInfo(final String username){ String url = "http://smpark.chzu.edu.cn:8081/ipv6/"; Retrofit retrofit = new Retrofit.Builder() .baseUrl(url) .addConverterFactory(GsonConverterFactory.create()) .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) .build(); final Books books = retrofit.create(Books.class); Observable.create(new ObservableOnSubscribe<BookInfo>() { @Override public void subscribe(ObservableEmitter<BookInfo> emitter) throws Exception { List<BookInfo> bookInfos = books.getBookInfo(username).execute().body(); Log.d(TAG, "subscribe: " + bookInfos.get(0).getYuyue_address()); BookInfo bookInfo = bookInfos.get(0); emitter.onNext(bookInfo); } }) .subscribeOn(Schedulers.io())// 1.指定 subscribe() 發生在 IO 執行緒 .subscribe(new Observer<BookInfo>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(BookInfo bookInfo) { // Log.d(TAG, "onNext: " + bookInfo.getYuyue_reason()); String bookAddress = bookInfo.getYuyue_address(); Log.d(TAG, "onNext: " + bookAddress); tvContent.setText("地址 :" + bookAddress); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: " + e.toString()); e.printStackTrace(); } @Override public void onComplete() { Log.d(TAG, "onComplete: "); } }); }
上面程式碼我用了一段式寫法,在建立被觀察者之後使用了排程器
.subscribeOn(Schedulers.io())
這樣就把subscribe() 排程到 IO 執行緒中執行,開始我以為排程的同時也會把回撥的結果 onNext() 等排程到IO執行緒,看別人的部落格都會加上這樣一句話
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調發生在主執行緒
但是發現我更新ui的時候並沒有拋異常,所以我覺得它沒有將回調結果放於io執行緒中也有可能它自動切換到主執行緒中了。
總結
RxJava 的本質可以理解為非同步這一個詞,這篇部落格筆者主要介紹了Rxjava的基本使用和執行緒控制,在下一篇部落格,筆者會介紹它的其他內容。