1. 程式人生 > >響應式程式設計系列一《規約》

響應式程式設計系列一《規約》

提升開發效率,降低維護成本一直是開發團隊永恆不變的宗旨。近兩年來國內的技術圈子中越來越多的開始提及ReactiveX,越來越多的應用和麵試中都會有ReactiveX,響應式程式設計中RxJava可謂如魚得水。

目錄

1. 背景

1. 背景

ReactiveX是Reactive Extensions的縮寫,一般簡寫為Rx,最初是LINQ的一個擴充套件,由微軟的架構師Erik Meijer領導的團隊開發,在2012年11月開源,Rx是一個程式設計模型,目標是提供一致的程式設計介面,幫助開發者更方便的處理非同步資料流,Rx庫支援.NET、JavaScript和C++,Rx近幾年越來越流行了,現在已經支援幾乎全部的流行程式語言了,Rx的大部分語言庫由ReactiveX這個組織負責維護,比較流行的有RxJava/RxJS/Rx.NET,社群網站是

reactivex.io

主要版本:RxJava 1.x(官方已宣佈停止維護),RxJava 2.x(全新的API)

2. 響應式程式設計是什麼

響應式程式設計是一種基於非同步資料流概念的程式設計模式。資料流就像一條河:它可以被觀測,被過濾,被操作,或者為新的消費者與另外一條流合併為一條新的流。響應式程式設計的一個關鍵概念是事件。事件可以被等待,可以觸發過程,也可以觸發其它事件。

Rx提供了一系列的操作符,你可以使用它們來過濾(filter)、選擇(select)、變換(transform)、結合(combine)和組合(compose)多個Observable,這些操作符讓執行和複合變得非常高效。

響應式流從2013年開始,作為提供非阻塞背壓的非同步流處理標準的倡議。它旨在解決處理元素流的問題——如何將元素流從釋出者傳遞到訂閱者,而不需要釋出者阻塞,或訂閱者有無限制的緩衝區或丟棄。

 響應式流模型非常簡單——訂閱者向釋出者傳送多個元素的非同步請求。 釋出者向訂閱者非同步傳送多個或稍少的元素。響應式流在pull模型和push模型流處理機制之間動態切換。 當訂閱者較慢時,它使用pull模型,當訂閱者更快時使用push模型。

3. 優勢 & 代價

優勢

  • 函式式風格:對可觀察資料流使用無副作用的輸入輸出函式,避免了程式裡錯綜複雜的狀態;
  • 簡化程式碼:Rx的操作符通通常可以將複雜的難題簡化為很少的幾行程式碼(宣告式的組合這些序列);
  • 非同步錯誤處理:傳統的try/catch沒辦法處理非同步計算,Rx提供了合適的錯誤處理機制;
  • 輕鬆使用併發:Rx的Observables和Schedulers讓開發者可以擺脫底層的執行緒同步和各種併發問題;

代價

  1. 雖然複用執行緒有助於提高吞吐量,但一旦在某個回撥函式中執行緒被卡住,那麼這個執行緒上所有的請求都會被阻塞,最嚴重的情況,整個應用會被拖垮。
  2. 難以除錯。由於 Rx 強大的描述能力,在一個典型的 Rx 應用中,大部分程式碼都是以鏈式表示式的形式出現,比如flux.map(String::toUpperCase).doOnNext(s -> LOG.info("UC String {}", s)).next().subscribe(),一旦出錯,你將很難定位到具體是哪個環節出了問題。所幸的是,Rx 框架一般都會提供一些工具方法來輔助進行除錯。

4. Reactive Streams規約

4.1 Publisher

是潛在無限數量的有序元素的生產者。 它根據收到的要求向當前訂閱者釋出(或傳送)元素

4.2 Subscriber

從釋出者那裡訂閱並接收元素。 釋出者向訂閱者傳送訂閱令牌(subscription token)。 使用訂閱令牌,訂閱者從釋出者哪裡請求多個元素。 當元素準備就緒時,釋出者向訂閱者傳送多個或更少的元素。 訂閱者可以請求更多的元素。 釋出者可能有多個來自訂閱者的元素待處理請求。

4.3 Subscription

