1. 程式人生 > >響應式編程(Reactive Programming)(Rx)介紹

響應式編程(Reactive Programming)(Rx)介紹

基本 master 個人 點擊 next() 展示 練習 already rand

很明顯你是有興趣學習這種被稱作響應式編程的新技術才來看這篇文章的。

學習響應式編程是很困難的一個過程,特別是在缺乏優秀資料的前提下。剛開始學習時,我試過去找一些教程,並找到了為數不多的實用教程,但是它們都流於表面,從沒有圍繞響應式編程構建起一個完整的知識體系。庫的文檔往往也無法幫助你去了解它的函數。不信的話可以看一下這個:

通過合並元素的指針,將每一個可觀察的元素序列放射到一個新的可觀察的序列中,然後將多個可觀察的序列中的一個轉換成一個只從最近的可觀察序列中產生值得可觀察的序列。

天啊。

我看過兩本書,一本只是講述了一些概念,而另一本則糾結於如何使用響應式編程庫。我最終放棄了這種痛苦的學習方式,決定在開發中一邊使用響應式編程,一邊理解它。在 Futurice 工作期間,我嘗試在真實項目中使用響應式編程,並且當我遇到困難時,得到了同事們的幫助。

在學習過程中最困難的一部分是 以響應式編程的方式思考 。這意味著要放棄命令式且帶狀態的編程習慣,並且要強迫你的大腦以一種不同的方式去工作。在互聯網上我找不到任何關於這方面的教程,而我覺得這世界需要一份關於怎麽以響應式編程的方式思考的實用教程,這樣你就有足夠的資料去起步。庫的文檔無法為你的學習提供指引,而我希望這篇文章可以。

“什麽是響應式編程?”

在互聯網上有著一大堆糟糕的解釋與定義。Wikipedia 一如既往的空泛與理論化。Stackoverflow 的權威答案明顯不適合初學者。Reactive Manifesto 看起來是你展示給你公司的項目經理或者老板們看的東西。微軟的 Rx terminology "Rx = Observables + LINQ + Schedulers" 過於重量級且微軟味十足,只會讓大部分人困惑。相對於你所使用的 MV* 框架以及鐘愛的編程語言,"Reactive" 和 "Propagation of change" 這些術語並沒有傳達任何有意義的概念。框架的 Views 層當然要對 Models 層作出反應,改變當然會傳播。如果沒有這些,就沒有東西會被渲染了。

所以不要再扯這些廢話了。

響應式編程是使用異步數據流進行編程

一方面,這並不是什麽新東西。Event buses 或者 Click events 本質上就是異步事件流,你可以監聽並處理這些事件。響應式編程的思路大概如下:你可以用包括 Click 和 Hover 事件在內的任何東西創建 Data stream。Stream 廉價且常見,任何東西都可以是一個 Stream:變量、用戶輸入、屬性、Cache、數據結構等等。舉個例子,想像一下你的 Twitter feed 就像是 Click events 那樣的 Data stream,你可以監聽它並相應的作出響應。

在這個基礎上,你還有令人驚艷的函數去組合、創建、過濾這些 Streams。這就是函數式魔法的用武之地。Stream 能接受一個,甚至多個 Stream 為輸入。你可以融合兩個 Stream,也可以從一個 Stream 中過濾出你感興趣的 Events 以生成一個新的 Stream,還可以把一個 Stream 中的數據值 映射到一個新的 Stream 中。

既然 Stream 在響應式編程中如此重要,那麽我們就應該好好的了解它們,就從我們熟悉的"Clicks on a button" Event stream 開始。

技術分享

Stream 就是一個按時間排序的 Events 序列,它可以放射三種不同的 Events:(某種類型的)Value、Error 或者一個" Completed" Signal。考慮一下"Completed"發生的時機,例如,當包含這個按鈕的窗口或者視圖被關閉時。

