1. 程式人生 > >RxJS 核心概念之Subject

RxJS 核心概念之Subject

本文出處:https://segmentfault.com/a/1190000005069851,我做了一部分修改

我們在接觸到RxJS的時候,不免會有點暈頭轉向的感覺,對於什麼是Subject,什麼是Observer,什麼是Observable,總感覺暈乎乎的。下面我引用一篇為自認為比較通俗易懂的博文,再加上自己的描述來給大家解釋下,弄明白之後對於學習Angular2+有很大的幫助,因為在angular2+裡,在很多場景裡,RxJS把promise取代了。至於為什麼取代了。請大家檢視這篇文章的前半部分:http://blog.csdn.net/ac_hell/article/details/68069224。

什麼是Subject?

 在RxJS中,Subject是一類特殊的Observable,它可以向多個Observer多路推送數值。普通的Observable並不具備多路推送的能力(每一個Observer都有自己獨立的執行環境),而Subject可以共享一個執行環境。舉個例子:Subject就相當於微信公眾號,它釋出訊息後,當進行推送的時候,Observer就相當於關注了Subject的微信,只要關注它的都能收到訊息,也就是多路推送的意思,而Observable,就相當於個人微訊號,發一次訊息只有另外一個微訊號能收到,這就是Subject和Observable的主要區別

Subject是一種可以多路推送的可觀察物件。與EventEmitter類似,Subject維護著自己的Observer。

每一個Subject都是一個Observable(可觀察物件) 對於一個Subject,你可以訂閱(subscribe)它,Observer會和往常一樣接收到資料。從Observer的視角看,它並不能區分自己的執行環境是普通Observable的單路推送還是基於Subject的多路推送。

Subject的內部實現中,並不會在被訂閱(subscribe)後建立新的執行環境。它僅僅會把新的Observer註冊在由它本身維護的Observer列表中,這和其他語言、庫中的addListener機制類似。

每一個Subject也可以作為Observer(觀察者) Subject同樣也是一個由next(v)

error(e),和 complete()這些方法組成的物件。呼叫next(theValue)方法後,Subject會向所有已經在其上註冊的Observer多路推送theValue

下面的例子中,我們在Subject上註冊了兩個Observer,並且多路推送了一些數值:

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(1);
subject.next(2);

控制檯輸出結果如下:


observerA: 1
observerB: 1
observerA: 2
observerB: 2

既然Subject是一個Observer,你可以把它作為subscribe(訂閱)普通Observable時的引數,如下面例子所示:

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

var observable = Rx.Observable.from([1, 2, 3]);

observable.subscribe(subject); // 你可以傳遞Subject來訂閱observable

執行後結果如下:


observerA: 1
observerB: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

通過上面的實現:我們發現可以通過Subject將普通的Observable單路推送轉換為多路推送。這說明了Subject的作用——作為單路Observable轉變為多路Observable的橋樑。

還有幾種特殊的Subject 型別,分別是BehaviorSubjectReplaySubject,和 AsyncSubject

多路推送的Observable

在以後的語境中,每當提到“多路推送的Observable”,我們特指通過Subject構建的Observable執行環境。否則“普通的Observable”只是一個不會共享執行環境並且被訂閱後才生效的一系列值。

通過使用Subject可以建立擁有相同執行環境的多路的Observable。

下面展示了多路的運作方式:Subject從普通的Observable訂閱了資料,然後其他Observer又訂閱了這個Subject,示例如下:

var source = Rx.Observable.from([1, 2, 3]);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);

// 通過`subject.subscribe({...})`訂閱Subject的Observer:
multicasted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
multicasted.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

// 讓Subject從資料來源訂閱開始生效:
multicasted.connect();

multicast方法返回一個類似於Observable的可觀察物件,但是在其被訂閱後,它會表現Subject的特性。 multicast 返回的物件同時是ConnectableObservable型別的,擁有connect() 方法。

connect()方法非常的重要,它決定Observable何時開始執行。由於呼叫connect()後,Observable開始執行,因此,connect()會返回一個Subscription供呼叫者來終止執行。

引用計數

通過手動呼叫connect()返回的Subscription控制執行十分繁雜。通常,我們希望在有第一個Observer訂閱Subject後自動connnect,當所有Observer都取消訂閱後終止這個Subject。

我們來分析一下下面例子中subscription的過程:

  1. 第一個Observer 訂閱了多路推送的 Observable

  2. 多路Observable被連線

  3. 向第一個Observer傳送 值為0next通知

  4. 第二個Observer訂閱了多路推送的 Observable

  5. 向第一個Observer傳送 值為1next通知

  6. 向第二個Observer傳送 值為1next通知

  7. 第一個Observer取消了對多路推送的Observable的訂閱

  8. 向第二個Observer傳送 值為2next通知

  9. 第二個Observer取消了對多路推送的Observable的訂閱

  10. 取消對多路推送的Observable的連線

