1. 程式人生 > >RxJava2.0 學習(2)----實際使用場景 2018年

RxJava2.0 學習(2)----實際使用場景 2018年

RxJava2.0 學習(2)----實際使用場景 2018年

看了 nanchen 大神得 Rxjava2.0 demo 自己做下總結

demo地址

0 執行緒切換

subscribeOn() 指定的就是發射事件的執行緒,多次呼叫 subscribeOn() 只有第一次的有效

observerOn 指定的就是訂閱者接收事件的執行緒,但多次指定訂閱者接收執行緒是可以的,也就是說每呼叫一次 observerOn(),下游的執行緒就會切換一次

    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
            Log.e(TAG, "Observable thread is : " + Thread.currentThread().getName());
            e.onNext(1);
            e.onComplete();
        }
    }).subscribeOn(Schedulers.newThread())
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnNext(new Consumer<Integer>() {
                @Override
                public void accept(@NonNull Integer integer) throws Exception {
                    Log.e(TAG, "After observeOn(mainThread),Current thread is " + Thread.currentThread().getName());
                }
            })
            .observeOn(Schedulers.io())
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(@NonNull Integer integer) throws Exception {
                    Log.e(TAG, "After observeOn(io),Current thread is " + Thread.currentThread().getName());
                }
            });

1 使用 Map 做簡單的網路請求

執行順序是 Map --> doOnNext --> subscribe (訂閱者)

