1. 程式人生 > >從零學習RxJava2.0-簡單入門

從零學習RxJava2.0-簡單入門

前言

  • 函數語言程式設計:函數語言程式設計是種程式設計方式,它將電腦運算視為函式的計算。函式程式語言最重要的基礎是λ演算(lambda calculus),而且λ演算的函式可以接受函式當作輸入(引數)和輸出(返回值),和指令式程式設計相比,函數語言程式設計強調函式的計算比指令的執行重要。和過程化程式設計相比,函數語言程式設計裡函式的計算可隨時呼叫。

RXJava

  • 當我們的非同步網路請求用的越來越多的時候,rxjava是一種能依舊讓我們的邏輯保持清晰的操作,他的原理就是建立一個Observable物件來幹活,然後使用各種操作符建立起來的鏈式操作,就如同流水線一樣,把你的資料一步一步加工,最終達到想要的效果。
  • rxjava的非同步操作是通過擴充套件的觀察者模式來實現的,rxjava有四個角色,Observable,Observer,Subscriber和Subject,其中Observable和Observer通過Subscriber方法實現訂閱關係,Observable就可以在需要的時候通知Observer。
  • 其實RxJava是通過擴充套件的觀察者模式來實現的,不瞭解觀察者模式的童鞋可以移駕這裡
  • emmmmm,還是等會程式碼解釋

使用前新增依賴

implementation "io.reactivex.rxjava2:rxjava:2.x.y"
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
  • 具體版本直接上git官方檢視就ok
  • 其中RxAndroid是RxJava在android平臺上的擴充套件,它包含了一些能夠簡化Android開發的工具,比如特殊的排程器

一.使用入門

  • 基本用法分三步

1.建立Observer(觀察者)

  • 他決定事件觸發時的行為,先看看怎麼建立
Observer<String> mObserver = new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {

        }

        @Override
public void onNext(String s) { Log.d(TAG, "onNext: 我是張三,他給我發的訊息是 message = " +s ); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } };
  • 先不說他的具體執行步驟,我們先往下看

2.建立被觀察者(Observable)

Observable<String> mObservable = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            //執行一些其他操作
            //.............
            //執行完畢,觸發回撥,通知觀察者
            emitter.onNext("這是第一條訊息");
            emitter.onNext("我來發射資料");
            emitter.onComplete();
        }
    });
  • 再往下看,實現訂閱

3.訂閱(subscribe)

mObservable.subscribe(mObserver);
  • 訂閱的方式很簡單,就是用被觀察者呼叫subscribe方法來訂閱觀察者

4.運作方式

  • 在我們訂閱的時候,實際上就是呼叫的被觀察者在建立的時候我們實現的ObservableOnSubscribe介面的匿名類的subscriber方法
  • 不過此時並不是立即呼叫我們實現的subscribe方法裡面的邏輯,先呼叫的是觀察者的onSubscribe方法
  • 然後再去執行Subscribe方法裡面的邏輯
  • 而Subscribe方法的引數emitter實際上就是我們在訂閱的時候傳入的觀察者例項
  • 所以我們在被觀察者裡面呼叫的emitter.onNext(“這是第一條訊息”);實際上就是回撥的是觀察者的onNext方法
  • 當執行異常的時候,觀察者的onError方法就會被呼叫,同時事件佇列自動終止,不允許再有事件發出
  • complete方法是在事件完畢之後執行,不過在這種方式之下我們需要手動去呼叫,就像我上面寫的

5.其他的建立被觀察者的方式

(1).Just方式

mObservable = Observable.just("這是just方式建立的被觀察者傳送的事件");
  • 這裡的just引數將直接被作為Onserver的onNext方法的引數傳入
  • 所以在mObservable.subscribe(mObserver);之後,log打印出來的資訊就是
onSubscribe: 
onNext: 我是張三,他給我發的訊息是 message  = 這是just方式建立的被觀察者傳送的事件
onComplete: 
  • 這裡不清楚為什麼這裡的complete方法得到了執行,百思不得其解
  • 這裡的just方式是過載了十個方法,分別對應一個引數,兩個引數,…一直到十個引數

(2).formLiterable方式

List<String> list = new ArrayList<>();
for(int i = 0 ; i < 10 ;i++){
     list.add("我是formIterable方式建立的第  " + i+ "   條訊息");
}
mObservable = Observable.fromIterable((Iterable<String>)list);
  • 訂閱之後,得到的log為