通過顯式地呼叫connect(),程式碼如下:

var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);
var subscription1, subscription2, subscriptionConnect;

subscription1 = multicasted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subscriptionConnect = multicasted.connect();

setTimeout(() => {
  subscription2 = multicasted.subscribe({
    next: (v) => console.log('observerB: ' + v)
  });
}, 600);

setTimeout(() => {
  subscription1.unsubscribe();
}, 1200);

setTimeout(() => {
  subscription2.unsubscribe();
  subscriptionConnect.unsubscribe(); 
}, 2000);

如果你不想顯式地呼叫connect()方法,可以在ConnectableObservable型別的Observable上呼叫refCount()方法。方法會進行引用計數:記錄Observable被訂閱的行為。當訂閱數從 0 到 1refCount() 會呼叫connect() 方法。到訂閱數從1 到 0,他會終止整個執行過程。這裡要解釋一下,這個0,1這些數是怎麼來的,因為interval操作符返回一個在規定的毫秒間隔時間內產生增加數的Observable。通俗點來說,這裡就是每隔500ms就產生一個從0開始自增的數字。

refCount 使得多路推送的Observable在被訂閱後自動執行,在所有觀察者取消訂閱後,停止執行。

下面是示例:

var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var refCounted = source.multicast(subject).refCount();
var subscription1, subscription2, subscriptionConnect;

console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

setTimeout(() => {
  console.log('observerB subscribed');
  subscription2 = refCounted.subscribe({
    next: (v) => console.log('observerB: ' + v)
  });
}, 600);

setTimeout(() => {
  console.log('observerA unsubscribed');
  subscription1.unsubscribe();
}, 1200);

setTimeout(() => {
  console.log('observerB unsubscribed');
  subscription2.unsubscribe();
}, 2000);

執行輸出結果如下:


observerA subscribed
observerA: 0
observerB subscribed
observerA: 1
observerB: 1
observerA unsubscribed
observerB: 2
observerB unsubscribed

只有ConnectableObservables擁有refCount()方法,呼叫後會返回一個Observable而不是新的ConnectableObservable。

BehaviorSubject

BehaviorSubject是Subject的一個衍生類,具有“最新的值”的概念。它總是儲存最近向資料消費者傳送的值,當一個Observer訂閱後,它會即刻從BehaviorSubject收到“最新的值”。

BehaviorSubjects非常適於表示“隨時間推移的值”。舉一個形象的例子,Subject表示一個人的生日,而Behavior則表示一個人的歲數。(生日只是一天,一個人的歲數會保持到下一次生日之前。)

下面例子中,展示瞭如何用 0初始化BehaviorSubject,當Observer訂閱它時,0是第一個被推送的值。緊接著,在第二個Observer訂閱BehaviorSubject之前,它推送了2,雖然訂閱在推送2之後,但是第二個Observer仍然能接受到2

var subject = new Rx.BehaviorSubject(0 /* 初始值 */);

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(3);

輸出結果如下:


observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

ReplaySubject

ReplaySubject 如同於BehaviorSubject是 Subject 的子類。通過 ReplaySubject可以向新的訂閱者推送舊數值,就像一個錄影機ReplaySubject可以記錄Observable的一部分狀態(過去時間內推送的值)。

.一個ReplaySubject可以記錄Observable執行過程中推送的多個值,並向新的訂閱者回放它們。

你可以指定回放值的數量:

var subject = new Rx.ReplaySubject(3 /* 回放數量 */);

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(5);

輸出如下:


observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerB: 2
observerB: 3
observerB: 4
observerA: 5
observerB: 5

除了回放數量,你也可以以毫秒為單位去指定“視窗時間”,決定ReplaySubject記錄多久以前Observable推送的數值。下面的例子中,我們把回放數量設定為100,把視窗時間設定為500毫秒:

var subject = new Rx.ReplaySubject(100, 500 /* windowTime */);

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

var i = 1;
setInterval(() => subject.next(i++), 200);

setTimeout(() => {
  subject.subscribe({
    next: (v) => console.log('observerB: ' + v)
  });
}, 1000);

第二個Observer接受到3(600ms), 4(800ms) 和 5(1000ms),這些值均在訂閱之前的500毫秒內推送(視窗長度 1000ms - 600ms = 400ms < 500ms):


observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerA: 5
observerB: 3
observerB: 4
observerB: 5
observerA: 6
observerB: 6
...

AsyncSubject

AsyncSubject是Subject的另外一個衍生類,Observable僅會在執行完成後,推送執行環境中的最後一個值。

var subject = new Rx.AsyncSubject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(5);
subject.complete();

輸出結果如下:


observerA: 5
observerB: 5

AsyncSubject 與 last() 操作符相似,等待完成通知後推送執行過程的最後一個值。