1. 程式人生 > >附2:Reactor 3 之選擇合適的操作符——響應式Spring的道法術器

附2:Reactor 3 之選擇合適的操作符——響應式Spring的道法術器

Spring WebFlux 響應式編程

本系列文章索引《響應式Spring的道法術器》
前情提要 Reactor Operators

本節的內容來自我翻譯的Reactor 3 參考文檔——如何選擇操作符。由於部分朋友打開github.io網速比較慢或上不去,貼出來方便大家查閱。

如果一個操作符是專屬於 FluxMono 的,那麽會給它註明前綴。
公共的操作符沒有前綴。如果一個具體的用例涉及多個操作符的組合,這裏以方法調用的方式展現,
會以一個點(.)開頭,並將參數置於圓括號內,比如: .methodCall(parameter)

1)創建一個新序列,它...

  • 發出一個 T,我已經有了:just
    • ...基於一個 Optional<T>
      Mono#justOrEmpty(Optional<T>)
    • ...基於一個可能為 null 的 T:Mono#justOrEmpty(T)
  • 發出一個 T,且還是由 just 方法返回
    • ...但是“懶”創建的:使用 Mono#fromSupplier 或用 defer 包裝 just
  • 發出許多 T,這些元素我可以明確列舉出來:Flux#just(T...)
  • 基於叠代數據結構:
    • 一個數組:Flux#fromArray
    • 一個集合或 iterable:Flux#fromIterable
    • 一個 Integer 的 range:Flux#range
    • 一個 Stream
      提供給每一個訂閱:Flux#fromStream(Supplier<Stream>)
  • 基於一個參數值給出的源:
    • 一個 Supplier<T>Mono#fromSupplier
    • 一個任務:Mono#fromCallableMono#fromRunnable
    • 一個 CompletableFuture<T>Mono#fromFuture
  • 直接完成:empty
  • 立即生成錯誤:error
    • ...但是“懶”的方式生成 Throwableerror(Supplier<Throwable>)
  • 什麽都不做:never
  • 訂閱時才決定:defer
  • 依賴一個可回收的資源:using
  • 可編程地生成事件(可以使用狀態):
    • 同步且逐個的:Flux#generate
    • 異步(也可同步)的,每次盡可能多發出元素:Flux#create
      Mono#create 也是異步的,只不過只能發一個)

2)對序列進行轉化

  • 我想轉化一個序列:

    • 1對1地轉化(比如字符串轉化為它的長度):map
    • ...類型轉化:cast
    • ...為了獲得每個元素的序號:Flux#index
    • 1對n地轉化(如字符串轉化為一串字符):flatMap + 使用一個工廠方法
    • 1對n地轉化可自定義轉化方法和/或狀態:handle
    • 對每一個元素執行一個異步操作(如對 url 執行 http 請求):flatMap + 一個異步的返回類型為 Publisher 的方法
    • ...忽略一些數據:在 flatMap lambda 中根據條件返回一個 Mono.empty()
    • ...保留原來的序列順序:Flux#flatMapSequential(對每個元素的異步任務會立即執行,但會將結果按照原序列順序排序)
    • ...當 Mono 元素的異步任務會返回多個元素的序列時:Mono#flatMapMany
  • 我想添加一些數據元素到一個現有的序列:

    • 在開頭添加:Flux#startWith(T...)
    • 在最後添加:Flux#concatWith(T...)
  • 我想將 Flux 轉化為集合(一下都是針對 Flux 的)

    • 轉化為 List:collectListcollectSortedList
    • 轉化為 Map:collectMapcollectMultiMap
    • 轉化為自定義集合:collect
    • 計數:count
    • reduce 算法(將上個元素的reduce結果與當前元素值作為輸入執行reduce方法,如sum) reduce
    • ...將每次 reduce 的結果立即發出:scan
    • 轉化為一個 boolean 值:
    • 對所有元素判斷都為true:all
    • 對至少一個元素判斷為true:any
    • 判斷序列是否有元素(不為空):hasElements
    • 判斷序列中是否有匹配的元素:hasElement
  • 我想合並 publishers...

    • 按序連接:Flux#concat.concatWith(other)
    • ...即使有錯誤,也會等所有的 publishers 連接完成:Flux#concatDelayError
    • ...按訂閱順序連接(這裏的合並仍然可以理解成序列的連接):Flux#mergeSequential
    • 按元素發出的順序合並(無論哪個序列的,元素先到先合並):Flux#merge / .mergeWith(other)
    • ...元素類型會發生變化:Flux#zip / Flux#zipWith
    • 將元素組合:
    • 2個 Monos 組成1個 Tuple2Mono#zipWith
    • n個 Monos 的元素都發出來後組成一個 Tuple:Mono#zip
    • 在終止信號出現時“采取行動”:
    • 在 Mono 終止時轉換為一個 Mono<Void>Mono#and
    • 當 n 個 Mono 都終止時返回 Mono<Void>Mono#when
    • 返回一個存放組合數據的類型,對於被合並的多個序列:
      • 每個序列都發出一個元素時:Flux#zip
      • 任何一個序列發出元素時:Flux#combineLatest
    • 只取各個序列的第一個元素:Flux#firstMono#firstmono.or<br/>(otherMono).or(thirdMono),`flux.or(otherFlux).or(thirdFlux)
    • 由一個序列觸發(類似於 flatMap,不過“喜新厭舊”):switchMap
    • 由每個新序列開始時觸發(也是“喜新厭舊”風格):switchOnNext
  • 我想重復一個序列:repeat

    • ...但是以一定的間隔重復:Flux.interval(duration).flatMap(tick -&gt; myExistingPublisher)
  • 我有一個空序列,但是...

    • 我想要一個缺省值來代替:defaultIfEmpty
    • 我想要一個缺省的序列來代替:switchIfEmpty
  • 我有一個序列,但是我對序列的元素值不感興趣:ignoreElements

    • ...並且我希望用 Mono 來表示序列已經結束:then
    • ...並且我想在序列結束後等待另一個任務完成:thenEmpty
    • ...並且我想在序列結束之後返回一個 MonoMono#then(mono)
    • ...並且我想在序列結束之後返回一個值:Mono#thenReturn(T)
    • ...並且我想在序列結束之後返回一個 FluxthenMany
  • 我有一個 Mono 但我想延遲完成...

    • ...當有1個或N個其他 publishers 都發出(或結束)時才完成:Mono#delayUntilOther
    • ...使用一個函數式來定義如何獲取“其他 publisher”:Mono#delayUntil(Function)
  • 我想基於一個遞歸的生成序列的規則擴展每一個元素,然後合並為一個序列發出:
    • ...廣度優先:expand(Function)
    • ...深度優先:expandDeep(Function)

