1. 程式人生 > >關於RxJava2.0你不知道的事

關於RxJava2.0你不知道的事

前言

如果你對RxJava1.x還不是瞭解,可以參考下面文章。

開始

Rxjava 已經於2016年11月12日正式釋出了2.0.1版本。

RxJava 2.0 已經按照Reactive-Streams specification規範完全的重寫了。RxJava2.0 已經獨立於RxJava 1.x而存在。

RxJava2.0相比RxJava1.x,它的改動還是很大的,下面我將帶大家瞭解這些改動。

RxJava2.0與1.x的區別

Maven地址

為了讓 RxJava 1.x 和 RxJava 2.x 相互獨立,我們把RxJava 2.x 被放在了maven io.reactivex.rxjava2:rxjava:2.x.y 下,類放在了 io.reactivex 包下使用者從 1.x 切換到 2.x 時需要匯入的相應的包,但注意不要把1.x和2.x混淆了。

這裡寫圖片描述

介面變化

RxJava2.0 是遵循 Reactive Streams Specification 的規範完成的,新的特性依賴其提供的4個基礎介面。分別是:

  • Publisher
  • Subscriber
  • Subscription
  • Processor

在後邊的介紹中我們會涉及到。

Javadoc文件

新增依賴

Android端使用RxJava需要依賴新的包名:

//RxJava的依賴包
compile 'io.reactivex.rxjava2:rxjava:2.0.3'
//RxAndroid的依賴包
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'

Nulls

RxJava1.x中,支援 null 值,如下程式碼所示:

Observable.just(null);
Single.just(null);

RxJava 2.0不再支援 null 值,如果傳入一個null會丟擲 NullPointerException

Observable.fromCallable(() -> null)
    .subscribe(System.out::println, Throwable::printStackTrace);

Observable.just(1).map(v -> null)
    .subscribe
(System.out::println, Throwable::printStackTrace);

Observable and Flowable

在本節開始之前,我們先了解下RxJava背壓(Backpressure)機制的問題。

什麼是背壓(Backpressure)

在RxJava中,可以通過對Observable連續呼叫多個Operator組成一個呼叫鏈,其中資料從上游向下遊傳遞。當上遊傳送資料的速度大於下游處理資料的速度時,就需要進行Flow Control了。如果不進行Flow Control,就會丟擲MissingBackpressureException異常。

這就像小學做的那道數學題:一個水池,有一個進水管和一個出水管。如果進水管水流更大,過一段時間水池就會滿(溢位)。這就是沒有Flow Control導致的結果。

再舉個例子,在 RxJava1.x 中的 observeOn, 因為是切換了消費者的執行緒,因此內部實現用佇列儲存事件。在 Android 中預設的 buffersize 大小是16,因此當消費比生產慢時, 佇列中的數目積累到超過16個,就會丟擲MissingBackpressureException。

如果你想了解更多關於背壓的知識,請參考:

下面我們通過一段程式碼來“感受”一下背壓。

Observable.interval(1, TimeUnit.MILLISECONDS)
       //將觀察者的工作放在新執行緒環境中
       .observeOn(Schedulers.newThread())
       //觀察者處理每1000ms才處理一個事件
       .subscribe(new Subscriber<Long>() {
           @Override
           public void onCompleted() {
               System.out.println("onCompleted");
           }
           @Override
           public void onError(Throwable e) {
            System.out.println("onError :"+ e);
           }
           @Override
           public void onNext(Long value) {

            try {
                   Thread.sleep(1000);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
            System.out.println("onNext value :"+ value);
           }
       });

Flow Control有哪些思路呢?大概是有四種:

  1. 背壓(Backpressure);
  2. 節流(Throttling);
  3. 打包處理;
  4. 呼叫棧阻塞(Callstack blocking)。

如何讓Observable支援Backpressure?

在RxJava 1.x中,有些Observable是支援Backpressure的,而有些不支援。但不支援Backpressure的Observable可以通過一些operator來轉化成支援Backpressure的Observable。這些operator包括:

  • onBackpressureBuffer
  • onBackpressureDrop
  • onBackpressureLatest
  • onBackpressureBlock(已過期)

它們轉化成的Observable分別具有不同的Backpressure策略。

而在RxJava2.0 中,Observable 不再支援背壓,而是改用Flowable 支援非阻塞式的背壓。Flowable是RxJava2.0中專門用於應對背壓(Backpressure)問題而新增的(抽象)類。其中,Flowable預設佇列大小為128。並且規範要求,所有的操作符強制支援背壓。幸運的是, Flowable 中的操作符大多與舊有的 Observable 類似。

上面提到的四種operator的前三種分別對應Flowable的三種Backpressure策略:

  • BackpressureStrategy.BUFFER
  • BackpressureStrategy.DROP
  • BackpressureStrategy.LATEST

onBackpressureBuffer是不丟棄資料的處理方式。把上游收到的全部快取下來,等下游來請求再發給下游。相當於一個水庫。但上游太快,水庫(buffer)就會溢位。

這裡寫圖片描述

onBackpressureDrop和onBackpressureLatest比較類似,都會丟棄資料。這兩種策略相當於一種令牌機制(或者配額機制),下游通過request請求產生令牌(配額)給上游,上游接到多少令牌,就給下游傳送多少資料。當令牌數消耗到0的時候,上游開始丟棄資料。但這兩種策略在令牌數為0的時候有一點微妙的區別:onBackpressureDrop直接丟棄資料,不快取任何資料;而onBackpressureLatest則快取最新的一條資料,這樣當上遊接到新令牌的時候,它就先把快取的上一條“最新”資料傳送給下游。可以結合下面兩幅圖來理解。

這裡寫圖片描述

這裡寫圖片描述
onBackpressureBlock是看下游有沒有需求,有需求就發給下游,下游沒有需求,不丟棄,但試圖堵住上游的入口(能不能真堵得住還得看上游的情況了),自己並不快取。這種策略已經廢棄不用。

注意:在RxJava2.0中,舊的Observable也保留了,你還可以像以前那樣使用,同時要注意介面的變化。

需要說明的是,RxJava2.0中,Flowable是對Observable的補充(而不是替代),也可以這麼說,Flowable是能夠支援Backpressure的Observable。

何時用Observable

  1. 當上遊在一段時間傳送的資料量不大(以1000為界限)的時候優先選擇使用Observable;
  2. 在處理GUI相關的事件,比如滑鼠移動或觸控事件,這種情況下很少會出現backpressured的問題,用Observable就足以滿足需求;
  3. 獲取資料操作是同步的,但你的平臺不支援Java流或者相關特性。使用Observable的開銷低於Flowable。

何時用Flowable

  1. 當上遊在一段時間傳送的資料量過大的時候(這個量我們往往無法預計),此時就要使用Flowable以限制它所產生的量的元素10K +處理。
  2. 當你從本地磁碟某個檔案或者資料庫讀取資料時(這個資料量往往也很大),應當使用Flowable,這樣下游可以根據需求自己控制一次讀取多少資料;
  3. 以讀取資料為主且有阻塞執行緒的可能時用Flowable,下游可以根據某種條件自己主動讀取資料。

Single、Completable

Single 與 Completable 都基於新的 Reactive Streams 的思想重新設計了介面,主要是消費者的介面, 現在他們是這樣的:

interface SingleObserver<T> {  
    void onSubscribe(Disposable d);
    void onSuccess(T value);
    void onError(Throwable error);
}

interface CompletableObserver<T> {  
    void onSubscribe(Disposable d);
    void onComplete();
    void onError(Throwable error);
}

Subscriber

對比一下 Subscriber :

public interface Subscriber<T> {  
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

我們會發現和以前不一樣的是多了一個 onSubscribe 的方法, Subscription 如下:

Subscription

public interface Subscription {  
    public void request(long n);
    public void cancel();
}

熟悉 RxJava 1.x 的朋友能發現, 新的 Subscription 更像是綜合了舊的 Producer 與 Subscription 的綜合體。他既可以向上遊請求(request)資料,又可以打斷並釋放(cancel)資源。而舊的 Subscription 在這裡因為名字被佔,而被重新命名成了 Disposable。

注意:Subscription 不再有訂閱subcribe和unSubcribe的概念。

Disposable

public interface Disposable {  
    void dispose();
    boolean isDisposed();
}

這裡最大的不同就是這個 onSubscribe ,根據 Specification, 這個函式一定是第一個被呼叫的, 然後就會傳給呼叫方一個 Subscription ,通過這種方式組織新的背壓關係。當我們消費資料時,可以通過 Subscription 物件,自己決定請求資料。

這裡就可以解釋上面的非阻塞的背壓。舊的阻塞式的背壓,就是根據下游的消費速度,中游可以選擇阻塞住等待下游的消費,隨後向上遊請求資料。而新的非阻塞就不在有中間阻塞的過程,由下游自己決定取多少,還有背壓策略,如拋棄最新、拋棄最舊、快取、拋異常等。

而新的介面帶來的新的呼叫方式與舊的也不太一樣, subscribe 後不再會有 Subscription 也就是如今的 Disposable,為了保持向後的相容, Flowable 提供了 subscribeWith方法 返回當前的 Subscriber 物件, 並且同時提供了 DefaultSubscriber , ResourceSubscriber , DisposableSubscriber ,讓他們提供了 Disposable 介面,並且可以從外面取消 dispose()。 現在也可以完成和以前類似的程式碼:

ResourceSubscriber<Integer> subscriber = new ResourceSubscriber<Integer>() {  
    @Override
    public void onStart() {
        request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(Integer t) {
        System.out.println(t);
    }

    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("Done");
    }
};

Flowable.range(1, 10).delay(1, TimeUnit.SECONDS).subscribe(subscriber);

subscriber.dispose();

注意,由於Reactive-Streams的相容性,方法onCompleted被重新命名為onComplete。另外注意dispose()方法,這個方法允許你釋放資源。

RxJava2.x中提供了幾個Subcriber物件,如下所示:

  • DefaultSubscriber:通過實現Subscriber介面,可以通過呼叫request(long n)方法請求或者cancel()方法取消訂閱(同步請求)
public abstract class DefaultSubscriber<T> implements Subscriber<T>
  • DisposableSubscriber:通過實現Desposable非同步刪除。
public abstract class DisposableSubscriber<T> implements Subscriber<T>, Disposable
  • ResourceSubscriber:允許非同步取消其訂閱相關資源,節省記憶體而且是執行緒安全。
public abstract class ResourceSubscriber<T> implements Subscriber<T>, Disposable
  • SafeSubscriber:包裝另一個訂閱者,並確保所有onXXX方法遵守協議(序列化要求訪問除外)。
public final class SafeSubscriber<T> implements Subscriber<T>, Subscription
  • SerializedSubscriber:序列化訪問另一個訂閱者的onNext,onError和onComplete方法。
public final class SerializedSubscriber<T> implements Subscriber<T>, Subscription

在onSubscribe/onStart中呼叫request

注意,在Subscriber.onSubscribe或ResourceSubscriber.onStart中呼叫request(n)將會立即呼叫onNext,例項程式碼如下:

Flowable.range(1, 3).subscribe(new Subscriber<Integer>() {

    @Override
    public void onSubscribe(Subscription s) {
        System.out.println("OnSubscribe start");
        s.request(Long.MAX_VALUE);
        System.out.println("OnSubscribe end");
    }

    @Override
    public void onNext(Integer v) {
        System.out.println(v);
    }

    @Override
    public void onError(Throwable e) {
        e.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("Done");
    }
});

輸出結果如下:

OnSubscribe start
1
2
3
Done
OnSubscribe end

當你在onSubscribe/onStart中做了一些初始化的工作,而這些工作是在request後面時,會出現一些問題,在onNext執行時,你的初始化工作的那部分程式碼還沒有執行。為了避免這種情況,請確保你呼叫request時,已經把所有初始化工作做完了。

這個行為不同於1.x中的 request要經過延遲的邏輯直到上游的Producer到達時。在2.0中,總是Subscription先傳遞下來,90%的情況下沒有延遲請求的必要。

Subscription

在RxJava 1.x中,介面rx.Subscription負責流和資源的生命週期管理,即退訂和釋放資源,例如scheduled tasks。Reactive-Streams規範用這個名稱指定source和consumer之間的關係: org.reactivestreams.Subscription 允許從上游請求一個正數,並支援取消。

為了避免名字衝突,1.x的rx.Subscription被改成了 io.reactivex.Disposable。

因為Reactive-Streams的基礎介面org.reactivestreams.Publisher 定義subscribe()為無返回值,Flowable.subscribe(Subscriber)不再返回任何Subscription。其他的基礎型別也遵循這種規律。
在2.x中其他的subscribe的過載方法返回Disposable。

原始的Subscription容器型別已經被重新命名和修改。

  • CompositeSubscription 改成 CompositeDisposable,
  • SerialSubscription 和MultipleAssignmentSubscription 被合併到了 SerialDisposable。 set() 方法取消了舊值,而replace()方法沒有。
  • RefCountSubscription 已被刪除。

收回 create 方法許可權

在RxJava 1.x 最明顯的問題就是由於 create 的太過開放,導致其被開發者濫用,而不是學習使用提供的操作符。並且使用者對 RxJava 不夠了解,導致各種各樣的問題,如背壓、異常處理等。

由於規範要求所有的操作符強制支援背壓,因此新的 create 採用了保守的設計,讓使用者實現 FlowableOnSubscribe 介面,並選取背壓策略,然後在內部實現封裝支援背壓,簡單的例子如下:

Flowable.create(new FlowableOnSubscribe<Integer>() {
  @Override
  public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
      emitter.onNext(1);
      emitter.onNext(2);
      emitter.onComplete();
    }
 }, BackpressureStrategy.BUFFER);

Functions可以丟擲異常

不同於RxJava1.x,RxJava2.x中沒有了一系列的Action/Func介面,取而代之的是與Java8命名類似的函式式介面,如下圖:

這裡寫圖片描述

而Consumer即消費者,用於接收單個值,BiConsumer則是接收兩個值,Function用於變換物件,Predicate用於判斷。這些介面命名大多參照了Java8,熟悉Java8新特性的應該都知道意思,這裡也就不再贅述了。

public interface Consumer<T> {
    void accept(T t) throws Exception;
}

新的ActionX、FunctionX的方法宣告都增加了一個throws Exception,這帶來了顯而易見的好處,現在我們可以這樣寫:

Flowable.just("qq.txt")
    .map(new Function<String, Integer>() {
        @Override
        public Integer apply(String value) throws Exception {
            File file = new File(value);
            file.createNewFile();
            return 99;
        }
    });

而createNewFile方法顯式的丟擲了一個IOException,而在以前是不可以這樣寫的。

Schedulers

在2.0的API中仍然支援主要的預設scheduler: computation, io, newThread 和 trampoline,可以通過io.reactivex.schedulers.Schedulers這個實用的工具類來排程。

2.0中不存在immediate 排程器。 它被頻繁的誤用,並沒有正常的實現 Scheduler 規範;它包含用於延遲動作的阻塞睡眠,並且不支援遞迴排程。你可以使用Schedulers.trampoline()來代替它。

Schedulers.test()已經被移除,這樣避免了預設排程器休息的概念差異。那些返回一個”global”的排程器例項是鑑於test()總是返回一個新的TestScheduler例項。現在我們鼓勵測試人員使用這樣簡單的程式碼new TestScheduler()。

io.reactivex.Scheduler抽象類現在支援直接排程任務,不需要先建立然後通過Worker排程。

操作符的差別

2.0中大部分操作符仍然被保留,實際上大部分行為和1.x一樣。

關於操作符,引用JakeWharton的總結就是:

All the same operators(you konw and love or hate and despise) are still there.

Transformer

RxJava 1.x 中Transformer實際上就是Func1<Observable,Observable>,換句話說就是提供給他一個Observable它會返回給你另一個Observable,這和內聯一系列操作符有著同等功效。

相關API如下:

public interface Transformer<T, R> extends Func1<Observable<T>, Observable<R>> {
   // cover for generics insanity
}

public interface Func1<T, R> extends Function {
    R call(T t);
}

實際操作下,寫個方法,建立一個Transformer排程器:

//子執行緒執行,主執行緒回撥
public Observable.Transformer<T, T> io_main(final RxAppCompatActivity context) {
        return new Observable.Transformer<T, T>() {

            @Override
            public Observable<T> call(Observable<T> tObservable) {

                Observable<T> observable = (Observable<T>) tObservable
                        .subscribeOn(Schedulers.io())
                        .doOnSubscribe(new Action0() {
                            @Override
                            public void call() {
                                DialogHelper.showProgressDlg(context, mMessage);
                            }
                        })
                        .subscribeOn(AndroidSchedulers.mainThread())
                        .observeOn(AndroidSchedulers.mainThread())
                        .compose(RxLifecycle.bindUntilEvent(context.lifecycle(), ActivityEvent.STOP));

                return observable;

            }
        };
    }

在實際應用中,Transformer 經常和 Observable.compose() 一起使用。本人的Community框架也有使用,這裡就不多介紹了。

在RxJava2.0中,Transformer劃分的更加細緻了,每一種“Observable”都對應的有自己的Transformer,相關API如下所示:

public interface ObservableTransformer<Upstream, Downstream> {
    ObservableSource<Downstream> apply(Observable<Upstream> upstream);
}

public interface CompletableTransformer {
    CompletableSource apply(Completable upstream);
}

public interface FlowableTransformer<Upstream, Downstream> {
    Publisher<Downstream> apply(Flowable<Upstream> upstream);
}

public interface MaybeTransformer<Upstream, Downstream> {
    MaybeSource<Downstream> apply(Maybe<Upstream> upstream);
}


public interface SingleTransformer<Upstream, Downstream> {
    SingleSource<Downstream> apply(Single<Upstream> upstream);
}

這裡以FlowableTransformer為例,建立一個Transformer排程器:

//子執行緒執行,主執行緒回撥
    public FlowableTransformer<T, T> io_main(final RxAppCompatActivity context) {
        return new FlowableTransformer<T, T>() {


            @Override
            public Publisher<T> apply(Flowable<T> flowable) {
                return flowable
                        .subscribeOn(Schedulers.io())
                        .doOnSubscribe(new Consumer<Subscription>() {
                            @Override
                            public void accept(Subscription subscription) throws Exception {
                                DialogHelper.showProgressDlg(context, mMessage);
                            }
                        })
                        .subscribeOn(AndroidSchedulers.mainThread())
                        .observeOn(AndroidSchedulers.mainThread())
                        .compose(RxLifecycle.<T, ActivityEvent>bindUntilEvent(context.lifecycle(), ActivityEvent.DESTROY));
            }
        };
    }

其他改變

doOnCancel/doOnDispose/unsubscribeOn

在1.x中,doOnUnsubscribe總是執行終端事件,因為SafeSubscriber呼叫了unsubscribe。這實際上是沒有必要的。Reactive-Streams規範中,一個終端事件到達Subscriber,上游的Subscription會取消,因此呼叫 cancel()是一個空操作。

由於同樣的原因unsubscribeOn也沒被在終端路徑上呼叫,但只有實際在鏈上呼叫cancel時,才會呼叫unsubscribeOn。

因此,下面的序列不會被呼叫

doOnCancel

Flowable.just(1,2,3)
        .doOnCancel(new Action() {
            @Override
            public void run() throws Exception {
                Log.e(TAG, " doOnCancel " );
            }
        })
        .subscribe(new DisposableSubscriber<Integer>() {
            @Override
            public void onNext(Integer integer) {
                Log.e(TAG, " onNext : " + integer);
            }

            @Override
            public void onError(Throwable t) {

            }

            @Override
            public void onComplete() {
                Log.e(TAG, " onComplete isDisposed() = " + isDisposed());

            }
        });

輸出結果如下:

onNext : 1
onNext : 2
onNext : 3
onComplete isDisposed() = false

然而,下面將會呼叫take操作符在傳送過程中取消onNext


Flowable.just(1,2,3)
        .doOnCancel(new Action() {
            @Override
            public void run() throws Exception {
                Log.e(TAG, " doOnCancel " );
            }
        })
        .take(2)
        .subscribe(new DisposableSubscriber<Integer>() {
            @Override
            public void onNext(Integer integer) {
                Log.e(TAG, " onNext : " + integer);
            }

            @Override
            public void onError(Throwable t) {

            }

            @Override
            public void onComplete() {
                Log.e(TAG, " onComplete isDisposed() = " + isDisposed());

            }
        });

輸出結果如下:

onNext : 1
onNext : 2
doOnCancel 
onComplete isDisposed() = false

使用take操作符,呼叫了cancel方法,我們看一下take操作符的原始碼:

  @CheckReturnValue
  @BackpressureSupport(BackpressureKind.SPECIAL) // may trigger UNBOUNDED_IN
  @SchedulerSupport(SchedulerSupport.NONE)
  public final Flowable<T> take(long count) {
      if (count < 0) {
          throw new IllegalArgumentException("count >= 0 required but it was " + count);
      }
      return RxJavaPlugins.onAssembly(new FlowableTake<T>(this, count));
  }

關鍵點就是這個FlowableTake類,這裡限於篇幅的原因就不看原始碼了,大家可以自己看一下,然後找找是什麼地方呼叫了cancel。

同樣的,如果你需要在終端或者取消時執行清理,考慮使用using操作符代替。

以上就是RxJava2.0中的改動,下面我們重點介紹下RxJava2.0中的觀察者模式。

RxJava2.0中的觀察者模式

RxJava始終以觀察者模式為骨架,在2.0中依然如此。

在RxJava2.0中,有五種觀察者模式:

  1. Observable/Observer
  2. Flowable/Subscriber
  3. Single/SingleObserver
  4. Completable/CompletableObserver
  5. Maybe/MaybeObserver

後面三種觀察者模式差不多,Maybe/MaybeObserver可以說是Single/SingleObserverCompletable/CompletableObserver的複合體。

下面列出這五個觀察者模式相關的介面。

Observable/Observer

public abstract class Observable<T> implements ObservableSource<T>{...}

public interface ObservableSource<T> {
    void subscribe(Observer<? super T> observer);
}

public interface Observer<T> {
    void onSubscribe(Disposable d);
    void onNext(T t);
    void onError(Throwable e);
    void onComplete();
}

Completable/CompletableObserver

//代表一個延遲計算沒有任何價值,但只顯示完成或異常。類似事件模式Reactive-Streams:onSubscribe(onError | onComplete)?
public abstract class Completable implements CompletableSource{...}

//沒有子類繼承Completable
public interface CompletableSource {
    void subscribe(CompletableObserver cs);
}

public interface CompletableObserver {
    void onSubscribe(Disposable d);
    void onComplete();
    void onError(Throwable e);
}

Flowable/Subscriber

public abstract class Flowable<T> implements  Publisher<T>{...}

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

Maybe/MaybeObserver

//Maybe類似Completable,它的主要消費型別是MaybeObserver順序的方式,遵循這個協議:onSubscribe(onSuccess | onError | onComplete)
public abstract class Maybe<T> implements MaybeSource<T>{...}

public interface MaybeSource<T> {
    void subscribe(MaybeObserver<? super T> observer);
}

public interface MaybeObserver<T> {
    void onSubscribe(Disposable d);
    void onSuccess(T t);
    void onError(Throwable e);
    void onComplete();
}

Single/SingleObserver

//Single功能類似於Observable,除了它只能發出一個成功的值,或者一個錯誤(沒有“onComplete”事件),這個特性是由SingleSource介面決定的。
public abstract class Single<T> implements SingleSource<T>{...}

public interface SingleSource<T> {
    void subscribe(SingleObserver<? super T> observer);
}

public interface SingleObserver<T> {
    void onSubscribe(Disposable d);
    void onSuccess(T t);
    void onError(Throwable e);
}

其實從API中我們可以看到,每一種觀察者都繼承自各自的介面(都有一個共同的方法subscrib()),但是引數不一樣),正是各自介面的不同,決定了他們功能不同,各自獨立(特別是Observable和Flowable),同時保證了他們各自的拓展或者配套的操作符不會相互影響。

