1. 程式人生 > >一起來看 rxjs

一起來看 rxjs

更新日誌

  • 2018-05-26 校正
  • 2016-12-03 第一版翻譯

過去你錯過的 Reactive Programming 的簡介

你好奇於這名為Reactive Programming(反應式程式設計)的新事物, 更確切地說,你想了解它各種不同的實現(比如 [Rx*], [Bacon.js], RAC 以及其它各種各樣的框架或庫)

學習它比較困難, 因為比較缺好的學習材料(譯者注: 原文寫就時, RxJs 還在 v4 版本, 彼時社群對 RxJs 的探索還不夠完善). 我在開始學習的時候, 試圖找過教程, 不過能找到的實踐指南屈指可數, 而且這些教程只不過隔靴搔癢, 並不能幫助你做真正瞭解 RxJs 的基本概念. 如果你想要理解其中一些函式, 往往程式碼庫自帶的文件幫不到你. 說白了, 你能一下看懂下面這種文件麼:

Rx.Observable.prototype.flatMapLatest(selector, [thisArg])

按照將元素的索引合併的方法, 把一個 "observable 佇列 " 中的作為一個新的佇列加入到 "observable 佇列的佇列" 中, 然後把 "observable 佇列的佇列" 中的一個 "observable 佇列" 轉換成一個 "僅從最近的 'observable 佇列' 產生的值構成的一個新佇列."

這是都是什麼鬼?

我讀了兩本書, 一本只是畫了個大致的藍圖, 另一本則是一節一節教你 "如何使用 Reactive Libarary" . 最後我以一種艱難的方式來學習 Reactive Programming: 一遍寫, 一遍理解. 在我就職於 Futurice 的時候, 我第一次在一個真實的專案中使用它, 我在遇到問題時, 得到了來自同事的支援.

學習中最困難的地方是 以 Reactive(反應式) 的方式思考. 這意思就是, 放下你以往熟悉的程式設計中的命令式和狀態化思維習慣, 鼓勵自己以一種不同的正規化去思考. 至今我還沒在網上找到任何這方面的指南, 而我認為世界上應該有一個說明如何以 Reactive(反應式) 的方式思考的教程, 這樣你才知道要如何開始使用它. 在閱讀完本文後之後. 請繼續閱讀程式碼庫自帶的文件來指引你之後的學習. 我希望, 這篇文件對你有所幫助.

"什麼是 Reactive Programming(反應式程式設計)?"

在網上可以找到大量對此糟糕的解釋和定義. Wikipedia 的 意料之中地泛泛而談和過於理論化. Stackoverflow 的 聖經般的答案也絕對不適合初學者. Reactive Manifesto 聽起來就像是要給你公司的專案經理或者是老闆看的東西. 微軟的 Rx 術語 "Rx = Observables + LINQ + Schedulers" 也讀起來太繁重, 太微軟了, 以至於你看完後仍然一臉懵逼. 類似於 "reactive" 和 "propagation" 的術語傳達出的含義給人感覺無異於你以前用過的 MV* 框架和趁手的語言已經做到的事情. 我們現有的框架檢視當然是會對資料模型做出反應, 任何的變化當然也是要冒泡的. 要不然, 什麼東西都不會被渲染出來嘛.

所以, 讓我們撇開那些無用的說辭, 嘗試去了解本質.

Reactive programming(反應式程式設計) 是在以非同步資料流來程式設計

當然, 這也不是什麼新東西. 事件匯流排或者是典型的點選事件確實就是非同步事件流, 你可以對其進行 observe(觀察) 或者做些別的事情. 不過, Reactive 是比之更優秀的思維模型. 你能夠建立任何事物的資料流, 而不只是從點選和懸浮事件中. "流" 是普遍存在的, 一切都可能是流: 變數, 使用者輸入, 屬性, 快取, 資料結構等等. 比如, 想象你的 Twitter 時間線會成為點選事件同樣形式的資料流.

熟練掌握該思維模型之後, 你還會接觸到一個令人驚喜的函式集, 其中包含對任何的資料流進行合併、建立或者從中篩選資料的工具. 它充分展現了 "函式式" 的魅力所在. 一個流可以作為另一個流的輸入. 甚至多個流可以作為另一個流的輸入. 你可以合併兩個流. 你可以篩選出一個僅包含你需要的資料的另一個流. 你可以從一個流對映資料值到另一個流.

