1. 程式人生 > >RxJava學習 - 9. Multicasting, Replaying, and Caching

RxJava學習 - 9. Multicasting, Replaying, and Caching

RxJava學習 - 9. Multicasting, Replaying, and Caching

我們已經見過hot和cold Observable,雖然大部分是cold(甚至使用Observable.interval()的)。當你有不止一個Observer的時候,預設行為是為每個Observer增加一個分開的流。你也許希望這樣,也許不希望這樣,我們需要意識到:什麼時候,通過multicasting(使用ConnectableObservable)會把一個Observable強制變成hot。

Understanding multicasting

看下面的程式碼:

import io.reactivex.Observable;

public class Launcher {
    public static void main(String[] args) {
        Observable<Integer> threeIntegers = Observable.range(1, 3);

        threeIntegers.subscribe(i -> System.out.println("Observer One: " + i)
); threeIntegers.subscribe(i -> System.out.println("Observer Two: " + i)); } }

輸出是

Observer One: 1
Observer One: 2
Observer One: 3
Observer Two: 1
Observer Two: 2
Observer Two: 3

上面的程式,第一個Observer接收完所有的三個emissions,然後呼叫onComplete()。然後,第二個Observer接收三個emissions(重新生成的),然後呼叫onComplete()。這是兩個分開的流。如果我們想把他們合併到一個流,把每個emission都發給Observers,可以呼叫Observable的publish(),這樣返回一個ConnectableObservable。設定Observers,呼叫connect()來開始發射,兩個Observers收到了相同的emissions:

import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;

public class Launcher {
    public static void main(String[] args) {
        ConnectableObservable<Integer> threeIntegers =
                Observable.range(1, 3).publish();
        threeIntegers.subscribe(i -> System.out.println("Observer One: " + i));
        threeIntegers.subscribe(i -> System.out.println("Observer Two: " + i));
        threeIntegers.connect();
    }
}

輸出是

Observer One: 1
Observer Two: 1
Observer One: 2
Observer Two: 2
Observer One: 3
Observer Two: 3

使用ConnectableObservable會強制源成為hot,發射一個流給所有的Observers。這就叫multicasting,但是呼叫不同的operators,還有細微的差別。
甚至當你呼叫publish(),使用ConnectableObservable,下來的任何operators還是可以增加分開的流。我們看看這行為,該如何管理。

Multicasting with operators

看看multicasting在一個operators鏈內是如何工作的,我們使用Observable.range(),然後map每個emission成一個隨機整數。因為這些隨機值對每個訂閱來說,是不確定的,也是不同的,可以讓我們觀察multicasting是否工作了。
我們發射數字1-3,對映成一個0-100000的隨機整數。如果有兩個Observers,我們可以希望每個收到不同的值。注意,你的輸出和我的輸出是不同的,我們感興趣的是,確認兩個Observers接收到不同的隨機數:

import io.reactivex.Observable;
import java.util.concurrent.ThreadLocalRandom;

public class Launcher {
    public static void main(String[] args) {
        Observable<Integer> threeRandoms = Observable.range(1,3).map(i -> randomInt());

        threeRandoms.subscribe(i -> System.out.println("Observer 1: " + i));
        threeRandoms.subscribe(i -> System.out.println("Observer 2: " + i));        
    }
    
    public static int randomInt() {
        return ThreadLocalRandom.current().nextInt(100000);
    }        
}

我的輸出是

Observer 1: 6207
Observer 1: 10604
Observer 1: 5121
Observer 2: 7127
Observer 2: 94588
Observer 2: 34253

看,Observable.range()源產生了兩個分開的emission生成器,每個都是cold的。每個流有自己的分開的map()例項,
於是,每個Observer得到了不同的隨機數。流的結構如下:
Two separate streams of operations are created for each Observer

你會說,那我要給兩個Observers發射相同的三個隨機數,可以首先是在Observable.range()之後呼叫publish(),產生ConnectableObservable。然後,可以呼叫map(),然後是Observers呼叫connect()。但是,你看到了,不是你期望的結果。每個Observer得到了分開的隨機數:

import io.reactivex.Observable;
import java.util.concurrent.ThreadLocalRandom;

public class Launcher {
    public static void main(String[] args) {
        ConnectableObservable<Integer> threeInts = Observable.range(1,3).publish();
        Observable<Integer> threeRandoms = threeInts.map(i -> randomInt());
        
        threeRandoms.subscribe(i -> System.out.println("Observer 1: " + i));
        threeRandoms.subscribe(i -> System.out.println("Observer 2: " + i));
        threeInts.connect();      
    }
    
    public static int randomInt() {
        return ThreadLocalRandom.current().nextInt(100000);
    }        
}

輸出是

Observer 1: 94084
Observer 2: 94961
Observer 1: 2308
Observer 2: 84564
Observer 1: 9046
Observer 2: 78881

為什麼呢?Observable.range()之後multicast,但是,multicasting發生在map()之前。map()之後,每個Observer仍然得到一個分開的流。publish()之前的任何事被組合成一個流(或者更技術一點,一個代理Observer)。
但是publish()之後,會fork成分開的流,如下圖:
fork

