RxJava 執行緒切換原理
RxJava的執行緒切換主要涉及到
observeOn(),subscribeOn()
我們來分析一下這兩個方法是怎麼做到切換的。
observeOn()作用於上一個構造好的Observable例項,RxJava設計比較巧妙的地方是,把執行緒切換的操作也封裝成了Observable放在Observable subscribe()方法和Observer onNext()執行鏈路中。
先分析subscribeOn()
public final Observable<T> subscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); }
new 了一個ObservableSubscribeOn物件,同時傳入了當前節點作為previous 節點,scheduler作為要排程的排程器。
@Override public void subscribeActual(final Observer<? super T> observer) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer); observer.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); }
關鍵程式碼就在於subscribeActual()中,把observer封裝成一個Task後,呼叫了scheduler.scheduleDirect();
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { final Worker w = createWorker(); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); DisposeTask task = new DisposeTask(decoratedRun, w); w.schedule(task, delay, unit); return task; }
scheduleDirect()做的事情是把傳入的Runnable做了再次的封裝並交給了worker來處理。我們現在不去研究具體什麼樣的執行緒池。現在只看怎麼實現切換的。
剛才的執行緒排程發生在subscribeActual()方法中,但是我們知道subscribe()會不斷地遞歸向上溯源,一直追到原點ObservableCreate中,呼叫ObservableOnsubScribe類中的subscribe()方法,也就是說,subscirbeOn()會影響到subscribe()中資料來源產生的地方。如果這中間有多次執行subscribeOn()那麼,由於中間尚未吐出資料,所以subscribeOn()只有在最靠近起點的地方有效,後面的多次subscribeOn()都無效。
我們再分析一下ObserveOn(),先在現有的observable鏈後加入一個新的節點ObservableObserveOn(),
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
看看ObservableObserveOn中比較關鍵的幾個方法。
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
可以看到其subscribeActual()方法中並未去做執行緒切換,只是構造了Scheduler.Worker,並把它傳到了自己的ObservableOnObsever物件的建構函式的引數中,可以猜想到這樣的目的是影響Observer中的onNext()執行所在的執行緒,這樣可以影響到資料流的處理。我們順著思路往下看:
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
...
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.worker = worker;
...
}
...
@Override
public void onNext(T t) {
...
schedule();
}
@Override
public void onError(Throwable t) {
...
schedule();
}
@Override
public void onComplete() {
...
schedule();
}
...
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
}
果然,在其onNext(),onComplete(),onError()中都呼叫到了schedule()方法,由於ObserveOnObserver實現了Runnable介面,直接在worker.schedule()中把當前物件的例項傳入。
下面以NewThreadWorker為例,分析一下丟給schedule做了一些什麼事情。
public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
...
return scheduleActual(action, delayTime, unit, null);
}
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
...
Future<?> f;
...
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
...
return sr;
}
有沒有一種很熟悉的感覺呢?
如果定時任務就交給了executor執行緒池的schedule方法處理,普通任務直接submit()。至此,提交給了具體的某個由Schedulers抽象類中的靜態工廠生成的某個執行緒池。
由於ObserveOn()作用於資料從源頭往下流淌的downstream過程,所以,攔截在這個過程中的執行緒切換就會生效,作用於當前任務中的onNext()等方法。
瞭解清楚了subscribeActual()的向上溯源和subscribe()後不斷onNext()向下責任鏈,就不難理清楚RxJava的執行緒切換原理了。
下面舉一個例子來說明一下:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("hello");
System.out.println("Thread:"+Thread.currentThread().getName());
}
}).observeOn(Schedulers.computation())
.subscribeOn(Schedulers.io())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
System.out.println("Thread:"+Thread.currentThread().getName());
return "abc-"+s;
}
}).observeOn(Schedulers.newThread())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
System.out.println("Thread:"+Thread.currentThread().getName());
return s+"-def";
}
}).observeOn(Schedulers.single())
.subscribeOn(Schedulers.single())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("Thread:"+Thread.currentThread().getName());
System.out.println("result:"+s);
}
});
這裡面涉及到了多次執行緒切換,多次呼叫observeOn(),subscribeOn(),所以按照之前的分析,subscribeOn()只有離observable連結串列最原點的地方的切換才有效,所以這裡只有第一個subscribeOn()有效,也就說資料應該從Schedulers.io()執行緒裡面答應出來,後來的每次observeOn()切換都會改變onNext()也就是相應function()中的apply()所在執行執行緒,因為Function最後也會被封裝成observer。
我們看看列印結果和我們的分析是否一致:
Thread:RxCachedThreadScheduler-1
Thread:RxComputationThreadPool-1
Thread:RxNewThreadScheduler-1
Thread:RxSingleScheduler-1
result:abc-hello-def
這裡要說明的是Schedulers.io()所用到的ThreadFactory傳入的標籤是“RxCachedThreadScheduler”,所以會列印CachedThread。