1. 程式人生 > >RxJava系列第一彈——RxJava入門篇

RxJava系列第一彈——RxJava入門篇

概述:RxJava系列文章第一篇 RxJava入門篇 介紹RxJava是什麼和基本使用。

一 RxJava是什麼

1 來自官網的介紹

a library for composing asynchronous and event-based programs using observable sequences for the Java VM 這是來自RxJava在GitHub的自我介紹 翻譯過來就是 一個在 Java VM 上使用可觀測的序列來組成非同步的、基於事件的程式的庫。

2 來自個人的剖析

我想大多人第一次見識這段話的時候都是懵逼的,當然包括我自己。所以不管你都是看英文,還是看中文。這段話太難理解了。為了更好的理解這段話,我們對這段話進行一次剖析拿到這幾個詞 “可觀測” “非同步”“事件”“庫”。

3 關鍵詞:“可觀測” “非同步” “事件” “庫”

① “可觀測”,當你看到這個詞的時候,你的腦海裡想到了什麼? 是的,沒錯。就是觀察者模式。現在我們來聊聊觀察者模式 舉一個生活中給的例子 你媽媽和兒子的栗子 ,你看著你的媽媽 一直不說話 突然媽媽手裡有了麵包,兒子就說“我要吃”。這裡 兒子就是觀察者 ,媽媽就是被觀察者,麵包就是 被觀察者 發生的改變。“我要吃”就是觀察者作出的響應。如下圖:

通過“觀察”這個工作建立關係
通過觀察建立關係

被觀察者發生改變 觀察者作出響應
被觀察者發生改變 觀察者作出響應

RxJava就是一種拓展的觀察者模式 。讓我們先來記住幾個名詞:“Observer”(帶er就是什麼人,那這裡就是觀察的人,即觀察者),“Observable”(帶able 就是可以什麼什麼的,那這裡就是可以被觀察的人,即被觀察者)“subscribe”(訂閱,即觀察這個動作)如下圖:
這裡寫圖片描述

② “非同步”

在安卓開發中,我們總會聽到這樣一句話,在主執行緒中不能執行耗時操作,在子執行緒中不能操作UI。這樣就引出了一個“非同步這個關鍵詞”,通常我們的做法是使用Asynctask或者handler來執行非同步的耗時操作通過介面回撥在主執行緒中拿到資料修改UI。然後你發現大片的程式碼,大片的巢狀,還有回撥 。讓我們來感謝RxJava的誕生,有了RxJava ,媽媽再也不用擔心我的非同步操作了。在RxJava中有“執行緒排程”這樣的說法。可以自由的切換方法執行的執行緒。讓我們來看看這個神奇的東東: observeOn(未知執行緒) ,subscribeOn(未知執行緒)。這就是在RxJava中使用的執行緒排程器。我們需要在這兩個的方法中 指定排程到哪個執行緒。讓我們記住這兩個名詞吧,在本篇的例子中會更詳細的解釋非同步和執行緒排程。

③ “事件”

“事件”這個詞,其實也挺抽象的,好吧,讓我們回到媽媽和兒子的栗子。在這個模型中,媽媽拿出一個麵包,這個行為 就是 被觀察者創造出的一個事件,我們先稱為“麵包事件”而麵包就是事件的引數。 兒子想吃麵包那麼就需要通過媽媽通過事件傳遞的方法將“麵包事件”傳遞給兒子。那麼在RxJava中有哪些事件的傳遞的方法呢。這裡就不介紹了,在後面的栗子中,我們就會詳細的介紹。

這裡寫圖片描述

④ “庫”

看到這裡我只想說一句“酷”,這就意味著我們使用RxJava是非常簡單的。作為Android開發者,我們只需要在gradle中新增依賴就可以了。

4 是時候來一句總結了

RxJava是什麼呢?說了這麼久,相信大家對RxJava有了一定的瞭解。那麼是時候來一句總結了:RxJava是一個 通過拓展的觀察者模式 使用執行緒排程器來解決非同步操作 進行事件傳遞 的 庫。(總結的不好,你來打我啊,哈哈)

