1. 程式人生 > >淺析RxJava 1.x&2.x版本使用區別及原理(一):Observable、Flowable等基本元素原始碼解析

淺析RxJava 1.x&2.x版本使用區別及原理(一):Observable、Flowable等基本元素原始碼解析

RxJava開源框架的風靡程度在Github上無需多言,它帶來的響應式程式設計模式和執行緒隨意切換、巢狀請求、背壓等功能給了開發者耳目一新的體驗,更是成為了大多數APP中常用的RxJava+Okhttp/Retrofit+MVP/MVVM/Clean黃金組合中的一員。我猶記得知乎團隊在去年線下還開展過一次線下RxJava交流會,邀請了扔物線講解相關知識及體驗,可見各大廠對此的重視度。如非要列舉一個RxJava缺點,那就是學習曲線高,但是後期收益高。我記得去年初個人開發一款教育APP時使用的還是RxJava 1.x版本,而今年3月31號官方已經宣佈停止對1.x 版本的維護,在用過2.x版本後,可見對之前版本的改進。

特此,此係列文章來淺析RxJava 1.x&2.x版本使用區別及原理,分析原始碼的過程是艱難而又通過的,深入理解之後的豁然開朗更是無法比擬的。需要強調的是這幾篇文章重點部分在於分析原始碼,並不適合作為框架的初學篇,熟悉其用法後再觀此部落格討論學習。另外,此文章將著重於RxJava兩個版本用法的對比,由此為出發點分析兩者的原始碼及運作原理。

此係列文章將分成以下三個部分講解:

  • RxJava 1.x&2.x版本的基本元素(Observable、Subscriber、Flowable、Observer等)流程原始碼分析
  • RxJava 1.x&2.x版本的操作符(map、lift)原始碼分析
  • RxJava 1.x&2.x版本的執行緒變換原始碼分析

一. RxJava簡介

說到RxJava開源框架,首先第一反應是響應式程式設計,它是一種面向資料流變化傳播程式設計正規化

  • 資料流:只能以事先規定好的順序被讀取一次的一個數據序列。在計算機中是資料,在現實中可以是任意物件組成的有序佇列。
  • 變化傳播:類似於設計模式中的觀察者模式,一旦監視的事件狀態或其他發現變化,立即通知觀察者。
  • 程式設計正規化:是計算機程式設計的基本風格或典範模式,例如面向物件程式設計、面向過程程式設計。

可以把響應式程式設計比喻為流水線上的生產工作,資料流則是流水線上加工的一系列物品,變化傳播類似於加工過程中多個環節的轉化,而程式設計正規化就相當於不同的加工環節,其工作方式不同。如此,方便理解。

A library for composing asynchronous and event-based programs by using observable sequences

  • asynchronous非同步,說明RxJava是一個非同步的庫,基於回撥的。
  • event-based基於事件,說明RxJava是一個事件分發庫,訊息傳遞的庫。

It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.

更進一步解釋,它擴充套件了觀察者模式來支援資料/事件序列,並添加了運算子,使開發者可以宣告性地組合序列,同時抽象出對低階執行緒、同步、執行緒安全性和併發資料結構等事物的觀察。

二. RxJava 1.x基本元素原始碼分析

1. 基本元素用法介紹

下面通過一個簡單的RxJava 1.x的程式碼例項來了解 1.x版本中的重要元素:

import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;

      Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onNext("test");
                    subscriber.onCompleted();
                }
            }
        });

        Subscription subscription = observable.subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
            }
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onNext(String s) {
                System.out.println("onNext:" + s);
            }
        });
  • 首先通過呼叫Observable的create() 方法建立了一個Observable物件,傳入的引數是OnSubscribe介面,故而在引數內實現其介面的call 方法。
  • 再呼叫Observable物件的訂閱subscribe 事件方法,其引數傳入Observer介面,故而在引數內實現其介面的onCompleted()onError(Throwable e)onNext(T t)方法。
  • 在建立Observable中實現OnSubscribe介面的call 方法中,注意其引數為Subscriber訂閱者,呼叫訂閱者的onNext方法傳入資料,再呼叫onCompleted 方法。

這裡寫圖片描述

