1. 程式人生 > >RxJava系列2:RxJava簡單入門

RxJava系列2:RxJava簡單入門

一.擴充套件的觀察者模式

Observable和Subscriber能完成任何事情,你的Observable可以是一個數據庫查詢,Subscriber獲得查詢結果然後將其顯示在螢幕上。你的Observable可以是螢幕上的一個點選,Subscriber響應該事件並做處理。你的Observable可以從網路上請求資料,Subscriber將其顯示在介面上,這是一個可以處理任何事情的通用框架。

二.RxJava的優勢

清晰,簡單,更符合人的思維,尤其是跟lambda表示式結合更是如此
函式式風格,方面的建立事件流和資料流
非同步錯誤處理機制:傳統的try cach沒辦法處理非同步計算,Rx提供了合適的錯誤處理機制
輕鬆使用併發

三.RxJava應用場景

RxBinding節流(防止按鈕的重複點選)
輪詢
定時操作
RxPermissions
RxBus
RxJava與Retrofit結合處理網路請求
代替監聽/觀察模型
執行緒管理,提供執行緒池,執行緒切換(Schedulers)
解決巢狀回撥(flatMap)
提供延時,Timer處理(interval)

四.如何學習RxJava

響應式程式設計的主要組成部分是observable、operator和subscriber。一般響應式程式設計的資訊流如下所示:
observable->operator1->operator2->operator3->subscriber
也就是說,observable是事件的生產者,subscriber是事件最終的消費者。
因為subscriber通常在主執行緒中執行,因此設計上要求其程式碼儘可能簡單,只對事件作出響應(不對事件或資料進行修改),而修改事件的工作全部由operator執行。

observable產生資料,中間通過幾個operator,也就是操作符,這個操作符是用來轉換資料的。最終他將發給訂閱者subscriber。也就是說observable是事件的產生者,subscriber是事件的消費者。

Observable負責產生資料,subscriber負責消費資料。
中間的operator負責資料的轉化

至於大量的操作符,使用的時候去查詢就可以了。多在專案中使用才可以精通。

五.類關係圖

1.在RxJava中主要有4個角色:

• Observable
• Subject
• Observer
• Subscriber
Observable和Subject是兩個“生產”實體,Observer和Subscriber是兩個“消費”實體。說直白點Observable對應於觀察者模式中的被觀察者,而Observer和Subscriber對應於觀察者模式中的觀察者。Subscriber其實是一個實現了Observer的抽象類,後面我們分析原始碼的時候也會介紹到。Subject比較複雜,以後再分析。他們具體消費的是事件。

2.Rxjava的事件回撥方法(其實就對應著觀察者的update方法)

與傳統觀察者模式不同, RxJava 的事件回撥方法除了普通事件 onNext() (相當於 onClick() / onEvent())之外,還定義了兩個特殊的事件:onCompleted() 和 onError()。
• onCompleted(): 事件佇列完結。RxJava 不僅把每個事件單獨處理,還會把它們看做一個佇列。RxJava 規定,當不會再有新的onNext() 發出時,需要觸發 onCompleted() 方法作為標誌。
• onError(): 事件佇列異常。在事件處理過程中出異常時,onError() 會被觸發,同時佇列自動終止,不允許再有事件發出。
• 在一個正確執行的事件序列中, onCompleted() 和 onError() 有且只有一個,並且是事件序列中的最後一個。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在佇列中呼叫了其中一個,就不應該再呼叫另一個。

3.Action1 Action0

Action extends Function

public interface Action0 extends Action {void call();
}

public interface Action2<T1, T2> extends Action {void call(T1 t1, T2 t2);
}

public interface Func0<R> extends Function, Callable<R> {
    @Override
    R call();
}

function
public interface Function {

}

public interface Func1<T, R> extends Function {
    R call(T t);
}

簡單解釋一下這段程式碼中出現的 Action1 和 Action0。 Action0 是 RxJava 的一個介面,它只有一個方法 call(),這個方法是無參無返回值的;由於 onCompleted() 方法也是無參無返回值的,因此 Action0 可以被當成一個包裝物件,將 onCompleted() 的內容打包起來將自己作為一個引數傳入 subscribe() 以實現不完整定義的回撥。這樣其實也可以看做將 onCompleted() 方法作為引數傳進了subscribe(),相當於其他某些語言中的『閉包』。 Action1 也是一個介面,它同樣只有一個方法 call(T param),這個方法也無返回值,但有一個引數;與 Action0 同理,由於 onNext(T obj) 和 onError(Throwable error) 也是單引數無返回值的,因此 Action1可以將 onNext(obj) 和 onError(error) 打包起來傳入 subscribe() 以實現不完整定義的回撥。事實上,雖然 Action0 和 Action1在 API 中使用最廣泛,但 RxJava 是提供了多個 ActionX 形式的介面 (例如 Action2, Action3) 的,它們可以被用以包裝不同的無返回值的方法。