二 RxJava怎麼用

RxJava到現在已經有 RxJava1.x 和 RxJava2.0 兩個版本了。我們分別來看看這個版本是怎麼使用的。

1 RxJava1.x的用法:

① 引用庫

compile 'io.reactivex:rxjava:1.0.10'

② 建立 觀察者 Observer

       Observer<String> observer=new Observer<String>() {
           @Override
           public void onNext(String s) {
               Log.i("onNext","傳遞麵包事件");
           }
           @Override
           public void onCompleted() {
               Log.i("onCompleted","傳遞麵包事件完成");
           }

           @Override
           public void onError(Throwable e) {
               Log.i("onError","傳遞麵包事件出錯了");
           }
       };

在建立觀察者的方法中,我們可以看到 有三個傳遞事件的方法:onNext (向下傳遞事件時呼叫) onCompleted(事件傳遞完成時呼叫) onError(事件傳遞出錯時呼叫)。

③ 建立 被觀察者 Observable

        Observable observable = Observable.create(new Observable.OnSubscribe<String>() {

            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("麵包事件");
            }
        });

這裡可以看到 通過create 方法傳入了一個 onSubscribe物件 作為引數。當 Observable 被訂閱的時候,OnSubscribe 的 call() 方法會自動被呼叫,事件序列就會依照設定依次觸發(對於上面的程式碼,就是觀察者Subscriber 將會被呼叫onNext()方法,將“麵包事件”進行傳遞。由被觀察者呼叫了觀察者的回撥方法,就實現了由被觀察者向觀察者的事件傳遞

④ 建立觀察關係

 observable.subscribe(observer);

被觀察者 呼叫 subscribe 方法 傳入 觀察者 observer 建立訂閱關係。

⑤ 訂閱結果

a 只調用 onNext()方法時

  subscriber.onNext("麵包事件");
com.rxjava1xdemo I/onNext: 麵包事件

可以清楚的看到 在 觀察者的onNext()方法中 成功的打印出來 “麵包事件” 這樣就完成的了從 被觀察者 到 觀察者的事件傳遞。

b 呼叫 兩次 onNext()方法時

subscriber.onNext("麵包事件");
subscriber.onNext("麵包事件2");
03-17 08:57:21.691 2513-2513/com.rxjava1xdemo I/onNext: 麵包事件
03-17 08:57:21.691 2513-2513/com.rxjava1xdemo I/onNext: 麵包事件2

可以清楚看到 觀察者的呼叫了兩次onNext()方法。可以說明 被觀察者可以發射很多次事件。
c 呼叫 onNext() 和 onCompleted()時

subscriber.onNext("麵包事件");
subscriber.onCompleted();
subscriber.onNext("麵包事件2");
03-17 09:01:37.708 2513-2513/com.rxjava1xdemo I/onNext: 麵包事件
03-17 09:01:37.708 2513-2513/com.rxjava1xdemo I/onCompleted: 傳遞麵包事件完成

可以看到 當呼叫 onCompleted()方法時 在其之後再呼叫onNext()方法 將不會在進行事件傳遞。

d 當呼叫onError()方法時

subscriber.onNext("麵包事件");
subscriber.onError(new NullPointerException());
subscriber.onNext("麵包事件2");
03-17 09:04:45.273 2513-2513/com.rxjava1xdemo I/onNext: 麵包事件
03-17 09:04:45.273 2513-2513/com.rxjava1xdemo I/onError: 傳遞麵包事件出錯了

可以看到 當呼叫 onError()方法時 其後在呼叫onNext()方法 不在進行事件傳遞。

e 當 onCompleted()方法和onError() 都呼叫時。

onError方法先呼叫時。

subscriber.onNext("麵包事件");
subscriber.onError(new NullPointerException());
subscriber.onCompleted();
subscriber.onNext("麵包事件2");
03-17 09:07:43.065 2513-2513/com.rxjava1xdemo I/onNext: 麵包事件
03-17 09:07:43.065 2513-2513/com.rxjava1xdemo I/onError: 傳遞麵包事件出錯了

可以看到只調用了 onError()方法。

onCompleted方法先呼叫時

subscriber.onNext("麵包事件");
subscriber.onCompleted();
subscriber.onError(new NullPointerException());
subscriber.onNext("麵包事件2");
03-17 09:09:01.728 2513-2513/com.rxjava1xdemo I/onNext: 麵包事件
03-17 09:09:01.728 2513-2513/com.rxjava1xdemo I/onCompleted: 傳遞麵包事件完成

可以看到只調用了 onCompleted()方法

總結 1 onNext()方法可以多次呼叫,但是隻能在onError()方法或者onCompleted()方法之前呼叫。2 onError()方法和onCompleted()方法同時只能出現一個。當同時出現出現時,只會呼叫 第一個時間出現的。

⑥ RxJava1.X 觀察者Observer的其他寫法

除了 Observer 介面之外,RxJava 還內建了一個實現了 Observer 的抽象類:Subscriber。

      Subscriber<String> subscriber=new Subscriber<String>() {
            @Override
            public void onStart() {
                super.onStart();
            }

            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String s) {

            }
        };
        return subscriber;

