RxJava之三—— observeOn()與subscribeOn()的詳解
你也可以檢視我的其他同類文章,也會讓你有一定的收貨!
為什麼多次呼叫subscribeOn()卻只有第一個起作用?
為什麼多次呼叫observeOn()卻可以切換到不同執行緒
observeOn()後能不能再次呼叫subscribeOn()?
如果你有這些疑問,那接下來的內容必定能解決你心頭的疑惑
subscribeOn()和observeOn()的區別
subscribeOn()和observeOn()都是用來切換執行緒用的
- subscribeOn()改變呼叫它之前程式碼的執行緒
- observeOn()改變呼叫它之後程式碼的執行緒
一、subscribeOn()
在講解他的原理之前,先來一個簡單的例子,有個感性認識,學起來更輕鬆
先說結論:subscribeOn 作用於該操作符之前的 Observable 的建立操符作以及 doOnSubscribe 操作符 ,換句話說就是 doOnSubscribe 以及 Observable 的建立操作符總是被其之後最近的 subscribeOn 控制 。沒看懂不要緊,看下面程式碼和圖你就懂了。
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
結果如下:
- 1
- 2
- 3
- 4
- 5
- 1
- 2
- 3
- 4
- 5
3號框中的.doOnSubscribe(() -> threadInfo(“.doOnSubscribe()-2”)) 的之後由於沒有subscribeOn操作符所以回撥到該段程式碼被呼叫的執行緒(即主執行緒)
由於 subscribe 之前 沒有 使用observeOn 指定Scheduler,所以.onNext()的執行緒是和OnSubscribe.call()使用相同的Scheduler 。
下面通過原始碼來分析一下:
1、示例程式碼:
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
執行如下:
- 1
- 2
- 1
- 2
2、subscribeOn()原始碼
- 1
- 2
- 3
- 4
- 5
- 6
- 1
- 2
- 3
- 4
- 5
- 6
很明顯,會走if之外的方法。
在這裡我們可以看到,又建立了一個OperatorSubscribeOn物件,但建立時傳入的引數為OperatorSubscribeOn(this,scheduler),我們看一下此物件以及其對應的構造方法
3、create()的原始碼:
- 1
- 2
- 3
- 1
- 2
- 3
我們看到這個方法,使用OperatorSubscribeOn這個類,來建立一個新的Observable,那就把它叫做Observable_2,把原來的Observable叫做Observable_1
4、OperatorSubscribeOn類的原始碼:
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- OperatorSubscribeOn類implements 了Onsubscribe介面,並實現call()方法
- OperatorSubscribeOn的構造方法,
- 儲存了Observable物件,就是呼叫了subscribeOn()方法的Observable物件
- 並儲存了Scheduler物件。
這裡做個總結。
把Observable.create()建立的稱之為Observable_1,OnSubscribe_1。
把subscribeOn()建立的稱之為Observable_2,OnSubscribe_2
-
Observable_1, OnSubscribe_1是由示例程式碼的第1、2行建立的
-
OperatorSubscribeOn類implements Onsubscribe類,所以可以當做Onsubscribe類使用。(OnSubscribe_2)
-
並且OnSubscribe_2中儲存了Observable_1的應用,即source。(在OperatorSubscribeOn原始碼的第8行)
-
在subscribeOn()原始碼的倒數第二行,
create(new OperatorSubscribeOn<T>(this, scheduler))
返回新建立的Observable_2物件。
4.1、分析call()方法。
- inner.schedule()改變了執行緒,此時Action的call()在我們指定的執行緒中執行。
- 示例程式碼的Subscriber被包裝了一層,賦給物件S(Subscriber_2)。見上面程式碼21行。
- source.unsafeSubscribe(s);,
- 注意:source是Observable_1物件,這裡的s就是Subscriber_2
- 因為呼叫過subscribeOn(Schedulers.io())後,返回Observable_2物件,所以示例程式碼第13行程式碼的subscribe()就是Observable_2.subscribe(),也就是執行OnSubscribe_2的call()方法(即OperatorSubscribeOn類的原始碼的第12行)。
4.2 看一下source.unsafeSubscribe(s);(第65行)程式碼都做了什麼
這裡的source就是Observable_1,s是Subscriber_2
unsafeSubscribe()原始碼:
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
關鍵程式碼:
- 1
- 1
該方法即呼叫了OnSubscribe_1.call()方法。注意,此時的call()方法在我們指定的執行緒中執行。那麼就起到了改變執行緒的作用。
對於以上執行緒,我們可以總結,其有如下流程:
-
Observable.create() : 建立了Observable_1和OnSubscribe_1;
-
subscribeOn(): 建立Observable_2和OperatorSubscribeOn(OnSubscribe_2),同時OperatorSubscribeOn儲存了Observable_1的引用。
-
示例程式碼中的subscribe(Observer) 實際上就是呼叫Observable_2.subscribe(Observer):
- 呼叫OperatorSubscribeOn的call()。call()改變了執行緒的執行,並且呼叫了Observable_1.unsafeSubscribe(s);
- Observable_1.unsafeSubscribe(s);,該方法的實現中呼叫了OnSubscribe_1的call()。
這樣就實現了在指定執行緒執行OnSubscribe的call()函式,無論我們的subscribeOn()放在哪裡,他改變的是subscribe()的過程,而不是onNext()的過程。
那麼如果有多個subscribeOn(),那麼執行緒會怎樣執行呢。如果按照我們的邏輯,有以下程式
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
那麼,我們根據之前的原始碼分析其執行邏輯。
-
Observable.just(“ss”),建立Observable,OnSubscribe
-