onSubscribe: 
onNext: 我是張三,他給我發的訊息是 message  = 我是formIterable方式建立的第  0   條訊息
onNext: 我是張三,他給我發的訊息是 message  = 我是formIterable方式建立的第  1   條訊息
onNext: 我是張三,他給我發的訊息是 message  = 我是formIterable方式建立的第  2   條訊息
onNext: 我是張三,他給我發的訊息是 message  = 我是formIterable方式建立的第  3   條訊息
onNext: 我是張三,他給我發的訊息是 message  = 我是formIterable方式建立的第  4   條訊息
onNext: 我是張三,他給我發的訊息是 message  = 我是formIterable方式建立的第  5   條訊息
onNext: 我是張三,他給我發的訊息是 message  = 我是formIterable方式建立的第  6   條訊息
onNext: 我是張三,他給我發的訊息是 message  = 我是formIterable方式建立的第  7   條訊息
onNext: 我是張三,他給我發的訊息是 message  = 我是formIterable方式建立的第  8   條訊息
onNext: 我是張三,他給我發的訊息是 message  = 我是formIterable方式建立的第  9   條訊息
onComplete: 
  • 所以說,這裡的list集合其實就相當於我們上面在just裡面傳入的多條引數
  • 而且, Collection介面是Iterable介面的子介面,所以所有Collection介面的實現類都可以作為Iterable物件直接傳入fromIterable()方法。

(3).defer方式

mObservable = Observable.defer(new Callable<ObservableSource<? extends String>>() {
    @Override
    public ObservableSource<? extends String> call() throws Exception {
        return Observable.just("我是defer方式建立的被觀察者","我也是defer方式建立的被觀察者");
    }
});
  • 打印出的log資訊為
onSubscribe: 
onNext: 我是張三,他給我發的訊息是 message  = 我是defer方式建立的被觀察者
onNext: 我是張三,他給我發的訊息是 message  = 我也是defer方式建立的被觀察者
onComplete: 

(4).interval方式

Observable<Long> o = Observable.interval(4, TimeUnit.SECONDS);
o.subscribe(new Observer<Long>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "interval方式建立 onSubscribe: ");
    }

    @Override
    public void onNext(Long aLong) {
        Log.d(TAG, "interval方式建立 onNext: 他給我發的數字是 " + aLong);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "interval方式建立 onError: ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "interval方式建立 onComplete: ");
    }
});
  • 這種方式的作用時就像定時器一樣,按照我們在interval裡面設定的時長間隔傳送從0開始的整數序列
  • 因為他所需要的泛型是長整型,所以這裡我重新建立了一個被觀察者,並且在訂閱的時候重新建立了一個觀察者,由於我在interval裡面寫的是4,TimeUnit.SECONDS,所以他將每隔四秒傳送一個整型值,所以打出的log為
interval方式建立 onSubscribe: 
interval方式建立 onNext: 他給我發的數字是 0
interval方式建立 onNext: 他給我發的數字是 1
interval方式建立 onNext: 他給我發的數字是 2
interval方式建立 onNext: 他給我發的數字是 3
interval方式建立 onNext: 他給我發的數字是 4
interval方式建立 onNext: 他給我發的數字是 5
interval方式建立 onNext: 他給我發的數字是 6
  • 這個方法正常情況是不會主動執行complete方法的,因為他會一直髮送下去

(5).range方式

Observable<Integer> observable = Observable.range(1,5);
observable.subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "onSubscribe: ");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "onNext: 這次他傳送的數字是 " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "onError: ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "onComplete: ");
    }
});
  • 還是因為他只接受整型或長整型泛型的Observer,所以我重新建立了觀察者
  • 其中range的第一個引數表示開始傳送的數字,第二個引數表示傳送的個數,看一下log的資訊
onSubscribe: 
onNext: 這次他傳送的數字是 1
onNext: 這次他傳送的數字是 2
onNext: 這次他傳送的數字是 3
onNext: 這次他傳送的數字是 4
onNext: 這次他傳送的數字是 5
onComplete: 
  • 其中,第二個引數不可以為負值,當為0時就不列印

(7).Timer方式

Observable<Long> observable1 = Observable.timer(2, TimeUnit.SECONDS);
observable1.subscribe(mLongObserver);
  • 啊,這裡我沒再去建立一個Long泛型的Observer,而是在外面建立了一個私有變數,啊,這些不管
  • 這種方式的timer的引數為延遲第一個引數為第二個引數為單位的時間,傳送一個東西(額,我也不知道發什麼東西,反正就是延遲這麼長時間,呼叫觀察者的onNext方法)
  • 看一下他的log資訊
onSubscribe: 
onNext: 0
onComplete: 

(8).repeat方式

  • 這種方式就是將以上幾種方式一直重複,比如說我重複TImer
Observable<Long> observable2 = Observable.timer(3, TimeUnit.SECONDS).repeat();
observable2.subscribe(new Observer<Long>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "onSubscribe: ");
    }

    @Override
    public void onNext(Long integer) {
        Log.d(TAG, "onNext: "  + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "onError: ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "onComplete: ");
    }
});
  • 打印出的log為