通過分別為 Value、Error、"Completed"定義事件處理函數,我們將會異步地捕獲這些 Events。有時可以忽略 Error 與"Completed",你只需要定義 Value 的事件處理函數就行。監聽一個 Stream 也被稱作是訂閱 ,而我們所定義的函數就是觀察者,Stream則是被觀察者,其實就是 Observer Design Pattern。

上面的示意圖也可以使用ASCII重畫為下圖,在下面的部分教程中我們會使用這幅圖:

    --a---b-c---d---X---|->

    a, b, c, d are emitted values
    X is an error
    | is the ‘completed‘ signal
    ---> is the timeline 

既然已經開始對響應式編程感到熟悉,為了不讓你覺得無聊,我們可以嘗試做一些新東西:我們將會把一個 Click event stream 轉為新的 Click event stream。

首先,讓我們做一個能記錄一個按鈕點擊了多少次的計數器 Stream。在常見的響應式編程庫中,每個Stream都會有多個方法,如 map, filter, scan, 等等。當你調用其中一個方法時,例如 clickStream.map(f),它就會基於原來的 Click stream 返回一個新的 Stream 。它不會對原來的 Click steam 作任何修改。這個特性稱為不可變性,它對於響應式編程 Stream,就如果汁對於薄煎餅。我們也可以對方法進行鏈式調用,如 clickStream.map(f).scan(g)

    clickStream: ---c----c--c----c------c-->
               vvvvv map(c becomes 1) vvvv
               ---1----1--1----1------1-->
               vvvvvvvvv scan(+) vvvvvvvvv
    counterStream: ---1----2--3----4------5--> 

map(f) 會根據你提供的 f 函數把原 Stream 中的 Value 分別映射到新的 Stream 中。在我們的例子中,我們把每一次 Click 都映射為數字 1。scan(g) 會根據你提供的 g 函數把 Stream 中的所有 Value 聚合成一個 Value x = g(accumulated, current) ,這個示例中 g 只是一個簡單的添加函數。然後,每 Click 一次, counterStream 就會把點擊的總次數發給它的觀察者。

為了展示響應式編程真正的實力,讓我們假設你想得到一個包含“雙擊”事件的 Stream。為了讓它更加有趣,假設我們想要的這個 Stream 要同時考慮三擊(Triple clicks),或者更加寬泛,連擊(兩次或更多)。深呼吸一下,然後想像一下在傳統的命令式且帶狀態的方式中你會怎麽實現。我敢打賭代碼會像一堆亂麻,並且會使用一些變量保存狀態,同時也有一些計算時間間隔的代碼。

而在響應式編程中,這個功能的實現就非常簡單。事實上,這邏輯只有 4 行代碼。但現在我們先不管那些代碼。用圖表的方式思考是理解怎樣構建Stream的最好方法,無論你是初學者還是專家。

技術分享

