RxJava之五—— observeOn()與subscribeOn()的詳解
你也可以檢視我的其他同類文章,也會讓你有一定的收貨!
為什麼多次呼叫subscribeOn()卻只有第一個起作用?
為什麼多次呼叫observeOn()卻可以切換到不同執行緒
observeOn()後能不能再次呼叫subscribeOn()?
如果你有這些疑問,那接下來的內容必定能解決你心頭的疑惑
subscribeOn()和observeOn()的區別
subscribeOn()和observeOn()都是用來切換執行緒用的
- subscribeOn()改變呼叫它之前程式碼的執行緒
- observeOn()改變呼叫它之後程式碼的執行緒
這裡給出下面示例中用到的兩個函式
//用指定的名稱新建一個執行緒
public static Scheduler getNamedScheduler(final String name) {
return Schedulers.from(Executors.newCachedThreadPool(new ThreadFactory() {
@Override
public Thread newThread(@android.support.annotation.NonNull Runnable runnable) {
return new Thread(runnable, name);
}
}));
}
//列印當前執行緒的名稱
public static void threadInfo(String caller) {
System.out.println(caller + " => " + Thread.currentThread().getName());
}
一、subscribeOn()
在講解他的原理之前,先來一個簡單的例子,有個感性認識,學起來更輕鬆
先說結論:subscribeOn 作用於該操作符之前的 Observable 的建立操符作以及 doOnSubscribe 操作符
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
threadInfo("OnSubscribe.call()");
subscriber.onNext("RxJava");
}
})
.subscribeOn(getNamedScheduler("create之後的subscribeOn"))
.doOnSubscribe(() -> threadInfo(".doOnSubscribe()-1"))
.subscribeOn(getNamedScheduler("doOnSubscribe1之後的subscribeOn"))
.doOnSubscribe(() -> threadInfo(".doOnSubscribe()-2"))
.subscribe(s -> {
threadInfo(".onNext()");
System.out.println(s + "-onNext");
});
結果如下:
.doOnSubscribe()-2 => main
.doOnSubscribe()-1 => doOnSubscribe1之後的subscribeOn
OnSubscribe.call() => create之後的subscribeOn
.onNext() => create之後的subscribeOn
RxJava-onNext
3號框中的.doOnSubscribe(() -> threadInfo(“.doOnSubscribe()-2”)) 的之後由於沒有subscribeOn操作符所以回撥到該段程式碼被呼叫的執行緒(即主執行緒)
由於 subscribe 之前 沒有 使用observeOn 指定Scheduler,所以.onNext()的執行緒是和OnSubscribe.call()使用相同的Scheduler 。
下面通過原始碼來分析一下:
1、示例程式碼:
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("a");
subscriber.onNext("b");
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io())
.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String integer) {
System.out.println(integer);
}
});
執行如下:
a
b
2、subscribeOn()原始碼
public final Observable<T> subscribeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return create(new OperatorSubscribeOn<T>(this, scheduler));
}
很明顯,會走if之外的方法。
在這裡我們可以看到,又建立了一個OperatorSubscribeOn物件,但建立時傳入的引數為OperatorSubscribeOn(this,scheduler),我們看一下此物件以及其對應的構造方法
3、create()的原始碼:
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(hook.onCreate(f));
}
我們看到這個方法,使用OperatorSubscribeOn這個類,來建立一個新的Observable,那就把它叫做Observable_2,把原來的Observable叫做Observable_1
4、OperatorSubscribeOn類的原始碼:
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
final Scheduler scheduler;
final Observable<T> source;
public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
this.scheduler = scheduler;
this.source = source;
}
@Override
public void call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();
subscriber.add(inner);
inner.schedule(new Action0() {
@Override
public void call() {
final Thread t = Thread.currentThread();
Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
@Override
public void onError(Throwable e) {
try {
subscriber.onError(e);
} finally {
inner.unsubscribe();
}
}
@Override
public void onCompleted() {
try {
subscriber.onCompleted();
} finally {
inner.unsubscribe();
}
}
@Override
public void setProducer(final Producer p) {
subscriber.setProducer(new Producer() {
@Override
public void request(final long n) {
if (t == Thread.currentThread()) {
p.request(n);
} else {
inner.schedule(new Action0() {
@Override
public void call() {
p.request(n);
}
});
}
}
});
}
};
source.unsafeSubscribe(s);
}
});
}
}
- OperatorSubscribeOn類implements 了Onsubscribe介面,並實現call()方法
- OperatorSubscribeOn的構造方法,
- 儲存了Observable物件,就是呼叫了subscribeOn()方法的Observable物件
- 並儲存了Scheduler物件。
這裡做個總結。
把Observable.create()建立的稱之為Observable_1,OnSubscribe_1。
把subscribeOn()建立的稱之為Observable_2,OnSubscribe_2
Observable_1是由示例程式碼的第1、2行建立的
OperatorSubscribeOn類是implements Onsubscribe介面的,所以可以當做Onsubscribe類使用。(OnSubscribe_2)
並且OnSubscribe_2中儲存了Observable_1的應用,即source。(在OperatorSubscribeOn原始碼的第8行)
在subscribeOn()原始碼的倒數第二行,
create(new OperatorSubscribeOn<T>(this, scheduler))
返回新建立的Observable_2物件。
4.1、分析call()方法。
- inner.schedule()改變了執行緒,此時Action的call()執行在指定的執行緒中。
- 把示例程式碼中的Subscriber包裝了一層,賦給物件S(Subscriber_2)。見上面程式碼21行。
- source.unsafeSubscribe(s);,
- 注意:source是Observable_1物件,這裡的s就是Subscriber_2
- 因為呼叫過subscribeOn(Schedulers.io())後,返回Observable_2物件,所以示例程式碼第13行程式碼的subscribe()就是Observable_2.subscribe(),也就是執行OnSubscribe_2的call()方法(即OperatorSubscribeOn類的原始碼的第12行)。
4.2 看一下source.unsafeSubscribe(s);(第65行)程式碼都做了什麼
這裡的source就是Observable_1,s是Subscriber_2
unsafeSubscribe()原始碼:
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
try {
// new Subscriber so onStart it
subscriber.onStart();
// allow the hook to intercept and/or decorate
hook.onSubscribeStart(this, onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(hook.onSubscribeError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
hook.onSubscribeError(r);
// TODO why aren't we throwing the hook's return value.
throw r;
}
return Subscriptions.unsubscribed();
}
}
關鍵程式碼:
hook.onSubscribeStart(this, onSubscribe).call(subscriber);
該方法即呼叫了OnSubscribe_1.call()方法。
注意,此時的call()方法在我們指定的執行緒中執行。起到了改變執行緒的作用。
對於以上執行緒,我們可以總結,其有如下流程:
Observable.create() : 建立了Observable_1和OnSubscribe_1;
subscribeOn(): 建立Observable_2和OperatorSubscribeOn(OnSubscribe_2),同時OperatorSubscribeOn儲存了Observable_1的引用。
示例程式碼中的subscribe(Observer) 實際上就是呼叫Observable_2.subscribe(Observer):
- 呼叫OperatorSubscribeOn的call()。call()改變了執行緒的執行,並且呼叫了Observable_1.unsafeSubscribe(s);
- Observable_1.unsafeSubscribe(s);,該方法的實現中呼叫了OnSubscribe_1的call()。
這樣就實現了在指定執行緒執行OnSubscribe的call()函式,無論我們的subscribeOn()放在哪裡,他改變的是subscribe()的過程,而不是onNext()的過程。
那麼如果有多個subscribeOn(),那麼執行緒會怎樣執行呢。如果按照我們的邏輯,有以下程式
Observable.just("ss")
.subscribeOn(Schedulers.io()) // ----1---
.subscribeOn(Schedulers.newThread()) //----2----
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
}
});
那麼,我們根據之前的原始碼分析其執行邏輯。
Observable.just(“ss”),建立Observable,OnSubscribe
Observable_1.subscribeOn(Schedulers.io()):建立Observable_1,OperatorSubscribeOn_1並儲存Observable的引用。
Observable_2.subscribeOn(Schedulers.newThread()):建立Observable_2,OperatorSubscribeOn_2並儲存Observable_1的引用。
Observable_3.subscribe():
- 呼叫OperatorSubscribeOn_2.call(),改變執行緒為Schedulers.newThread()。
- 呼叫OperatorSubscribeOn_1.call(),改變執行緒為Schedulers.io()。
- 呼叫OnSubscribe.call(),此時call()執行在Schedulers.io()。
根據以上邏輯分析,會按照1的執行緒進行執行。
二、observeOn()
先說結論:observeOn作用於該操作符之後操作符直到出現新的observeOn操作符
舉個例子:
Observable.just("RxJava")
.observeOn(getNamedScheduler("map之前的observeOn"))
.map(s -> {
threadInfo(".map()-1");
return s + "-map1";
})
.map( s -> {
threadInfo(".map()-2");
return s + "-map2";
})
.observeOn(getNamedScheduler("subscribe之前的observeOn"))
.subscribe(s -> {
threadInfo(".onNext()");
System.out.println(s + "-onNext");
});
結果如下:
.map()-1 => map之前的observeOn
.map()-2 => map之前的observeOn
.onNext() => subscribe之前的observeOn
RxJava-map1-map2-onNext
下面通過原始碼來進行分析:
1、observeOn()原始碼
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, RxRingBuffer.SIZE);
}
public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
return observeOn(scheduler, false, bufferSize);
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}
這裡引出了lift()函式
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
}
關於lift的詳細介紹,如果不明白lift的原理,參考這裡:RxJava 之二—— Lift()詳解
用OperatorObserveOn物件,建立OnSubscribeLift物件(實現了OnSubscribe介面),接著建立Observable物件。為了加以區分,這裡我們把OnSubscribeLift叫做OnSubscribe_2,Observable叫做Observable_2。
2、OperatorObserveOn程式碼:
public final class OperatorObserveOn<T> implements Operator<T, T> {
private final Scheduler scheduler;
private final boolean delayError;
private final int bufferSize;
/**
* @param scheduler the scheduler to use
* @param delayError delay errors until all normal events are emitted in the other thread?
*/
public OperatorObserveOn(Scheduler scheduler, boolean delayError) {
this(scheduler, delayError, RxRingBuffer.SIZE);
}
/**
* @param scheduler the scheduler to use
* @param delayError delay errors until all normal events are emitted in the other thread?
* @param bufferSize for the buffer feeding the Scheduler workers, defaults to {@code RxRingBuffer.MAX} if <= 0
*/
public OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize) {
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
}
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
if (scheduler instanceof ImmediateScheduler) {
// avoid overhead, execute directly
return child;
} else if (scheduler instanceof TrampolineScheduler) {
// avoid overhead, execute directly
return child;
} else {
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init();
return parent;
}
}
public static <T> Operator<T, T> rebatch(final int n) {
return new Operator<T, T>() {
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(Schedulers.immediate(), child, false, n);
parent.init();
return parent;
}
};
}
/** Observe through individual queue per observer. */
static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
final Subscriber<? super T> child;
final Scheduler.Worker recursiveScheduler;
final NotificationLite<T> on;
final boolean delayError;
final Queue<Object> queue;
/** The emission threshold that should trigger a replenishing request. */
final int limit;
// the status of the current stream
volatile boolean finished;
final AtomicLong requested = new AtomicLong();
final AtomicLong counter = new AtomicLong();
/**
* The single exception if not null, should be written before setting finished (release) and read after
* reading finished (acquire).
*/
Throwable error;
/** Remembers how many elements have been emitted before the requests run out. */
long emitted;
// do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should
// not prevent anything downstream from consuming, which will happen if the Subscription is chained
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
this.child = child;
this.recursiveScheduler = scheduler.createWorker();
this.delayError = delayError;
this.on = NotificationLite.instance();
int calculatedSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
// this formula calculates the 75% of the bufferSize, rounded up to the next integer
this.limit = calculatedSize - (calculatedSize >> 2);
if (UnsafeAccess.isUnsafeAvailable()) {
queue = new SpscArrayQueue<Object>(calculatedSize);
} else {
queue = new SpscAtomicArrayQueue<Object>(calculatedSize);
}
// signal that this is an async operator capable of receiving this many
request(calculatedSize);
}
void init() {
// don't want this code in the constructor because `this` can escape through the
// setProducer call
Subscriber<? super T> localChild = child;
localChild.setProducer(new Producer() {
@Override
public void request(long n) {
if (n > 0L) {
BackpressureUtils.getAndAddRequest(requested, n);
schedule();
}
}
});
localChild.add(recursiveScheduler);
localChild.add(this);
}
@Override
public void onNext(final T t) {
if (isUnsubscribed() || finished) {
return;
}
if (!queue.offer(on.next(t))) {
onError(new MissingBackpressureException());
return;
}
schedule();
}
@Override
public void onCompleted() {
if (isUnsubscribed() || finished) {
return;
}
finished = true;
schedule();
}
@Override
public void onError(final Throwable e) {
if (isUnsubscribed() || finished) {
RxJavaHooks.onError(e);
return;
}
error = e;
finished = true;
schedule();
}
protected void schedule() {
if (counter.getAndIncrement() == 0) {
recursiveScheduler.schedule(this);
}
}
// only execute this from schedule()
@Override
public void call() {
long missed = 1L;
long currentEmission = emitted;
// these are accessed in a tight loop around atomics so
// loading them into local variables avoids the mandatory re-reading
// of the constant fields
final Queue<Object> q = this.queue;
final Subscriber<? super T> localChild = this.child;
final NotificationLite<T> localOn = this.on;
// requested and counter are not included to avoid JIT issues with register spilling
// and their access is is amortized because they are part of the outer loop which runs
// less frequently (usually after each bufferSize elements)
for (;;) {
long requestAmount = requested.get();
while (requestAmount != currentEmission) {
boolean done = finished;
Object v = q.poll();
boolean empty = v == null;
if (checkTerminated(done, empty, localChild, q)) {
return;
}
if (empty) {
break;
}
localChild.onNext(localOn.getValue(v));
currentEmission++;
if (currentEmission == limit) {
requestAmount = BackpressureUtils.produced(requested, currentEmission);
request(currentEmission);
currentEmission = 0L;
}
}
if (requestAmount == currentEmission) {
if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
return;
}
}
emitted = currentEmission;
missed = counter.addAndGet(-missed);
if (missed == 0L) {
break;
}
}
}
boolean checkTerminated(boolean done, boolean isEmpty, Subscriber<? super T> a, Queue<Object> q) {
if (a.isUnsubscribed()) {
q.clear();
return true;
}
if (done) {
if (delayError) {
if (isEmpty) {
Throwable e = error;
try {
if (e != null) {
a.onError(e);
} else {
a.onCompleted();
}
} finally {
recursiveScheduler.unsubscribe();
}
}
} else {
Throwable e = error;
if (e != null) {
q.clear();
try {
a.onError(e);
} finally {
recursiveScheduler.unsubscribe();
}
return true;
} else
if (isEmpty) {
try {
a.onCompleted();
} finally {
recursiveScheduler.unsubscribe();
}
return true;
}
}
}
return false;
}
}
}
雖然程式碼很長,但是也就是三部分
- 建構函式,
- 實現Operator所繼承的Func1中的call()函式
- 靜態內部類ObserveOnSubscriber< T>
下面來逐一分析:
因為呼叫Observable.等函式而需要建立的稱之為Observable_1,Subscriber_1。
因為呼叫observeOn()而建立的稱之為Observable_2,Subscriber_2
2.1、建立OperatorObserveOn物件
上面這段程式碼,主要功能就是建立OperatorObserveOn物件
既然是Operator,那麼它的職責就是把一個Subscriber轉換成另外一個Subscriber,
2.2、OperatorObserveOn物件中的call()函式返回ObserveOnSubscriber物件(Subscriber_2)
我們來看下call函式都做了什麼:
ObserveOnSubscriber是一個靜態類(第53行),建立一個ObserveOnSubscriber類(繼承Subscriber< T>(Subscriber_2))(OperatorObserveOn程式碼第35行),在引數中傳入Subscriber_1(即區域性變數child)和scheduler(指定執行緒)等引數。
呼叫了observeOn(),在subscribe()中呼叫onSubscribe.call(subscriber);時,就會呼叫上面程式碼第27行的call(),結果被傳入到ObserveOnSubscriber的onNext()(第118行)。(如果不明白,請看RxJava 之二—— Lift()詳解)
public void onNext(final T t) {
if (isUnsubscribed() || finished) {
return;
}
if (!queue.offer(on.next(t))) {
onError(new MissingBackpressureException());
return;
}
schedule();
}
這裡做了兩件事,
- 把執行的結果快取到一個佇列裡,這裡的on物件,不是Subscriber_1。
- 呼叫schedule()啟動傳入的執行緒所建立的worker
2.3、schedule()程式碼:
protected void schedule() {
if (counter.getAndIncrement() == 0) {
recursiveScheduler.schedule(this);
}
}
- recursiveScheduler 就是之前我們傳入的Scheduler,就是在observeOn()傳入的指定執行緒,例如:AndroidScheluders.mainThread()
2.4、我們看下在scheduler()中呼叫的call()方法程式碼,call()方法只能由scheduler()去呼叫執行
@Override
public void call() {
...
final Subscriber<? super T> localChild = this.child;
for (;;) {
...
boolean done = finished;
Object v = q.poll();
boolean empty = v == null;
if (checkTerminated(done, empty, localChild, q)) {
return;
}
if (empty) {
break;
}
localChild.onNext(localOn.getValue(v));
...
}
if (emitted != 0L) {
request(emitted);
}
}
OK,在Scheduler啟動後, 我們在Observable.subscribe(a)傳入的a就是這裡的localChild(即Subscriber_1,是在第35行程式碼傳遞進來的) , 我們看到,在call中終於呼叫了它的onNext方法,把真正的結果傳了出去,此時是工作在observeOn()指定的執行緒。
那麼總結起來的結論就是:
- observeOn 對呼叫之前的序列默不關心,也不會要求之前的序列執行在指定的執行緒上
- observeOn 對之前的序列產生的結果先快取起來,然後再在指定的執行緒上,推送給最終的subscriber
下面給出兩次呼叫observeOn()的示意圖
複雜情況
我們經常多次使用subscribeOn()切換執行緒,那麼以後是否可以組合observeOn()和subscribeOn()達到自由切換的目的呢?
subscribeOn()改變的是subscribe()這句呼叫所在的執行緒,大多數情況,產生內容和消費內容是在同一執行緒的,所以改變了產生內容所在的執行緒,就改變了消費內容所在的執行緒。
對subscribeOn()的呼叫是自下向上,所以連續多次呼叫subscribeOn(),結果會被最上面的subscribeOn()覆蓋。(生成和消費都會被覆蓋)
observeOn()之上有subscribeOn()呼叫
observeOn()的工作原理是把消費結果先快取,再切換到新執行緒上讓原始消費者消費,它和生產者是沒有一點關係的,就算subscribeOn()呼叫了,也只是改變observeOn()這個消費者所在的執行緒,和OperatorObserveOn中儲存的原始消費者一點關係都沒有,它還是由observeOn()控制。observeOn()之下有subscribeOn()呼叫
這也不會改變observeOn()所指定的消費執行緒,因為observeOn()是自上而下呼叫,對subscribeOn()的呼叫是自下向上,在observeOn()指定的執行緒會覆蓋下面subscribeOn()指定執行緒來去消費
用一張圖來解釋當多個 subscribeOn() 和 observeOn() 混合使用時,執行緒排程是怎麼發生的(由於圖中物件較多,相對於上面的圖對結構做了一些簡化調整):
關注我的公眾號,輕鬆瞭解和學習更多技術