表示訂閱者訂閱的一個釋出者的令牌。 當訂閱請求成功時,釋出者將其傳遞給訂閱者。 訂閱者使用訂閱令牌與釋出者進行互動,例如請求更多的元素或取消訂閱。

4.4 Processor

充當訂閱者和釋出者的處理階段。Processor<T,R>訂閱型別T的資料元素,接收並轉換為型別R的資料,併發布變換後的資料,該介面繼承了PublisherSubscriber介面。

5. 主流實現

5.1 Rx2.x

RxJava是響應式流的Java實現之一,而RxJava 2.0 已經按照Reactive-Streams specification規範完全的重寫了。

5.2 Reactive Stream

Reactor 2.0.0.RC1 於2015年02月19日由Pivotal RTI(Spring 框架發起者)釋出,支援 Reactive Stream,它的構架總覽:

注:上圖參考附錄Reactor指南中文版,Reactor1.x實在是不太出名,也是規範沒出來吧

Reactor 程式碼庫拆分成多個子模組,便於選擇所需功能,不受其他功能程式碼塊干擾。

下面舉例說明,為實現非同步目標,響應式技術和 Reactor 模組該如何搭配:

  1. Spring XD + Reactor-Net (Core/Stream): 使用 Reactor 作為 Sink/Source IO 驅動。
  2. Grails | Spring + Reactor-Stream (Core): 用 Stream 和 Promise 做後臺處理。
  3. Spring Data + Reactor-Bus (Core): 發射資料庫事件 (儲存/刪除/…​)。
  4. Spring Integration Java DSL + Reactor Stream (Core): Spring 整合的微批量資訊通道。
  5. RxJavaReactiveStreams + RxJava + Reactor-Core: 融合富結構與高效非同步 IO 處理
  6. RxJavaReactiveStreams + RxJava + Reactor-Net (Core/Stream): 用 RxJava 做資料輸入,非同步 IO 驅動做傳輸。

與Rx2.x的差異

RxJava

reactor-stream

說明

Observable

reactor.rx.Stream

Reactive Stream Publisher的實現

Operator

reactor.rx.action.Action

Reactive Stream Processor的實現

Observable with 1 data at most

reactor.rx.Promise

返回唯一結果的型別,  Reactive Stream Processor實現並提供了可選的非同步分發功能。 

Factory API (just, from…)

reactor.rx.Streams

和core模組的 data-focused 子類一樣, 返回 Stream

Functional API (map, filter…)

reactor.rx.Stream

和core模組的data-focused 子類一樣, 返回Stream

Schedulers

reactor.core.Dispatcher, org.reactivestreams.Processor

Reactor Stream計算無限制的共享Dispatcher或者有限的Processor的操作。

Observable.observeOn()

Stream.dispatchOn()

只是dispatcher引數的一個適配命名。

5.3 Java9

JDK 9 java.util.concurrent 包提供了兩個主要的 API 來處理響應流:

  1. Flow
  2. SubmissionPublisher

5.4 Spring WebFlux

Spring WebFlux 是 Spring 5 的一個新模組,包含了響應式 HTTP 和 WebSocket 的支援,在容器中 Spring WebFlux 會將輸入流適配成 Mono 或者 Flux 格式進行統一處理。,另外在上層服務端支援兩種不同的程式設計模型: - 基於 Spring MVC 註解 @Controller 等 - 基於 Functional 函式式路由

為啥只能執行在 Servlet 3.1+ 容器?3.1 規範其中一個新特性是非同步處理支援。

5.5 Vertx

Vert.x是一個非同步無阻塞的網路框架,其參照物是node.js。基本上node.js能幹的事情,Vert.x都能幹。Vert.x利用Netty4的EventLoop來做單執行緒的事件迴圈,所以跑在Vert.x上的業務不能做CPU密集型的運算,這樣會導致整個執行緒被阻塞。

Vert.x目前是見過功能最強大(core、web、Data access、reacive、microservices、MQTT,第三方庫依賴最少的Java框架,它只依賴Netty4以及Jacskon,另外如果你需要建立分散式的Vert.x則再依賴HazelCast這個分散式框架,注意Vert.x3必須基於Java8。

總結,響應式程式設計已經慢慢成我我們開發中的主流,後續將帶您深入瞭解《rxjava》