1. 程式人生 > >rxJava和rxAndroid原始碼解析系列二之observer訂閱

rxJava和rxAndroid原始碼解析系列二之observer訂閱

建立完Observable物件後,以後一步ObservableObserveOn.subscribe(new Observer<String>() {.....})這一步又發生了什麼呢?

接著跟蹤原始碼。

 @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
            //主要是這行程式碼,也是實際訂閱執行的,接下來看看怎麼做的
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

                                                                                             <原始碼片段1>

 subscribeActual(observer);這個方法,是Observable的抽象方法,由子類實現,那是誰呢,ObservableObserveOn或AbstractObservableWithUpstream,經過查詢在ObservableObserveOn找到了這個實現

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
//scheduler是HandlerScheduler的物件,而HandlerScheduler是Scheduler的子類
        if (scheduler instanceof TrampolineScheduler) {//這個就會跳過
            source.subscribe(observer);
        } else {
            //那就從這裡開始執行
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

                                                                                      <原始碼片段2>

通過上一篇的分析source指向的是ObservableSubscribeOn的物件,scheduler是HandlerScheduler的物件,所以實際執行的是下面兩行程式碼

                          Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));

先看第一行

   @Override
    public Worker createWorker() {
        return new HandlerWorker(handler, async);
    }

                                                                                <原始碼片段3>

HandlerWorker是HandlerScheduler的靜態內部類。HandlerWorker中也有ui執行緒的handler。

第二行程式碼先看new例項物件。ObserveOnObserver是ObservableObserveOn的靜態內部類,ObserveOnObserver是對observer回撥的裝飾和擴充套件,就不貼出程式碼了。好的,source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));怎麼執行的呢,這個source指向的是ObservableSubscribeOn的物件,ObservableSubscribeOn也是Observable物件的子類,所以subscribe方法和<原始碼片段1>一樣。那就看看它實際執行訂閱的程式碼啦

    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

        observer.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

                                                                                  <原始碼片段4>

就三行程式碼,就把這個執行緒切換和回撥都完成啦?腦子裡突然蹦出一句歌詞,沒那麼簡單.....那就仔細分析一下吧,第一行程式碼就是把傳過來的ObserveOnObserver物件繼續封裝一下下,轉變成SubscribeOnObserver,而這個類就是ObservableSubscribeOn的內部類,也進行了裝飾和擴充套件。也沒什麼東東嘛,那就繼續第二句observer.onSubscribe(parent);observer是ObserveOnObserver它的例項物件,我們都知道啦ObserveOnObserver是ObservableObserveOn的靜態內部類,那就進去看看這個方法啦!

        @Override
        public void onSubscribe(Disposable d) {
//此時d就是傳入過來的ObserveOnObserver物件嘛
// this.upstream成員變數,找了一圈,構造的時候沒有它的事,為null
            if (DisposableHelper.validate(this.upstream, d)) {//if判斷返回true
                this.upstream = d;
                if (d instanceof QueueDisposable) {//if判斷返回false
                    @SuppressWarnings("unchecked")
                    QueueDisposable<T> qd = (QueueDisposable<T>) d;

                    int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);

                    if (m == QueueDisposable.SYNC) {
                        sourceMode = m;
                        queue = qd;
                        done = true;
                        downstream.onSubscribe(this);
                        schedule();
                        return;
                    }
                    if (m == QueueDisposable.ASYNC) {
                        sourceMode = m;
                        queue = qd;
                        downstream.onSubscribe(this);
                        return;
                    }
                }
                //佇列
                queue = new SpscLinkedArrayQueue<T>(bufferSize);
                //downstream就是Observer,也就是樣例程式碼中的new Observer(String){},就是最終回撥的嘛
                downstream.onSubscribe(this);
            }
        }

                                                                                   <原始碼片段5>

那就先看看這個嘍DisposableHelper.validate(this.upstream, d)

    public static boolean validate(Disposable current, Disposable next) {
        if (next == null) {//next=ObserveOnObserver的物件,不為null,跳過
            RxJavaPlugins.onError(new NullPointerException("next is null"));
            return false;
        }
        if (current != null) {//current=(this.upstream=null,跳過
            next.dispose();
            reportDisposableSet();
            return false;
        }
        return true;
    }

                                                                                        <原始碼片段6>

返回true, <原始碼片段5>繼續往下執行d instanceof QueueDisposable,這個判斷,d是ObservableSubscribeOn靜態內部類ObserveOnObserver的物件,那就看看這個是什麼型別的

  static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
...............................省略
}

                                                                                      <原始碼片段7>

QueueDisposable又是啥呢

public interface QueueDisposable<T> extends QueueFuseable<T>, Disposable {
}

                                                                                      <原始碼片段8>

d instanceof QueueDisposable就只能返回fase了,if程式碼塊跳過。<原始碼片段5>繼續往下執行 downstream.onSubscribe(this);