以上是RxJava 1.x版本的簡單使用,一個簡單的例子已展示出其庫的基本元素:

  • Observable
  • Observer
  • Subscriber
  • OnSubscriber
  • Subscription

2. 基本元素原始碼分析

在瞭解以上基本元素後,接下來進行原始碼分析,首先從最簡單的Subscription入手:

(1)Subscription 介面

Subscription是Observable呼叫subscribe訂閱事件方法後返回的一個介面,其內容也很簡單,兩個方法,一個解除訂閱的unsubscribe()方法,一個是判斷是否解除的isUnsubscribed() 方法,原始碼如下:

這裡寫圖片描述

(2)Observer 介面

Observer是Observable呼叫subscribe訂閱事件方法中傳入的引數,也是一個介面,三個待實現的方法,分別是回撥完成方法onCompleted()、 錯誤處理方法onError(Throwable e)、資料接收方法onNext(T t)

這裡寫圖片描述

(3)Subscriber 抽象類

在建立Observable時,需要給create傳入引數Observable.OnSubscribe介面,並實現其介面的call方法,此方法的引數型別就是Subscriber,而開發者也是在此方法中呼叫onNextonComplete因此可以推測Subscriber必然實現了Observer 介面。

仔細檢視原始碼,確實如此,它還實現了Subscription 介面中的unsubscribe()isUnsubscribed()兩個方法,簡單做了封裝,但並未詳細實現Observer 介面中的三個方法,因此只是一個抽象類。

這裡寫圖片描述

(4)OnSubscriber 介面

OnSubscriber是在建立Observable時傳入create 方法中的引數型別,也是一個介面。此介面又是繼承於Action1 介面,Action1 介面中有一個未實現的call方法,而Action1 介面又繼承於Action介面,Action介面是一個空實現,最後它又繼承於Fcuntion介面,Fcuntion介面也是一個空實現。

OnSubscriber -> Action1 -> Action -> Fcuntion

這裡寫圖片描述

(5)Observable

  • create方法

首先檢視Observable的靜態建立create方法,可見其只是多做了一層封裝,new這個物件時,將引數onSubscribe傳入構造方法中,而RxJavaHooks.onCreate(f)也只是多做了一個判斷,最終返回的還是onSubscribe。

這裡寫圖片描述

  • subscribe方法

再看subscribe 方法:

這裡寫圖片描述

先忽略掉一開始的判斷,直接檢視最後一行程式碼,這裡過載了另外一個subscribe 方法,引數為Subscriber型別,因此new ObserverSubscriber<T>(observer),這裡使用Subscriber將我們傳入的observer介面做了一層簡單的封裝,來檢視ObserverSubscriber的具體實現:

這裡寫圖片描述

可見,這裡使用Subscriber將我們傳入的observer介面做了一層簡單的封裝。還是回到它過載的另一個subscribe 方法,它才是研究的重點:

這裡寫圖片描述

由以上程式碼可知,從呼叫的引數為observer介面的subscribe 方法內做了一層封裝,呼叫了引數為subscriber抽象類的subscribe 方法,最終呼叫的是引數為subscriber、observable的靜態subscribe 方法。

3個subscribe 方法方法的層次呼叫,為的只是引數的封裝獲取,而最終在靜態subscribe 方法中的重點處理如下:

這裡寫圖片描述

第一行程式碼呼叫OnSubscriber介面的call方法,這意味著我們建立Observable時實現的call方法回被呼叫,那麼call方法中對資料傳遞、標誌完成的操作會執行,即實現的Observer介面中的onNext方法中接收到資料,onComplete()方法也被呼叫。最後返回Subscription。

3. 基本元素總結

這整個過程可以用一個簡單的打電話事例來理解,Observable即打電話的人,撥通號碼的過程相當於subscribe確定了接電話的人Subscriber,OnSubscribe則類似電話連線的線路,開啟資訊的傳遞。兩者開始通話時,狀態建立,Observer則監視整個狀態,完成通話資料傳遞、完成、錯誤過程。而當Subscriber取消通話時,兩者的關係解除,專門用於描述兩者關係的Subscription返回。

(1)Observable

  • 被觀察者
  • 可通過Observable建立一個可觀察到序列
  • 通過subscribe去註冊一個觀察者

