RxJS Subject
觀察者模式,它定義了一種一對多的關係,讓多個觀察者物件同時監聽某一個主題物件,這個主題物件的狀態發生變化時就會通知所有的觀察者物件,使得它們能夠自動更新自己。
我們可以使用日常生活中,期刊訂閱的例子來形象地解釋一下上面的概念。期刊訂閱包含兩個主要的角色:期刊出版方和訂閱者,他們之間的關係如下:
- 期刊出版方 —— 負責期刊的出版和發行工作。
- 訂閱者 —— 只需執行訂閱操作,新版的期刊釋出後,就會主動收到通知,如果取消訂閱,以後就不會再收到通知。
在觀察者模式中也有兩個主要角色:Subject(主題)和 Observer (觀察者),它們分別對應例子中的期刊出版方和訂閱者。
訂閱 Observable
在介紹 RxJS Subject 之前,我們先來看個示例:
import { interval } from "rxjs"; import { take } from "rxjs/operators"; const interval$ = interval(1000).pipe(take(3)); interval$.subscribe(value => console.log("Observer A get value: " + value)); setTimeout(() => { interval$.subscribe(value => console.log("Observer B get value: " + value)); }, 1000);
以上程式碼執行後,控制檯的輸出結果:
Observer A get value: 0 Observer A get value: 1 Observer B get value: 0 Observer A get value: 2 Observer B get value: 1 Observer B get value: 2
通過以上示例,我們可以得出以下結論:
- Observable 物件可以被重複訂閱。
- Observable 物件每次被訂閱後,都會重新執行。
上面的示例,我們可以簡單地認為兩次呼叫普通的函式,具體參考以下程式碼:
function interval() { setInterval(() => console.log('..'), 1000); } interval(); setTimeout(() => { interval(); }, 1000);
Observable 物件的預設行為,適用於大部分場景。但有些時候,我們會希望在第二次訂閱的時候,不會從頭開始接收 Observable 發出的值,而是從第一次訂閱當前正在處理的值開始傳送,我們把這種處理方式成為組播。
上述的需求要如何實現呢?我們已經知道了觀察者模式定義了一對多的關係,我們可以讓多個觀察者物件同時監聽同一個主題,這裡就是我們的時間序列流。當資料來源發出新值的時,所有的觀察者就能接收到新的值。
自定義 Subject
- Subject 類定義
class Subject { observers = []; addObserver(observer) { this.observers.push(observer); } next(value) { this.observers.forEach(o => o.next(value)); } error(error) { this.observers.forEach(o => o.error(error)); } complete() { this.observers.forEach(o => o.complete()); } }
- 使用示例
const interval$ = interval(1000).pipe(take(3)); const subject = new Subject(); let observerA = { next: value => console.log("Observer A get value: " + value), error: error => console.log("Observer A error: " + error), complete: () => console.log("Observer A complete!") }; var observerB = { next: value => console.log("Observer B get value: " + value), error: error => console.log("Observer B error: " + error), complete: () => console.log("Observer B complete!") }; subject.addObserver(observerA); // 新增觀察者A interval$.subscribe(subject); // 訂閱interval$物件 setTimeout(() => { subject.addObserver(observerB); // 新增觀察者B }, 1000);
以上程式碼執行後,控制檯的輸出結果:
Observer A get value: 0 Observer A get value: 1 Observer B get value: 1 Observer A get value: 2 Observer B get value: 2 Observer A complete! Observer B complete!
RxJS Subject
其實 RxJS 也為我們提供了 Subject 類,接下我們來利用 RxJS 的 Suject 重寫一下上面的示例:
import { interval, Subject } from "rxjs"; import { take } from "rxjs/operators"; const interval$ = interval(1000).pipe(take(3)); const subject = new Subject(); const observerA = { next: value => console.log("Observer A get value: " + value), error: error => console.log("Observer A error: " + error), complete: () => console.log("Observer A complete!") }; const observerB = { next: value => console.log("Observer B get value: " + value), error: error => console.log("Observer B error: " + error), complete: () => console.log("Observer B complete!") }; subject.subscribe(observerA); // 新增觀察者A interval$.subscribe(subject); // 訂閱interval$物件 setTimeout(() => { subject.subscribe(observerB); // 新增觀察者B }, 1000);
以上程式碼執行後,控制檯的輸出結果:
Observer A get value: 0 Observer A get value: 1 Observer B get value: 1 Observer A get value: 2 Observer B get value: 2 Observer A complete! Observer B complete!
根據上述的示例程式碼和控制檯的輸出結果,我們來總結一下 Subject 的特點:
- Subject 既是 Observable 物件,又是 Observer 物件。
- 當有新訊息時,Subject 會通知內部的所有觀察者。
RxJS Subject & Observable
Subject 其實是觀察者模式的實現,所以當觀察者訂閱 Subject 物件時,Subject 物件會把訂閱者新增到觀察者列表中,每當有 subject 物件接收到新值時,它就會遍歷觀察者列表,依次呼叫觀察者內部的next()
方法,把值一一送出。
Subject 之所以具有 Observable 中的所有方法,是因為 Subject 類繼承了 Observable 類,在 Subject 類中有五個重要的方法:
- next —— 每當 Subject 物件接收到新值的時候,next 方法會被呼叫。
- error —— 執行中出現異常,error 方法會被呼叫。
- complete —— Subject 訂閱的 Observable 物件結束後,complete 方法會被呼叫。
- subscribe —— 新增觀察者。
- unsubscribe —— 取消訂閱(設定終止識別符號、清空觀察者列表)。
除了 Subject 之外,RxJS 還為我們提供了 Subject 的幾種變體,如 BehaviorSubject、ReplaySubject 和 AsyncSubject。下面我們來分別介紹一下它們。
BehaviorSubject
有些時候我們會希望 Subject 能儲存當前的最新狀態,而不是單純的進行事件傳送,也就是說每當新增一個觀察者的時候,我們希望 Subject 能夠立即發出當前最新的值,而不是沒有任何響應。
為了說明上述的情景,我們先來分析一下以下示例:
import { Subject } from "rxjs"; const subject = new Subject(); const observerA = { next: value => console.log("Observer A get value: " + value), error: error => console.log("Observer A error: " + error), complete: () => console.log("Observer A complete!") }; const observerB = { next: value => console.log("Observer B get value: " + value), error: error => console.log("Observer B error: " + error), complete: () => console.log("Observer B complete!") }; subject.subscribe(observerA); subject.next(1); subject.next(2); subject.next(3); setTimeout(() => { subject.subscribe(observerB); // 1秒後訂閱 }, 1000);
以上程式碼執行後,控制檯的輸出結果:
Observer A get value: 1 Observer A get value: 2 Observer A get value: 3
通過輸出結果,我們發現在 observerB 訂閱 Subject 物件後,它再也沒有收到任何值了。因為 Subject 物件沒有再呼叫next()
方法。但很多時候我們會希望 Subject 物件能夠儲存當前的狀態,當新增訂閱者的時候,自動把當前最新的值傳送給訂閱者。要實現這個功能,我們就需要使用 BehaviorSubject。
BehaviorSubject 跟 Subject 最大的不同就是 BehaviorSubject 是用來儲存當前最新的值,而不是單純的傳送事件。BehaviorSubject 會記住最近一次傳送的值,並把該值作為當前值儲存在內部的屬性中。
下面我們來使用 BehaviorSubject 重寫上面的示例:
import { BehaviorSubject } from "rxjs"; const subject = new BehaviorSubject(0); const observerA = { next: value => console.log("Observer A get value: " + value), error: error => console.log("Observer A error: " + error), complete: () => console.log("Observer A complete!") }; const observerB = { next: value => console.log("Observer B get value: " + value), error: error => console.log("Observer B error: " + error), complete: () => console.log("Observer B complete!") }; subject.subscribe(observerA); subject.next(1); subject.next(2); subject.next(3); setTimeout(() => { subject.subscribe(observerB); // 1秒後訂閱 }, 1000);
以上程式碼執行後,控制檯的輸出結果:
Observer A get value: 0 Observer A get value: 1 Observer A get value: 2 Observer A get value: 3 Observer B get value: 3
通過以上示例,我們知道 BehaviorSubject 會記住最近一次傳送的值,當新的觀察者進行訂閱時,就會接收到最新的值。然後有些時候,我們新增的訂閱者,可以接收到資料來源最近傳送的幾個值,針對這種場景,我們就需要使用 ReplaySubject。
ReplaySubject
import { ReplaySubject } from "rxjs"; const subject = new ReplaySubject(2); const observerA = { next: value => console.log("Observer A get value: " + value), error: error => console.log("Observer A error: " + error), complete: () => console.log("Observer A complete!") }; const observerB = { next: value => console.log("Observer B get value: " + value), error: error => console.log("Observer B error: " + error), complete: () => console.log("Observer B complete!") }; subject.subscribe(observerA); subject.next(1); subject.next(2); subject.next(3); setTimeout(() => { subject.subscribe(observerB); // 1秒後訂閱 }, 1000);
以上程式碼執行後,控制檯的輸出結果:
Observer A get value: 1 Observer A get value: 2 Observer A get value: 3 Observer B get value: 2 Observer B get value: 3
可能會有人認為 ReplaySubject(1) 是不是等同於 BehaviorSubject,其實它們是不一樣的。在建立BehaviorSubject 物件時,是設定初始值,它用於表示 Subject 物件當前的狀態,而 ReplaySubject 只是事件的重放 。
AsyncSubject
AsyncSubject 類似於last
操作符,它會在 Subject 結束後發出最後一個值,具體示例如下:
import { AsyncSubject } from "rxjs"; const subject = new AsyncSubject(); const observerA = { next: value => console.log("Observer A get value: " + value), error: error => console.log("Observer A error: " + error), complete: () => console.log("Observer A complete!") }; const observerB = { next: value => console.log("Observer B get value: " + value), error: error => console.log("Observer B error: " + error), complete: () => console.log("Observer B complete!") }; subject.subscribe(observerA); subject.next(1); subject.next(2); subject.next(3); subject.complete(); setTimeout(() => { subject.subscribe(observerB); // 1秒後訂閱 }, 1000);
最後我們來介紹一下在 Angular 專案中,RxJS Subject 的應用。
Angular RxJS Subject 應用
在 Angular 中,我們可以利用 RxJS Subject 來實現元件間通訊,具體示例如下:
- message.service.ts
import { Injectable } from '@angular/core'; import { Observable, Subject } from 'rxjs'; @Injectable({ providedIn: 'root' }) export class MessageService { private subject = new Subject<any>(); sendMessage(message: string) { this.subject.next({ text: message }); } clearMessage() { this.subject.next(); } getMessage(): Observable<any> { return this.subject.asObservable(); } }
- home.component.ts
import { Component } from '@angular/core'; import { MessageService } from '../message.service'; @Component({ selector: 'app-home', template: ` <div> <h1>Home</h1> <button (click)="sendMessage()">Send Message</button> <button (click)="clearMessage()">Clear Message</button> </div> ` }) export class HomeComponent { constructor(private messageService: MessageService) { } sendMessage(): void { // 傳送訊息 this.messageService.sendMessage('Message from Home Component to App Component!'); } clearMessage(): void { // 清除訊息 this.messageService.clearMessage(); } }
- app.component.ts
import { Component, OnDestroy } from '@angular/core'; import { Subscription } from 'rxjs'; import { MessageService } from './message.service'; @Component({ selector: 'my-app', template: ` <div *ngIf="message">{{message.text}}</div> <app-home></app-home> ` }) export class AppComponent implements OnDestroy { message: any; subscription: Subscription; constructor(private messageService: MessageService) { this.subscription = this.messageService.getMessage().subscribe(message => { this.message = message; }); } ngOnDestroy() { this.subscription.unsubscribe(); } }
感興趣的同學可以檢視ofollow,noindex">Stackblitz 完整示例。