這個downstream是個成員變數,什麼時候賦值的呢?

        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.downstream = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }

                                                                                     <原始碼片段9>

構造的時候賦值的,誰呼叫了呢, <原始碼片段2>中賦值的,傳入過來的observer是誰呢?就是這句話嘛,ObservableObserveOn.subscribe(new Observer<String>() {.....}),才呼叫了 <原始碼片段2>的程式碼段,這個不是最終回撥嘛,搜嘎,

               @Override
                            public void onSubscribe(Disposable d) {
 //當前執行緒不就是new Thread()嘛,原來如此                               

Log.i("MainActivity","onSubscribe"+Thread.currentThread().getId()+":"+Thread.currentThread().getName());
                            }

<原始碼片段4>中的 observer.onSubscribe(parent);結束了,那就繼續看<原始碼片段4>的最後一句程式碼parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));這句程式碼肯定會開闢一片新天地,它要肩負著執行緒切換和回撥嘛。這句話巢狀的有點多,那我們就把他繼續分解,先看看new SubscribeTask(parent),這個類也是ObservableSubscribeOn它的內部類

final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
//ObservableSubscribeOn的source,這個source指向ObservableCreate
            source.subscribe(parent);
        }
    }

                                                                              <原始碼片段10>

這個就是封裝了一下,沒什麼東東,接著這句scheduler.scheduleDirect(new SubscribeTask(parent)));我們知道這個scheduler是ObservableSubscribeOn的成員變數,通過上一篇的介紹,這個scheduler是NewThreadScheduler的物件

    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run) {
//run就是SubscribeTask 的物件嘛
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }

                                                                              <原始碼片段11>

返回個disposable,還要繼續跟進

    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();//NewThreadScheduler的createWorker()方法建立NewThreadWorker物件

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);//decoratedRun =run

        DisposeTask task = new DisposeTask(decoratedRun, w);

        w.schedule(task, delay, unit);

        return task;
    }

                                                                                      <原始碼片段12>

DisposeTask這個類是什麼呢

static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {

.........................省略

   DisposeTask(@NonNull Runnable decoratedRun, @NonNull Worker w) {
            this.decoratedRun = decoratedRun;
            this.w = w;
        }

        @Override
        public void run() {
            runner = Thread.currentThread();
            try {
                decoratedRun.run();//decoratedRun是SubscribeTask的物件,是ObservableSubscribeOn的內部類
            } finally {
                dispose();
                runner = null;
            }
        }
......................省略
}

                                                                               <原始碼片段13>

構建完之後,並沒有直接返回,而是執行這段程式碼 w.schedule(task, delay, unit);w是NewThreadWorker的物件,那就跟蹤這段程式碼吧

    @NonNull
    @Override
    public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
    /**
    *根據原始碼片段11、12、13得知一下
    *runnable是DisposeTask 它的run方法是呼叫SubscribeTask的run方法
    *delayTime=0L
    *unit為TimeUnit.NANOSECONDS
    */
        if (disposed) {//跳過
            return EmptyDisposable.INSTANCE;
        }
        return scheduleActual(action, delayTime, unit, null);
    }

                                                                    <原始碼片段14>

那就是要繼續scheduleActual方法

    @NonNull
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);//decoratedRun =run

        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        if (parent != null) {//parent為null,跳過
            if (!parent.add(sr)) {
                return sr;
            }
        }

        Future<?> f;
        try {
            if (delayTime <= 0) {//delayTime 此時為0
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }

        return sr;
    }

                                                                         <原始碼片段15>

ScheduledRunnable什麼鬼,看看

public final class ScheduledRunnable extends AtomicReferenceArray<Object>
implements Runnable, Callable<Object>, Disposable {

......................省略

 public ScheduledRunnable(Runnable actual, DisposableContainer parent) {
        super(3);
        this.actual = actual;
        this.lazySet(0, parent);
    }

    @Override
    public Object call() {
        // Being Callable saves an allocation in ThreadPoolExecutor
        run();//call()最終還是呼叫run方法,並且返回null
        return null;
    }

    @Override
    public void run() {
        lazySet(THREAD_INDEX, Thread.currentThread());
        try {
            try {
                actual.run();
            } catch (Throwable e) {
                // Exceptions.throwIfFatal(e); nowhere to go
                RxJavaPlugins.onError(e);
            }
        } finally {
            lazySet(THREAD_INDEX, null);
            Object o = get(PARENT_INDEX);
            if (o != PARENT_DISPOSED && compareAndSet(PARENT_INDEX, o, DONE) && o != null) {
                ((DisposableContainer)o).delete(this);
            }

            for (;;) {
                o = get(FUTURE_INDEX);
                if (o == SYNC_DISPOSED || o == ASYNC_DISPOSED || compareAndSet(FUTURE_INDEX, o, DONE)) {
                    break;
                }
            }
        }
    }
......................省略
}

                                                                                   <原始碼片段16>

