1. 程式人生 > >RxJava Observer與Subscriber的關係

RxJava Observer與Subscriber的關係

RxJava系列教程:

在說Observer與Subscriber的關係之前,我們下重溫下相關概念。

RxJava 的觀察者模式

RxJava 有四個基本概念:Observable (可觀察者,即被觀察者)、 Observer (觀察者)、 subscribe (訂閱)、事件。Observable 和 Observer 通過 subscribe() 方法實現訂閱關係,從而 Observable 可以在需要的時候發出事件來通知 Observer。

與傳統觀察者模式不同, RxJava 的事件回撥方法除了普通事件 onNext() (相當於 onClick() / onEvent())之外,還定義了兩個特殊的事件:onCompleted() 和 onError()。

  • onCompleted(): 事件佇列完結。RxJava 不僅把每個事件單獨處理,還會把它們看做一個佇列。RxJava 規定,當不會再有新的 onNext() 發出時,需要觸發 onCompleted() 方法作為標誌。
  • onError(): 事件佇列異常。在事件處理過程中出異常時,onError() 會被觸發,同時佇列自動終止,不允許再有事件發出。
  • 在一個正確執行的事件序列中, onCompleted() 和 onError() 有且只有一個,並且是事件序列中的最後一個。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在佇列中呼叫了其中一個,就不應該再呼叫另一個。

RxJava 的觀察者模式大致如下圖:
這裡寫圖片描述

RxJava的實現

基於以上的概念, RxJava 的基本實現主要有三點:

1) 建立 Observer

Observer 即觀察者,它決定事件觸發的時候將有怎樣的行為。 RxJava 中的 Observer 介面的實現方式:

Observer<Apps> observer = new Observer<Apps>() {
        @Override
        public void onCompleted() {
            listView.onRefreshComplete();
        }

        @Override
public void onError(Throwable e) { listView.onRefreshComplete(); } @Override public void onNext(Apps appsList) { listView.onRefreshComplete(); appLists.addAll(appsList.apps); adapter.notifyDataSetChanged(); } };

除了 Observer 介面之外,RxJava 還內建了一個實現了 Observer 的抽象類:Subscriber。 Subscriber 對 Observer 介面進行了一些擴充套件,但他們的基本使用方式是完全一樣的:

Subscriber subscriber = new Subscriber<Apps>() {
        @Override
        public void onCompleted() {
            listView.onRefreshComplete();
        }

        @Override
        public void onError(Throwable e) {
            listView.onRefreshComplete();
        }

        @Override
        public void onNext(Apps appsList) {
            listView.onRefreshComplete();
            appLists.addAll(appsList.apps);
            adapter.notifyDataSetChanged();
        }
    };

不僅基本使用方式一樣,實質上,在 RxJava 的 subscribe 過程中,Observer 也總是會先被轉換成一個 Subscriber 再使用。所以如果你只想使用基本功能,選擇 Observer 和 Subscriber 是完全一樣的。

Subscriber是Observer的實現類

public abstract class Subscriber<T> implements Observer<T>, Subscription 

而onStart()方法是Subscriber中的一個方法。它也屬於回撥級別的。

subscribe(Subscriber)方法中有如下程式碼:

// if not already wrapped   包裹一層
        if (!(subscriber instanceof SafeSubscriber)) {
            // assign to `observer` so we return the protected version
            subscriber = new SafeSubscriber<T>(subscriber);
        }

他將subscriber包裝起來,這個具體什麼意思有待研究,繼續下看。

hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
   return hook.onSubscribeReturn(subscriber);

hook是什麼呢?

private static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();

RxJavaObservableExecutionHook.java原始碼:

/**
 * Copyright 2014 Netflix, Inc.
 * 
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 * http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package rx.plugins;

import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observable.Operator;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Func1;

/**
 * Abstract ExecutionHook with invocations at different lifecycle points of {@link Observable} execution with a
 * default no-op implementation.
 * <p>
 * See {@link RxJavaPlugins} or the RxJava GitHub Wiki for information on configuring plugins:
 * <a href="https://github.com/ReactiveX/RxJava/wiki/Plugins">https://github.com/ReactiveX/RxJava/wiki/Plugins</a>.
 * <p>
 * <b>Note on thread-safety and performance:</b>
 * <p>
 * A single implementation of this class will be used globally so methods on this class will be invoked
 * concurrently from multiple threads so all functionality must be thread-safe.
 * <p>
 * Methods are also invoked synchronously and will add to execution time of the observable so all behavior
 * should be fast. If anything time-consuming is to be done it should be spawned asynchronously onto separate
 * worker threads.
 * 
 */
