1. 程式人生 > >實戰SpringCloud響應式微服務系列教程(第六章)

實戰SpringCloud響應式微服務系列教程(第六章)

本章節介紹:Flux和Mono操作符

和其他主流的響應式程式設計一樣,Reactor框架的設計目標也是為了簡化相應式流的使用方法。為此Reactor框架提供了大量操作符用於操作Flux和Mono物件。

本節不打算全面詳細介紹,我們的思路是將這些操作符分類,然後對每一類中具有代表性的操作符展開討論。

對於其他沒有介紹到的操作符可參考Reactor框架的官方文件。

在本節中我們把Flux和Mono操作符分為以下7大類。

  • 轉換 (Transforming)操作符負責對序列中的元素進行轉變。
  • 過濾 (Filtering)操作符負責將不需要的資料從序列中進行過濾。
  • 組合 (Combining) 操作符負責將序列中的元素進行合併和連線。
  • 條件 (Conditional) 操作符負責根據特定條件對序列中的元素進行處理。
  • 數學 (Mathematical) 操作符負責對序列中的元素執行各種數學操作。
  • Obserable工具(Utility) 操作符提供的是一些針對流失處理的輔助性工具。
  • 日誌和除錯(Log&Debug) 操作符提供了針對執行時日誌以及如何對序列進行程式碼除錯的工具類。

1. 轉換操作符

Reactor框架中常用的轉換操作符包括buffer、map、flatMap和window。

(1)buffer

buffer操作符把當前流中的元素收集到集合中,並把集合物件作為流中的新元素。使用buffer操作符在進行元素收集時可以指定集合物件所包含的元素的最大數量。

以下程式碼先使用range()方法建立了1~50這50個元素,然後演示了使用buffer從包含這50個元素的流中構建集合,每個集合包含10個元素,一共構建5個集合。

Flux.range(1,50).buffer(10).subscribe(System.out::println);

 

上面程式碼執行結果如下:

[1,2,3,4,5,6,7,8,9,10]
[11,12,13,14,15,16,17,18,19,20]
[21,22,23,24,25,26,27,28,29,30]
[31,32,33,34,35,36,37,38,39,40]
[41,42,43,44,45,46,47,48,49,50]

 

buffer操作符的另一種用法是指定收集的時間間隔,由此演變出了bufferTimeout()方法。bufferTimeout()方法可以指定時間間隔為一個Duration物件或者毫秒數,即使用bufferTimeoutMillis()或者bufferMillis()這兩個方法。

除了指定元素數量和時間間隔,還可以通過bufferUnitl和bufferWhile操作符進行資料收集。bufferUnitl會一直收集,知道斷言(Predicate)條件返回true。使得斷言條件返回true的那個元素可以選擇新增到當前集合或者下一個集合當中。

而bufferWhile只有當斷言條件返回true時才會收集,一旦值為false,會立即開始下一次收集。

程式碼如下:

Flux.range(1,10).bufferUnitl(i -> i%2 ==0).subscribe(System.out::println);
System.out.println("-----------------------------------");
Flux.range(1,10).bufferWhile(i -> i%2 ==0).subscribe(System.out::println);

 

以上程式碼執行結果如下:

[1,2]
[3,4]
[5.6]
[7,8]
[9,10]
--------------------------
[2]
[4]
[6]
[8]
[10]

 

(2)map

map操作符相當於一種對映操作,他對流中每一個元素對映一個函式,從而達到一個變幻效果。

Flux.just(1, 2, 3, 4)
                .log()
                .map(i -> {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return i * 2;
                })
                .subscribe(e -> log.info("get:{}",e));

 

以上程式碼執行結果:

10:53:57.058 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
10:53:57.062 [main] INFO reactor.Flux.Array.1 - | request(unbounded)
10:53:57.063 [main] INFO reactor.Flux.Array.1 - | onNext(1)
10:53:58.067 [main] INFO com.example.demo.FluxTest - get:2
10:53:58.067 [main] INFO reactor.Flux.Array.1 - | onNext(2)
10:53:59.071 [main] INFO com.example.demo.FluxTest - get:4
10:53:59.071 [main] INFO reactor.Flux.Array.1 - | onNext(3)
10:54:00.076 [main] INFO com.example.demo.FluxTest - get:6
10:54:00.076 [main] INFO reactor.Flux.Array.1 - | onNext(4)
10:54:01.080 [main] INFO com.example.demo.FluxTest - get:8
10:54:01.081 [main] INFO reactor.Flux.Array.1 - | onComplete()

 

(3) flatMap

與map不同,操作符把流中的每一個元素轉換成一個流,再把轉換之後得到的所有流中的元素進行合併。

flatMap操作符非常實用,程式碼示例如下:

Flux.just(1,5).flatMap(x -> Mono.just(x*x)).subscribe(System.out::println);

 

以上程式碼中我們對1和5這兩個元素進行了flatMap操作,操作的結果是返回他們的平方值進行合併,執行結果如下:

