RxJava2探索-執行緒切換原理之subscribeOn
前言
說起來有點丟人,上週去某公司面試,做足了什麼像java記憶體模型、hashmap原理、設計模式、Android多執行緒、自定義View等等相關的知識準備,然而面試的時候,前面幾個一個沒問!!!自定義view考察了onmeasure和Mnuspace那幾個mode以及touch事件傳遞等,我真想給自己一巴掌,居然把那幾個mode給忘了,只記得兩個還拼不出單詞。。。然後就問了RxJava,雖然沒有直接問執行緒切換原理,但是確實考察的就是執行緒切換的問題,我只能靠猜了,並且還是以RXJAVA2來問的,因為我的簡歷上寫著瞭解RXJAVA,不是精通,不是精通,不是精通。結果可想而知。
當然,我並不是說面試官有什麼不好的地方,只是恨自己學得還不夠多不夠深,不善於總結和回顧以前的技能,希望自己知恥而後勇吧。俗話說,吃一塹長一智,面試完回來之後,我就把他所問到的問題全部研究了一遍,希望以後不要再出現上一次的窘境了^_^。
tips:以下所有關於原始碼的部分都是基於RxJava2.1.15版本
一、為什麼要寫這個東西
你們肯定覺得我是為了寫部落格裝x用的,或者是為了把學到的知識記錄下來,以便於以後查閱。我告訴你們,都不是!今天在這裡寫這篇文章的原因很簡單:我看了很多圖文並茂的文章講述RXJAVA執行緒切換原理,可是呢,最後我竟然是自己翻原始碼才實實在在的理解了這個原理的。嘿嘿,對,我就是這個意思,他們寫得不夠好!今天,我就是要讓更多的人很快速的就理解RXJAVA執行緒切換的原理。
裝x完畢,開始一本正經吹牛x了,先盜個圖。
二、切換執行緒的那些事兒
2.1 什麼叫執行緒切換
從程式的角度舉個列子,作為一隻Android開發攻城獅,大家肯定都能猜到我要以什麼列子切入啦,沒錯,thread+handler更新UI,原始點的方式就是new一個Thread來執行網路請求(子執行緒),然後通過一個在主執行緒的handler傳送訊息到主執行緒更新UI(主執行緒),資料就從子執行緒切換到了主執行緒。Android裡還有一個經典的可以切換執行緒的東西就是AsyncTask了,這裡就不做額外展開了。
換個通俗易懂的例子,我有一片玉米地,玉米成熟了,我請了小明來幫我掰玉米,小明掰完玉米後我讓他幫我數一下一共有多少個,小明卻說,我沒讀過書,不會數數,只會掰玉米,沒辦法,我就請了擅長數學的小紅幫我數玉米,小紅數完玉米後,我想讓她幫我把玉米煮熟了拿來賣,可小紅又說了,我只會數玉米,不會煮玉米,我又請了會煮玉米的小剛來幫我煮玉米,然後自己賣玉米了。至此,掰玉米的事情是小明負責的(thread1),數玉米的事情是小紅做的(thread2),煮玉米的事情是小剛做的(thread3),他們彼此相互獨立卻又幫我完成了一整套事情並且效率實現了最大化。
2.2 執行緒切換有什麼用
我們再來看個例子,Android系統告訴你,不允許網路耗時任務發生在主執行緒哦,好嘛,那我就new一個Thread來執行網路請求嘛,等了半天,資料請求回來了,我開開開心心的把請求到的資料拿去渲染UI,結果,Android系統又告訴你,子執行緒不能更新UI哦。。。WTF???逗我玩呢麼?Android系統又說了,彆著急,我給你個小拖車,你把你在new Thread裡取到的東西放在小拖車裡,小拖車會來給我(主執行緒)的,大家開開心心,何樂而不為。大家不要噴我,我只是把Handler+Thread這種模式說得複雜了一些。。。
作為一隻Android開發攻城獅,這個套路大家肯定早就聊熟於心,閉著眼睛都能寫出來,但是,可能很多人像我一樣,並沒有深究其表示的意義。因為Android作業系統有自己的一些特性(多執行緒)和規則(不允許網路耗時任務發生在主執行緒、子執行緒不能更新UI等),我們不得不保持這些特性和遵守這些規則,在這些特性和規則的束縛下,執行緒切換就必不可少。
“執行緒切換的意義”,你們可以去google或是baidu一下,我目前是沒有搜尋到對應的符合標題的結果的,所以,要麼是這個問題太簡單,簡單到大家不需要問,都能理解;要麼就是這個問題本身就沒有意義,沒有必要去尋求一個定義。我本想,多執行緒由來已久,都沒有人去關注和定義執行緒切換的意義,那麼我就來開個先河吧(一本正經臉),可是我琢磨了好些天,我總是會被Android系統本身或RxJava所束縛,無法站在更高的地方去闡述其意義,所以,關於執行緒切換的意義,還是等待將來有哪位大神可以分享出來吧。
附上我用google搜尋“執行緒切換的意義”結果截圖
三、RxJava2的執行緒切換
如果還沒有RxJava2基礎的童鞋,請先去了解下RxJava2在來看下面的內容會更好哦
請各位往這邊走,我好推你們下去。。。
ps:如果你們不想去看,也沒關係,隨便整個工程,引入RxJava2依賴,跟著我的步伐,不停地反覆的戳進原始碼,或許會有另一種意想不到的效果。
3.1從一段簡單的程式碼說起
接下來,我們先看個RxJava2的subscribeOn方法切換執行緒的例子
1.不呼叫subscribeOn方法指定執行緒
Observable.create((ObservableOnSubscribe<Integer>) observableEmitter -> {
System.out.println("ObservableOnSubscribe.subscribe thread : " + Thread.currentThread().getName());
observableEmitter.onNext(1);
observableEmitter.onComplete();
})//頂層Observable
.map(integer -> {
System.out.println("map thread : " + Thread.currentThread().getName());
return String.valueOf(integer);
})//第二個Observable
.filter(s -> {
System.out.println("filter thread : " + Thread.currentThread().getName());
return Integer.parseInt(s) > 0;
})//第三個Observable
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable disposable) {
System.out.println("onSubscribe thread : " + Thread.currentThread().getName());
}
@Override
public void onNext(String s) {
System.out.println("onNext thread : " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable throwable) {
System.out.println("onError thread : " + Thread.currentThread().getName());
}
@Override
public void onComplete() {
System.out.println("onComplete thread : " + Thread.currentThread().getName());
}
});
此時的輸出結果如下:
onSubscribe thread : main
ObservableOnSubscribe.subscribe thread : main
map thread : main
filter thread : main
onNext thread : main
onComplete thread : main
2.呼叫一次subscribeOn方法指定執行緒
Observable.create((ObservableOnSubscribe<Integer>) observableEmitter -> {
System.out.println("ObservableOnSubscribe.subscribe thread : " + Thread.currentThread().getName());
observableEmitter.onNext(1);
observableEmitter.onComplete();
})//頂層Observable
.subscribeOn(Schedulers.io())//第一次subscribeOn
.map(integer -> {
System.out.println("map thread : " + Thread.currentThread().getName());
return String.valueOf(integer);
})//第二個Observable
.filter(s -> {
System.out.println("filter thread : " + Thread.currentThread().getName());
return Integer.parseInt(s) > 0;
})//第三個Observable
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable disposable) {
System.out.println("onSubscribe thread : " + Thread.currentThread().getName());
}
@Override
public void onNext(String s) {
System.out.println("onNext thread : " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable throwable) {
System.out.println("onError thread : " + Thread.currentThread().getName());
}
@Override
public void onComplete() {
System.out.println("onComplete thread : " + Thread.currentThread().getName());
}
});
此時的輸出結果如下:
onSubscribe thread : main
ObservableOnSubscribe.subscribe thread : RxCachedThreadScheduler-1
map thread : RxCachedThreadScheduler-1
filter thread : RxCachedThreadScheduler-1
onNext thread : RxCachedThreadScheduler-1
onComplete thread : RxCachedThreadScheduler-1
3.呼叫兩次subscribeOn方法指定執行緒
Observable.create((ObservableOnSubscribe<Integer>) observableEmitter -> {
System.out.println("ObservableOnSubscribe.subscribe thread : " + Thread.currentThread().getName());
observableEmitter.onNext(1);
observableEmitter.onComplete();
})//頂層Observable
.subscribeOn(Schedulers.io())//第一次subscribeOn
.map(integer -> {
System.out.println("map thread : " + Thread.currentThread().getName());
return String.valueOf(integer);
})//第二個Observable
.subscribeOn(Schedulers.newThread())//第二次subscribeOn
.filter(s -> {
System.out.println("filter thread : " + Thread.currentThread().getName());
return Integer.parseInt(s) > 0;
})//第三個Observable
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable disposable) {
System.out.println("filter thread : " + Thread.currentThread().getName());
}
@Override
public void onNext(String s) {
System.out.println("onNext thread : " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable throwable) {
System.out.println("onError thread : " + Thread.currentThread().getName());
}
@Override
public void onComplete() {
System.out.println("onComplete thread : " + Thread.currentThread().getName());
}
});
此時的輸出結果如下:
onSubscribe thread : main
ObservableOnSubscribe.subscribe thread : RxCachedThreadScheduler-1
map thread : RxCachedThreadScheduler-1
filter thread : RxCachedThreadScheduler-1
onNext thread : RxCachedThreadScheduler-1
onComplete thread : RxCachedThreadScheduler-1
從log列印日誌可以看到,呼叫subscribeOn方法後,在他之前的和在他之後的程式碼執行的執行緒都是subscribeOn指定的執行緒——RxCachedThreadScheduler-1(onSubscribe方法除外,後面會說明原因),並且是第一次呼叫subscribeOn方法指定的io執行緒,那麼這裡會先有一個結論:在這個鏈式呼叫結構中,無論你呼叫多少次subscribeOn去指定Observable的工作執行緒,總是以第一次呼叫subscribeOn時指定的執行緒為Observable的工作執行緒。關於這個結論,將是我們接下來討論的重點。
現在把這個事情分為兩個問題:
1.subscribeOn方法是如何做到執行緒切換的?
2.為什麼只有第一次呼叫subscribeOn方法指定的執行緒才有效?
那麼接下來,請帶著問題跟我上車啦。
3.2 從RTFSC說起
搞Android的,甚至可以說是搞軟體的,RTFSC (Read the fucking source code —— Linus)才是生活中最重要的。我們天天就是要讀懂別人的,理解別人的,然後再模仿別人的,最後才是創新自己的。人生大半的時間是在學習,所以我們一定要RTFSC。
在開始看原始碼之前,我門先看看要講的內容會涉及到的類的類圖:
可能有不對的方,還望大家斧正,感激不盡。
important:這個類圖一定要有個大概印象,不然後面你就會把頁面滾上來滾下去的,不方便思維連續,或者你可以把這個圖片存下來放到一邊,以便隨時看一眼,後面的內容就很好理解了。
關於Runnable介面:這裡寫出來是為了讓大家很清楚,Schedule類下面的Worker的schedule方法的引數就是需要一個Runnable,然鵝,ObservableSubscribeOn類裡的SubscribeTask恰好又實現了Runnable介面,所以大家可以很容易的就猜到Schedule的scheduleDirect方法實際上最終就是會呼叫SubscribeTask的run方法。,我們先記住這個結論。
接下來,我們開始解決前面說到的第一個問題:subscribeOn方法是如何做到執行緒切換的?
3.3 subscribeOn方法切換執行緒的藝術
要看到執行緒切換,首先,我們先得有個Observable啊,所以我們先把Observable創建出來。
3.3.1 Observable建立的過程
我們以上面只調用了一次subscribeOn方法的列子為基礎,去掉filter和map操作,簡化後如下
Observable.create((ObservableOnSubscribe<String>) observableEmitter -> {
System.out.println("ObservableOnSubscribe.subscribe thread : " + Thread.currentThread().getName());
observableEmitter.onNext("1");
observableEmitter.onComplete();
})//頂層Observable
.subscribeOn(Schedulers.io())//第一次subscribeOn
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable disposable) {
System.out.println("onSubscribe thread : " + Thread.currentThread().getName());
}
@Override
public void onNext(String s) {
System.out.println("onNext thread : " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable throwable) {
System.out.println("onError thread : " + Thread.currentThread().getName());
}
@Override
public void onComplete() {
System.out.println("onComplete thread : " + Thread.currentThread().getName());
}
});
首先,我們先看Observable.create(ObservableOnSubscribe)方法
@CheckReturnValue
@SchedulerSupport("none")
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate(source));
}
簡化無關程式碼後,我們只關注
new ObservableCreate(source)
這句程式碼,ObservableOnSubscribe使我們new的一個內部內。然後繼續往下
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
//...省略無關程式碼
}
這裡沒什麼,就是把我們new的ObservableOnSubscribe傳遞給了ObservableCreate,這個時候,我們手裡持有的物件就是一個Observable。
—-> 其次,我們得呼叫一次subscribeOn方法
@CheckReturnValue
@SchedulerSupport("custom")
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn(this, scheduler));
}
簡化無關程式碼後,我們只關注
new ObservableSubscribeOn(this, scheduler)
這句程式碼,schedule在這裡是我們傳遞的一個Schedules.io,this的話當然是指的當前物件啦——剛才我們建立的一個Observable。接下來我們繼續看
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
}
ObservableSource,如果還記得我剛才的類圖,你應該知道,Observable實現了ObservableSource這個介面的,所以這裡的source是指我們剛才建立的那個Observable。這裡同時持有了Schedule的引用,這個後面會用到。
到了這裡,請大家一定要很清楚一個問題,那就是subscribeOn方法返回了一個新的Observable,而這個新的Observable裡面持有一個上一層Observable的引用。那個引用就是source。清楚了這個問題,我們磁能在後面一步豁然開朗。
—-> 最後,我們得呼叫一次Observable的subscribe方法
Observable.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable disposable) {
System.out.println("onSubscribe thread : " + Thread.currentThread().getName());
}
@Override
public void onNext(String s) {
System.out.println("onNext thread : " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable throwable) {
System.out.println("onError thread : " + Thread.currentThread().getName());
}
@Override
public void onComplete() {
System.out.println("onComplete thread : " + Thread.currentThread().getName());
}
});
現在,大家能不能立馬回答我,這裡的subscribe方法是屬於哪個Observable的?對,是屬於subscribeOn方法返回的那個Observable的。明白了這一點,那麼我們就要去看Observable的subscribe方法了。
3.3.2 Observable訂閱的過程
Observable.subscribe(Observer)
@SchedulerSupport("none")
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
//真正訂閱的地方
this.subscribeActual(observer);
} catch (NullPointerException var4) {
throw var4;
} catch (Throwable var5) {
Exceptions.throwIfFatal(var5);
RxJavaPlugins.onError(var5);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(var5);
throw npe;
}
}
簡化無關程式碼後,我們只需關注subscribeActual方法,但是,還記得我類圖的同學不知道有沒有注意到,subscribeActual是一個抽象方法,所以,我們要去找他的實現類啦。
這裡又要回到我提醒大家關注的問題,當前的Observable是誰?對,是屬於subscribeOn方法返回的那個Observable。我們再次戳進
subscribeOn方法
@CheckReturnValue
@SchedulerSupport("custom")
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn(this, scheduler));
}
這裡return的是一個ObservableSubscribeOn,不用猜,ObservableSubscribeOn肯定是繼承了Observable的,我類圖裡已經畫的很清楚了,只不過中間多了一層而已,那我們就直接看ObservableSubscribeOn的subscribeActual方法
public void subscribeActual(Observer<? super T> s) {
ObservableSubscribeOn.SubscribeOnObserver<T> parent = new ObservableSubscribeOn.SubscribeOnObserver(s);
s.onSubscribe(parent);
parent.setDisposable(this.scheduler.scheduleDirect(new ObservableSubscribeOn.SubscribeTask(parent)));
}
重頭戲來了,s.onSubscribe(parent)方法在這裡居然就被呼叫了!!!s是誰?就是我們new的那個Observer。我前面說到過這句話:呼叫subscribeOn方法後,在他之前的和在他之後的程式碼執行的執行緒都是subscribeOn指定的執行緒——RxCachedThreadScheduler-1(onSubscribe方法除外,後面會說明原因),這裡就是原因。此時的Observable的subscribe方法發生在當前執行緒,所以Observer的onSubscribe方法的執行執行緒和當前呼叫Observable的subscribe方法的執行緒一致!!!
在這裡,還有一件事情正悄悄醞釀著,那就是
this.scheduler.scheduleDirect(new ObservableSubscribeOn.SubscribeTask(parent))
3.3.3 切換執行緒的點點滴滴
請容我深呼一口氣。。。
important:因為這裡其實要涉及到java.util.concurrent包下關於執行緒池的很多類,比如Executors、ScheduledExecutorService等等等等,關於這部分的知識我也沒有去仔細研究過,所以在這裡我不能做展開。大家只需要知道,這裡使用這些工具類就是為了提供一個Thread來執行ObservableSubscribeOn.SubscribeTask。我後面一定會去研究執行緒池相關的知識,然後再快來分享給大家。
通過前面的說明和檢視類圖大家可以知道,ObservableSubscribeOn.SubscribeTask是一個Runnable,他的run方法如下
public void run() {
ObservableSubscribeOn.this.source.subscribe(this.parent);
}
這裡面的ObservableSubscribeOn.this.source是誰?就是我們建立的第一個Observable,ObservableCreate。那麼,這個run方法會在哪裡被呼叫呢?我們回顧到上一步我說的有件事情正在悄悄醞釀著那裡,即ObservableSubscribeOn的subscribeActual方法裡:
this.scheduler.scheduleDirect(new ObservableSubscribeOn.SubscribeTask(parent))
這裡的schedule在本列中是IoSchedule,在類圖裡也有的,他的scheduleDirect(Runnable)方法是在Schedule裡實現的,只是把Worke的建立放到了IoSchedule(這是什麼設計模式???)。Schedule的scheduleDirect(Runnable)方法最終呼叫了Schedule的scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit)方法。
在這裡請大家開始記住一個關鍵點,現在傳遞的Runnable就是ObservableSubscribeOn.SubscribeTask
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
Scheduler.Worker w = this.createWorker();
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
Scheduler.DisposeTask task = new Scheduler.DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
省略無關程式碼後,我們只關注w.schedule(task, delay, unit)方法。剛才說了,createWorker是子類實現的,所以我們直接戳進IoSchedule的createWorke方法
@NonNull
public Worker createWorker() {
return new IoScheduler.EventLoopWorker((IoScheduler.CachedWorkerPool)this.pool.get());
}
這裡通過IoScheduler.CachedWorkerPool構造了一個IoScheduler.EventLoopWorker,IoScheduler.CachedWorkerPool提供了一個什麼呢?一個IoScheduler.ThreadWorker,而IoScheduler.ThreadWorker繼承至NewThreadWorker。這幾個類的關係在類圖裡都有的,大家看一下就明白了。
那麼很明顯,我們接下來就要看的是IoScheduler.EventLoopWorker的schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit)方法
@NonNull
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
return (Disposable)(this.tasks.isDisposed() ? EmptyDisposable.INSTANCE : this.threadWorker.scheduleActual(action, delayTime, unit, this.tasks));
}
this.threadWorker就是剛才我們說的IoScheduler.CachedWorkerPool提供給IoScheduler.EventLoopWorker的,為了怕大家不理解,我們還是看一下IoScheduler.EventLoopWorker的變數和構造方法
static final class EventLoopWorker extends Worker {
private final CompositeDisposable tasks;
private final IoScheduler.CachedWorkerPool pool;
private final IoScheduler.ThreadWorker threadWorker;
final AtomicBoolean once = new AtomicBoolean();
EventLoopWorker(IoScheduler.CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
}
到了這裡,如果大家要追溯threadWorker的建立,就可以從 this.threadWorker = pool.get()這句程式碼入手啦,這後面就是關於執行緒池的內容了,我們暫且打住。。。
現在我們繼續檢視IoScheduler.ThreadWorker的scheduleActual(action, delayTime, unit, this.tasks)方法,這個方法是由他的父類NewThreadWorker處理的
@NonNull
public ScheduledRunnable scheduleActual(Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null && !parent.add(sr)) {
return sr;
} else {
try {
Object f;
if (delayTime <= 0L) {
f = this.executor.submit(sr);
} else {
f = this.executor.schedule(sr, delayTime, unit);
}
sr.setFuture((Future)f);
} catch (RejectedExecutionException var10) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(var10);
}
return sr;
}
}
這裡就只需要看his.executor.submit(sr)方法和this.executor.schedule(sr, delayTime, unit)方法,其實你戳進去看的話,最後都呼叫了ScheduledThreadPoolExecutor的schedule(Runnable command,long delay,TimeUnit unit)方法,因為NewThreadWorker的executor就是ScheduledExecutorService,也就是ScheduledThreadPoolExecutor 。
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService {
}
總之呢,到這裡的時候,雖然還封裝了一個ScheduledRunnable ,不過最後ScheduledRunnable的run方法還是呼叫了scheduleActual方法傳入的Runnable的run方法(這又是什麼設計模式???)。現在就相當於
new Thread(ObservableSubscribeOn.SubscribeTask).start();
迅速的回過去看一眼ObservableSubscribeOn.SubscribeTask的run方法
ObservableSubscribeOn.this.source.subscribe(this.parent);
大聲的告訴我,source是誰?是不是我們的ObservableCreate???他的subscribe方法會呼叫他的subscribeActual方法
protected void subscribeActual(Observer<? super T> observer) {
ObservableCreate.CreateEmitter<T> parent = new ObservableCreate.CreateEmitter(observer);
observer.onSubscribe(parent);
try {
this.source.subscribe(parent);
} catch (Throwable var4) {
Exceptions.throwIfFatal(var4);
parent.onError(var4);
}
}
這裡的this.source又是誰???是不是我們在Observable.create時傳入的ObservableOnSubscribe???然後 this.source.subscribe(parent)不就是我們new的那個ObservableOnSubscribe的subscribe方法,我們在裡面傳送資料啦。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
System.out.println("ObservableOnSubscribe.subscribe thread : " + Thread.currentThread().getName());
observableEmitter.onNext(1);
observableEmitter.onComplete();
}
})
是誰在傳送呢?是不是ObservableCreate.CreateEmitter在傳送資料?我們接著看看ObservableCreate.CreateEmitter的部分原始碼
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
public void onNext(T t) {
if (t == null) {
this.onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
} else {
if (!this.isDisposed()) {
this.observer.onNext(t);
}
}
}
public void onError(Throwable t) {
if (!this.tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
public void onComplete() {
if (!this.isDisposed()) {
try {
this.observer.onComplete();
} finally {
this.dispose();
}
}
}
}
它裡面有一個Observer物件,這個物件哪裡來的?就是subscribeOn方法返回的那個Observable——ObservableSubscribeOn裡的ObservableSubscribeOn.SubscribeOnObserver,我們再看ObservableSubscribeOn的subscribeActual方法
public void subscribeActual(Observer<? super T> s) {
ObservableSubscribeOn.SubscribeOnObserver<T> parent = new ObservableSubscribeOn.SubscribeOnObserver(s);
s.onSubscribe(parent);
parent.setDisposable(this.scheduler.scheduleDirect(new ObservableSubscribeOn.SubscribeTask(parent)));
}
再看看ObservableSubscribeOn.SubscribeOnObserver的實現
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> actual;
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference();
}
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this.s, s);
}
public void onNext(T t) {
this.actual.onNext(t);
}
public void onError(Throwable t) {
this.actual.onError(t);
}
public void onComplete() {
this.actual.onComplete();
}
}
ObservableSubscribeOn.SubscribeOnObserver裡面也有個Observer——actual,這個actual是我們在最外面new的那個Observer。那麼這個時候就清楚了,如果ObservableCreate.CreateEmitter呼叫一次onNext方法,那麼後續的執行邏輯就是
ObservableCreate.CreateEmitter.onNext—> ObservableSubscribeOn.SubscribeOnObserver.onNext—->最外面new的那個Observer.onNext
那麼onComplete和onError方法也是一樣的,就不在贅述了。那麼現在大家可以開始記住下面這個結論:ObservableCreate.CreateEmitter.onNext方法是在ObservableSubscribeOn.SubscribeTask的run方法裡被呼叫的,剛才也說了,經過各種高深複雜的方式把ObservableSubscribeOn.SubscribeTask放到了一個新的Thread(Schedules.io)裡面去執行,那麼從ObservableCreate.CreateEmitter.onNext方法開始,後續的執行邏輯就也都在一個新的Thread(我們指定的Schedules.io)裡面去了,後續我們也沒有再切換執行緒,所以後續程式碼的工作執行緒都是我們指定的Schedules.io。至此,執行緒切換的原理就清楚了。
可能很多童鞋看到這裡也如同我剛開始看別人的文章一樣的感覺
不過,如果你願意順著我的思路去多戳幾下原始碼,順便動動筆自己畫點圖,我相信你肯定可以比我描述的更好,更重要的是,你肯定能記得非常深刻。
好了,我們還有一個問題沒有解決,請允許我再吹幾分鐘的牛x。。。我有個建議,大家可以先不往下看,先把這上面的邏輯理清楚了,後面部分你們根本不用看我的講解了,真的,騙你是小狗,汪汪汪。。。
3.4 subscribeOn第一次有效原理
3.4.1 來一段非常非常simple的程式碼
先定義一個列印類,它內部可以持有一個其他的列印類source,在他的列印方法裡,如果source不為空,會開啟新執行緒執行source的列印方法,如果source為空,則不開啟新執行緒執行,直接開始列印當前執行緒名稱
private static class Printer {
Printer source;
String name;
Printer(Printer source, String name) {
this.name=name;
this.source = source;
}
void print() {
if(source==null){
System.out.println(name +"-"+ Thread.currentThread().getName());
return;
}
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(name +"-"+ Thread.currentThread().getName());
if (null != source) {
source.print();
}
}
}).start();
}
}
然後我們在main方法裡執行如下程式碼
public static void main(String args[]) {
final Printer printer1 = new Printer(null,"printer1");
final Printer printer2 = new Printer(printer1,"printer2");
final Printer printer3 = new Printer(printer2,"printer3");
final Printer printer4 = new Printer(printer3,"printer4");
printer4.print();
}
輸出結果如下
printer4-Thread-0
printer3-Thread-1
printer2-Thread-2
printer1-Thread-2
如果你清楚了這裡要表示的這個原理,你可能就已經猜到為什麼subscribeOn方法只有第一次指定執行緒的地方是有效的,我就不用費口舌了,文章到此結束。。。不過,那是不可能的,說好了要再裝幾分鐘的x,怎麼可能這麼快。產品經理快扶我一把,雖然手斷了,但我還要擼程式碼。
言歸正傳,這個原理是什麼?對於任意一個Printer而言,不管外面包裹了多少個新的Printer去呼叫他的print方法,他的print方法裡的執行語句的工作執行緒永遠都是他下游的第一個Printer的print方法裡new的那個Thread,因為第一個Printer的source為空,print方法裡的輸出語句就沒有在被別的Thread包裹了。類似如下程式碼
new Thread(new Runnable() {
@Override
public void run() {
new Thread(new Runnable() {
@Override
public void run() {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
},"thread3").start();
}
},"thread2").start();
}
},"thread1").start();
Thread.currentThread().getName()的結果永遠是thread3。這裡程式碼執行順序和RxJava實際的順序剛好相反,因為RxJava是逆向向上呼叫的,大家注意區分就好了。總之就是,第一個Printer的print方法的執行執行緒,只能是被他下游的Printer控制,到了這段程式碼裡,那句輸出語句打印出的執行緒名稱,肯定是他外層Thread的名稱。的清楚了這一點,我們就可以愉快的繼續往下講了。
3.4.2 Observable多次subscribeOn的流程類比
我們用一個例子來解釋。首先,定義一個IPrinter介面和一個IPaper介面
interface IPrinter {
void subscribe(IPaper paper);
void preparePrint(IPaper paper);
void print(IPaper paper);
}
interface IPaper {
void show();
}
其次,定義一個Printer類來實現IPrinter介面
private static class Printer implements IPrinter {
private IPrinter source;
private String name;
Printer(IPrinter source, String name) {
this.source = source;
this.name = name;
}
@Override
public void subscribe(IPaper paper) {
//這個地方非常關鍵
IPaper parent = new Paper(paper, name);
preparePrint(parent);
}
@Override
public void preparePrint(IPaper paper) {
System.out.println(name + " preparePrint on " +
Thread.currentThread().getName());
if (null == source) {
print(paper);
return;
}
new Thread(new Runnable() {
@Override
public void run() {
if (null != source) {
source.subscribe(paper);
}
}
}).start();
}
@Override
public void print(IPaper paper) {
System.out.println(name + " start print work on " +
Thread.currentThread().getName());
paper.show("哈哈哈哈哈");
}
}
再定義一個Paper類實現IPaper介面
private static class Paper implements IPaper {
private IPaper actual;
private String printerName;
Paper(IPaper actual,String printerName) {
this.printerName=printerName;
this.actual = actual;
}
@Override
public void show(String content) {
System.out.println( printerName+" print on paper , and work on " +
Thread.currentThread().getName());
actual.show(content);
}
}
最後在main方法呼叫
public static void main(String args[]) {
final IPrinter printer1 = new Printer(null, "printer1");
final IPrinter printer2 = new Printer(printer1, "printer2");
final IPrinter printer3 = new Printer(printer2, "printer3");
final IPrinter printer4 = new Printer(printer3, "printer4");
printer4.subscribe(new IPaper() {
@Override
public void show(String content) {
System.out.println(content + " show on " +
Thread.currentThread().getName());
}
});
}
輸出結果如下
printer4 preparePrint on main//onSubscribe執行緒-main
printer3 preparePrint on Thread-0//printer4 new的
printer2 preparePrint on Thread-1//printer3 new的
printer1 preparePrint on Thread-2//printer2 new的
printer1 start print work on Thread-2
printer1 print on paper , and work on Thread-2
printer2 print on paper , and work on Thread-2
printer3 print on paper , and work on Thread-2
printer4 print on paper , and work on Thread-2
哈哈哈哈哈 show on Thread-2
程式碼你們可以直接考出去執行的,看看我有沒有說錯。subscribeOn之所以只有第一次呼叫才有效,就是利用的類似這個demo展示的原理。頂層Observer傳送資料的執行緒永遠是第一次呼叫subscribeOn時指定的執行緒,因為資料的發射流程過程中,我們再也沒有去切換過執行緒了,所以這其實很好理解的吧。
就下面這個段程式碼做個簡單類比分析
Observable.create((ObservableOnSubscribe<String>) observableEmitter -> {
System.out.println("ObservableOnSubscribe.subscribe thread : " + Thread.currentThread().getName());
observableEmitter.onNext("1");
observableEmitter.onComplete();
})//頂層Observable
.subscribeOn(Schedulers.io())//第一次subscribeOn
.subscribeOn(Schedulers.newThread())//第二次subscribeOn
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable disposable) {
System.out.println("filter thread : " + Thread.currentThread().getName());
}
@Override
public void onNext(String s) {
System.out.println("onNext thread : " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable throwable) {
System.out.println("onError thread : " + Thread.currentThread().getName());
}
@Override
public void onComplete() {
System.out.println("onComplete thread : " + Thread.currentThread().getName());
}
});
根據3.3節講的subscribeOn切換執行緒的原理和3.4.1提到的那個原理以及上一個demo我們可以知道,一但一個Observable呼叫了一次subscribeOn方法後,後續無論你再呼叫多少次subscribeOn方法都無法在改變Observable發射資料的執行緒了。
我們簡化來看,我們呼叫了兩次subscribeOn方法——subscribeOn(Schedulers.io())、subscribeOn(Schedulers.newThread()),我們就類比成printer1(create)—>printer2(subscribeOn(Schedulers.io()))—->print3(subscribeOn(Schedulers.newThread()))就很好理解了。
四、結語
寫了這麼多,也不知道大家懂了沒,雖然我開頭寫的很霸氣,我要寫的比別人好怎麼怎麼的,可是寫到這裡的時候,我才發現,這個東西自己理解起來感覺很簡單,但是要描述清楚卻是很難。不怕你們笑話,寫到這裡我足足用了三天時間,不停地戳原始碼、揣摩、舉例、重寫,深怕給大家講錯了,深怕大家不理解。結果就是這個樣子了,我只能說,我盡力了^_^我還要繼續擼點程式碼養家餬口。
其實大家明白三個點就可以了
1.Observable subscribe的時候是從下游往上游執行的,只是在這個過程中,使用subscribeOn去切指定了頂層Observable的發射執行緒;
2.頂層Observable傳送資料的時候是從上游往下游的,在這個過程中,發射資料的執行緒已經被subscribeOn指定過了,這個過程本身不會主動去切換執行緒,所以資料發射和傳遞的所有工作執行緒都是同一個(onSubscribe除外,他也不屬於發射流程)。
3.訂閱和發射的過程,在思想設計上,是一個圓環,只是在這個圓環閉合的過程中,你可以把它的某些部分的構建過程交給不同的廠家去完成;在程式碼層面,它又是一根無限接近直線的一條線,僅此而已。
其實我應該把這個部分的知識分成三個部分來講的,第一部分將創與訂閱的流程,第二部分講subscribeOn切換執行緒的原理,第三部分講observerOn切換執行緒的原理,可是我並不想這樣做,我只分了兩部分,這是第一部分,還有下一部分就是講ObservOn切換下游執行緒的原理。不要問我為什麼這麼分,原因非常非常的簡單,因為我懶啊!!因為我懶啊!!因為我懶啊!!!