public abstract class RxJavaObservableExecutionHook {
    /**
     * Invoked during the construction by {@link Observable#create(OnSubscribe)}
     * <p>
     * This can be used to decorate or replace the <code>onSubscribe</code> function or just perform extra
     * logging, metrics and other such things and pass-thru the function.
     * 
     * @param f
     *            original {@link OnSubscribe}<{@code T}> to be executed
     * @return {@link OnSubscribe}<{@code T}> function that can be modified, decorated, replaced or just
     *         returned as a pass-thru
     */
    public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {
        return f;
    }

    /**
     * Invoked before {@link Observable#subscribe(rx.Subscriber)} is about to be executed.
     * <p>
     * This can be used to decorate or replace the <code>onSubscribe</code> function or just perform extra
     * logging, metrics and other such things and pass-thru the function.
     * 
     * @param onSubscribe
     *            original {@link OnSubscribe}<{@code T}> to be executed
     * @return {@link OnSubscribe}<{@code T}> function that can be modified, decorated, replaced or just
     *         returned as a pass-thru
     */
    public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {
        // pass-thru by default
        return onSubscribe;
    }

    /**
     * Invoked after successful execution of {@link Observable#subscribe(rx.Subscriber)} with returned
     * {@link Subscription}.
     * <p>
     * This can be used to decorate or replace the {@link Subscription} instance or just perform extra logging,
     * metrics and other such things and pass-thru the subscription.
     * 
     * @param subscription
     *            original {@link Subscription}
     * @return {@link Subscription} subscription that can be modified, decorated, replaced or just returned as a
     *         pass-thru
     */
    public <T> Subscription onSubscribeReturn(Subscription subscription) {
        // pass-thru by default
        return subscription;
    }

    /**
     * Invoked after failed execution of {@link Observable#subscribe(Subscriber)} with thrown Throwable.
     * <p>
     * This is <em>not</em> errors emitted via {@link Subscriber#onError(Throwable)} but exceptions thrown when
     * attempting to subscribe to a {@link Func1}<{@link Subscriber}{@code <T>}, {@link Subscription}>.
     * 
     * @param e
     *            Throwable thrown by {@link Observable#subscribe(Subscriber)}
     * @return Throwable that can be decorated, replaced or just returned as a pass-thru
     */
    public <T> Throwable onSubscribeError(Throwable e) {
        // pass-thru by default
        return e;
    }

    /**
     * Invoked just as the operator functions is called to bind two operations together into a new
     * {@link Observable} and the return value is used as the lifted function
     * <p>
     * This can be used to decorate or replace the {@link Operator} instance or just perform extra
     * logging, metrics and other such things and pass-thru the onSubscribe.
     * 
     * @param lift
     *            original {@link Operator}{@code <R, T>}
     * @return {@link Operator}{@code <R, T>} function that can be modified, decorated, replaced or just
     *         returned as a pass-thru
     */
    public <T, R> Operator<? extends R, ? super T> onLift(final Operator<? extends R, ? super T> lift) {
        return lift;
    }
}

RxJavaObservableExecutionHook類的作用很特殊,似乎沒有什麼太大的作用,傳進去什麼(型別)引數,返回什麼(型別)引數。

如下程式碼所示:

public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {
        return f;
    }

public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {
        // pass-thru by default
        return onSubscribe;
    }

至於最後關鍵的返回結果:

public <T> Subscription onSubscribeReturn(Subscription subscription) {
        // pass-thru by default
        return subscription;
    }

說白了,就是返回訂閱的Observer物件。

Observer與Subscriber的區別

它們的區別對於使用者來說主要有兩點:

  1. onStart(): 這是 Subscriber 增加的方法。它會在 subscribe 剛開始,而事件還未傳送之前被呼叫,可以用於做一些準備工作,例如資料的清零或重置。這是一個可選方法,預設情況下它的實現為空。需要注意的是,如果對準備工作的執行緒有要求(例如彈出一個顯示進度的對話方塊,這必須在主執行緒執行), onStart() 就不適用了,因為它總是在 subscribe 所發生的執行緒被呼叫,而不能指定執行緒。要在指定的執行緒來做準備工作,可以使用 doOnSubscribe() 方法,具體可以在後面的文中看到。
  2. unsubscribe(): 這是 Subscriber 所實現的另一個介面 Subscription 的方法,用於取消訂閱。在這個方法被呼叫後,Subscriber 將不再接收事件。一般在這個方法呼叫前,可以使用 isUnsubscribed() 先判斷一下狀態。 unsubscribe() 這個方法很重要,因為在 subscribe() 之後, Observable 會持有 Subscriber 的引用,這個引用如果不能及時被釋放,將有記憶體洩露的風險。所以最好保持一個原則:要在不再使用的時候儘快在合適的地方(例如 onPause() onStop() 等方法中)呼叫 unsubscribe() 來解除引用關係,以避免記憶體洩露的發生。