1
25

 

在系統開發過程中我們經常會碰到對資料庫中的資料進行逐一處理的場景,這時候可以充分利用flatMap操作符的特性開展相關操作。以下程式碼展示瞭如何使用flatMap對資料庫中獲取的資料進行逐一刪除的方法。

Mono<void> deleteFiles = 
fileRepository.findByname(flieName).flatMap(fileRepository::delete);

 

(4)window

window操作符類似於buffer,所不同的是,window操作符是把當前流中的元素收集到另一個Flux序列中。因此它的返回值型別是Flux<flux>,而不是簡單的Flux。</flux

示例程式碼:

Flux.range(1,5).window(2).toIterable().forEach(w -> {
   w.subscribe(System.out::println);
   System.out.println("---------------------------")
})

 

以上程式碼執行結果如下。這裡生成了5個元素,然後通過window操作符把這個5個元素轉變成3個Flux物件,並通過forEach()工具把這些物件打印出來。

1
2
-----------------
3
4
-----------------
5

 

2.過濾操作符

Reactor中 常用過操作符包括filter、first、last、skip/skipLast、take/takeLast等。

(1)filter

filter操作符的含義與普通的過濾器類似,就是對流中包含的元素進行過濾,只是留下滿足指定條件的元素。

例如,我們想對1~10這10個元素進行過濾,只獲取能被2取餘的元素,可以使用如下程式碼。

Flux.range(1,10).filter(i -> i % 2 ==0).subscribe(System.out::println);

 

(2)first

first操作符的執行效果即為返回流中的第一個元素。

(3)last

last操作符與first類似,返回流中的最後一個元素。

(4)skip/skipLast

如果使用skip操作符,將會忽略資料流中的前n個元素。類似的如果使用skipLast將會忽略資料流中的後n個元素。

(5)take/takeLast

take系列操作符用來從當前流中提取元素。我們可以按照指定的數量來提取元素,對應的方法是take(long n);同時,也可以按照指定的時間間隔來提取元素,分別使用take(Duration time) 和takeMillis(long time)。類似的takeLast操作符用來從當前流中尾部提取元素。

take和takeLast操作符示例程式碼如下:

Flux.range(1,100).take(10).subscribe(System.out::println);
Flux.range(1,100).takeLast(10).subscribe(System.out::println);

 

3. 組合操作符

Reactor中常用的組合操作符有then/when、merge、starWith和zip

(1)then/when

then操作符的含義是等到上一個操作符完成時在做下一個。

when操作符的含義是等到多個操作儀器完成。

如下程式碼很好的展示了when操作符的實際應用場景。

public Mono<Void> updateFiles(Flux<FilePart> files){
        return files.flatMap(file ->{
            Mono<Void> copyFileToFileServer =...;
            Mono<Void> saFilePathToDataBase = ...;
            return Mono.when(copyFileToFileServer);
        });
    }

 

(2)starWith

starWith操作符的含義是在資料元素序列的開頭插入指定的元素項。

(3)merge

merge操作符用來把多個流合併成一個Flux序列,該操作符按照所有流中的元素實際生產順序合併。

merge操作符示例程式碼如下:

Flux.merge(Flux.intervalMillis(0,10).take(3),
   Flux.intervalMillis(5,10).take(3)).toStream()
   .forEach(System.out::println);

 

請注意這裡兩個Flux.intervalMillis()方法都是在限制10ms內生產一個新元素。

執行結果如下:

0
0
1
1
2
2

 

不同於merge,mergeSequeetial操作符則是按照所有流被訂閱的順序以流為單位進行合併。

請看如下程式碼:

Flux.mergeSequeetial(Flux.intervalMillis(0,10).take(3),
   Flux.intervalMillis(5,10).take(3)).toStream()
   .forEach(System.out::println);

 

我們僅僅只是將merge換成了mergeSequeetial。

執行結果如下:

0
1
2
0
1
2

 

(4)zipWith

zipWith把當前流中的元素與另外一個流中的元素按照一對一的方式進行合併。使用zipWith
操作符在合併時可以不做任何處理,如此得到一個元素型別為Tuple2的流,示例程式碼如下:

Flux.just("a","b").zipWith(Flux.just("c","b")).subscribe(System.out::println);

 

執行結果如下:

[a,c]
[b,d]

 

另外,我們還可以通過一個BiFunction函式對合並的元素進行處理,所得到的流的元素型別為該函式的返回值。

程式碼如下:

Flux.just("a","b").zipWith(Flux.just("c","b"),(s1,s2) -> String.format("%+%",s1,s2))
   .subscribe(System.out::println);

 

執行結果如下:

a+c
b+d

 

本章節完!歷史章節

實戰SpringCloud響應式微服務系列教程(第一章)

實戰SpringCloud響應式微服務系列教程(第二章)

實戰SpringCloud響應式微服務系列教程(第三章)

實戰SpringCloud響應式微服務系列教程(第四章)

實戰SpringCloud響應式微服務系列教程(第五