rxJava和rxAndroid原始碼解析系列二之observer訂閱
建立完Observable物件後,以後一步ObservableObserveOn.subscribe(new Observer<String>() {.....})這一步又發生了什麼呢?
接著跟蹤原始碼。
@SchedulerSupport(SchedulerSupport.NONE) @Override public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins"); //主要是這行程式碼,也是實際訂閱執行的,接下來看看怎麼做的 subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } }
<原始碼片段1>
subscribeActual(observer);這個方法,是Observable的抽象方法,由子類實現,那是誰呢,ObservableObserveOn或AbstractObservableWithUpstream,經過查詢在ObservableObserveOn找到了這個實現
@Override protected void subscribeActual(Observer<? super T> observer) { //scheduler是HandlerScheduler的物件,而HandlerScheduler是Scheduler的子類 if (scheduler instanceof TrampolineScheduler) {//這個就會跳過 source.subscribe(observer); } else { //那就從這裡開始執行 Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } }
<原始碼片段2>
通過上一篇的分析source指向的是ObservableSubscribeOn的物件,scheduler是HandlerScheduler的物件,所以實際執行的是下面兩行程式碼
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
先看第一行
@Override
public Worker createWorker() {
return new HandlerWorker(handler, async);
}
<原始碼片段3>
HandlerWorker是HandlerScheduler的靜態內部類。HandlerWorker中也有ui執行緒的handler。
第二行程式碼先看new例項物件。ObserveOnObserver是ObservableObserveOn的靜態內部類,ObserveOnObserver是對observer回撥的裝飾和擴充套件,就不貼出程式碼了。好的,source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));怎麼執行的呢,這個source指向的是ObservableSubscribeOn的物件,ObservableSubscribeOn也是Observable物件的子類,所以subscribe方法和<原始碼片段1>一樣。那就看看它實際執行訂閱的程式碼啦
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
<原始碼片段4>
就三行程式碼,就把這個執行緒切換和回撥都完成啦?腦子裡突然蹦出一句歌詞,沒那麼簡單.....那就仔細分析一下吧,第一行程式碼就是把傳過來的ObserveOnObserver物件繼續封裝一下下,轉變成SubscribeOnObserver,而這個類就是ObservableSubscribeOn的內部類,也進行了裝飾和擴充套件。也沒什麼東東嘛,那就繼續第二句observer.onSubscribe(parent);observer是ObserveOnObserver它的例項物件,我們都知道啦ObserveOnObserver是ObservableObserveOn的靜態內部類,那就進去看看這個方法啦!
@Override
public void onSubscribe(Disposable d) {
//此時d就是傳入過來的ObserveOnObserver物件嘛
// this.upstream成員變數,找了一圈,構造的時候沒有它的事,為null
if (DisposableHelper.validate(this.upstream, d)) {//if判斷返回true
this.upstream = d;
if (d instanceof QueueDisposable) {//if判斷返回false
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) d;
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
done = true;
downstream.onSubscribe(this);
schedule();
return;
}
if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;
downstream.onSubscribe(this);
return;
}
}
//佇列
queue = new SpscLinkedArrayQueue<T>(bufferSize);
//downstream就是Observer,也就是樣例程式碼中的new Observer(String){},就是最終回撥的嘛
downstream.onSubscribe(this);
}
}
<原始碼片段5>
那就先看看這個嘍DisposableHelper.validate(this.upstream, d)
public static boolean validate(Disposable current, Disposable next) {
if (next == null) {//next=ObserveOnObserver的物件,不為null,跳過
RxJavaPlugins.onError(new NullPointerException("next is null"));
return false;
}
if (current != null) {//current=(this.upstream=null,跳過
next.dispose();
reportDisposableSet();
return false;
}
return true;
}
<原始碼片段6>
返回true, <原始碼片段5>繼續往下執行d instanceof QueueDisposable,這個判斷,d是ObservableSubscribeOn靜態內部類ObserveOnObserver的物件,那就看看這個是什麼型別的
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
...............................省略
}
<原始碼片段7>
QueueDisposable又是啥呢
public interface QueueDisposable<T> extends QueueFuseable<T>, Disposable {
}
<原始碼片段8>
d instanceof QueueDisposable就只能返回fase了,if程式碼塊跳過。<原始碼片段5>繼續往下執行 downstream.onSubscribe(this);
這個downstream是個成員變數,什麼時候賦值的呢?
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.downstream = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
<原始碼片段9>
構造的時候賦值的,誰呼叫了呢, <原始碼片段2>中賦值的,傳入過來的observer是誰呢?就是這句話嘛,ObservableObserveOn.subscribe(new Observer<String>() {.....}),才呼叫了 <原始碼片段2>的程式碼段,這個不是最終回撥嘛,搜嘎,
@Override
public void onSubscribe(Disposable d) {
//當前執行緒不就是new Thread()嘛,原來如此
Log.i("MainActivity","onSubscribe"+Thread.currentThread().getId()+":"+Thread.currentThread().getName());
}
<原始碼片段4>中的 observer.onSubscribe(parent);結束了,那就繼續看<原始碼片段4>的最後一句程式碼parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));這句程式碼肯定會開闢一片新天地,它要肩負著執行緒切換和回撥嘛。這句話巢狀的有點多,那我們就把他繼續分解,先看看new SubscribeTask(parent),這個類也是ObservableSubscribeOn它的內部類
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
//ObservableSubscribeOn的source,這個source指向ObservableCreate
source.subscribe(parent);
}
}
<原始碼片段10>
這個就是封裝了一下,沒什麼東東,接著這句scheduler.scheduleDirect(new SubscribeTask(parent)));我們知道這個scheduler是ObservableSubscribeOn的成員變數,通過上一篇的介紹,這個scheduler是NewThreadScheduler的物件
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
//run就是SubscribeTask 的物件嘛
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
<原始碼片段11>
返回個disposable,還要繼續跟進
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();//NewThreadScheduler的createWorker()方法建立NewThreadWorker物件
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);//decoratedRun =run
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
<原始碼片段12>
DisposeTask這個類是什麼呢
static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {
.........................省略
DisposeTask(@NonNull Runnable decoratedRun, @NonNull Worker w) {
this.decoratedRun = decoratedRun;
this.w = w;
}
@Override
public void run() {
runner = Thread.currentThread();
try {
decoratedRun.run();//decoratedRun是SubscribeTask的物件,是ObservableSubscribeOn的內部類
} finally {
dispose();
runner = null;
}
}
......................省略
}
<原始碼片段13>
構建完之後,並沒有直接返回,而是執行這段程式碼 w.schedule(task, delay, unit);w是NewThreadWorker的物件,那就跟蹤這段程式碼吧
@NonNull
@Override
public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
/**
*根據原始碼片段11、12、13得知一下
*runnable是DisposeTask 它的run方法是呼叫SubscribeTask的run方法
*delayTime=0L
*unit為TimeUnit.NANOSECONDS
*/
if (disposed) {//跳過
return EmptyDisposable.INSTANCE;
}
return scheduleActual(action, delayTime, unit, null);
}
<原始碼片段14>
那就是要繼續scheduleActual方法
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);//decoratedRun =run
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {//parent為null,跳過
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {//delayTime 此時為0
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
<原始碼片段15>
ScheduledRunnable什麼鬼,看看
public final class ScheduledRunnable extends AtomicReferenceArray<Object>
implements Runnable, Callable<Object>, Disposable {
......................省略
public ScheduledRunnable(Runnable actual, DisposableContainer parent) {
super(3);
this.actual = actual;
this.lazySet(0, parent);
}
@Override
public Object call() {
// Being Callable saves an allocation in ThreadPoolExecutor
run();//call()最終還是呼叫run方法,並且返回null
return null;
}
@Override
public void run() {
lazySet(THREAD_INDEX, Thread.currentThread());
try {
try {
actual.run();
} catch (Throwable e) {
// Exceptions.throwIfFatal(e); nowhere to go
RxJavaPlugins.onError(e);
}
} finally {
lazySet(THREAD_INDEX, null);
Object o = get(PARENT_INDEX);
if (o != PARENT_DISPOSED && compareAndSet(PARENT_INDEX, o, DONE) && o != null) {
((DisposableContainer)o).delete(this);
}
for (;;) {
o = get(FUTURE_INDEX);
if (o == SYNC_DISPOSED || o == ASYNC_DISPOSED || compareAndSet(FUTURE_INDEX, o, DONE)) {
break;
}
}
}
}
......................省略
}
<原始碼片段16>
所以 <原始碼片段15>執行 f = executor.submit((Callable<Object>)sr);executor好像是執行緒池,那就看看NewThreadWorker中的建構函式有這樣的程式碼
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
<原始碼片段17>
NewThreadWorker是NewThreadScheduler的createWorker方法建立,threadFactory是個執行緒工廠。excutor的產生如下
/**
* Creates a ScheduledExecutorService with the given factory.
* @param factory the thread factory
* @return the ScheduledExecutorService
*/
public static ScheduledExecutorService create(ThreadFactory factory) {
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
tryPutIntoPool(PURGE_ENABLED, exec);
return exec;
}
<原始碼片段18>
果然excutor是個執行緒池,那麼 <原始碼片段15> f = executor.submit((Callable<Object>)sr);這個方法不就是往裡面加入一個ScheduledRunnable物件讓執行緒池執行嘛。不知不覺中我們已經切換了執行緒,厲害啦!既然切換到執行緒,那就執行runable方法。
經過從ObservableSubscribeOn.SubscribeTask到最終的ScheduledRunnable,已經懵逼的朋友可以從 <原始碼片段4>看,然後再看到這個位置就明白啦。引用關係圖如下
結合這上圖,以及<原始碼片段4>到 <原始碼片段18>的執行,最終會呼叫ObservableSubscribeOn.SubscribeTask的run方法
裡面又一句程式碼source.subscribe(parent);根據上篇的Observable的關係引用圖可以知道source就是ObservableCreate物件阿。
就是這個繼續訂閱阿,發生線上程裡面。Observable的subscribe實際執行的是subscribeActual嘛,那就看看ObservableCreate對這個方法的具體實現
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
<原始碼片段19>
subscribeActual傳入的物件就是ObservableSubscribeOn.SubscribeTask的parent,這個parent就是ObservableSubscribeOn的靜態內部類物件SubscribeOnObserver,不清楚的朋友可以看一下關係引用圖或者從頭閱讀這篇部落格。先看一下這個類CreateEmitter
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
....................省略
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
....................省略
}
<原始碼片段20>
ObservableEmitter<T>東西值得注意
public interface ObservableEmitter<T> extends Emitter<T> {
.....................省略
}
<原始碼片段21>
Emitter<T>介面
public interface Emitter<T> {
/**
* Signal a normal value.
* @param value the value to signal, not null
*/
void onNext(@NonNull T value);
/**
* Signal a Throwable exception.
* @param error the Throwable to signal, not null
*/
void onError(@NonNull Throwable error);
/**
* Signal a completion.
*/
void onComplete();
}
<原始碼片段22>
Emitter<T>介面比Observer<T> 介面少一個方法void onSubscribe(@NonNull Disposable d);可以看出CreateEmitter對Observer進行了進一步裝飾和擴充套件,<原始碼片段19>中的observer.onSubscribe(parent);那就到SubscribeOnObserver裡面看看了
@Override
public void onSubscribe(Disposable d) {
//構造的時候自動建立的this.upstream
DisposableHelper.setOnce(this.upstream, d);
}
SubscribeOnObserver(Observer<? super T> downstream) {
this.downstream = downstream;
this.upstream = new AtomicReference<Disposable>();
}
<原始碼片段23>
那就是 DisposableHelper.setOnce(this.upstream, d);執行的情況啦
public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {
ObjectHelper.requireNonNull(d, "d is null");//判空操作
if (!field.compareAndSet(null, d)) {//field的原子值肯定為null,所以if判斷跳過
d.dispose();
if (field.get() != DISPOSED) {
reportDisposableSet();
}
return false;
}
return true;//返回true
}
<原始碼片段24>
好吧, <原始碼片段19>中的程式碼 observer.onSubscribe(parent);結束,繼續<原始碼片段19>中的下一行程式碼source.subscribe(parent);這個source就是寫一篇文章demo的時候Obserable.create(ObservableOnSubscribe)的引數嘛,就是自己實現的嘛,ObservableOnSubscribe就是demo自己寫的東西嘛,也就是回撥計劃嘛。看到了吧,ObservableOnSubscribe的subscribe方法是執行在NewThreadWorker的執行緒池裡面的一個執行緒中。
總結:Obserable的訂閱就是不斷地建立Observer嘛(裝飾Observer),且是一個向最開始建立的Obserable傳遞Observer的過程。
當然涉及到了執行緒的切換,在這一篇不是核心,重點關注的是Observable的訂閱,下一篇講述子執行緒和主執行緒的切換。