RxJava2.0學習筆記(簡介,執行緒控制,常見操作符)
文章轉載自:大神的簡書
要在android中使用RxJava2,先新增Gradle配置:
compile 'io.reactivex.rxjava2:rxjava:2.0.1'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
RxJava簡介:
先假設有兩根水管:
產生事件的水管稱作上游,即RxJava中的Observable,接受事件的水管稱作下游,即RxJava中的Observer,之間的連線關係就是subscribe()。這個關係用RxJava來表示就是:
//建立一個上游 Observable:
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
});
//建立一個下游 Observer
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "subscribe");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "error" );
}
@Override
public void onComplete() {
Log.d(TAG, "complete");
}
};
//建立連線
observable.subscribe(observer);
注意:只有當上下游建立連線,也就是subscribe()方法執行的時候才開始傳送事件。
此外,把這段程式碼連起來就是RxJava中的鏈式操作了。即省去了不必要的物件建立。
ObservableEmitter顧名思義為發射器,用來發出三種類型的事件,通過呼叫其物件的onNext(),onError(),onComplete()三個方法來發出Next,Error,Complete三個事件,需要注意以下規則:
- 上游可以傳送無限個onNext,下游也可以無限接受onNext
- 上游傳送了一個onError和OnComplete事件之後,可以繼續傳送剩下的事件,而下游接收到這兩個事件之一,就會停止接收事件。
- 上游可以不傳送onError和OnComplete事件
- 注意:onError和OnComplete事件的傳送必須互斥並且唯一,不能傳送多個,也不能同時傳送兩種事件
Disposable字面意思是一次性用品,可以看作是管道之間的開關,當呼叫其dispose()方法後,會從管道中間切開,導致下游無法接收到事件。注意:上游不會因為呼叫該方法停止傳送事件。
來看個例子, 我們讓上游依次傳送1,2,3,complete,4,在下游收到第二個事件之後, 切斷水管, 看看執行結果:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Log.d(TAG, "emit complete");
emitter.onComplete();
Log.d(TAG, "emit 4");
emitter.onNext(4);
}
}).subscribe(new Observer<Integer>() {
private Disposable mDisposable;
private int i;
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "subscribe");
mDisposable = d;
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "onNext: " + value);
i++;
if (i == 2) {
Log.d(TAG, "dispose");
mDisposable.dispose();
Log.d(TAG, "isDisposed : " + mDisposable.isDisposed());
}
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "error");
}
@Override
public void onComplete() {
Log.d(TAG, "complete");
}
});
執行結果為:
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: subscribe
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: emit 1
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: onNext: 1
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: emit 2
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: onNext: 2
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: dispose
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: isDisposed : true
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: emit 3
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: emit complete
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: emit 4
可以看到傳送第二個事件以後水管被切斷了,但是上游並沒有因為傳送了OnComplete事件而停止傳送,並且下游的onSubscribe()方法是最先呼叫的。
subscribe()方法還有多個過載方法:
public final Disposable subscribe() {}
public final Disposable subscribe(Consumer<? super T> onNext) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
public final void subscribe(Observer<? super T> observer) {}
- 不帶任何引數的表示無論上游傳送什麼下游不管,隨意發。
- 帶有一個Consumer型別的引數的表示下游只關心上游傳送的onNext事件,其他事件不管。
也就可以這樣寫:
subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "onNext: " + integer);
}
});
RxJava執行緒控制
通常情況下上下游是在同一執行緒內工作的,比如在主執行緒中建立一個Observable傳送事件,則上游在主執行緒中工作,同樣在主執行緒中建立的Observer也在主執行緒中接收事件。看下面的例子:
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
Log.d(TAG, "emit 1");
emitter.onNext(1);
}
});
Consumer<Integer> consumer = new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
Log.d(TAG, "onNext: " + integer);
}
};
observable.subscribe(consumer);
}
結果為:
D/TAG: Observable thread is : main
D/TAG: emit 1
D/TAG: Observer thread is :main
D/TAG: onNext: 1
一般我們是在子執行緒中執行耗時操作,在主執行緒中更新UI,如下圖:
黃色管子代表子執行緒,藍色代表主執行緒,要想做到兩個事件在不同的執行緒中執行,需要用到RxJava內建的執行緒排程器。
在剛才的例子中,將最後的subscribe()方法修改一下:
observable.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(consumer);
執行結果:
D/TAG: Observable thread is : RxNewThreadScheduler-2
D/TAG: emit 1
D/TAG: Observer thread is :main
D/TAG: onNext: 1
可以看到執行緒改變了,其中RxNewThread是RxJava內建的一種執行緒,subscribeOn()指定了上游所在的執行緒,observeOn()指定了下游所在的執行緒。
注意:對上游多次呼叫subscribeOn()指定執行緒,只有第一次指定起作用,而對於下游多次呼叫observeOn()指定執行緒,每次都會改變下游所在的執行緒。
在RxJava中,內建了很多執行緒供我們選擇,並且這些執行緒由執行緒池來維護的,所以效率比較高:
- Schedulers.io()代表IO操作的執行緒,通常用於網路、讀寫檔案等IO密集的操作
- Schedulers.computation()代表CPU計算密集的執行緒,例如有大量計算得操作
- Schedulers.newThread代表常規的執行緒
- AndroidSchedulers.mainThread代表Android主執行緒
操作符
map
map的作用是對於上游傳送的每個事件都應用於一個函式,使得每個事件都按照相應的函式去變化。如圖:
圖中的Map函式將上游的圓形變換為了下游的正方形,程式碼表示為:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "This is result " + integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
});
執行結果為:
D/TAG: This is result 1
D/TAG: This is result 2
D/TAG: This is result 3
上游的整型數字經過map操作後在下游變成了String型別。
可以看出發來的事件都可以轉換為想要的型別,Object,集合等等。
FlatMap
FlatMap作用是將上游傳送事件的一個Observable變換為多個傳送事件的Observables,再講它們傳送的事件合併到一個Observable中傳送。如圖:
分解動作:
圖中的FlatMap將圓形轉換為三角形和正方形,並且對於每個顏色的事件都建立了一個管道,在管道中進行變換,再將轉換過的合併傳送到下游。
注意:FlatMap並不能保證事件傳送的順序,如果想要保證順序需要使用concatMap。
程式碼demo如下:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
final List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("I am value " + integer);
}
return Observable.fromIterable(list).delay(10,TimeUnit.MILLISECONDS);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
});
程式碼中將上游傳過來的3個數字每個迴圈三遍新增到新建立的String管道,並且加了10ms延遲,所以輸出結果可以看出不是順序的。
將上面程式碼中的flatMap改為concatMap可以發現,結果是順序的了,接收到的事件和傳送的事件順序一致。
zip
zip函式的作用是將多個Observable傳送的事件組合到一起,然後將組合的事件作為整體傳送到下游,它嚴格按照發送的順序組合,並且傳送的事件個數只與多個Observable中資料較少的那個相同。如圖:
通過分解動作可以看出:
- 組合嚴格按照各個Observable傳送的事件順序,對應事件組合,不會出現交叉組合。
- 最終傳送的組合事件個數只與Observables中資料最少的相同,因為沒有事件能夠取出來,就無法組合了。
demo如下:
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Log.d(TAG, "emit 4");
emitter.onNext(4);
Log.d(TAG, "emit complete1");
emitter.onComplete();
}
});
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "emit A");
emitter.onNext("A");
Log.d(TAG, "emit B");
emitter.onNext("B");
Log.d(TAG, "emit C");
emitter.onNext("C");
Log.d(TAG, "emit complete2");
emitter.onComplete();
}
});
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String value) {
Log.d(TAG, "onNext: " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
執行結果:
D/TAG: onSubscribe
D/TAG: emit 1
D/TAG: emit 2
D/TAG: emit 3
D/TAG: emit 4
D/TAG: emit complete1
D/TAG: emit A
D/TAG: onNext: 1A
D/TAG: emit B
D/TAG: onNext: 2B
D/TAG: emit C
D/TAG: onNext: 3C
D/TAG: emit complete2
D/TAG: onComplete
可以看出組合傳送了,但是第一個Observable傳送完以後才開始組合,這是因為兩個Observable在同一個執行緒中,執行緒的執行肯定有先後順序,採用上面的subscribeOn()方法將兩個Observable放到不同的執行緒中執行,這樣就可以實現一一結合了,結果如下:
D/TAG: onSubscribe
D/TAG: emit A
D/TAG: emit 1
D/TAG: onNext: 1A
D/TAG: emit B
D/TAG: emit 2
D/TAG: onNext: 2B
D/TAG: emit C
D/TAG: emit 3
D/TAG: onNext: 3C
D/TAG: emit complete2
D/TAG: onComplete
可以發現Observable1中還有兩個事件沒有傳送,因為已經沒有組合物件了,傳送也沒有意義,前面是因為在一個執行緒中,所以會一次性發送完,在不同執行緒中,即使不傳送complete,那兩個事件也不會發送。
zip實踐
一個介面需要展示使用者的一些資訊,需要從兩個伺服器中抓取資料,要用到zip操作符來組合抓取的資訊。
首先分別定義兩個請求介面:
public interface Api {
@GET
Observable<UserBaseInfoResponse> getUserBaseInfo(@Body UserBaseInfoRequest request);
@GET
Observable<UserExtraInfoResponse> getUserExtraInfo(@Body UserExtraInfoRequest request);
}
然後用zip來打包請求:
Observable<UserBaseInfoResponse> observable1 =
api.getUserBaseInfo(new UserBaseInfoRequest()).subscribeOn(Schedulers.io());
Observable<UserExtraInfoResponse> observable2 =
api.getUserExtraInfo(new UserExtraInfoRequest()).subscribeOn(Schedulers.io());
Observable.zip(observable1, observable2,
new BiFunction<UserBaseInfoResponse, UserExtraInfoResponse, UserInfo>() {
@Override
public UserInfo apply(UserBaseInfoResponse baseInfo,
UserExtraInfoResponse extraInfo) throws Exception {
return new UserInfo(baseInfo, extraInfo);
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<UserInfo>() {
@Override
public void accept(UserInfo userInfo) throws Exception {
//do something;
}
});