1. 程式人生 > >rxJava 2.0入門之觀察者模式

rxJava 2.0入門之觀察者模式

前言

RxJava其實已經推出很久了,可以說是已經很火了,但是目前仍然還有相當一部分Android開發者沒有使用過,甚至說是想用,卻不知道怎麼用,或者不知道自己的專案哪裡可以用到,從本篇開始我們將以一些列文章逐步揭開rxJava神奇的面紗,從入門到實戰,讓你也可以輕鬆上手rxJava。
https://blog.csdn.net/wmz199123/article/details/78845659

為什麼要使用rxJava

眾所周知rxJava是一個以觀察者模式思想為核心基於事件流的通過鏈式程式設計優雅簡潔的實現非同步操作的庫。
隨著專案的不斷壯大,我們的業務邏輯越來越複雜,然而我們可以通過rxJava提供的豐富的操作符和便捷的非同步操作來完成對於複雜業務的處理。
相比於Android中的AsyncTask 、Handler實現非同步的方式,rxJava有個很大的優點就是:隨著程式邏輯變得越來越複雜,RxJava依然能夠保持簡潔,這也是rxJava深受廣大開發者喜愛的原因。

新增依賴支援

要在Android中使用RxJava2, 先新增Gradle配置:

compile 'io.reactivex.rxjava2:rxjava:2.0.1'
// 使用各種操作符時必須新增
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'

開始進入正題

此處借用別人舉的水管的例子來進行講解,如下圖:
在這裡插入圖片描述
上面一根水管為事件產生的水管,叫它上游吧,下面一根水管為事件接收的水管叫它下游吧,兩根水管通過一定的方式連線起來,使得上游每產生一個事件,下游就能收到該事件。
這裡的上游和下游就分別對應著RxJava中的Observable和Observer,它們之間的連線就對應著subscribe(),因此這個關係用RxJava來表示就是:

        //1,建立一個上游 Observable:被觀察者
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.
onNext(2); emitter.onNext(3); emitter.onComplete(); } }); //2,建立一個下游 Observer:觀察者 Observer<Integer> observer = new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "subscribe"); } @Override public void onNext(Integer value) { Log.d(TAG, "" + value); } @Override public void onError(Throwable e) { Log.d(TAG, "error"); } @Override public void onComplete() { Log.d(TAG, "complete"); } }; //3,建立訂閱關係 observable.subscribe(observer);

這個執行的結果就是:

12-02 03:37:17.818 4166-4166/zlc.season.rxjava2demo D/TAG: subscribe
12-02 03:37:17.819 4166-4166/zlc.season.rxjava2demo D/TAG: 1
12-02 03:37:17.819 4166-4166/zlc.season.rxjava2demo D/TAG: 2
12-02 03:37:17.819 4166-4166/zlc.season.rxjava2demo D/TAG: 3
12-02 03:37:17.819 4166-4166/zlc.season.rxjava2demo D/TAG: complete

這裡需要強調的是: 只有當觀察者和被觀察者建立連線之後, 被觀察者才會開始傳送事件. 也就是呼叫了觀察者的subscribe()方法之後才開始傳送事件。
把這段程式碼連起來寫就成了RxJava引以為傲的鏈式操作:

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "subscribe");
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "" + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "error");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "complete");
            }
        });

接下來解釋一下其中兩個陌生的玩意:ObservableEmitter和Disposable.

ObservableEmitter

Emitter是發射器的意思,那就很好猜了,這個就是用來發出事件的,它可以發出三種類型的事件,通過呼叫emitter的onNext(T value)、onComplete()和onError(Throwable error)就可以分別發出next事件、complete事件和error事件。

但是,請注意,並不意味著你可以隨意亂七八糟發射事件,需要滿足一定的規則:

1,上游可以傳送無限個onNext, 下游也可以接收無限個onNext.
2,當上遊傳送了一個onComplete後,上游onComplete之後的事件將會繼續傳送, 而下游收到onComplete事件之後將不再繼續接收事件.
3,當上遊傳送了一個onError後, 上游onError之後的事件將繼續傳送, 而下游收到onError事件之後將不再繼續接收事件.
4,上游可以不傳送onComplete或onError.
5,最為關鍵的是onComplete和onError必須唯一併且互斥,即不能發多個onComplete, 也不能發多個onError, 也不能先發一個onComplete, 然後再發一個onError,反之亦然,傳送多個onComplete是可以正常執行的, 依然是收到第一個onComplete就不再接收了, 但若是傳送多個onError, 則收到第二個onError事件會導致程式會崩潰.

