Rxjava執行緒變換原理
阿新 • • 發佈:2018-12-19
public final Observable<T> subscribeOn(Scheduler scheduler) { if (this instanceof ScalarSynchronousObservable) { return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler); } return create(new OperatorSubscribeOn<T>(this, scheduler)); }
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> { final Scheduler scheduler;//傳進來的scheduler final Observable<T> source;//原Observable public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) { this.scheduler = scheduler; this.source = source; } @Override //當新的Observable訂閱時呼叫該方法 傳進去的是原subscriber public void call(final Subscriber<? super T> subscriber) { //createWorker()建立執行緒池 final Worker inner = scheduler.createWorker(); subscriber.add(inner); inner.schedule(new Action0() { @Override public void call() { final Thread t = Thread.currentThread(); // Subscriber<T> s = new Subscriber<T>(subscriber) { @Override public void onNext(T t) { subscriber.onNext(t); } @Override public void onError(Throwable e) { try { subscriber.onError(e); } finally { inner.unsubscribe(); } } @Override public void onCompleted() { try { subscriber.onCompleted(); } finally { inner.unsubscribe(); } } @Override public void setProducer(final Producer p) { subscriber.setProducer(new Producer() { @Override public void request(final long n) { if (t == Thread.currentThread()) { p.request(n); } else { inner.schedule(new Action0() { @Override public void call() { p.request(n); } }); } } }); } }; //新的Observable訂閱時呼叫OperatorSubscribeOn的call方法,call中進行原Observable的訂閱,線上程中執行原Subscriber的onnext() source.unsafeSubscribe(s); } }); } }
observeOn原理
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, RxRingBuffer.SIZE);
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { if (this instanceof ScalarSynchronousObservable) { return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler); } return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize)); }
OperatorObserveOn的call方法,lift中呼叫這個方法生成新的Subscriber
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
if (scheduler instanceof ImmediateScheduler) {
// avoid overhead, execute directly
return child;
} else if (scheduler instanceof TrampolineScheduler) {
// avoid overhead, execute directly
return child;
} else {
//通過ObserveOnSubscriber做執行緒變換
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init();
return parent;
}
}
ObserveOnSubscriber的onNext()
@Override
public void onNext(final T t) {
if (isUnsubscribed() || finished) {
return;
}
if (!queue.offer(on.next(t))) {
onError(new MissingBackpressureException());
return;
}
schedule();
}
protected void schedule() {
if (counter.getAndIncrement() == 0) {
recursiveScheduler.schedule(this);
}
}