1. 程式人生 > >RxJava2.0學習筆記(簡介,執行緒控制,常見操作符)

RxJava2.0學習筆記(簡介,執行緒控制,常見操作符)

文章轉載自:大神的簡書

要在android中使用RxJava2,先新增Gradle配置:

    compile 'io.reactivex.rxjava2:rxjava:2.0.1'
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'

RxJava簡介:

先假設有兩根水管:

觀察者模式

產生事件的水管稱作上游,即RxJava中的Observable,接受事件的水管稱作下游,即RxJava中的Observer,之間的連線關係就是subscribe()。這個關係用RxJava來表示就是:

//建立一個上游 Observable:
Observable<Integer> observable = Observable.create(new
ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); } }); //建立一個下游 Observer Observer<Integer> observer = new
Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "subscribe"); } @Override public void onNext(Integer value) { Log.d(TAG, "" + value); } @Override public void onError(Throwable e) { Log.d(TAG, "error"
); } @Override public void onComplete() { Log.d(TAG, "complete"); } }; //建立連線 observable.subscribe(observer);

注意:只有當上下游建立連線,也就是subscribe()方法執行的時候才開始傳送事件。
此外,把這段程式碼連起來就是RxJava中的鏈式操作了。即省去了不必要的物件建立。

ObservableEmitter顧名思義為發射器,用來發出三種類型的事件,通過呼叫其物件的onNext(),onError(),onComplete()三個方法來發出Next,Error,Complete三個事件,需要注意以下規則:

  • 上游可以傳送無限個onNext,下游也可以無限接受onNext
  • 上游傳送了一個onError和OnComplete事件之後,可以繼續傳送剩下的事件,而下游接收到這兩個事件之一,就會停止接收事件。
  • 上游可以不傳送onError和OnComplete事件
  • 注意:onError和OnComplete事件的傳送必須互斥並且唯一,不能傳送多個,也不能同時傳送兩種事件

Disposable字面意思是一次性用品,可以看作是管道之間的開關,當呼叫其dispose()方法後,會從管道中間切開,導致下游無法接收到事件。注意:上游不會因為呼叫該方法停止傳送事件。

來看個例子, 我們讓上游依次傳送1,2,3,complete,4,在下游收到第二個事件之後, 切斷水管, 看看執行結果:

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "emit 1");
                emitter.onNext(1);
                Log.d(TAG, "emit 2");
                emitter.onNext(2);
                Log.d(TAG, "emit 3");
                emitter.onNext(3);
                Log.d(TAG, "emit complete");
                emitter.onComplete();
                Log.d(TAG, "emit 4");
                emitter.onNext(4);
            }
        }).subscribe(new Observer<Integer>() {
            private Disposable mDisposable;
            private int i;

            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "subscribe");
                mDisposable = d;
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "onNext: " + value);
                i++;
                if (i == 2) {
                    Log.d(TAG, "dispose");
                    mDisposable.dispose();
                    Log.d(TAG, "isDisposed : " + mDisposable.isDisposed());
                }
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "error");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "complete");
            }
        });

執行結果為:

12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: subscribe
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: emit 1
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: onNext: 1
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: emit 2
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: onNext: 2
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: dispose
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: isDisposed : true
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: emit 3
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: emit complete
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: emit 4

可以看到傳送第二個事件以後水管被切斷了,但是上游並沒有因為傳送了OnComplete事件而停止傳送,並且下游的onSubscribe()方法是最先呼叫的。

subscribe()方法還有多個過載方法:

    public final Disposable subscribe() {}
    public final Disposable subscribe(Consumer<? super T> onNext) {}
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} 
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
    public final void subscribe(Observer<? super T> observer) {}
  • 不帶任何引數的表示無論上游傳送什麼下游不管,隨意發。
  • 帶有一個Consumer型別的引數的表示下游只關心上游傳送的onNext事件,其他事件不管。

也就可以這樣寫:

        subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "onNext: " + integer);
            }
        });

RxJava執行緒控制

通常情況下上下游是在同一執行緒內工作的,比如在主執行緒中建立一個Observable傳送事件,則上游在主執行緒中工作,同樣在主執行緒中建立的Observer也在主執行緒中接收事件。看下面的例子:

@Override                                                                                       
protected void onCreate(Bundle savedInstanceState) {                                            
    super.onCreate(savedInstanceState);                                                         
    setContentView(R.layout.activity_main);                                                     

    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {   
        @Override                                                                               
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {            
            Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());           
            Log.d(TAG, "emit 1");                                                               
            emitter.onNext(1);                                                                  
        }                                                                                       
    });                                                                                         

    Consumer<Integer> consumer = new Consumer<Integer>() {                                      
        @Override                                                                               
        public void accept(Integer integer) throws Exception {                                  
            Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());              
            Log.d(TAG, "onNext: " + integer);                                                   
        }                                                                                       
    };                                                                                          

    observable.subscribe(consumer);                                                             
}

結果為:

D/TAG: Observable thread is : main
D/TAG: emit 1                     
D/TAG: Observer thread is :main   
D/TAG: onNext: 1

一般我們是在子執行緒中執行耗時操作,在主執行緒中更新UI,如下圖:

這裡寫圖片描述

黃色管子代表子執行緒,藍色代表主執行緒,要想做到兩個事件在不同的執行緒中執行,需要用到RxJava內建的執行緒排程器。

在剛才的例子中,將最後的subscribe()方法修改一下:

observable.subscribeOn(Schedulers.newThread())                                              
            .observeOn(AndroidSchedulers.mainThread())                                          
            .subscribe(consumer);

執行結果:

D/TAG: Observable thread is : RxNewThreadScheduler-2  
D/TAG: emit 1                                         
D/TAG: Observer thread is :main                       
D/TAG: onNext: 1

可以看到執行緒改變了,其中RxNewThread是RxJava內建的一種執行緒,subscribeOn()指定了上游所在的執行緒,observeOn()指定了下游所在的執行緒。

注意:對上游多次呼叫subscribeOn()指定執行緒,只有第一次指定起作用,而對於下游多次呼叫observeOn()指定執行緒,每次都會改變下游所在的執行緒。

在RxJava中,內建了很多執行緒供我們選擇,並且這些執行緒由執行緒池來維護的,所以效率比較高:

  • Schedulers.io()代表IO操作的執行緒,通常用於網路、讀寫檔案等IO密集的操作
  • Schedulers.computation()代表CPU計算密集的執行緒,例如有大量計算得操作
  • Schedulers.newThread代表常規的執行緒
  • AndroidSchedulers.mainThread代表Android主執行緒

操作符

map

map的作用是對於上游傳送的每個事件都應用於一個函式,使得每個事件都按照相應的函式去變化。如圖:

這裡寫圖片描述

圖中的Map函式將上游的圓形變換為了下游的正方形,程式碼表示為:

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }
        }).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return "This is result " + integer;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        });

執行結果為:

 D/TAG: This is result 1 
 D/TAG: This is result 2 
 D/TAG: This is result 3

上游的整型數字經過map操作後在下游變成了String型別。
可以看出發來的事件都可以轉換為想要的型別,Object,集合等等。

FlatMap

FlatMap作用是將上游傳送事件的一個Observable變換為多個傳送事件的Observables,再講它們傳送的事件合併到一個Observable中傳送。如圖:
這裡寫圖片描述

分解動作:
這裡寫圖片描述

圖中的FlatMap將圓形轉換為三角形和正方形,並且對於每個顏色的事件都建立了一個管道,在管道中進行變換,再將轉換過的合併傳送到下游。

注意:FlatMap並不能保證事件傳送的順序,如果想要保證順序需要使用concatMap。

程式碼demo如下:

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }
        }).flatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                final List<String> list = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    list.add("I am value " + integer);
                }
                return Observable.fromIterable(list).delay(10,TimeUnit.MILLISECONDS);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        });

程式碼中將上游傳過來的3個數字每個迴圈三遍新增到新建立的String管道,並且加了10ms延遲,所以輸出結果可以看出不是順序的。

將上面程式碼中的flatMap改為concatMap可以發現,結果是順序的了,接收到的事件和傳送的事件順序一致。

zip

zip函式的作用是將多個Observable傳送的事件組合到一起,然後將組合的事件作為整體傳送到下游,它嚴格按照發送的順序組合,並且傳送的事件個數只與多個Observable中資料較少的那個相同。如圖:
這裡寫圖片描述

這裡寫圖片描述

通過分解動作可以看出:

  1. 組合嚴格按照各個Observable傳送的事件順序,對應事件組合,不會出現交叉組合。
  2. 最終傳送的組合事件個數只與Observables中資料最少的相同,因為沒有事件能夠取出來,就無法組合了。

