1. 程式人生 > >RxJava 2.x 教程及原始碼揭祕(一)入門理解及基本操作符

RxJava 2.x 教程及原始碼揭祕(一)入門理解及基本操作符

目錄

前言

Rxjava的介紹

Rxjava的優勢

Rxjava是觀察者模式

Rxjava是裝飾者模式

Observable

Rxjava的操作符

subScribeOn與observeOn切換執行緒

其他操作符

補充


前言

     本人從一年多前開始使用rxjava,可以說是見證了這個工具越來越流行,目前許多公司的android專案也都運用這個工具。Rxjava有著其獨特的優勢,也經常會被提及。對於這個東西,會用的人覺得他如此的好用,不理解的人卻覺得他非常難以上手。

     那麼本文就將對Rxjava做一次詳細的探索,相信跟隨著博主的思路帶著自己的思考,你一定也能掌握使用這個熱門技術的要領。不僅如此,我們更要深入瞭解其內部原理,做到知其然,並且知其所以然。那麼,下面我們一起來揭開Rxjava神祕的面紗。

 

Rxjava的介紹

Rxjava在 GitHub 主頁上的自我介紹是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一個在 Java VM 上使用可觀測的序列來組成非同步的、基於事件的程式的庫)。

從本質上看,這就是一個可以實現非同步操作的庫。

Rxjava的優勢

有的同學看完Rxjava的介紹不免產生了疑惑,非同步?那為什麼不用Handler,AsyncTask這些來實現呢?

Rxjava自然具備了其優勢,不然不可能會得到廣泛運用。

1.Rxjava符合響應式程式設計的思路,這讓他與另一款非常流行的網路請求框架Retrofit相輔相成,可以說android中Retrofit + OkHttp 是目前最主流的網路框架。

2.其另一個特點是支援函數語言程式設計,這保證了他的簡潔性。當你的業務需求越來越多,其他實現過程會越來越亂,產生迷之縮排。RxJava提供的一些方法,比如map,flatmap,可以很好的幫我們處理複雜的邏輯。Rxjava隨著程式邏輯變得越來越複雜,它依然能夠保持簡潔,這也是一個非常棒的優勢。

3.其實第三點嚴格來說也是第二點用一個優勢的不同體現,他對於任務的一些操作非常方便。比如非常方便我們執行緒的切換,無需手動建立子執行緒。

假設有這樣一個需求:介面上有一個自定義的檢視 imageCollectorView ,它的作用是顯示多張圖片,並能使用 addImage(Bitmap) 方法來任意增加顯示的圖片。現在需要程式將一個給出的目錄陣列 File[] folders 中每個目錄下的 png 圖片都加載出來並顯示在 imageCollectorView 中。需要注意的是,由於讀取圖片的這一過程較為耗時,需要放在後臺執行,而圖片的顯示則必須在 UI 執行緒執行。常用的實現方式有多種,我這裡貼出其中一種:

new Thread() {
    @Override
    public void run() {
        super.run();
        for (File folder : folders) {
            File[] files = folder.listFiles();
            for (File file : files) {
                if (file.getName().endsWith(".png")) {
                    final Bitmap bitmap = getBitmapFromFile(file);
                    getActivity().runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            imageCollectorView.addImage(bitmap);
                        }
                    });
                }
            }
        }
    }
}.start();

而如果使用 RxJava ,實現方式是這樣的:

Observable.from(folders)
    .flatMap(new Func1<File, Observable<File>>() {
        @Override
        public Observable<File> call(File file) {
            return Observable.from(file.listFiles());
        }
    })
    .filter(new Func1<File, Boolean>() {
        @Override
        public Boolean call(File file) {
            return file.getName().endsWith(".png");
        }
    })
    .map(new Func1<File, Bitmap>() {
        @Override
        public Bitmap call(File file) {
            return getBitmapFromFile(file);
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Bitmap>() {
        @Override
        public void call(Bitmap bitmap) {
            imageCollectorView.addImage(bitmap);
        }
    });

那位說話了:『你這程式碼明明變多了啊!簡潔個毛啊!』大兄弟你消消氣,我說的是邏輯的簡潔,不是單純的程式碼量少(邏輯簡潔才是提升讀寫程式碼速度的必殺技對不?)。觀察一下你會發現, RxJava 的這個實現,是一條從上到下的鏈式呼叫,沒有任何巢狀,這在邏輯的簡潔性上是具有優勢的。當需求變得複雜時,這種優勢將更加明顯(試想如果還要求只選取前 10 張圖片,常規方式要怎麼辦?如果有更多這樣那樣的要求呢?再試想,在這一大堆需求實現完兩個月之後需要改功能,當你翻回這裡看到自己當初寫下的那一片迷之縮排,你能保證自己將迅速看懂,而不是對著程式碼重新捋一遍思路?)。

另外,如果你的 IDE 是 Android Studio ,其實每次開啟某個 Java 檔案的時候,你會看到被自動 Lambda 化的預覽,這將讓你更加清晰地看到程式邏輯:

Observable.from(folders)
    .flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) })
    .filter((Func1) (file) -> { file.getName().endsWith(".png") })
    .map((Func1) (file) -> { getBitmapFromFile(file) })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) });

我們可以看到,通過Rxjava,所有程式碼都是鏈式展開的,一氣呵成,思路清晰,沒有了迷之縮

     這裡就有人會擡槓,就為了看起來簡潔,我要費這麼大的勁學這東西。這裡我要多嘮叨幾句,作為一個有追求的程式設計師,我們應該有程式碼潔癖的好習慣,自己寫的程式碼,除了能滿足業務需求以外,還要追求程式碼的可讀性,思考如何讓其後期更好維護擴充套件,思考是否滿足六大基本原則,面向物件思想¥#%@*¥%@#。。。

說了一大堆,其實,在移動端,很多功能是大家都能實現的,然而真正區分大家水平的,是看你程式碼實現的過程是否合理,執行起來效能是否滿足要求。好的程式設計師寫的程式碼傻子都能看懂,而水平差的寫出來的程式碼只有自己能看懂。

一個剛學程式設計的人,你去跟他扯架構框架,設計模式,他怎麼可能理解,很多時候,程式碼多打幾行,這些自然就理解了。

 

Rxjava是觀察者模式

初學者可能會問,什麼是觀察者模式?

採用註冊(Register)或者稱為訂閱(Subscribe)的方式,告訴被觀察者:我需要你的某某狀態,你要在它變化的時候通知我。舉個安卓中最典型的例子吧,我們空間的點選事件監聽就是一種觀察者模式,對設定 OnClickListener 來說, View 是被觀察者, OnClickListener 是觀察者,二者通過 setOnClickListener() 方法達成訂閱關係。訂閱之後使用者點選按鈕的瞬間,Android Framework 就會將點選事件傳送給已經註冊的 OnClickListener 。

採取這樣被動的觀察方式,既省去了反覆檢索狀態的資源消耗,也能夠得到最高的反饋速度。

我們來對比下OnclickListener和RxJava

OnClickListener 觀察者模式

通用觀察者模式

Buton持有OnClickListener的引用,當Button觸發了點選事件,呼叫OnClickListene的OnClik方法,這就是觀察者的流程,相信集合上面圖片的對比,你已經知道了RxJava中各部分分別對應的角色。

RxJava 有四個基本概念:Observable (可觀察者,即被觀察者)、 Observer (觀察者)、 subscribe (訂閱)、事件。ObservableObserver 通過 subscribe() 方法實現訂閱關係,從而 Observable 可以在需要的時候發出事件來通知 Observer

除了OnNext()之外,還有其他事件。

  • onCompleted(): 事件佇列完結。RxJava 不僅把每個事件單獨處理,還會把它們看做一個佇列。RxJava 規定,當不會再有新的 onNext() 發出時,需要觸發 onCompleted() 方法作為標誌。
  • onError(): 事件佇列異常。在事件處理過程中出異常時,onError() 會被觸發,同時佇列自動終止,不允許再有事件發出。
  • 在一個正確執行的事件序列中, onCompleted()onError() 有且只有一個,並且是事件序列中的最後一個。需要注意的是,onCompleted()onError() 二者也是互斥的,即在佇列中呼叫了其中一個,就不應該再呼叫另一個
  •  