ps:doOnNext 在 訂閱者收到訊息之前 做點什麼

        Observable.create(new ObservableOnSubscribe<Response>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Response> e) throws Exception {
            Builder builder = new Builder()
                    .url("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")

                    .get();
            Request request = builder.build();
            Call call = new OkHttpClient().newCall(request);
            Response response = call.execute();
            e.onNext(response);
        }
    })
            .subscribeOn(Schedulers.io())
            .map(new Function<Response, MobileAddress>() {
                @Override
                public MobileAddress apply(@NonNull Response response) throws Exception {

                    Log.e(TAG, "map 執行緒:" + Thread.currentThread().getName() + "\n");
                    if (response.isSuccessful()) {
                        ResponseBody body = response.body();
                        if (body != null) {
                            Log.e(TAG, "map:轉換前:" + response.body());
                            return new Gson().fromJson(body.string(), MobileAddress.class);
                        }
                    }
                    return null;
                }
            }).observeOn(AndroidSchedulers.mainThread())

            .doOnNext(new Consumer<MobileAddress>() {
                @Override
                public void accept(@NonNull MobileAddress s) throws Exception {
                    Log.e(TAG, "doOnNext 執行緒:" + Thread.currentThread().getName() + "\n");
                    mRxOperatorsText.append("\ndoOnNext 執行緒:" + Thread.currentThread().getName() + "\n");
                    Log.e(TAG, "doOnNext: 儲存成功:" + s.getResult().toString() + "\n");
                    mRxOperatorsText.append("doOnNext: 儲存成功:" +  s.getResult().toString() + "\n");

                }
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<MobileAddress>() {
                @Override
                public void accept(@NonNull MobileAddress data) throws Exception {
                    Log.e(TAG, "subscribe 執行緒:" + Thread.currentThread().getName() + "\n");
                    mRxOperatorsText.append("\nsubscribe 執行緒:" + Thread.currentThread().getName() + "\n");
                    Log.e(TAG, "成功:" + data.toString() + "\n");
                    mRxOperatorsText.append("成功:" + data.toString() + "\n");
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(@NonNull Throwable throwable) throws Exception {
                    Log.e(TAG, "subscribe 執行緒:" + Thread.currentThread().getName() + "\n");
                    mRxOperatorsText.append("\nsubscribe 執行緒:" + Thread.currentThread().getName() + "\n");

                    Log.e(TAG, "失敗:" + throwable.getMessage() + "\n");
                    mRxOperatorsText.append("失敗:" + throwable.getMessage() + "\n");
                }
            }); 

2 使用Rx2-Networking

Rx2AndroidNetworking 使用此框架 進行網路請求。

getObjectObservable(MobileAddress.class) 方便了 實體類結構。

        Rx2AndroidNetworking.get("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")
            .build()
            .getObjectObservable(MobileAddress.class)
            .observeOn(AndroidSchedulers.mainThread()) // 為doOnNext() 指定在主執行緒,否則報錯
            .doOnNext(new Consumer<MobileAddress>() {
                @Override
                public void accept(@NonNull MobileAddress data) throws Exception {
                    Log.e(TAG, "doOnNext:"+Thread.currentThread().getName()+"\n" );
                    mRxOperatorsText.append("\ndoOnNext:"+Thread.currentThread().getName()+"\n" );
                    Log.e(TAG,"doOnNext:"+data.toString()+"\n");
                    mRxOperatorsText.append("doOnNext:"+data.toString()+"\n");
                }
            })
            .map(new Function<MobileAddress, ResultBean>() {
                @Override
                public ResultBean apply(@NonNull MobileAddress mobileAddress) throws Exception {
                    Log.e(TAG, "\n" );
                    mRxOperatorsText.append("\n");
                    Log.e(TAG, "map:"+Thread.currentThread().getName()+"\n" );
                    mRxOperatorsText.append("map:"+Thread.currentThread().getName()+"\n" );
                    return mobileAddress.getResult();
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<ResultBean>() {
                @Override
                public void accept(@NonNull ResultBean data) throws Exception {
                    Log.e(TAG, "subscribe 成功:"+Thread.currentThread().getName()+"\n" );
                    mRxOperatorsText.append("\nsubscribe 成功:"+Thread.currentThread().getName()+"\n" );
                    Log.e(TAG, "成功:" + data.toString() + "\n");
                    mRxOperatorsText.append("成功:" + data.toString() + "\n");
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(@NonNull Throwable throwable) throws Exception {
                    Log.e(TAG, "subscribe 失敗:"+Thread.currentThread().getName()+"\n" );
                    mRxOperatorsText.append("\nsubscribe 失敗:"+Thread.currentThread().getName()+"\n" );
                    Log.e(TAG, "失敗:"+ throwable.getMessage()+"\n" );
                    mRxOperatorsText.append("失敗:"+ throwable.getMessage()+"\n");
                }
            });

使用ZIP 結合多個介面的資料再更新ui

將observable1 和 observable2同時請求 處理合並後叫給訂閱者

其中有一個請求異常就視為【失敗】

        Observable<MobileAddress> observable1 = Rx2AndroidNetworking.get("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")
            .build()
            .getObjectObservable(MobileAddress.class);
    Observable<CategoryResult> observable2 = Network.getGankApi()
            .getCategoryData("Android",1,1);
    Observable.zip(observable1, observable1, new BiFunction<MobileAddress, CategoryResult, String>() {
        @Override
        public String apply(@NonNull MobileAddress mobileAddress, @NonNull CategoryResult categoryResult) throws Exception {
            return "合併後的資料為:手機歸屬地:"+mobileAddress.getResult().getMobilearea()+"人名:"+categoryResult.results.get(0).who;
        }
    }).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(@NonNull String s) throws Exception {
                    Log.e(TAG, "accept: 成功:" + s+"\n");
                    mRxOperatorsText.setText(s);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(@NonNull Throwable throwable) throws Exception {
                    mRxOperatorsText.setText("失敗");

                    Log.e(TAG, "accept: 失敗:" + throwable+"\n");
                }
            });

使用 flatmap (連續請求)

(多個網路請求依次依賴,比如:
1、註冊使用者前先通過介面A獲取當前使用者是否已註冊,再通過介面B註冊;
2、註冊後自動登入,先通過註冊介面註冊使用者資訊,註冊成功後馬上呼叫登入介面進行自動登入。

例子所用API來自天狗網)

        Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list")
            .addQueryParameter("rows", 1 + "")
            .build()
            .getObjectObservable(FoodList.class) // 發起獲取食品列表的請求,並解析到FootList
            .subscribeOn(Schedulers.io())        // 在io執行緒進行網路請求
            .observeOn(AndroidSchedulers.mainThread()) // 在主執行緒處理獲取食品列表的請求結果
            .doOnNext(new Consumer<FoodList>() {
                @Override
                public void accept(@NonNull FoodList foodList) throws Exception {
                    // 先根據獲取食品列表的響應結果做一些操作
                    Log.e(TAG, "accept: doOnNext :" + foodList.toString());
                    mRxOperatorsText.append("accept: doOnNext :" + foodList.toString()+"\n");
                }
            })
            .observeOn(Schedulers.io()) // 回到 io 執行緒去處理獲取食品詳情的請求
            .flatMap(new Function<FoodList, ObservableSource<FoodDetail>>() {
                @Override
                public ObservableSource<FoodDetail> apply(@NonNull FoodList foodList) throws Exception {
                    if (foodList != null && foodList.getTngou() != null && foodList.getTngou().size() > 0) {
                        return Rx2AndroidNetworking.post("http://www.tngou.net/api/food/show")
                                .addBodyParameter("id", foodList.getTngou().get(0).getId() + "")
                                .build()
                                .getObjectObservable(FoodDetail.class);
                    }
                    return null;

                }
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<FoodDetail>() {
                @Override
                public void accept(@NonNull FoodDetail foodDetail) throws Exception {
                    Log.e(TAG, "accept: success :" + foodDetail.toString());
                    mRxOperatorsText.append("accept: success :" + foodDetail.toString()+"\n");
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(@NonNull Throwable throwable) throws Exception {
                    Log.e(TAG, "accept: error :" + throwable.getMessage());
                    mRxOperatorsText.append("accept: error :" + throwable.getMessage()+"\n");
                }
            });

使用 concat (進行快取資料 和 網路新資料 處理)

使用場景:讓使用者回到 載入過的頁面時有想重新整理資料,不那麼突然。

(先讀取快取資料並展示UI再獲取網路資料重新整理UI)

isFromNet 用來標記是否有本地資料

e.onNext(data); 看情況是否需要下一步

e.onComplete(); 沒快取的話 進入下一個請求

   Observable<FoodList> cache = Observable.create(new ObservableOnSubscribe<FoodList>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<FoodList> e) throws Exception {
            Log.e(TAG, "create當前執行緒:"+Thread.currentThread().getName() );
            FoodList data = CacheManager.getInstance().getFoodListData();

            // 在操作符 concat 中,只有呼叫 onComplete 之後才會執行下一個 Observable
            if (data != null){ // 如果快取資料不為空,則直接讀取快取資料,而不讀取網路資料
                isFromNet = false;
                Log.e(TAG, "\nsubscribe: 讀取快取資料:" );
                runOnUiThread(new Runnable() {
                    @Override
                    public void run() {
                        mRxOperatorsText.append("\nsubscribe: 讀取快取資料:\n");
                    }
                });
                e.onNext(data);
            }else {
                isFromNet = true;
                runOnUiThread(new Runnable() {
                    @Override
                    public void run() {
                        mRxOperatorsText.append("\nsubscribe: 讀取網路資料:\n");
                    }
                });
                Log.e(TAG, "\nsubscribe: 讀取網路資料:" );
                e.onComplete();
            }


        }
    });

    Observable<FoodList> network = Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list")
            .addQueryParameter("rows",10+"")
            .build()
            .getObjectObservable(FoodList.class);


    // 兩個 Observable 的泛型應當保持一致

    Observable.concat(cache,network)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<FoodList>() {
                @Override
                public void accept(@NonNull FoodList tngouBeen) throws Exception {
                    Log.e(TAG, "subscribe 成功:"+Thread.currentThread().getName() );
                    if (isFromNet){
                        mRxOperatorsText.append("accept : 網路獲取資料設定快取: \n");
                        Log.e(TAG, "accept : 網路獲取資料設定快取: \n"+tngouBeen.toString() );
                        CacheManager.getInstance().setFoodListData(tngouBeen);
                    }

                    mRxOperatorsText.append("accept: 讀取資料成功:" + tngouBeen.toString()+"\n");
                    Log.e(TAG, "accept: 讀取資料成功:" + tngouBeen.toString());
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(@NonNull Throwable throwable) throws Exception {
                    Log.e(TAG, "subscribe 失敗:"+Thread.currentThread().getName() );
                    Log.e(TAG, "accept: 讀取資料失敗:"+throwable.getMessage() );
                    mRxOperatorsText.append("accept: 讀取資料失敗:"+throwable.getMessage()+"\n");
                }
            });

}

使用 debounce ()

RxView 使用了的 Rxbanding (不是重點)

.debounce(2,TimeUnit.SECONDS) 點選事件,在2秒內重複點選,及時重新算。2秒後觸發。

        RxView.clicks(mRxOperatorsBtn)
                .debounce(2,TimeUnit.SECONDS) // 過濾掉髮射頻率小於2秒的發射事件
                .subscribe(new Consumer<Object>() {
                    @Override
                    public void accept(@NonNull Object o) throws Exception {
                        clickBtn();
                    }
                });
    }

    private void clickBtn() {
        Rx2AndroidNetworking.get("http://120.77.35.147:8080/goods/homepage")
//                .addQueryParameter("rows",2+"") // 只獲取兩條資料
                .build()
                .getObjectObservable(ShopMallFristBean.class)
                .subscribeOn(Schedulers.io())  // 在 io 執行緒進行網路請求
                .observeOn(AndroidSchedulers.mainThread()) // 在主執行緒進行更新UI等操作
                .subscribe(new Consumer<ShopMallFristBean>() {
                    @Override
                    public void accept(@NonNull ShopMallFristBean foodList) throws Exception {
                        Log.e(TAG, "accept: 獲取資料成功:"+foodList.toString()+"\n" );
                        mRxOperatorsText.append("accept: 獲取資料成功:"+foodList.toString()+"\n" );
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {
                        Log.e(TAG, "accept: 獲取資料失敗:"+throwable.getMessage() +"\n");
                        mRxOperatorsText.append("accept: 獲取資料失敗:"+throwable.getMessage() +"\n");
                    }
                }); 

使用 interval 實現心跳機制

Disposable 一次性資源;
interval(1, TimeUnit.SECONDS) 每個一秒傳送一次事件
doOnNext 提前處理事件

        mDisposable = 
            Flowable.interval(1, TimeUnit.SECONDS)
            .doOnNext(new Consumer<Long>() {
                @Override
                public void accept(@NonNull Long aLong) throws Exception {
                    Log.e(TAG, "accept: doOnNext : "+aLong );
                }
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(@NonNull Long aLong) throws Exception {
                    Log.e(TAG, "accept: 設定文字 :"+aLong );
                    mRxOperatorsText.append("accept: 設定文字 :"+aLong +"\n");
                }
            });

學習後,掌握了基本的使用方法,但是還有一些細節上的問題,有疑惑。。。 等候面消化消化在來分享,歡迎評論交流~~