1. 程式人生 > >響應式程式設計在Android中的應用

響應式程式設計在Android中的應用

響應式程式設計簡介

  • 響應式程式設計是一種基於非同步資料流概念的程式設計模式。資料流就像一條河:它可以被觀測,被過濾,被操作,或者為新的消費者與另外一條流合併為一條新的流。
  • 響應式程式設計的一個關鍵概念是事件。事件可以被等待,可以觸發過程,也可以觸發其它事件。事件是唯一的以合適的方式將我們的現實世界對映到我們的軟體中:如果屋裡太熱了我們就開啟一扇窗戶。同樣的,當我們更改電子錶(變化的傳播)中的一些數值時,我們需要更新整個表格或者我們的機器人碰到牆時會轉彎(響應事件)。
  • 今天,響應式程式設計最通用的一個場景是UI:我們的移動App必須做出對網路呼叫、使用者觸控輸入和系統彈框的響應。在這個世界上,軟體之所以是事件驅動並響應的是因為現實生活也是如此。

響應式程式設計的具體實現-RxJava

基本概念

RxJava的四種角色

  • Observable
  • Observer
  • Subscriber
  • Subject

Observable和Subject是兩個“生產”實體,Observer和Subscriber是兩個“消費”實體。

熱Observable和冷Observable

從發射物的角度來看,有兩種不同的Observable:熱的和冷的。一個”熱”的Observable典型的只要一建立完就開始發射資料,因此所有後續訂閱它的觀察者可能從序列中間的某個位置開始接受資料(有一些資料錯過了)。一個”冷”的Observable會一直等待,直到有觀察者訂閱它才開始發射資料,因此這個觀察者可以確保會收到整個資料序列。

Observable建立符

  • Observable.create()