在 RxJava 2.x 中,Observable 用於訂閱 Observer,不再支援背壓(1.x 中可以使用背壓策略),而 Flowable 用於訂閱 Subscriber , 是支援背壓(Backpressure)的。

Rxjava是裝飾者模式

rxjava的所有操作,基本上都是lift實現的,運用的是裝飾者模式,這裡由於涉及到原理部分,不在此處展開,詳情看這裡

Observable

在 RxJava 1.x 中,我們最熟悉的莫過於 Observable 這個類了,在 RxJava 2.x 中建立了 一個 ObservableSubscriber 都沒了,取而代之的是 ObservableEmmiter,俗稱發射器。此外,由於沒有了Subscriber的蹤影,我們建立觀察者時需使用 Observer。而 Observer 也不是我們熟悉的那個 Observer,又出現了一個 Disposable 引數。

 RxJava 的三部曲

 

 

** 第一步:初始化 Observable **
** 第二步:初始化 Observer **
** 第三步:建立訂閱關係 **
 


 

Observable.create(new ObservableOnSubscribe<Integer>() { // 第一步:初始化Observable
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                Log.e(TAG, "Observable emit 1" + "\n");
                e.onNext(1);
                Log.e(TAG, "Observable emit 2" + "\n");
                e.onNext(2);
                Log.e(TAG, "Observable emit 3" + "\n");
                e.onNext(3);
                e.onComplete();
                Log.e(TAG, "Observable emit 4" + "\n" );
                e.onNext(4);
            }
        }).subscribe(new Observer<Integer>() { // 第三步:訂閱

            // 第二步:初始化Observer
            private int i;
            private Disposable mDisposable;

            @Override
            public void onSubscribe(@NonNull Disposable d) {      
                mDisposable = d;
            }

            @Override
            public void onNext(@NonNull Integer integer) {
                i++;
                if (i == 2) {
                    // 在RxJava 2.x 中,新增的Disposable可以做到切斷的操作,讓Observer觀察者不再接收上游事件
                    mDisposable.dispose();
                }
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Log.e(TAG, "onError : value : " + e.getMessage() + "\n" );
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "onComplete" + "\n" );
            }
        });

不難看出,RxJava 2.x 與 1.x 還是存在著一些區別的。首先,建立 Observable 時,回撥的是 ObservableEmitter ,字面意思即發射器,並且直接 throws Exception。其次,在建立的 Observer 中,也多了一個回撥方法:onSubscribe,傳遞引數為DisposableDisposable 相當於 RxJava 1.x 中的 Subscription, 用於解除訂閱。可以看到示例程式碼中,在 i 自增到 2 的時候,訂閱關係被切斷。

07-03 14:24:11.663 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onSubscribe : false
07-03 14:24:11.664 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 1
07-03 14:24:11.665 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onNext : value : 1
07-03 14:24:11.666 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 2
07-03 14:24:11.667 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onNext : value : 2
07-03 14:24:11.668 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onNext : isDisposable : true
07-03 14:24:11.669 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 3
07-03 14:24:11.670 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 4

RxJava的流程是這樣的

關係靜圖

Rxjava的操作符

講了Rxjava的基本建立,那麼我們來了解下他的一些強大快捷的操作符,這也是為什麼Rxjava如此流行的一個原因

subScribeOn與observeOn切換執行緒

這兩者是我們執行緒切換的操作符,subScribeOn決定了被觀察者所在的執行緒也就是subscribe() 時所發生的執行緒,而observeOn決定觀察者所線上程也就是下游 Observer 回調發生的執行緒。。

執行緒切換需要注意的

