深入學習 Node.js EventEmitter
觀察者模式是 ofollow,noindex">軟體設計模式 的一種。在此種模式中,一個目標物件管理所有相依於它的觀察者物件,並且在它本身的狀態改變時主動發出通知。這通常透過呼叫各觀察者所提供的方法來實現。此種模式通常被用來實時事件處理系統。 —— 維基百科
觀察者模式,它定義了一種一對多的關係,讓多個觀察者物件同時監聽某一個主題物件,這個主題物件的狀態發生變化時就會通知所有的觀察者物件,使得它們能夠自動更新自己。
我們可以使用日常生活中,期刊訂閱的例子來形象地解釋一下上面的概念。期刊訂閱包含兩個主要的角色:期刊出版方和訂閱者,它們之間的關係如下:
- 期刊出版方 - 負責期刊的出版和發行工作。
- 訂閱者 - 只需執行訂閱操作,新版的期刊釋出後,就會主動收到通知,如果取消訂閱,以後就不會再收到通知。
在觀察者模式中也有兩個主要角色:主題和觀察者,分別對應期刊訂閱例子中的期刊出版方和訂閱者,它們之間的關係圖如下:
觀察者模式的優缺點、應用和實現,這裡就不詳細展開,有興趣的小夥伴可以閱讀本人之前整理的文章 Observable詳解 - Observer Pattern 。
釋出/訂閱模式
在 軟體架構 中, 釋出-訂閱 是一種 訊息 正規化 ,訊息的傳送者(稱為釋出者)不會將訊息直接傳送給特定的接收者(稱為訂閱者)。而是將釋出的訊息分為不同的類別,無需瞭解哪些訂閱者(如果有的話)可能存在。同樣的,訂閱者可以表達對一個或多個類別的興趣,只接收感興趣的訊息,無需瞭解哪些釋出者(如果有的話)存在。—— 維基百科
釋出/訂閱模式與觀察者模式非常類似,它們最大的區別是:釋出者和訂閱者不知道對方的存在。它們之間需要一個第三方元件,叫做資訊中介,它將訂閱者和釋出者串聯起來,它過濾和分配所有輸入的訊息。換句話說,釋出/訂閱模式用來處理不同系統元件的資訊交流,即使這些元件不知道對方的存在。
那麼資訊中介是如何過濾訊息呢?在釋出/訂閱模型中,訂閱者通常接收所有釋出的訊息的一個子集。選擇接受和處理的訊息的過程被稱作過濾。有兩種常用的過濾形式:基於主題的和基於內容的。
- 在 基於主題 的系統中,訊息被髮布到主題或命名通道上。訂閱者將收到其訂閱的主題上的所有訊息,並且所有訂閱同一主題的訂閱者將接收到同樣的訊息。釋出者負責定義訂閱者所訂閱的訊息類別。
- 在 基於內容 的系統中,訂閱者定義其感興趣的訊息的條件,只有當訊息的屬性或內容滿足訂閱者定義的條件時,訊息才會被投遞到該訂閱者。訂閱者需要負責對訊息進行分類。
一些系統支援兩者的混合:釋出者釋出訊息到主題上,而訂閱者將基於內容的訂閱註冊到一個或多個主題上。基於主題的通訊基礎結構圖如下:
最後我們再來總結一下觀察者模式與釋出/訂閱模式之間的區別。
觀察者模式 vs 釋出/訂閱模式
(圖片來源 - developers-club )
觀察者模式與釋出/訂閱模式之間的區別:
- 在觀察者模式中,觀察者知道 Subject 的存在,Subject 一直保持對觀察者進行記錄。然而,在釋出/訂閱模式中,釋出者和訂閱者不知道對方的存在,它們只有通過資訊中介進行通訊。
- 在釋出訂閱模式中,元件是鬆散耦合的,正好和觀察者模式相反。
- 觀察者模式大多數時候是同步的,比如當事件觸發,Subject 就會去呼叫觀察者的方法。而釋出/訂閱模式大多數時候是非同步的(使用訊息佇列)。
Node.js EventEmitter
大多數 Node.js 核心 API 都採用慣用的非同步事件驅動架構,其中某些型別的物件(觸發器)會週期性地觸發命名事件來呼叫函式物件(監聽器)。
例如, net.Server
物件會在每次有新連線時觸發事件; fs.ReadStream
會在檔案被開啟時觸發事件; 流物件 會在資料可讀時觸發事件。
所有能觸發事件的物件都是 EventEmitter
類的例項。 這些物件開放了一個 eventEmitter.on()
函式,允許將一個或多個函式繫結到會被物件觸發的命名事件上。 事件名稱通常是駝峰式的字串,但也可以使用任何有效的 JavaScript 屬性名。
當 EventEmitter
物件觸發一個事件時,所有繫結在該事件上的函式都被同步地呼叫 。 監聽器的返回值會被丟棄。
EventEmitter 基本使用
const EventEmitter = require('events'); class MyEmitter extends EventEmitter {} const myEmitter = new MyEmitter(); myEmitter.on('event', () => { console.log('觸發了一個事件!'); }); myEmitter.emit('event');
以上示例,我們自定義 MyEmitter 類,該類繼承於 EventEmitter 類,接著我們通過使用 new
關鍵字建立了 myEmitter
例項,然後使用 on()
方法監聽 event 事件,最後利用 emit()
方法觸發 event 事件。
小夥伴們,是不是覺得示例很簡單。覺得簡單就對了,我們就從簡單的入手,慢慢深入學習 EventEmitter 類。
EventEmitter 建構函式
function EventEmitter() { EventEmitter.init.call(this); } EventEmitter.usingDomains = false; EventEmitter.prototype._events = undefined; EventEmitter.prototype._eventsCount = 0; // 事件數 EventEmitter.prototype._maxListeners = undefined; // 最大的監聽器數
在 EventEmitter 建構函式內部,會呼叫 EventEmitter.init
方法執行初始化操作, EventEmitter.init
的具體實現如下:
EventEmitter.init = function() { if (this._events === undefined || this._events === Object.getPrototypeOf(this)._events) { this._events = Object.create(null); this._eventsCount = 0; } this._maxListeners = this._maxListeners || undefined; };
在 EventEmitter.init 內部,會根據條件執行初始化操作,比較重要的這行程式碼 this._events = Object.create(null)
,實現過簡單釋出/訂閱模式的小夥伴,估計已經猜到 _events
屬性的作用了,這裡我們就先不繼續討論,我們先來看一下 on()
方法。
EventEmitter on() 方法
EventEmitter.prototype.on = EventEmitter.prototype.addListener; EventEmitter.prototype.addListener = function addListener(type, listener) { return _addListener(this, type, listener, false); };
通過程式碼我們可以發現 EventEmitter 例項上 addListener
和 on
的實現是一樣的,執行時都是呼叫 events.js
檔案內的 _addListener()
函式,它的具體實現如下(程式碼片段):
/** * 新增事件監聽器 * target:EventEmitter 例項 * type:事件型別 * listener:事件監聽器 * prepend:是否新增在前面 */ function _addListener(target, type, listener, prepend) { var m; var events; var existing; // 若監聽器不是函式物件,則丟擲異常 if (typeof listener !== 'function') { const errors = lazyErrors(); throw new errors.TypeError('ERR_INVALID_ARG_TYPE', 'listener', 'Function'); } events = target._events; // 若target._events物件未定義,則使用Object.create建立一個新的物件 if (events === undefined) { events = target._events = Object.create(null); target._eventsCount = 0; } else { // To avoid recursion in the case that type === "newListener"! Before // adding it to the listeners, first emit "newListener". if (events.newListener !== undefined) { target.emit('newListener', type, listener.listener ? listener.listener : listener); // Re-assign `events` because a newListener handler could have caused the // this._events to be assigned to a new object events = target._events; } existing = events[type]; // 獲取type型別儲存的物件 } if (existing === undefined) { // Optimize the case of one listener. Don't need the extra array object. // 優化單個監聽器的場景,不需使用額外的陣列物件。 existing = events[type] = listener; ++target._eventsCount; } else { if (typeof existing === 'function') { // 新增type前已有繫結監聽器 // Adding the second element, need to change to array. existing = events[type] = prepend ? [listener, existing] : [existing, listener]; // If we've already got an array, just append. } else if (prepend) { // 新增到前面 existing.unshift(listener); } else { // 新增到後面 existing.push(listener); } } return target; }
現在我們來簡單總結一下 _addListener() 方法內部的主要流程:
- 驗證監聽器是否為函式物件。
- 避免型別為 newListener 的事件型別,造成遞迴呼叫。
- 優化單個監聽器的場景,不需使用額外的陣列物件。
- 基於 prepend 引數的值,控制監聽器的新增順序。
這時,相信你已經知道 EventEmitter 例項中 _events
屬性的作用了,即用來以 Key-Value 的形式來儲存指定的事件型別與對應的監聽器。具體可以參考下圖(myEmitter.on(‘event’, ()=>{} 內部執行情況):
繫結完事件,如果要派發事件,就可以呼叫 EventEmitter 例項的 emit() 方法,該方法的實現如下(程式碼片段):
EventEmitter.prototype.emit = function emit(type, ...args) { let doError = (type === 'error'); const events = this._events; const handler = events[type]; // 獲取type型別對應的處理器 if (handler === undefined) return false; // 若事件處理器為函式物件,則使用Reflect.apply進行呼叫 if (typeof handler === 'function') { Reflect.apply(handler, this, args); } else { const len = handler.length; const listeners = arrayClone(handler, len); for (var i = 0; i < len; ++i) Reflect.apply(listeners[i], this, args); } return true; }; // 陣列淺拷貝 function arrayClone(arr, n) { var copy = new Array(n); for (var i = 0; i < n; ++i) copy[i] = arr[i]; return copy; }
emit() 方法內部實現還是挺簡單的,先根據事件型別獲取對應的處理器,然後根據事件處理器的型別,進行進一步處理。需要注意的是,呼叫處理器是通過 Reflect 物件提供的 apply()
方法來實現。
Reflect.apply() 方法的簽名如下:
Reflect.apply(target, thisArgument, argumentsList)
- target —— 目標函式。
- thisArgument —— target 函式呼叫時繫結的 this 物件。
- argumentsList —— target 函式呼叫時傳入的實參列表,該引數應該是一個類陣列的物件。
如果對 Reflect 物件感興趣的小夥伴,可以參考 JavaScript/Reference/Global_Objects/Reflect" target="_blank" rel="nofollow,noindex">MDN - Reflect 物件 。
到這裡前面的簡單的示例,我們已經分析完了。我們已經知道通過 EventEmitter 例項的 on()
方法可以用來新增事件監聽,但有些時候,我們也需要在某些情況下移除對應的監聽。針對這種需求,我們就需要利用 EventEmitter 例項的 removeListener()
方法了。
EventEmitter removeListener() 方法
removeListener()
方法最多隻會從監聽器數組裡移除一個監聽器例項。 如果任何單一的監聽器被多次新增到指定 type
的監聽器陣列中,則必須多次呼叫 removeListener()
方法才能移除每個例項。為了方便一次性移除 type
對應的監聽器,EventEmitter 為我們提供了 removeAllListeners()
方法。
下面我們來看一下 removeListener() 方法的具體實現(程式碼片段):
// Emits a 'removeListener' event if and only if the listener was removed. EventEmitter.prototype.removeListener = function removeListener(type, listener) { var list, events, position, i, originalListener; // 若監聽器不是函式物件,則丟擲異常 if (typeof listener !== 'function') { const errors = lazyErrors(); throw new errors.TypeError('ERR_INVALID_ARG_TYPE', 'listener', 'Function'); } events = this._events; if (events === undefined) return this; list = events[type]; // 獲取type對應的繫結物件 if (list === undefined) return this; if (list === listener || list.listener === listener) { if (--this._eventsCount === 0) // 只繫結一個監聽器 this._events = Object.create(null); else { delete events[type]; // 若已設定removeListener監聽器,則觸發removeListener事件 if (events.removeListener) this.emit('removeListener', type, list.listener || listener); } } else if (typeof list !== 'function') { // 包含多個監聽器 position = -1; for (i = list.length - 1; i >= 0; i--) { // 獲取需移除listener對應的索引值 if (list[i] === listener || list[i].listener === listener) { originalListener = list[i].listener; position = i; break; } } if (position < 0) return this; if (position === 0) list.shift(); else { if (spliceOne === undefined) spliceOne = require('internal/util').spliceOne; // 呼叫內建的spliceOne移除position對應的值 spliceOne(list, position); } if (list.length === 1) events[type] = list[0]; if (events.removeListener !== undefined) this.emit('removeListener', type, originalListener || listener); } return this; };
通過程式碼我們發現在呼叫 removeListener()
方法時,若 type 事件型別上繫結多個事件處理器,那麼內部處理程式會先根據 listener
事件處理器,查詢該事件處理器對應的索引值,若該索引值大於 0,則會呼叫 Node.js 內部工具庫提供的 spliceOne() 方法,移除對應的事件處理器。為什麼不直接利用 Array#splice() 方法呢?官方的回答是 spliceOne() 方法的執行速度比 Array#splice() 快大約 1.5 倍。
spliceOne() 方法具體實現如下:
// About 1.5x faster than the two-arg version of Array#splice(). function spliceOne(list, index) { for (var i = index, k = i + 1, n = list.length; k < n; i += 1, k += 1) list[i] = list[k]; list.pop(); // 把最後面的空位移除 }
感興趣的小夥伴,可以實際對比一下 Array#splice() 與 spliceOne() 的效能哈。最後我們來介紹一下 EventEmitter 另一個常用的方法 once()。
EventEmitter once() 方法
有些時候,對於一些特殊的事件型別,我們只需執行一次事件處理器,這時我們就可以使用 once() 方法:
const myEmitter = new MyEmitter(); let m = 0; myEmitter.once('event', () => { console.log(++m); }); myEmitter.emit('event'); // 列印: 1 myEmitter.emit('event'); // 無輸出
以上程式碼很簡單,廢話不多說,我們直接看一下 once 函式的具體實現:
EventEmitter.prototype.once = function once(type, listener) { if (typeof listener !== 'function') { const errors = lazyErrors(); throw new errors.TypeError('ERR_INVALID_ARG_TYPE', 'listener', 'Function'); } this.on(type, _onceWrap(this, type, listener)); return this; };
通過原始碼可以發現,once() 函式內部也是通過呼叫 on()
方法來繫結事件監聽器。特別之處是,內部使用 _onceWrap
函式對 listener 函式進行進一步封裝。那我們只能繼續發掘 _onceWrap
函式,該函式的實現如下:
function _onceWrap(target, type, listener) { var state = { fired: false, wrapFn: undefined, target, type, listener }; var wrapped = onceWrapper.bind(state); // 繫結this上下文 wrapped.listener = listener; state.wrapFn = wrapped; return wrapped; }
在 _onceWrap 函式內部,我們建立了一個 state 物件,該物件有一個 fired
屬性,用來標識是否已觸發,其預設值是 false。一開始還以為內部實現都包含在 _onceWrap 函式內,沒想到竟然又來了個 onceWrapper 函式物件。為了能夠揭開 once() 的神祕面紗,只能繼續前進了。onceWrapper 函式的實現如下:
function onceWrapper(...args) { if (!this.fired) { this.target.removeListener(this.type, this.wrapFn); this.fired = true; Reflect.apply(this.listener, this.target, args); } }
守得雲開見月明,終於見到 onceWrapper 函式的廬山真面目。在函式體中,若發現事件處理器未被呼叫,則先移除事件監聽器並設定 fired 欄位值為 true,然後利用之前介紹的 Reflect.apply() 方法呼叫 type 事件型別,對應的事件處理器。至此,EventEmitter 的探索之旅,就落下的帷幕,想繼續瞭解 EventEmitter 的小夥伴,可以查閱官方文件或 EventEmitter 對應的原始碼。
總結
為了能夠更好地理解 EventEmitter 的設計思想,首先我們介紹了觀察者模式與釋出/訂閱模式,然後對比了它們之間的區別。接著我們以一個簡單的示例為切入點,介紹了 EventEmitter 的 on()、emit()、removeListener() 和 once() 方法的使用及內部實現。
如果小夥伴們也對 EventEmitter 原始碼感興趣,建議採用閱讀和除錯相結合的方式,進行原始碼學習。詳細的除錯方式,請參考 Debugging Node.js Apps 文章。