我所瞭解的RxJS
RxJS 是使用 Observables的響應式程式設計的庫,它使編寫非同步或基於回撥的程式碼更容易,是ReactiveX程式設計理念的JavaScript版本。RxJS的強大之處正是它使用純函式來產生值的能力。這意味著你的程式碼更不容易出錯。
安裝
官方安裝
npm install rxjs /// 匯入整個核心功能集: import Rx from 'rxjs/Rx'; Rx.Observable.of(1,2,3) 複製程式碼
推薦安裝
根據官方安裝發現rxjs不能完全載入,需要依賴rxjs-compat包,推薦使用以下安裝
npm i -s Rxjs@6 rxjs-compat@6 import * as Rx from 'rxjs/Rx' 複製程式碼
RxJS核心概念

Observable簡介

Observable舉例說明
Rx.Observable.of('1', '2', '3').map(x=>x*10).filter(x=>x>5).subscribe(x=>console.log(x)) 複製程式碼
- 建立過程 : Rx.Observable.of('1', '2', '3') 建立一個依次傳送1、2、3的observable
- 邏輯過程 :*.map().filter()*每個值乘以10,然後去過濾出大於5的值。如果先寫filter操作符,然後再map,則得不到資料
- 訂閱過程 :*subscribe()*類似回撥函式。這個過程會得到一個物件subscription。
- 執行過程 : x=>console.log(x) 預設情況下為執行next回撥
- 清理過程 : 示例如下
const subscription = Rx.Observable.of('1','2','3').map(x=>x*10).filter(x=>x>5).delay(1000).subscribe(x=>console.log(x)); subscription.unsubscribe() 複製程式碼
Subject簡介
什麼是 Subject?- RxJS Subject 是一種特殊型別的 Observable,它允許將值多播給多個觀察者,所以 Subject 是多播的,而普通的 Observables 是單播的(每個已訂閱的觀察者都擁有 Observable 的獨立執行)。 每個 Subject 都是 Observable 。 - 對於 Subject,你可以提供一個觀察者並使用 subscribe 方法,就可以開始正常接收值。從觀察者的角度而言,它無法判斷 Observable 執行是來自普通的 Observable 還是 Subject 。 在 Subject 的內部,subscribe 不會呼叫傳送值的新執行。它只是將給定的觀察者註冊到觀察者列表中,類似於其他庫或語言中的 addListener 的工作方式。 每個 Subject 都是觀察者。 - Subject 是一個有如下方法的物件: next(v)、error(e) 和 complete() 。要給 Subject 提供新值,只要呼叫 next(theValue),它會將值多播給已註冊監聽該 Subject 的觀察者們。 Subject 像是 Observable,但是可以多播給多個觀察者。Subject 還像是 EventEmitters,維護著多個監聽器的登錄檔。 根據官網,我們大概可以以下理解: Observable類似單車道單行線,逆行或者多輛車同時開都是不允許的 Subject類似沒有監控的雙行線,隨你往哪裡開,怎麼開,多少車開都沒有問題 所以可以理解Subject是一類特殊的Observable,它可以向多個Observer多路推送數值。普通的Observable並不具備多路推送的能力(每一個Observer都有自己獨立的執行環境),而Subject可以共享一個執行環境
Subject舉例說明
const test = Observable.interval(1000).take(3); const observerA = { v => console.log(`a:${v}`) } const observerB = { v => console.log(`b:${v}`) }///定義好observable test .subscribe(observerA) setTimeout(() => {test .subscribe(observerB) }, 2000) ///因為observable是單播的,所以會輸出 a:0、a:1、b:0、a:2、b:1、b:2 const subject = new Subject() subject.subscribe(observerA) test.subscribe(subject) setTimeout(() => {subject.subscribe(observerB)}, 2000) ///因為Subject是多播的,共享一個執行,所以輸出為:a:0、a:1、a:2、b:2 複製程式碼
Subject多型
由於subject的特殊性,衍生出多種subject的變體,具體就不闡述了,他們的對比如下圖
Rxjs | 是否儲存資料 | 是否需要初始值 | 何時向訂閱者釋出資料 |
---|---|---|---|
Subject | 否 | 否 | 及時釋出,有新資料就釋出 |
BehaviorSubject | 是,儲存最後一條資料或者初始值 | 是 | 及時釋出,有新資料就釋出 |
ReplaySubject | 是,儲存所有資料 | 否 | 及時釋出,有新資料就釋出 |
AsyncSubject | 是,儲存最後一條資料 | 是 | 延時釋出,只有當資料來源完成時才會釋出 |
Scheduler簡介
什麼是Scheduler? - Scheduler控制著何時啟動 subscription 和何時傳送通知。它由三部分組成
排程器是一種資料結構。它知道如何根據優先順序或其他標準來儲存任務和將任務進行排序。 排程器是執行上下文 。 它表示在何時何地執行任務(舉例來說,立即的,或另一種回撥函式機制(比如 setTimeout 或 process.nextTick),或動畫幀)。 排程器有一個(虛擬的)時鐘 。 排程器功能通過它的 getter 方法 now() 提供了“時間”的概念。在具體排程器上安排的任務將嚴格遵循該時鐘所表示的時間。 排程器可以讓你規定 Observable 在什麼樣的執行上下文中傳送通知給它的觀察者。
操作符歸納
RxJS提供了各種API來建立資料流:
單值:of, empty, never 多值 :from 定時 :interval, timer 從事件建立 :fromEvent 從Promise建立 :fromPromise 自定義建立 :create
創建出來的資料流是一種可觀察的序列,可以被訂閱,也可以被用來做一些轉換操作,比如:
改變資料形態:map, mapTo, pluck 過濾一些值 :filter, skip, first, last, take 時間軸上的操作 :delay, timeout, throttle, debounce, audit, bufferTime 累加 :reduce, scan 異常處理 :throw, catch, finally, retry, 條件執行 :takeUntil, delayWhen, retryWhen, subscribeOn, ObserveOn 轉接 :switch
也可以對若干個資料流進行組合:
concat,保持原來的序列順序連線兩個資料流 merge ,合併序列 race ,預設條件為其中一個數據流完成 forkJoin ,預設條件為所有資料流都完成 zip ,取各來源資料流最後一個值合併為物件 combineLatest ,取各來源資料流最後一個值合併為陣列
RxJS 難點
RxJS 處理非同步邏輯,資料流,事件非常擅長。使用Rxjs前處理資料一般是處於一種'上帝'視角來對資料視覺化的除錯,Rxjs大大縮短了程式碼量的同時能夠更好的達到資料的處理(純淨性)。正是由於其強大的特性,所以學習Rxjs有以下難點(個人認為) 1、抽象程度比較高,需要開發人員具備比較強的歸納總結能力 2、操作符多而且雜,需要花大力氣記住並且合理使用各個操作符
測試題
- 1、滑鼠點選後console相隔2秒輸出5的倍數
- 2、現有3個非同步操作a、b、c,請提供讓三個非同步並行完成後同時輸出值的方法
- 3、’人和未來大資料‘ ===》 取最後4個字(多種方法)
- 4、模擬一個程式設計師,工資不漲,每天賺相同的錢n,錢足夠了(100n)就買房,買了房然後把房子租給別人,每個月收取房租m(5n),然後收入變成n+m,然後錢足夠了繼續買房,然後繼續租給訪客,收入變成n+2m
參考答案
//////題目1 const timer = Rx.Observable.interval(2000); const event = Rx.Observable.fromEvent(document, 'click') event.switchMap(() => timer) .map(x => x * 5) .subscribe(x => console.log('第1題:' + x)); 複製程式碼
/////題目2 const fa = (cb) => { setTimeout(() => cb('a'), 1000); } const fb = (cb) => { setTimeout(() => cb('b'), 2000); } const fc = (cb) => { setTimeout(() => cb('c'), 4000); } const oa = Rx.Observable.bindCallback(fa); const ob = Rx.Observable.bindCallback(fb); const oc = Rx.Observable.bindCallback(fc); Rx.Observable.combineLatest(oa(),ob(),oc()) .subscribe(x => console.log('第2題:' + x)); /////同時還可以用forkJoin,zip 複製程式碼
//////題目3 const str = "人和未來大資料"; const param = str.split(''); Rx.Observable.from(param) .takeLast(4) .subscribe(x => console.log('第3題:' + x)); /////////////////////////////////////////////////////// Rx.Observable.from(param).subscribe(new ReplaySubject(3)) 複製程式碼
///////題目4 const house$ = new Subject()///房子 const houseCount$ = house$.scan((acc, num) => acc + num, 0).startWith(0) ///房子數 // 工資始終不漲 const salary$ = Observable.interval(100).mapTo(1) //程式設計師工資n const rent$ = Observable.interval(3000) .withLatestFrom(houseCount$) .map(arr => arr[1] * 5) // 一買了房,就沒現金了…… const income$ = Observable.merge(salary$, rent$) const cash$ = income$ .scan((acc, num) => { const newSum = acc + num const newHouse = Math.floor(newSum / 100) if (newHouse > 0) { house$.next(newHouse) } return newSum % 100 }, 0) houseCount$.subscribe(num => console.log(`houseCount: ${num}`)) cash$.subscribe(num => console.log(`cash: ${num}`)) 複製程式碼
作者簡介: 張栓,人和未來大資料前端工程師,專注於html/css/js的學習與開發。