RxJava 內建的執行緒排程器的確可以讓我們的執行緒切換得心應手,但其中也有些需要注意的地方。

  • 簡單地說,subscribeOn() 指定的就是發射事件的執行緒,observerOn 指定的就是訂閱者接收事件的執行緒。
  • 多次指定發射事件的執行緒只有第一次指定的有效,也就是說多次呼叫 subscribeOn() 只有第一次的有效,其餘的會被忽略。
  • 但多次指定訂閱者接收執行緒是可以的,也就是說每呼叫一次 observerOn(),下游的執行緒就會切換一次。

 

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                Log.e(TAG, "Observable thread is : " + Thread.currentThread().getName());
                e.onNext(1);
                e.onComplete();
            }
        }).subscribeOn(Schedulers.newThread())
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        Log.e(TAG, "After observeOn(mainThread),Current thread is " + Thread.currentThread().getName());
                    }
                })
                .observeOn(Schedulers.io())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        Log.e(TAG, "After observeOn(io),Current thread is " + Thread.currentThread().getName());
                    }
                });

輸出:

07-03 14:54:01.177 15121-15438/com.nanchen.rxjava2examples E/RxThreadActivity: Observable thread is : RxNewThreadScheduler-1
07-03 14:54:01.178 15121-15121/com.nanchen.rxjava2examples E/RxThreadActivity: After observeOn(mainThread),Current thread is main
07-03 14:54:01.179 15121-15439/com.nanchen.rxjava2examples E/RxThreadActivity: After observeOn(io),Current thread is RxCachedThreadScheduler-2

例項程式碼中,分別用 Schedulers.newThread()Schedulers.io() 對發射執行緒進行切換,並採用 observeOn(AndroidSchedulers.mainThread()Schedulers.io() 進行了接收執行緒的切換。可以看到輸出中發射執行緒僅僅響應了第一個 newThread,但每呼叫一次 observeOn() ,執行緒便會切換一次,因此如果我們有類似的需求時,便知道如何處理了。

RxJava 中,已經內建了很多執行緒選項供我們選擇,例如有:

  • Schedulers.io() 代表io操作的執行緒, 通常用於網路,讀寫檔案等io密集型的操作;
  • Schedulers.computation() 代表CPU計算密集型的操作, 例如需要大量計算的操作;
  • Schedulers.newThread() 代表一個常規的新執行緒;
  • AndroidSchedulers.mainThread() 代表Android的主執行緒

這些內建的 Scheduler 已經足夠滿足我們開發的需求,因此我們應該使用內建的這些選項,而 RxJava 內部使用的是執行緒池來維護這些執行緒,所以效率也比較高。

其他操作符

map實現關係轉換

map 操作符可以將一個 Observable 物件通過某種關係轉換為另一個Observable 物件。在 2.x 中和 1.x 中作用幾乎一致,不同點在於:2.x 將 1.x 中的 Func1Func2 改為了 FunctionBiFunction

採用 map 操作符進行網路資料解析

想必大家都知道,很多時候我們在使用 RxJava 的時候總是和 Retrofit 進行結合使用,而為了方便演示,這裡我們就暫且採用 OkHttp3 進行演示,配合 mapdoOnNext ,執行緒切換進行簡單的網路請求:
1)通過 Observable.create() 方法,呼叫 OkHttp 網路請求;
2)通過 map 操作符集合 gson,將 Response 轉換為 bean 類;
3)通過 doOnNext() 方法,解析 bean 中的資料,並進行資料庫儲存等操作;
4)排程執行緒,在子執行緒中進行耗時操作任務,在主執行緒中更新 UI ;
5)通過 subscribe(),根據請求成功或者失敗來更新 UI 。