(2)Observer

  • 用於接收資料的觀察者
  • 作為Observable的subscribe方法的引數

(3)Subscription

  • 訂閱,用於描述被觀察者和觀察者之間的關係
  • 用於取消訂閱和獲取當前的訂閱狀態

(4)OnSubscribe

  • 當訂閱時會出發此介面的呼叫
  • 在Observable內部,實際作用時向訂閱者釋出資料

(5)Subscriber

  • 實現了Observer和Subscription介面
  • 只有自己才能阻止自己

這裡寫圖片描述

三. RxJava 2.x基本元素原始碼分析

此處涉及到一個背壓問題,在RxJava 1.x版本中有的Observable可以處理,有的不行,而在2.x版本中直接將此分離出來,Observable不處理背壓相關問題,而是由Flowable這樣一個新的類來處理,後續詳解。

1. 基本元素介紹

以下是RxJava 2.x的簡單使用:


import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

                Observable.create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(ObservableEmitter<String> e) throws Exception {
                        if (!e.isDisposed()) {
                            e.onNext("test");
                            e.onComplete();
                        }
                    }
                }).subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("onSubscribe");
                    }
                    @Override
                    public void onNext(String value) {
                        System.out.println("onNext:" + value);
                    }
                    @Override
                    public void onError(Throwable e) {

                    }
                    @Override
                    public void onComplete() {
                        System.out.println("onCompleted");
                    }
                });

這裡寫圖片描述

兩個版本的使用方式都類似,不再贅述,直接分析其區別:

  • 在通過呼叫Observable的create() 方法建立物件時,傳入的引數是ObservableOnSubscribe介面,實現的是其介面的subscribe 方法,方法內的引數是ObservableEmitter不再是1.x版本的OnSubscribe介面(call 方法)。
  • 在呼叫Observable物件的訂閱subscribe 事件方法,其引數傳入的Observer介面,多了一個需要實現onSubscribe(Disposable d)方法,方法內的引數是Disposable

RxJava 2.x中的基本元素如下:

  • Observable
  • Observer
  • Disposable(Subscription)
  • ObservableOnSubscriber(OnSubscriber)
  • Emitter

在原始碼分析之前先提出一個問題,Rxjava 2.x的使用相比較1.x 版本有一個明顯之處是呼叫subscribe方法時需要實現的方法多了一個onSubscribe,從上面列印日誌可知,其方法的執行是要優先於建立Observable時實現的subscribe方法,為何?通過以下原始碼分析答案便可水落石出。

2. 原始碼分析

RxJava 2.x版本相較於 1.x版本的使用類似,但又有稍許變化,此處只著重講解不同的部分:

(1)Observer介面

多了一個void onSubscribe(Disposable d);方法,用於觀察者取消事件訂閱,來檢視Disposable介面組成:(注意:2.x版本新增的Disposable可以做到切斷訂閱事件的操作,讓觀察者Observer不再接收上游事件,避免記憶體洩漏)

這裡寫圖片描述

介面中兩個方法,一個dispose方法,另一個事檢測是否dispose方法,其結構與Subscription類似。

(2)ObservableOnSubscribe介面

是建立Observable時傳入的介面引數,在2.x版本中單獨獨立出來了。為觀察者提供了取消訂閱連線的功能,該介面中的subscribe方法用於接收ObservableEmitter的例項,該例項允許用安全的方式取消事件。

這裡寫圖片描述

(3)ObservableEmitter介面

前兩個介面我們一直都在強調2.x版本新增的Disposable切斷訂閱事件,使得觀察者不再接收上游事件的功能,可預先此介面也是為它所用,作用是設定Emitter的disposable和cancellable

這裡寫圖片描述

繼續檢視Emitter介面的組成,會發現其中包含的三個方法竟然與Observer介面完全相同,其中緣由後續講解。

這裡寫圖片描述

(4)Observable

  • create方法

在簡單瞭解以上結構後,直襲老窩,檢視Observable的create方法:

這裡寫圖片描述

此方法中可以得出兩個資訊,第一個是呼叫了RxJavaPlugins的靜態onAssembly方法,第二個是傳入此方法的引數,將ObservableOnSubscribe介面通過ObservableCreate做了一次封裝。首先來了解onAssembly方法:

這裡寫圖片描述