所以   <原始碼片段15>執行 f = executor.submit((Callable<Object>)sr);executor好像是執行緒池,那就看看NewThreadWorker中的建構函式有這樣的程式碼

    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }

                                                                                <原始碼片段17>

NewThreadWorker是NewThreadScheduler的createWorker方法建立,threadFactory是個執行緒工廠。excutor的產生如下

 /**
     * Creates a ScheduledExecutorService with the given factory.
     * @param factory the thread factory
     * @return the ScheduledExecutorService
     */
    public static ScheduledExecutorService create(ThreadFactory factory) {
        final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
        tryPutIntoPool(PURGE_ENABLED, exec);
        return exec;
    }

                                                                                <原始碼片段18>

果然excutor是個執行緒池,那麼 <原始碼片段15> f = executor.submit((Callable<Object>)sr);這個方法不就是往裡面加入一個ScheduledRunnable物件讓執行緒池執行嘛。不知不覺中我們已經切換了執行緒,厲害啦!既然切換到執行緒,那就執行runable方法。

經過從ObservableSubscribeOn.SubscribeTask到最終的ScheduledRunnable,已經懵逼的朋友可以從 <原始碼片段4>看,然後再看到這個位置就明白啦。引用關係圖如下

結合這上圖,以及<原始碼片段4>到  <原始碼片段18>的執行,最終會呼叫ObservableSubscribeOn.SubscribeTask的run方法

裡面又一句程式碼source.subscribe(parent);根據上篇的Observable的關係引用圖可以知道source就是ObservableCreate物件阿。

就是這個繼續訂閱阿,發生線上程裡面。Observable的subscribe實際執行的是subscribeActual嘛,那就看看ObservableCreate對這個方法的具體實現

 @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

                                                                            <原始碼片段19>

subscribeActual傳入的物件就是ObservableSubscribeOn.SubscribeTask的parent,這個parent就是ObservableSubscribeOn的靜態內部類物件SubscribeOnObserver,不清楚的朋友可以看一下關係引用圖或者從頭閱讀這篇部落格。先看一下這個類CreateEmitter

 static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

....................省略
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

....................省略
}

                                                                        <原始碼片段20>

ObservableEmitter<T>東西值得注意

public interface ObservableEmitter<T> extends Emitter<T> {
.....................省略
}

                                                                    <原始碼片段21>

Emitter<T>介面

public interface Emitter<T> {

    /**
     * Signal a normal value.
     * @param value the value to signal, not null
     */
    void onNext(@NonNull T value);

    /**
     * Signal a Throwable exception.
     * @param error the Throwable to signal, not null
     */
    void onError(@NonNull Throwable error);

    /**
     * Signal a completion.
     */
    void onComplete();
}

                                                                             <原始碼片段22>

Emitter<T>介面比Observer<T> 介面少一個方法void onSubscribe(@NonNull Disposable d);可以看出CreateEmitter對Observer進行了進一步裝飾和擴充套件,<原始碼片段19>中的observer.onSubscribe(parent);那就到SubscribeOnObserver裡面看看了

   @Override
        public void onSubscribe(Disposable d) {
//構造的時候自動建立的this.upstream
            DisposableHelper.setOnce(this.upstream, d);
        }
    SubscribeOnObserver(Observer<? super T> downstream) {
            this.downstream = downstream;
            this.upstream = new AtomicReference<Disposable>();
        }

                                                                            <原始碼片段23>

那就是 DisposableHelper.setOnce(this.upstream, d);執行的情況啦

    public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {
        ObjectHelper.requireNonNull(d, "d is null");//判空操作
        if (!field.compareAndSet(null, d)) {//field的原子值肯定為null,所以if判斷跳過
            d.dispose();
            if (field.get() != DISPOSED) {
                reportDisposableSet();
            }
            return false;
        }
        return true;//返回true
    }

                                                              <原始碼片段24>

好吧,   <原始碼片段19>中的程式碼 observer.onSubscribe(parent);結束,繼續<原始碼片段19>中的下一行程式碼source.subscribe(parent);這個source就是寫一篇文章demo的時候Obserable.create(ObservableOnSubscribe)的引數嘛,就是自己實現的嘛,ObservableOnSubscribe就是demo自己寫的東西嘛,也就是回撥計劃嘛。看到了吧,ObservableOnSubscribe的subscribe方法是執行在NewThreadWorker的執行緒池裡面的一個執行緒中。

總結:Obserable的訂閱就是不斷地建立Observer嘛(裝飾Observer),且是一個向最開始建立的Obserable傳遞Observer的過程。

當然涉及到了執行緒的切換,在這一篇不是核心,重點關注的是Observable的訂閱,下一篇講述子執行緒和主執行緒的切換。