Observable.create(new Observable.OnSubscribe<Object>(){
    @Override
    public void call(Subscriber<? super Object> subscriber{
    }
});
  • Observable.from()
    from() 建立符可以從一個列表/陣列來建立Observable,並一個接一個的從列表/陣列中發射出來每一個物件,或者也可以從Java Future 類來建立Observable,併發射Future物件的 .get() 方法返回的結果值。傳入 Future 作為引數時,我們可以指定一個超時的值。Observable將等待來自 Future 的結果;如果在超時之前仍然沒有結果返回,Observable將會觸發 onError() 方法通知觀察者有錯誤發生了。
List<Integer> items = new ArrayList<Integer>();
items.add(1);
items.add(10);
items.add(100);
items.add(200);

Observable<Integer> observableString = Observable.from(items);
Subscription subscriptionPrint = observableString.subscribe(new Observer<Integer>() {
    @Override
    public void onCompleted() {
    System.out.println("Observable completed");
    }
    @Override
    public void onError(Throwable e) {
    System.out.println("Oh,no! Something wrong happened!");
    }
    @Override
    public void onNext(Integer item) {
    System.out.println("Item is " + item);
    }
});
  • Observable.just()
    just() 方法可以傳入一到九個引數,它們會按照傳入的引數的順序來發射它們。 just() 方法也可以接受列表或陣列,就像 from() 方法,但是它不會迭代列表發射每個值,它將會發射整個列表。通常,當我們想發射一組已經定義好的值時會用到它。但是如果我們的函式不是時變性的,我們可以用just來建立一個更有組織性和可測性的程式碼庫。
Observable<String> observableString = Observable.just(helloWorld
());
Subscription subscriptionPrint = observableString.subscribe(new
Observer<String>() {
    @Override
    public void onCompleted() {
    System.out.println("Observable completed");
    }
    @Override
    public void onError(Throwable e) {
    System.out.println("Oh,no! Something wrong happened!");
    }
    @Override
    public void onNext(String message) {
    System.out.println(message);
    }
});

helloWorld() 方法比較簡單,像這樣:

private String helloWorld(){
    return "Hello World";
}

Subject

Subject 既可以是 Observable,也可以是 Observer。
RxJava 提供四種不同的 Subject :

  • PublishSubject
  • BehaviorSubject
    BehaviorSubject會首先向他的訂閱者傳送截至訂閱前最新的一個數據物件(或初始值),然後正常傳送訂閱後的資料流。

    BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create(1);

    在這個短例子中,我們建立了一個能發射整形(Integer)的BehaviorSubject。由於每當Observes訂閱它時就會發射最新的資料,所以它需要一個初始值。

  • ReplaySubject
    ReplaySubject 會快取它所訂閱的所有資料,向任意一個訂閱它的觀察者重發:

    ReplaySubject<Integer> replaySubject = ReplaySubject.create();
  • AsyncSubject

    當Observable完成時AsyncSubject只會釋出最後一個數據給已經訂閱的每一個觀察者。

    AsyncSubject<Integer> asyncSubject = AsyncSubject.create();

直接建立 Observable

在我們的第一個列子裡,我們將檢索安裝的應用列表並填充RecycleView的item來展示它們。我們也設想一個下拉重新整理的功能和一個進度條來告知使用者當前任務正在執行。

首先,我們建立Observable。我們需要一個函式來檢索安裝的應用程式列表並把它提供給我們的觀察者。我們一個接一個的發射這些應用程式資料,將它們分組到一個單獨的列表中,以此來展示響應式方法的靈活性。

private Observable<AppInfo> getApps(){
    return Observable.create(subscriber -> {
        List<AppInfoRich> apps = new ArrayList<AppInfoRich>();
        final Intent mainIntent = new Intent(Intent.ACTION_MAIN, null);
        mainIntent.addCategory(Intent.CATEGORY_LAUNCHER);
        List<ResolveInfo> infos = getActivity().queryIntentActivities(mainIntent, 0);
        for(ResolveInfo info : infos){
            apps.add(new AppInfoRich(getActivity(),info));
        }
        for (AppInfoRich appInfo:apps) {
            Bitmap icon = Utils.drawableToBitmap(appInfo.getIcon());
            String name = appInfo.getName();
            String iconPath = mFilesDir + "/" + name;
            Utils.storeBitmap(App.instance, icon,name);
            if (subscriber.isUnsubscribed()){
                return;
            }
            subscriber.onNext(new AppInfo(name, iconPath, appInfo.getLastUpdateTime()));
        }
        if (!subscriber.isUnsubscribed()){
            subscriber.onCompleted();
        }
    });
}

AppInfo為App資訊的實體類,包括上次更新時間、圖示、名字三個屬性,此處省略。

需要重點注意的是在發射新的資料或者完成序列之前要檢測觀察者的訂閱情況。這樣的話程式碼會更高效,因為如果沒有觀察者等待時我們就不生成沒有必要的資料項。

接下來,我們來定義下拉重新整理的方法:

private void refreshTheList() {
    getApps().toSortedList()
    .subscribe(new Observer<List<AppInfo>>() {
    @Override
    public void onCompleted() {
        Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
    }
    @Override
    public void onError(Throwable e) {
        Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
        mSwipeRefreshLayout.setRefreshing(false);
    }
    @Override
    public void onNext(List<AppInfo> appInfos) {
        mRecyclerView.setVisibility(View.VISIBLE);
        mAdapter.addApplications(appInfos);
        mSwipeRefreshLayout.setRefreshing(false);
    }
    });
}

從列表建立 Observable

在這個例子中,我們將引入 from() 函式。使用這個特殊的“建立”函式,我們可以從一個列表中建立一個Observable。Observable將發射出列表中的每一個元素,我們可以通過訂閱它們來對這些發出的元素做出響應。

private void loadList(List<AppInfo> apps) {
    mRecyclerView.setVisibility(View.VISIBLE);
    Observable.from(apps).subscribe(new Observer<AppInfo>() {
        @Override
        public void onCompleted() {
            mSwipeRefreshLayout.setRefreshing(false);
            Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
        }
        @Override
        public void onError(Throwable e) {
            Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
            mSwipeRefreshLayout.setRefreshing(false);
        }
        @Override
        public void onNext(AppInfo appInfo) {
            mAddedApps.add(appInfo);
            mAdapter.addApplication(mAddedApps.size() - 1, appInfo);
        }
    });
}

和第一個例子一個主要的不同是我們在 onCompleted() 函式中停掉進度條是因為我們一個一個的發射元素;
第一個例子中的Observable發射的是整個list,因此在 onNext() 函式中停掉進度條的做法是安全的。

具有特殊功能的建立符

  • just()

    你可以將一個函式作為引數傳給 just() 方法,你將會得到一個已存在程式碼的原始Observable版本。在一個新的響應式架構的基礎上遷移已存在的程式碼,這個方法可能是一個有用的開始點。

  • repeat()

    假如你想對一個Observable重複發射三次資料 :

    Observable.just(appOne,appTwo,appThree)
        .repeat(3)
        .subscribe();

    我們在 just() 建立Observable後追加了 repeat(3) ,它將會建立9個元素的序列,每一個都單獨發射。

  • defer()

    有這樣一個場景,你想在這宣告一個Observable但是你又想推遲這個Observable的建立直到觀察者訂閱時。看下面的 getInt() 函式:

    private Observable<Integer> getInt(){
        return Observable.create(subscriber -> {
            if(subscriber.isUnsubscribed()){
                return;
            }
            App.L.debug("GETINT");
            subscriber.onNext(42);
            subscriber.onCompleted();
        });
    }

    這比較簡單,並且它沒有做太多事情,但是它正好為我們服務。現在,我們可以建立一個新的Observable並且應用 defer() :

    Observable<Integer> deferred = Observable.defer(this::getInt);

    這次, deferred 存在,但是 getInt() create() 方法還沒有呼叫 : logcat日誌也沒有“GETINT”打印出來 :

    deferred.subscribe(number -> {
        App.L.debug(String.valueOf(number));
    });

    但是一旦我們訂閱了, create() 方法就會被呼叫並且我們也可以在logcat日誌中打印出兩個值:GETINT 和 42。

  • range()

    從一個指定的數字X開始發射N個數字。range() 函式用兩個數字作為引數:第一個是起始點,第二個是我們想發射數字的個數。

  • interval()

    interval() 函式在你需要建立一個輪詢程式時非常好用。interval() 函式的兩個引數:一個指定兩次發射的時間間隔,另一個是用到的時間單位。

  • timer()

    如果你需要一個一段時間之後才發射的Observable,你可以使用 timer()。

過濾Observables

過濾序列

RxJava讓我們使用 filter() 方法來過濾我們觀測序列中不想要的值。

我們從發出的每個元素中過濾掉開頭字母不是C的 :

.filter(new Func1<AppInfo,Boolean>(){
    @Override
    public Boolean call(AppInfo appInfo){
        return appInfo.getName().startsWith("C");
    }
})

我們傳一個新的 Func1 物件給 filter() 函式,即只有一個引數的函式。 Func1 有一個 AppInfo 物件來作為它的引數型別並且返回 Boolean 物件。只要條件符合 filter() 函式就會返回 true 。此時,值會發射出去並且所有的觀察者都會接收到。

filter() 函式最常用的用法之一時過濾 null 物件:

.filter(new Func1<AppInfo,Boolean>(){
    @Override
    public Boolean call(AppInfo appInfo){
        return appInfo != null;
    }
})

它幫我們免去了在 onNext() 函式呼叫中再去檢測 null 值,讓我們把注意力集中在應用業務邏輯上。

獲取我們需要的資料

當我們不需要整個序列時,而是隻想取開頭或結尾的幾個元素,我們可以用 take() 或 takeLast() 。

  • take()

    take() 函式用整數N來作為一個引數,從原始的序列中發射前N個元素,然後完成:

    Observable.from(apps)
        .take(3)
        .subscribe(...);
  • takeLast()

    如果我們想要最後N個元素,我們只需使用 takeLast() 函式:

    Observable.from(apps)
        .takeLast(3)
        .subscribe(...);

有且僅有一次

  • distinct()

    就像 takeLast() 一樣, distinct() 作用於一個完整的序列,然後得到重複的過濾項,它需要記錄每一個發射的值。如果你在處理一大堆序列或者大的資料記得關注記憶體使用情況。

    Observable<AppInfo> fullOfDuplicates = Observable.from(apps)
        .take(3)
        .repeat(3);
    fullOfDuplicates.distinct()
        .subscribe(...);
  • ditinctUntilChanged()

    如果在一個可觀測序列發射一個不同於之前的一個新值時讓我們得到通知這時候該怎麼做?ditinctUntilChanged() 過濾函式能做到這一點。它能輕易的忽略掉所有的重複並且只發射出新的值。

First and last

first() 方法和 last() 方法很容易弄明白。它們從Observable中只發射第一個元素或者最後一個元素。這兩個都可以傳 Func1 作為引數。
與 first() 和 last() 相似的變數有: firstOrDefault() 和 lastOrDefault() 。這兩個函式當可觀測序列完成時不再發射任何值時用得上。在這種場景下,如果Observable不再發射任何值時我們可以指定發射一個預設的值。

Skip and SkipLast

skip() 和 skipLast() 函式與 take() 和 takeLast() 相對應。它們用整數N作引數,從本質上來說,它們不讓Observable發射前N個或者後N個值。

ElementAt

如果我們只想要可觀測序列發射的第五個元素該怎麼辦? elementAt() 函式僅從一個序列中發射第n個元素然後就完成了。
如果我們想查詢第五個元素但是可觀測序列只有三個元素可供發射時該怎麼辦?我們可以使用 elementAtOrDefault() 。

Sampling

在Observable後面加一個 sample() ,我們將建立一個新的可觀測序列,它將在一個指定的時間間隔裡由Observable發射最近一次的數值:

Observable<Integer> sensor = [...]
sensor.sample(30,TimeUnit.SECONDS)
    .subscribe(...);

如果我們想讓它定時發射第一個元素而不是最近的一個元素,我們可以使用 throttleFirst() 。

Timeout

我們可以使用 timeout() 函式來監聽源可觀測序列,就是在我們設定的時間間隔內如果沒有得到一個值則發射一個錯誤。我們可以認為 timeout() 為一個Observable的限時的副本。如果在指定的時間間隔內Observable不發射值的話,它監聽的原始的Observable時就會觸發 onError() 函式。

Subscription subscription = getCurrentTemperature()
    .timeout(2,TimeUnit.SECONDS)
    .subscribe(...);

Debounce

debounce() 函式過濾掉由Observable發射的速率過快的資料;如果在一個指定的時間間隔過去了仍舊沒有發射一個,那麼它將發射最後的那個。

下圖展示了多久從Observable發射一次新的資料, debounce() 函式開啟一個內部定時器,如果在這個時間間隔內沒有新的據發射,則新的Observable發射出最後一個數據:

 debounce() 函式示意圖

變換Observables

*map家族

RxJava提供了幾個mapping函式: map() , flatMap() , concatMap() , flatMapIterable() 以及 switchMap() .所有這些函式都作用於一個可觀測序列,然後變換它發射的值,最後用一種新的形式返回它們。

  • Map

    RxJava的 map 函式接收一個指定的 Func 物件然後將它應用到每一個由Observable發射的值上。

    Observable.from(apps)
        .map(new Func1<AppInfo,AppInfo>(){
            @Override
            public Appinfo call(AppInfo appInfo){
                String currentName = appInfo.getName();
                String lowerCaseName = currentName.toLowerCase();
                appInfo.setName(lowerCaseName);
                return appInfo;
            }
        })
        .subscribe(...);

    正如你看到的,像往常一樣建立我們發射的Observable之後,我們追加一個 map 呼叫,我們建立一個簡單的函式來更新 AppInfo物件並提供一個名字小寫的新版本給觀察者。

  • FlatMap

    在複雜的場景中,我們有一個這樣的Observable:它發射一個數據序列,這些資料本身也可以發射Observable。RxJava的 flatMap() 函式提供一種鋪平序列的方式,然後合併這些Observables發射的資料,最後將合併後的結果作為最終的Observable。

     flatMap() 函式示意圖

    當我們在處理可能有大量的Observables時,重要是記住任何一個Observables發生錯誤的情況, flatMap() 將會觸發它自己的 onError() 函式並放棄整個鏈。重要的一點提示是關於合併部分:它允許交叉。正如上圖所示,這意味著 flatMap() 不能夠保證在最終生成的Observable中源Observables確切的發射順序。

  • ConcatMap

    RxJava的 concatMap() 函式解決了 flatMap() 的交叉問題,提供了一種能夠把發射的值連續在一起的鋪平函式,而不是合併它們,如下圖所示:

    這裡寫圖片描述

  • FlatMapIterable

    作為*map家族的一員, flatMapInterable() 和 flatMap() 很像。僅有的本質不同是它將源資料兩兩結成對並生成Iterable,而不是原始資料項和生成的Observables。

  • SwitchMap

    switchMap() 和 flatMap() 很像,除了一點:每當源Observable發射一個新的資料項(Observable)時,它將取消訂閱並停止監視之前那個資料項產生的Observable,並開始監視當前發射的這一個。

  • Scan

    RxJava的 scan() 函式可以看做是一個累積函式。 scan() 函式對原始Observable發射的每一項資料都應用一個函式,計算出函式的結果值,並將該值填充回可觀測序列,等待和下一次發射的資料一起使用。

    作為一個通用的例子,給出一個累加器:

    Observable.just(1,2,3,4,5)
        .scan((sum,item) -> sum + item)
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                Log.d("RXJAVA", "Sequence completed.");
            }
            @Override
            public void onError(Throwable e) {
                Log.e("RXJAVA", "Something went south!");
            }
            @Override
            public void onNext(Integer item) {
                Log.d("RXJAVA", "item is: " + item);
            }
        });

    我們得到的結果是:

    RXJAVA: item is: 1
    RXJAVA: item is: 3
    RXJAVA: item is: 6
    RXJAVA: item is: 10
    RXJAVA: item is: 15
    RXJAVA: Sequence completed.

