1. 程式人生 > >Android異步框架RxJava 1.x系列(一) - 觀察者模式及實現

Android異步框架RxJava 1.x系列(一) - 觀察者模式及實現

from 預覽 目錄 ole 普通 清零 handler 使用 tps

Android異步框架RxJava 1.x系列(一) - 觀察者模式及實現

前言

RxJava 是一款基於 Java VM 實現的響應式編程擴展庫 - 基於觀察者模式的異步和事件處理框架。RxJava 官方目前同時維護了兩個版本,分別是 1.x2.x,區別是它們使用不同的 group idnamespaces

技術分享圖片

版本group idnamespaces
v1.x io.reactivex io.reactivex
v2.x io.reactivex.rxjava2 rx

本系列的文章將針對 RxJava 1.x 進行介紹,先給出 Github 的地址:

  • RxJava:github.com/ReactiveX/R…
  • RxAndroid:github.com/ReactiveX/R…

通過 Gradle 引入相關依賴:

compile ‘io.reactivex:rxjava:1.0.14‘ 
compile ‘io.reactivex:rxandroid:1.0.1‘ 
復制代碼

正文

1. RxJava的定義

一個精準的解釋如下:RxJava 是一個運行於 Java VM ,由可觀測序列組成的,異步、基於事件的函數庫。

2. RxJava的優點

換句話說,『同樣是做異步,為什麽人們用它,而不用現成的 AsyncTask / Handler / XXX / ... ?』

一個詞:簡潔。

異步操作很關鍵的一點是程序的簡潔性,因為在調度過程比較復雜的情況下,異步代碼經常會既難寫也難被讀懂。 Android 創造的 AsyncTask 和Handler,其實都是為了讓異步代碼更加簡潔。RxJava 的優勢也是簡潔,但它的簡潔的與眾不同之處在於,隨著程序邏輯變得越來越復雜,它依然能夠保持簡潔。

技術分享圖片

Android 開發中,假設有這樣一個需求:界面上有一個自定義的視圖 imageCollectorView,它的作用是顯示多張圖片,並能使用 addImage(Bitmap) 方法來任意增加顯示的圖片。現在需要程序將一個給出的目錄數組 File[] folders

中每個目錄下的 png 圖片都加載出來並顯示在 imageCollectorView 中。

註意: 由於讀取圖片的過程較為耗時,需要放在後臺執行,而圖片的顯示則必須在 UI 線程執行。

常用的實現方式有多種,這裏給出其中一種:

new Thread() {
    @Override
    public void run() {
        super.run();
        for (File folder : folders) {
            File[] files = folder.listFiles();
            for (File file : files) {
                if (file.getName().endsWith(".png")) {
                    final Bitmap bitmap = getBitmapFromFile(file);
                    getActivity().runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            imageCollectorView.addImage(bitmap);
                        }
                    });
                }
            }
        }
    }
}.start();
復制代碼

而如果使用 RxJava,實現方式是這樣的:

Observable.from(folders)
    .flatMap(new Func1<File, Observable<File>>() {
        @Override
        public Observable<File> call(File file) {
            return Observable.from(file.listFiles());
        }
    })
    .filter(new Func1<File, Boolean>() {
        @Override
        public Boolean call(File file) {
            return file.getName().endsWith(".png");
        }
    })
    .map(new Func1<File, Bitmap>() {
        @Override
        public Bitmap call(File file) {
            return getBitmapFromFile(file);
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Bitmap>() {
        @Override
        public void call(Bitmap bitmap) {
            imageCollectorView.addImage(bitmap);
        }
    });
復制代碼

可以發現,使用 RxJava 方式代碼量明顯大大增加,所謂簡潔從何而來?

這裏說的簡潔是指的邏輯上的。觀察一下你會發現,RxJava 的這個實現,是一條從上到下的鏈式調用,沒有任何嵌套,這在邏輯的簡潔性上是具有優勢的。當需求變得復雜時,這種優勢將更加明顯(試想如果還要求只選取前 10 張圖片,常規方式要怎麽辦?如果有更多這樣那樣的要求呢?再試想,在這一大堆需求實現完兩個月之後需要改功能,當你翻回這裏看到自己當初寫下的那一片迷之縮進,你能保證自己將迅速看懂,而不是對著代碼重新捋一遍思路?)。

另外,如果你的 IDEAndroid Studio,其實每次打開某個 Java 文件的時候,你會看到被自動 Lambda 化的預覽,這將讓你更加清晰地看到程序邏輯:

Observable.from(folders)
    .flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) })
    .filter((Func1) (file) -> { file.getName().endsWith(".png") })
    .map((Func1) (file) -> { getBitmapFromFile(file) })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) });