Observable.create(new ObservableOnSubscribe<Response>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Response> e) throws Exception {
                Builder builder = new Builder()
                        .url("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")
                        .get();
                Request request = builder.build();
                Call call = new OkHttpClient().newCall(request);
                Response response = call.execute();
                e.onNext(response);
            }
        }).map(new Function<Response, MobileAddress>() {
                    @Override
                    public MobileAddress apply(@NonNull Response response) throws Exception {
                        if (response.isSuccessful()) {
                            ResponseBody body = response.body();
                            if (body != null) {
                                Log.e(TAG, "map:轉換前:" + response.body());
                                return new Gson().fromJson(body.string(), MobileAddress.class);
                            }
                        }
                        return null;
                    }
                }).observeOn(AndroidSchedulers.mainThread())
                .doOnNext(new Consumer<MobileAddress>() {
                    @Override
                    public void accept(@NonNull MobileAddress s) throws Exception {
                        Log.e(TAG, "doOnNext: 儲存成功:" + s.toString() + "\n");
                    }
                }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<MobileAddress>() {
                    @Override
                    public void accept(@NonNull MobileAddress data) throws Exception {
                        Log.e(TAG, "成功:" + data.toString() + "\n");
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {
                        Log.e(TAG, "失敗:" + throwable.getMessage() + "\n");
                    }
                });

flatMap()實現鏈式程式設計

flatMap(): 這是一個很有用但非常難理解的變換,因此我決定花多些篇幅來介紹它。 首先假設這麼一種需求:假設有一個數據結構『學生』,現在需要打印出一組學生的名字。實現方式很簡單:

Student[] students = ...;
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String name) {
        Log.d(tag, name);
    }
    ...
};
Observable.from(students)
    .map(new Func1<Student, String>() {
        @Override
        public String call(Student student) {
            return student.getName();
        }
    })
    .subscribe(subscriber);

很簡單。那麼再假設:如果要打印出每個學生所需要修的所有課程的名稱呢?(需求的區別在於,每個學生只有一個名字,但卻有多個課程。)首先可以這樣實現:

Student[] students = ...;
Subscriber<Student> subscriber = new Subscriber<Student>() {
    @Override
    public void onNext(Student student) {
        List<Course> courses = student.getCourses();
        for (int i = 0; i < courses.size(); i++) {
            Course course = courses.get(i);
            Log.d(tag, course.getName());
        }
    }
    ...
};
Observable.from(students)
    .subscribe(subscriber);

依然很簡單。那麼如果我不想在 Subscriber 中使用 for 迴圈,而是希望 Subscriber 中直接傳入單個的 Course 物件呢(這對於程式碼複用很重要)?用 map() 顯然是不行的,因為 map() 是一對一的轉化,而我現在的要求是一對多的轉化。那怎麼才能把一個 Student 轉化成多個 Course 呢?

這個時候,就需要用 flatMap() 了:

Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
    @Override
    public void onNext(Course course) {
        Log.d(tag, course.getName());
    }
    ...
};
Observable.from(students)
    .flatMap(new Func1<Student, Observable<Course>>() {
        @Override
        public Observable<Course> call(Student student) {
            return Observable.from(student.getCourses());
        }
    })
    .subscribe(subscriber);

從上面的程式碼可以看出, flatMap()map() 有一個相同點:它也是把傳入的引數轉化之後返回另一個物件。但需要注意,和 map() 不同的是, flatMap() 中返回的是個 Observable 物件,並且這個 Observable 物件並不是被直接傳送到了 Subscriber 的回撥方法中。 flatMap() 的原理是這樣的:1. 使用傳入的事件物件建立一個 Observable 物件;2. 並不傳送這個 Observable, 而是將它啟用,於是它開始傳送事件;3. 每一個創建出來的 Observable 傳送的事件,都被匯入同一個 Observable ,而這個 Observable 負責將這些事件統一交給 Subscriber 的回撥方法。這三個步驟,把事件拆成了兩級,通過一組新建立的 Observable 將初始的物件『鋪平』之後通過統一路徑分發了下去。而這個『鋪平』就是 flatMap() 所謂的 flat。

它可以把一個發射器 Observable 通過某種方法轉換為多個 Observables,然後再把這些分散的 Observables裝進一個單一的發射器 Observable。但有個需要注意的是,flatMap 並不能保證事件的順序,如果需要保證,需要用到我們下面要講的 ConcatMap

flatMap() 示意圖:

flatMap() 示意圖

flatMap 的實際應用

實現多個網路請求依次依賴

舉個例子。使用者註冊成功後需要自動登入,我們只需要先通過註冊介面註冊使用者資訊,註冊成功後馬上呼叫登入介面進行自動登入即可。

我們的 flatMap 恰好解決了這種應用場景,flatMap 操作符可以將一個發射資料的 Observable 變換為多個 Observables ,然後將它們發射的資料合併後放到一個單獨的 Observable,利用這個特性,我們很輕鬆地達到了我們的需求。

Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list")
                .addQueryParameter("rows", 1 + "")
                .build()
                .getObjectObservable(FoodList.class) // 發起獲取食品列表的請求,並解析到FootList
                .subscribeOn(Schedulers.io())        // 在io執行緒進行網路請求
                .observeOn(AndroidSchedulers.mainThread()) // 在主執行緒處理獲取食品列表的請求結果
                .doOnNext(new Consumer<FoodList>() {
                    @Override
                    public void accept(@NonNull FoodList foodList) throws Exception {
                        // 先根據獲取食品列表的響應結果做一些操作
                        Log.e(TAG, "accept: doOnNext :" + foodList.toString());
                        mRxOperatorsText.append("accept: doOnNext :" + foodList.toString()+"\n");
                    }
                })
                .observeOn(Schedulers.io()) // 回到 io 執行緒去處理獲取食品詳情的請求
                .flatMap(new Function<FoodList, ObservableSource<FoodDetail>>() {
                    @Override
                    public ObservableSource<FoodDetail> apply(@NonNull FoodList foodList) throws Exception {
                        if (foodList != null && foodList.getTngou() != null && foodList.getTngou().size() > 0) {
                            return Rx2AndroidNetworking.post("http://www.tngou.net/api/food/show")
                                    .addBodyParameter("id", foodList.getTngou().get(0).getId() + "")
                                    .build()
                                    .getObjectObservable(FoodDetail.class);
                        }
                        return null;

                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<FoodDetail>() {
                    @Override
                    public void accept(@NonNull FoodDetail foodDetail) throws Exception {
                        Log.e(TAG, "accept: success :" + foodDetail.toString());
                        mRxOperatorsText.append("accept: success :" + foodDetail.toString()+"\n");
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {
                        Log.e(TAG, "accept: error :" + throwable.getMessage());
                        mRxOperatorsText.append("accept: error :" + throwable.getMessage()+"\n");
                    }
                });

zip 實現多個介面資料共同更新 UI

在實際應用中,我們極有可能會在一個頁面顯示的資料來源於多個介面,這時候我們的 zip 操作符為我們排憂解難。

zip 操作符可以將多個 Observable 的資料結合為一個數據源再發射出去。

Observable<MobileAddress> observable1 = Rx2AndroidNetworking.get("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")
                .build()
                .getObjectObservable(MobileAddress.class);

        Observable<CategoryResult> observable2 = Network.getGankApi()
                .getCategoryData("Android",1,1);

        Observable.zip(observable1, observable2, new BiFunction<MobileAddress, CategoryResult, String>() {
            @Override
            public String apply(@NonNull MobileAddress mobileAddress, @NonNull CategoryResult categoryResult) throws Exception {
                return "合併後的資料為:手機歸屬地:"+mobileAddress.getResult().getMobilearea()+"人名:"+categoryResult.results.get(0).who;
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String s) throws Exception {
                        Log.e(TAG, "accept: 成功:" + s+"\n");
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {
                        Log.e(TAG, "accept: 失敗:" + throwable+"\n");
                    }
                });

 

concat決定邏輯順序

concat 可以做到不交錯的發射兩個甚至多個 Observable 的發射事件,並且只有前一個 Observable 終止(onComplete) 後才會訂閱下一個 Observable

 

採用 concat 操作符先讀取快取再通過網路請求獲取資料

想必在實際應用中,很多時候(對資料操作不敏感時)都需要我們先讀取快取的資料,如果快取沒有資料,再通過網路請求獲取,隨後在主執行緒更新我們的UI。

concat 操作符簡直就是為我們這種需求量身定做。

利用 concat 的必須呼叫 onComplete 後才能訂閱下一個 Observable 的特性,我們就可以先讀取快取資料,倘若獲取到的快取資料不是我們想要的,再呼叫 onComplete() 以執行獲取網路資料的 Observable,如果快取資料能應我們所需,則直接呼叫 onNext(),防止過度的網路請求,浪費使用者的流量。

Observable<FoodList> cache = Observable.create(new ObservableOnSubscribe<FoodList>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<FoodList> e) throws Exception {
                Log.e(TAG, "create當前執行緒:"+Thread.currentThread().getName() );
                FoodList data = CacheManager.getInstance().getFoodListData();

                // 在操作符 concat 中,只有呼叫 onComplete 之後才會執行下一個 Observable
                if (data != null){ // 如果快取資料不為空,則直接讀取快取資料,而不讀取網路資料
                    isFromNet = false;
                    Log.e(TAG, "\nsubscribe: 讀取快取資料:" );
                    runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            mRxOperatorsText.append("\nsubscribe: 讀取快取資料:\n");
                        }
                    });

                    e.onNext(data);
                }else {
                    isFromNet = true;
                    runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            mRxOperatorsText.append("\nsubscribe: 讀取網路資料:\n");
                        }
                    });
                    Log.e(TAG, "\nsubscribe: 讀取網路資料:" );
                    e.onComplete();
                }


            }
        });

        Observable<FoodList> network = Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list")
                .addQueryParameter("rows",10+"")
                .build()
                .getObjectObservable(FoodList.class);


        // 兩個 Observable 的泛型應當保持一致

        Observable.concat(cache,network)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<FoodList>() {
                    @Override
                    public void accept(@NonNull FoodList tngouBeen) throws Exception {
                        Log.e(TAG, "subscribe 成功:"+Thread.currentThread().getName() );
                        if (isFromNet){
                            mRxOperatorsText.append("accept : 網路獲取資料設定快取: \n");
                            Log.e(TAG, "accept : 網路獲取資料設定快取: \n"+tngouBeen.toString() );
                            CacheManager.getInstance().setFoodListData(tngouBeen);
                        }

                        mRxOperatorsText.append("accept: 讀取資料成功:" + tngouBeen.toString()+"\n");
                        Log.e(TAG, "accept: 讀取資料成功:" + tngouBeen.toString());
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {
                        Log.e(TAG, "subscribe 失敗:"+Thread.currentThread().getName() );
                        Log.e(TAG, "accept: 讀取資料失敗:"+throwable.getMessage() );
                        mRxOperatorsText.append("accept: 讀取資料失敗:"+throwable.getMessage()+"\n");
                    }
                });

