上車RxJava2(二)
筆者在上一篇部落格介紹了 RxJava 的基本使用和它的執行緒控制,這篇部落格介紹RxJava一個很核心、很牛逼的功能-操作符。
RxJava的操作符有很多,主要分為以下幾大類:建立操作符、變換操作符、過濾操作符、組合操作符、錯誤處理符、輔助操作符、條件和布林操作符等等。每一種型別操作符下又有很多個具體的操作符,筆者在這篇部落格介紹其中幾個操作符的使用,如果還想學習其他操作符的使用,文章末尾有傳送門哦
上一篇部落格的實戰部分,用到了幾個操作符,比如create、just、from,它們都是建立操作符,用於建立Observable的操作符。它比較簡單,在本篇部落格不再介紹。
變換操作符
RxJava一個很牛逼的地方是它提供了對事件序列進行變換的支援,所謂變換,就是將事件序列中的物件或整個序列進行加工處理,轉換成不同的事件或事件序列。
變化操作符有:FlatMap、Map、Buffer、Scan、GroupBy、Window
變換操作符主要介紹一下兩個:
- Map — 對映,通過對序列的每一項都應用一個函式變換Observable發射的資料,實質是對序列中的每一項執行一個函式,函式的引數就是這個資料項
- FlatMap — 扁平對映,將Observable發射的資料變換為Observables集合,然後將這些Observable發射的資料平坦化的放進一個單獨的Observable,可以認為是一個將巢狀的資料結構展開的過程。
概念過於抽象,先看個圖吧