此方法中的一個關鍵成員變數onObservableAssembly,它最初被賦值為null,為外界提供了set方法,因此當我們剛開始呼叫時f 被判斷為null,直接將source返回。再來檢視new ObservableCreate<T>(source) 具體構成:

可見ObservableCreate類被final 修飾,繼承於Observable,值得注意的是它實現了一個從父類繼承而來的subscribeActual 方法,此處暫時不講解,它主要作用於Observable的subscribe方法,結合此方法再講。

這裡寫圖片描述

  • subscribe方法

再次回到Observable,檢視subscribe方法:

這裡寫圖片描述

首先檢視到observer = RxJavaPlugins.onSubscribe(this, observer);,在Observable的create中也出現了RxJavaPlugins相關用法,而此處它的作用也是類似,就是將傳入的引數observer返回,重點在於後面的subscribeActual(observer);,也就是剛介紹ObservableCreate實現的subscribeActual 方法。

回到上上張圖,subscribeActual 方法中首先借助CreateEmitter將傳入的引數observe介面做了一次封裝,接著呼叫observer的onSubscribe(parent)方法,在我們實現該方法時的引數是disposable,而這裡傳入的是CreateEmitter引數,因此可以推斷CreateEmitter必然實現了Disposable介面。接下來的source.subscribe(parent);,即呼叫ObservableOnSubscribe介面的subscribe方法,傳入實現了Disposable介面的CreateEmitter,正式觸發事件的訂閱流程!

這裡寫圖片描述

到了這裡,必須強調了一下,再次回顧RxJava 1.x版本中Observable的subscribe處理,通過呼叫建立Observable傳入的OnSubscribe介面的call方法正式觸發訂閱事件,後續Observe介面中onNextonComplete方法才回被呼叫。而RxJava 2.x版本中的處理亦如是,只不過OnSubscribe介面換成了ObservableOnSubscribe介面,call方法換成了subscribe方法,引數由subscriber更換成了ObservableEmitter,這些變換也是RxJava 2.x的改進,新增的Disposable切斷訂閱事件,使得觀察者不再接收上游事件的功能,來避免記憶體洩漏。由此可見以上處理過程,RxJava 2.x 與 1.x的區別不大。

以上訂閱流程基本已經結束,還有幾個關鍵點再來補充補充。之前我們推測CreateEmitter必然實現了Disposable介面,不僅如此,對比上圖再檢視兩版本的最後一步原始碼,RxJava 1.x版本呼叫OnSubscribe介面傳入的引數是實現了observer介面的subscribe, 2.x版本傳入的則是實現了Disposable介面的CreateEmitter,Disposable是 2.x新增的概念,暫且不予考慮。按理說這兩者性質應當相同,1.x版本中的subscribe實現了observer介面,有onNextonCompleteonError基本方法,繼續猜測CreateEmitter是否也應該繼承了2.x版本中的observer介面,需要注意的是2.x版本中的observer介面中多了一個subscribe方法,對於一些運算子所需的基本介面而言,這個方法並不符合需求,只需要onNextonCompleteonError基本方法。

  • 為何observer介面中onSubscribe方法最先執行,ObservableOnSubscribe介面中的subscribe方法在後?

以上的原始碼正好解釋了程式碼執行的一個順序,首先執行的是Subscribe介面中的onSubscribe方法,再是ObservableOnSubscribe介面中的subscribe方法,由於在此方法中做了資料傳遞、標誌完成等操作,因此Subscribe介面中的onNextonComplete方法會被呼叫。

  • CreateEmitter

結合之前介紹的建立Observable傳入ObservableOnSubscribe介面後,實現的subscribe方法中的引數是ObservableEmitter介面,1.x版本中是Subscribe抽象類,而ObservableEmitter繼承於的Emitter介面中正好有onNextonCompleteonError基本方法,因此CreateEmitter必然於Emitter有所關聯!至此,分析出了CreateEmitter實現了Disposable、Emitter介面,檢視詳細原始碼:

這裡寫圖片描述

可見我們分析並無誤,此類中也對onNextonCompleteonError方法做了一些基本封裝,我們在建立Observable傳入ObservableOnSubscribe介面時實現的subscribe方法中,雖然使用的是其引數ObservableEmitter呼叫onNext傳入資料,但追溯到此根源,最終還是交由CreateEmitter來處理,即我們實現的observer介面來呼叫onNext等方法。

  • DisposableHelper