GroupBy

RxJava提供了一個有用的函式從列表中按照指定的規則: groupBy() 來分組元素。下圖中的例子展示了 groupBy() 如何將發射的值根據他們的形狀來進行分組。

這裡寫圖片描述

這個函式將源Observable變換成一個發射Observables的新的Observable。它們中的每一個新的Observable都發射一組指定的資料。

為了建立一個分組了的已安裝應用列表,我們在 loadList() 函式中引入了一個新的元素:

Observable<GroupedObservable<String,AppInfo>> groupedItems = Observable.from(apps)
            .groupBy(new Func1<AppInfo,String>(){
                @Override
                public String call(AppInfo appInfo){
                    SimpleDateFormat formatter = new SimpleDateFormat("MM/yyyy");
                    return formatter.format(new Date(appInfo.getLastUpdateTime()));
                }
            });

現在我們建立了一個新的Observable, groupedItems ,它將會發射一個帶有 GroupedObservable 的序列。 GroupedObservable 是一個特殊的Observable,它源自一個分組的key。在這個例子中,key就是 String ,代表的意思是 Month/Year 格式化的最近更新日期。

Buffer

RxJava中的 buffer() 函式將源Observable變換一個新的Observable,這個新的Observable每次發射一組列表值而不是一個一個發射。

