Android Rxjava2升級(踩坑)筆記
Rxjava2 升級(踩坑)筆記
前言
最近接觸到別人程式碼的時候看到他們RxJava寫的和我的不一樣。Single、Completable、Disposable 什麼不知道,CompositeDisposable這又是什麼鬼??doAfterTerminate()這個方法好可以再事件結束的時候調,但是我的程式碼裡沒這個方法!最終發現,Rxjava升級了。本文也只對新出現的變化做記錄,沒有rxjava基礎的請自行學習。
感嘆完別人這麼用真是方便的同時,也開始著手做自己的庫Rxjava的升級工作。當然遇到不少問題。
正題
遇到的問題
DuplicateFileException: Duplicate files copied in APK META-INF/rxjava.properties
原因:Rxjava版本不統一,導致META-INF/rxjava.properties 這個合併衝突。不用去試圖統一他(我至少沒有成功)因為Retrofit、Rxjava、RxjavaAdapter 總有哪個裡面的版本不一致。
解決辦法:
在 app/build.gradle 中新增下面這段指令,意思是忽略這個打包衝突
packagingOptions {
exclude 'META-INF/rxjava.properties'
}
包名變了,類丟失
升級之後你會發現,Rxjava整個包名都TM變了!!對你沒看錯,同樣的庫,包名變了!而且不僅這樣,直接刪掉了Subscribe這些類,把他們改成的介面。所以你很多類要重新引用。並且可能有些方法會失效了。
所以老專案輕易不要升級!!!
compile "io.reactivex.rxjava2:rxandroid:2.0.1"
compile "io.reactivex.rxjava2:rxjava:2.1.0"
compile "com.trello:rxlifecycle:0.3.1"
compile "com.trello:rxlifecycle-components:0.3.1"
compile "com.jakewharton.rxbinding:rxbinding:0.4.0"
compile 'com.jakewharton.retrofit:retrofit2-rxjava2-adapter:1.0.0'
compile "com.squareup.retrofit2:retrofit:2.2.0"
compile "com.squareup.retrofit2:converter-gson:2.2.0"
這是我專案裡面目前用到的Rx庫。注意一點是RxJava升級之後Retrofit 最好也要同步升下級。
注意以前的引用是這個:io.reactivex:rxjava:2.0.1
這個是新的:io.reactivex.rxjava2:rxjava:2.1.0
學習筆記
升級之後看到一些方法一臉懵逼,開始上網找教程。推薦一篇大家可以看下,比較淺顯易懂。
https://www.jianshu.com/p/464fa025229e,一共有九篇,不過似乎沒出全,不過作者的思路簡單易懂,適合上手。
一口氣擼完了九篇,簡單實踐 and 記錄一下~
Rxjava基礎
建立三部曲
private void demo01() {
//上游被觀察者---->發射資料
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("1111");
e.onNext("2222");
e.onNext("3333");
e.onComplete();
}
});
//下游觀察者---->接收資料
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d("picher","onSubscribe");
}
@Override
public void onNext(String s) {
Log.d("picher","onNext:"+s);
}
@Override
public void onError(Throwable e) {
Log.d("picher","onError");
}
@Override
public void onComplete() {
Log.d("picher","onComplete");
}
};
//訂閱
observable.subscribe(observer);
}
Rxjava2 新出現的類
兩個新東西:ObservableEmitter
和 Disposable
ObservableEmitter
- 可以通過onNext() 傳送資料
- 呼叫onComplete() 或者 onError()將不在繼續傳送後續資料
- onComplete()、onError() 只能同時呼叫一個
Disposable
可以理解為控制器,dispose()之後 下游將不在繼續接收資料,但上游可以繼續傳送資料
Consumer
Rxjava1的版本 subscribe() 可以寫 new fun1() fun2()… 但是rxjava2裡面移除了這些,而用Consumer這個類代替,So!!!之前的如果你用掛了太多action或者fun,呵呵…改瘋掉
背壓問題 (MissBackgroundException)
其實這問題Rxjava1也有,只是那會沒有去了解是咋回事的,現在補上~
根本問題:下游處理能力不夠,上游資料被放入容器等待處理,如果無限增長則記憶體會溢位OOM
- 同一個執行緒不會有這個問題,因為上游傳送一下,下游處理一個
//同一執行緒沒問題
private void demo02() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
for(int i=0; ;i++){
e.onNext(i);
}
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("picher",""+integer);
}
});
}
- 非同步任務的時候可能會出現背壓問題
//背壓問題
private void demo02() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
for(int i=0; ;i++){
e.onNext(i);
}
e.onComplete();
}
}).subscribeOn(Schedulers.io()).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("picher",""+integer);
}
});
}
背壓問題的處理
方案一:降低傳送事件的次數
我們知道背壓問題是上游發的太快了,下游來不及處理導致擠壓,所以我們可以減少傳送的次數。比如加上Filter
過濾掉一部分可以過濾的資料。比如我們加上只讓能被10整除的資料傳送。
//背壓問題的處理--降低傳送次數
private void demo03() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
for(int i=0;;i++){
e.onNext(i);
}
}
}).subscribeOn(Schedulers.io()).filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer % 10 == 0;
}
}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("picher",""+integer);
}
});
}
方案二:降低傳送速度,讓下游來得及去處理事件
我們在每次傳送事件之後睡2s,也可以解決問題。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
for(int i=0;;i++){
e.onNext(i);
Thread.sleep(2000);
}
}
})
方案三:使用Flowable
- 設定BackpressureStrategy 策略
- Subscription.request()
也就是RxJava2幫你處理了速度和數量的問題,也順便幫你換了一個容器的容量。具體的去看下Blog吧,我不能保證有博主說的好。https://www.jianshu.com/p/a75ecf461e02
記錄一下如果以後需要寫上游傳送量比較大的程式碼,則使用下面的寫法。
Flowable.interval(1, TimeUnit.MICROSECONDS)
.onBackpressureDrop() //加上背壓策略
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Long>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "onNext: " + aLong);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
舉個生活中的例子:
麵包廠一小時生產100個麵包,麵包店一小時賣出去50個麵包,剩下的50個放到倉庫裡面,生產速度比消費速度大,當倉庫存滿的時候在存就爆倉了。
舉個實際專案的例子:
電商專案裡面需要對商品列表做一些比較等耗時處理,比如有200件待處理的商品資料,處理耗時5s,那麼當傳送到第127個商品事件的時候,如果處理演算法還未計算完,那麼就會出現問題。
總結
- Rxjava2 包名變了,導包的時候注意匯入’io.reactivex.’這個包裡面的,當然最好不要同時有兩個版本的程式碼
- 去掉了 fun,action等方法,
- 添加了ObservableEmitter用來發射資料,Disposable 可以終止任務
- 背壓問題可以使用Flowable解決