1. 程式人生 > >RxJava 響應式程式設計初探

RxJava 響應式程式設計初探

前言

最近在學習Spring Cloud Hystrix框架,發現裡面的程式碼寫法跟傳統的程式設計思路很不一樣,充滿了回撥和Lamda風格,於是去深入瞭解了一下,這便是本文的主題 RxJava 響應式程式設計。

RxJava

我們一般寫的程式,以流程為核心的,每一行程式碼實際上都是機器實際上要執行的指令,稱之為命令式程式。而RxJava這樣的程式設計風格,稱為函式響應式程式設計函式響應式程式設計是以資料流為核心,處理資料的輸入,處理以及輸出的,這種思路寫出來的程式碼就會跟機器實際執行的指令大相徑庭。既然涉及到響應,便會想到觀察者模式,的確,RxJava 是基於Java觀察者設計模式的。

RxJava 最核心的兩個東西是Observable(被觀察者,事件源)和Subscriber(觀察者)。Observable 發出一系列事件,Subscriber 處理這些事件。

一個 Observable 可以發出零個或者多個事件,知道結束或者出錯。每發出一個事件,就會呼叫它的Subscriber的onNext方法,最後呼叫Subscriber.onNext()或者Subscriber.onError()結束。

這裡跟觀察者模式有一點明顯不同,那就是如果一個 Observable 沒有任何的 Subscriber,那麼這個 Observable 是不會發出任何事件的。

基本用法

        Observable<String> myObservable = Observable.create(new Observable.OnSubscribe<String>() {
            public void call(Subscriber<? super String> subscriber) {
                System.out.println("====Hello World!====");
                subscriber.onNext("Hello World!");
subscriber.onCompleted(); } });

上述程式碼中我們建立了一個 Observable 物件,當 Observable 被訂閱的時候,OnSubscribe 的 call() 方法會自動被呼叫,事件序列就會依照設定依次觸發(上述程式碼中,觀察者 Subscriber 將會被呼叫一次 onNext() 和一次 onCompleted())。被觀察者呼叫了觀察者的回撥方法,就實現了由被觀察者向觀察者的事件傳遞,即觀察者模式。

        Subscriber<String> mySubscriber = new Subscriber<String>() {
            public void onCompleted() {

            }

            public void onError(Throwable throwable) {

            }

            public void onNext(String s) {
                System.out.println(s);
            }
        };

上文中提到,只有 Observable 物件而沒有 Subscriber 物件,那麼這個 Observable 是不會發出任何事件的。上述程式碼中,我們建立了一個 Subscriber 物件,並且重寫了 onNext()、onError()、onCompleted() 三個方法。

        myObservable.subscribe(mySubscriber);

通過 subscribe 方法將 myObservable 物件和 mySubscriber 物件關聯起來,這樣就完成了subscriber對observable的訂閱。

Observable 簡化用法

上述程式碼僅僅為了列印一個“hello world”難免讓人覺得囉嗦,RxJava 其實提供了很多便捷的函式來幫助我們減少程式碼。

  • 使用just( ),將為你建立一個 Observable 並自動為你呼叫 onNext( ) 發射資料:
        Observable.just("Hello World!").subscribe(mySubscriber);
  • 使用from( ),遍歷集合,傳送每個item:
        List<String> list = new ArrayList<String>();
        list.add("Hello");
        list.add("World!");
        Observable.from(list).subscribe(mySubscriber);
  • 使用defer( ),有觀察者訂閱時才建立Observable,並且為每個觀察者建立一個新的Observable:
        Observable<String> myObservable = Observable.defer(new Func0<Observable<String>>() {
            public Observable<String> call() {
                return Observable.just("Hello","World!");
            }
        });

Subscriber 簡化用法

大部分時候我們並不關心 OnComplete 和 OnError,我們只需要在 onNext 的時候做一些處理,這時候就可以使用Action1類。

        Action1<String> onNextAction = new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println(s);
            }
        };

        Observable.just("Hello","World!").subscribe(onNextAction);

RxJava 高階部分

map 操作符

map 操作符就是用於變換Observable物件的,把一個事件轉換為另一個事件,map操作符返回一個Observable物件,這樣就可以實現鏈式呼叫,在一個Observable物件上多次使用map操作符,最終將最簡潔的資料傳遞給Subscriber物件。

        Observable.just("1+2").map(new Func1<String, Integer>() {
            public Integer call(String str){
                String[] strs = str.split("\\+");
                return Integer.valueOf(strs[0])+Integer.valueOf(strs[1]);
            }
        }).subscribe(new Action1<Integer>() {
            public void call(Integer integer) {
                System.out.print(integer);
            }
        });

上述程式碼將一個代表數值表示式的字串轉換為最後的計算結果並輸出。

執行緒控制(Scheduler)

使用 RxJava,你可以使用 subscribeOn() 指定觀察者程式碼執行的執行緒,使用 observerOn() 指定訂閱者執行的執行緒

RxJava 已經內建了幾個 Scheduler ,它們已經適合大多數的使用場景:

  • Schedulers.immediate(): 直接在當前執行緒執行,相當於不指定執行緒。這是預設的 Scheduler。

  • Schedulers.newThread(): 總是啟用新執行緒,並在新執行緒執行操作。

  • Schedulers.io(): I/O 操作(讀寫檔案、讀寫資料庫、網路資訊互動等)所使用的 Scheduler。行為模式和 newThread() 差不多,區別在於 io() 的內部實現是是用一個無數量上限的執行緒池,可以重用空閒的執行緒,因此多數情況下 io() 比 newThread() 更有效率。不要把計算工作放在 io() 中,可以避免建立不必要的執行緒。

  • Schedulers.computation(): 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的執行緒池,大小為 CPU 核數。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU。

有了以上這幾個 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 兩個方法來對執行緒進行控制了。

  • subscribeOn(): 指定 subscribe() 所發生的執行緒,即 Observable.OnSubscribe 被啟用時所處的執行緒。或者叫做事件產生的執行緒。

  • observeOn(): 指定 Subscriber 所執行在的執行緒。或者叫做事件消費的執行緒。

    observable
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(subscribe);

總結

本文大致介紹了RxJava的一些基本用法,真正的RxJava絕不是這麼三言兩語能夠說清的,需要使用者在實際使用過程中去理解它。