RxJava 原理分析(一)訂閱關係的形成
最近複習RxJava的時候發現解說RxJava的原理不多,所以,來機智的騙心心來了。
依賴:
implementation "io.reactivex.rxjava2:rxjava:2.2.4"
簡單使用
待會兒講解的原理呢,是由這個使用 demo 來講解。
Observable.create(new ObservableOnSubscribe<Integer>() {// 建立被觀察者 @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1);// 發射器,發射一個事件 emitter.onNext(2); } }).subscribe(new Consumer<Integer>() {// 觀察者 @Override public void accept(Integer integer) throws Exception {// 用於處理 onNext 事件 Log.d(TAG, "accept: "+integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "accept: "+throwable.getMessage()); } });
原理解析
Observable.create(ObservableOnSubscribe<T>)
在上面的使用demo中,這是整個訂閱過程的開始:建立一個被觀察者。來,我們看一下我們的被觀察者是什麼。
Observable.class
@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null");// 判空 return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));// 預設直接返回這個new出來的ObservableCreate }
這個 ObjectHelper.requireNonNull
怎麼判空啊?
public static <T> T requireNonNull(T object, String message) { if (object == null) { throw new NullPointerException(message); } return object; }
簡直不要太簡單,就是判斷輸入的object是不是null,不是就返回,是就丟擲異常。
那這個 RxJavaPlugins
又是幹嘛的呢?用過的都知道,沒用過推薦看一下 ofollow,noindex">給初學者的RxJava2.0教程(十) 。其實我們沒有對這個 RxJavaPlugins 做設定的話,就是返回我們傳進來的這個引數。後面我們看到這個RxJavaPlugins就可以預設為直接返回引數。
書接上文,那呼叫 Observable.create(ObservableOnSubscribe)
,豈不是就是把自己在外部實現的 ObservableOnSubscribe 匿名內部類包裝到 ObservableCreate
中去羅。
public final class ObservableCreate<T> extends Observable<T> {// 注意繼承關係哦 final ObservableOnSubscribe<T> source;// 最原始的目標:被觀察者 public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source;// 傳進來的是外面new的匿名內部類ObservableOnSubscribe } ... }
看到這裡,emmm,nice,儲存住了被觀察者,返回了 ObservableCreate
物件。注意哦,這個時候後面一個 .
操作的就是這個 ObservableCreate
物件(換言之,後面點的就是 ObservableCreate
中的方法)。
好的,下面就是 .subscribe
.subscribe
這個方法就很有意思了,實現了訂閱這個操作,或者說,是觸發了onSuscribe、onNext 等操作。
先看一下 subscribe 這個方法的過載:
public final Disposable subscribe() {}// 注意哦,這裡的都是final方法 public final Disposable subscribe(Consumer<? super T> onNext) {} public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {} public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {} public final void subscribe(Observer<? super T> observer) {}
有點多哈,但是實現呢?很簡單,大家都呼叫最後一個處理,沒有傳參的都添上預設的 Custom 就可以了。什麼叫預設實現,就是實現了介面,但是預設不處理,比如預設的Action,還有就是簡單處理,比如onError,但是都不是很重要,我們主要關注我們自己實現的東西嘛。
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) { return subscribe(onNext, onError, Functions.EMPTY_ACTION, Functions.emptyConsumer()); } @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) { ObjectHelper.requireNonNull(onNext, "onNext is null");// 挨個判空 ObjectHelper.requireNonNull(onError, "onError is null"); ObjectHelper.requireNonNull(onComplete, "onComplete is null"); ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null"); LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);// 用一個類來包裹 subscribe(ls);// 包裹類的訂閱 return ls; }
那麼這個用來包裹的 LambdaObserver 又是什麼鬼呢?
public final class LambdaObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable, LambdaConsumerIntrospection { private static final long serialVersionUID = -7251123623727029452L; final Consumer<? super T> onNext; final Consumer<? super Throwable> onError; final Action onComplete; final Consumer<? super Disposable> onSubscribe; public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) { super(); this.onNext = onNext; this.onError = onError; this.onComplete = onComplete; this.onSubscribe = onSubscribe; } ... }
就是將四個型別(onNext,onError,onComplete,onSubscribe)封裝成一個一個物件,方便對整個流程的呼叫。
將四個物件封裝在一起了過後,就是應該是訂閱了把?
@SchedulerSupport(SchedulerSupport.NONE) @Override public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer);// 檢查需要執行預操作不,預設返回 observer // 判空 ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins"); subscribeActual(observer);// 真正處理的函式 } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e);// 錯誤的話,捕獲,並交給RxJavaPlugins#onError來處理,沒有交給我們自定義的onError,莫看錯啦 NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } }
真正有用的就是 subscribeActual(observer);
這一句,吐血哦,還不訂閱,我都快等不及了。好嘛,大神就是大神,一看,自己沒實現。後面一想,就是牛逼。
/** * Operator implementations (both source and intermediate) should implement this method that * performs the necessary business logic and handles the incoming {@link Observer}s. * <p>There is no need to call any of the plugin hooks on the current {@code Observable} instance or * the {@code Observer}; all hooks and basic safeguards have been * applied by {@link #subscribe(Observer)} before this method gets called. * @param observer the incoming Observer, never null */ protected abstract void subscribeActual(Observer<? super T> observer);
所有的操作符都會重寫這個方法,我是不是暴露了什麼,emm, 所有運算子返回都是 Observable 的子類 ,就比如說 create
返回的就是 Observable
的子類 ObservableCreate
,後面就是在這基礎上呼叫了。這裡我們可以看到,其實所有的 subscribe 我們都重寫不了,唯一能重寫且必須重寫的就是這個 subscribeActual。這就意味著,這個方法是我子類實現真正訂閱的入口。
前面講解我們知道,現在在demo中呼叫的是 ObservableCreate 的 subscribe,那麼事情就變得很簡單,我們直接找到 ObservableCreate#subscribeActual(observer) 進行分析就好。在分析之前,先用圖總結下前面的東西:

好的嘞,那我們接下來就看一下核心的 subscribeActual 方法如何實現的?
@Override protected void subscribeActual(Observer<? super T> observer) {//傳進來的就是前面的 LambdaObserver CreateEmitter<T> parent = new CreateEmitter<T>(observer);// 發射器,每一個操作符對應的類內部都會自己實現,因為處理邏輯不一樣啊 observer.onSubscribe(parent);// 回調了自己實現的 onSubscribe 對用的Consumer try { source.subscribe(parent);// 這個source就是初始化類的時候傳進來的被觀察者,這裡將傳送器給了它 } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex);// 這裡捕獲的異常才交給自己寫的Consumer處理 } }
這裡程式碼不多,乾的事還不少,首先是new出我們的發射器,然後觸發被觀察者的訂閱回撥,然後再,執行被觀察者的subscribe方法,如果捕獲到異常就交給自定義的 onError 處理。
有一個概念可以先了解,無論以後把這個物件怎麼包裹,傳遞,只有這裡才呼叫了被觀察者 Observable 的 subscribe
方法。
接下來我們來詳解一下,這個回撥流程。
observer.onSubscribe(parent);
這裡的 observer 是什麼?傳進來的 包裝有自定義的觀察者的 LambdaObserver ,他是 Observable 的子類。接下來看一下他的呼叫:
@Override public void onSubscribe(Disposable d) { if (DisposableHelper.setOnce(this, d)) {// 設定並判斷是否是第一次 try { onSubscribe.accept(this); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); d.dispose();// 出現異常,解除訂閱 onError(ex);// 並向下傳遞 } } } @Override public void onError(Throwable t) { if (!isDisposed()) {// 判斷是否斷開連線 lazySet(DisposableHelper.DISPOSED); try { onError.accept(t);//交給自定義的消費者 } catch (Throwable e) { Exceptions.throwIfFatal(e); RxJavaPlugins.onError(new CompositeException(t, e)); } } else { RxJavaPlugins.onError(t); } }
很簡單,對不對,就是直接呼叫了 onNext 消費者的 accept 方法(自定義或者預設)。出現異常,如果沒有斷開連線,且是第一次接收到,就交給 onError (自定義或者預設)處理。
那麼 source.subscribe(parent);
不會也這麼簡單把?
source 就是傳進來的 ObservableOnSubscribe,自定義的被觀察者
parent 就是發射器。 這一呼叫,我們自己寫的被觀察者的邏輯就巴拉巴拉的執行了
這裡就是直接就回調了。嚇人。。
那 parent.onError(ex);
怎麼實現的?這就要真正說道說道這個發射器了。
傳送器是什麼狗東西呢?它是 ObservableCreate
的靜態內部類:
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable { final Observer<? super T> observer;// 觀察者,就是傳進來的 LambdaObserver CreateEmitter(Observer<? super T> observer) { this.observer = observer; } @Override public void onNext(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (!isDisposed()) {// 是否已經斷開連線 observer.onNext(t);// 呼叫自定義的Custom回撥處理 } } @Override public void onError(Throwable t) { if (!tryOnError(t)) {// 交給觀察者 RxJavaPlugins.onError(t); } } @Override public boolean tryOnError(Throwable t) { if (t == null) { t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); } if (!isDisposed()) {//沒斷開,不為null try { observer.onError(t); } finally { dispose();//失敗直接斷開連線 } return true; } return false; } @Override public void onComplete() { if (!isDisposed()) { try { observer.onComplete();// 交給觀察者 } finally { dispose(); } } } @Override public void setDisposable(Disposable d) {DisposableHelper.set(this, d);} @Override public void dispose() {DisposableHelper.dispose(this);} @Override public boolean isDisposed() {return DisposableHelper.isDisposed(get());} }
咦,這結構簡直不要太簡單,就 相當於一次中轉 ,只不過是加入了一些容錯機制。到這裡整個流程要用到的類就結束了,當我們在書寫被觀察者的時候,使用這個發射器提交東西,就是直接呼叫這個東西來呼叫消費者的對應回撥。
千言不如一圖

不對,放錯了

- 建立真正的 被觀察者 包裹物件(繼承於 Observable),並將自己寫的真正被觀察者包裹起來
- 呼叫被觀察者的subscribe方法,將自己建立的 觀察者 包裹起來
- 作為引數,傳入ObservableCreate 對方實現的 subscribeActual(Observer) 中。
- 在 subscribeActual 方法中 生成 emitter ,並且回撥被觀察者的onSubscribe,確定連線關係。
- 自己寫的 被觀察者的 subscribe 呼叫 ,我們可以使用 emitter 提交東西,一提交就將提交的東西交給 被觀察者對應的方法執行