3)“窺視”(只讀)序列

  • 再不對序列造成改變的情況下,我想:

    • 得到通知或執行一些操作:
    • 發出元素:doOnNext
    • 序列完成:Flux#doOnCompleteMono#doOnSuccess
    • 因錯誤終止:doOnError
    • 取消:doOnCancel
    • 訂閱時:doOnSubscribe
    • 請求時:doOnRequest
    • 完成或錯誤終止:doOnTerminate(Mono的方法可能包含有結果)
      • 但是在終止信號向下遊傳遞 之後doAfterTerminate
    • 所有類型的信號(Signal):Flux#doOnEach
    • 所有結束的情況(完成complete、錯誤error、取消cancel):doFinally
    • 記錄日誌:log
  • 我想知道所有的事件:
    • 每一個事件都體現為一個 single 對象:
    • 執行 callback:doOnEach
    • 每個元素轉化為 single 對象:materialize
      • ...在轉化回元素:dematerialize
    • 轉化為一行日誌:log

4)過濾序列

  • 我想過濾一個序列

    • 基於給定的判斷條件:filter
    • ...異步地進行判斷:filterWhen
    • 僅限於指定類型的對象:ofType
    • 忽略所有元素:ignoreElements
    • 去重:
    • 對於整個序列:Flux#distinct
    • 去掉連續重復的元素:Flux#distinctUntilChanged
  • 我只想要一部分序列:

    • 只要 N 個元素:
    • 從序列的第一個元素開始算:Flux#take(long)
      • ...取一段時間內發出的元素:Flux#take(Duration)
      • ...只取第一個元素放到 Mono 中返回:Flux#next()
      • ...使用 request(N) 而不是取消:Flux#limitRequest(long)
    • 從序列的最後一個元素倒數:Flux#takeLast
    • 直到滿足某個條件(包含):Flux#takeUntil(基於判斷條件),Flux#takeUntilOther(基於對 publisher 的比較)
    • 直到滿足某個條件(不包含):Flux#takeWhile
    • 最多只取 1 個元素:
    • 給定序號:Flux#elementAt
    • 最後一個:.takeLast(1)
      • ...如果為序列空則發出錯誤信號:Flux#last()
      • ...如果序列為空則返回默認值:Flux#last(T)
    • 跳過一些元素:
    • 從序列的第一個元素開始跳過:Flux#skip(long)
      • ...跳過一段時間內發出的元素:Flux#skip(Duration)
    • 跳過最後的 n 個元素:Flux#skipLast
    • 直到滿足某個條件(包含):Flux#skipUntil(基於判斷條件),Flux#skipUntilOther (基於對 publisher 的比較)
    • 直到滿足某個條件(不包含):Flux#skipWhile
    • 采樣:
    • 給定采樣周期:Flux#sample(Duration)
      • 取采樣周期裏的第一個元素而不是最後一個:sampleFirst
    • 基於另一個 publisher:Flux#sample(Publisher)
    • 基於 publisher“超時”:Flux#sampleTimeout (每一個元素會觸發一個 publisher,如果這個 publisher 不被下一個元素觸發的 publisher 覆蓋就發出這個元素)
  • 我只想要一個元素(如果多於一個就返回錯誤)...
    • 如果序列為空,發出錯誤信號:Flux#single()
    • 如果序列為空,發出一個缺省值:Flux#single(T)
    • 如果序列為空就返回一個空序列:Flux#singleOrEmpty