Disposable

這個單詞的字面意思是一次性用品,用完即可丟棄的. 那麼在RxJava中怎麼去理解它呢, 對應於上面的水管的例子, 我們可以把它理解成兩根管道之間的一個機關, 當呼叫它的dispose()方法時, 它就會將兩根管道切斷, 從而導致下游收不到事件.

注意: 呼叫dispose()並不會導致上游不再繼續傳送事件, 上游會繼續傳送剩餘的事件.

來看個例子, 我們讓上游依次傳送1,2,3,complete,4,在下游收到第二個事件之後, 切斷水管, 看看執行結果:

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "emit 1");
                emitter.onNext(1);
                Log.d(TAG, "emit 2");
                emitter.onNext(2);
                Log.d(TAG, "emit 3");
                emitter.onNext(3);
                Log.d(TAG, "emit complete");
                emitter.onComplete();
                Log.d(TAG, "emit 4");
                emitter.onNext(4);
            }
        }).subscribe(new Observer<Integer>() {
            private Disposable mDisposable;
            private int i;

            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "subscribe");
                mDisposable = d;
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "onNext: " + value);
                i++;
                if (i == 2) {
                    Log.d(TAG, "dispose");
                    mDisposable.dispose();
                    Log.d(TAG, "isDisposed : " + mDisposable.isDisposed());
                }
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "error");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "complete");
            }
        });

執行結果為:

12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: subscribe
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: emit 1
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: onNext: 1
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: emit 2
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: onNext: 2
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: dispose
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: isDisposed : true
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: emit 3
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: emit complete
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: emit 4

從執行結果我們看到, 在收到onNext 2這個事件後, 切斷了水管, 但是上游仍然傳送了3, complete, 4這幾個事件, 而且上游並沒有因為傳送了onComplete而停止. 同時可以看到下游的onSubscribe()方法是最先呼叫的。

有時候,你可能覺得,我就列印幾個數,還要把Observable寫的那麼麻煩,能不能簡便一點呢?答案是肯定的,RxJava內建了很多簡化建立Observable物件的函式,比如Observable.just就是用來建立只發出一個事件就結束的Observable物件,上面建立Observable物件的程式碼可以簡化為一行:

Observable<String> observable = Observable.just("hello");

Observable.just同樣可以傳送多個引數:

Observable observable = Observable.just("you", "are", "beautiful");
Consumer<String> onNextConsumer = new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.i(TAG, s);
    }
};
observable.subscribe(onNextConsumer);

執行的結果如下:
在這裡插入圖片描述
同樣對於Observer,這個例子中,我們其實並不關心OnComplete和OnError,我們只需要在onNext的時候做一些處理,這時候就可以使用Consumer類。
subscribe()帶有一個Consumer引數的方法表示下游只關心onNext事件, 其他的事件我假裝沒看見, 因此我們如果只需要onNext事件可以這麼寫:

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "emit 1");
                emitter.onNext(1);
                Log.d(TAG, "emit 2");
                emitter.onNext(2);
                Log.d(TAG, "emit 3");
                emitter.onNext(3);
                Log.d(TAG, "emit complete");
                emitter.onComplete();
                Log.d(TAG, "emit 4");
                emitter.onNext(4);
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "onNext: " + integer);
            }
        });

RxJava2.X中,Observeable用於訂閱Observer,是不支援背壓的,而Flowable用於訂閱Subscriber,是支援背壓(Backpressure)的。
背壓是指在非同步場景中,被觀察者傳送事件速度遠快於觀察者的處理速度的情況下,一種告訴上游的被觀察者降低傳送速度的策略。

rxJava其他觀察者模式

當然,除了上面這兩種觀察者,還有一類觀察者

Single/SingleObserver
Completable/CompletableObserver
Maybe/MaybeObserver

關於背壓以及其他觀察者模式詳見部落格:https://www.jianshu.com/p/220955eefc1f

執行緒排程

