[譯] 認識 rxjs 中的 BehaviorSubject、ReplaySubject 以及 AsyncSubject
原文連結: Understanding rxjs BehaviorSubject, ReplaySubject and AsyncSubject
原文作者:Luuk Gruijs;發表於2018年5月4日
Subject 的作用是實現 Observable 的多播。由於其 Observable execution 是在多個訂閱者之間共享的,所以它可以確保每個訂閱者接收到的資料絕對相等。不僅使用 Subject 可以實現多播,RxJS 還提供了一些 Subject 的變體以應對不同場景,那就是:BehaviorSubject、ReplaySubject 以及 AsyncSubject。
如果你還不知道 Subject(主題)是什麼的話,建議先讀讀我的上一篇文章: 認識 rxjs 中的 Subject 。如果你覺得 OK 沒問題,那咱繼續!

攝影:Cory Schadt,來自Unsplash
BehaviorSubject
BehaviorSubject 是 Subject 的變體之一。 BehaviorSubject 的特性就是它會儲存“當前”的值。這意味著你始終可以直接拿到 BehaviorSubject 最後一次發出的值。
有兩種方法可以拿到 BehaviorSubject “當前”的值:訪問其 .value
屬性或者直接訂閱。如果你選擇了訂閱,那麼 BehaviorSubject 將直接給訂閱者傳送當前儲存的值,無論這個值有多麼“久遠”。請看下面的例子:
import * as Rx from "rxjs"; const subject = new Rx.BehaviorSubject(Math.random()); // 訂閱者 A subject.subscribe((data) => { console.log('Subscriber A:', data); }); subject.next(Math.random()); // 訂閱者 B subject.subscribe((data) => { console.log('Subscriber B:', data); }); subject.next(Math.random()); console.log(subject.value) // 輸出 // Subscriber A: 0.24957144215097515 // Subscriber A: 0.8751123892486292 // Subscriber B: 0.8751123892486292 // Subscriber A: 0.1901322109907977 // Subscriber B: 0.1901322109907977 // 0.1901322109907977 複製程式碼
詳細講解一下:
.value
另外,你可能發現了 BehaviorSubject 在建立時是需要設定一個初始值的。這一點在 Observable 上就非常難實現,而在 BehaviorSubject 上,只要傳遞一個值就行了。
譯者注:當前版本的 RxJS 中 BehaviorSubject()
是必須要設定初始值的,否則會導致執行錯誤,而原文並沒有體現這一點。所以我在這一段做了較多改動,以免誤導讀者。詳見BehaviorSubject。
ReplaySubject
相比 BehaviorSubject 而言,ReplaySubject 是可以給新訂閱者傳送“舊”資料的。另外,ReplaySubject 還有一個額外的特性就是它可以記錄一部分的 observable execution,從而儲存一些舊的資料用來“重播”給新來的訂閱者。
當建立 ReplaySubject 時,你可以指定儲存的資料量以及資料的過期時間。也就是說,你可以實現:給新來的訂閱者“重播”訂閱前一秒內的最後五個已廣播的值。示例程式碼如下:
import * as Rx from "rxjs"; const subject = new Rx.ReplaySubject(2); // 訂閱者 A subject.subscribe((data) => { console.log('Subscriber A:', data); }); subject.next(Math.random()) subject.next(Math.random()) subject.next(Math.random()) // 訂閱者 B subject.subscribe((data) => { console.log('Subscriber B:', data); }); subject.next(Math.random()); // Subscriber A: 0.3541746356538569 // Subscriber A: 0.12137498878080955 // Subscriber A: 0.531935186034298 // Subscriber B: 0.12137498878080955 // Subscriber B: 0.531935186034298 // Subscriber A: 0.6664809293975393 // Subscriber B: 0.6664809293975393 複製程式碼
簡單解讀一下程式碼:
- 我們建立了一個 ReplaySubject 並指定其只儲存最近兩次廣播的值;
- 訂閱 subject 並稱其為訂閱者A;
- subject 連續廣播三次,同時訂閱者A 也會跟著連續列印三次;
- 這一步就輪到 ReplaySubject 展現魔力了。我們再次訂閱 subject 並稱其為訂閱者B,因為之前我們指定 subject 儲存最近兩次廣播的值,所以 subject 會將上兩個值“重播”給訂閱者B。我們可以看到訂閱者B 隨即列印了這兩個值;
- subject 最後一次廣播,兩個訂閱者收到值並列印。
之前提到了你還可以設定 ReplaySubject 的資料過期時間。讓我們來看看下面這個例子:
import * as Rx from "rxjs"; const subject = new Rx.ReplaySubject(2, 100); // 訂閱者A subject.subscribe((data) => { console.log('Subscriber A:', data); }); setInterval(() => subject.next(Math.random()), 200); // 訂閱者B setTimeout(() => { subject.subscribe((data) => { console.log('Subscriber B:', data); }); }, 1000) // Subscriber A: 0.44524184251927656 // Subscriber A: 0.5802631630066313 // Subscriber A: 0.9792165506699135 // Subscriber A: 0.3239616040117268 // Subscriber A: 0.6845077617520203 // Subscriber B: 0.6845077617520203 // Subscriber A: 0.41269171141525707 // Subscriber B: 0.41269171141525707 // Subscriber A: 0.8211466186035139 // Subscriber B: 0.8211466186035139 複製程式碼
同樣解讀一下程式碼:
- 我們建立了一個 ReplaySubject,指定其儲存最近兩次廣播的值,但只保留 100ms;
- 訂閱 subject 並稱其為訂閱者A;
- 我們讓這個 subject 每 200ms 廣播一次。訂閱者A 每次都會收到值並列印;
- 我們設定在程式執行一秒後再次訂閱 subject,並稱其為訂閱者B。這意味著在它開始訂閱之前,subject 就已經廣播過五次了。由於我們在建立 subject 時就設定了資料的過期時間為 100ms,而廣播間隔為 200ms,所以訂閱者B 在開始訂閱後只會收到前五次廣播中最後一次的值。
AsyncSubject
BehaviorSubject 和 ReplaySubject 都可以用來儲存一些資料,而 AsyncSubject 就不一樣了。AsyncSubject 只會在 Observable execution 完成後,將其最終值發給訂閱者。請看程式碼:
import * as Rx from "rxjs"; const subject = new Rx.AsyncSubject(); // 訂閱者A subject.subscribe((data) => { console.log('Subscriber A:', data); }); subject.next(Math.random()) subject.next(Math.random()) subject.next(Math.random()) // 訂閱者B subject.subscribe((data) => { console.log('Subscriber B:', data); }); subject.next(Math.random()); subject.complete(); // Subscriber A: 0.4447275989704571 // Subscriber B: 0.4447275989704571 複製程式碼
雖然這段程式碼的輸出並不多,但我們還是照例解讀一下:
- 建立 AsyncSubject;
- 訂閱 subject 並稱其為訂閱者A;
- subject 連續廣播三次,但什麼也沒發生;
- 再次訂閱 subject 並稱其為訂閱者B;
- subject 又廣播了一次,但還是什麼也沒發生;
- subject 完成。兩個訂閱者這才收到傳來的值並列印至終端。