1. 程式人生 > >Reactor 3 學習筆記(2)

Reactor 3 學習筆記(2)

上篇繼續學習各種方法:

4.9、reduce/reduceWith

    @Test
    public void reduceTest() {
        Flux.range(1, 10).reduce((x, y) -> x + y).subscribe(System.out::println);
        Flux.range(1, 10).reduceWith(() -> 10, (x, y) -> x + y).subscribe(System.out::println);
    }

輸出:

55
65

上面的程式碼,reduce相當於把1到10累加求和,reduceWith則是先指定一個起始值,然後在這個起始值基礎上再累加。(tips: 除了累加,還可以做階乘) 

reduce示意圖:

reduceWith示意圖:

4.10、merge/mergeSequential/contact

    @Test
    public void mergeTest() {
        Flux.merge(Flux.interval(Duration.of(500, ChronoUnit.MILLIS)).take(5),
                Flux.interval(Duration.of(600, ChronoUnit.MILLIS), Duration.of(500, ChronoUnit.MILLIS)).take(5))
                .toStream().forEach(System.out::println);

        System.out.println("-----------------------------");

        Flux.mergeSequential(Flux.interval(Duration.of(500, ChronoUnit.MILLIS)).take(5),
                Flux.interval(Duration.of(600, ChronoUnit.MILLIS), Duration.of(500, ChronoUnit.MILLIS)).take(5))
                .toStream().forEach(System.out::println);
    }  

merge就是將把多個Flux"按元素實際產生的順序"合併,而mergeSequential則是按多個Flux"被訂閱的順序"來合併,以上面的程式碼來說,二個Flux,從時間上看,元素是交替產生的,所以merge的輸出結果,是混在一起的,而mergeSequential則是能分出Flux整體的先後順序。

0
0
1
1
2
2
3
3
4
4
-----------------------------
0
1
2
3
4
0
1
2
3
4

merge示意圖:

mergeSequential示意圖:

 

與mergeSequential類似的,還有一個contact方法,示意圖如下:

4.11、combineLatest

    @Test
    public void combineLatestTest() {
        Flux.combineLatest(
                Arrays::toString,
                Flux.interval(Duration.of(10000, ChronoUnit.MILLIS)).take(3),
                Flux.just("A", "B"))
                .toStream().forEach(System.out::println);

        System.out.println("------------------");

        Flux.combineLatest(
                Arrays::toString,
                Flux.just(0, 1),
                Flux.just("A", "B"))
                .toStream().forEach(System.out::println);

        System.out.println("------------------");

        Flux.combineLatest(
                Arrays::toString,
                Flux.interval(Duration.of(1000, ChronoUnit.MILLIS)).take(2),
                Flux.interval(Duration.of(10000, ChronoUnit.MILLIS)).take(2))
                .toStream().forEach(System.out::println);
    }

該操作會將所有流中的最新產生的元素合併成一個新的元素,作為返回結果流中的元素。只要其中任何一個流中產生了新的元素,合併操作就會被執行一次。

分析一下第一段輸出:

第1個Flux用了延時生成,第1個數字0,10秒後才產生,這時第2個Flux中的A,B早就生成完畢,所以此時二個Flux中最新生在的元素,就是[0,B],類似的,10秒後,第2個數字1依次產生,再執行1次合併,生成[1,B]...

輸出:

[0, B]
[1, B]
[2, B]
------------------
[1, A]
[1, B]
------------------
[1, 0]
[1, 1]

示意圖如下:

4.12、first

    @Test
    public void firstTest() {
        Flux.first(Flux.fromArray(new String[]{"A", "B"}),
                Flux.just(1, 2, 3))
                .subscribe(System.out::println);
    }

這個很簡單理解,多個Flux,只取第1個Flux的元素。輸出如下:

A
B

示意圖:

 

4.13、 map

    @Test
    public void mapTest() {
        Flux.just('A', 'B', 'C').map(a -> (int) (a)).subscribe(System.out::println);
    }

 map相當於把一種型別的元素序列,轉換成另一種型別,輸出如下:

65
66
67

示意圖:

 

五、訊息處理

寫程式碼時,難免會遇到各種異常或錯誤,所謂訊息處理,就是指如何處理這些異常。

5.1 訂閱錯誤訊息

    @Test
    public void subscribeTest1() {
        Flux.just("A", "B", "C")
                .concatWith(Flux.error(new IndexOutOfBoundsException("下標越界啦!")))
                .subscribe(System.out::println, System.err::println);
    }

注意:這裡subscribe第2個引數,指定了System.err::println ,即錯誤訊息,輸出到異常控制檯上。

輸出效果:

示意圖:

5.2 onErrorReturn

即:遇到錯誤時,用其它指定值返回

    @Test
    public void subscribeTest2() {
        Flux.just("A", "B", "C")
                .concatWith(Flux.error(new IndexOutOfBoundsException("下標越界啦!")))
                .onErrorReturn("X")
                .subscribe(System.out::println, System.err::println);
    }

輸出:

A
B
C
X

示意圖:

5.3 onErrorResume