2) 建立 Observable

Observable 即被觀察者,它決定什麼時候觸發事件以及觸發怎樣的事件。 RxJava 使用 create() 方法來建立一個 Observable ,併為它定義事件觸發規則:

Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("John");
        subscriber.onCompleted();
    }
});

可以看到,這裡傳入了一個 OnSubscribe 物件作為引數。OnSubscribe 會被儲存在返回的 Observable 物件中,它的作用相當於一個計劃表,當 Observable 被訂閱的時候,OnSubscribe 的 call() 方法會自動被呼叫,事件序列就會依照設定依次觸發(對於上面的程式碼,就是觀察者Subscriber 將會被呼叫兩次 onNext() 和一次 onCompleted())。這樣,由被觀察者呼叫了觀察者的回撥方法,就實現了由被觀察者向觀察者的事件傳遞,即觀察者模式。

3) Subscribe (訂閱)
建立了 Observable 和 Observer 之後,再用 subscribe() 方法將它們聯結起來,整條鏈子就可以工作了。程式碼形式很簡單:

observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);

Observable.subscribe(Subscriber) 的內部實現是這樣的(僅核心程式碼):

public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }

    private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
     // validate and proceed
        if (subscriber == null) {
            throw new IllegalArgumentException("observer can not be null");
        }
        if (observable.onSubscribe == null) {
            throw new IllegalStateException("onSubscribe function can not be null.");
            /*
             * the subscribe function can also be overridden but generally that's not the appropriate approach
             * so I won't mention that in the exception
             */
        }

        // new Subscriber so onStart it
        subscriber.onStart();

        /*
         * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
         * to user code from within an Observer"
         */
        // if not already wrapped
        if (!(subscriber instanceof SafeSubscriber)) {
            // assign to `observer` so we return the protected version
            subscriber = new SafeSubscriber<T>(subscriber);
        }

        // The code below is exactly the same an unsafeSubscribe but not used because it would 
        // add a significant depth to already huge call stacks.
        try {
            // allow the hook to intercept and/or decorate
            hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
            return hook.onSubscribeReturn(subscriber);
        } catch (Throwable e) {
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // if an unhandled error occurs executing the onSubscribe we will propagate it
            try {
                subscriber.onError(hook.onSubscribeError(e));
            } catch (Throwable e2) {
                Exceptions.throwIfFatal(e2);
                // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                // so we are unable to propagate the error correctly and will just throw
                RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                // TODO could the hook be the cause of the error in the on error handling.
                hook.onSubscribeError(r);
                // TODO why aren't we throwing the hook's return value.
                throw r;
            }
            return Subscriptions.unsubscribed();
        }
    }

可以看到,subscriber() 做了3件事:

  1. 呼叫 Subscriber.onStart() 。這個方法在前面已經介紹過,是一個可選的準備方法。
  2. 呼叫 Observable 中的 OnSubscribe.call(Subscriber) 。在這裡,事件傳送的邏輯開始執行。從這也可以看出,在 RxJava 中, Observable 並不是在建立的時候就立即開始傳送事件,而是在它被訂閱的時候,即當 subscribe() 方法執行的時候。
  3. 將傳入的 Subscriber 作為 Subscription 返回。這是為了方便 unsubscribe().

整個過程中物件間的關係如下圖:
這裡寫圖片描述

除了 subscribe(Observer) 和 subscribe(Subscriber) ,subscribe() 還支援不完整定義的回撥,RxJava 會自動根據定義創建出 Subscriber 。形式如下:

Action1<String> onNextAction = new Action1<String>() {
    // onNext()
    @Override
    public void call(String s) {
        Log.d(tag, s);
    }
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
    // onError()
    @Override
    public void call(Throwable throwable) {
        // Error handling
    }
};
Action0 onCompletedAction = new Action0() {
    // onCompleted()
    @Override
    public void call() {
        Log.d(tag, "completed");
    }
};