分析完Emitter介面後,CreateEmitter的核心還沒有結束,它確實是一座寶庫。接下來是它對Disposable的處理,它實現的方法中出現了一個關鍵部分,即藉助了DisposableHelper來處理Disposable相關事務:

這裡寫圖片描述

檢視DisposableHelper,它是一個列舉類,實現了Disposable介面,內有DISPOSED,作為單例模式存在。因此它實現的判斷isDisposed方法,直接將引數與DISPOSED比較即可,原始碼如下:

這裡寫圖片描述

3. 基本元素總結

在以上基本元素的對比上,兩個版本其實沒有很大的區別,只是部分寫法做了改變,2.x版本中將精華ObservableCreate獨立出來,但其核心內容還是與1.x 相同,呼叫傳進來的OnSubscribe介面中的subscribe(call)方法,並傳入observe介面到其中。而2.x中還特地使用了CreateEmitter對傳入的observe介面做了包裝,因此我們手動呼叫onNext方法時,實際上就是通過observe介面呼叫這些方法。

(1)Observable

  • 被觀察者,不支援背壓
  • 可通過Observable建立一個可觀察到序列
  • 通過subscribe去註冊一個觀察者

(2)Observer

  • 用於接收資料的觀察者
  • 作為Observable的subscribe方法的引數

(3)Disposable

  • 和RxJava1的Subscription作用相當
  • 用於取消訂閱和獲取當前的訂閱狀態

(4)ObservableOnSubscriber

  • 當訂閱時會出發此介面的呼叫
  • 在Observable內部,實際作用時向訂閱者釋出資料

(5)Emitter

  • 一個發射資料的介面,和Observer 的方法類似
  • 本質是對Observer和Subscribe的包裝

這裡寫圖片描述

4. 背壓

概念

  • 非同步環境下產生的問題:同步環境下會等待一件事處理完後再進行下一步,而非同步環境下是處理完一件事,未等它得出結果接著處理下一步,在獲得結果之後進行回撥,再處理結果。
  • 傳送和處理速度不統一:例如生產者生產出的產品放置到快取佇列中,供消費者消費。若生產者生產的速度大於消費者消耗的速度,則會出現快取佇列溢位的問題。
  • 是一種流速控制及解決策略:例如背壓中的丟棄策略,一旦發現快取佇列已滿,為了整個過程順利進行,則會丟棄最新產生的產品,避免溢位,因此背壓也是一種流速控制的解決策略

背壓Flowable方法的簡單使用如下:(注意Flowable專門用於背壓使用,在onSubscribe方法中需要手動做一個響應式拉取,即s.request(Long.MAX_VALUE);,否則不會呼叫onNext 方法)

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;

                Flowable.create(new FlowableOnSubscribe<String>() {
                    @Override
                    public void subscribe(FlowableEmitter<String> e) throws Exception {
                        if (!e.isCancelled()) {
                            e.onNext("Flowable test");
                            e.onComplete();
                        }
                    }
                }, BackpressureStrategy.DROP).subscribe(new Subscriber<String>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        s.request(Long.MAX_VALUE);
                        System.out.println("onSubscribe");
                    }
                    @Override
                    public void onNext(String s) {
                        System.out.println("onNext:" + s);
                    }
                    @Override
                    public void onError(Throwable t) {
                    }
                    @Override
                    public void onComplete() {
                        System.out.println("onCompleted");
                    }
                });
            }

這裡寫圖片描述

  • 首先建立通過呼叫Flowable的create方法建立例項,傳入兩個引數,第一個是OnSubscribe介面例項,並實現其subscribe方法,第二引數則是背壓特有的背壓策略;
  • 呼叫Flowable的subscribe方法,你會發現不同於之前,傳入的並非是observer介面,而是Subscriber介面,實現onSubscribeonNextonCompleteonError方法。

以上是RxJava 2.x版本中背壓的簡單使用,基本元素如下:

  • Flowable
  • Subscriber
  • Subscription
  • OnSubscriber
  • Emitter