正常情況下, Observer和Observable是工作在同一個執行緒中的, 也就是說Observable在哪個執行緒發事件, Observer就在哪個執行緒接收事件.
RxJava中, 當我們在主執行緒中去建立一個Observable來發送事件, 則這個Observable預設就在主執行緒傳送事件.
當我們在主執行緒去建立一個Observer來接收事件, 則這個Observer預設就在主執行緒中接收事件,但其實在現實工作中我們更多的是需要進行執行緒切換的,最常見的例子就是在子執行緒中請求網路資料,在主執行緒中進行展示

要達到這個目的, 我們需要先改變Observable傳送事件的執行緒, 讓它去子執行緒中傳送事件, 然後再改變Observer的執行緒, 讓它去主執行緒接收事件. 通過RxJava內建的執行緒排程器可以很輕鬆的做到這一點. 接下來看一段程式碼:

Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
            Log.d(TAG, "emitter 1");
            emitter.onNext(1);
        }
    });

    Consumer<Integer> consumer = new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
            Log.d(TAG, "onNext: " + integer);
        }
    };

    observable.subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(consumer);
}

在這裡插入圖片描述
可以看到, observable傳送事件的執行緒的確改變了, 是在一個叫 RxNewThreadScheduler-1的執行緒中傳送的事件, 而consumer 仍然在主執行緒中接收事件, 這說明我們的目的達成了, 接下來看看是如何做到的。
這段程式碼只不過是增加了兩行程式碼:

.subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())

簡單的來說, subscribeOn() 指定的是Observable傳送事件的執行緒, observeOn() 指定的是Observer接收事件的執行緒.
多次指定Observable的執行緒只有第一次指定的有效, 也就是說多次呼叫subscribeOn() 只有第一次的有效, 其餘的會被忽略.
多次指定Observer的執行緒是可以的, 也就是說每呼叫一次observeOn() , Observer的執行緒就會切換一次.例如:

observable.subscribeOn(Schedulers.newThread())
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .observeOn(Schedulers.io())
        .subscribe(consumer);

這段程式碼中指定了兩次上游傳送事件的執行緒, 分別是newThread和IO執行緒, 下游也指定了兩次執行緒,分別是main和IO執行緒. 執行結果為:
在這裡插入圖片描述

可以看到, Observable雖然指定了兩次執行緒, 但只有第一次指定的有效, 依然是在RxNewThreadScheduler執行緒中, 而Observer則跑到了RxCachedThreadScheduler 中, 這個CacheThread其實就是IO執行緒池中的一個.

RxJava 2.0中Scheduler執行緒排程器的種類

常用的是 Schedulers.io()進行耗時操作、和 AndroidSchedulers.mainThread()更新ui。

AndroidSchedulers.mainThread():

在Android UI執行緒中執行任務,為Android開發定製。

Schedulers . io( ):

用於IO密集型的操作,例如讀寫SD卡檔案,查詢資料庫,訪問網路等,具有執行緒快取機制,在此排程器接收到任務後,先檢查執行緒快取池中,是否有空閒的執行緒,如果有,則複用,如果沒有則建立新的執行緒,並加入到執行緒池中,如果每次都沒有空閒執行緒使用,可以無上限的建立新執行緒。

Schedulers.newThread( ):

在每執行一個任務時建立一個新的執行緒,不具有執行緒快取機制,因為建立一個新的執行緒比複用一個執行緒更耗時耗力,雖然使用Schedulers.io( )的地方,都可以使用Schedulers.newThread( ),但是,Schedulers.newThread( )的效率沒有Schedulers.io( )高。

Schedulers.computation():

用於CPU 密集型計算任務,即不會被 I/O 等操作限制性能的耗時操作,例如xml,json檔案的解析,Bitmap圖片的壓縮取樣等,具有固定的執行緒池,大小為CPU的核數。不可以用於I/O操作,因為I/O操作的等待時間會浪費CPU。

Schedulers.trampoline():

在當前執行緒立即執行任務,如果當前執行緒有任務在執行,則會將其暫停,等插入進來的任務執行完之後,再將未完成的任務接著執行。

Schedulers.single():

擁有一個執行緒單例,所有的任務都在這一個執行緒中執行,當此執行緒中有任務執行時,其他任務將會按照先進先出的順序依次執行。

Scheduler.from(@NonNull Executor executor):

指定一個執行緒排程器,由此排程器來控制任務的執行策略。

Observable.doSubscribe() 它和Subscribe.onStart() 同樣是在subscribe()呼叫後而且在事件傳送前執行,但區別在於它可以指定執行緒。