buffer() 函式有幾種變體。其中有一個是允許你指定一個 skip 值:此後每 skip 項資料,用count項資料填充緩衝區。另一個是buffer() 帶一個 timespan 的引數,會建立一個每隔timespan時間段就會發射一個列表的Observable。

Window

RxJava的 window() 函式和 buffer() 很像,但是它發射的是Observable而不是列表。

正如 buffer() 一樣, window() 也有一個 skip 變體。

Cast

cast() 函式是 map() 操作符的特殊版本。它將源Observable中的每一項資料都轉換為新的型別,把它變成了不同的 Class 。

組合Observables

Merge

在”非同步的世界“中經常會建立這樣的場景,我們有多個來源但是又只想有一個結果:多輸入,單輸出。RxJava的 merge() 方法將幫助你把兩個甚至更多的Observables合併到他們發射的資料項裡。下圖給出了把兩個序列合併在一個最終發射的Observable。

這裡寫圖片描述

正如你看到的那樣,發射的資料被交叉合併到一個Observable裡面。注意如果你同步的合併Observable,它們將連線在一起並且不會交叉。

Observable<AppInfo> mergedObserbable = Observable.merge(observableApps,observableReversedApps);
mergedObserbable.subscribe(...);

注意錯誤時的toast訊息,你可以認為每個Observable丟擲的錯誤都將會打斷合併。如果你需要避免這種情況,RxJava提供了 mergeDelayError() ,它能從一個Observable中繼續發射資料即便是其中有一個丟擲了錯誤。當所有的Observables都完成時, mergeDelayError() 將會發射 onError()。

