RxJava (threading) -- how subscribeOn works
阿新 • Published: 2018-12-30
Example:
System.out.println("<<<<<< main threadid = " + Thread.currentThread().getId());
Observable.just(1)
        .doOnSubscribe(new Action0() {
            @Override
            public void call() {
                System.out.println("<<<<<<doOnSubscribe thread id = " + Thread.currentThread().getId());
            }
        })
        .subscribeOn(Schedulers.from(JobExecutor.getInstance()))
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                System.out.println("<<<<<<subscribe thread id = " + Thread.currentThread().getId());
            }
        });
Output:
<<<<<< main threadid = 1
<<<<<<doOnSubscribe thread id = 11
<<<<<<subscribe thread id = 11
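The example relies on a JobExecutor class that the article never shows. Schedulers.from accepts any java.util.concurrent.Executor, so one possible shape is a singleton wrapping a thread pool. The sketch below is purely hypothetical: the pool size, the thread name "job-executor" and the daemon flag are assumptions made for illustration, not the author's actual class.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for the JobExecutor used in the example above.
final class JobExecutor implements Executor {
    private static final JobExecutor INSTANCE = new JobExecutor();

    // Assumed configuration: two daemon threads named "job-executor".
    private final ExecutorService pool = Executors.newFixedThreadPool(2, r -> {
        Thread t = new Thread(r, "job-executor");
        t.setDaemon(true); // do not keep the JVM alive for idle workers
        return t;
    });

    private JobExecutor() {
    }

    static JobExecutor getInstance() {
        return INSTANCE;
    }

    @Override
    public void execute(Runnable command) {
        // Every submitted task runs on one of the pool's threads, never on the caller's.
        pool.execute(command);
    }
}
```

Any task handed to such an executor runs on a pool thread, which is consistent with the two callbacks above reporting a thread id different from the main thread's.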
Let's look at the implementation of subscribeOn:
public final Observable<T> subscribeOn(Scheduler scheduler) {
    if (this instanceof ScalarSynchronousObservable) {
        return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
    }
    return create(new OperatorSubscribeOn<T>(this, scheduler));
}
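Here this is not a ScalarSynchronousObservable: just(1) does return one, but doOnSubscribe has already wrapped it through lift, so the second branch is taken.
An OperatorSubscribeOn is created and passed to create; when the resulting Observable is subscribed to later, its call method is invoked.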
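To make the "create now, call later" behaviour concrete, here is a minimal sketch using only the JDK. MiniObservable and its OnSubscribe interface are hypothetical miniatures written for this illustration, not RxJava's real classes; they only model the idea that create stores a callback and subscribe is what finally invokes its call method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical miniature, not RxJava's real Observable.
class LazyCreateSketch {

    /** Stand-in for Observable.OnSubscribe: the work to do when someone subscribes. */
    interface OnSubscribe<T> {
        void call(Consumer<T> subscriber);
    }

    static final class MiniObservable<T> {
        private final OnSubscribe<T> onSubscribe;

        private MiniObservable(OnSubscribe<T> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        /** Only stores the callback; nothing is executed yet. */
        static <T> MiniObservable<T> create(OnSubscribe<T> onSubscribe) {
            return new MiniObservable<>(onSubscribe);
        }

        /** Subscribing is what triggers onSubscribe.call(...). */
        void subscribe(Consumer<T> subscriber) {
            onSubscribe.call(subscriber);
        }
    }

    /** Records the order of events: creation first, call only on subscribe. */
    static List<String> trace() {
        List<String> log = new ArrayList<>();
        MiniObservable<Integer> o = MiniObservable.create(s -> {
            log.add("call");
            s.accept(1);
        });
        log.add("created");
        o.subscribe(v -> log.add("onNext " + v));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(trace());
    }
}
```

In this model "created" is logged before "call", mirroring how OperatorSubscribeOn does nothing until the subscription happens.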
That call method uses scheduler.createWorker() to create a Worker and then calls the Worker's schedule method. As the earlier analysis showed, the call method of the Action0 passed to schedule eventually runs, and by then the thread switch has already taken place:
public void call(final Subscriber<? super T> subscriber) {
    final Worker inner = scheduler.createWorker();
    subscriber.add(inner);

    inner.schedule(new Action0() {
        @Override
        public void call() {
            // Already running on the Worker's thread here.
            final Thread t = Thread.currentThread();

            Subscriber<T> s = new Subscriber<T>(subscriber) {
                @Override
                public void onNext(T t) {
                    subscriber.onNext(t);
                }

                @Override
                public void onError(Throwable e) {
                    try {
                        subscriber.onError(e);
                    } finally {
                        inner.unsubscribe();
                    }
                }

                @Override
                public void onCompleted() {
                    try {
                        subscriber.onCompleted();
                    } finally {
                        inner.unsubscribe();
                    }
                }

                @Override
                public void setProducer(final Producer p) {
                    subscriber.setProducer(new Producer() {
                        @Override
                        public void request(final long n) {
                            if (t == Thread.currentThread()) {
                                // Already on the Worker's thread: request directly.
                                p.request(n);
                            } else {
                                // Otherwise hop back onto the Worker first.
                                inner.schedule(new Action0() {
                                    @Override
                                    public void call() {
                                        p.request(n);
                                    }
                                });
                            }
                        }
                    });
                }
            };

            source.unsafeSubscribe(s);
        }
    });
}
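The core idea, deferring the subscription to the source onto another thread, can be sketched with a plain ExecutorService. This is a simplified model under stated assumptions: the OnSubscribe interface, the subscribeOn helper and the thread name "worker" are all hypothetical, and unsubscription and backpressure (the setProducer logic above) are deliberately left out.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical miniature of the pattern, not RxJava's real OperatorSubscribeOn.
class SubscribeOnSketch {

    /** Stand-in for Observable.OnSubscribe. */
    interface OnSubscribe<T> {
        void call(Consumer<T> subscriber);
    }

    /** Wraps a source so that subscribing to it happens on the executor's thread. */
    static <T> OnSubscribe<T> subscribeOn(OnSubscribe<T> source, ExecutorService executor) {
        // Plays the role of inner.schedule(...) followed by source.unsafeSubscribe(s).
        return subscriber -> executor.execute(() -> source.call(subscriber));
    }

    /** Subscribes through subscribeOn and reports which thread emitted the value. */
    static String emittingThreadName() {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        CompletableFuture<String> seen = new CompletableFuture<>();
        try {
            OnSubscribe<Integer> source = s -> s.accept(1); // emits synchronously, like just(1)
            subscribeOn(source, executor)
                    .call(v -> seen.complete(Thread.currentThread().getName()));
            return seen.join(); // wait until the value has been delivered
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller thread  = " + Thread.currentThread().getName());
        System.out.println("emitter thread = " + emittingThreadName());
    }
}
```

Because the source emits synchronously inside its call, moving the subscription to the worker thread also moves the emission there, which is why the downstream callback observes the worker thread.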
Inside that call method, a new Subscriber is created and subscribed to the source with
source.unsafeSubscribe(s);
unsafeSubscribe is implemented as follows:
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
    try {
        // new Subscriber so onStart it
        subscriber.onStart();
        // allow the hook to intercept and/or decorate
        RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
        return RxJavaHooks.onObservableReturn(subscriber);
    } catch (Throwable e) {
        // special handling for certain Throwable/Error/Exception types
        Exceptions.throwIfFatal(e);
        // if an unhandled error occurs executing the onSubscribe we will propagate it
        try {
            subscriber.onError(RxJavaHooks.onObservableError(e));
        } catch (Throwable e2) {
            Exceptions.throwIfFatal(e2);
            // if this happens it means the onError itself failed (perhaps an invalid function implementation)
            // so we are unable to propagate the error correctly and will just throw
            RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
            // TODO could the hook be the cause of the error in the on error handling.
            RxJavaHooks.onObservableError(r);
            // TODO why aren't we throwing the hook's return value.
            throw r; // NOPMD
        }
        return Subscriptions.unsubscribed();
    }
}
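The error handling in unsafeSubscribe boils down to: if the onSubscribe call throws, hand the exception to the subscriber's onError, and only throw if onError itself fails. A minimal JDK-only sketch of that routing follows. MiniSubscriber and OnSubscribe are hypothetical interfaces, the hooks are omitted, and the sketch catches RuntimeException only, whereas the real code catches Throwable and rethrows fatal errors via Exceptions.throwIfFatal.

```java
// Hypothetical miniature of the error routing, not RxJava's real unsafeSubscribe.
class ErrorRoutingSketch {

    interface MiniSubscriber<T> {
        void onNext(T value);

        void onError(Throwable error);
    }

    interface OnSubscribe<T> {
        void call(MiniSubscriber<T> subscriber);
    }

    static <T> void unsafeSubscribe(OnSubscribe<T> onSubscribe, MiniSubscriber<T> subscriber) {
        try {
            onSubscribe.call(subscriber);
        } catch (RuntimeException e) {
            try {
                // A failure while subscribing is delivered as an onError signal.
                subscriber.onError(e);
            } catch (RuntimeException e2) {
                // onError itself failed, so the only option left is to throw.
                throw new IllegalStateException(
                        "Error occurred attempting to subscribe and then again in onError", e2);
            }
        }
    }

    /** Subscribes to a source that throws and returns what onError received. */
    static String deliveredError() {
        StringBuilder seen = new StringBuilder();
        OnSubscribe<Integer> failing = s -> {
            throw new IllegalArgumentException("boom");
        };
        unsafeSubscribe(failing, new MiniSubscriber<Integer>() {
            @Override
            public void onNext(Integer value) {
                seen.append("next");
            }

            @Override
            public void onError(Throwable error) {
                seen.append(error.getMessage());
            }
        });
        return seen.toString();
    }

    public static void main(String[] args) {
        System.out.println("onError received: " + deliveredError());
    }
}
```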
Here onSubscribe is the OnSubscribeLift, and its call method is invoked:
public void call(Subscriber<? super R> o) {
    try {
        Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
        try {
            // new Subscriber created and being subscribed with so 'onStart' it
            st.onStart();
            parent.call(st);
        } catch (Throwable e) {
            // localized capture of errors rather than it skipping all operators
            // and ending up in the try/catch of the subscribe method which then
            // prevents onErrorResumeNext and other similar approaches to error handling
            Exceptions.throwIfFatal(e);
            st.onError(e);
        }
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // if the lift function failed all we can do is pass the error to the final Subscriber
        // as we don't have the operator available to us
        o.onError(e);
    }
}
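This invokes the operator's call method, which runs the doOnSubscribe callback; that flow was already analyzed in http://blog.csdn.net/new_abc/article/details/52932705. Everything from here on runs on the thread provided by the scheduler rather than on the main thread, which is why both callbacks in the example print thread id 11.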
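The lift step can be modelled in a few lines: the operator turns the downstream subscriber into an upstream one, and the parent source is then called with the result. The sketch below is a hypothetical miniature built on java.util.function types. Its doOnSubscribe operator simply runs an action and passes the subscriber through, which is an assumption about the general shape of such an operator, not a copy of RxJava's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical miniature of lift, not RxJava's real OnSubscribeLift.
class LiftSketch {

    /** Stand-in for Observable.OnSubscribe. */
    interface OnSubscribe<T> {
        void call(Consumer<T> subscriber);
    }

    /** The operator maps the downstream subscriber to the one handed to the parent. */
    static <T, R> OnSubscribe<R> lift(OnSubscribe<T> parent,
                                      Function<Consumer<R>, Consumer<T>> operator) {
        return downstream -> {
            Consumer<T> st = operator.apply(downstream); // like operator.call(o)
            parent.call(st);                             // like parent.call(st)
        };
    }

    /** Assumed shape of a doOnSubscribe operator: run the action, pass the subscriber through. */
    static <T> Function<Consumer<T>, Consumer<T>> doOnSubscribe(Runnable action) {
        return downstream -> {
            action.run();
            return downstream;
        };
    }

    /** Shows that the action runs at subscription time, before the first value arrives. */
    static List<String> trace() {
        List<String> log = new ArrayList<>();
        OnSubscribe<Integer> just1 = s -> s.accept(1);
        OnSubscribe<Integer> lifted =
                lift(just1, LiftSketch.<Integer>doOnSubscribe(() -> log.add("doOnSubscribe")));
        lifted.call(v -> log.add("onNext " + v));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(trace());
    }
}
```

Since the action runs inside the lifted call, it executes on whichever thread performs the subscription. Combined with subscribeOn, that is the scheduler's thread, matching the doOnSubscribe line in the output.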