如果我們想防止map()產生兩個分開的流,我們需要在map()之後呼叫publish():

import io.reactivex.Observable;
import java.util.concurrent.ThreadLocalRandom;

public class Launcher {
    public static void main(String[] args) {
        ConnectableObservable<Integer> threeRandoms = Observable.range(1,3)
                .map(i -> randomInt()).publish();
        
        threeRandoms.subscribe(i -> System.out.println("Observer 1: " + i));
        threeRandoms.subscribe(i -> System.out.println("Observer 2: " + i));
        threeRandoms.connect();
    }
    
    public static int randomInt() {
        return ThreadLocalRandom.current().nextInt(100000);
    }        
}

輸出是

Observer 1: 33845
Observer 2: 33845
Observer 1: 50389
Observer 2: 50389
Observer 1: 71504
Observer 2: 71504

終於對了!每個Observer得到了相同的三個隨機數。現在,一個流例項通過了整個鏈,因為map()在publish()的前面,而不是後面:
behind

When to multicast

多個Observers的時候,Multicasting有利於防止冗餘工作。也可以提高效能,減少記憶體和CPU的使用,或者能簡化業務邏輯(當所有的Observers需要相同的emissions的時候)。
資料驅動的cold Observables應該只有在你想提高效能或者多個Observers接收相同資料的時候使用multicast。記住,multicasting增加hot ConnectableObservables,你不得不小心並且及時呼叫onnect(),這樣Observers才不會錯過資料。
甚至如果你的源Observable是hot(比如JavaFX或者Android的UI事件),putting operators也可能導致冗餘工作和監聽。當只有一個Observer的時候,用不著multicast,multicasting能導致不必要的負擔。但是,如果有多個Observers,你需要找到可以multicast和合並上遊操作的代理點。這個點是典型的邊界,在這裡,Observers有相同的上游操作,和不同的下游操作。
比如,你有一個Observer,用來列印隨機數,但是另一個Observer使用reduce()求和。在這個點,一個流應該fork成兩個分開的流,因為他們不再冗餘,做的是不同的工作:

import io.reactivex.Observable;
import java.util.concurrent.ThreadLocalRandom;

public class Launcher {
    public static void main(String[] args) {
        ConnectableObservable<Integer> threeRandoms = Observable.range(1, 3)
                .map(i -> randomInt()).publish();
        //Observer 1 - print each random integer
        threeRandoms.subscribe(i -> System.out.println("Observer 1: " + i));
        //Observer 2 - sum the random integers, then print
        threeRandoms.reduce(0, (total, next) -> total + next)
                .subscribe(i -> System.out.println("Observer 2: " + i));
        threeRandoms.connect();
    }
    
    public static int randomInt() {
        return ThreadLocalRandom.current().nextInt(100000);
    }        
}

輸出是

Observer 1: 96689
Observer 1: 9730
Observer 1: 86978
Observer 2: 193397

Automatic connection

有時候,你想手動呼叫ConnectableObservable的connect(),控制emissions開始發射的時間。允許Observable動態connect要小心點,這時候,Observers容易錯過emissions。

autoConnect()

ConnectableObservable可以很方便地使用autoConnect()。對於一個給定的ConnectableObservable,呼叫autoConnect()會返回一個Observable,在一定數量的Observers訂閱以後,它會自動呼叫connect()。前面的例子有兩個Observers,可以在publish()之後,立刻呼叫autoConnect(2):

import io.reactivex.Observable;
import java.util.concurrent.ThreadLocalRandom;

public class Launcher {
    public static void main(String[] args) {
        Observable<Integer> threeRandoms = Observable.range(1, 3)
                .map(i -> randomInt())
                .publish()
                .autoConnect(2);
        //Observer 1 - print each random integer
        threeRandoms.subscribe(i -> System.out.println("Observer 1: " + i));
        //Observer 2 - sum the random integers, then print
        threeRandoms.reduce(0, (total, next) -> total + next)
                .subscribe(i -> System.out.println("Observer 2: " + i));
    }
    
    public static int randomInt() {
        return ThreadLocalRandom.current().nextInt(100000);
    }        
}

這樣做,省掉了ConnectableObservable和後面的connect()呼叫。它會在得到兩個訂閱以後開始發射。
甚至當所有的下游Observers完成了或者dispose,autoConnect()會為源儲存它的訂閱。如果源finite並且disposes,如果新的Observer想訂閱,它不會再次訂閱源。
如果我們在上面的例子上增加第三個Observer,autoConnect()的引數還是2,第三個Observer會錯過這些emissions:

import io.reactivex.Observable;
import java.util.concurrent.ThreadLocalRandom;

public class Launcher {
    public static void main(String[] args) {
        Observable<Integer> threeRandoms = Observable.range(1, 3)
                .map(i -> randomInt()).publish().autoConnect(2);
        
        //Observer 1 - print each random integer
        threeRandoms.subscribe(i -> System.out.println("Observer 1: " + i));
        //Observer 2 - sum the random integers, then print
        threeRandoms.reduce(0, (total, next) -> total + next).subscribe(i -> System.out.println("Observer 2: " + i));
        //Observer 3 - receives nothing
        threeRandoms.subscribe(i -> System.out.println("Observer 3: " + i));
    }
    
    public static int randomInt() {
        return ThreadLocalRandom.current().nextInt(100000);
    }        
}

如果你沒傳numberOfSubscribers引數,預設值是1。下面的例子,我們publish和autoConnect Observable.interval(),第一個Observer開始發射emissions,3秒以後,另一個Observer來了,但是會錯過前面幾個emissions,後面的都接收到了:

import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;

public class Launcher {
    public static void main(String[] args) {
        Observable<Long> seconds = Observable.interval(1, TimeUnit.SECONDS)
                .publish()
                .autoConnect();
        //Observer 1
        seconds.subscribe(i -> System.out.println("Observer 1: " + i));
        sleep(3000);
        //Observer 2
        seconds.subscribe(i -> System.out.println("Observer 2: " + i));
        sleep(3000);
    }

    public static void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

如果numberOfSubscribers引數是0,它會立刻開始發射,不等任何Observers。

refCount() and share()

ConnectableObservable的refCount()類似autoConnect(1),在得到一個訂閱以後發射。但是,有一個重要的不同:當它不再有Observers的時候,會處置它自己,當有了新的訂閱,就重新開始。當它不再有Observers的時候,它不儲存對源的訂閱,當來了另一個Observer,就重新開始。
讓我們看個例子:有一個每秒發射的Observable.interval(),使用refCount()實現multicast。Observer 1接受了五個emissions,Observer 2接受了兩個。我們錯開這兩個訂閱,中間隔三秒鐘。因為這兩個訂閱是有限的(由於take()),在Observer 3來的時候,他們應該已經終止了,此時,不應該再有之前的Observers。注意,Observer 3開始於0:

import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;

public class Launcher {
    public static void main(String[] args) {
        Observable<Long> seconds =
                Observable.interval(1, TimeUnit.SECONDS)
                        .publish()
                        .refCount();
        //Observer 1
        seconds.take(5).subscribe(l -> System.out.println("Observer 1: " + l));
        sleep(3000);
        //Observer 2
        seconds.take(2).subscribe(l -> System.out.println("Observer 2: " + l));
        sleep(3000);
        //there should be no more Observers at this point
        //Observer 3
        seconds.subscribe(l -> System.out.println("Observer 3: " + l));
        sleep(3000);
    }

    public static void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

輸出是

Observer 1: 0
Observer 1: 1
Observer 1: 2
Observer 1: 3
Observer 2: 3
Observer 1: 4
Observer 2: 4
Observer 3: 0
Observer 3: 1
Observer 3: 2

也可以使用publish().refCount()的別名share():

Observable<Long> seconds =
        Observable.interval(1, TimeUnit.SECONDS).share();

Replaying and caching

Multicasting也允許快取多個Observers之間共享的值。重播和快取資料是一個multicasting活動,我們將會看到怎麼做才是安全和有效率的。

Replaying

replay()是儲存一定範圍內的從前的emissions的一個強有力的辦法,當來了新的Observer就重新發射他們。它會返回一個ConnectableObservable,既能multicast emissions,也能發射一定範圍內的從前的emissions。它快取的從前的emissions會在來了新的Observer的時候立刻發射,然後它會發射之後的emissions。
讓我們從不帶引數的replay()開始。它會為遲來的Observers重播所有的從前的emissions,然後發射新產生的。如果我們使用每秒發射的Observable.interval(),
呼叫replay()實現multicast,重播從前的整數emissions。因為replay()返回ConnectableObservable,我們使用autoConnect(),這樣會在第一個訂閱之後發射。
三秒以後,來了第二個Observer,看看發生了什麼:

import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;

public class Launcher {
    public static void main(String[] args) {
        Observable<Long> seconds =
                Observable.interval(1, TimeUnit.SECONDS)
                        .replay()
                        .autoConnect();
        //Observer 1
        seconds.subscribe(l -> System.out.println("Observer 1: " + l));
        sleep(3000);
        //Observer 2
        seconds.subscribe(l -> System.out.println("Observer 2: " + l));
        sleep(3000);
    }

    public static void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

輸出是

Observer 1: 0
Observer 1: 1
Observer 1: 2
Observer 2: 0
Observer 2: 1
Observer 2: 2
Observer 1: 3
Observer 2: 3
Observer 1: 4
Observer 2: 4
Observer 1: 5
Observer 2: 5

看到了吧?三秒以後,Observer 2來了,立刻收到錯過的前面三個emissions:0、1和2.然後,它收到的emissions和Observer 1的一樣。注意,這樣做消耗大量的記憶體,replay()快取了它接收到的全部emissions。如果源是無限的,或者只關心最後的一些emissions,可以指定bufferSize引數,限制重播的數量。如果我們呼叫replay(2),第二個Observer不會收到0,但是會收到1和2。
如果你甚至在沒有訂閱的時候,也想使用replay()儲存快取的值,就和autoConnect()結合,而不是refCount()。比如下面的例子,第二個Observer只能接收到最後的值:

import io.reactivex.Observable;

public class Launcher {
    public