demo如下:

Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {            
    @Override                                                                                         
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {                      
        Log.d(TAG, "emit 1");                                                                         
        emitter.onNext(1);                                                                            
        Log.d(TAG, "emit 2");                                                                         
        emitter.onNext(2);                                                                            
        Log.d(TAG, "emit 3");                                                                         
        emitter.onNext(3);                                                                            
        Log.d(TAG, "emit 4");                                                                         
        emitter.onNext(4);                                                                            
        Log.d(TAG, "emit complete1");                                                                 
        emitter.onComplete();                                                                         
    }                                                                                                 
});                                                                   

Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {              
    @Override                                                                                         
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {                       
        Log.d(TAG, "emit A");                                                                         
        emitter.onNext("A");                                                                          
        Log.d(TAG, "emit B");                                                                         
        emitter.onNext("B");                                                                          
        Log.d(TAG, "emit C");                                                                         
        emitter.onNext("C");                                                                          
        Log.d(TAG, "emit complete2");                                                                 
        emitter.onComplete();                                                                         
    }                                                                                                 
});                                                                     

Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {                  
    @Override                                                                                         
    public String apply(Integer integer, String s) throws Exception {                                 
        return integer + s;                                                                           
    }                                                                                                 
}).subscribe(new Observer<String>() {                       
    @Override                                                                                         
    public void onSubscribe(Disposable d) {                                                           
        Log.d(TAG, "onSubscribe");                                                                    
    }                                                                                                 

    @Override                                                                                         
    public void onNext(String value) {                                                                
        Log.d(TAG, "onNext: " + value);                                                               
    }                                                                                                 

    @Override                                                                                         
    public void onError(Throwable e) {                                                                
        Log.d(TAG, "onError");                                                                        
    }                                                                                                 

    @Override                                                                                         
    public void onComplete() {                                                                        
        Log.d(TAG, "onComplete");                                                                     
    }                                                                                                 
});

執行結果:

D/TAG: onSubscribe     
D/TAG: emit 1          
D/TAG: emit 2          
D/TAG: emit 3          
D/TAG: emit 4          
D/TAG: emit complete1  
D/TAG: emit A          
D/TAG: onNext: 1A      
D/TAG: emit B          
D/TAG: onNext: 2B      
D/TAG: emit C          
D/TAG: onNext: 3C      
D/TAG: emit complete2  
D/TAG: onComplete

可以看出組合傳送了,但是第一個Observable傳送完以後才開始組合,這是因為兩個Observable在同一個執行緒中,執行緒的執行肯定有先後順序,採用上面的subscribeOn()方法將兩個Observable放到不同的執行緒中執行,這樣就可以實現一一結合了,結果如下:

D/TAG: onSubscribe    
D/TAG: emit A         
D/TAG: emit 1         
D/TAG: onNext: 1A     
D/TAG: emit B         
D/TAG: emit 2         
D/TAG: onNext: 2B     
D/TAG: emit C         
D/TAG: emit 3         
D/TAG: onNext: 3C     
D/TAG: emit complete2 
D/TAG: onComplete

可以發現Observable1中還有兩個事件沒有傳送,因為已經沒有組合物件了,傳送也沒有意義,前面是因為在一個執行緒中,所以會一次性發送完,在不同執行緒中,即使不傳送complete,那兩個事件也不會發送。

zip實踐

一個介面需要展示使用者的一些資訊,需要從兩個伺服器中抓取資料,要用到zip操作符來組合抓取的資訊。

首先分別定義兩個請求介面:

public interface Api {
    @GET
    Observable<UserBaseInfoResponse> getUserBaseInfo(@Body UserBaseInfoRequest request);

    @GET
    Observable<UserExtraInfoResponse> getUserExtraInfo(@Body UserExtraInfoRequest request);

}

然後用zip來打包請求:

Observable<UserBaseInfoResponse> observable1 =                                            
        api.getUserBaseInfo(new UserBaseInfoRequest()).subscribeOn(Schedulers.io());      

Observable<UserExtraInfoResponse> observable2 =                                           
        api.getUserExtraInfo(new UserExtraInfoRequest()).subscribeOn(Schedulers.io());    

Observable.zip(observable1, observable2,                                                  
        new BiFunction<UserBaseInfoResponse, UserExtraInfoResponse, UserInfo>() {         
            @Override                                                                     
            public UserInfo apply(UserBaseInfoResponse baseInfo,                          
                                  UserExtraInfoResponse extraInfo) throws Exception {     
                return new UserInfo(baseInfo, extraInfo);                                 
            }                                                                             
        }).observeOn(AndroidSchedulers.mainThread())                                      
        .subscribe(new Consumer<UserInfo>() {                                             
            @Override                                                                     
            public void accept(UserInfo userInfo) throws Exception {                      
                //do something;                                                           
            }                                                                             
        });