5)錯誤處理

  • 我想創建一個錯誤序列:error...

    • ...替換一個完成的 Flux.concat(Flux.error(e))
    • ...替換一個完成的 Mono.then(Mono.error(e))
    • ...如果元素超時未發出:timeout
    • ...“懶”創建:error(Supplier&lt;Throwable&gt;)
  • 我想要類似 try/catch 的表達方式:

    • 拋出異常:error
    • 捕獲異常:
    • 然後返回缺省值:onErrorReturn
    • 然後返回一個 FluxMonoonErrorResume
    • 包裝異常後再拋出:.onErrorMap(t -&gt; new RuntimeException(t))
    • finally 代碼塊:doFinally
    • Java 7 之後的 try-with-resources 寫法:using 工廠方法
  • 我想從錯誤中恢復...

    • 返回一個缺省的:
    • 的值:onErrorReturn
    • PublisherFlux#onErrorResumeMono#onErrorResume
    • 重試:retry
    • ...由一個用於伴隨 Flux 觸發:retryWhen
  • 我想處理回壓錯誤(向上遊發出“MAX”的 request,如果下遊的 request 比較少,則應用策略)...
    • 拋出 IllegalStateExceptionFlux#onBackpressureError
    • 丟棄策略:Flux#onBackpressureDrop
    • ...但是不丟棄最後一個元素:Flux#onBackpressureLatest
    • 緩存策略(有限或無限):Flux#onBackpressureBuffer
    • ...當有限的緩存空間用滿則應用給定策略:Flux#onBackpressureBuffer 帶有策略 BufferOverflowStrategy

6) 基於時間的操作

  • 我想將元素轉換為帶有時間信息的 Tuple2&lt;Long, T&gt;...

    • 從訂閱時開始:elapsed
    • 記錄時間戳:timestamp
  • 如果元素間延遲過長則中止序列:timeout

  • 以固定的周期發出元素:Flux#interval

  • 在一個給定的延遲後發出 0:static Mono.delay.

  • 我想引入延遲:
    • 對每一個元素:Mono#delayElementFlux#delayElements
    • 延遲訂閱:delaySubscription

7)拆分 Flux

  • 我想將一個 Flux&lt;T&gt; 拆分為一個 Flux&lt;Flux&lt;T&gt;&gt;

    • 以個數為界:window(int)
    • ...會出現重疊或丟棄的情況:window(int, int)
    • 以時間為界:window(Duration)
    • ...會出現重疊或丟棄的情況:window(Duration, Duration)
    • 以個數或時間為界:windowTimeout(int, Duration)
    • 基於對元素的判斷條件:windowUntil
    • ...觸發判斷條件的元素會分到下一波(cutBefore 變量):.windowUntil(predicate, true)
    • ...滿足條件的元素在一波,直到不滿足條件的元素發出開始下一波:windowWhile (不滿足條件的元素會被丟棄)
    • 通過另一個 Publisher 的每一個 onNext 信號來拆分序列:window(Publisher)windowWhen
  • 我想將一個 Flux&lt;T&gt; 的元素拆分到集合...

    • 拆分為一個一個的 List:
    • 以個數為界:buffer(int)
      • ...會出現重疊或丟棄的情況:buffer(int, int)
    • 以時間為界:buffer(Duration)
      • ...會出現重疊或丟棄的情況:buffer(Duration, Duration)
    • 以個數或時間為界:bufferTimeout(int, Duration)
    • 基於對元素的判斷條件:bufferUntil(Predicate)
      • ...觸發判斷條件的元素會分到下一個buffer:.bufferUntil(predicate, true)
      • ...滿足條件的元素在一個buffer,直到不滿足條件的元素發出開始下一buffer:bufferWhile(Predicate)
    • 通過另一個 Publisher 的每一個 onNext 信號來拆分序列:buffer(Publisher)bufferWhen
    • 拆分到指定類型的 "collection":buffer(int, Supplier&lt;C&gt;)
  • 我想將 Flux&lt;T&gt; 中具有共同特征的元素分組到子 Flux:groupBy(Function&lt;T,K&gt;)(註意返回值是 Flux&lt;GroupedFlux&lt;K, T&gt;&gt;,每一個 GroupedFlux 具有相同的 key 值 K,可以通過 key() 方法獲取)。

8)回到同步的世界

  • 我有一個 Flux&lt;T&gt;,我想:

    • 在拿到第一個元素前阻塞:Flux#blockFirst
    • ...並給出超時時限:Flux#blockFirst(Duration)
    • 在拿到最後一個元素前阻塞(如果序列為空則返回 null):Flux#blockLast
    • ...並給出超時時限:Flux#blockLast(Duration)
    • 同步地轉換為 Iterable&lt;T&gt;Flux#toIterable
    • 同步地轉換為 Java 8 Stream&lt;T&gt;Flux#toStream
  • 我有一個 Mono&lt;T&gt;,我想:
    • 在拿到元素前阻塞:Mono#block
    • ...並給出超時時限:Mono#block(Duration)
    • 轉換為 CompletableFuture&lt;T&gt;Mono#toFuture

附2:Reactor 3 之選擇合適的操作符——響應式Spring的道法術器