跟onErrorReturn有點接近,但更靈活,可以根據異常的型別,有選擇性的處理返回值。

    @Test
    public void subscribeTest3() {
        Flux.just("A", "B", "C")
                .concatWith(Flux.error(new IndexOutOfBoundsException("下標越界啦!")))
                .onErrorResume(e -> {
                    if (e instanceof IndexOutOfBoundsException) {
                        return Flux.just("X", "Y", "Z");
                    } else {
                        return Mono.empty();
                    }
                })
                .subscribe(System.out::println, System.err::println);
    }

輸出:

A
B
C
X
Y
Z

示意圖:

 

5.4 retry

即:遇到異常,就重試。

    @Test
    public void subscribeTest4() {
        Flux.just("A", "B", "C")
                .concatWith(Flux.error(new IndexOutOfBoundsException("下標越界啦!")))
                .retry(1)
                .subscribe(System.out::println, System.err::println);
    }

輸出:

示意圖:

六、(執行緒)排程器

reactor中到處充滿了非同步呼叫,內部必然有一堆執行緒排程,Schedulers提供瞭如下幾種呼叫策略:

6.1 Schedulers.immediate() - 使用當前執行緒
6.2 Schedulers.elastic() - 使用執行緒池
6.3 Schedulers.newElastic("test1") - 使用(新)執行緒池(可以指定名稱,更方便除錯)
6.4 Schedulers.single() - 單個執行緒
6.5 Schedulers.newSingle("test2") - (新)單個執行緒(可以指定名稱,更方便除錯)
6.6 Schedulers.parallel() - 使用並行處理的執行緒池(取決於CPU核數)
6.7 Schedulers.newParallel("test3")  - 使用並行處理的執行緒池(取決於CPU核數,可以指定名稱,方便除錯)
6.8 Schedulers.fromExecutorService(Executors.newScheduledThreadPool(5)) - 使用Executor(這個最靈活)

示例程式碼:

    @Test
    public void schedulesTest() {
        Flux.fromArray(new String[]{"A", "B", "C", "D"})
                .publishOn(Schedulers.newSingle("TEST-SINGLE", true))
                .map(x -> String.format("[%s]: %s", Thread.currentThread().getName(), x))
                .toStream()
                .forEach(System.out::println);
    }

輸出: 

[TEST-SINGLE-1]: A
[TEST-SINGLE-1]: B
[TEST-SINGLE-1]: C
[TEST-SINGLE-1]: D

七、測試&除錯

非同步處理,通常是比較難測試的,reactor提供了StepVerifier工具來進行測試。

7.1 常規單元測試

    @Test
    public void stepTest() {
        StepVerifier.create(Flux.just(1, 2)
                .concatWith(Mono.error(new IndexOutOfBoundsException("test")))
                .onErrorReturn(3))
                .expectNext(1)
                .expectNext(2)
                .expectNext(3)
                .verifyComplete();
    }

上面的示例,Flux先生成1,2這兩個元素,然後拋了個錯誤,但馬上用onErrorReturn處理了異常,所以最終應該是期待1,2,3,complete這樣的序列。 

7.2 模擬時間流逝

Flux.interval這類延時操作,如果延時較大,比如幾個小時之類的,要真實模擬的話,效率很低,StepVerifier提供了withVirtualTime方法,來模擬加快時間的流逝(是不是很體貼^_^)

    @Test
    public void stepTest2() {
        StepVerifier.withVirtualTime(() -> Flux.interval(Duration.of(10, ChronoUnit.MINUTES),
                Duration.of(5, ChronoUnit.SECONDS))
                .take(2))
                .expectSubscription()
                .expectNoEvent(Duration.of(10, ChronoUnit.MINUTES))
                .thenAwait(Duration.of(5, ChronoUnit.SECONDS))
                .expectNext(0L)
                .thenAwait(Duration.of(5, ChronoUnit.SECONDS))
                .expectNext(1L)
                .verifyComplete();
    }

上面這個Flux先停10分鐘,然後每隔5秒生成一個數字,然後取前2個數字。程式碼先呼叫

expectSubscription 期待流被訂閱,然後
expectNoEvent(Duration.of(10, ChronoUnit.MINUTES)) 期望10分鐘內,無任何事件(即:驗證Flux先暫停10分鐘),然後
thenAwait(Duration.of(5, ChronoUnit.SECONDS)) 等5秒鐘,這時已經生成了數字0
expectNext(0L) 期待0L
... 後面的大家自行理解吧。

7.3 記錄日誌 

    @Test
    public void publisherTest() {
        Flux.just(1, 0)
                .map(c -> 1 / c)
                .log("MY-TEST")
                .subscribe(System.out::println);
    }

輸出:

示意圖:

7.4 checkpoint檢查點

可以在一些懷疑的地方,加上checkpoint檢查,參考下面的程式碼:

    @Test
    public void publisherTest() {
        Flux.just(1, 0)
                .map(c -> 1 / c)
                .checkpoint("AAA")
                .subscribe(System.out::println);
    }

輸出:

點選檢視原圖