復制代碼

所以,RxJava 有啥優點?就好在簡潔,優點就是把復雜邏輯,通過函數式編程模型穿成一條線。

3. 觀察者模式的擴展

RxJava 的異步實現,是通過一種擴展的觀察者模式來實現的。

3.1. 通用的觀察者模式

觀察者模式面向的需求是:A 對象(觀察者)對 B 對象(被觀察者)的某種變化高度敏感,需要在 B 變化的一瞬間做出反應。

舉個例子,新聞裏喜聞樂見的警察抓小偷,警察需要在小偷伸手作案的時候實施抓捕。在這個例子裏,警察是觀察者,小偷是被觀察者,警察需要時刻盯著小偷的一舉一動,才能保證不會漏過任何瞬間。

程序的觀察者模式略有不同,觀察者不需要時刻盯著被觀察者(例如 A 不需要每過 2ms 就檢查一次 B 的狀態),而是采用註冊( Register )或者稱為訂閱(Subscribe)的方式,告訴被觀察者:我需要你的某種狀態,你要在它變化的時候通知我。

采取這樣被動的觀察方式,既省去了反復檢索狀態的資源消耗,也能夠得到最高的反饋速度。

Android 開發中一個典型的例子是點擊監聽器 OnClickListener 。對設置 OnClickListener來說,View 是被觀察者,OnClickListener 是觀察者,二者通過 setOnClickListener() 方法達成訂閱關系。訂閱之後用戶點擊按鈕的瞬間,Android Framework 就會將點擊事件發送給已註冊的 OnClickListener 。

OnClickListener 的觀察者模式大致如下圖:

技術分享圖片

如圖所示,通過 setOnClickListener() 方法,Button 持有 OnClickListener 的引用(這一過程沒有在圖上畫出)。當用戶點擊時,Button 自動調用 OnClickListeneronClick() 方法。

按照觀察者模式抽象出來的各個概念:

  • Button: 被觀察者
  • OnClickListener: 觀察者
  • setOnClickListener(): 訂閱
  • onClick(): 事件處理

就由專用的觀察者模式轉變成了通用的觀察者模式,如下圖:

技術分享圖片

3.2. RxJava的觀察者模式

RxJava 有四個基本概念:

  • Observable: 可觀察者,即被觀察者
  • Observer: 觀察者
  • Subscribe: 訂閱
  • Event: 事件處理

ObservableObserver 通過 subscribe() 方法實現訂閱關系,使得Observable 可以在需要的時候發出事件來通知 Observer

與傳統觀察者模式不同,RxJava 的事件回調方法除了普通事件 onNext() (相當於 onClick()) 之外,還定義了兩個特殊的事件:onCompleted()onError()

  • onCompleted(): 事件隊列完結

RxJava 不僅把每個事件單獨處理,還會把它們看做一個隊列。RxJava規定,當不會再有新的 onNext() 發出時,需要觸發 onCompleted() 方法作為事件完成標誌。

  • onError(): 事件隊列異常

在事件處理過程中出異常時,onError() 會被觸發,同時隊列自動終止,不允許再有事件發出。

在一個正確運行的事件序列中, onCompleted() 和 onError() 有且只有一個被調用,並且是事件序列中的最後一個執行。

RxJava 的觀察者模式大致如下圖:

技術分享圖片

4. RxJava的基本使用

基於以上的概念,RxJava 的基本使用有 3 個步驟:

4.1. 創建Obsever

Observer 即觀察者,它決定事件觸發的時候將有怎樣的行為。 RxJava 中的 Observer 接口的聲明方式:

Observer<String> observer = new Observer<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error: " + e.getMessage());
    }
};
復制代碼

除了 Observer 接口之外,RxJava 還內置了一個實現了 Observer 的抽象類:SubscriberSubscriberObserver 接口進行了一些擴展,但他們的基本使用方式是完全一樣的:

Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error: " + e.getMessage());
    }
};
復制代碼

實質上,在 RxJavasubscribe 過程中,Observer 也總是會先被轉換成一個 Subscriber再使用。所以如果你只想使用基本功能,選擇 ObserverSubscriber 是完全一樣的。它們的區別對於使用者來說主要有兩點:

  • onStart()

這是 Subscriber 增加的方法。它會在 subscribe 剛開始,而事件還未發送之前被調用。可以用於做一些準備工作,例如數據的清零或重置。這是一個可選方法,默認情況下它的實現為空。

需要註意的是,如果對準備工作的線程有要求(例如: 彈出一個顯示進度的對話框,這必須在主線程執行),onStart() 就不適用了。因為它總是在 subscribe 所發生的線程被調用,而不能指定線程。要在指定的線程來做準備工作,可以使用 doOnSubscribe() 方法,具體可以在後面的章節中看到。

  • unsubscribe()

這是 Subscriber 所實現的另一個接口 Subscription 的方法,用於取消訂閱。在這個方法被調用後,Subscriber 將不再接收事件。一般在這個方法調用前,可以使用 isUnsubscribed() 先判斷一下狀態。

unsubscribe() 這個方法很重要,因為在 subscribe() 之後, Observable 會持有 Subscriber 的引用。這個引用如果不能及時被釋放,將有內存泄露的風險。

註意:在不再使用的時候盡快在合適的地方(例如: onPause()onStop() 等方法中)調用 unsubscribe() 來解除引用關系,以避免內存泄露的發生。

4.2. 創建Obsevable

4.2.1. Obsevable.create()

Observable 即被觀察者,它決定什麽時候觸發事件以及觸發怎樣的事件。 RxJava 使用 create() 方法來創建一個 Observable ,並為它定義事件觸發規則。示例如下:

Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("Hi");
        subscriber.onNext("Aloha");
        subscriber.onCompleted();
    }
});
復制代碼

可以看到,這裏傳入了一個 OnSubscribe 對象作為參數。OnSubscribe 會被存儲在返回的 Observable 對象中。

它的作用相當於一個計劃表,當 Observable 被訂閱的時候,OnSubscribecall() 方法會自動被調用,事件序列就會依照設定依次觸發(對於上面的代碼,就是觀察者Subscriber 將會被調用三次 onNext() 和一次 onCompleted())。

這樣,由被觀察者調用了觀察者的回調方法,就實現了由被觀察者向觀察者的事件傳遞,即觀察者模式。

4.2.2. Obsevable.just(T...)

create() 方法是 RxJava 最基本的創建事件序列的方法。基於這個方法,RxJava 還提供了一些方法用於快捷創建事件隊列,例如 just() 方法:

Observable observable = Observable.just("Hello", "Hi", "Aloha");
// 將會依次調用方法序列:onNext("Hello") -> onNext("Hi") -> onCompleted()
復制代碼

4.2.3. Obsevable.from(T[])和from(Iterable<? extends T>)

將傳入的數組或 Iterable 拆分成具體對象後,依次發送給觀察者,示例如下:

String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);
// 將會依次調用方法序列:onNext("Hello") -> onNext("Hi") -> onCompleted()
復制代碼

4.3. Subscribe關聯

創建了 ObservableObserver 之後,再用 subscribe() 方法將它們關聯起來,整條鏈子就可以工作了。代碼很簡單:

observable.subscribe(observer);
// 或者
observable.subscribe(subscriber);
復制代碼

可能會註意到,subscribe() 這個方法有點怪:它看起來是『observable 訂閱了 observer / subscriber』,而不是『observer / subscriber 訂閱了 observable』。這看起來就像『雜誌訂閱了讀者』一樣顛倒了對象關系。

這讓人讀起來有點別扭,不過如果把 API 設計成 『observer.subscribe(observable) / subscriber.subscribe(observable)』,雖然更加符合思維邏輯,但對流式 API 的設計就造成影響了,比較起來明顯是得不償失的。

Observable.subscribe(Subscriber) 的內部實現是這樣的(核心代碼):

public Subscription subscribe(Subscriber subscriber) {
    subscriber.onStart();
    onSubscribe.call(subscriber);
    return subscriber;
}
復制代碼

可以看到subscriber() 做了3件事:

(a). 調用Subscriber.onStart()

這個方法在前面已經介紹過,是一個可選的準備方法。

(b). 調用Observable中的OnSubscribe.call(Subscriber)

事件發送的邏輯開始運行。從這也可以看出,在RxJava中,Observable並不是在創建的時候就立即開始發送事件,而是在它被訂閱的時候,即當subscribe()方法執行的時候。

(c). 返回Subscription

將傳入的Subscriber作為Subscription返回。這是為了方便後面的unsubscribe()。

整個過程中對象間的關系如下圖:

技術分享圖片

或者可以看動圖:

技術分享圖片

除了 subscribe(Observer)subscribe(Subscriber)subscribe() 還支持不完整定義的回調,RxJava 會自動根據定義創建出 Subscriber。形式如下:

Action1<String> onNextAction = new Action1<String>() {
    // onNext()
    @Override
    public void call(String s) {
        Log.d(tag, s);
    }
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
    // onError()
    @Override
    public void call(Throwable throwable) {
        // Error handling
    }
};
Action0 onCompletedAction = new Action0() {
    // onCompleted()
    @Override
    public void call() {
        Log.d(tag, "completed");
    }
};

// 自動創建 Subscriber ,並使用 onNextAction 來定義 onNext()
observable.subscribe(onNextAction);
// 自動創建 Subscriber ,並使用 onNextAction 和 onErrorAction 來定義 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自動創建 Subscriber ,並使用 onNextAction、 onErrorAction 和 onCompletedAction 來定義 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
復制代碼

簡單解釋一下這段代碼中出現的 Action1Action0

  • Action0

Action0RxJava 的一個接口,它只有一個方法 call(),這個方法是無參無返回值的。由於 onCompleted() 方法也是無參無返回值的,因此 Action0 可以被當成一個包裝對象,將 onCompleted() 的內容打包起來將自己作為一個參數傳入 subscribe() 以實現不完整定義的回調。

  • Action1

Action1 也是一個接口,它同樣只有一個方法 call(T param),這個方法也無返回值,但有一個參數。與 Action0 同理,由於 onNext(T obj)onError(Throwable error) 也是單參數無返回值的,因此 Action1 可以將 onNext(obj)onError(error) 打包起來傳入 subscribe() 以實現不完整定義的回調。

事實上,雖然 Action0Action1API 中使用最廣泛,但 RxJava 提供了多個 ActionX 形式的接口 (例如: Action2, Action3),它們可以被用以包裝不同的無返回值的方法。

4.4. 場景示例

4.4.1. 打印字符串數組

將字符串數組 names 中的所有字符串依次打印出來:

String[] names = ...;
Observable.from(names)
    .subscribe(new Action1<String>() {
        @Override
        public void call(String name) {
            Log.d(tag, name);
        }
    });
復制代碼

4.4.2. 由ID取得圖片顯示

int drawableRes = ...;
ImageView imageView = ...;

Observable.create(new OnSubscribe<Drawable>() {
    @Override
    public void call(Subscriber<? super Drawable> subscriber) {
        Drawable drawable = getTheme().getDrawable(drawableRes));
        subscriber.onNext(drawable);
        subscriber.onCompleted();
    }
}).subscribe(new Observer<Drawable>() {
    @Override
    public void onNext(Drawable drawable) {
        imageView.setImageDrawable(drawable);
    }

    @Override
    public void onCompleted() {
    }

    @Override
    public void onError(Throwable e) {
        Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
    }
});
復制代碼

正如上面兩個例子這樣,創建出 ObservableSubscriber,再用 subscribe() 將它們串起來,一次 RxJava 的基本使用就完成了,非常簡單!

然而。

技術分享圖片

小結

RxJava 的默認規則中,事件的發出和消費都是在同一個線程的。也就是說,如果只用上面的方法,實現出來的只是一個同步的觀察者模式。觀察者模式本身的目的就是『後臺處理,前臺回調』的異步機制,因此異步對於 RxJava 是至關重要的。而要實現異步,則需要用到 RxJava 的另一個核心的概念 Scheduler,後續將給出詳細介紹。


歡迎關註技術公眾號: 零壹技術棧

技術分享圖片

本帳號將持續分享後端技術幹貨,包括虛擬機基礎,多線程編程,高性能框架,異步、緩存和消息中間件,分布式和微服務,架構學習和進階等學習資料和文章。

Android異步框架RxJava 1.x系列(一) - 觀察者模式及實現