在前面講 Subscriber 的時候,提到過 Subscriber 的 onStart() 可以用作流程開始前的初始化。然而 onStart() 由於在 subscribe() 發生時就被呼叫了,因此不能指定執行緒,而是隻能執行在 subscribe() 被呼叫時的執行緒。這就導致如果 onStart() 中含有對執行緒有要求的程式碼(例如在介面上顯示一個 ProgressBar,這必須在主執行緒執行),將會有執行緒非法的風險,因為有時你無法預測 subscribe() 將會在什麼執行緒執行。

而與 Subscriber.onStart() 相對應的,有一個方法 Observable.doOnSubscribe() 。它和 Subscriber.onStart() 同樣是在 subscribe() 呼叫後而且在事件傳送前執行,但區別在於它可以指定執行緒。

預設情況下, doOnSubscribe() 執行在 subscribe() 發生的執行緒;而如果在 doOnSubscribe() 之後有 subscribeOn() 的話,它將執行在離它最近的 subscribeOn() 所指定的執行緒。
示例程式碼:

Observable.create(onSubscribe)
    .subscribeOn(Schedulers.io())
    .doOnSubscribe(new Action0() {
        @Override
        public void call() {
            progressBar.setVisibility(View.VISIBLE); // 需要在主執行緒執行
        }
    })
    .subscribeOn(AndroidSchedulers.mainThread()) // 指定主執行緒
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(subscriber);

