1. 程式人生 > >Rxjava基本原理解析(四)

Rxjava基本原理解析(四)

    接著上一篇的分享模式,今天我們介紹和分析執行緒切換操作符subscribeOn以及其原始碼設計。

    Rxjava的一個最大優點之一就是靈活的執行緒切換,切換過程不影響整體鏈式邏輯流程,既方便又清新。為了對比,還是再次將一個操作符的圖放上:

結構圖

subscribeOn操作符用於切換事件源的執行緒,一般用在第一個observable的後面:

Observable.create((ObservableOnSubscribe<String>) e -> e.onNext("hello"))
       .subscribeOn(Schedulers.newThread())
       .subscribe(s -> System.out.println("onNext"));

    與上一篇create操作符使用例項程式碼少了很多,原因是使用鏈式寫法和lamda表示式。create操作符的事件源還是一個ObservableOnSubscribe物件,但是訂閱的不再是observer,而是一個Consumer,這個後面文章會講解。

    subscribeOn操作符需要傳一個scheduler用於指定執行緒,並建立一個被觀察者:ObservableSubscribeOn和一個觀察者SubscribeOnObserver。 ObservableSubscribeOn的source為上一個observable,最重的是其subscribeActual()方法,原始碼如下:

@Override
public void subscribeActual(final Observer<? super T> s) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    s.onSubscribe(parent);
    parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
        @Override
        public void run() {
            source.subscribe(parent);
        }
    }));
}

     subscribeOn操作符是改變事件源的執行緒,即設定第一個observable.subscribe的執行緒。subscribeActual方法中不是直接呼叫source.subscribe(parent);而是通過傳入的scheduler呼叫。由於傳入的scheduler指定了新的執行緒,那麼scheduler內部會切換到指定執行緒然後呼叫source.subscribe(parent);實現切換事件源執行緒。

observable

    下面來看看scheduler內部是如何實現執行緒切換的。本篇以Schedulers.newThread()為例進行分析,其他執行緒以及對比分析後面會單獨寫一篇。Scheduler通過入口方法scheduleDirect傳入一個runable介面,通過runable的run方法呼叫source.subscribe(parent);,scheduleDirect內部實際是呼叫如下的過載方法:

public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        final Worker w = createWorker();
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        w.schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    decoratedRun.run();
                } finally {
                    w.dispose();
                }
            }
        }, delay, unit);
        return w;
}

    該方法呼叫了createWorker()獲得一個Worker,然後通過呼叫worker的schedule方法來呼叫傳入的runable介面方法。createWorker是個抽象方法,需要子類重新。Schedulers.newThread()實際是個Scheduler子類物件NewThreadScheduler。該類重新了createWorker並得到一個Worker的子類NewThreadWorker。那麼真正的執行緒切換就是靠Worker來控制的。NewThreadWorker的schedule方法實際呼叫的是scheduleActual方法,原始碼如下:

public ScheduledRunnable scheduleActual(final Runnable run, ...) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
       //...
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } 
        catch (RejectedExecutionException ex) {
            //...
        }
        return sr;
}

最後會使用執行緒池executor.submit((Callable<Object>)sr);去執行傳入的runable介面方法。整體流程執行緒變換如下圖所示:

整體流程

注意以下兩點:

    1.根據ObservableSubscribeOn 的subscribeActual原始碼可知,s.onSubscribe(parent)線上程切換前執行,那麼onSubscribe不受subscribeOn執行緒切換影響,在原執行緒執行;

    2.在使用subscribeOn操作符切換到新執行緒後,上游的所有subscribe訂閱方法都會在新執行緒執行,一直穿透到事件源。如果整鏈式操作中,上游有新的subscribeOn操作符,那麼執行緒又會改變,因此多個subscribeOn後,只有第一個會對事件源執行緒有效。