可以看到 Subscriber 是一個抽象類 它實現了 Observer介面 並 增加了一個 onStart(); 對Observer進行了一點拓展。但是他們的使用方法還是一樣的。同樣可以通過 subscribe()方法 實現訂閱關係。

observable.subscribe(subscriber);

除了 subscribe(Observer) 和 subscribe(Subscriber) ,subscribe() 還支援不完整定義的回撥,RxJava 會自動根據定義創建出 Subscriber 。形式如下:

Action1<String> onNextAction = new Action1<String>() {
    // onNext()
    @Override
    public void call(String s) {
        Log.d(tag, s);
    }
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
    // onError()
    @Override
    public void call(Throwable throwable) {
        // Error handling
    }
};
Action0 onCompletedAction = new Action0() {
    // onCompleted()
    @Override
    public void call() {
        Log.d(tag, "completed");
    }
};

// 自動建立 Subscriber ,並使用 onNextAction 來定義 onNext()
observable.subscribe(onNextAction);
// 自動建立 Subscriber ,並使用 onNextAction 和 onErrorAction 來定義 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自動建立 Subscriber ,並使用 onNextAction、 onErrorAction 和 onCompletedAction 來定義 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

在這裡 action1 和 action0 都是介面 因為 onNext()和onError()方法都是有參的 所以實現了 action1 介面 而 onCompleted()方法是無參的 所以實現了 action0介面 。

⑦ RxJava1.X 被觀察者 Observable 的其他建立方式 。

create() 方法是 RxJava 最基本的創造事件序列的方法。基於這個方法, RxJava 還提供了一些方法用來快捷建立事件佇列
⑴ 使用 just 方式 將傳入的引數 發射出來

Observable<String> observable=Observable.just("麵包事件1","麵包事件2","麵包事件3");
03-17 10:07:02.804 18123-18123/com.rxjava1xdemo I/onNext: 麵包事件1
03-17 10:07:02.804 18123-18123/com.rxjava1xdemo I/onNext: 麵包事件2
03-17 10:07:02.804 18123-18123/com.rxjava1xdemo I/onNext: 麵包事件3
03-17 10:07:02.804 18123-18123/com.rxjava1xdemo I/onCompleted: 傳遞麵包事件完成

⑵ 使用 from 方式 將傳入的陣列或 Iterable 拆分成具體物件後,依次傳送出來。

String[] actions = {"麵包事件1", "麵包事件2", "麵包事件3"};
Observable observable = Observable.from(actions);
03-17 10:12:17.600 18123-18123/com.rxjava1xdemo I/onNext: 麵包事件1
03-17 10:12:17.600 18123-18123/com.rxjava1xdemo I/onNext: 麵包事件2
03-17 10:12:17.600 18123-18123/com.rxjava1xdemo I/onNext: 麵包事件3
03-17 10:12:17.600 18123-18123/com.rxjava1xdemo I/onCompleted: 傳遞麵包事件完成