ZIP

在一種新的可能場景中處理多個數據來源時會帶來:多從個Observables接收資料,處理它們,然後將它們合併成一個新的可觀測序列來使用。RxJava有一個特殊的方法可以完成: zip() 合併兩個或者多個Observables發射出的資料項,根據指定的函式Func* 變換它們,併發射一個新值。下圖展示了 zip() 方法如何處理髮射的“numbers”和“letters”然後將它們合併一個新的資料項:

這裡寫圖片描述

Observable.zip(observableApp, tictoc, (AppInfo appInfo, Long time) -> updateTitle(appInfo, time))
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(...);

zip() 函式有三個引數:兩個Observables和一個 Func2 。

Join

前面兩個方法, zip() 和 merge() 方法作用在發射資料的範疇內,在決定如何操作值之前有些場景我們需要考慮時間的。RxJava的 join() 函式基於時間視窗將兩個Observables發射的資料結合在一起。

這裡寫圖片描述

為了正確的理解上一張圖,我們解釋下 join() 需要的引數:

  • 第二個Observable和源Observable結合。
  • Func1 引數:在指定的由時間視窗定義時間間隔內,源Observable發射的資料和從第二個Observable發射的資料相互配合返回的Observable。
  • Func1 引數:在指定的由時間視窗定義時間間隔內,第二個Observable發射的資料和從源Observable發射的資料相互配合返回的Observable。
  • Func2 引數:定義已發射的資料如何與新發射的資料項相結合。

combineLatest

RxJava的 combineLatest() 函式有點像 zip() 函式的特殊形式。正如我們已經學習的, zip() 作用於最近未打包的兩個Observables。相反, combineLatest() 作用於最近發射的資料項:如果 Observable1 發射了A並且 Observable2 發射了B和C, combineLatest() 將會分組處理AB和AC,如下圖所示:

這裡寫圖片描述

And,Then和When

在將來還有一些 zip() 滿足不了的場景。如複雜的架構,或者是僅僅為了個人愛好,你可以使用And/Then/When解決方案。它們在RxJava的joins包下,使用Pattern和Plan作為中介,將發射的資料集合併到一起。

這裡寫圖片描述

Switch

給出一個發射多個Observables序列的源Observable, switch() 訂閱到源Observable然後開始發射由第一個發射的Observable發射的一樣的資料。當源Observable發射一個新的Observable時, switch() 立即取消訂閱前一個發射數
據的Observable(因此打斷了從它那裡發射的資料流)然後訂閱一個新的Observable,並開始發射它的資料。

StartWith

RxJava的 startWith() 是 concat() 的對應部分。正如 concat() 向發射資料的Observable追加資料那樣,在Observable開始發射他們的資料之前,startWith() 通過傳遞一個引數來先發射一個數據序列。

Schedulers-解決Android主執行緒問題

Schedulers

排程器以一種最簡單的方式將多執行緒用在你的Apps的中。它們時RxJava重要的一部分並能很好地與Observables協同工作。它們無需處理實現、同步、執行緒、平臺限制、平臺變化而可以提供一種靈活的方式來建立併發程式。

RxJava提供了5種排程器:

  • .io()
  • .computation()
  • .immediate()
  • .newThread()
  • .trampoline()
Schedulers.io()

這個排程器時用於I/O操作。它基於根據需要,增長或縮減來自適應的執行緒池。我們將使用它來修復我們之前看到的 StrictMode 違規做法。由於它專用於I/O操作,所以並不是RxJava的預設方法;正確的使用它是由開發者決定的。

重點需要注意的是執行緒池是無限制的,大量的I/O排程操作將建立許多個執行緒並佔用記憶體。一如既往的是,我們需要在效能和簡捷兩者之間找到一個有效的平衡點。

Schedulers.computation()

這個是計算工作預設的排程器,它與I/O操作無關。它也是許多RxJava方法的預設排程器: buffer() , debounce() , delay() , interval() , sample() , skip()。

Schedulers.immediate()

這個排程器允許你立即在當前執行緒執行你指定的工作。它是 timeout() , timeInterval() ,以及 timestamp() 方法預設的排程器。

Schedulers.newThread()

這個排程器正如它所看起來的那樣:它為指定任務啟動一個新的執行緒。

Schedulers.trampoline()

當我們想在當前執行緒執行一個任務時,並不是立即,我們可以用 .trampoline() 將它入隊。這個排程器將會處理它的佇列並且按序執行佇列中每一個任務。它是 repeat() 和 retry() 方法預設的排程器。

非阻塞I/O操作

使用 Schedulers.io() 建立非阻塞的版本:

public static void storeBitmap(Context context, Bitmap bitmap, String filename) {
    Schedulers.io().createWorker().schedule(() -> {
        blockingStoreBitmap(context, bitmap, filename);
    });
}

SubscribeOn and ObserveOn

我們學到了如何在一個排程器上執行一個任務。但是我們如何利用它來和Observables一起工作呢?RxJava提供了 subscribeOn() 方法來用於每個Observable物件。 subscribeOn() 方法用 Scheduler 來作為引數並在這個Scheduler上執行Observable呼叫。

首先,我們需要一個新的 getApps() 方法來檢索已安裝的應用列表:

private Observable<AppInfo> getApps() {
    return Observable.create(subscriber -> {
        List<AppInfo> apps = new ArrayList<>();
        SharedPreferences sharedPref = getActivity().getPreferences(Context.MODE_PRIVATE);
        Type appInfoType = new TypeToken<List<AppInfo>>(){}.getType();
        String serializedApps = sharedPref.getString("APPS", "");
        if (!"".equals(serializedApps)) {
            apps = new Gson().fromJson(serializedApps,appInfoType);
        }
        for (AppInfo app : apps) {
            subscriber.onNext(app);
        }
        subscriber.onCompleted();
    });
}

然後,我們所需要做的是指定 getApps() 需要在排程器上執行:

getApps().subscribeOn(Schedulers.io())
    .subscribe(new Observer<AppInfo>() { [...]

最後,我們只需在 loadList() 函式新增幾行程式碼,那麼每一項就都準備好了:

getApps()
    .onBackpressureBuffer()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<AppInfo>() { [...]

observeOn() 方法將會在指定的排程器上返回結果:如例子中的UI執行緒。 onBackpressureBuffer() 方法將告訴Observable發射的資料如果比觀察者消費的資料要更快的話,它必須把它們儲存在快取中並提供一個合適的時間給它們。

處理耗時的任務

一個與I/O無關的耗時的任務:

getObservableApps(apps)
    .onBackpressureBuffer()
    .subscribeOn(Schedulers.computation())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<AppInfo>() { [...]

總結

RxJava提供了一種以面向時序的方式考慮資料的機會:所有事情都是持續變化的,資料在更新,事件在觸發,然後你就可以建立事件響應式的、靈活的、執行流暢的App。

謹記可觀測序列就像一條河:它們是流動的。你可以“過濾”(filter)一條河,你可以“轉換”(transform)一條河,你可以將兩條河合併(combine)成一個,然後依然暢流如初。最後,它就成了你想要的那條河。

“Be Water,my friend” - Bruce Lee