附2:Reactor 3 之選擇合適的操作符——響應式Spring的道法術器
本系列文章索引《響應式Spring的道法術器》
前情提要 Reactor Operators
本節的內容來自我翻譯的Reactor 3 參考文檔——如何選擇操作符。由於部分朋友打開github.io網速比較慢或上不去,貼出來方便大家查閱。
如果一個操作符是專屬於
Flux
或Mono
的,那麽會給它註明前綴。
公共的操作符沒有前綴。如果一個具體的用例涉及多個操作符的組合,這裏以方法調用的方式展現,
會以一個點(.)開頭,並將參數置於圓括號內,比如:.methodCall(parameter)
。
1)創建一個新序列,它...
- 發出一個
T
,我已經有了:just
- ...基於一個
Optional<T>
Mono#justOrEmpty(Optional<T>)
- ...基於一個可能為
null
的 T:Mono#justOrEmpty(T)
- ...基於一個
- 發出一個
T
,且還是由just
方法返回- ...但是“懶”創建的:使用
Mono#fromSupplier
或用defer
包裝just
- ...但是“懶”創建的:使用
- 發出許多
T
,這些元素我可以明確列舉出來:Flux#just(T...)
- 基於叠代數據結構:
- 一個數組:
Flux#fromArray
- 一個集合或 iterable:
Flux#fromIterable
- 一個 Integer 的 range:
Flux#range
- 一個
Stream
Flux#fromStream(Supplier<Stream>)
- 一個數組:
- 基於一個參數值給出的源:
- 一個
Supplier<T>
:Mono#fromSupplier
- 一個任務:
Mono#fromCallable
,Mono#fromRunnable
- 一個
CompletableFuture<T>
:Mono#fromFuture
- 一個
- 直接完成:
empty
- 立即生成錯誤:
error
- ...但是“懶”的方式生成
Throwable
:error(Supplier<Throwable>)
- ...但是“懶”的方式生成
- 什麽都不做:
never
- 訂閱時才決定:
defer
- 依賴一個可回收的資源:
using
- 可編程地生成事件(可以使用狀態):
- 同步且逐個的:
Flux#generate
- 異步(也可同步)的,每次盡可能多發出元素:
Flux#create
(Mono#create
也是異步的,只不過只能發一個)
- 同步且逐個的:
2)對序列進行轉化
-
我想轉化一個序列:
- 1對1地轉化(比如字符串轉化為它的長度):
map
- ...類型轉化:
cast
- ...為了獲得每個元素的序號:
Flux#index
- 1對n地轉化(如字符串轉化為一串字符):
flatMap
+ 使用一個工廠方法 - 1對n地轉化可自定義轉化方法和/或狀態:
handle
- 對每一個元素執行一個異步操作(如對 url 執行 http 請求):
flatMap
+ 一個異步的返回類型為Publisher
的方法 - ...忽略一些數據:在 flatMap lambda 中根據條件返回一個
Mono.empty()
- ...保留原來的序列順序:
Flux#flatMapSequential
(對每個元素的異步任務會立即執行,但會將結果按照原序列順序排序) - ...當 Mono 元素的異步任務會返回多個元素的序列時:
Mono#flatMapMany
- 1對1地轉化(比如字符串轉化為它的長度):
-
我想添加一些數據元素到一個現有的序列:
- 在開頭添加:
Flux#startWith(T...)
- 在最後添加:
Flux#concatWith(T...)
- 在開頭添加:
-
我想將
Flux
轉化為集合(一下都是針對Flux
的)- 轉化為 List:
collectList
,collectSortedList
- 轉化為 Map:
collectMap
,collectMultiMap
- 轉化為自定義集合:
collect
- 計數:
count
- reduce 算法(將上個元素的reduce結果與當前元素值作為輸入執行reduce方法,如sum)
reduce
- ...將每次 reduce 的結果立即發出:
scan
- 轉化為一個 boolean 值:
- 對所有元素判斷都為true:
all
- 對至少一個元素判斷為true:
any
- 判斷序列是否有元素(不為空):
hasElements
- 判斷序列中是否有匹配的元素:
hasElement
- 轉化為 List:
-
我想合並 publishers...
- 按序連接:
Flux#concat
或.concatWith(other)
- ...即使有錯誤,也會等所有的 publishers 連接完成:
Flux#concatDelayError
- ...按訂閱順序連接(這裏的合並仍然可以理解成序列的連接):
Flux#mergeSequential
- 按元素發出的順序合並(無論哪個序列的,元素先到先合並):
Flux#merge
/.mergeWith(other)
- ...元素類型會發生變化:
Flux#zip
/Flux#zipWith
- 將元素組合:
- 2個 Monos 組成1個
Tuple2
:Mono#zipWith
- n個 Monos 的元素都發出來後組成一個 Tuple:
Mono#zip
- 在終止信號出現時“采取行動”:
- 在 Mono 終止時轉換為一個
Mono<Void>
:Mono#and
- 當 n 個 Mono 都終止時返回
Mono<Void>
:Mono#when
- 返回一個存放組合數據的類型,對於被合並的多個序列:
- 每個序列都發出一個元素時:
Flux#zip
- 任何一個序列發出元素時:
Flux#combineLatest
- 每個序列都發出一個元素時:
- 只取各個序列的第一個元素:
Flux#first
,Mono#first
,mono.or<br/>(otherMono).or(thirdMono)
,`flux.or(otherFlux).or(thirdFlux) - 由一個序列觸發(類似於
flatMap
,不過“喜新厭舊”):switchMap
- 由每個新序列開始時觸發(也是“喜新厭舊”風格):
switchOnNext
- 按序連接:
-
我想重復一個序列:
repeat
- ...但是以一定的間隔重復:
Flux.interval(duration).flatMap(tick -> myExistingPublisher)
- ...但是以一定的間隔重復:
-
我有一個空序列,但是...
- 我想要一個缺省值來代替:
defaultIfEmpty
- 我想要一個缺省的序列來代替:
switchIfEmpty
- 我想要一個缺省值來代替:
-
我有一個序列,但是我對序列的元素值不感興趣:
ignoreElements
- ...並且我希望用
Mono
來表示序列已經結束:then
- ...並且我想在序列結束後等待另一個任務完成:
thenEmpty
- ...並且我想在序列結束之後返回一個
Mono
:Mono#then(mono)
- ...並且我想在序列結束之後返回一個值:
Mono#thenReturn(T)
- ...並且我想在序列結束之後返回一個
Flux
:thenMany
- ...並且我希望用
-
我有一個 Mono 但我想延遲完成...
- ...當有1個或N個其他 publishers 都發出(或結束)時才完成:
Mono#delayUntilOther
- ...使用一個函數式來定義如何獲取“其他 publisher”:
Mono#delayUntil(Function)
- ...當有1個或N個其他 publishers 都發出(或結束)時才完成:
- 我想基於一個遞歸的生成序列的規則擴展每一個元素,然後合並為一個序列發出:
- ...廣度優先:
expand(Function)
- ...深度優先:
expandDeep(Function)
- ...廣度優先:
3)“窺視”(只讀)序列
-
再不對序列造成改變的情況下,我想:
- 得到通知或執行一些操作:
- 發出元素:
doOnNext
- 序列完成:
Flux#doOnComplete
,Mono#doOnSuccess
- 因錯誤終止:
doOnError
- 取消:
doOnCancel
- 訂閱時:
doOnSubscribe
- 請求時:
doOnRequest
- 完成或錯誤終止:
doOnTerminate
(Mono的方法可能包含有結果)- 但是在終止信號向下遊傳遞 之後 :
doAfterTerminate
- 但是在終止信號向下遊傳遞 之後 :
- 所有類型的信號(
Signal
):Flux#doOnEach
- 所有結束的情況(完成complete、錯誤error、取消cancel):
doFinally
- 記錄日誌:
log
- 我想知道所有的事件:
- 每一個事件都體現為一個
single
對象: - 執行 callback:
doOnEach
- 每個元素轉化為
single
對象:materialize
- ...在轉化回元素:
dematerialize
- ...在轉化回元素:
- 轉化為一行日誌:
log
- 每一個事件都體現為一個
4)過濾序列
-
我想過濾一個序列
- 基於給定的判斷條件:
filter
- ...異步地進行判斷:
filterWhen
- 僅限於指定類型的對象:
ofType
- 忽略所有元素:
ignoreElements
- 去重:
- 對於整個序列:
Flux#distinct
- 去掉連續重復的元素:
Flux#distinctUntilChanged
- 基於給定的判斷條件:
-
我只想要一部分序列:
- 只要 N 個元素:
- 從序列的第一個元素開始算:
Flux#take(long)
- ...取一段時間內發出的元素:
Flux#take(Duration)
- ...只取第一個元素放到
Mono
中返回:Flux#next()
- ...使用
request(N)
而不是取消:Flux#limitRequest(long)
- ...取一段時間內發出的元素:
- 從序列的最後一個元素倒數:
Flux#takeLast
- 直到滿足某個條件(包含):
Flux#takeUntil
(基於判斷條件),Flux#takeUntilOther
(基於對 publisher 的比較) - 直到滿足某個條件(不包含):
Flux#takeWhile
- 最多只取 1 個元素:
- 給定序號:
Flux#elementAt
- 最後一個:
.takeLast(1)
- ...如果為序列空則發出錯誤信號:
Flux#last()
- ...如果序列為空則返回默認值:
Flux#last(T)
- ...如果為序列空則發出錯誤信號:
- 跳過一些元素:
- 從序列的第一個元素開始跳過:
Flux#skip(long)
- ...跳過一段時間內發出的元素:
Flux#skip(Duration)
- ...跳過一段時間內發出的元素:
- 跳過最後的 n 個元素:
Flux#skipLast
- 直到滿足某個條件(包含):
Flux#skipUntil
(基於判斷條件),Flux#skipUntilOther
(基於對 publisher 的比較) - 直到滿足某個條件(不包含):
Flux#skipWhile
- 采樣:
- 給定采樣周期:
Flux#sample(Duration)
- 取采樣周期裏的第一個元素而不是最後一個:
sampleFirst
- 取采樣周期裏的第一個元素而不是最後一個:
- 基於另一個 publisher:
Flux#sample(Publisher)
- 基於 publisher“超時”:
Flux#sampleTimeout
(每一個元素會觸發一個 publisher,如果這個 publisher 不被下一個元素觸發的 publisher 覆蓋就發出這個元素)
- 我只想要一個元素(如果多於一個就返回錯誤)...
- 如果序列為空,發出錯誤信號:
Flux#single()
- 如果序列為空,發出一個缺省值:
Flux#single(T)
- 如果序列為空就返回一個空序列:
Flux#singleOrEmpty
- 如果序列為空,發出錯誤信號:
5)錯誤處理
-
我想創建一個錯誤序列:
error
...- ...替換一個完成的
Flux
:.concat(Flux.error(e))
- ...替換一個完成的
Mono
:.then(Mono.error(e))
- ...如果元素超時未發出:
timeout
- ...“懶”創建:
error(Supplier<Throwable>)
- ...替換一個完成的
-
我想要類似 try/catch 的表達方式:
- 拋出異常:
error
- 捕獲異常:
- 然後返回缺省值:
onErrorReturn
- 然後返回一個
Flux
或Mono
:onErrorResume
- 包裝異常後再拋出:
.onErrorMap(t -> new RuntimeException(t))
- finally 代碼塊:
doFinally
- Java 7 之後的 try-with-resources 寫法:
using
工廠方法
- 拋出異常:
-
我想從錯誤中恢復...
- 返回一個缺省的:
- 的值:
onErrorReturn
Publisher
:Flux#onErrorResume
和Mono#onErrorResume
- 重試:
retry
- ...由一個用於伴隨 Flux 觸發:
retryWhen
- 我想處理回壓錯誤(向上遊發出“MAX”的 request,如果下遊的 request 比較少,則應用策略)...
- 拋出
IllegalStateException
:Flux#onBackpressureError
- 丟棄策略:
Flux#onBackpressureDrop
- ...但是不丟棄最後一個元素:
Flux#onBackpressureLatest
- 緩存策略(有限或無限):
Flux#onBackpressureBuffer
- ...當有限的緩存空間用滿則應用給定策略:
Flux#onBackpressureBuffer
帶有策略BufferOverflowStrategy
- 拋出
6) 基於時間的操作
-
我想將元素轉換為帶有時間信息的
Tuple2<Long, T>
...- 從訂閱時開始:
elapsed
- 記錄時間戳:
timestamp
- 從訂閱時開始:
-
如果元素間延遲過長則中止序列:
timeout
-
以固定的周期發出元素:
Flux#interval
-
在一個給定的延遲後發出
0
:staticMono.delay
. - 我想引入延遲:
- 對每一個元素:
Mono#delayElement
,Flux#delayElements
- 延遲訂閱:
delaySubscription
- 對每一個元素:
7)拆分 Flux
-
我想將一個
Flux<T>
拆分為一個Flux<Flux<T>>
:- 以個數為界:
window(int)
- ...會出現重疊或丟棄的情況:
window(int, int)
- 以時間為界:
window(Duration)
- ...會出現重疊或丟棄的情況:
window(Duration, Duration)
- 以個數或時間為界:
windowTimeout(int, Duration)
- 基於對元素的判斷條件:
windowUntil
- ...觸發判斷條件的元素會分到下一波(
cutBefore
變量):.windowUntil(predicate, true)
- ...滿足條件的元素在一波,直到不滿足條件的元素發出開始下一波:
windowWhile
(不滿足條件的元素會被丟棄) - 通過另一個 Publisher 的每一個 onNext 信號來拆分序列:
window(Publisher)
,windowWhen
- 以個數為界:
-
我想將一個
Flux<T>
的元素拆分到集合...- 拆分為一個一個的
List
: - 以個數為界:
buffer(int)
- ...會出現重疊或丟棄的情況:
buffer(int, int)
- ...會出現重疊或丟棄的情況:
- 以時間為界:
buffer(Duration)
- ...會出現重疊或丟棄的情況:
buffer(Duration, Duration)
- ...會出現重疊或丟棄的情況:
- 以個數或時間為界:
bufferTimeout(int, Duration)
- 基於對元素的判斷條件:
bufferUntil(Predicate)
- ...觸發判斷條件的元素會分到下一個buffer:
.bufferUntil(predicate, true)
- ...滿足條件的元素在一個buffer,直到不滿足條件的元素發出開始下一buffer:
bufferWhile(Predicate)
- ...觸發判斷條件的元素會分到下一個buffer:
- 通過另一個 Publisher 的每一個 onNext 信號來拆分序列:
buffer(Publisher)
,bufferWhen
- 拆分到指定類型的 "collection":
buffer(int, Supplier<C>)
- 拆分為一個一個的
- 我想將
Flux<T>
中具有共同特征的元素分組到子 Flux:groupBy(Function<T,K>)
(註意返回值是Flux<GroupedFlux<K, T>>
,每一個GroupedFlux
具有相同的 key 值K
,可以通過key()
方法獲取)。
8)回到同步的世界
-
我有一個
Flux<T>
,我想:- 在拿到第一個元素前阻塞:
Flux#blockFirst
- ...並給出超時時限:
Flux#blockFirst(Duration)
- 在拿到最後一個元素前阻塞(如果序列為空則返回 null):
Flux#blockLast
- ...並給出超時時限:
Flux#blockLast(Duration)
- 同步地轉換為
Iterable<T>
:Flux#toIterable
- 同步地轉換為 Java 8
Stream<T>
:Flux#toStream
- 在拿到第一個元素前阻塞:
- 我有一個
Mono<T>
,我想:- 在拿到元素前阻塞:
Mono#block
- ...並給出超時時限:
Mono#block(Duration)
- 轉換為
CompletableFuture<T>
:Mono#toFuture
- 在拿到元素前阻塞:
附2:Reactor 3 之選擇合適的操作符——響應式Spring的道法術器