有時候我們的快取可能還會分為 memory 和 disk ,實際上都差不多,無非是多寫點 Observable ,然後通過 concat 合併即可。

採用 interval 操作符實現心跳間隔任務

這裡就簡單的意思一下輪訓。

private Disposable mDisposable;
    @Override
    protected void doSomething() {
        mDisposable = Flowable.interval(1, TimeUnit.SECONDS)
                .doOnNext(new Consumer<Long>() {
                    @Override
                    public void accept(@NonNull Long aLong) throws Exception {
                        Log.e(TAG, "accept: doOnNext : "+aLong );
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(@NonNull Long aLong) throws Exception {
                        Log.e(TAG, "accept: 設定文字 :"+aLong );
                        mRxOperatorsText.append("accept: 設定文字 :"+aLong +"\n");
                    }
                });
    }

    /**
     * 銷燬時停止心跳
     */
    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (mDisposable != null){
            mDisposable.dispose();
        }
    }

 

 


補充

到這裡,你就瞭解了Rxjava的基本特性以及它的一些操作技巧,相信認真看完,你也就能感受到Rxjava的強大之處。

而且,Rxjava配合Retrofit來使用,更能發揮它響應式程式設計的特點,此外,RxBinding,RxBus也是一些不錯的擴充套件,這裡由於篇幅有限不做展開。

當然,作為一個有抱負的程式設計師,我們不應僅僅滿足於會用,還要去了解他的內部實現原理,從中看出其包涵的一些程式設計思想,去學習他們對設計模式的一些運用。這裡由於篇幅太長了,關於Rxjava的原理部分我會另寫一篇,帶你透徹理解Rxjava是如何實現執行緒的切換,以及如何通過map,flatmap這些操作符來包裝我們的業務邏輯。


本文部分借鑑 https://www.jianshu.com/p/0cd258eecf60