subscribeOn 作用於該操作符之前的 Observable 的建立操符作以及 doOnSubscribe 操作符 ,換句話說就是 doOnSubscribe 以及 Observable 的建立操作符總是被其之後最近的 subscribeOn 控制 。沒看懂不要緊,看下面程式碼你就懂了。

Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onCompleted(
            
           

相關推薦

rxJava 2.0入門觀察模式

前言 RxJava其實已經推出很久了,可以說是已經很火了,但是目前仍然還有相當一部分Android開發者沒有使用過,甚至說是想用,卻不知道怎麼用,或者不知道自己的專案哪裡可以用到,從本篇開始我們將以一些列文章逐步揭開rxJava神奇的面紗,從入門到實戰,讓你也可以輕鬆上手rxJava

Android設計模式系列(2)--SDK原始碼觀察模式

觀察者模式,是一種非常常見的設計模式,在很多系統中隨處可見,尤其是涉及到資料狀態發生變化需要通知的情況下。本文以AbstractCursor為例子,展開分析。觀察者模式,Observer Pattern,是一個很實用的模式,本人曾經接觸到的各種平臺以及曾經參與專案中列印模板直譯器中都用到了此模式。 1.意圖

大話設計模式觀察模式

arm eve his watermark observer cts 多個 放下 們的 從前,有個放羊娃。每天都去山上放羊,一天,他認為十分無聊。就想了個捉弄大家尋開心的主意。他向著山下正在種田的農夫們大聲喊:“狼來了!狼來了!救命啊!”農夫們聽到喊聲

PHP面向對象觀察模式

模擬 信息 return date this != false 管理者 and     觀察者模式按我的的理解,主要分為觀察者類和被被觀察者類二個部分。被觀察者類會繼承一個接口(如:Observable)實現對觀察者的添加、刪除和通知(即通知觀察者發生了特定事件),它聚合了

設計模式觀察模式

觀察者模式 observer 監聽器的底層實現 observable 群發消息 轉發機制 1、觀察者模式的簡單介紹: a、核心: - 觀察者模式主要用於 1 : N 的通知中。當一個對象(目標對象 Subject 或者 Objservable )的狀態變化時,他需要及時告知

敏捷開發觀察模式

http observer targe mark ref text 一行代碼 模式 文件 事件響應是觀察者模式的核心點。 我們在某個基礎類中定義這麽一個成員變量,該成員變量的屬性值為Get和Set,具備默認值,在Set下賦值後,加上一行代碼用來觸發事件響應。當外部程序,為該

設計模式觀察模式詳解

http 通知 stat 發布-訂閱 () class arraylist nag .cn 觀察者模式又稱為發布-訂閱模式,涉及的角色有:   ●  抽象主題   ●  具體主題   ●  抽象觀察者   ●  具體觀察者 案例演示:公司發放工資的時候通知所有觀察者工資已發

23種設計模式觀察模式

主題 一個 server bsp 監聽 images 關系 .com 自動更新 觀察者模式(Observer):定義了一種一對多的關系,讓多個觀察者對象同時監聽某一個主題對象。這個主題對象在狀態發生變化時,會通知所有觀察者對象,使它們能夠自動更新自己。 23種設計模式之

GOF23設計模式觀察模式(observer)

hang 事件監聽器 rgs str arr public pda import lob 一、觀察者模式概述   觀察者模式主要用於 1 :N 的通知。當一個對象(目標對象 Subject 或 Observable)的狀態變化時,它需要通知一系列對象(觀察者對象 Obser

JAVA設計模式觀察模式

設計原則 設計模式 測試類 stat tin 正常 san date() 觀察者模式 轉載請註明出處:https://www.cnblogs.com/luohanguo/p/7825656.html 1、初步認識 觀察者模式的定義:   在對象之間定義了一對多的依賴

js 設計模式觀察模式

swe 有變 主題 頁面 ESS eache 不能 mov 學生類 觀察者模式 又被稱為“發布-訂閱”模式,目的是解決主題對象和觀察者之間功能的耦合性。發布者和訂閱者之間是互不幹擾的,沒有聯系的,通過觀察者,當做中介,將二者聯系起來。 例子:以學生和老師之間的為例 1.首先

Java 設計模式 觀察模式

float stock chan bject 17. sta chang eth int http://www.verejava.com/?id=16999149610674 package com.observer.theory; public class Test {

js觀察模式

.com update 模式 分享圖片 pub 執行 技術分享 函數 圖片 觀察者模式: 大體上是, 1、松耦合的代碼; 2、一對多的關系; 3、主體狀態變化時,所有依賴被通知; 4、主體和觀察者互不知曉。 基本上,滿足上面四點的,就可以算是觀察者模式了。來看一個demo,

PHP 設計模式觀察模式

通知 更新 一對多 改變 變化 variable 它的 all 正常 觀察者模式定義對象的一對多依賴,這樣一來,當一個對象改變狀態時,它的所有依賴者都會收到通知並自動更新! 設計原則 在觀察者模式中,會改變的是主題的狀態以及觀察者的數目。用這個模式,你可以改變

VUE學習--觀察模式

prop 江湖 雷鳴 天下 功夫 auth 技術 同時 也有 p.p1 { margin: 0.0px 0.0px 0.0px 0.0px; font: 12.0px ".PingFang SC"; color: #454545 } span.s1 { font: 12.0

Android異步框架RxJava 1.x系列(一) - 觀察模式及實現

from 預覽 目錄 ole 普通 清零 handler 使用 tps Android異步框架RxJava 1.x系列(一) - 觀察者模式及實現 前言 RxJava 是一款基於 Java VM 實現的響應式編程擴展庫 - 基於觀察者模式的異步和事件處理框架。RxJava

C++設計模式觀察模式

圖片 .com attach mov pan rtu cts gin pda //觀察者模式 class Observer{ public: virtual void Updata() = 0; }; class Subject{ public: voi

設計模式的藝術 行為型模式觀察模式

前言 紅燈停,綠燈行,在日常的交通中,每每遇到紅燈,司機總是要在路口進行等待,等到綠燈才能通過,這個時候司機就扮演了一個觀察者的角色,隨著燈的顏色的變化,司機的行為也跟著變化,在軟體系統中,有些物件之間也存在類似交通訊號燈和汽車之間的關係,一個物件的的行為狀態改變導致了其他物件的狀態或行為也發生

java23中設計模式觀察模式

什麼叫觀察者模式? 當被觀察者的資料更新時,將會通知觀察該主題的所有觀察者。就像---微信公眾號就是被觀察者---關注該公眾號的就是觀察者---》 當公眾號更新文章時,關注的人就會獲得該資訊。這就是觀察者模式的應用。 用圖說明: 下面將用程式碼來解釋上面的內容: 1.觀

ios觀察模式

什麼是觀察者模式?我們先打個比方,這就像你訂報紙。比如你想知道美國最近發生了些什麼新聞。你可能會訂閱一份美國週刊 。然後美國一旦有了新的故事,美國週刊就發一刊,並郵寄給你。當你收到這份報刊,然後你就能夠了解美國最新的動態。其實這就是觀察者模式,A對B的變化感興趣,就註冊為B的觀察者,當B發生變化