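RxJava2-Android-Samples (Part 1)
- Work has been quiet lately, so I plan to rebuild an MVP architecture on top of RxJava2 and RxAndroid. The first step is to get comfortable with how RxJava2 is used.
- This article is adapted from amitshekhariitbhu's RxJava2-Android-Samples demo. It is essentially a translated walkthrough and should cover the common ways of using RxJava2.
- GitHub: RxJava2-Android-Samples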
Operators: Map, Zip, Filter, FlatMap, Take, Reduce, Skip, Buffer, Concat, Replay, Merge, SwitchMap
- 1. A simple demo that emits values one after another

    /*
     * A simple example that emits its values one by one.
     */
    private void doSomeWork() {
        getObservable()
                // Run on a background thread
                .subscribeOn(Schedulers.io())
                // Be notified on the main thread
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(getObserver());
    }

    private Observable<String> getObservable() {
        return Observable.just("1", "2", "3", "4", "5", "6");
    }

- If d.dispose() is uncommented in onSubscribe, no further events reach the observer

    private Observer<String> getObserver() {
        return new Observer<String>() {
            /**
             * Provides the Observer with the means of cancelling (disposing) the
             * connection (channel) with the Observable in both synchronous
             * (from within {@link #onNext(Object)}) and asynchronous manner.
             */
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, " onSubscribe : " + d.isDisposed());
                // dispose() releases the resource; the operation should be idempotent.
                // d.dispose(); // TODO: if uncommented, nothing below is ever called
            }

            @Override
            public void onNext(String value) {
                Log.d(TAG, " onNext : value : " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, " onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, " onComplete");
            }
        };
    }
- Output

    11-30 10:03:17.883 16586-16586/com.rxjava2.android.samples D/SimpleExampleActivity:onSubscribe : false
    11-30 10:03:17.923 16586-16586/com.rxjava2.android.samples D/SimpleExampleActivity:onNext : value : 1
    11-30 10:03:17.931 16586-16586/com.rxjava2.android.samples D/SimpleExampleActivity:onNext : value : 2
    11-30 10:03:17.938 16586-16586/com.rxjava2.android.samples D/SimpleExampleActivity:onNext : value : 3
    11-30 10:03:17.944 16586-16586/com.rxjava2.android.samples D/SimpleExampleActivity:onNext : value : 4
    11-30 10:03:17.950 16586-16586/com.rxjava2.android.samples D/SimpleExampleActivity:onNext : value : 5
    11-30 10:03:17.955 16586-16586/com.rxjava2.android.samples D/SimpleExampleActivity:onNext : value : 6
    11-30 10:03:17.961 16586-16586/com.rxjava2.android.samples D/SimpleExampleActivity:onComplete
- 2. Using the map operator in a network-request demo: for example, fetch a list of ApiUser objects from the network and convert it into the User type the app actually wants

    private void doSomeWork() {
        getObservable()
                // Run on a background thread
                .subscribeOn(Schedulers.io())
                // Be notified on the main thread
                .observeOn(AndroidSchedulers.mainThread())
                .map(new Function<List<ApiUser>, List<User>>() {
                    // Converts the input type (first type argument) into the output type (second)
                    @Override
                    public List<User> apply(List<ApiUser> apiUsers) {
                        return Utils.convertApiUserListToUserList(apiUsers);
                    }
                })
                .subscribe(getObserver());
    }

    private Observable<List<ApiUser>> getObservable() {
        return Observable.create(new ObservableOnSubscribe<List<ApiUser>>() {
            @Override
            public void subscribe(ObservableEmitter<List<ApiUser>> e) {
                if (!e.isDisposed()) {
                    // Emit the fetched List<ApiUser>
                    e.onNext(Utils.getApiUserList());
                    e.onComplete();
                }
            }
        });
    }
- Handling the resulting List<User>

    // Handle the resulting List<User>
    private Observer<List<User>> getObserver() {
        return new Observer<List<User>>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, " onSubscribe : " + d.isDisposed());
            }

            @Override
            public void onNext(List<User> userList) {
                Log.d(TAG, " onNext : " + userList.size());
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, " onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, " onComplete");
            }
        };
    }
- Output

    11-30 10:22:26.566 16586-16586/com.rxjava2.android.samples D/MapExampleActivity:onSubscribe : false
    11-30 10:22:26.619 16586-16586/com.rxjava2.android.samples D/MapExampleActivity:onNext : 3
    11-30 10:22:26.624 16586-16586/com.rxjava2.android.samples D/MapExampleActivity:onComplete
- 3. zip: combining the results of two sources, here one that yields the cricket fans and one that yields the football fans

    private Observable<List<User>> getCricketFansObservable() {
        return Observable.create(new ObservableOnSubscribe<List<User>>() {
            @Override
            public void subscribe(ObservableEmitter<List<User>> e) {
                if (!e.isDisposed()) {
                    e.onNext(Utils.getUserListWhoLovesCricket());
                    e.onComplete();
                }
            }
        });
    }

    private Observable<List<User>> getFootballFansObservable() {
        return Observable.create(new ObservableOnSubscribe<List<User>>() {
            @Override
            public void subscribe(ObservableEmitter<List<User>> e) {
                if (!e.isDisposed()) {
                    e.onNext(Utils.getUserListWhoLovesFootball());
                    e.onComplete();
                }
            }
        });
    }
- Combining the two lists

    private void doSomeWork() {
        // Get the list of cricket fans and the list of football fans
        Observable.zip(getCricketFansObservable(), getFootballFansObservable(),
                // Reads a bit like Kotlin: the first type argument matches the first
                // source above, the second matches the second source, and the last
                // one is the expected result type
                new BiFunction<List<User>, List<User>, List<User>>() {
                    @Override
                    public List<User> apply(List<User> cricketFans, List<User> footballFans) {
                        return Utils.filterUserWhoLovesBoth(cricketFans, footballFans);
                    }
                })
                // Run on a background thread
                .subscribeOn(Schedulers.io())
                // Be notified on the main thread
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(getObserver());
    }
- Observing the result

    private Observer<List<User>> getObserver() {
        return new Observer<List<User>>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, " onSubscribe : " + d.isDisposed());
            }

            @Override
            public void onNext(List<User> userList) {
                textView.append(" onNext");
                textView.append(AppConstant.LINE_SEPARATOR);
                for (User user : userList) {
                    textView.append(" firstname : " + user.firstname);
                    textView.append(AppConstant.LINE_SEPARATOR);
                }
                Log.d(TAG, " onNext : " + userList.size());
            }

            @Override
            public void onError(Throwable e) {
                textView.append(" onError : " + e.getMessage());
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {
                textView.append(" onComplete");
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onComplete");
            }
        };
    }
- Output: the combiner finds the users common to both lists, and the result is then handled on the main thread

    11-30 10:28:16.326 16586-16586/com.rxjava2.android.samples D/ZipExampleActivity:onSubscribe : false
    11-30 10:28:16.353 16586-16586/com.rxjava2.android.samples D/ZipExampleActivity:onNext : 1
    11-30 10:28:16.358 16586-16586/com.rxjava2.android.samples D/ZipExampleActivity:onComplete
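Utils.filterUserWhoLovesBoth is not shown in this article, so here is a minimal plain-Java sketch of what such a combiner might do, assuming users can be compared by first name alone. The class name ZipSketch, the method lovesBoth and the sample names are made up for illustration; the real helper works on User objects and may well differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ZipSketch {
    // Hypothetical stand-in for Utils.filterUserWhoLovesBoth: keeps the names
    // that appear in both lists, in the order of the first list.
    static List<String> lovesBoth(List<String> cricketFans, List<String> footballFans) {
        List<String> both = new ArrayList<>(cricketFans);
        both.retainAll(footballFans);
        return both;
    }

    public static void main(String[] args) {
        List<String> cricket = Arrays.asList("Amit", "Manish", "Rohit");  // made-up data
        List<String> football = Arrays.asList("Sumit", "Amit", "Anand"); // made-up data
        System.out.println(lovesBoth(cricket, football)); // prints [Amit]
    }
}
```

With one name in common the combined list has size 1, which is the same shape as the "onNext : 1" line in the log above.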
- 4. For long-running work, keep the subscriptions in a CompositeDisposable so that no events are delivered after the Activity has been destroyed

    // A disposable container that can hold multiple other disposables and
    // offers O(1) add and removal complexity.
    private final CompositeDisposable disposables = new CompositeDisposable();

    @Override
    protected void onDestroy() {
        super.onDestroy();
        disposables.clear(); // do not send events after the activity has been destroyed
    }

    disposables.add(sampleObservable()
            // Run on a background thread
            .subscribeOn(Schedulers.io())
            // Be notified on the main thread
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(new DisposableObserver<String>() {
                @Override
                public void onComplete() {
                    textView.append(" onComplete");
                    textView.append(AppConstant.LINE_SEPARATOR);
                    Log.d(TAG, " onComplete");
                }

                @Override
                public void onError(Throwable e) {
                    textView.append(" onError : " + e.getMessage());
                    textView.append(AppConstant.LINE_SEPARATOR);
                    Log.d(TAG, " onError : " + e.getMessage());
                }

                @Override
                public void onNext(String value) {
                    textView.append(" onNext : value : " + value);
                    textView.append(AppConstant.LINE_SEPARATOR);
                    Log.d(TAG, " onNext value : " + value);
                }
            }));
- sampleObservable

    static Observable<String> sampleObservable() {
        return Observable.defer(new Callable<ObservableSource<? extends String>>() {
            @Override
            public ObservableSource<? extends String> call() {
                // Do some long running operation
                SystemClock.sleep(2000);
                return Observable.just("one", "two", "three", "four", "five");
            }
        });
    }
- Output

    11-30 10:32:47.735 16586-16586/com.rxjava2.android.samples D/DisposableExampleActivity:onNext value : one
    11-30 10:32:47.748 16586-16586/com.rxjava2.android.samples D/DisposableExampleActivity:onNext value : two
    11-30 10:32:47.755 16586-16586/com.rxjava2.android.samples D/DisposableExampleActivity:onNext value : three
    11-30 10:32:47.762 16586-16586/com.rxjava2.android.samples D/DisposableExampleActivity:onNext value : four
    11-30 10:32:47.770 16586-16586/com.rxjava2.android.samples D/DisposableExampleActivity:onNext value : five
    11-30 10:32:47.775 16586-16586/com.rxjava2.android.samples D/DisposableExampleActivity:onComplete
- 5. Using the take operator, which emits only the requested number of values: here just 3 of the 5

    private void doSomeWork() {
        getObservable()
                // Run on a background thread
                .subscribeOn(Schedulers.io())
                // Be notified on the main thread
                .observeOn(AndroidSchedulers.mainThread())
                .take(3)
                .subscribe(getObserver());
    }

    private Observable<Integer> getObservable() {
        return Observable.just(1, 2, 3, 4, 5);
    }
- Observer

    private Observer<Integer> getObserver() {
        return new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, " onSubscribe : " + d.isDisposed());
            }

            @Override
            public void onNext(Integer value) {
                textView.append(" onNext : value : " + value);
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onNext value : " + value);
            }

            @Override
            public void onError(Throwable e) {
                textView.append(" onError : " + e.getMessage());
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {
                textView.append(" onComplete");
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onComplete");
            }
        };
    }
- Output

    11-30 10:33:43.235 16586-16586/com.rxjava2.android.samples D/TakeExampleActivity:onSubscribe : false
    11-30 10:33:43.254 16586-16586/com.rxjava2.android.samples D/TakeExampleActivity:onNext value : 1
    11-30 10:33:43.259 16586-16586/com.rxjava2.android.samples D/TakeExampleActivity:onNext value : 2
    11-30 10:33:43.265 16586-16586/com.rxjava2.android.samples D/TakeExampleActivity:onNext value : 3
    11-30 10:33:43.271 16586-16586/com.rxjava2.android.samples D/TakeExampleActivity:onComplete
- 6. Running after a two-second delay

    private void doSomeWork() {
        getObservable()
                // Run on a background thread
                .subscribeOn(Schedulers.io())
                // Be notified on the main thread
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(getObserver());
    }

    private Observable<? extends Long> getObservable() {
        return Observable.timer(2, TimeUnit.SECONDS);
    }
- 7. Interval timer: a simple example that uses RxJava to repeat a task every 2 seconds

    private final CompositeDisposable disposables = new CompositeDisposable();

    @Override
    protected void onDestroy() {
        super.onDestroy();
        disposables.clear(); // clearing it : do not emit after destroy
    }
- The task starts immediately, because initialDelay is 0, and then repeats every 2 seconds

    private void doSomeWork() {
        disposables.add(getObservable()
                // Run on a background thread
                .subscribeOn(Schedulers.io())
                // Be notified on the main thread
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeWith(getObserver()));
    }

    private Observable<? extends Long> getObservable() {
        return Observable.interval(0, 2, TimeUnit.SECONDS);
    }
- 8. A simple example using SingleObserver

    /*
     * simple example using SingleObserver
     */
    private void doSomeWork() {
        Single.just("Amit")
                .subscribe(getSingleObserver());
    }
- Observer

    private SingleObserver<String> getSingleObserver() {
        return new SingleObserver<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, " onSubscribe : " + d.isDisposed());
            }

            @Override
            public void onSuccess(String value) {
                textView.append(" onNext : value : " + value);
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onNext value : " + value);
            }

            @Override
            public void onError(Throwable e) {
                textView.append(" onError : " + e.getMessage());
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onError : " + e.getMessage());
            }
        };
    }
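- Output: note that SingleObserver has no onNext method. The single value arrives through onSuccess; the sample merely logs it under an "onNext" label. onSubscribe prints true here because Single.just hands the observer an already-disposed Disposable.

    11-30 11:11:00.612 16586-16586/com.rxjava2.android.samples D/SingleObserverExampleActivity:onSubscribe : true
    11-30 11:11:00.615 16586-16586/com.rxjava2.android.samples D/SingleObserverExampleActivity:onNext value : Amit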
- 9. A simple example using CompletableObserver; completion is signalled after a 1-second delay

    Completable completable = Completable.timer(1000, TimeUnit.MILLISECONDS);
    completable
            .subscribeOn(Schedulers.io())
            // Be notified on the main thread
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(getCompletableObserver());

    private CompletableObserver getCompletableObserver() {
        return new CompletableObserver() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, " onSubscribe : " + d.isDisposed());
            }

            @Override
            public void onComplete() {
                textView.append(" onComplete");
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onComplete");
            }

            @Override
            public void onError(Throwable e) {
                textView.append(" onError : " + e.getMessage());
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onError : " + e.getMessage());
            }
        };
    }
- Output: onComplete arrives about 1 second after onSubscribe

    11-30 11:12:41.248 16586-16586/com.rxjava2.android.samples D/CompletableObserverExampleActivity:onSubscribe : false
    11-30 11:12:42.270 16586-16586/com.rxjava2.android.samples D/CompletableObserverExampleActivity:onComplete
- 10. A simple example using Flowable. Put plainly, reduce is an accumulation: here the items 100, 2, 3 and 4 are added onto an initial (seed) value of 51

    Flowable<Integer> observable = Flowable.just(100, 2, 3, 4);
    observable.reduce(50 + 1, new BiFunction<Integer, Integer, Integer>() {
        @Override
        public Integer apply(Integer t1, Integer t2) {
            Log.d(TAG, "t1 : " + t1);
            Log.d(TAG, "t2 : " + t2);
            return t1 + t2;
        }
    }).subscribe(getObserver());

    private SingleObserver<Integer> getObserver() {
        return new SingleObserver<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, " onSubscribe : " + d.isDisposed());
            }

            @Override
            public void onSuccess(Integer value) {
                textView.append(" onSuccess : value : " + value);
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onSuccess : value : " + value);
            }

            @Override
            public void onError(Throwable e) {
                textView.append(" onError : " + e.getMessage());
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onError : " + e.getMessage());
            }
        };
    }
- Output: 51 + 100 + 2 + 3 + 4 = 160

    11-30 11:14:40.489 16586-16586/com.rxjava2.android.samples D/FlowableExampleActivity:onSubscribe : false
    11-30 11:14:40.490 16586-16586/com.rxjava2.android.samples D/FlowableExampleActivity:t1 : 51
        t2 : 100
        t1 : 151
        t2 : 2
        t1 : 153
        t2 : 3
        t1 : 156
    11-30 11:14:40.491 16586-16586/com.rxjava2.android.samples D/FlowableExampleActivity:t2 : 4
    11-30 11:14:40.496 16586-16586/com.rxjava2.android.samples D/FlowableExampleActivity:onSuccess : value : 160
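To make the arithmetic of the seeded reduce easier to follow, here is a plain-Java sketch of the same fold, without RxJava. ReduceSketch and reduceWithSeed are names I made up; this only models the accumulation and is not how RxJava implements the operator.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

class ReduceSketch {
    // Folds the items into one value, starting from the seed:
    // ((((seed + a) + b) + c) + d) when the accumulator is addition.
    static int reduceWithSeed(int seed, List<Integer> items, BinaryOperator<Integer> accumulator) {
        int result = seed;
        for (int item : items) {
            result = accumulator.apply(result, item);
        }
        return result;
    }

    public static void main(String[] args) {
        // Same numbers as the Flowable example: seed 51, items 100, 2, 3, 4
        System.out.println(reduceWithSeed(51, Arrays.asList(100, 2, 3, 4), Integer::sum)); // prints 160
    }
}
```

The running totals 51, 151, 153, 156 that the loop passes through are the t1 values in the log above.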
- 11. The same accumulation without an initial value: 1 + 2 + 3 + 4. Without a seed, reduce returns a Maybe, so the result is observed with a MaybeObserver

    /*
     * simple example using reduce to add all the numbers
     */
    private void doSomeWork() {
        getObservable()
                .reduce(new BiFunction<Integer, Integer, Integer>() {
                    @Override
                    public Integer apply(Integer t1, Integer t2) {
                        return t1 + t2;
                    }
                })
                .subscribe(getObserver());
    }

    private Observable<Integer> getObservable() {
        return Observable.just(1, 2, 3, 4);
    }

    private MaybeObserver<Integer> getObserver() {
        return new MaybeObserver<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, " onSubscribe : " + d.isDisposed());
            }

            @Override
            public void onSuccess(Integer value) {
                textView.append(" onSuccess : value : " + value);
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onSuccess : value : " + value);
            }

            @Override
            public void onError(Throwable e) {
                textView.append(" onError : " + e.getMessage());
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {
                textView.append(" onComplete");
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onComplete");
            }
        };
    }
- Output

    11-30 11:16:17.035 16586-16586/com.rxjava2.android.samples D/ReduceExampleActivity:onSubscribe : false
    11-30 11:16:17.039 16586-16586/com.rxjava2.android.samples D/ReduceExampleActivity:onSuccess : value : 10
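A seedless reduce may have nothing to return when the source is empty, which is why this example observes the result with a MaybeObserver. As a rough analogy only, the JDK's own Stream.reduce without an identity behaves in a similar way and returns an Optional. The class name ReduceWithoutSeedSketch is made up for this sketch.

```java
import java.util.Optional;
import java.util.stream.Stream;

class ReduceWithoutSeedSketch {
    // No seed: the result exists only if the source had at least one item.
    static Optional<Integer> sum(Stream<Integer> items) {
        return items.reduce(Integer::sum);
    }

    public static void main(String[] args) {
        System.out.println(sum(Stream.of(1, 2, 3, 4)));      // prints Optional[10]
        System.out.println(sum(Stream.<Integer>empty()));    // prints Optional.empty
    }
}
```

The non-empty case corresponds to onSuccess with 10, and the empty case corresponds to the MaybeObserver's onComplete being called without a value.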
- 12. buffer: to collect emitted values into lists, and to control how far each new list is offset from the previous one, use buffer(count, skip)

    /*
     * simple example using buffer operator - bundles all emitted values into a list
     */
    private void doSomeWork() {
        Observable<List<String>> buffered = getObservable().buffer(3, 1);

        // 3 means: each list takes at most three items, counted from its start index
        // 1 means: the start index advances by one for each new list
        // so it gives the following lists
        // 1 - one, two, three
        // 2 - two, three, four
        // 3 - three, four, five
        // 4 - four, five
        // 5 - five
        buffered.subscribe(getObserver());
    }

    private Observable<String> getObservable() {
        return Observable.just("one", "two", "three", "four", "five");
    }

    private Observer<List<String>> getObserver() {
        return new Observer<List<String>>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, " onSubscribe : " + d.isDisposed());
            }

            @Override
            public void onNext(List<String> stringList) {
                textView.append(" onNext size : " + stringList.size());
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onNext : size :" + stringList.size());
                for (String value : stringList) {
                    textView.append(" value : " + value);
                    textView.append(AppConstant.LINE_SEPARATOR);
                    Log.d(TAG, " : value :" + value);
                }
            }

            @Override
            public void onError(Throwable e) {
                textView.append(" onError : " + e.getMessage());
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {
                textView.append(" onComplete");
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onComplete");
            }
        };
    }
- Output: each list takes up to 3 values and the start index then moves forward by 1, until every buffered value has been delivered

    11-30 11:17:25.002 16586-16586/com.rxjava2.android.samples D/BufferExampleActivity:onSubscribe : false
    11-30 11:17:25.009 16586-16586/com.rxjava2.android.samples D/BufferExampleActivity:onNext : size :3
    11-30 11:17:25.012 16586-16586/com.rxjava2.android.samples D/BufferExampleActivity:: value :one
    11-30 11:17:25.014 16586-16586/com.rxjava2.android.samples D/BufferExampleActivity:: value :two
    11-30 11:17:25.016 16586-16586/com.rxjava2.android.samples D/BufferExampleActivity:: value :three
    11-30 11:17:25.018 16586-16586/com.rxjava2.android.samples D/BufferExampleActivity:onNext : size :3
    11-30 11:17:25.019 16586-16586/com.rxjava2.android.samples D/BufferExampleActivity:: value :two
    11-30 11:17:25.021 16586-16586/com.rxjava2.android.samples D/BufferExampleActivity:: value :three
    11-30 11:17:25.022 16586-16586/com.rxjava2.android.samples D/BufferExampleActivity:: value :four
    11-30 11:17:25.025 16586-16586/com.rxjava2.android.samples D/BufferExampleActivity:onNext : size :3
    11-30 11:17:25.027 16586-16586/com.rxjava2.android.samples D/BufferExampleActivity:: value :three
    11-30 11:17:25.029 16586-16586/com.rxjava2.android.samples D/BufferExampleActivity:: value :four
    11-30 11:17:25.031 16586-16586/com.rxjava2.android.samples D/BufferExampleActivity:: value :five
    11-30 11:17:25.033 16586-16586/com.rxjava2.android.samples D/BufferExampleActivity:onNext : size :2
    11-30 11:17:25.035 16586-16586/com.rxjava2.android.samples D/BufferExampleActivity:: value :four
    11-30 11:17:25.037 16586-16586/com.rxjava2.android.samples D/BufferExampleActivity:: value :five
    11-30 11:17:25.039 16586-16586/com.rxjava2.android.samples D/BufferExampleActivity:onNext : size :1
    11-30 11:17:25.040 16586-16586/com.rxjava2.android.samples D/BufferExampleActivity:: value :five
    11-30 11:17:25.041 16586-16586/com.rxjava2.android.samples D/BufferExampleActivity:onComplete
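The count and skip arguments are easier to see on a plain list. The following is a sketch in ordinary Java that models buffer(count, skip) for a finite source; BufferSketch and its buffer method are made-up names, and the real operator works incrementally on a stream instead of slicing a list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class BufferSketch {
    // A new list starts every `skip` items and holds at most `count` items.
    // Lists that are still short when the source ends are kept as they are.
    static <T> List<List<T>> buffer(List<T> source, int count, int skip) {
        List<List<T>> lists = new ArrayList<>();
        for (int start = 0; start < source.size(); start += skip) {
            int end = Math.min(start + count, source.size());
            lists.add(new ArrayList<>(source.subList(start, end)));
        }
        return lists;
    }

    public static void main(String[] args) {
        List<String> source = Arrays.asList("one", "two", "three", "four", "five");
        // prints [[one, two, three], [two, three, four], [three, four, five], [four, five], [five]]
        System.out.println(buffer(source, 3, 1));
    }
}
```

The five lists printed have sizes 3, 3, 3, 2 and 1, matching the "onNext : size" lines in the log above.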
- 13. Acting only on specific items: a simple example that uses the filter operator to emit only even values

    /*
     * simple example by using filter operator to emit only even value
     */
    private void doSomeWork() {
        Observable.just(1, 2, 3, 4, 5, 6)
                .filter(new Predicate<Integer>() {
                    @Override
                    public boolean test(Integer integer) {
                        return integer % 2 == 0;
                    }
                })
                .subscribe(getObserver());
    }

    private Observer<Integer> getObserver() {
        return new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, " onSubscribe : " + d.isDisposed());
            }

            @Override
            public void onNext(Integer value) {
                textView.append(" onNext : ");
                textView.append(AppConstant.LINE_SEPARATOR);
                textView.append(" value : " + value);
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onNext ");
                Log.d(TAG, " value : " + value);
            }

            @Override
            public void onError(Throwable e) {
                textView.append(" onError : " + e.getMessage());
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {
                textView.append(" onComplete");
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onComplete");
            }
        };
    }
- Output: simply put, only the even numbers come through

    11-30 11:20:24.775 16586-16586/com.rxjava2.android.samples D/FilterExampleActivity:onSubscribe : false
    11-30 11:20:24.779 16586-16586/com.rxjava2.android.samples D/FilterExampleActivity:onNext
    11-30 11:20:24.780 16586-16586/com.rxjava2.android.samples D/FilterExampleActivity:value : 2
    11-30 11:20:24.782 16586-16586/com.rxjava2.android.samples D/FilterExampleActivity:onNext
        value : 4
    11-30 11:20:24.786 16586-16586/com.rxjava2.android.samples D/FilterExampleActivity:onNext
        value : 6
    11-30 11:20:24.788 16586-16586/com.rxjava2.android.samples D/FilterExampleActivity:onComplete
- 14. Using the skip operator: the first 2 values are not emitted

    /*
     * Using skip operator, it will not emit the first 2 values.
     */
    private void doSomeWork() {
        getObservable()
                // Run on a background thread
                .subscribeOn(Schedulers.io())
                // Be notified on the main thread
                .observeOn(AndroidSchedulers.mainThread())
                .skip(2)
                .subscribe(getObserver());
    }

    private Observable<Integer> getObservable() {
        return Observable.just(1, 2, 3, 4, 5);
    }

    private Observer<Integer> getObserver() {
        return new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, " onSubscribe : " + d.isDisposed());
            }

            @Override
            public void onNext(Integer value) {
                textView.append(" onNext : value : " + value);
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onNext value : " + value);
            }

            @Override
            public void onError(Throwable e) {
                textView.append(" onError : " + e.getMessage());
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {
                textView.append(" onComplete");
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onComplete");
            }
        };
    }
- Output: the first two values are skipped

    11-30 11:21:22.224 16586-16586/com.rxjava2.android.samples D/SkipExampleActivity:onSubscribe : false
    11-30 11:21:22.235 16586-16586/com.rxjava2.android.samples D/SkipExampleActivity:onNext value : 3
    11-30 11:21:22.236 16586-16586/com.rxjava2.android.samples D/SkipExampleActivity:onNext value : 4
        onNext value : 5
    11-30 11:21:22.237 16586-16586/com.rxjava2.android.samples D/SkipExampleActivity:onComplete
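If take and skip still feel abstract, the JDK's Stream API has close analogues in limit and skip. This is only an analogy on a finite list, not RxJava code, and the class name TakeSkipSketch is made up for the sketch.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class TakeSkipSketch {
    // Analogue of take(n): keep only the first n items.
    static List<Integer> take(List<Integer> source, int n) {
        return source.stream().limit(n).collect(Collectors.toList());
    }

    // Analogue of skip(n): drop the first n items and keep the rest.
    static List<Integer> skip(List<Integer> source, int n) {
        return source.stream().skip(n).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(take(source, 3)); // prints [1, 2, 3]
        System.out.println(skip(source, 2)); // prints [3, 4, 5]
    }
}
```

These are the same values that the take example (section 5) and the skip example (section 14) log through onNext.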
- 15. Using the scan operator, which also emits every intermediate result. In other words, use it when each step of the accumulation matters and not just the final value. One key detail: the observer's onNext receives the first item before the accumulator function is ever called.

    /*
     * Using scan operator, it sends also the previous result
     */
    private void doSomeWork() {
        getObservable()
                // Run on a background thread
                .subscribeOn(Schedulers.io())
                // Be notified on the main thread
                .observeOn(AndroidSchedulers.mainThread())
                .scan(new BiFunction<Integer, Integer, Integer>() {
                    @Override
                    public Integer apply(Integer int1, Integer int2) {
                        Log.d(TAG, "int1 : " + int1);
                        Log.d(TAG, "int2 : " + int2);
                        return int1 + int2;
                    }
                })
                .subscribe(getObserver());
    }

    private Observable<Integer> getObservable() {
        return Observable.just(1, 2, 3, 4, 5);
    }

    private Observer<Integer> getObserver() {
        return new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, " onSubscribe : " + d.isDisposed());
            }

            @Override
            public void onNext(Integer value) {
                textView.append(" onNext : value : " + value);
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onNext value : " + value);
            }

            @Override
            public void onError(Throwable e) {
                textView.append(" onError : " + e.getMessage());
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {
                textView.append(" onComplete");
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onComplete");
            }
        };
    }
- Output: the triangular numbers 1, 3, 6, 10, 15

    11-30 11:22:24.396 16586-16586/com.rxjava2.android.samples D/ScanExampleActivity:onSubscribe : false
    11-30 11:22:24.409 16586-16586/com.rxjava2.android.samples D/ScanExampleActivity:onNext value : 1
        int1 : 1
        int2 : 2
    11-30 11:22:24.411 16586-16586/com.rxjava2.android.samples D/ScanExampleActivity:onNext value : 3
        int1 : 3
        int2 : 3
    11-30 11:22:24.414 16586-16586/com.rxjava2.android.samples D/ScanExampleActivity:onNext value : 6
        int1 : 6
        int2 : 4
    11-30 11:22:24.417 16586-16586/com.rxjava2.android.samples D/ScanExampleActivity:onNext value : 10
        int1 : 10
        int2 : 5
    11-30 11:22:24.419 16586-16586/com.rxjava2.android.samples D/ScanExampleActivity:onNext value : 15
    11-30 11:22:24.420 16586-16586/com.rxjava2.android.samples D/ScanExampleActivity:onComplete
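The difference between scan and reduce is that scan hands every running total downstream. Here is a plain-Java sketch of that behaviour on a finite list; ScanSketch and scan are made-up names, and the sketch only models the sequence of values, not threading or the Observer callbacks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

class ScanSketch {
    // Returns every intermediate result. The first item is passed through
    // untouched; the accumulator only runs from the second item onwards.
    static List<Integer> scan(List<Integer> source, BinaryOperator<Integer> accumulator) {
        List<Integer> emitted = new ArrayList<>();
        Integer running = null;
        for (Integer item : source) {
            running = (running == null) ? item : accumulator.apply(running, item);
            emitted.add(running);
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(scan(Arrays.asList(1, 2, 3, 4, 5), Integer::sum)); // prints [1, 3, 6, 10, 15]
    }
}
```

For five items the accumulator is called four times, which matches the four int1/int2 pairs in the log above.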
- 16. replay on a PublishSubject. My understanding: a stream of values is being sent to one observer, and the last few of them must also reach another observer that subscribes later. The replay operator makes this possible: it ensures that observers see the same sequence of emitted items even if they subscribe after the Observable has begun emitting. Here the whole sequence goes to the first observer, while replay(4) keeps the last 4 items in a buffer and delivers them to the second observer.

    private void doSomeWork() {
        PublishSubject<Integer> source = PublishSubject.create();
        ConnectableObservable<Integer> connectableObservable = source.replay(4);
        // connect the ConnectableObservable
        connectableObservable.connect();

        connectableObservable.subscribe(getFirstObserver());

        source.onNext(1);
        source.onNext(2);
        source.onNext(3);
        source.onNext(4);
        source.onNext(5);
        source.onNext(6);
        source.onNext(7);
        source.onComplete();

        connectableObservable.subscribe(getSecondObserver());
    }

    private Observer<Integer> getFirstObserver() {
        return new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, " First onSubscribe : " + d.isDisposed());
            }

            @Override
            public void onNext(Integer value) {
                textView.append(" First onNext : value : " + value);
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " First onNext value : " + value);
            }

            @Override
            public void onError(Throwable e) {
                textView.append(" First onError : " + e.getMessage());
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " First onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {
                textView.append(" First onComplete");
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " First onComplete");
            }
        };
    }

    private Observer<Integer> getSecondObserver() {
        return new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                textView.append(" Second onSubscribe : isDisposed :" + d.isDisposed());
                Log.d(TAG, " Second onSubscribe : " + d.isDisposed());
                textView.append(AppConstant.LINE_SEPARATOR);
            }

            @Override
            public void onNext(Integer value) {
                textView.append(" Second onNext : value : " + value);
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " Second onNext value : " + value);
            }

            @Override
            public void onError(Throwable e) {
                textView.append(" Second onError : " + e.getMessage());
                Log.d(TAG, " Second onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {
                textView.append(" Second onComplete");
                Log.d(TAG, " Second onComplete");
            }
        };
    }
- Output

    11-30 11:39:02.882 19690-19690/com.rxjava2.android.samples D/ReplayExampleActivity:First onSubscribe : false
    11-30 11:39:02.903 19690-19690/com.rxjava2.android.samples D/ReplayExampleActivity:First onNext value : 1
    11-30 11:39:02.911 19690-19690/com.rxjava2.android.samples D/ReplayExampleActivity:First onNext value : 2
    11-30 11:39:02.919 19690-19690/com.rxjava2.android.samples D/ReplayExampleActivity:First onNext value : 3
    11-30 11:39:02.926 19690-19690/com.rxjava2.android.samples D/ReplayExampleActivity:First onNext value : 4
    11-30 11:39:02.932 19690-19690/com.rxjava2.android.samples D/ReplayExampleActivity:First onNext value : 5
    11-30 11:39:02.938 19690-19690/com.rxjava2.android.samples D/ReplayExampleActivity:First onNext value : 6
    11-30 11:39:02.944 19690-19690/com.rxjava2.android.samples D/ReplayExampleActivity:First onNext value : 7
    11-30 11:39:02.950 19690-19690/com.rxjava2.android.samples D/ReplayExampleActivity:First onComplete
    11-30 11:39:02.954 19690-19690/com.rxjava2.android.samples D/ReplayExampleActivity:Second onSubscribe : false
    11-30 11:39:02.963 19690-19690/com.rxjava2.android.samples D/ReplayExampleActivity:Second onNext value : 4
    11-30 11:39:02.968 19690-19690/com.rxjava2.android.samples D/ReplayExampleActivity:Second onNext value : 5
    11-30 11:39:02.972 19690-19690/com.rxjava2.android.samples D/ReplayExampleActivity:Second onNext value : 6
    11-30 11:39:02.977 19690-19690/com.rxjava2.android.samples D/ReplayExampleActivity:Second onNext value : 7
    11-30 11:39:02.979 19690-19690/com.rxjava2.android.samples D/ReplayExampleActivity:Second onComplete
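Why the second observer sees exactly 4, 5, 6, 7 can be modelled with a small bounded buffer. This is a plain-Java sketch under the assumption that replay(4) behaves like a "keep the last 4 items" queue for late subscribers; ReplayBufferSketch and its methods are made-up names, not RxJava classes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class ReplayBufferSketch<T> {
    private final int capacity;
    private final Deque<T> buffer = new ArrayDeque<>();

    ReplayBufferSketch(int capacity) {
        this.capacity = capacity;
    }

    // Remembers the item, evicting the oldest one once the buffer is full.
    void onNext(T item) {
        if (buffer.size() == capacity) {
            buffer.removeFirst();
        }
        buffer.addLast(item);
    }

    // What a late subscriber would be handed: the retained items, oldest first.
    List<T> snapshot() {
        return new ArrayList<>(buffer);
    }

    public static void main(String[] args) {
        ReplayBufferSketch<Integer> replay = new ReplayBufferSketch<>(4);
        for (int i = 1; i <= 7; i++) {
            replay.onNext(i);
        }
        System.out.println(replay.snapshot()); // prints [4, 5, 6, 7]
    }
}
```

The first observer is subscribed before any item is emitted, so it receives all seven values live; only the late subscriber depends on the buffer.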
- 17. Emitting two arrays one after the other, with the order preserved both within and between them: use concat

    /**
     * Using the concat operator to combine Observables: concat maintains
     * the order of the Observables.
     * It emits all 7 values in order:
     * first "A1", "A2", "A3", "A4", and then "B1", "B2", "B3".
     * That is, first everything from the first Observable and then
     * everything from the second Observable, all in order.
     */
    private void doSomeWork() {
        final String[] aStrings = {"A1", "A2", "A3", "A4"};
        final String[] bStrings = {"B1", "B2", "B3"};

        final Observable<String> aObservable = Observable.fromArray(aStrings);
        final Observable<String> bObservable = Observable.fromArray(bStrings);

        Observable.concat(aObservable, bObservable)
                .subscribe(getObserver());
    }

    private Observer<String> getObserver() {
        return new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, " onSubscribe : " + d.isDisposed());
            }

            @Override
            public void onNext(String value) {
                textView.append(" onNext : value : " + value);
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onNext : value : " + value);
            }

            @Override
            public void onError(Throwable e) {
                textView.append(" onError : " + e.getMessage());
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {
                textView.append(" onComplete");
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onComplete");
            }
        };
    }
- Output

    11-30 11:42:43.359 19690-19690/com.rxjava2.android.samples D/ConcatExampleActivity:onSubscribe : false
    11-30 11:42:43.369 19690-19690/com.rxjava2.android.samples D/ConcatExampleActivity:onNext : value : A1
    11-30 11:42:43.375 19690-19690/com.rxjava2.android.samples D/ConcatExampleActivity:onNext : value : A2
    11-30 11:42:43.382 19690-19690/com.rxjava2.android.samples D/ConcatExampleActivity:onNext : value : A3
    11-30 11:42:43.388 19690-19690/com.rxjava2.android.samples D/ConcatExampleActivity:onNext : value : A4
    11-30 11:42:43.393 19690-19690/com.rxjava2.android.samples D/ConcatExampleActivity:onNext : value : B1
    11-30 11:42:43.399 19690-19690/com.rxjava2.android.samples D/ConcatExampleActivity:onNext : value : B2
    11-30 11:42:43.404 19690-19690/com.rxjava2.android.samples D/ConcatExampleActivity:onNext : value : B3
    11-30 11:42:43.409 19690-19690/com.rxjava2.android.samples D/ConcatExampleActivity:onComplete
- 18. merge: combining two (or more) Observable sources into one. Unlike concat, merge does not guarantee that the two arrays come out one after the other, although I never managed to observe any interleaving in my own runs, which was frustrating.

    private void doSomeWork() {
        final String[] aStrings = {"A1", "A2", "A3", "A4", "1", "2", "3", "4",
                "5", "6", "7", "8", "9", "10", "11", "12"};
        final String[] bStrings = {"B1", "B2", "B3", "B1", "B2", "B3", "B1", "B2", "B3",
                "B1", "B2", "B3", "B1", "B2", "B3", "B1", "B2", "B3", "B1", "B2", "B3"};

        final Observable<String> aObservable = Observable.fromArray(aStrings);
        final Observable<String> bObservable = Observable.fromArray(bStrings);

        Observable.merge(aObservable, bObservable)
                .subscribe(getObserver());
    }

    private Observer<String> getObserver() {
        return new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, " onSubscribe : " + d.isDisposed());
            }

            @Override
            public void onNext(String value) {
                textView.append(" onNext : value : " + value);
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onNext : value : " + value);
            }

            @Override
            public void onError(Throwable e) {
                textView.append(" onError : " + e.getMessage());
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {
                textView.append(" onComplete");
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onComplete");
            }
        };
    }
- 輸出結果:合併多個Observables的發射物, Merge 可能會讓合併的Observables發射的資料交錯(有一個類似的操作符 Concat 不會讓數 據交錯,它會按順序一個接著一個發射多個Observables的發射物,雖然我的結果沒有測試出來,但是呢?真的有可能資料會交叉!!!!
11-30 11:46:44.466 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:onSubscribe : false
11-30 11:46:44.477 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:onNext : value : A1
11-30 11:46:44.484 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:onNext : value : A2
11-30 11:46:44.490 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:onNext : value : A3
11-30 11:46:44.495 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:onNext : value : A4
11-30 11:46:44.499 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:onNext : value : 1
11-30 11:46:44.503 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:onNext : value : 2
11-30 11:46:44.508 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:onNext : value : 3
11-30 11:46:44.512 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:onNext : value : 4
11-30 11:46:44.515 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:onNext : value : 5
11-30 11:46:44.517 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:onNext : value : 6
11-30 11:46:44.519 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:onNext : value : 7
11-30 11:46:44.522 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:onNext : value : 8
11-30 11:46:44.525 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:onNext : value : 9
11-30 11:46:44.528 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:onNext : value : 10
11-30 11:46:44.532 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:onNext : value : 11
11-30 11:46:44.535 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:onNext : value : 12
11-30 11:46:44.537 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:onNext : value : B1
11-30 11:46:44.539 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:onNext : value : B2
11-30 11:46:44.540 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:onNext : value : B3
11-30 11:46:44.542 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:onNext : value : B1
11-30 11:46:44.543 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:onNext : value : B2
11-30 11:46:44.546 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:onNext : value : B3
11-30 11:46:44.548 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:onNext : value : B1
11-30 11:46:44.551 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:onNext : value : B2
11-30 11:46:44.553 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:onNext : value : B3
11-30 11:46:44.556 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:onNext : value : B1
11-30 11:46:44.559 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:onNext : value : B2
11-30 11:46:44.561 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:onNext : value : B3
11-30 11:46:44.564 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:onNext : value : B1
11-30 11:46:44.567 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:onNext : value : B2
11-30 11:46:44.570 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:onNext : value : B3
11-30 11:46:44.573 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:onNext : value : B1
11-30 11:46:44.575 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:onNext : value : B2
11-30 11:46:44.577 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:onNext : value : B3
11-30 11:46:44.578 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:onNext : value : B1
11-30 11:46:44.580 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:onNext : value : B2
11-30 11:46:44.581 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:onNext : value : B3
11-30 11:46:44.582 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:onComplete
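To make the difference between Merge and Concat concrete, here is a minimal plain-Java sketch that does not use RxJava at all. It models each source as a list of values tagged with a hypothetical emission time; the `Timed` class, `sampleSources()`, and the timings are illustrative assumptions, not part of the sample project. `merge` delivers items in arrival order, while `concat` drains one source completely before starting the next. If the merged sources emit synchronously (as `Observable.just` does), each one finishes before the next is subscribed, which is a likely reason a log can show no interleaving.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Plain-Java model of merge vs. concat ordering; this is NOT RxJava code.
class MergeVsConcatSketch {

    /** A value tagged with the hypothetical time (ms) at which its source emits it. */
    static final class Timed {
        final String value;
        final long atMillis;

        Timed(String value, long atMillis) {
            this.value = value;
            this.atMillis = atMillis;
        }
    }

    /** Two illustrative sources whose emission times overlap. */
    static List<List<Timed>> sampleSources() {
        List<Timed> a = Arrays.asList(
                new Timed("A1", 0), new Timed("A2", 100), new Timed("A3", 200));
        List<Timed> b = Arrays.asList(
                new Timed("B1", 50), new Timed("B2", 150));
        return Arrays.asList(a, b);
    }

    /** Merge: items are delivered in arrival order, so sources may interleave. */
    static List<String> merge(List<List<Timed>> sources) {
        List<Timed> all = new ArrayList<>();
        for (List<Timed> source : sources) {
            all.addAll(source);
        }
        // List.sort is stable, so items with equal times keep their source order.
        all.sort(Comparator.comparingLong(t -> t.atMillis));
        List<String> out = new ArrayList<>();
        for (Timed t : all) {
            out.add(t.value);
        }
        return out;
    }

    /** Concat: each source is drained completely before the next one starts. */
    static List<String> concat(List<List<Timed>> sources) {
        List<String> out = new ArrayList<>();
        for (List<Timed> source : sources) {
            for (Timed t : source) {
                out.add(t.value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println("merge : " + merge(sampleSources()));  // [A1, B1, A2, B2, A3]
        System.out.println("concat: " + concat(sampleSources())); // [A1, A2, A3, B1, B2]
    }
}
```

With overlapping timings the merged order alternates between the two sources, while the concatenated order never does.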
- 19. Defer: the bean's property is read at subscription time, so the emitted value follows the data bean
The Car class:
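import java.util.concurrent.Callable;

import io.reactivex.Observable;
import io.reactivex.ObservableSource;

public class Car {

    private String brand;

    public void setBrand(String brand) {
        this.brand = brand;
    }

    // defer() postpones creating the inner Observable until subscription,
    // so brand is read when an Observer subscribes, not when this method is called.
    public Observable<String> brandDeferObservable() {
        return Observable.defer(new Callable<ObservableSource<? extends String>>() {
            @Override
            public ObservableSource<? extends String> call() {
                return Observable.just(brand);
            }
        });
    }
}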
/*
 * Defer is used to postpone creating the Observable until subscription.
 */
private void doSomeWork() {
    Car car = new Car();
    Observable<String> brandDeferObservable = car.brandDeferObservable();

    // The brand is set after the Observable is created, yet the Observer still receives "BMW",
    // because defer() reads the brand only when subscribe() is called.
    // Without defer, Observable.just(brand) would have run while brand was still null,
    // and RxJava 2 rejects null items with a NullPointerException.
    car.setBrand("BMW");

    brandDeferObservable
            .subscribe(getObserver());
}

private Observer<String> getObserver() {
    return new Observer<String>() {

        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, " onSubscribe : " + d.isDisposed());
        }

        @Override
        public void onNext(String value) {
            textView.append(" onNext : value : " + value);
            textView.append(AppConstant.LINE_SEPARATOR);
            Log.d(TAG, " onNext : value : " + value);
        }

        @Override
        public void onError(Throwable e) {
            textView.append(" onError : " + e.getMessage());
            textView.append(AppConstant.LINE_SEPARATOR);
            Log.d(TAG, " onError : " + e.getMessage());
        }

        @Override
        public void onComplete() {
            textView.append(" onComplete");
            textView.append(AppConstant.LINE_SEPARATOR);
            Log.d(TAG, " onComplete");
        }
    };
}
- Output
11-30 14:17:07.380 19690-19690/com.rxjava2.android.samples D/DeferExampleActivity:onSubscribe : false
11-30 14:17:07.388 19690-19690/com.rxjava2.android.samples D/DeferExampleActivity:onNext : value : BMW
11-30 14:17:07.392 19690-19690/com.rxjava2.android.samples D/DeferExampleActivity:onComplete
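The idea behind defer can be seen without RxJava: the difference is only about when the field is read. The following plain-Java sketch is a model under stated assumptions, not the library's implementation; `DeferSketch`, `eagerBrand()`, `deferredBrand()`, and `run()` are hypothetical names. A `Supplier` stands in for the Observable, and calling `get()` stands in for subscribing. In this model the eager snapshot is simply `null`; RxJava 2 itself would refuse a null item.

```java
import java.util.function.Supplier;

// Plain-Java model of eager vs. deferred evaluation; this is NOT RxJava code.
class DeferSketch {

    static final class Car {
        private String brand;

        void setBrand(String brand) {
            this.brand = brand;
        }

        /** Eager: the brand is read right now, when the supplier is created. */
        Supplier<String> eagerBrand() {
            final String snapshot = brand;
            return () -> snapshot;
        }

        /** Deferred: the brand is read each time get() is called. */
        Supplier<String> deferredBrand() {
            return () -> brand;
        }
    }

    /** Returns { eager result, deferred result } after setting the brand late. */
    static String[] run() {
        Car car = new Car();
        Supplier<String> eager = car.eagerBrand();
        Supplier<String> deferred = car.deferredBrand();

        // The brand is set only after both suppliers exist.
        car.setBrand("BMW");

        return new String[] { eager.get(), deferred.get() };
    }

    public static void main(String[] args) {
        String[] result = run();
        System.out.println("eager    : " + result[0]); // null
        System.out.println("deferred : " + result[1]); // BMW
    }
}
```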
- 20. Distinct: remove duplicate items from the data source
/*
 * distinct() suppresses duplicate items emitted by the source Observable.
 */
private void doSomeWork() {
    getObservable()
            .distinct()
            .subscribe(getObserver());
}

private Observable<Integer> getObservable() {
    return Observable.just(1, 2, 1, 1, 2, 3, 4, 6, 4);
}

private Observer<Integer> getObserver() {
    return new Observer<Integer>() {

        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, " onSubscribe : " + d.isDisposed());
        }

        @Override
        public void onNext(Integer value) {
            textView.append(" onNext : value : " + value);
            textView.append(AppConstant.LINE_SEPARATOR);
            Log.d(TAG, " onNext value : " + value);
        }

        @Override
        public void onError(Throwable e) {
            Log.d(TAG, " onError : " + e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.d(TAG, " onComplete");
        }
    };
}
- Output
11-30 14:20:04.382 19690-19690/com.rxjava2.android.samples D/DistinctExampleActivity:onSubscribe : false
11-30 14:20:04.388 19690-19690/com.rxjava2.android.samples D/DistinctExampleActivity:onNext value : 1
11-30 14:20:04.391 19690-19690/com.rxjava2.android.samples D/DistinctExampleActivity:onNext value : 2
11-30 14:20:04.394 19690-19690/com.rxjava2.android.samples D/DistinctExampleActivity:onNext value : 3
11-30 14:20:04.397 19690-19690/com.rxjava2.android.samples D/DistinctExampleActivity:onNext value : 4
11-30 14:20:04.400 19690-19690/com.rxjava2.android.samples D/DistinctExampleActivity:onNext value : 6
onComplete
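The filtering that distinct performs can be sketched in plain Java as "pass an item through only the first time it is seen". This is a rough model, not RxJava's implementation; `DistinctSketch` and its `distinct` method are hypothetical names. Like the operator, the sketch decides equality through `equals()` and `hashCode()`, so custom bean classes need both implemented for duplicates to be detected.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java model of distinct(); this is NOT RxJava code.
class DistinctSketch {

    /** Keeps the first occurrence of every item and drops later duplicates. */
    static <T> List<T> distinct(List<T> source) {
        Set<T> seen = new HashSet<>();
        List<T> out = new ArrayList<>();
        for (T item : source) {
            // Set.add returns false when the item was already present.
            if (seen.add(item)) {
                out.add(item);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 1, 1, 2, 3, 4, 6, 4);
        System.out.println(distinct(source)); // [1, 2, 3, 4, 6]
    }
}
```

The result matches the values in the log above: 1, 2, 3, 4, 6.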
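- 21. Last: emit only the last item of the data source and discard everything before it; a default value, A1, is supplied in case the source is empty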
private void doSomeWork() {
    getObservable()
            // "A1" is the default item to emit if the source ObservableSource is empty
            .last("A1")
            .subscribe(getObserver());
}

private Observable<String> getObservable() {
    return Observable.just("A1", "A2", "A3", "A4", "A5", "A6");
}

// last(defaultItem) returns a Single, so a SingleObserver is used here.
private SingleObserver<String> getObserver() {
    return new SingleObserver<String>() {

        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, " onSubscribe : " + d.isDisposed());
        }

        @Override
        public void onSuccess(String value) {
            textView.append(" onNext : value : " + value);
            textView.append(AppConstant.LINE_SEPARATOR);
            Log.d(TAG, " onNext value : " + value);
        }

        @Override
        public void onError(Throwable e) {
            textView.append(" onError : " + e.getMessage());
            textView.append(AppConstant.LINE_SEPARATOR);
            Log.d(TAG, " onError : " + e.getMessage());
        }
    };
}
- Output
11-30 14:20:42.788 19690-19690/com.rxjava2.android.samples D/DistinctExampleActivity:onSubscribe : false
11-30 14:20:42.800 19690-19690/com.rxjava2.android.samples D/DistinctExampleActivity:onNext value : A6
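The role of the default value is easiest to see by comparing a non-empty source with an empty one. The plain-Java sketch below only models the behaviour and is not RxJava code; `LastSketch` and its `last` method are hypothetical names. It walks the whole source, remembers the most recent item, and falls back to the default only when nothing was emitted.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Plain-Java model of last(defaultItem); this is NOT RxJava code.
class LastSketch {

    /** Returns the final item of the source, or defaultItem if the source is empty. */
    static <T> T last(List<T> source, T defaultItem) {
        T result = defaultItem;
        for (T item : source) {
            // Every new item replaces the previous candidate.
            result = item;
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> source = Arrays.asList("A1", "A2", "A3", "A4", "A5", "A6");
        System.out.println(last(source, "A1"));                           // A6
        System.out.println(last(Collections.<String>emptyList(), "A1")); // A1
    }
}
```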
- To be continued in Rxjava2-Android-Samlpes(二)