RxJava (threading) -- how subscribeOn works

Example:

        System.out.println("<<<<<< main threadid = " + Thread.currentThread().getId());

        Observable.just(1).doOnSubscribe(new Action0() {
            @Override
            public void call() {
                System.out.println("<<<<<<doOnSubscribe thread id = " + Thread.currentThread().getId());
            }
        }).subscribeOn(Schedulers.from(JobExecutor.getInstance())).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                System.out.println("<<<<<<subscribe thread id = " + Thread.currentThread().getId());
            }
        });

Output:

<<<<<< main threadid = 1
<<<<<<doOnSubscribe thread id = 11
<<<<<<subscribe thread id = 11
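
The example relies on a JobExecutor class that the article does not show. To run the example, something along the following lines might serve as a stand-in. This is only a sketch under assumptions: the singleton shape, the pool size of 2, and the thread name are all hypothetical and are not taken from the original code.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for the article's JobExecutor; the real class is not shown.
public final class JobExecutor implements Executor {
    private static final JobExecutor INSTANCE = new JobExecutor();

    // Assumption: a small fixed pool of daemon threads, so the pool
    // never keeps the JVM alive on its own.
    private final ExecutorService pool = Executors.newFixedThreadPool(2, r -> {
        Thread t = new Thread(r, "job-executor");
        t.setDaemon(true);
        return t;
    });

    private JobExecutor() { }

    public static JobExecutor getInstance() {
        return INSTANCE;
    }

    @Override
    public void execute(Runnable command) {
        // Hand the task to a pool thread; the caller returns immediately.
        pool.execute(command);
    }
}
```

Because the pool threads in this sketch are daemon threads, a main method that subscribes and returns immediately may exit before anything is printed; waiting briefly (for instance on a CountDownLatch) avoids that.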

Let's look at how subscribeOn is implemented:

    public final Observable<T> subscribeOn(Scheduler scheduler) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        }
        return create(new OperatorSubscribeOn<T>(this, scheduler));
    }

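Here this is not a ScalarSynchronousObservable (Observable.just(1) on its own would be one, but doOnSubscribe returns a new Observable), so the second branch is taken.

An OperatorSubscribeOn is created and passed to create. When the resulting Observable is subscribed to later, its call method is invoked: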

    public void call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();
        subscriber.add(inner);

        inner.schedule(new Action0() {
            @Override
            public void call() {
                final Thread t = Thread.currentThread();

                Subscriber<T> s = new Subscriber<T>(subscriber) {
                    @Override
                    public void onNext(T t) {
                        subscriber.onNext(t);
                    }

                    @Override
                    public void onError(Throwable e) {
                        try {
                            subscriber.onError(e);
                        } finally {
                            inner.unsubscribe();
                        }
                    }

                    @Override
                    public void onCompleted() {
                        try {
                            subscriber.onCompleted();
                        } finally {
                            inner.unsubscribe();
                        }
                    }

                    @Override
                    public void setProducer(final Producer p) {
                        subscriber.setProducer(new Producer() {
                            @Override
                            public void request(final long n) {
                                if (t == Thread.currentThread()) {
                                    p.request(n);
                                } else {
                                    inner.schedule(new Action0() {
                                        @Override
                                        public void call() {
                                            p.request(n);
                                        }
                                    });
                                }
                            }
                        });
                    }
                };

                source.unsafeSubscribe(s);
            }
        });
    }
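call obtains a Worker from scheduler.createWorker() and passes an Action0 to the Worker's schedule method. As the earlier analysis showed, the Worker eventually runs that Action0's call method, and by that point execution has already switched to the scheduler's thread.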
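
The thread hop can be illustrated without RxJava at all. The sketch below is not the library's code: Source, subscribeOn, and emittingThreadName are hypothetical names, and a plain ExecutorService stands in for the Worker. It only shows the core idea, namely that the upstream subscription itself is deferred to the worker thread, so everything the upstream does on subscribe happens there.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

public final class SubscribeOnSketch {

    // Hypothetical stand-in for Observable.OnSubscribe: call() starts emitting.
    public interface Source<T> {
        void call(Consumer<? super T> downstream);
    }

    // Same idea as OperatorSubscribeOn.call: do not subscribe to the upstream
    // on the caller's thread; schedule the subscription on the worker instead.
    public static <T> Source<T> subscribeOn(Source<T> upstream, ExecutorService worker) {
        return downstream -> worker.execute(() -> upstream.call(downstream));
    }

    // Subscribes from the calling thread and returns the name of the thread
    // on which the value was actually delivered.
    public static String emittingThreadName() {
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "worker-1");
            t.setDaemon(true);
            return t;
        });
        try {
            // Emits synchronously on whichever thread subscribes to it.
            Source<Integer> just = downstream -> downstream.accept(1);
            CompletableFuture<String> seenOn = new CompletableFuture<>();
            subscribeOn(just, worker)
                    .call(v -> seenOn.complete(Thread.currentThread().getName()));
            return seenOn.join(); // block until the worker has delivered the value
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller thread = " + Thread.currentThread().getName());
        System.out.println("emitted on    = " + emittingThreadName());
    }
}
```

The sketch leaves out what the real operator also handles: unsubscription of the Worker on termination and the rerouting of request(n) calls back onto the worker thread.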

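Inside that call method, a new Subscriber wrapping the downstream one is created and subscribed to the source via

source.unsafeSubscribe(s);

which is implemented as follows:

    public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {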
        try {
            // new Subscriber so onStart it
            subscriber.onStart();
            // allow the hook to intercept and/or decorate
            RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
            return RxJavaHooks.onObservableReturn(subscriber);
        } catch (Throwable e) {
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // if an unhandled error occurs executing the onSubscribe we will propagate it
            try {
                subscriber.onError(RxJavaHooks.onObservableError(e));
            } catch (Throwable e2) {
                Exceptions.throwIfFatal(e2);
                // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                // so we are unable to propagate the error correctly and will just throw
                RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                // TODO could the hook be the cause of the error in the on error handling.
                RxJavaHooks.onObservableError(r);
                // TODO why aren't we throwing the hook's return value.
                throw r; // NOPMD
            }
            return Subscriptions.unsubscribed();
        }
    }
The onSubscribe here is the OnSubscribeLift created by doOnSubscribe, and its call method is invoked:

    public void call(Subscriber<? super R> o) {
        try {
            Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
            try {
                // new Subscriber created and being subscribed with so 'onStart' it
                st.onStart();
                parent.call(st);
            } catch (Throwable e) {
                // localized capture of errors rather than it skipping all operators
                // and ending up in the try/catch of the subscribe method which then
                // prevents onErrorResumeNext and other similar approaches to error handling
                Exceptions.throwIfFatal(e);
                st.onError(e);
            }
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // if the lift function failed all we can do is pass the error to the final Subscriber
            // as we don't have the operator available to us
            o.onError(e);
        }
    }
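This invokes the operator's call method. That flow was already analyzed in http://blog.csdn.net/new_abc/article/details/52932705; it is where the callback passed to doOnSubscribe gets executed. Everything from this point on runs on the scheduler's thread (thread 11 in the output above), not on the thread that called subscribe.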
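
The shape of that lift step can also be sketched in plain Java. Again this is an illustration under assumptions, not RxJava's implementation: Source, lift, doOnSubscribe, and demo are hypothetical names, and a Consumer stands in for a Subscriber. The operator converts the downstream consumer into an upstream one (running the doOnSubscribe action as it does so), and only then is the parent invoked.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public final class LiftSketch {

    // Hypothetical stand-in for Observable.OnSubscribe.
    public interface Source<T> {
        void call(Consumer<? super T> downstream);
    }

    // Same shape as OnSubscribeLift.call: apply the operator to the downstream
    // consumer first, then invoke the parent with the result.
    public static <T, R> Source<R> lift(
            Source<T> parent,
            Function<Consumer<? super R>, Consumer<? super T>> operator) {
        return downstream -> {
            Consumer<? super T> st = operator.apply(downstream);
            parent.call(st);
        };
    }

    // A doOnSubscribe-like operator: run the action when the chain is
    // subscribed, then pass values through unchanged.
    public static <T> Function<Consumer<? super T>, Consumer<? super T>> doOnSubscribe(
            Runnable action) {
        return downstream -> {
            action.run();
            return downstream;
        };
    }

    // Records the order of events for a just(1)-like source.
    public static List<String> demo() {
        List<String> events = new ArrayList<>();
        Source<Integer> just = downstream -> downstream.accept(1);
        Source<Integer> chain =
                lift(just, LiftSketch.<Integer>doOnSubscribe(() -> events.add("doOnSubscribe")));
        chain.call(v -> events.add("onNext " + v));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Since the action runs inside the operator's call, it executes on whichever thread performs the subscription, which is why the doOnSubscribe callback in the article's example reports the scheduler's thread id.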