1. 程式人生 > >EventEmitter 模組原始碼分析

EventEmitter 模組原始碼分析

 

簡介

EventEmitter 模組是node中經常用到模組,主要是為了實現監聽事件。像在koa、express中就經常看到這樣的監聽事件。

app.on('3000', () => {
    console.log('the server is running')
})

因為大多數 Node.js 核心 API 都採用慣用的非同步事件驅動架構,所以需要監聽某個階段來告知當前的程序,或者觸發一些邏輯事件。

// 普通的觸發事件
const EventEmitter = require('events');
class Test extends EventEmitter {}
const test = new Test();

test.on('get', val => {
  console.log('觸發了get事件!', val);
});

test.emit('get', '123');
// 輸出 '觸發了get事件! 123'

原始碼分析

首先先從61行看起,這一行首先定義了3個關鍵詞

EventEmitter.prototype._events = undefined;
EventEmitter.prototype._eventsCount = 0;
EventEmitter.prototype._maxListeners = undefined;

_events:顧名思義是存放的是事件物件組。至於內部結構後面再分析。

_eventsCount:記錄存放事件組的個數

_maxListeners:一個監聽中的最大監聽數

接下來是一些初始化操作


// By default EventEmitters will print a warning if more than 10 listeners are
// added to it. This is a useful default which helps finding memory leaks.
let defaultMaxListeners = 10;

// 初始化:定義defaultMaxListeners引數
Object.defineProperty(EventEmitter, 'defaultMaxListeners', {
  enumerable: true, // 可被列舉的
  get() {
    // 得到最大監聽數
    return defaultMaxListeners;
  },
  set(arg) {
    // 設定set校驗
    if (typeof arg !== 'number' || arg < 0 || NumberIsNaN(arg)) {
      throw new RangeError(
        'The value of "defaultMaxListeners" is out of range. It must be a non-negative number. Received ' +
          arg +
          '.'
      );
    }
    defaultMaxListeners = arg;
  },
});

EventEmitter.init = function() {
  // prototype 沒有events, 或者 getPrototype 上面沒有events(es5)
  if (
    this._events === undefined ||
    this._events === Object.getPrototypeOf(this)._events
  ) {
    // 沒有則建立一個空物件
    this._events = Object.create(null);
    // events數量置為0
    this._eventsCount = 0;
  }

  this._maxListeners = this._maxListeners || undefined;
};

這裡主要就是為了初始化上面三項基礎資料。這樣一個新的例項上的幾項屬性即為:空的監聽物件組、0個監聽事件個數、10個最大同時監聽數(未初始化前是undefined)。

// Obviously not all Emitters should be limited to 10. This function allows
// that to be increased. Set to zero for unlimited.
EventEmitter.prototype.setMaxListeners = function setMaxListeners(n) {
  if (typeof n !== 'number' || n < 0 || NumberIsNaN(n)) {
    throw new RangeError(
      'The value of "n" is out of range. It must be a non-negative number. Received ' +
        n +
        '.'
    );
  }
  this._maxListeners = n;
  return this;
};

這裡即是設定例項中的最大同時監聽數,如果引數非數字的話則會丟擲錯誤。

function $getMaxListeners(that) {
  if (that._maxListeners === undefined) {
    return EventEmitter.defaultMaxListeners;
  }
  return that._maxListeners;
}

EventEmitter.prototype.getMaxListeners = function getMaxListeners() {
  return $getMaxListeners(this);
};

返回當前最大監聽數,如果未例項化則返回的是建構函式上的 defaultMaxListeners ,即是 10。如果例項後則返回的是例項上的defaultMaxListeners。

以上都是對Emitter的一些初始化和暴露出例項的初始化操作,這些都是在例項那一步完成的。光靠這些我們是無法斷定events是如何實現事件監聽的,以及存放的資料結構的,那就接下來繼續往下看。

接下來便是暴露的操作方法,其中便有on、emit、once等。首先這裡先從on開始看起。

EventEmitter.prototype.on = EventEmitter.prototype.addListener;

這裡on指向的是addListener

EventEmitter.prototype.addListener = function addListener(type, listener) {
  return _addListener(this, type, listener, false);
};

呼叫的是_addListener這個方法。這裡傳入了兩個引數,分別是type、listener。

test.on('get', val => {
  console.log('觸發了get事件!', val);
});

即之前在外面呼叫的。然後分析這兩個引數即是我們傳的‘事件名’、‘觸發事件’。