onSubscribe: 
onNext: 0
onNext: 0
onNext: 0
onNext: 0
onNext: 0
onNext: 0
  • 那麼他就會一直重複Timer這個動作,至於其他的也一樣,只需要給後面加一個repeat就可以

6.簡單部分補充

  • 我們在使用訂閱方法的時候,常常看到有好幾個過載的subscriber方法
public final Disposable subscribe() {}
表示觀察者不對被觀察者傳送的事件作出任何響應(但被觀察者還是可以繼續傳送事件)
public final Disposable subscribe(Consumer<? super T> onNext) {}
表示觀察者只對被觀察者傳送的Next事件作出響應
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
表示觀察者只對被觀察者傳送的Next事件 & Error事件作出響應
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
表示觀察者只對被觀察者傳送的Next事件、Error事件 & Complete事件作出響應
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
表示觀察者只對被觀察者傳送的Next事件、Error事件 、Complete事件 & onSubscribe事件作出響應
public final void subscribe(Observer<? super T> observer) {}
表示觀察者對被觀察者傳送的任何事件都作出響應
  • 我們可以看到,最後一種是我們基本使用的那種
  • 第一種表示當呼叫之後,只調用被觀察者實現的subscribe方法中除過事件之外的其他語句
  • 比方說
mObservable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                //執行一些其他操作
                //.............
                //執行完畢,觸發回撥,通知觀察者
                Log.d(TAG, "subscribe: 我要傳送第一條訊息啦");
                emitter.onNext("這是第一條訊息");
                Log.d(TAG, "subscribe: 我要傳送第二條訊息啦");
                emitter.onNext("我來發射資料");
            }
        });
  • 如果我們的被觀察者這樣寫,那麼當mObservable.subscriber()之後,只會執行兩個log語句,而不會指向其他的兩個語句
  • 第二個到第四個方法,代表我們只實現簡單的觀察者,這裡我寫全的最後一個方法的呼叫方式為
Observable<String> observable = Observable.just("我是訊息");
observable.subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.d(TAG, "accept: " + s);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        Log.d(TAG, "accept: Error");
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "run: complete");
    }
}, new Consumer<Disposable>() {
    @Override
    public void accept(Disposable disposable) throws Exception {
        Log.d(TAG, "accept: Subscriber");
    }
});
  • 其中,subscriber方法第一第二第四個引數分別實現Consumer介面,第三個引數實現Action介面
  • 當我按照上面所寫的話,打印出來的log為
accept: Subscriber
accept: 我是訊息
run: complete
  • 可以看出,第一個引數表示我們觀察者的onNext方法呼叫,第二個相當於onError方法呼叫,第三個引數代表complete方法呼叫,第四個引數相當於subscriber方法呼叫
  • 由四個不同的過載方法比較得出,我們可以只實現第一個引數的方法,那麼他相當於只執行觀察者的onNext方法,以此類推

7.中斷觀察者與被觀察者的連線

  • 通過這種方式我們可以中斷觀察者與被觀察者的連線,也就是說被觀察者你愛發不發,我都可以通過這種方式來不去接收你的訊息
Observable<String> observable = Observable.just("我是訊息1","我是訊息2","我是訊息3","我是訊息4");
        observable.subscribe(new Observer<String>() {
            //這是攔截器
            private Disposable mDisposable;
            private int cnt = 0;
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: 攔截器賦值");
                mDisposable = d;
            }

            @Override
            public void onNext(String s) {
                cnt++;
                Log.d(TAG, "onNext: 我收到了第 "+ cnt +" 條訊息 == "+s);
                if(cnt  >  2){
                    Log.d(TAG, "onNext: 好了,我已經中斷連線了");
                    mDisposable.dispose();
                }
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: ");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        });
  • 打印出來的log資訊為
onSubscribe: 攔截器賦值
onNext: 我收到了第 1 條訊息 == 我是訊息1
onNext: 我收到了第 2 條訊息 == 我是訊息2
onNext: 我收到了第 3 條訊息 == 我是訊息3
onNext: 好了,我已經中斷連線了
  • 在中斷之後就不會執行complete方法了

8.簡單小結

  • 可以看到,RxJava的整個運作流程大概是這樣的
  • 被觀察者 (Observable) 通過 訂閱(Subscribe) 按順序傳送事件 給觀察者 (Observer), 觀察者(Observer) 按順序接收事件 & 作出對應的響應動作
  • 同時我們也可以分開分別實現帶四個引數的subscriber方法,將我們的觀察者的四個方法分開
  • 同時,在合適的地方採取合適的中斷連線等等