這裡寫圖片描述

下面我們重點說說在實際開發中經常會用到的兩個模式:Observable/Observer和Flowable/Subscriber。

Observable/Observer

Observable正常用法:

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onComplete();
    }
}).subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(Integer integer) {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});

需要注意的是,這類觀察模式不支援背壓,下面我們具體分析下。

當被觀察者快速傳送大量資料時,下游不會做其他處理,即使資料大量堆積,呼叫鏈也不會報MissingBackpressureException,消耗記憶體過大隻會OOM。

在測試的時候,快速傳送了100000個整形資料,下游延遲接收,結果被觀察者的資料全部發送出去了,記憶體確實明顯增加了,遺憾的是沒有OOM。

所以,當我們使用Observable/Observer的時候,我們需要考慮的是,資料量是不是很大(官方給出以1000個事件為分界線,供各位參考)。

Flowable/Subscriber

Flowable.range(0, 10)
        .subscribe(new Subscriber<Integer>() {
            Subscription subscription;

            //當訂閱後,會首先呼叫這個方法,其實就相當於onStart(),
            //傳入的Subscription s引數可以用於請求資料或者取消訂閱
            @Override
            public void onSubscribe(Subscription s) {
                Log.d(TAG, "onsubscribe start");
                subscription = s;
                subscription.request(1);
                Log.d(TAG, "onsubscribe end");
            }

            @Override
            public void onNext(Integer o) {
                Log.d(TAG, "onNext--->" + o);
                subscription.request(3);
            }

            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });

輸出結果如下:

onsubscribe start
onNext--->0
onNext--->1
onNext--->2
onNext--->3
onNext--->4
onNext--->5
onNext--->6
onNext--->7
onNext--->8
onNext--->9
onComplete
onsubscribe end

Flowable是支援背壓的,也就是說,一般而言,上游的被觀察者會響應下游觀察者的資料請求,下游呼叫request(n)來告訴上游傳送多少個數據。這樣避免了大量資料堆積在呼叫鏈上,使記憶體一直處於較低水平。

當然,Flowable也可以通過create()來建立:

Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
            emitter.onComplete();
        }
    }, BackpressureStrategy.BUFFER);//指定背壓策略

Flowable雖然可以通過create()來建立,但是你必須指定背壓的策略,以保證你建立的Flowable是支援背壓的(這個在1.0的時候就很難保證,可以說RxJava2.0收緊了create()的許可權)。

根據上面的程式碼的結果輸出中可以看到,當我們呼叫subscription.request(n)方法的時候,不等onSubscribe()中後面的程式碼執行,就會立刻執行onNext方法,因此,如果你在onNext方法中使用到需要初始化的類時,應當儘量在subscription.request(n)這個方法呼叫之前做好初始化的工作;

當然,這也不是絕對的,我在測試的時候發現,通過create()自定義Flowable的時候,即使呼叫了subscription.request(n)方法,也會等onSubscribe()方法中後面的程式碼都執行完之後,才開始呼叫onNext。

平滑升級

RxJava1.x 如何平滑升級到RxJava2.0呢?

由於RxJava2.0變化較大無法直接升級,幸運的是,官方提供了RxJava2Interop這個庫,可以方便地將RxJava1.x升級到RxJava2.0,或者將RxJava2.0轉回RxJava1.x。

總結

可以明顯的看到,RxJava2.0最大的改動就是對於backpressure的處理,為此將原來的Observable拆分成了新的Observable和Flowable,同時其他相關部分也同時進行了拆分。

除此之外,就是我們最熟悉和喜愛的RxJava。