而後又呼叫了_addListener這個方法,將this,‘事件名’,‘觸發事件’,一個布林值傳了進去。現在還不知道這個布林值拿來幹嘛,那就接下來往下看_addListener這個方法。

function _addListener(target, type, listener, prepend) {
  var m;
  var events;
  var existing;

  if (typeof listener !== 'function') {
    throw new TypeError('The "listener" argument must be of type Function. Received type ' + typeof listener);
  }

  events = target._events;
  if (events === undefined) {
    events = target._events = Object.create(null);
    target._eventsCount = 0;
  } else {
    // To avoid recursion in the case that type === "newListener"! Before
    // adding it to the listeners, first emit "newListener".
    if (events.newListener !== undefined) {
      target.emit('newListener', type,
                  listener.listener ? listener.listener : listener);

      // Re-assign `events` because a newListener handler could have caused the
      // this._events to be assigned to a new object
      events = target._events;
    }
    existing = events[type];
  }

  if (existing === undefined) {
    // Optimize the case of one listener. Don't need the extra array object.
    existing = events[type] = listener;
    ++target._eventsCount;
  } else {
    if (typeof existing === 'function') {
      // Adding the second element, need to change to array.
      existing = events[type] =
        prepend ? [listener, existing] : [existing, listener];
      // If we've already got an array, just append.
    } else if (prepend) {
      existing.unshift(listener);
    } else {
      existing.push(listener);
    }

    // Check for listener leak
    m = $getMaxListeners(target);
    if (m > 0 && existing.length > m && !existing.warned) {
      existing.warned = true;
      // No error code for this since it is a Warning
      // eslint-disable-next-line no-restricted-syntax
      var w = new Error('Possible EventEmitter memory leak detected. ' +
                          existing.length + ' ' + String(type) + ' listeners ' +
                          'added. Use emitter.setMaxListeners() to ' +
                          'increase limit');
      w.name = 'MaxListenersExceededWarning';
      w.emitter = target;
      w.type = type;
      w.count = existing.length;
      ProcessEmitWarning(w);
    }
  }

  return target;
}

這裡先初始化m、events、existing這三個引數。

然後判斷觸發事件是否為一個function型別,因此在on這一步中第二個引數就必須傳一個方法型別,不然就會丟擲錯誤。

下面初始化事件組:

events = target._events

這裡即會得到一個物件,物件中存放了觸發事件組或空物件(即沒有觸發值)。

而後又對events作了一系列判斷

if (events === undefined) {
    events = target._events = Object.create(null);
    target._eventsCount = 0;
  } else {
    // newListener為監聽欄位,當建立好newListener後會執行newListener監聽事件
    if (events.newListener !== undefined) {
      target.emit(
        'newListener',
        type,
        listener.listener ? listener.listener : listener
      );
      // 重新註冊events 因為 newListener 鉤子可能導致this._events 去重新註冊個新物件。詳情請看EventEmitter.prototype.emit
      events = target._events;
    }
    existing = events[type];
  }

從這裡即可看出當有時間被掛載上,即觸發on事件後, 會觸發一個 newListener 事件。因此 newListener 可以用來檢測掛載的事件是否正確觸發。

這裡將上一次這個觸發事件賦值給existing這個變數,當然也存在上一次不存在這個事件key值。

  if (existing === undefined) {
    existing = events[type] = listener;
    // 遞增呼叫次數
    ++target._eventsCount;
  } else {
    if (typeof existing === 'function') {
      // 如果有prepend引數,則現在插入引數放在最前面呼叫
      existing = events[type] = prepend
        ? [ listener, existing ]
        : [ existing, listener ];
      // If we've already got an array, just append.
    } else if (prepend) {
      // 如果是個陣列,並且設了prepend,則unshift至陣列
      existing.unshift(listener);
    } else {
      existing.push(listener);
    }

    // 3,校驗當前監聽數量
    m = $getMaxListeners(target);
    if (m > 0 && existing.length > m && !existing.warned) {
      existing.warned = true;
      const w = new Error(
        'Possible EventEmitter memory leak detected. ' +
          existing.length +
          ' ' +
          String(type) +
          ' listeners ' +
          'added. Use emitter.setMaxListeners() to ' +
          'increase limit'
      );
      w.name = 'MaxListenersExceededWarning';
      w.emitter = target;
      w.type = type;
      w.count = existing.length;
      ProcessEmitWarning(w);
    }
  }