// 自動建立 Subscriber ,並使用 onNextAction 來定義 onNext()
observable.subscribe(onNextAction);
// 自動建立 Subscriber ,並使用 onNextAction 和 onErrorAction 來定義 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自動建立 Subscriber ,並使用 onNextAction、 onErrorAction 和 onCompletedAction 來定義 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

注:正如前面所提到的,Observer 和 Subscriber 具有相同的角色,而且 Observer 在 subscribe() 過程中最終會被轉換成 Subscriber 物件,因此,從某種程度上來說用 Subscriber 來代替 Observer ,這樣會更加嚴謹。

根據目前的經驗來看,Observer與Subscriber的主要區別在於onCompleted()方法執行完畢後是否取消了訂閱。

首先,我們分別定義mSubscriber 和 mObserver 。

如下程式碼:

protected Subscriber<D> mSubscriber = new Subscriber<D>() {
        @Override
        public void onCompleted() {
            executeOnLoadFinish();
        }

        @Override
        public void onError(Throwable e) {
            TLog.error("onError " + e.toString());
            executeOnLoadDataError(null);
        }

        @Override
        public void onNext(D d) {
            TLog.log("onNext " );
            List<T> list = d;
            TLog.log("entity " + list.size());
            executeOnLoadDataSuccess(list);

            TLog.log("onSuccess totalPage " + totalPage);
        }
    };
    protected Observer<D> mObserver = new Observer<D>() {
        @Override
        public void onCompleted() {
            executeOnLoadFinish();
        }

        @Override
        public void onError(Throwable e) {
            TLog.error("onError " + e.toString());
            executeOnLoadDataError(null);
        }

        @Override
        public void onNext(D d) {
            TLog.log("onNext " );
            List<T> list = d;
            TLog.log("entity " + list.size());
            executeOnLoadDataSuccess(list);

            TLog.log("onSuccess totalPage " + totalPage);
        }
    };
observable.subscribeOn(Schedulers.io())
                .map(new Func1<Response<D>,D>() {

                    @Override
                    public D call(Response<D> response) {
                        if(response == null){
                            throw new ApiException(100);
                        }
                        totalPage = response.total;
                        return response.result;
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                //.subscribe(mObserver);
                .subscribe(mSubscriber);

subscribe(mObserver)和subscribe(mSubscriber)執行結果就會有區別:

  • mObserver可以重複使用,也就是subscribe(mObserver)可以重複訂閱;
  • 當使用Observable.create方式建立Observable時,mSubscriber也能重複使用,也可以實現subscribe(mSubscriber)重複訂閱,但當使用Observable.just、Observable.from方式建立Observable時,mSubscriber不能重複使用,也就不能重複訂閱了(詳見文章:http://blog.csdn.net/jdsjlzx/article/details/54959052)。
  • RxJava給出的建議是每次使用都new一個新的Subscriber 或者是使用Observer,也就是不要重複使用mSubscriber。

提醒:個人以為subscribe(mObserver)這個方式更適合分頁載入。

請注意,如果你每次都使用subscribe(new Subscriber< T>() {})方式實現訂閱,就不會出現上面的問題。

如下程式碼:

private void toSubscribe(Observable<Response<D>> observable) {
        observable.subscribeOn(Schedulers.io())
                .map(new Func1<Response<D>,D>() {

                    @Override
                    public D call(Response<D> response) {
                        if(response == null){
                            throw new ApiException(100);
                        }
                        totalPage = response.total;
                        return response.result;
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<D>() {
                    @Override
                    public void onCompleted() {
                        executeOnLoadFinish();
                    }

                    @Override
                    public void onError(Throwable e) {
                        TLog.error("onError " + e.toString());
                        executeOnLoadDataError(null);
                    }

                    @Override
                    public void onNext(D d) {
                        TLog.log("onNext " );
                        List<T> list = d;
                        TLog.log("entity " + list.size());
                        executeOnLoadDataSuccess(list);

                        TLog.log("onSuccess totalPage " + totalPage);
                    }
                });
    }

當然,這個方式實現分頁載入也是可以的。至於哪個更好,還需要再驗證。

引用:

Subscriber是Observer介面的一個抽象實現;推薦使用Subscriber,實際上RxJava在subscibe過程中,會先將Observer轉換為一個Subscriber。