map操作符.png
結合這張圖,map可以簡單的理解為通過對映關係把一個物件轉換成另外一個物件。比如註冊的時候,把使用者這個物件(包含使用者名稱和密碼)。作為引數通過網路請求進行註冊,對返回的json資料解析完後又是一個結果物件,通過map可以將請求物件轉換成結果物件。是不是很神奇且牛批。
下面用一個註冊使用者的栗子先介紹Map的使用
(1)請求介面
public interface HttpRequest { @GET("zhuce/rxjava.php/") Call<HttpResult> getRegisteResult(@Query("name") String name, @Query("pass") String pass); }
(2)請求物件類
public class UserInfo { private String name; private String pass; public String getName() { return name; } public void setName(String name) { this.name = name; } public String getPass() { return pass; } public void setPass(String pass) { this.pass = pass; } }
(3)結果物件類
public class HttpResult { private String result; public String getResult() { return result; } public void setResult(String result) { this.result = result; } }
(4)map操作符的使用
public void mapOperation(View view) { Retrofit retrofit = new Retrofit.Builder() .baseUrl(baseUrl) .addConverterFactory(GsonConverterFactory.create()) .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) .build(); final HttpRequest httpRequest = retrofit.create(HttpRequest.class); Observable.just(getUserInfo()) .map(new Function<UserInfo, HttpResult>() { @Override public HttpResult apply(UserInfo userInfo) throws Exception { HttpResult httpResult = httpRequest.getRegisteResult(userInfo.getName() ,userInfo.getPass()).execute().body(); returnhttpResult; } }).subscribeOn(Schedulers.io()) .subscribe(new Consumer<HttpResult>() { @Override public void accept(HttpResult httpResult) throws Exception { //Log.d(TAG, "accept: " + httpResult.getResult()); mText.setText(httpResult.getResult()); } }); }
先使用just這個建立符建立一個Observable,並獲取使用者的資訊。之後使用map轉換操作符
map(new Function<UserInfo, HttpResult>(){...});
map裡需要一個Function物件,它有兩個泛型引數,前者是轉換前的物件、後者是轉換後的物件
這樣在回撥的apply方法裡就可以實現網路請求操作
HttpResult httpResult = httpRequest.getRegisteResult(userInfo.getName() ,userInfo.getPass()).execute().body();
不要忘記切換到io執行緒中操作
.subscribeOn(Schedulers.io())
最後就是訂閱了,需要建立一個觀察者消費該事件,進行ui的更新
.subscribe(new Consumer<HttpResult>() { @Override public void accept(HttpResult httpResult) throws Exception { mText.setText(httpResult.getResult()); } }); }
FlatMap變換操作符
FlatMap 和 map 有一個相同點:它也是把傳入的引數轉化之後返回另一個物件。
不同的是, flatMap返回的是個 Observable 物件,並且這個 Observable 物件 並不是 被直接傳送到了 Subscriber 的回撥方法中

FlatMap.png
flatMap過程原理如下
- 使用傳入的事件物件建立一個 Observable 物件
- 並不傳送這個 Observable, 而是將它啟用,於是它開始傳送事件
- 每一個創建出來的 Observable 傳送的事件,都被匯入同一個 Observable ,而這個 Observable 負責將這些事件統一交給 Subscriber 的回撥方法。
這三個步驟,把事件拆成了兩級,通過一組新建立的 Observable 將初始的物件鋪平(flat)之後通過統一路徑分發了下去。
改寫之前註冊的例子
Observable.just(getUserInfo()).flatMap(new Function<UserInfo, ObservableSource<HttpResult>>() { @Override public ObservableSource<HttpResult> apply(UserInfo userInfo) throws Exception { HttpResult httpResult = httpRequest.getRegisteResult(userInfo.getName(), userInfo.getPass()).execute().body(); returnObservable.just(httpResult); } }).subscribeOn(Schedulers.io()) .subscribe(new Consumer<HttpResult>() { @Override public void accept(HttpResult httpResult) throws Exception { mText.setText(httpResult.getResult()); } });
可以看到基本沒有什麼改動,最明顯的不同就是在返回的結果物件中
flatMap(new Function<UserInfo, ObservableSource<HttpResult>>()
不直接是 HTTPResult 作為引數 而是一個ObservableSource。這樣看並沒有比map有優勢啊,
但是如果有這樣一個需求,註冊完之後自動登入。用map就不能直接將這兩個事件匯在一起最後接收處理。
這個時候就可以用在使用一個flatMap,它對註冊的結果物件作為引數進行登入請求,再轉換成登入結果物件。
這樣兩個事件被匯入同一個 Observable ,而這個 Observable 負責將這些事件統一交給 Subscriber 的回撥方法。
Observable.just(getUserInfo()) .flatMap(new Function<UserInfo, ObservableSource<HttpResult>>() { @Override public ObservableSource<HttpResult> apply(UserInfo userInfo) throws Exception { HttpResult httpResult = httpRequest.getRegisteResult(userInfo.getName(), userInfo.getPass()).execute().body(); returnObservable.just(httpResult); } }) .flatMap(new Function<HttpResult, ObservableSource<LoginResult>>() { @Override public ObservableSource<LoginResult> apply(HttpResult httpResult) throws Exception { if(httpResult.getResult().equals("used")){ UserInfo userInfo = getUserInfo(); LoginResult loginResult = httpRequest.getLoginResult(userInfo.getName(), userInfo.getPass()).execute().body(); return Observable.just(loginResult); } } }) .subscribeOn(Schedulers.io()) .subscribe(new Consumer<LoginResult>() { @Override public void accept(LoginResult loginResult) throws Exception { mText.setText(loginResult.getResult()); } });
這樣就可以實現註冊完直接登入的需求。把註冊和登入這兩個事件平鋪完之後統一發放下去
我們建立Retrofit物件的時候添加了和Rxjava的適配,有什麼作用呢?
addCallAdapterFactory(RxJavaCallAdapterFactory.create())
它能讓Retrofit和Rxjava能夠無縫適配,讓我們更專注於事件的走向。比如之前的程式碼可以變的更簡潔
(1)把請求介面方法返回值改為Observable
@GET("zhuce/rxjava.php/") Observable<HttpResult> getRegisteResult(@Query("name") String name, @Query("pass") String pass);
(2)只需要進行請求,它會直接返回一個Observable物件,不需要自己去建立該物件並返回,不要太方便
Observable.just(getUserInfo()) .flatMap(new Function<UserInfo, ObservableSource<HttpResult>>() { @Override public ObservableSource<HttpResult> apply(UserInfo userInfo) throws Exception { returnhttpRequest.getRegisteResult(userInfo.getName(),userInfo.getPass()); } }) //....
(3)執行後丟擲異常
Unable to create call adapter for io.reactivex.Observable<com.example.rxjavademo.bean.HttpResult>
adapter-rxjava目前還不支援Rxjava 2.x。但是jakewharton大神自己寫了一個庫讓Retrofit來支援Rxjava 2.x
(4)引入庫
implementation 'com.squareup.retrofit2:adapter-rxjava2:2.5.0'
將設配工廠程式碼改為
addCallAdapterFactory(RxJava2CallAdapterFactory.create())
再次執行不再拋異常
過濾操作符
過濾操作符的目的是在一系列的Observable中過濾一些事件後再發出。
過濾操作符有:Debounce、Filter、Distinct、First、Take.....
比如經常看見的一個需求:在文字框輸入關鍵字,對關鍵字進行過濾搜尋(可以聯想淘寶搜尋框)。
普通的做法是,通過一個文字框監聽文字內容的變化後進行網路請求,再展示返回的資料
mEdSearch.addTextChangedListener(new TextWatcher() { @Override public void beforeTextChanged(CharSequence s, int start, int count, int after) { } @Override public void onTextChanged(CharSequence s, int start, int before, int count) { //文字變化 --請求網路---更新結果 } @Override public void afterTextChanged(Editable s) { } });
這會有兩個問題:
- 往往我們需要的是一定量的關鍵字再過濾,而不是文字內容一變化就搜尋,這會造成很多流量的浪費
- 有可能因為網路的波動,會引起第一次搜尋結果晚於第二次搜尋結果到達,結果不可控。
通過過濾操作符可以解決以上兩個問題:
(1)引入RxBinding庫
這個例項需要引入RxBinding庫
implementation 'com.jakewharton.rxbinding3:rxbinding:3.0.0-alpha2'
(2)使用過濾操作符的debounce

過濾操作符debounce.png
過濾操作符debounce 它會在定義的時間內監測最後的變化結果。
比如下面的例子表示它會獲取在監測到文字變化後 2000ms 內的最後一次結果
RxTextView.textChanges(mEdSearch).debounce(2000,TimeUnit.MILLISECONDS)
(3)使用過濾操作符filter

過濾操作符filter.png
過濾操作符filter,它會根據你的過濾規則返回一個布林值,如果為false就不會將事件傳遞下去
比如下面的例子表示只有當輸入的文字內容不為空,才可以繼續之後的事件
.filter(new Predicate<CharSequence>() { @Override public boolean test(CharSequence charSequence) throws Exception { returncharSequence.toString().length()>0; } })
(4)獲取網路請求過濾的結果
switchMap(new Function<CharSequence, ObservableSource<List<String>>>() { @Override public ObservableSource<List<String>> apply(CharSequence charSequence) throws Exception { //模擬網路請求過濾的結果 List<String> list = new ArrayList<>(); list.add("abc"); list.add("abcc"); return Observable.just(list); } })
(5)把事件結果傳遞給 Subscribe
.observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<List<String>>() { @Override public void accept(List<String> stringList) throws Exception { StringBuilder str = new StringBuilder(); for(String s : stringList){ str.append(s+"\n"); } mText.setText(str.toString()); } });
完整程式碼如下
RxTextView.textChanges(mEdSearch).debounce(2000,TimeUnit.MILLISECONDS) .subscribeOn(AndroidSchedulers.mainThread())//UI操作要在主執行緒 .filter(new Predicate<CharSequence>() { @Override public boolean test(CharSequence charSequence) throws Exception { returncharSequence.toString().length()>0; } }) .subscribeOn(Schedulers.io())//swtichmap裡的網路請求放在io執行緒 .switchMap(new Function<CharSequence, ObservableSource<List<String>>>() { @Override public ObservableSource<List<String>> apply(CharSequence charSequence) throws Exception { //模擬網路請求過濾的結果 List<String> list = new ArrayList<>(); list.add("abc"); list.add("abcc"); return Observable.just(list); } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<List<String>>() { @Override public void accept(List<String> stringList) throws Exception { StringBuilder str = new StringBuilder(); for(String s : stringList){ str.append(s+"\n"); } mText.setText(str.toString()); } });
總結
本篇部落格筆者介紹了變化操作符和過濾操作符中的幾個,想學習其他的用法,可以傳送到Rxjava的中文文件
英文好的可以傳送到官方文件
由於操作符太多,寫著寫著發現才寫了兩類操作符的幾個,文章就有點過長了。還是需要再來一篇部落格寫剩下的操作符,可能還需要兩篇[啊哈哈]