1. 程式人生 > >Android Rxjava2升級(踩坑)筆記

Android Rxjava2升級(踩坑)筆記

Rxjava2 升級(踩坑)筆記

前言

最近接觸到別人程式碼的時候看到他們RxJava寫的和我的不一樣。Single、Completable、Disposable 什麼不知道,CompositeDisposable這又是什麼鬼??doAfterTerminate()這個方法好可以再事件結束的時候調,但是我的程式碼裡沒這個方法!最終發現,Rxjava升級了。本文也只對新出現的變化做記錄,沒有rxjava基礎的請自行學習。

感嘆完別人這麼用真是方便的同時,也開始著手做自己的庫Rxjava的升級工作。當然遇到不少問題。

正題

遇到的問題

DuplicateFileException: Duplicate files copied in APK META-INF/rxjava.properties

原因:Rxjava版本不統一,導致META-INF/rxjava.properties 這個合併衝突。不用去試圖統一他(我至少沒有成功)因為Retrofit、Rxjava、RxjavaAdapter 總有哪個裡面的版本不一致。

解決辦法:
在 app/build.gradle 中新增下面這段指令,意思是忽略這個打包衝突

packagingOptions {
        exclude 'META-INF/rxjava.properties'
    }

包名變了,類丟失

升級之後你會發現,Rxjava整個包名都TM變了!!對你沒看錯,同樣的庫,包名變了!而且不僅這樣,直接刪掉了Subscribe這些類,把他們改成的介面。所以你很多類要重新引用。並且可能有些方法會失效了。

所以老專案輕易不要升級!!!

    compile "io.reactivex.rxjava2:rxandroid:2.0.1"
    compile "io.reactivex.rxjava2:rxjava:2.1.0"
    compile "com.trello:rxlifecycle:0.3.1"
    compile "com.trello:rxlifecycle-components:0.3.1"
    compile "com.jakewharton.rxbinding:rxbinding:0.4.0"

    compile 'com.jakewharton.retrofit:retrofit2-rxjava2-adapter:1.0.0'
compile "com.squareup.retrofit2:retrofit:2.2.0" compile "com.squareup.retrofit2:converter-gson:2.2.0"

這是我專案裡面目前用到的Rx庫。注意一點是RxJava升級之後Retrofit 最好也要同步升下級。

注意以前的引用是這個:io.reactivex:rxjava:2.0.1

這個是新的:io.reactivex.rxjava2:rxjava:2.1.0

學習筆記

升級之後看到一些方法一臉懵逼,開始上網找教程。推薦一篇大家可以看下,比較淺顯易懂。
https://www.jianshu.com/p/464fa025229e,一共有九篇,不過似乎沒出全,不過作者的思路簡單易懂,適合上手。

一口氣擼完了九篇,簡單實踐 and 記錄一下~

Rxjava基礎

建立三部曲
 private void demo01() {
        //上游被觀察者---->發射資料
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("1111");
                e.onNext("2222");
                e.onNext("3333");
                e.onComplete();
            }
        });
        //下游觀察者---->接收資料
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d("picher","onSubscribe");
            }

            @Override
            public void onNext(String s) {
                Log.d("picher","onNext:"+s);

            }

            @Override
            public void onError(Throwable e) {
                Log.d("picher","onError");
            }

            @Override
            public void onComplete() {
                Log.d("picher","onComplete");
            }
        };
        //訂閱
        observable.subscribe(observer);
    }

Rxjava2 新出現的類

兩個新東西:ObservableEmitterDisposable

ObservableEmitter

  1. 可以通過onNext() 傳送資料
  2. 呼叫onComplete() 或者 onError()將不在繼續傳送後續資料
  3. onComplete()、onError() 只能同時呼叫一個

Disposable

可以理解為控制器,dispose()之後 下游將不在繼續接收資料,但上游可以繼續傳送資料

Consumer

Rxjava1的版本 subscribe() 可以寫 new fun1() fun2()… 但是rxjava2裡面移除了這些,而用Consumer這個類代替,So!!!之前的如果你用掛了太多action或者fun,呵呵…改瘋掉

背壓問題 (MissBackgroundException)

其實這問題Rxjava1也有,只是那會沒有去了解是咋回事的,現在補上~

根本問題:下游處理能力不夠,上游資料被放入容器等待處理,如果無限增長則記憶體會溢位OOM

  • 同一個執行緒不會有這個問題,因為上游傳送一下,下游處理一個
  //同一執行緒沒問題
    private void demo02() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                for(int i=0; ;i++){
                    e.onNext(i);
                }
            }
        }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d("picher",""+integer);
            }
        });
    }
  • 非同步任務的時候可能會出現背壓問題
  //背壓問題
    private void demo02() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                for(int i=0; ;i++){
                    e.onNext(i);
                }
                e.onComplete();
            }
        }).subscribeOn(Schedulers.io()).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d("picher",""+integer);
            }
        });
    }
背壓問題的處理

方案一:降低傳送事件的次數

我們知道背壓問題是上游發的太快了,下游來不及處理導致擠壓,所以我們可以減少傳送的次數。比如加上Filter
過濾掉一部分可以過濾的資料。比如我們加上只讓能被10整除的資料傳送。

    //背壓問題的處理--降低傳送次數
    private void demo03() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                for(int i=0;;i++){
                    e.onNext(i);
                }
            }
        }).subscribeOn(Schedulers.io()).filter(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer % 10 == 0;
            }
        }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d("picher",""+integer);
            }
        });
    }

方案二:降低傳送速度,讓下游來得及去處理事件

我們在每次傳送事件之後睡2s,也可以解決問題。

 Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                for(int i=0;;i++){
                    e.onNext(i);
                    Thread.sleep(2000);
                }
            }
        })

方案三:使用Flowable

  1. 設定BackpressureStrategy 策略
  2. Subscription.request()

也就是RxJava2幫你處理了速度和數量的問題,也順便幫你換了一個容器的容量。具體的去看下Blog吧,我不能保證有博主說的好。https://www.jianshu.com/p/a75ecf461e02

記錄一下如果以後需要寫上游傳送量比較大的程式碼,則使用下面的寫法。

    Flowable.interval(1, TimeUnit.MICROSECONDS)
                .onBackpressureDrop()  //加上背壓策略
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Long>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                        s.request(Long.MAX_VALUE);
                    }

                    @Override
                    public void onNext(Long aLong) {
                        Log.d(TAG, "onNext: " + aLong);
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

舉個生活中的例子:
麵包廠一小時生產100個麵包,麵包店一小時賣出去50個麵包,剩下的50個放到倉庫裡面,生產速度比消費速度大,當倉庫存滿的時候在存就爆倉了。

舉個實際專案的例子:
電商專案裡面需要對商品列表做一些比較等耗時處理,比如有200件待處理的商品資料,處理耗時5s,那麼當傳送到第127個商品事件的時候,如果處理演算法還未計算完,那麼就會出現問題。

總結

  1. Rxjava2 包名變了,導包的時候注意匯入’io.reactivex.’這個包裡面的,當然最好不要同時有兩個版本的程式碼
  2. 去掉了 fun,action等方法,
  3. 添加了ObservableEmitter用來發射資料,Disposable 可以終止任務
  4. 背壓問題可以使用Flowable解決