灰色的方框是用來轉換 Stream 函數的。首先,簡而言之,我們把連續 250 ms 內的 Click 都積累到一個列表中(就是buffer(stream.throttle(250ms) 做的事。不要在意這些細節,我們只是展示一下響應式編程而已)。結果是一個列表的 Stream ,然後我們使用 map() 把每個列表映射為一個整數,即它的長度。最終,我們使用 filter(x >= 2) 把整數 1 給過濾掉。就這樣,3 個操作就生成了我們想要的 Stream。然後我們就可以訂閱(“監聽”)這個 Stream,並以我們所希望的方式作出反應。

我希望你能感受到這個示例的優美之處。這個示例只是冰山一角:你可以把同樣的操作應用到不同種類的 Stream 上,例如,一個 API 響應的 Stream;另一方面,還有很多其它可用的函數。

“為什麽我要使用響應式編程(RP)?”

響應式編程提高了代碼的抽象層級,所以你可以只關註定義了業務邏輯的那些相互依賴的事件,而非糾纏於大量的實現細節。RP 的代碼往往會更加簡明。

特別是在開發現在這些有著大量與數據事件相關的 UI events 的高互動性 Webapps、手機 apps 的時候,RP 的優勢就更加明顯。10年前,網頁的交互就只是提交一個很長的表單到後端,而在前端只產生簡單的渲染。Apps 就表現得更加的實時了:修改一個表單域就能自動地把修改後的值保存到後端,為一些內容"點贊"時,會實時的反應到其它在線用戶那裏等等。

現在的 Apps 有著大量各種各樣的實時 Events,以給用戶提供一個交互性較高的體驗。我們需要工具去應對這個變化,而響應式編程就是一個答案。

以 RP 方式思考的例子

讓我們做一些實踐。一個真實的例子一步一步的指導我們以 RP 的方式思考。不是虛構的例子,也沒有只解釋了一半的概念。學完教程之後,我們將寫出真實可用的代碼,並做到知其然,知其所以然。

在這個教程中,我將會使用 JavaScript 和 RxJS 作為工具 ,因為JavaScript是現在最多人會的語言,而 Rx* library family 有多種語言版本,並支持多種平臺(.NET, Java, Scala, Clojure, JavaScript, Ruby, Python, C++, Objective-C/Cocoa, Groovy等等)。所以,無論你用的是什麽工具,你都能從下面這個教程中受益。

實現"Who to follow"推薦界面

在 Twitter 上,這個表明其他賬戶的 UI 元素看起來是這樣的:

技術分享

我們將會重點模擬它的核心功能,如下:

  • 啟動時從 API 那裏加載帳戶數據,並顯示 3 個推薦
  • 點擊"Refresh"時,加載另外 3 個推薦用戶到這三行中
  • 點擊帳戶所在行的‘x‘按鈕時,只清除那一個推薦然後顯示一個新的推薦
  • 每行都會顯示帳戶的頭像,以及他們主頁的鏈接

我們可以忽略其它的特性和按鈕,因為它們是次要的。同時,因為 Twitter 最近關閉了對非授權用戶的 API,我們將會為 Github 實現這個推薦界面,而非 Twitter。這是Github獲取用戶的API。

如果你想先看一下最終效果,這裏有完成後的代碼 http://jsfiddle.net/staltz/8jFJH/48/。

請求和響應

在 Rx 中你該怎麽處理這個問題呢? 好吧,首先,(幾乎) 所有的東西都可以轉為一個Stream 。這就是Rx的咒語。讓我們先從最簡單的特性開始:"在啟動時,從API加載3個帳戶的數據"。這並沒有什麽特別,就只是簡單的(1)發出一個請求,(2)收到一個響應,(3)渲染這個響應。所以,讓我們繼續,並用Stream代表我們的請求。一開始可能會覺得殺雞用牛刀,但我們應當從最基本的開始,對吧?

在啟動的時候,我們只需要發出一個請求,所以如果我們把它轉為一個Data stream的話,那就是一個只有一個Value的Stream。稍後,我們知道將會有多個請求發生,但現在,就只有一個請求。

    --a------|->

    Where a is the string ‘https://api.github.com/users‘ 

這是一個我們想向其發出請求的 URL 的 Stream。每當一個請求事件發生時,它會告訴我們兩件事:"什麽時候"與"什麽東西"。"什麽時候"這個請求會被執行,就是什麽時候這個 Event 會被映射。"什麽東西"會被請求,就是這個映射出來的值:一個包含 URL 的 String。

在 RX 中,創建只有一個值的 Stream 是非常簡單的。官方把一個 Stream 稱作“Observable”,因為它可以被觀察,但是我發現那是個很愚蠢的名子,所以我把它叫做 Stream*。

    var requestStream = Rx.Observable.just(‘https://api.github.com/users‘); 

但是現在,那只是一個包含了String的Stream,並沒有其他操作,所以我們需要以某種方式使那個值被映射。就是通過subscribing 這個 Stream。

    requestStream.subscribe(function(requestUrl) {
    // execute the request
    jQuery.getJSON(requestUrl, function(responseData) {
        // ...
    });
    }

留意一下我們使用了 jQuery 的 Ajax 函數(我們假設你已經知道 should know already)去處理異步請求操作。但先等等,Rx 可以用來處理異步 Data stream。那這個請求的響應就不能當作一個包含了將會到達的數據的 Stream 嗎?當然,從理論上來講,應該是可以的,所以我們嘗試一下。

    requestStream.subscribe(function(requestUrl) {
    // execute the request
    var responseStream = Rx.Observable.create(function (observer) {
        jQuery.getJSON(requestUrl)
        .done(function(response) { observer.onNext(response); })
        .fail(function(jqXHR, status, error) { observer.onError(error); })
        .always(function() { observer.onCompleted(); });
    });

    responseStream.subscribe(function(response) {
        // do something with the response
    });
    } 

Rx.Observable.create()所做的事就是通過顯式的通知每一個 Observer (或者說是“Subscriber”) Data events(onNext() )或者 Errors ( onError() )來創建你自己的 Stream。而我們所做的就只是把 jQuery Ajax Promise 包裝起來而已。打擾一下,這意味者Promise本質上就是一個Observable?

技術分享

是的。

Observable 就是 Promise++。在 Rx 中,你可以用 var stream = Rx.Observable.fromPromise(promise) 輕易的把一個 Promise 轉為 Observable,所以我們就這樣子做吧。唯一的不同就是 Observable 並不遵循 Promises/A+,但概念上沒有沖突。Promise 就是只有一個映射值的 Observable。Rx Stream 比 Promise 更進一步的是允許返回多個值。

這樣非常不錯,並展現了 Observables 至少有 Promise 那麽強大。所以如果你相信 Promise 宣傳的那些東西,那麽也請留意一下 Rx Observables 能勝任些什麽。

現在回到我們的例子,如果你已經註意到了我們在 subscribe() 內又調用了另外一個 subscribe() ,這類似於 Callback hell。同樣,你應該也註意到 responseStream 是建立在 requestStream 之上的。就像你之前了解到的那樣,在 Rx 內有簡單的機制可以從其它 Stream 中轉換並創建出新的 Stream,所以我們也應該這樣子做。

你現在需要知道的一個基本的函數是 [map(f)](https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/observable.md#rxobservableprototypemapselector-thisarg) ,它分別把 f() 應用到 Stream A 中的每一個值中,並把返回的值放進 Stream B 裏。如果我們也對請求 Stream 與響應 Stream 進行同樣的處理,我們可以把 Request URL 映射為響應 Promise(而 Promise 可以轉為 Streams)。

    var responseMetastream = requestStream
    .map(function(requestUrl) {
     return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
      });

然後,我們將會創造一個叫做" Metastream "的怪物:包含 Stream 的 Stream。暫時不需要害怕。Metastream 就是一個 Stream,其中映射的值還是另外一個 Stream。你可以把它想像為 pointers:每個映射的值都是一個指向其它 Stream 的指針。在我們的例子裏,每個請求 URL 都會被映射一個指向包含響應 Promise stream 的指針。

技術分享

Response 的 Metastream 看起來會讓人困惑,並且看起來也沒有幫到我們什麽。我們只想要一個簡單的響應 stream,其中每個映射的值應該是 JSON 對象,而不是一個 JSON 對象的‘Promise‘。是時候介紹 (Mr. Flatmap)(https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/observable.md#rxobservableprototypeflatmapselector-resultselector) 了:它是 map() 的一個版本,通過把應用到"trunk" Stream 上的所有操作都應用到"branch" Stream 上,可以"flatten" Metastream。Flatmap 並不是用來"修復" Metastream 的,因為 Metastream 也不是一個漏洞,這只是一些用來處理 Rx 中的異步響應的工具。

    var responseStream = requestStream
    .flatMap(function(requestUrl) {
        return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
      });

技術分享

很好。因為響應stream是根據請求 stream定義的,所以 如果 我們後面在請求 stream上發起更多的請求的話,在響應 stream上我們將會得到相應的響應事件,就像預期的那樣:

    requestStream:  --a-----b--c------------|->
    responseStream: -----A--------B-----C---|->

    (lowercase is a request, uppercase is its response) 

現在,我們終於有了一個響應 stream,所以可以把收到的數據渲染出來了:

    responseStream.subscribe(function(response) {
    // render `response` to the DOM however you wish
    }); 

把目前為止所有的代碼放到一起就是這樣:

    var requestStream = Rx.Observable.just(‘https://api.github.com/users‘);

    var responseStream = requestStream
    .flatMap(function(requestUrl) {
        return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
    });

    responseStream.subscribe(function(response) {
    // render `response` to the DOM however you wish
    }); 

刷新按鈕

我之前並沒有提到返回的 JSON 是一個有著 100 個用戶數據的列表。因為這個 API 只允許我們設置偏移量,而無法設置返回的用戶數,所以我們現在是只用了 3 個用戶的數據而浪費了另外 97 個的數據。這個問題暫時可以忽略,稍後我們會學習怎麽緩存這些數據。

每點擊一次刷新按鈕,請求 stream 就會映射一個新的 URL,同時我們也能得到一個新的響應。我們需要兩樣東西:一個是刷新按鈕上 Click events 組成的 Stream(咒語:一切都能是 Stream),同時我們需要根據刷新 click stream 而改變請求 stream。幸運的是,RxJS 提供了從 Event listener 生成 Observable 的函數。

    var refreshButton = document.querySelector(‘.refresh‘);
    var refreshClickStream = Rx.Observable.fromEvent(refreshButton, ‘click‘); 

既然刷新 click event 本身並沒有提供任何要請求的 API URL,我們需要把每一次的 Click 都映射為一個實際的 URL。現在,我們把刷新 click stream 改為新的請求 stream,其中每一個 Click 都分別映射為帶有隨機偏移量的 API 端點。

    var requestStream = refreshClickStream
    .map(function() {
        var randomOffset = Math.floor(Math.random()*500);
        return ‘https://api.github.com/users?since=‘ + randomOffset;
      }); 

因為我比較笨並且也沒有使用自動化測試,所以我剛把之前做好的一個特性毀掉了。現在在啟動時不會再發出任何的請求,而只有在點擊刷新按鈕時才會。額...這兩個行為我都需要:無論是點擊刷新按鈕時還是剛打開頁面時都該發出一個請求。

我們知道怎麽分別為這兩種情況生成 Stream:

    var requestOnRefreshStream = refreshClickStream
    .map(function() {
        var randomOffset = Math.floor(Math.random()*500);
        return ‘https://api.github.com/users?since=‘ + randomOffset;
    });

    var startupRequestStream = Rx.Observable.just(‘https://api.github.com/users‘); 

但我們怎樣才能把這兩個"融合"為一個呢?好吧,有 merge() 函數。這就是它做的事的圖解:

    stream A: ---a--------e-----o----->
    stream B: -----B---C-----D-------->
          vvvvvvvvv merge vvvvvvvvv
          ---a-B---C--e--D--o-----> 

這樣就簡單了:

    var requestOnRefreshStream = refreshClickStream
    .map(function() {
        var randomOffset = Math.floor(Math.random()*500);
        return ‘https://api.github.com/users?since=‘ + randomOffset;
    });

    var startupRequestStream = Rx.Observable.just(‘https://api.github.com/users‘);

    var requestStream = Rx.Observable.merge(
    requestOnRefreshStream, startupRequestStream
    ); 

還有一個更加簡潔的可選方案,不需要使用中間變量。

    var requestStream = refreshClickStream
    .map(function() {
        var randomOffset = Math.floor(Math.random()*500);
        return ‘https://api.github.com/users?since=‘ + randomOffset;
    })
      .merge(Rx.Observable.just(‘https://api.github.com/users‘)); 

甚至可以更簡短,更具有可讀性:

    var requestStream = refreshClickStream
    .map(function() {
        var randomOffset = Math.floor(Math.random()*500);
        return ‘https://api.github.com/users?since=‘ + randomOffset;
    })
      .startWith(‘https://api.github.com/users‘);

startWith() 函數做的事和你預期的完全一樣。無論你輸入的 Stream 是怎樣,startWith(x) 輸出的 Stream 一開始都是 x 。但是還不夠 DRY,我重復了 API 終端 string。一種修復的方法是去掉 refreshClickStream 最後的startWith() ,並在一開始的時候"模擬"一次刷新 Click。

var requestStream = refreshClickStream.startWith(‘startup click‘)
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return ‘https://api.github.com/users?since=‘ + randomOffset;
  });

很好。如果你把之前我"毀掉了的版本"的代碼和現在的相比,就會發現唯一的不同是加了 startWith() 函數。

用 Stream 構建三個推薦

到現在為止,我們只是談及了這個推薦 UI 元素在 responeStream 的 subscribe() 內執行的渲染步驟。對於刷新按鈕,我們還有一個問題:當你點擊‘刷新’ 時,當前存在的三個推薦並不會被清除。新的推薦會在響應到達後出現,為了讓 UI 看起來舒服一些,當點擊刷新時,我們需要清理掉當前的推薦。

    refreshClickStream.subscribe(function() {
    // clear the 3 suggestion DOM elements 
    });

不,別那麽快,朋友。這樣不好,我們現在有兩個訂閱者會影響到推薦的 DOM 元素(另外一個是responseStream.subscribe() ),而且這樣完全不符合 Separation of concerns。還記得響應式編程的咒語麽?

技術分享

所以讓我們把顯示的推薦設計成一個 stream,其中每一個映射的值都是包含了推薦內容的 JSON 對象。我們以此把三個推薦內容分開來。現在第一個推薦看起來是這樣子的:

    var suggestion1Stream = responseStream
    .map(function(listUsers) {
        // get one random user from the list
        return listUsers[Math.floor(Math.random()*listUsers.length)];
      }); 

其他的, suggestion2Streamsuggestion3Stream 可以簡單的拷貝 suggestion1Stream 的代碼來使用。這不是 DRY,它會讓我們的例子變得更加簡單一些,加之我覺得這是一個可以幫助考慮如何減少重復的良好實踐。

我們不在 responseStream 的 subscribe() 中處理渲染了,我們這麽處理:

    suggestion1Stream.subscribe(function(suggestion) {
    // render the 1st suggestion to the DOM
    }); 

回到"當刷新時,清理掉當前的推薦",我們可以很簡單的把刷新點擊映射為 null,並且在 suggestion1Stream 中包含進來,如下:

    var suggestion1Stream = responseStream
    .map(function(listUsers) {
        // get one random user from the list
        return listUsers[Math.floor(Math.random()*listUsers.length)];
    })
    .merge(
        refreshClickStream.map(function(){ return null; })
      );

當渲染時,null 解釋為"沒有數據",所以把 UI 元素隱藏起來。

    suggestion1Stream.subscribe(function(suggestion) {
    if (suggestion === null) {
        // hide the first suggestion DOM element
    }
    else {
        // show the first suggestion DOM element
        // and render the data
    }
    }); 

現在的示意圖:

    refreshClickStream: ----------o--------o---->
    requestStream: -r--------r--------r---->
    responseStream: ----R---------R------R-->   
    suggestion1Stream: ----s-----N---s----N-s-->
    suggestion2Stream: ----q-----N---q----N-q-->
    suggestion3Stream: ----t-----N---t----N-t--> 

其中,N 代表了 null

作為一種補充,我們也可以在一開始的時候就渲染“空的”推薦內容。這通過把 startWith(null) 添加到 Suggestion stream 就完成了:

    var suggestion1Stream = responseStream
    .map(function(listUsers) {
        // get one random user from the list
        return listUsers[Math.floor(Math.random()*listUsers.length)];
    })
    .merge(
        refreshClickStream.map(function(){ return null; })
    )
      .startWith(null); 

現在結果是:

    refreshClickStream: ----------o---------o---->
     requestStream: -r--------r---------r---->
    responseStream: ----R----------R------R-->   
    suggestion1Stream: -N--s-----N----s----N-s-->
    suggestion2Stream: -N--q-----N----q----N-q-->
    suggestion3Stream: -N--t-----N----t----N-t--> 

關閉推薦並使用緩存的響應

還有一個功能需要實現。每一個推薦,都該有自己的"X"按鈕以關閉它,然後在該位置加載另一個推薦。最初的想法,點擊任何關閉按鈕時都需要發起一個新的請求:

    var close1Button = document.querySelector(‘.close1‘);
    var close1ClickStream = Rx.Observable.fromEvent(close1Button, ‘click‘);
    // and the same for close2Button and close3Button

    var requestStream = refreshClickStream.startWith(‘startup click‘)
    .merge(close1ClickStream) // we added this
    .map(function() {
        var randomOffset = Math.floor(Math.random()*500);
        return ‘https://api.github.com/users?since=‘ + randomOffset;
      }); 

這個沒有效果。這將會關閉並且重新加載 所有 的推薦,而不是僅僅處理我們點擊的那一個。有一些不一樣的方法可以解決,並且讓它變得更加有趣,我們可以通過復用之前的請求來解決它。API 的響應頁面有 100 個用戶,而我們僅僅使用其中的三個,所以還有很多的新數據可以使用,無須重新發起請求。

同樣的,我們用Stream的方式來思考。當點擊‘close1‘時,我們想要用 responseStream 最近的映射從響應列表中獲取一個隨機的用戶,如:

    requestStream: --r--------------->
    responseStream: ------R----------->
    close1ClickStream: ------------c----->
    suggestion1Stream: ------s-----s-----> 

在 Rx* 中, 叫做連接符函數的 combineLatest 似乎實現了我們想要的功能。它接受兩個 Stream,A 和 B 作為輸入,當其中一個 Stream 發射一個值時, combineLatest 把最近兩個發射的值 a 和 b 從各自的 Stream 中取出並且返回一個 c = f(x,y) ,其中 f 為你定義的函數。用圖來表示更好:

    stream A: --a-----------e--------i-------->
    stream B: -----b----c--------d-------q---->
          vvvvvvvv combineLatest(f) vvvvvvv
          ----AB---AC--EC---ED--ID--IQ---->

    where f is the uppercase function 

我們可以在 close1ClickStreamresponseStream 上使用 combineLatest(),所以無論什麽時候當一個按鈕被點擊時,我們可以獲得最新的響應發射值,並且在 suggestion1Stream 上產生一個新的值。另一方面,combineLatest() 是對稱的,當一個新的響應在 responseStream 發射時,它將會把最後的‘關閉 1‘的點擊事件一起合並來產生一個新的推薦。這是有趣的,因為它允許我們把之前的 suggestion1Stream 代碼簡化成下邊這個樣子:

    var suggestion1Stream = close1ClickStream
    .combineLatest(responseStream,             
        function(click, listUsers) {
        return listUsers[Math.floor(Math.random()*listUsers.length)];
        }
    )
    .merge(
        refreshClickStream.map(function(){ return null; })
    )
      .startWith(null); 

還有一個問題需要解決。combineLatest() 使用最近的兩個數據源,但是當其中一個來源沒發起任何事件時,combineLatest() 無法在 Output stream 中產生一個 Data event。從上邊的 ASCII 圖中,你可以看到,當第一個 Stream 發射值 a 時,這個值時並沒有任何輸出產生,只有當第二個 Stream 發射值 b 時才有值輸出。

有多種方法可以解決這個問題,我們選擇最簡單的一種,一開始在‘close 1‘按鈕上模擬一個點擊事件:

    var suggestion1Stream = close1ClickStream.startWith(‘startup click‘) // we added this
    .combineLatest(responseStream,             
        function(click, listUsers) {l
         return listUsers[Math.floor(Math.random()*listUsers.length)];
        }
    )
    .merge(
        refreshClickStream.map(function(){ return null; })
    )
      .startWith(null); 

結束

終於完成了,所有的代碼合在一起是這樣子:

    var refreshButton = document.querySelector(‘.refresh‘);
    var refreshClickStream = Rx.Observable.fromEvent(refreshButton, ‘click‘);

    var closeButton1 = document.querySelector(‘.close1‘);
    var close1ClickStream = Rx.Observable.fromEvent(closeButton1, ‘click‘);
    // and the same logic for close2 and close3

    var requestStream = refreshClickStream.startWith(‘startup click‘)
    .map(function() {
        var randomOffset = Math.floor(Math.random()*500);
        return ‘https://api.github.com/users?since=‘ + randomOffset;
    });

    var responseStream = requestStream
    .flatMap(function (requestUrl) {
        return Rx.Observable.fromPromise($.ajax({url: requestUrl}));
    });

    var suggestion1Stream = close1ClickStream.startWith(‘startup click‘)
    .combineLatest(responseStream,             
        function(click, listUsers) {
        return listUsers[Math.floor(Math.random()*listUsers.length)];
        }
    )
    .merge(
        refreshClickStream.map(function(){ return null; })
    )
    .startWith(null);
    // and the same logic for suggestion2Stream and suggestion3Stream

    suggestion1Stream.subscribe(function(suggestion) {
    if (suggestion === null) {
        // hide the first suggestion DOM element
    }
    else {
        // show the first suggestion DOM element
        // and render the data
    }
     }); 

你可以查看這個最終效果 http://jsfiddle.net/staltz/8jFJH/48/

這段代碼雖然短小,但實現了不少功能:它適當的使用 Separation of concerns 實現了對 Multiple events 的管理,甚至緩存了響應。函數式的風格讓代碼看起來更加 Declarative 而非 Imperative:我們並非給出一組指令去執行,而是通過定義 Stream 之間的關系 定義這是什麽 。舉個例子,我們使用 Rx 告訴計算機 *suggestion1Stream* 是 由 ‘close 1‘ Stream 與最新響應中的一個用戶合並而來,在程序剛運行或者刷新時則是 null

留意一下代碼中並沒有出現如 ifforwhile 這樣的控制語句,或者一般 JavaScript 應用中典型的基於回調的控制流。如果你想使用 filter() ,上面的 subscribe() 中甚至可以不用 ifelse (實現細節留給讀者作為練習)。在 Rx 中,我們有著像 mapfilterscanmergecombineLateststartWith 這樣的 Stream 函數,甚至更多類似的函數去控制一個事件驅動(Event-driven)的程序。這個工具集讓你可以用更少的代碼實現更多的功能。

接下來會發生什麽

如果你覺得 Rx* 會成為你首選的響應式編程庫,花點時間去熟悉這個big list of functions,它包括了如何轉換、合並、以及創建 Observable。如果你想通過圖表去理解這些函數,請看 RxJava‘s very useful documentation with marble diagrams。無論什麽時候你遇到問題,畫一下這些圖,思考一下,看一下這一大串函數,然後繼續思考。以我個人經驗,這樣效果很明顯。

一旦你開始使用 Rx 去編程,很有必要去理解 Cold vs Hot Observables 中的概念。如果忽略了這些,你一不小心就會被它坑了。我提醒過你了。通過學習真正的函數式編程去提升自己的技能,並熟悉那些會影響到 Rx 的問題,比如副作用。

但是響應式編程不僅僅是 Rx。還有相對容易理解的 Bacon.js,它沒有 Rx 那些怪癖。Elm Language 則以它自己的方式支持 RP:它是一門會編譯成 Javascript + HTML + CSS 的響應式編程語言 ,並有一個 time travelling debugger。非常厲害。

Rx 在需要處理大量事件的 Frontend 和 Apps 中非常有用。但它不僅僅能用在客戶端,在後端或者與數據庫交互時也非常有用。事實上,RxJava 是實現Netflix‘s API服務器端並發的一個重要組件 。Rx 並不是一個只能在某種應用或者語言中使用的 Framework。它本質上是一個在開發任何 Event-driven 軟件中都能使用的編程範式。

如果教程幫到你了,請支持。

響應式編程(Reactive Programming)(Rx)介紹