讓我們基於 "流是 Reactive 的中心" 這個設想, 來細緻地做看一下整個思維模型, 就從我們熟知的 "點選一個按鈕" 事件流開始.

Click event stream

每個流是一個按時序不間斷的事件序列. 它可能派發出三個東西: (某種型別的)一個數值, 一個錯誤, 或者一個 "完成" 訊號. 說到 "完成" , 舉個例子, 當包含了這個按鈕的當前視窗/檢視關閉時, 也就是 "完成" 訊號發生時.

我們僅能非同步地捕捉到這些事件: 通過定義三種函式, 分別用來捕捉派發出的數值、錯誤以及 "完成" 訊號. 有時候後兩者可以被忽略, 你只需定義用來捕捉數值的函式. 我們把對流的 "偵聽" 稱為訂閱(subscribing), 我們定義的這三種函式合起來就是觀察者, 流則是被觀察的主體(或者叫"被觀察者"). 這正是設計模式中的觀察者模式.

描述這種方式的另一種方式用 ASCII 字元來畫個導圖, 在本教程的後續的部分也能看到這種圖形.

--a---b-c---d---X---|->

a, b, c, d 代表被派發出的值
X 代表錯誤
| 代表"完成"訊號
---> 則是時間線

這些都是是老生常談了, 為了不讓你感到無聊, 現在來點新鮮的東西: 我們將原生的點選事件流進行變換, 來建立新的點選事件流.

首先, 我們做一個計數流, 來指明一個按鈕被點選了多少次. 在一般的 Reactive 庫中, 每個流都附帶了許多諸如mapfilterscan 等的方法. 當你呼叫這些方法之一(比如比如clickStream.map(f))時, 它返回一個基於 clickStream 的新的流. 它沒有對原生的點選事件做任何修改. 這種(不對原有流作任何修改的)特性叫做immutability(不可變性), 而它和 Reactive(反應式) 這個概念的契合度之高好比班戟和糖漿(譯者注: 班戟就是薄煎餅, 該稱呼多見於中國廣東地區. 此句意為 immutability 與 Reactive 兩個概念高度契合). 這樣的流允許我們進行鏈式呼叫, 比如clickStream.map(f).scan(g):

  clickStream: ---c----c--c----c------c-->
               vvvvv map(c becomes 1) vvvv
               ---1----1--1----1------1-->
               vvvvvvvvv scan(+) vvvvvvvvv
counterStream: ---1----2--3----4------5-->

map(f) 方法根據你提供的函式f替換每個被派發的元素形成一個新的流. 在上例中, 我們將每次點選都對映為計數 1. scan(g) 方法則在內部執行x = g(accumulated, current), 以某種方式連續聚合處理該流之前所有的值, 在該例子中, g 就是簡單的加法. 然後, 一次點擊發生時, counterStream 就派發一個點選數的總值.

為了展示 Reactive 真正的能力, 我們假設你想要做一個 "雙擊事件" 的流. 或者更厲害的, 我們假設我們想要得到一個 "三擊事件流" , 甚至推廣到更普遍的情況, "多擊流". 現在, 深呼吸, 想象一下按照傳統的命令式和狀態化思維習慣要如何完成這項工作? 我敢說那會煩死你了, 它必須包含各種各樣用來保持狀態的變數, 以及一些對週期性工作的處理.

然而, 以 Reactive 的方式, 它會非常簡單. 事實上, 這個邏輯只不過是四行程式碼. 不過讓我們現在忘掉程式碼.無論你是個初學者還是專家, 藉助導圖來思考, 才是理解和構建流最好的方法.

Multiple clicks stream

圖中的灰色方框是將一個流轉換成另一個流的方法. 首先, 每經過 "250毫秒" 的 "事件靜默" (簡單地說, 這是在 buffer(stream.throttle(250ms)) 完成的. (現在先)不必擔心對這點的細節的理解, 我們主要是演示下 Reactive 的能力.), 我們就得到了一個 "點選動作" 的列表, 即, 轉換的結果是一個列表的流, 而從這個流中我們應用 map() 將每個列表對映成對應該佇列的長度的整數值. 最後, 我們使用 filter(x >= 2) 方法忽略掉所有的 1. 如上: 這 3 步操作將產生我們期望的流. 我們之後可以訂閱("偵聽")它, 並按我們希望的處理方式處理流中的資料.

我希望你感受到了這種方式的美妙. 這個例子只是一次不過揭示了冰山一角: 你可以將相同的操作應用到不同種類的流上, 比如 API 返回的流中. 除此以外, 還有許多有效的函式.

"為什麼我應該採用反應式程式設計?"

Reactive Programming (反應式程式設計) 提升了你程式碼的抽象層次, 你可以更多地關注用於定義業務邏輯的事件之間的互相依賴, 而不必寫大量的細節程式碼來處理事件. RP(反應式程式設計)的程式碼會更簡潔明瞭.

在現代網頁應用和移動應用中, 這種好處是顯而易見的, 這些場景下, 與資料事件關聯的大量 UI 事件需要被高頻地互動. 10 年前, 和 web 頁面的互動只是很基礎地提交一個長長的表單給後端, 然後執行一次簡單的重新渲染. 在這 10 年間, App 逐漸變得更有實時性: 修改表單中的單個欄位能夠自動觸發一次到後端的儲存動作, 對某個內容的 "點贊" 需要實時反饋到其他相關的使用者......

現今的 App 有大量的實時事件, 它們共同作用, 以帶給使用者良好的體驗. 我們要能簡潔處理這些事件的工具, 而 Reactive Programming 方式我們想要的.

舉例說明如何以反應式程式設計的方式思考

現在我們進入到實戰. 一個真實的手把手教你如何以 RP(反應式程式設計) 的方式來思考的例子. 注意這裡不是隨處抄來的例子, 不是半吊子解釋的概念. 到這篇教程結束為止, 我們會在寫出真正的功能性程式碼的同時, 理解我們做的每個動作.

我選擇了 JavaScript 和 RxJS 作為工具, 原因是, JavaScript 是當下最為人熟知的語言, 而 [Rx*] 支援多數語言和平臺 (.NET, Java, Scala, Clojure, JavaScript, Ruby, Python, C++, Objective-C/Cocoa, Groovy等等). , 無論你的工具是什麼, 你可以從這篇教程中收益.

實現一個"建議關注"盒子

在 Twitter 上, 有一個 UI 元素是建議你可以關注的其它賬戶.

Twitter Who to follow suggestions box

我們將著重講解如何模仿出它的核心特性:

  • 在頁面啟動時, 從 API 中載入賬戶資料, 並展示三個推薦關注者
  • 在點選"重新整理"時, 載入另外的三個推薦關注的賬戶, 形成新三行
  • 在點選一個賬戶的 "x" 按鈕時, 清除該賬戶並展示一個新的
  • 每一行顯示賬戶的頭像和到他們主頁的連結

我們可以忽略其它的特性和按鈕, 它們都是次要的. 另外, Twitter 最近關閉了非認證請求介面, 作為替代, 我們使用 [Github 的 API] 來構建這個關注別人 UI.(注: 到本稿的最新的校正為止, github 的該介面對非認證使用者啟用了一段時間內訪問頻次限制)

如果你想盡早看一下完整的程式碼, 請點選[樣例程式碼].

請求和回覆

你如何用 Rx 處理這個問題?

首先, (幾乎) 萬物皆可為流 .這是 "Rx 口訣". 讓我們從最容易的特性開始: "在頁面啟動時, 從 API 中載入賬戶資料". 這沒什麼難得, 只需要(1) 發一個請求, (2) 讀取回復, (3) 渲染回覆的中的資料. 所以我們直接把我們我們的請求當做流. 一開始就用流也許頗有"殺雞焉用牛刀"的意味, 但為了理解, 我們需要從基本的例子開始.

在應用啟動的時候, 我們只需要一個請求, 因此如果我們將它作為一個數據流, 它將會只有一個派發的值. 我們知道之後我們將有更多的請求, 但剛開始時只有一個.

--a------|->

其中 a 是字串 'https://api.github.com/users'

這是一個將請求的 URL 的流. 無論請求何時發生, 它會告訴我們兩件事: 請求發生的時刻和內容. 請求執行之時就是事件派發之時, 請求的內容就是被派發的值: 一個 URL 字串.

建立這樣一個單值流對 [Rx*] 來說非常簡單, 官方對於流的術語, 是 "Observable"(可被觀察者), 顧名思義它是可被觀察的, 但我覺得這名字有點傻, 所以我稱呼它為 _流_.

var requestStream = Rx.Observable.just('https://api.github.com/users');

但現在, 這只是一個字串流, 不包含其他操作, 所以我們需要要在值被派發的時候做一些事情. 這依靠對流的訂閱.

requestStream.subscribe(function(requestUrl) {
  // 執行該請求
  jQuery.getJSON(requestUrl, function(responseData) { // ... }); }

注意我們使用了 jQuery Ajax 回撥(我們假定你應已對此有了解)來處理請求操作的非同步性. 但稍等, Rx 就是處理 非同步 資料流的. 難道這個請求的回覆不就是一個在未來某一刻會帶回返回資料的流麼? 從概念上講, 它看起來就是的, 我們來嘗試寫一下.

requestStream.subscribe(function(requestUrl) {
  // 執行該請求
  var responseStream = Rx.Observable.create(function (observer) { jQuery.getJSON(requestUrl) .done(function(response) { observer.onNext(response); }) .fail(function(jqXHR, status, error) { observer.onError(error); }) .always(function() { observer.onCompleted(); }); }); responseStream.subscribe(function(response) { // 對回覆做一些處理 }); }

Rx.Observable.create() 所做的是自定義一個流, 這個流會通知其每個觀察者(或者說其"訂閱者" )有資料產生 (onNext()) 或發生了錯誤 (onError()). 我們需要做的僅僅是包裝 jQuery Ajax Promise. 稍等, 這難道是說 Promise 也是一個 Observable?

 

是的. Observable 就是一個 Promise++ 物件. 在 Rx 中, 通過執行 var stream = Rx.Observable.fromPromise(promise) 你就可以把一個 Promise 轉換成一個 Observable. 僅有的區別在於 Observables 不符合 Promises/A+ 標準, 但他們在概念上是不衝突的. 一個 Promise 就是一個僅派發一個值的 Observable. Rx 流就是允許多次返回值的 Promise.

這個例子很可以的, 它展示了 Observable 是如何至少有 Promise 的能力. 因此如果你喜歡 Promise, 請注意 Rx Observable 也可以做到同樣的事.

現在回到我們的例子上, 也許你已經注意到了, 我們在一箇中 subscribe() 呼叫了另一個 subscribe(), 這有點像回撥地獄. 另外, responseStream 的建立也依賴於 requestStream. 但正如前文所述, 在 Rx 中有簡單的機制來最流作變換並支援從其他流建立一個新的流, 接下來我們來做這件事.

到目前為止, 你應該知道的對流進行變換的一個基礎方法是 map(f), 將 "流 A" 中的每一個元素作 f() 處理, 然後在 "流 B" 中生成一一對應的值. 如果我們這樣處理我們的請求和回覆流, 我們可以把請求 URL 對映到回覆的 Promise (被當做是流) 中.

var responseMetastream = requestStream
  .map(function(requestUrl) {
    return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl)); });

這下我們建立了一個叫做 元流 (流的流) 的奇怪的東西. 不必對此感到疑惑, 元流, 就是其中派發值是流的流. 你可以把它想象成 指標): 每個被派發的值都是對其它另一個流的 指標 . 在我們的例子中, 每個請求的 URL 都被對映為一個指標, 指向一個個包含 URL 對應的返回資料的 promise 流.

Response metastream

這個元流看上去有點讓人迷惑, 而且對我們根本沒什麼用. 我們只是想要一個簡單的回覆流, 其中每個派發的值都應是一個 JSON 物件, 而不是一個包含 JSON 物件的 Promise. 現在來認識 Flatmap: 它類似於 map(), 但它是把 "分支" 流中派發出的的每一項值在 "主幹" 流中派發出來, 如此, 它就可以對元流進行扁平化處理.(譯者注: 這裡, "分支" 流指的是元流中每個被派發的值, "主幹" 流是指這些值有序構成的流, 由於元流中的每個值都是流, 作者不得不用 "主幹" 和 "分支" 這樣的比喻來描述元流與其值的關係). 在此, Flatmap 並不是起到了"修正"的作用, 元流也並不是一個 bug, 相反, 它們正是 Rx 中處理非同步回覆流的工具.

var responseStream = requestStream
  .flatMap(function(requestUrl) {
    return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl)); });

Response stream

漂亮. 因為回覆流是依據請求流定義的, 設想之後
有更多的發生在請求流中的事件, 不難想象, 就會有對應的發生在回覆流中的的回覆事件:

requestStream:  --a-----b--c------------|->
responseStream: -----A--------B-----C---|->

(小寫的是一個請求, 大寫的是一個回覆)

現在我們終於得到了回覆流, 我們就可以渲染接收到的資料

responseStream.subscribe(function(response) {
  // 按你設想的方式渲染 `response` 為 DOM
});

整理一下到目前為止的程式碼, 如下:

var requestStream = Rx.Observable.just('https://api.github.com/users');

var responseStream = requestStream
  .flatMap(function(requestUrl) { return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl)); }); responseStream.subscribe(function(response) { // 按你設想的方式渲染 `response` 為 DOM });

重新整理按鈕

現在我們注意到, 回覆中的 JSON 是一個包含 100 個使用者的列表. [Github 的 API] 只允許我們指定一頁的偏移量, 而不能指定讀取的一頁中的專案數量, 所以我們只用到 3 個數據物件, 剩下的 97 個只能浪費掉. 我們暫時忽略這個問題, 之後我們會看到通過快取回覆來處理它.

每次重新整理按鈕被點選的時候, 請求流應該派發一個新的 URL, 因此我們會得到一個新的回覆. 我們需要兩樣東西: 一個重新整理按鈕的點選事件流(口訣: 萬物皆可成流), 並且我們需要改變請求流以依賴重新整理點選流. 好在, RxJs 擁有從事件監聽器產生 Observable 的工具.

var refreshButton = document.querySelector('.refresh');
var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click');

既然重新整理點選事件自身不帶任何 API URL, 我們需要對映每次點選為一個實際的 URL. 現在我們將請求流改成重新整理點選流, 這個流被對映為每次帶有隨機的偏移引數的、到 API 的請求.

var requestStream = refreshClickStream
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; });

如果我直接這樣寫, 也不做自動化測試, 那這段程式碼其實有個特性沒實現. 即請求不會在頁面載入完時發生, 只有當重新整理按鈕被點選的時候才會. 但其實, 兩種行為我們都需要: 重新整理按鈕被點選的時候的請求, 或者是頁面剛開啟時的請求.

兩種場景下需要不同的流:

var requestOnRefreshStream = refreshClickStream
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; }); var startupRequestStream = Rx.Observable.just('https://api.github.com/users');

但我們如何才能"合併"這兩者為同一個呢? 有一個 merge() 方法. 用導圖來解釋的話, 它看起來像是這樣的.

stream A: ---a--------e-----o----->
stream B: -----B---C-----D-------->
          vvvvvvvvv merge vvvvvvvvv
          ---a-B---C--e--D--o----->

那我們要做的事就變得很容易了:

var requestOnRefreshStream = refreshClickStream
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; }); var startupRequestStream = Rx.Observable.just('https://api.github.com/users'); var requestStream = Rx.Observable.merge( requestOnRefreshStream, startupRequestStream );

也有另外一種更乾淨的、不需要中間流的寫法:

var requestStream = refreshClickStream
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; }) .merge(Rx.Observable.just('https://api.github.com/users'));

甚至再短、再有可讀性一點:

var requestStream = refreshClickStream
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; }) .startWith('https://api.github.com/users');

startWith() 會照你猜的那樣去工作: 給流一個起點. 無論你的輸入流是怎樣的, 帶 startWith(x) 的輸出流總會以 x 作為起點. 但我這樣做還不夠 [DRY], 我把 API 字串寫了兩次. 一種修正的做法是把 startWith() 用在 refreshClickStream 上, 這樣可以從"模擬"在頁面載入時一次重新整理點選事件.

var requestStream = refreshClickStream.startWith('startup click')
  .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; });

漂亮. 如果你現在回頭去看我說 "有個特性沒實現" 的那一段, 你應該能看出那裡的程式碼和這裡的程式碼的區別僅僅是多了一個 startWith().

使用流來建立"3個推薦關注者"的模型

到現在為止, 我們只是寫完了一個發生在回覆流的 subscribe() 中的 推薦關注者 的 UI. 對於重新整理按鈕, 我們要解決一個問題: 一旦你點選了"重新整理", 現在的三個推薦關注者仍然沒有被清理. 新的推薦關注者只在請求內回覆後才能拿到, 不過為了讓 UI 看上去令人舒適, 我們需要在重新整理按鈕被點選的時候就清理當前的推薦關注者.

refreshClickStream.subscribe(function() {
  // 清理 3 個推薦關注者的 DOM 元素
});

稍等一下. 這樣做不太好, 因為這樣我們就有兩個會影響到推薦關注者的 DOM 元素的 subscriber (另一個是 responseStream.subscribe()), 這聽起來不符合 Separation of concerns. 還記得 Reactive 口訣嗎?

Mantra

在 "萬物皆可為流" 的指導下, 我們把推薦關注者構建為一個流, 其中每個派發出來的值都是一個包含了推薦關注人資料的 JSON 物件. 我們會對三個推薦關注者的資料分別做這件事. 像這樣來寫:

var suggestion1Stream = responseStream
  .map(function(listUsers) {
    // 從列表中隨機獲取一個使用者 return listUsers[Math.floor(Math.random()*listUsers.length)]; });

至於獲取另外兩個使用者的流, 即 suggestion2Stream 和 suggestion3Stream, 只需要把 suggestion1Stream 複製一遍就行了. 這不夠 [DRY], 不過對我們的教程而言, 這樣能讓我們的示例簡單些, 同時我認為, 思考如何在這個場景下避免重複編寫 suggestion[N]Stream 也是個好的思維練習, 就留給讀者去考慮吧.

我們讓渲染的過程發生在回覆流的 subscribe() 中, 而是這樣做:

suggestion1Stream.subscribe(function(suggestion) {
  // 渲染第 1 個推薦關注者
});

回想之前我們說的 "重新整理的時候, 清理推薦關注者", 我們可以簡單地將重新整理單擊事件對映為 "null" 資料(它代表當前的推薦關注者為空), 並且在 suggestion1Stream 做這項工作, 如下:

var suggestion1Stream = responseStream
  .map(function(listUsers) {
    // 從列表中隨機獲取一個使用者 return listUsers[Math.floor(Math.random()*listUsers.length)]; }) .merge( refreshClickStream.map(function(){ return null; }) );

在渲染的時候, 我們把 null 解釋為 "沒有資料", 隱藏它的 UI 元素.

suggestion1Stream.subscribe(function(suggestion) {
  if (suggestion === null) { // 隱藏第 1 個推薦關注者元素 } else { // 顯示第 1 個推薦關注者元素並渲染資料 } });

整個情景是這樣的:

refreshClickStream: ----------o--------o---->
     requestStream: -r--------r--------r---->
    responseStream: ----R---------R------R-->
 suggestion1Stream: ----s-----N---s----N-s-->
 suggestion2Stream: ----q-----N---q----N-q--> suggestion3Stream: ----t-----N---t----N-t-->

其中 N 表示 null

(譯者注: 注意, 當 refreshClickStream 產生新值, 即使用者進行點選時, null 的產生總是立刻發生在 refreshClickStream 之後; 而 refreshClickStream => requestStream => responseStream, responseStream 中的值, 是發給 API 介面的非同步請求的結果, 這個結果的產生往往會需要花一點時間, 必然在 null 之後, 因此可以達到 "為了讓 UI 看上去令人舒適, 我們需要在重新整理按鈕被點選的時候就清理當前的推薦關注者" 的效果).

稍微完善一下, 我們會在頁面啟動的時候也會渲染 "空" 推薦關注人. 為此可以 startWith(null) 放在推薦關注人的流裡:

var suggestion1Stream = responseStream
  .map(function(listUsers) {
    // 從列表中隨機獲取一個使用者 return listUsers[Math.floor(Math.random()*listUsers.length)]; }) .merge( refreshClickStream.map(function(){ return null; }) ) .startWith(null);

最後我們得到的流:

refreshClickStream: ----------o---------o---->
     requestStream: -r--------r---------r---->
    responseStream: ----R----------R------R-->
 suggestion1Stream: -N--s-----N----s----N-s-->
 suggestion2Stream: -N--q-----N----q----N-q--> suggestion3Stream: -N--t-----N----t----N-t-->

關閉推薦關注人, 並利用已快取的回覆資料

目前還有一個特性沒有實現. 每個推薦關注人格子應該有它自己的 'x' 按鈕來關閉它, 然後載入另一個數據來代替. 也許你的第一反應是, 用一種簡單方法: 在點選關閉按鈕的時候, 發起一個請求, 然後更新這個推薦人:

var close1Button = document.querySelector('.close1');
var close1ClickStream = Rx.Observable.fromEvent(close1Button, 'click'); // close2Button 和 close3Button 重複此過程 var requestStream = refreshClickStream.startWith('startup click') .merge(close1ClickStream) // 把關閉按鈕加在這裡 .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; });

然而這沒不對. (由於 refreshClickStream 影響了所有的推薦人流, 所以)該過程會關閉並且重新載入_所有的_推薦關注人, 而不是僅更新我們想關掉的那一個. 這裡有很多方式來解決這個問題, 為了玩點炫酷的, 我們會重用之前的回覆資料中別的推薦人. API 返回的資料每頁包含 100 個使用者, 但我們每次只用到其中的 3 個, 所以我們有很多有效的重新整理資料可以用, 沒必要再請求新的.

再一次的, 讓我們用流的思維來思考. 當一個 'close1'點選事件發生的時候, 我們使用 responseStream中 最近被派發的 回覆來從回覆的使用者列表中隨機獲取一個使用者. 如下:

    requestStream: --r--------------->
   responseStream: ------R----------->
close1ClickStream: ------------c----->
suggestion1Stream: ------s-----s----->

在 [Rx*] 中, 有一個合成器方法叫做 combineLatest, 似乎可以完成我們想做的事情. 它把兩個流 A 和 B 作為其輸入, 而當其中任何一個派發值的時候, combineLatest 會把兩者最近派發的值 a 和 b 按照 c = f(x,y) 的方法合併處理再輸出, 其中 f 是你可以定義的方法. 用圖來解釋也許更清楚:

stream A: --a-----------e--------i-------->
stream B: -----b----c--------d-------q---->
          vvvvvvvv combineLatest(f) vvvvvvv
          ----AB---AC--EC---ED--ID--IQ---->

在該例中, f 是一個轉換為全大寫的函式

我們可以把 combineLatest() 用在 close1ClickStream 和 responseStream 上, 因此一旦 "關閉按鈕1" 被點選(導致 close1ClickStream 產生新值), 我們都能得到最新的返回資料, 並在 suggestion1Stream中產生一個新的值. 由於 combineLatest() 的對稱性的, 任何時候, 只要 responseStream 派發了一個新的回覆, 它也將合併最新的一次 '關閉按鈕1被點選' 事件來產生一個新的推薦關注人. 這個特性非常有趣, 因為它允許我們簡化我們之前的 suggestion1Stream , 如下:

var suggestion1Stream = close1ClickStream
  .combineLatest(responseStream,
    function(click, listUsers) {
      return listUsers[Math.floor(Math.random()*listUsers.length)]; } ) .merge( refreshClickStream.map(function(){ return null; }) ) .startWith(null);

在上述思考中, 還有一點東西被遺漏. combineLatest() 使用了兩個資料來源中最近的資料, 但是如果這些源中的某個從未派發過任何東西, combineLatest() 就不能產生一個數據事件到輸出流. 如果你再細看上面的 ASCII 圖, 你會發現當第一個流派發 a 的時候, 不會有任何輸出. 只有當第二個流派發 b 的時候才能產生一個輸出值.

有幾種方式來解決該問題, 我們仍然採取最簡單的一種, 就是在頁面啟動的時候模擬一次對 '關閉按鈕1' 按鈕的點選:

var suggestion1Stream = close1ClickStream.startWith('startup click') // 把對"關閉按鈕1"的點選的模擬加在這裡
  .combineLatest(responseStream,
    function(click, listUsers) {l return listUsers[Math.floor(Math.random()*listUsers.length)]; } ) .merge( refreshClickStream.map(function(){ return null; }) ) .startWith(null);

總結整理

現在我們的工作完成了. 完整的程式碼如下所示:

var refreshButton = document.querySelector('.refresh');
var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click'); var closeButton1 = document.querySelector('.close1'); var close1ClickStream = Rx.Observable.fromEvent(closeButton1, 'click'); // close2 和 close3 是同樣的邏輯 var requestStream = refreshClickStream.startWith('startup click') .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; }); var responseStream = requestStream .flatMap(function (requestUrl) { return Rx.Observable.fromPromise($.ajax({url: requestUrl})); }); var suggestion1Stream = close1ClickStream.startWith('startup click') .combineLatest(responseStream, function(click, listUsers) { return listUsers[Math.floor(Math.random()*listUsers.length)]; } ) .merge( refreshClickStream.map(function(){ return null; }) ) .startWith(null); // suggestion2Stream 和 suggestion3Stream 是同樣的邏輯 suggestion1Stream.subscribe(function(suggestion) { if (suggestion === null) { // 隱藏第 1 個推薦關注者元素 } else { // 顯示第 1 個推薦關注者元素並渲染資料 } });

你可以在這裡檢視完整的[樣例程式碼]

很慚愧, 這只是一個微小的程式碼示例, 但它的資訊量很大: 它著重表現了, 如何對關注點進行適當的隔離, 從而對不同流進行管理, 甚至充分利用了返回資料的流. 這樣的函式式風格使得程式碼像宣告式多於像命令式: 我們並不用給出一個要執行的的結構化序列, 我們只是通過定義流之間的關係來表達系統中每件事物是什麼. 舉例來說, 通過 Rx, 我們告訴計算機 _suggestion1Stream 就是 點選關閉按鈕1 的流, 與最近一個API返回的(使用者中隨機選擇的一個)的使用者的流, 重新整理時產生 null 的流, 和應用啟動時產生 null 的流的合併流_.

回想一下那些你熟稔的流程控制的語句(比如 ifforwhile), 以及 Javascript 應用中隨處可見的基於回撥的控制流. (只要你願意, )你甚至可以在上文的 subscribe() 中不寫 if 和 else, 而是(在 observable 上)使用 filter()(這一塊我就不寫實現細節了, 留給你作為練習). 在 Rx 中, 有很多流處理方法, 比如 mapfilterscanmergecombineLateststartWith, 以及非常多用於控制一個事件驅動的程式的流的方法. 這個工具集讓你用更少的程式碼而寫出更強大的效果.

接下來還有什麼?

如果你願意用 [Rx*] 來做反應式程式設計, 請花一些時間來熟悉這個 函式列表, 其中涉及如何變換, 合併和建立 Observables (被觀察者). 如果你想以圖形的方式理解這些方法, 可以看一下 彈珠圖解 RxJava. 一旦你對理解某物有困難的時候, 試著畫一畫圖, 基於圖來思考, 看一下函式列表, 再繼續思考. 以我的經驗, 這樣的學習流程非常有用.

一旦你熟悉瞭如何使用 [Rx] 進行變成, 理解冷熱酸甜, 想吃就吃...哦不, 冷熱 Observables 就很有必要了. 反正就算你跳過了這一節, 你也會回來重新看的, 勿謂言之不預也. 建議通過學習真正的函數語言程式設計來磨練你的技巧, 並且熟悉影響各種議題, 比如"影響 [Rx] 的副作用"什麼的.

不過, 實現了反應式程式設計的庫並非並非只有 [Rx]. [Bacon.js] 的執行機制就很直觀, 理解它不像理解 [Rx] 那麼難; [Elm Language] 在特定的應用場景有很強的生命裡: 它是一種會編譯到 Javascript + HTML + CSS 的反應式程式語言, 它的特色在於 [time travelling debugger]. 這些都很不錯.

Rx 在嚴重依賴事件的前端應用中表現優秀. 但它不只是只為客戶端應用服務的, 在接近資料庫的後端場景中也大有可為. 實際上, [RxJava 正是啟用 Netflex 服務端併發能力的關鍵]. Rx 不是一個嚴格限於某種特定型別應用的框架或者是語言. 它其實是一種正規化, 你可以在任何事件驅動的軟體中實踐它.

 


原文連結
本文為雲棲社群原創內容,未經允許不得轉載。