2 RxJava2.0的用法

① 引用庫

compile 'io.reactivex.rxjava2:rxjava:2.0.7'

② 建立 觀察者 Observer

  Observer observer=new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.i("onNext",d.isDisposed()+"");
            }

            @Override
            public void onNext(String s) {
                Log.i("onNext",s);
            }


            @Override
            public void onError(Throwable e) {
                Log.i("onError",e.toString());
            }

            @Override
            public void onComplete() {
                Log.i("onComplete","onComplete");
            }
        };    

在建立觀察者的方法中,我們可以看到 有四個傳遞事件的方法:onSubscribe (是否解除訂閱關係,相當於發射事件的開關) onNext (向下傳遞事件時呼叫) onCompleted(事件傳遞完成時呼叫) onError(事件傳遞出錯時呼叫)。

③ 建立 被觀察者 Observable

        Observable observable=Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter e) throws Exception {
                e.onNext("麵包事件1");
                e.onNext("麵包事件2");
                e.onNext("麵包事件3");
                e.onComplete();
            }
        });

這裡可以看到 通過create 方法傳入了一個 ObservableOnSubscribe物件 作為引數。它有一個subscribe的方法 在這裡 ObservableEmitter 事件發射器的意思。通過這個發射器 發射事件。也就是呼叫 onNext onCompleted onError方法。

④ 建立觀察關係

 observable.subscribe(observer);

被觀察者 呼叫 subscribe 方法 傳入 觀察者 observer 建立訂閱關係。

⑤ 訂閱結果

03-17 11:02:01.561 25410-25410/com.rxjava20demo I/onNext: false
03-17 11:02:01.561 25410-25410/com.rxjava20demo I/onNext: 麵包事件1
03-17 11:02:01.562 25410-25410/com.rxjava20demo I/onNext: 麵包事件2
03-17 11:02:01.562 25410-25410/com.rxjava20demo I/onNext: 麵包事件3
03-17 11:02:01.562 25410-25410/com.rxjava20demo I/onComplete: onComplete

⑥ 實現訂閱關係的其他寫法

我們在用subscribe(subscriber)方式實現訂閱關係時 可以看到 subscribe還有其他的過載方法:

我們可以看到幾個熟悉的詞(標記的詞) onNext onError onCompleted 。
也就是在subscribe方法裡 可以傳入不同的引數。從而呼叫不同的方法。

        observable.subscribe(new Consumer() {
              @Override
              public void accept(@NonNull Object o) throws Exception {

              }
          }, new Consumer<Throwable>() {
              @Override
              public void accept(@NonNull Throwable throwable) throws Exception {

              }
          }, new Action() {
              @Override
              public void run() throws Exception {

              }
          });

這裡的過載方法 就相當於 subscribe(onNext ,onError,onCompleted ) 。

三 RxJava1.X 與RxJava2.0的區別

1 在使用create方式建立Observable時 傳入的引數不同。

RxJava1.X:

Observable observable = Observable.create(new Observable.OnSubscribe<String>() 

RxJava2.0:

Observable observable=Observable.create(new ObservableOnSubscribe()

2 subscribe方法 過載的引數不同

RxJava1.X

RxJava2.0

可以明顯看到 RxJava2.0中 不在支援 Subscriber的實現類引數。
不再支援 Action1 的實現類引數。

四 總結

在RxJava系列的第一篇中 我們引入了 RxJava的概念。剖析了一些關鍵字,通過這些關鍵字,對RxJava進行了一層基本的認識。然後就是RxJava1.X的api用法。和RxJava2.0Api的用法。最後進行了 兩個版本的比較。相信大家已經基本會使用RxJava了。再後續第二篇 我使用RxJava來實現一些經典場景。敬請期待吧。