4.Subscriber vs Observer

subscriber = observer + subcription
Subscriber ,它跟 Observer 介面幾乎完全一樣,只是多了兩個方法
onStart() : 它會在 subscribe 剛開始,而事件還未傳送之前被呼叫,可以用於做一些準備工作,例如資料的清零或重置。這是一個可選方法,預設情況下它的實現為空。需要注意的是,如果對準備工作的執行緒有要求(例如彈出一個顯示進度的對話方塊,這必須在主執行緒執行), onStart() 就不適用了,因為它總是在 subscribe 所發生的執行緒被呼叫,而不能指定執行緒。
unsubscribe() : 用於取消訂閱。在這個方法被呼叫後,Subscriber 將不再接收事件。一般在這個方法呼叫前,可以使用 isUnsubscribed() 先判斷一下狀態。 要在不再使用的時候儘快在合適的地方(例如 onPause() onStop() 等方法中)呼叫 unsubscribe() 來解除引用關係,以避免記憶體洩露的發生。
雖然多了兩個方法,但是基本實現方式跟Observer是一樣的,所以暫時可以不考慮兩者的區別。不過值得注意的是:
實質上,在 RxJava 的 subscribe 過程中,Observer 也總是會先被轉換成一個 Subscriber 再使用。

5.執行緒控制

在RxJava中,Scheduler相當於執行緒控制器,可以通過它來指定每一段程式碼執行的執行緒。
RxJava已經內建了幾個Scheduler,下面是總結:
Schedulers.immediate() : 直接在當前執行緒執行,相當於不指定執行緒。這是預設的Scheduler。
Schedulers.newThread() : 總是啟用新執行緒,並在新執行緒執行操作。
Schedulers.io() : I/O 操作(讀寫檔案、讀寫資料庫、網路資訊互動等)所使用的Scheduler。行為模式和newThread()差不多,區別在於io()的內部實現是是用一個無數量上限的執行緒池,可以重用空閒的執行緒,因此多數情況下io()比newThread()更有效率。不要把計算工作放在io()中,可以避免建立不必要的執行緒。
Schedulers.computation() : 計算所使用的Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個Scheduler使用的固定的執行緒池,大小為 CPU 核數。不要把 I/O 操作放在computation()中,否則 I/O 操作的等待時間會浪費 CPU。
AndroidSchedulers.mainThread() ,Android專用執行緒,指定操作在Android主執行緒執行。
那我們如何切換執行緒呢?RxJava中提供了兩個方法:subscribeOn() 和 observeOn() ,兩者的不同點在於:
subscribeOn() : 指定subscribe()訂閱所發生的執行緒,即 call() 執行的執行緒。或者叫做事件產生的執行緒。(指定被觀察者執行的執行緒)
observeOn() : 指定Observer所執行在的執行緒,即onNext()執行的執行緒。或者叫做事件消費的執行緒。(制定觀察者執行的執行緒)

??call和onnext到底是怎麼對應到觀察者模式的

Observable<String>  myObservable  = Observable.create(new Observable.OnSubscribe<String>() {
  @Override
  public void call(Subscriber<? super String> subscriber) {
      subscriber.onNext("Hello, world!"); //發射一個"Hello, world!"的String
      subscriber.onCompleted();//發射完成,這種方法需要手動呼叫onCompleted,才會回撥Observer的onCompleted方法
  }});

可以看到,這裡傳入了一個 OnSubscribe 物件作為引數。OnSubscribe 會被儲存在返回的 Observable 物件中,它的作用相當於一個計劃表,當 Observable 被訂閱的時候,OnSubscribe 的 call() 方法會自動被呼叫,事件序列就會依照設定依次觸發(對於上面的程式碼,就是觀察者Subscriber將會被呼叫一次 onNext() 和一次 onCompleted())。這樣,由被觀察者呼叫了觀察者的回撥方法,就實現了由被觀察者向觀察者的事件傳遞,即觀察者模式。
對映到原始碼就是:Observable的subscribe方法。

hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);

基本實現

建立Observer

Observer 即觀察者,它決定事件觸發的時候將有怎樣的行為。 RxJava 中的 Observer 介面的實現方式:

Observer<String> observer = new Observer<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};

除了 Observer 介面之外,RxJava 還內建了一個實現了 Observer 的抽象類:Subscriber。 Subscriber 對 Observer 介面進行了一些擴充套件,但他們的基本使用方式是完全一樣的:

Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};

不僅基本使用方式一樣,實質上,在 RxJava 的 subscribe 過程中,Observer 也總是會先被轉換成一個 Subscriber 再使用。所以如果你只想使用基本功能,選擇 Observer 和 Subscriber 是完全一樣的。

建立Observable

Observable 即被觀察者,它決定什麼時候觸發事件以及觸發怎樣的事件。 RxJava 使用 create() 方法來建立一個 Observable ,併為它定義事件觸發規則:

Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("world");
        subscriber.onNext("waiter");
        subscriber.onCompleted();
    }
});

這裡傳入了一個 OnSubscribe 物件作為引數。OnSubscribe 會被儲存在返回的 Observable 物件中,它的作用相當於一個計劃表,當 Observable 被訂閱的時候,OnSubscribe 的 call() 方法會自動被呼叫,事件序列就會依照設定依次觸發(對於上面的程式碼,就是觀察者Subscriber 將會被呼叫三次 onNext() 和一次 onCompleted())。這樣,由被觀察者呼叫了觀察者的回撥方法,就實現了由被觀察者向觀察者的事件傳遞,即觀察者模式。

Subscribe(訂閱)

建立了 Observable 和 Observer 之後,再用 subscribe() 方法將它們聯結起來,整條鏈子就可以工作了。程式碼形式很簡單:

observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);

除了 subscribe(Observer) 和 subscribe(Subscriber) ,subscribe() 還支援不完整定義的回撥,RxJava 會自動根據定義創建出 Subscriber 。形式如下:

Action1<String> onNextAction = new Action1<String>() {
    // onNext()
    @Override
    public void call(String s) {
        Log.d(tag, s);
    }
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
    // onError()
    @Override
    public void call(Throwable throwable) {
        // Error handling
    }
};
Action0 onCompletedAction = new Action0() {
    // onCompleted()
    @Override
    public void call() {
        Log.d(tag, "completed");
    }
};

// 自動建立 Subscriber ,並使用 onNextAction 來定義 onNext()
observable.subscribe(onNextAction);
// 自動建立 Subscriber ,並使用 onNextAction 和 onErrorAction 來定義 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自動建立 Subscriber ,並使用 onNextAction、 onErrorAction 和 onCompletedAction 來定義 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

Action0 是 RxJava 的一個介面,它只有一個方法 call(),這個方法是無參無返回值的;由於 onCompleted() 方法也是無參無返回值的,因此 Action0 可以被當成一個包裝物件,將 onCompleted() 的內容打包起來將自己作為一個引數傳入 subscribe() 以實現不完整定義的回撥。這樣其實也可以看做將 onCompleted() 方法作為引數傳進了 subscribe(),相當於其他某些語言中的『閉包』。 Action1 也是一個介面,它同樣只有一個方法 call(T param),這個方法也無返回值,但有一個引數;與 Action0 同理,由於 onNext(T obj) 和 onError(Throwable error) 也是單引數無返回值的,因此 Action1可以將 onNext(obj) 和 onError(error) 打包起來傳入 subscribe() 以實現不完整定義的回撥。事實上,雖然 Action0 和 Action1在 API 中使用最廣泛,但 RxJava 是提供了多個 ActionX 形式的介面 (例如 Action2, Action3) 的,它們可以被用以包裝不同的無返回值的方法。

六.來個栗子

Observable<String> myObservable = Observable.create(  
    new Observable.OnSubscribe<String>() {  
        @Override  
        public void call(Subscriber<? super String> sub) {  
            //sub就是觀察者,傳送一個字串
            sub.onNext("Hello, world!");  
            sub.onCompleted();  
        }  
    }  
); 

Subscriber<String> mySubscriber = new Subscriber<String>() {  
    @Override  
    public void onNext(String s) { System.out.println(s); }  

    @Override  
    public void onCompleted() { }  

    @Override  
    public void onError(Throwable e) { }  
};

myObservable.subscribe(mySubscriber);  

參考資料: