1. 程式人生 > >RxJava學習 - 4. Other Observable sources

RxJava學習 - 4. Other Observable sources

RxJava學習 - 4. Other Observable sources

Observable.range()

可以使用Observable.range()發射一定範圍內的整數。從start值開始傳送發射,每次加1,直到一定的count。這些數通過onNext()事件傳遞,跟著一個onComplete()事件:

import io.reactivex.Observable;

public class Launcher {
    public static void main(String[] args) {
        Observable.range(1,10)
            .subscribe(s -> System.out.println("RECEIVED: " + s));
    }
}

可以使用Observable.rangeLong()發射比較大的數。

Observable.interval()

Observable.interval()生成一個基於時間的Observable。每經過一個時間間隔它將發射一個連續的long emission(從0開始)。
下面的程式碼,每秒鐘發射一個數:

import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;

public class Launcher {
    public static void main(String[]args) {
        Observable.interval(1, TimeUnit.SECONDS)
            .subscribe(s -> System.out.println(s + " Mississippi"));
        sleep(5000);
    }
    
    public
static void sleep(int millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { e.printStackTrace(); } } }

Observable.interval()在一個定時器上執行,所以,需要一個單獨的執行緒做排程。上面的程式,main()方法生成一個Observable,但是不等它完成。它在一個新的執行緒上發射。要讓main()不退出,我們使用sleep()方法讓程式存活了5秒鐘。在程式退出之前,我們的Observable有5秒鐘時間可以發射。當你增加一個生產程式的時候,你一般不會碰到這樣的問題,web服務、Android程式或者JavaFX將保持程式的存活。
Observable.interval()返回的是一個hot還是cold Observable?因為它是事件驅動的(頁數無限的),你可能會說它是hot。
但是,增加一個Observer,等5秒鐘,再加一個Observer。發生了什麼?請看:

import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;

public class Launcher {
    public static void main(String[] args) {
        Observable<Long> seconds = Observable.interval(1, TimeUnit.SECONDS);
        //Observer 1
        seconds.subscribe(l -> System.out.println("Observer 1: " + l));
        //sleep 5 seconds
        sleep(5000);
        //Observer 2
        seconds.subscribe(l -> System.out.println("Observer 2: " + l));
        //sleep 5 seconds
        sleep(5000);        
    }

    public static void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

輸出是這樣的:

Observer 1: 0
Observer 1: 1
Observer 1: 2
Observer 1: 3
Observer 1: 4
Observer 1: 5
Observer 2: 0
Observer 1: 6
Observer 2: 1
Observer 1: 7
Observer 2: 2
Observer 1: 8
Observer 2: 3
Observer 1: 9
Observer 2: 4

5秒鐘過去了,Observer 2來了。注意,它有自己的計時器,從0開始。這兩個observers實際上有他們自己的emissions,每個都是從0開始。
所以,這個Observable實際上是cold。想讓所有的observers使用同一個計時器,可以使用ConnectableObservable:

import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;

public class Launcher {
    public static void main(String[] args) {
        ConnectableObservable<Long> seconds =
                Observable.interval(1, TimeUnit.SECONDS).publish();
        //observer 1
        seconds.subscribe(l -> System.out.println("Observer 1: " + l));
        seconds.connect();
        //sleep 5 seconds
        sleep(5000);
        //observer 2
        seconds.subscribe(l -> System.out.println("Observer 2: " + l));
        //sleep 5 seconds
        sleep(5000);
    }

    public static void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

輸出如下:

Observer 1: 0
Observer 1: 1
Observer 1: 2
Observer 1: 3
Observer 1: 4
Observer 1: 5
Observer 2: 5
Observer 1: 6
Observer 2: 6
Observer 1: 7
Observer 2: 7
Observer 1: 8
Observer 2: 8
Observer 1: 9
Observer 2: 9

Observable.future()

RxJava的Observables比Futures更健壯和富有表現力,但是,如果你現在使用的庫仍然生產Futures,可以很容易地把他們轉換成Observables:

import io.reactivex.Observable;
import java.util.concurrent.Future;

public class Launcher {
    public static void main(String[] args) {
    Future<String> futureValue = ...;
    Observable.fromFuture(futureValue)
        .map(String::length)
        .subscribe(System.out::println);
    }
}

Observable.empty()

看上去好像沒什麼用,但是,Observable可以什麼都不發射,然後呼叫onComplete():

import io.reactivex.Observable;

public class Launcher {
    public static void main(String[] args) {
    Observable<String> empty = Observable.empty();
    
    empty.subscribe(System.out::println,
        Throwable::printStackTrace,
        () -> System.out.println("Done!"));
    }
}

空的observables通常代表空的資料集。有時候,有些運算也能返回空,比如filter()。有時候,你故意使用Observable.empty()。
empty Observable實質上就是RxJava的null。Empty Observables比null優雅,程式會繼續,不會拋NullPointerExceptions。

Observable.never()

Observable.empty()的近親是Observable.never()。唯一的不同是,它不呼叫onComplete(),observers一直在等emissions,卻永遠等不來:

import io.reactivex.Observable;

public class Launcher {
    public static void main(String[] args) {
        Observable<String> empty = Observable.never();
        empty.subscribe(System.out::println,
                Throwable::printStackTrace,
                () -> System.out.println("Done!"));
        sleep(5000);
    }
}

這個Observable主要用來測試,生產中不經常使用。我們使用sleep(),就像Observable.interval()那樣,因為main執行緒產生Observable以後不會等待。
這個例子裡,我們等了5秒鐘,就是為了證明什麼都沒發射,然後,程式退出。

Observable.error()

你可以增加一個Observable,立刻呼叫onError():

import io.reactivex.Observable;

public class Launcher {
    public static void main(String[] args) {
        Observable.error(new Exception("Crash and burn!"))
                .subscribe(i -> System.out.println("RECEIVED: " + i),
                        Throwable::printStackTrace,
                        () -> System.out.println("Done!"));
    }
}

你也可以使用lambda提供異常,這樣給每個Observer提供了單獨的異常例項:

import io.reactivex.Observable;

public class Launcher {
    public static void main(String[] args) {
        Observable.error(() -> new Exception("Crash and burn!"))
                .subscribe(i -> System.out.println("RECEIVED: " + i),
                        Throwable::printStackTrace,
                        () -> System.out.println("Done!"));
    }
}

Observable.defer()

Observable.defer()是一個強有力的工廠,為每個Observer增加單獨的狀態。當使用一定的Observable工廠的時候,如果你的源是有狀態的,你可能想給每個Observer增加單獨的狀態。你的源Observable可能不知道它的引數已經修改了,傳送過時的emissions。這裡有個簡單的例子,你有一個Observable.range(),有兩個靜態的int屬性:start和count。
如果你訂閱這個Observable,修改count,然後再次訂閱,你會發現,第二個Observer沒看到這個變化:

import io.reactivex.Observable;

public class Launcher {
    private static int start = 1;
    private static int count = 5;
    
    public static void main(String[] args) {
        Observable<Integer> source = Observable.range(start, count);
        source.subscribe(i -> System.out.println("Observer 1: " + i));
        //modify count
        count = 10;
        source.subscribe(i -> System.out.println("Observer 2: " + i));
    }
}

輸出是這樣的:

Observer 1: 1
Observer 1: 2
Observer 1: 3
Observer 1: 4
Observer 1: 5
Observer 2: 1
Observer 2: 2
Observer 2: 3
Observer 2: 4
Observer 2: 5

想解決這個問題,你能玩兒每個訂閱者增加一個fresh Observable。使用Observable.defer()就可以實現,它接受一個lambda,
說明怎麼為每個訂閱者增加一個Observable。因為這樣做每次增加一個新的Observable,就可以反映引數的變化:

import io.reactivex.Observable;

public class Launcher {
    private static int start = 1;
    private static int count = 5;
    
    public static void main(String[] args) {
        Observable<Integer> source = Observable.defer(() ->
                Observable.range(start,count));
        source.subscribe(i -> System.out.println("Observer 1: " + i));
        //modify count
        count = 10;
        source.subscribe(i -> System.out.println("Observer 2: " + i));
    }
}

輸出是這樣的:

Observer 1: 1
Observer 1: 2
Observer 1: 3
Observer 1: 4
Observer 1: 5
Observer 2: 1
Observer 2: 2
Observer 2: 3
Observer 2: 4
Observer 2: 5
Observer 2: 6
Observer 2: 7
Observer 2: 8
Observer 2: 9
Observer 2: 10

如果你的Observable源是原生實現,而且超過一個Observer(比如,重新使用一個Iterator,該迭代器只迭代一次),也可以使用Observable.defer()解決。

Observable.fromCallable()

如果你需要執行一個計算或者動作,然後發射它,你可以使用Observable.just()(或者Single.just()、Maybe.just())。
但是,有時候,我們想晚一點再做,或者是deferred的方式。而且,如果處理過程發生錯誤,我們希望以onError()的方式發到Observable chain,而不是以Java傳統的方式拋異常。例如,如果你的Observable.just()想用1除以0,會拋異常,但是不發給Observer:

import io.reactivex.Observable;

public class Launcher {
    public static void main(String[] args) {
    Observable.just(1 / 0)
            .subscribe(i -> System.out.println("RECEIVED: " + i),
                    e -> System.out.println("Error Captured: " + e));
    }
}

輸出是:

java.lang.ArithmeticException: / by zero

	at Launcher.main(Launcher.java:6)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)

如果希望響應式地處理錯誤,把錯誤發給Observer處理。上面的例子,可以使用Observable.fromCallable(),它接受一個lambda Supplier,發生錯誤時會發射給Observer:

import io.reactivex.Observable;

public class Launcher {
    public static void main(String[] args) {
        Observable.fromCallable(() -> 1 / 0)
            .subscribe(i -> System.out.println("Received: " + i),
                e -> System.out.println("Error Captured: " + e));
    }
}

輸出是:

Error Captured: java.lang.ArithmeticException: / by zero