1. 程式人生 > >RxJava 執行緒切換原理

RxJava 執行緒切換原理

 

RxJava的執行緒切換主要涉及到

observeOn(),subscribeOn()

我們來分析一下這兩個方法是怎麼做到切換的。

observeOn()作用於上一個構造好的Observable例項,RxJava設計比較巧妙的地方是,把執行緒切換的操作也封裝成了Observable放在Observable subscribe()方法和Observer onNext()執行鏈路中。

先分析subscribeOn()

public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

new 了一個ObservableSubscribeOn物件,同時傳入了當前節點作為previous 節點,scheduler作為要排程的排程器。

@Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

        observer.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

關鍵程式碼就在於subscribeActual()中,把observer封裝成一個Task後,呼叫了scheduler.scheduleDirect();

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);

        w.schedule(task, delay, unit);

        return task;
    }

scheduleDirect()做的事情是把傳入的Runnable做了再次的封裝並交給了worker來處理。我們現在不去研究具體什麼樣的執行緒池。現在只看怎麼實現切換的。

剛才的執行緒排程發生在subscribeActual()方法中,但是我們知道subscribe()會不斷地遞歸向上溯源,一直追到原點ObservableCreate中,呼叫ObservableOnsubScribe類中的subscribe()方法,也就是說,subscirbeOn()會影響到subscribe()中資料來源產生的地方。如果這中間有多次執行subscribeOn()那麼,由於中間尚未吐出資料,所以subscribeOn()只有在最靠近起點的地方有效,後面的多次subscribeOn()都無效。

我們再分析一下ObserveOn(),先在現有的observable鏈後加入一個新的節點ObservableObserveOn(),

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

看看ObservableObserveOn中比較關鍵的幾個方法。

@Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

可以看到其subscribeActual()方法中並未去做執行緒切換,只是構造了Scheduler.Worker,並把它傳到了自己的ObservableOnObsever物件的建構函式的引數中,可以猜想到這樣的目的是影響Observer中的onNext()執行所在的執行緒,這樣可以影響到資料流的處理。我們順著思路往下看:

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {

        ...

        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.worker = worker;
            ...
        }

        ...

        @Override
        public void onNext(T t) {
            ...
            schedule();
        }

        @Override
        public void onError(Throwable t) {
            ...
            schedule();
        }

        @Override
        public void onComplete() {
            ...
            schedule();
        }

       ...

        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }
}

果然,在其onNext(),onComplete(),onError()中都呼叫到了schedule()方法,由於ObserveOnObserver實現了Runnable介面,直接在worker.schedule()中把當前物件的例項傳入。

下面以NewThreadWorker為例,分析一下丟給schedule做了一些什麼事情。

public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
        ...
        return scheduleActual(action, delayTime, unit, null);
    }
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        ...

        Future<?> f;
        ...
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        ...
        return sr;
    }

有沒有一種很熟悉的感覺呢?

如果定時任務就交給了executor執行緒池的schedule方法處理,普通任務直接submit()。至此,提交給了具體的某個由Schedulers抽象類中的靜態工廠生成的某個執行緒池。

由於ObserveOn()作用於資料從源頭往下流淌的downstream過程,所以,攔截在這個過程中的執行緒切換就會生效,作用於當前任務中的onNext()等方法。

瞭解清楚了subscribeActual()的向上溯源和subscribe()後不斷onNext()向下責任鏈,就不難理清楚RxJava的執行緒切換原理了。

下面舉一個例子來說明一下:

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("hello");
                System.out.println("Thread:"+Thread.currentThread().getName());
            }
        }).observeOn(Schedulers.computation())
                .subscribeOn(Schedulers.io())
                .map(new Function<String, String>() {
            @Override
            public String apply(String s) throws Exception {
                System.out.println("Thread:"+Thread.currentThread().getName());
                return "abc-"+s;
            }
        }).observeOn(Schedulers.newThread())
                .map(new Function<String, String>() {
            @Override
            public String apply(String s) throws Exception {
                System.out.println("Thread:"+Thread.currentThread().getName());
                return s+"-def";
            }
        }).observeOn(Schedulers.single())
                .subscribeOn(Schedulers.single())
                .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println("Thread:"+Thread.currentThread().getName());
                System.out.println("result:"+s);
            }
        });

這裡面涉及到了多次執行緒切換,多次呼叫observeOn(),subscribeOn(),所以按照之前的分析,subscribeOn()只有離observable連結串列最原點的地方的切換才有效,所以這裡只有第一個subscribeOn()有效,也就說資料應該從Schedulers.io()執行緒裡面答應出來,後來的每次observeOn()切換都會改變onNext()也就是相應function()中的apply()所在執行執行緒,因為Function最後也會被封裝成observer。

我們看看列印結果和我們的分析是否一致:

Thread:RxCachedThreadScheduler-1
Thread:RxComputationThreadPool-1
Thread:RxNewThreadScheduler-1
Thread:RxSingleScheduler-1
result:abc-hello-def

這裡要說明的是Schedulers.io()所用到的ThreadFactory傳入的標籤是“RxCachedThreadScheduler”,所以會列印CachedThread。