這裡則對上面的 existing 做了判斷,這裡共有兩種情況

如果該觸發事件沒有,則在 this._events 上面進行新增,並且自增一個觸發事件對。

如果該觸發事件存在,則把這個 之前Function( 觸發事件 ) 和現在 Function , 放在一個數組中,這裡還用到prepend引數,之前給的布林值會在這邊做出判斷到底是push 還是 unshift,不同的放置,後面的觸發順序也會不同。那這裡 addListener 方法對 prepend 已經固定是 false 了,也就是說以 on 方法來新增觸發事件預設是放在陣列後面。那後面也有用來前置的方式:

EventEmitter.prototype.prependListener =
  // prepend 呼叫
  function prependListener(type, listener) {
    return _addListener(this, type, listener, true);
  };

prependListener即可以將同名的觸發事件前置。這也是為什麼之前將prependListener這個方法提取出來。

從以上可以即可分析出 this._events 物件中的格式:

_events = {
    type: Function,
    type2: [ Function, Function, ... ],
}

繼續看下面:

m = $getMaxListeners(target);
if (m > 0 && existing.length > m && !existing.warned) {
  const w = new Error(
    'Possible EventEmitter memory leak detected. ' +
      existing.length +
      ' ' +
      String(type) +
      ' listeners ' +
      'added. Use emitter.setMaxListeners() to ' +
      'increase limit'
  );
  w.name = 'MaxListenersExceededWarning';
  w.emitter = target;
  w.type = type;
  w.count = existing.length;
  ProcessEmitWarning(w);
}

function $getMaxListeners(that) {
  // 還未例項化的話則返回建構函式的預設最大監聽數
  if (that._maxListeners === undefined) {
    return EventEmitter.defaultMaxListeners;
  }
  return that._maxListeners;
}

function ProcessEmitWarning(warning) {
  if (console && console.warn) console.warn(warning);
}

在這裡檢測了當前連線數,如果超過則會丟擲警告,但並不會進行攔截觸發事件。

最後返回該例項。

接著來看emit觸發事件。

EventEmitter.prototype.emit = function emit(type) {
  const args = [];
  for (var i = 1; i < arguments.length; i++) args.push(arguments[i]);
  let doError = (type === 'error');

  const events = this._events;
  if (events !== undefined) { doError = (doError && events.error === undefined); } else if (!doError) { return false; }

  // If there is no 'error' event listener then throw.
  if (doError) {
    let er;
    if (args.length > 0) { er = args[0]; }
    if (er instanceof Error) {
      // Note: The comments on the `throw` lines are intentional, they show
      // up in Node's output if this results in an unhandled exception.
      throw er; // Unhandled 'error' event
    }
    // At least give some kind of context to the user
    const err = new Error('Unhandled error.' + (er ? ' (' + er.message + ')' : ''));
    err.context = er;
    throw err; // Unhandled 'error' event
  }

  const handler = events[type];

  if (handler === undefined) { return false; }

  if (typeof handler === 'function') {
    ReflectApply(handler, this, args);
  } else {
    const len = handler.length;
    const listeners = arrayClone(handler, len);
    for (var i = 0; i < len; ++i) { ReflectApply(listeners[i], this, args); }
  }

  return true;
};

從這邊可以想到之前emit是如何傳參的。

test.emit('get', '123');

get即為監聽名,123為觸發事件的傳的引數,其實這裡後面可以寫多個引數,類似test.emit('get', '123', '456', '789'); 這樣除第一個引數都可以在觸發器中接收到。

這裡首先將所有的引數放置在一個數組中,然後接下來對觸發名稱為' error ' 的事件做了單獨的判斷,如果之前自己沒有定義' error ' 監聽事件的話,EventEmitter 會把這個監聽處理成一個異常觸發事件,當error被觸發時,nodejs就會退出程式並輸出錯誤資訊,因此一般吧error事件設定成監聽器,避免遇到錯誤後整個程式崩潰。

然後檢視之前的事件物件組是否有這個觸發事件,由於之前新增觸發事件只會存在Function或陣列Function,因此如果存在Function則直接呼叫ReflectApply方法,如果是個陣列的話則去迴圈執行ReflectApply。

ReflectApply方法:

const R = typeof Reflect === 'object' ? Reflect : null;
const ReflectApply =
  R && typeof R.apply === 'function'
    ?
    R.apply
    : function ReflectApply(target, receiver, args) {
      return Function.prototype.apply.call(target, receiver, args);
    };

這裡可以看到有個Reflect.apply,其方式就類似於Function.prototype.apply.call,這裡可以理解為[ target ].apply(receiver, arg)。

因此這裡就是觸發例項上的觸發事件。

 

再看移除監聽  off  方法:

EventEmitter.prototype.off = EventEmitter.prototype.removeListener;

EventEmitter.prototype.removeListener =
  function removeListener(type, listener) {
    let list, events, position, i, originalListener;

    if (typeof listener !== 'function') {
      throw new TypeError('The "listener" argument must be of type Function. Received type ' + typeof listener);
    }

    events = this._events;
    if (events === undefined)
      {return this;}

    list = events[type];
    if (list === undefined)
      {return this;}
    // 如果匹配到監聽事件key: Function
    if (list === listener || list.listener === listener) {
      if (--this._eventsCount === 0) // 監聽事件組的個數減一
        // 如果為0 則已經移除所有監聽,這裡重置監聽物件組
        {this._events = Object.create(null);}
      else {
        // 常規刪除
        delete events[type];
        if (events.removeListener) // 觸發reomoveListener,可以在外部對其進行監聽
          {this.emit('removeListener', type, list.listener || listener);}
      }
    } else if (typeof list !== 'function') {
      /* 如果匹配到的是個陣列
          下面的操作即是刪除陣列中對應的觸發事件
      */
      position = -1;

      for (i = list.length - 1; i >= 0; i--) {
        if (list[i] === listener || list[i].listener === listener) {
          originalListener = list[i].listener;
          position = i;
          break;
        }
      }

      if (position < 0)
        {return this;}

      if (position === 0)
        {list.shift();}
      else {
        spliceOne(list, position);
      }
      // 如果還剩唯一一個,則把陣列Function 再次轉化為 Function
      if (list.length === 1)
        {events[type] = list[0];}

      if (events.removeListener !== undefined)
        {this.emit('removeListener', type, originalListener || listener);}
    }

    return this;
  };

在刪除中,必須傳入監聽名和觸發函式,這樣主要目的就是要精確在一些陣列Function中刪除指定的觸發事件。

once  方法:

EventEmitter.prototype.once = function once(type, listener) {
  if (typeof listener !== 'function') {
    throw new TypeError('The "listener" argument must be of type Function. Received type ' + typeof listener);
  }
  this.on(type, _onceWrap(this, type, listener));
  return this;
};

function _onceWrap(target, type, listener) {
  let state = { fired: false, wrapFn: undefined, target, type, listener };
  let wrapped = onceWrapper.bind(state);    
  wrapped.listener = listener;
  state.wrapFn = wrapped;
  return wrapped;
}

function onceWrapper() {
  let args = [];
  for (let i = 0; i < arguments.length; i++) args.push(arguments[i]);
  if (!this.fired) {
    this.target.removeListener(this.type, this.wrapFn);
    this.fired = true;
    ReflectApply(this.listener, this.target, args);
  }
}

這裡用到了閉包,通過onceWrapper.bind(state)掛載了一個返回值為{ [Function: bound onceWrapper] listener: [Function] } 的可執行的物件,如果直接執行該物件可觸發 onceWrapper 函式,而 onceWrapper 函式上的this綁定了該觸發事件的的一些屬性,其中 fired 便這個屬性通過第一次執行後將置為true,並且移除了該觸發事件。而後面listener則是記錄的觸發事件,由於這個函式並非常規的Function 或 陣列Function ,因此需要一個listener來進行儲存,當新增或刪除的時候,就需要這個listener來代替。

// 新增
if (events.newListener !== undefined) {
  target.emit('newListener', type,
      listener.listener ? listener.listener : listener);
}

// 刪除  
if (events.removeListener)
    this.emit('removeListener', type, list.listener || listener);
}

 

關鍵的幾個就是這些啦,其他的例如removeAllListeners、listenerCount、eventNames都是相對比較簡單的,可以從上面的內容進行推導,就不多做分析了。

總結

EventEmitter是屬於比較元老級的模組,所以在程式碼中很多功能都是用原生js實現的,但相容性是非常好。適合用在非同步IO中。對於普通的回撥比起來,EventEmitter表現更為獨立。原始碼並不複雜,但是其中的很多思想值得去學習和借鑑。