注意:以上使用Flowable時,包括在實現subscribe方法中的資料傳遞等操作時,如果沒有在Subscriber介面中的onSubscribe方法中設定響應式拉取,即s.request(Long.MAX_VALUE);onNext方法中列印的log不會顯示!為何?閱讀以下原始碼將獲得解釋。

5. 背壓原始碼分析

(1)Subscriber介面

介面中的方法與2.x版本中的observer介面中方法相同,即onSubscribeonNextonCompleteonError,原始碼如下:

這裡寫圖片描述

(2)Subscription

在檢視Subscriber介面中的onSubscribe方法,發現其引數型別為Subscription,注意此介面與1.x版本不同!其中request方法是通過響應式拉取解決背壓問題,cancel()方法則是停止傳送資料,清空所有資源。 原始碼如下:

這裡寫圖片描述

(3)FlowableOnSubscribe

再看到呼叫Flowable的create方法時傳入的引數FlowableOnSubscribe,類似於Observable的ObservableOnSubscribe,介面中只有一個subscribe方法,引數為FlowableEmitter,原始碼為:

這裡寫圖片描述

此處的FlowableEmitter同樣類似於Obsa vableEmitter,都繼承於Emitter介面,內含熟悉的三個onNextonCompleteonError方法,上述意見介紹過,此處不再貼上原始碼。

(4)Flowable

首先Flowable抽象類實現了Publisher介面,其中只有一個subscribe方法,這與無背壓版的Observable類似。依舊先從create方法開始分析:

  • create方法

這裡寫圖片描述

原始碼如上,太熟悉的格式了,這裡依舊呼叫的是RxJavaPlugins.onAssembly方法,內部還是回將引數直接返回,所以重點在於引數new FlowableCreate<T>(source, mode)),與Observable類似,都有一個XXXCreate類,再預測其內部必有一個subscribeActual方法,來檢視FlowableCreate原始碼組成:

這裡寫圖片描述

果不其然,格式完全相同,只不過原先實現的observer介面變成了背壓獨有的Subscriber介面,因此方法中的引數變成了Subscriber。此時暫不介紹,等講解至Flowable的subscribe方法時綜合講解。

  • subscribe方法

這裡寫圖片描述

熟悉的邏輯操作,簡直就是將Observable那一套搬過來了,RxJavaPlugins.onSubscribe最後還是將引數Subscriber返回,重點還是在於呼叫Flowable的抽象方法subscribeActual,即呼叫了繼承Flowable的FlowableCreatesubscribeActual方法,檢視其具體實現:

這裡寫圖片描述

由此可見背壓的策略有MISSING、ERROR、DROP、LATEST這幾種,而不同的策略對應不同的BaseEmitter,與Observable做法相同,再包裝傳入的Subscribe引數,接著呼叫Subscribe介面的onSubscribe方法,即t.onSubscribe(emitter);,傳入指定策略對應的Emitter,此時我們使用Flowable例項去subscribe時實現的Subscriber介面中的方法onSubscribe被回撥。接著原始碼中source.subscribe(emitter);,即呼叫FlowableOnSubscribe介面中唯一的subscribe方法,並傳入emitter,意味著訂閱流程開啟,程式碼中建立Flowable時實現的subscribe方法被呼叫,裡面呼叫的onNext等方法依次執行。

  • 方法執行順序解釋

此處的原始碼也解釋了程式碼執行的一個順序,首先執行的是Subscribe介面中的onSubscribe方法,再是FlowableOnSubscribe介面中的subscribe方法,由於在此方法中做了資料傳遞、標誌完成等操作,因此Subscribe介面中的onNextonComplete方法會被呼叫。正因如此,我們才回在最先執行的onSubscribe方法中進行響應式拉取的設定。

  • 背壓策略BaseEmitter

以上基本流程的原始碼分析基本結束,接下來檢視它是如何結局背壓問題,首先來檢視所有背壓策略的父類BaseEmitter,原始碼如下:

這裡寫圖片描述

其實它的作用同Observable中的CreateEmitter相同,都是實現了onNextonCompleteonError相關介面,在其中做了簡單處理,使得最後我們在程式碼中通過FlowableEmitter呼叫onNext相關方法,實際上是通過我們傳入並實現的Subscribe介面本身自己呼叫。但是不同之處在於,BaseEmitter還實現了背壓特有的request(long n)方法,檢視原始碼:

這裡寫圖片描述

首先判斷傳入的引數是否有效(不得小於0),接著呼叫BackpressureHelper的add方法, 傳入BaseEmitter例項和指定的數值,檢視其方法實現:

這裡寫圖片描述

如上所示,首先獲取預設值r,為0,接著呼叫addCap方法,其實就是返回n,最後將AtomicLong型別的requested設定為n,將其返回。你可能感到疑惑,這樣做有什麼意義,以DROP丟棄策略為例,來檢視DropAsyncEmitter具體實現的onNext方法:

這裡寫圖片描述

答案呼之欲出,首先呼叫get方法取出對應的值,之前說過預設值時為0,這意味著如果我們不在最先執行的onSubscribe方法中設定這個值,那麼此處獲取的值為0,它並不會執行actual變數(實質為Subscribe介面例項)的onNext方法!

以上也解決了一開始丟擲的疑問,為何如果沒有在Subscriber介面中的onSubscribe方法中設定響應式拉取,即s.request(Long.MAX_VALUE);onNext方法中列印的log則不會顯示。

6. 背壓總結

在瞭解了RxJava 2.x版本的基本元素使用後,Flowable背壓學習也不算太難,皆是換湯不換藥,通過呼叫onSubscribe介面中的方法,並傳入observer介面,來實現整個訂閱流程的開啟。

(1)Flowable

  • 被觀察者,支援背壓
  • 可通過Observable建立一個可觀察到序列
  • 通過subscribe去註冊一個觀察者

(2)Subscriber

  • 一個單獨介面(並非是1.x版本中的抽象類),和Observer的方法類似
  • 作為Flowable的subscribe方法的引數

(3)Subscription

  • 訂閱,和1.x版本有所不同
  • 支援背壓,有用於背壓的request方法

(4)FlowableOnSubscribe

  • 當訂閱時會出發此介面的呼叫
  • 在Flowable內部,實際作用時向訂閱者釋出資料

(5)Emitter

  • 一個發射資料的介面,和Observer 的方法類似
  • 本質是對Observer和Subscribe的包裝

這裡寫圖片描述

四. 小結

以上將Rxjava 1.x和2.x版本的基本元素原始碼流程發那些完了,類、介面直接的方法互相呼叫可能稍有繁瑣,最後藉助總結部分的UML圖來理解,整體分析下來,你會發現,RxJava 2.x相較於1.x 的改動沒有很大,最值得令人讚頌的便是將背壓單獨獨立出來,避免與Observable混淆在一起,這也導致兩者涉及的介面也隨之增加,在理解2.x 的Observable訂閱套路後,Flowable的套路簡直就是照搬照套,只是多了對背壓策略BaseEmitter的處理,整體結構不變。

原始碼分析完後,可以解惑在分析2.x 原始碼之前存在的兩個疑問:

  • 為何observer介面中onSubscribe方法最先執行,ObservableOnSubscribe介面中的subscribe方法在後?
  • 為何沒有在Subscriber介面中的onSubscribe方法中設定響應式拉取,即s.request(Long.MAX_VALUE);onNext方法不會被呼叫?

再將兩個版本基本元素原理的易混淆點總結:

  • Observable物件create方法:
    • RxJava 1.x 中呼叫Observable的create方法建立例項時,是直接new 一個Observable物件,直接傳入OnSubscribe介面。
    • 2.x 版本是new 一個ObservableCreate物件(繼承於Observable),類似於多做了一層封裝,與之對應的OnSubscribe介面作為其成員變數傳入。
  • Observable物件subscribe方法:
    • RxJava 1.x 內部是通過過載subscribe方法,最終核心處就是呼叫OnSubscribe介面的call方法,傳入Subscriber型別引數正式觸發訂閱事件。
    • 2.x 版本最終呼叫的是一開始建立Observable物件的subscribeActual方法,方法內的最終核心處也是呼叫OnSubscribe介面的subscribe方法,傳入ObservableEmitter型別引數正式觸發訂閱事件。

以上基本的元素原始碼分析後,對兩個版本的瞭解也稍有深入。

若有錯誤,虛心指教~