1. 程式人生 > >Node 中的 events.EventEmitter 模組

Node 中的 events.EventEmitter 模組

http://imweb.io/topic/5973722452e1c21811630609

// 中文api

http://nodejs.cn/api/stream.html#stream_readable_pipe_destination_options

 

// 關於 stream.pipe 你需要知道更多

http://blog.kankanan.com/article/51734e8e-stream.pipe-4f609700898177e5905366f4591a.html

[上面這篇文章有我關心的問題,即當用stream.pipe時,是否需要手動關閉流,結論是需要手動關閉,但是像上面文章的寫法,在實際中好像並沒有真正的被呼叫,也沒有destroy,

    stream.pipe(res).once('close', function () {
        stream.destroy();
    });

所以換個寫法是好用的,不確定是因為版本的問題還是什麼原因

    stream.pipe(res);
    stream.on('close', function () {
        stream.destroy();
    });

]

//Node.js中不可不精的Stream

https://juejin.im/post/5b14aa536fb9a01e681056e5

 

// 深入理解 Node.js Stream 內部機制, 下面的文章比較深入

http://taobaofed.org/blog/2017/08/31/nodejs-stream/

下面這篇文章的例子很不錯

 

Node 中的許多核心 API 都是通過事件驅動的非同步架構實現的,具體來說就是當 emitters 傳送事件後,相應的響應函式( listeners )會被執行。例如:net.Server 會在每次收到連線時發出事件,fs.ReadStram 會在檔案開啟時發出事件,stram會在有資料可讀時發出事件。 所有這些物件都是 EventEmitter 的例項,它們通過向外暴露的 eventEmitter.on()

 介面從而讓不同的事件響應函式得以執行。

基本的使用

on 和 emit 方法

events 模組有且只有一個物件 events.EventEmitter,它的核心功能就是事件的觸發(emit)和事件的監聽(on),一個簡單的例子如下:

const EventEmitter = require('events');

let emitter = new EventEmitter();

emitter.on('hi', (name) => {
    console.log(`hi, my name is ${name}!`);
});

emitter.emit('hi', 'elvin');

在上述的例子中,我們通過 emitter.on('hi', func) 的方式註冊了 hi 事件的監聽函式,通過 emitter.emit('hi', 'elvin') 的方式觸發了 hi 事件,且會向事件處理函式傳遞引數 'elvin',所以最後的執行結果為 hi, my name is elvin!。 這裡需要說明的時,EventEmitter 還有一個 addeListener 的方法,它只不過是 on 方法的別名,兩者沒有任何區別。

once 方法

有些時候,我們希望某些事件響應函式只被執行一次,這個時候就可以使用 once() 方法,它會和 on() 一樣註冊事件的響應函式,不過當響應函式執行一次之後,就會將其移除。

const EventEmitter = require('events');

let emitter = new EventEmitter();

emitter.once('hi', (name) => {
    console.log(`hi, my name is ${name}!`);
});

emitter.emit('hi', 'elvin');
emitter.emit('hi', 'leonard');

上面的例子中只會輸出 hi, my name is elvin!(leonard 很高冷,不屑於向你打招呼┗( ▔, ▔ )┛)。

prependListener 方法

當一個事件綁定了多個響應函式時,會按照函式繫結的順序依次執行,除非響應函式是通過 prependListener() 方法繫結的,它使用的方式和 on() 類似,不過會將響應函式插到當前該事件處理函式佇列的頭部,具體的例子如下:

const EventEmitter = require('events');

let emitter = new EventEmitter();

emitter.on('hi', (name) => {
    console.log(`my name is ${name}!`);
});

emitter.on('hi', () => {
    console.log('I\'m from Wuhan.');
});

emitter.prependListener('hi', () => {
    console.log('nice to meet you!');
});

emitter.on('hi', () => {
    console.log('What\'s your name?');
});

emitter.emit('hi', 'elvin');

// 輸出為:
// nice to meet you!
// my name is elvin.
// I'm from Wuhan.
// What\'s your name?

響應函式的數量

因為繫結過多的響應函式會消耗大量的記憶體,所以為了避免記憶體洩漏,在 Event.EventEmitter中一個事件可以繫結的響應函式數量是存在限制的,相關的屬性和方法如下:

  • EventEmitter.defaultMaxListeners: 預設值為10, 表示每個事件的最多可以繫結的響應函式數量。需要注意的是,當修改它時,會影響所有 EventEmitter 的例項。
  • emitter.listenerCount(eventName):獲取事件 eventName 已繫結的響應函式個數。
  • emitter.setMaxListeners(n):修改 emitter 的每個事件最多可以繫結的響應函式數量,該方法會修改 emitter._maxListeners 的值,其優先順序大於 *EventEmitter.defaultMaxListeners 。
  • emitter.getMaxListeners():獲取 emitter 每個事件最多可以繫結的響應函式數量。

其他相關方法

EventEmitter 還有一些其他的方法和屬性,這裡就不做具體介紹,簡要地說一下。

  • emitter.eventNames():返回當前已經繫結響應函式的事件名組成的陣列。
  • emitter.listeners(eventName):返回 eventName 事件的響應函式組成的陣列。
  • emitter.prependOnceListener(eventName, listener):類似於 once(),不過會將響應函式插到當前該事件處理函式佇列的頭部。
  • emitter.removeAllListeners([eventName]):移除 eventName 事件所有的響應函式。當未傳入 eventName 引數時,所有事件的響應函式都會被移除。
  • emitter.removeListener(eventName, listener):移除 eventName 事件的響應函式 listener

newListener 和 removeListener 事件

當 emitter 被註冊響應函式時,會觸發 newListener 事件;被移除響應函式時,會觸發 removeListener 事件。兩個事件的響應函式會被傳入兩個引數:註冊的事件名和響應的響應函式。具體的例子如下:

const EventEmitter = require('events');

let emitter = new EventEmitter();

// 注意:此處使用 emitter.on 方法的話會陷入迴圈呼叫,導致棧溢位
emitter.once('newListener', (event, listener) => {
    if (event === 'hi') {
        emitter.on('hi', () => {
            console.log('Nice to meet you.');
        });
    }
});

emitter.on('hi', (name) => {
    console.log(`My name is ${name}!`);
});

emitter.emit('hi', 'elvin');

執行結果為 Nice to meet you. My name is elvin.。實際上, newListener 事件被觸發時,響應函式還未被註冊至 emitter,因而我們就可以在在目標響應函式之前插入其他響應函式,例如上面的例子中 Nice to meet you. 就在 My name is elvin. 之前進行輸出。

在 Node 原始碼中的使用

如在開頭所說,net.Serverfs.ReadStramstream 等 Node 內建物件都是 EventEmitter 的例項,它們通過向外暴露的 eventEmitter.on() 介面從而讓不同的事件響應函式得以執行。這裡以 stream 的部分原始碼為例,講講 events.EventEmitter 在 Node 中的使用。

// stream 部分原始碼 2017/02/22 版
// source: https://github.com/nodejs/node/blob/master/lib/internal/streams/legacy.js
const EE = require('events');
const util = require('util'); // Node 的輔助函式庫

function Stream() {
  EE.call(this);
}
util.inherits(Stream, EE);

Stream.prototype.pipe = function(dest, options) {
  var source = this;

  function ondata(chunk) {
    if (dest.writable) {
      if (false === dest.write(chunk) && source.pause) {
        source.pause();
      }
    }
  }

  source.on('data', ondata);

  function ondrain() {
    if (source.readable && source.resume) {
      source.resume();
    }
  }

  dest.on('drain', ondrain);

  // If the 'end' option is not supplied, dest.end() will be called when
  // source gets the 'end' or 'close' events.  Only dest.end() once.
  if (!dest._isStdio && (!options || options.end !== false)) {
    source.on('end', onend);
    source.on('close', onclose);
  }

  var didOnEnd = false;
  function onend() {
    if (didOnEnd) return;
    didOnEnd = true;

    dest.end();
  }


  function onclose() {
    if (didOnEnd) return;
    didOnEnd = true;

    if (typeof dest.destroy === 'function') dest.destroy();
  }

  // don't leave dangling pipes when there are errors.
  function onerror(er) {
    cleanup();
    if (EE.listenerCount(this, 'error') === 0) {
      throw er; // Unhandled stream error in pipe.
    }
  }

  source.on('error', onerror);
  dest.on('error', onerror);

  // remove all the event listeners that were added.
  function cleanup() {
    source.removeListener('data', ondata);
    dest.removeListener('drain', ondrain);

    source.removeListener('end', onend);
    source.removeListener('close', onclose);

    source.removeListener('error', onerror);
    dest.removeListener('error', onerror);

    source.removeListener('end', cleanup);
    source.removeListener('close', cleanup);

    dest.removeListener('close', cleanup);
  }

  source.on('end', cleanup);
  source.on('close', cleanup);

  dest.on('close', cleanup);
  dest.emit('pipe', source);

  // Allow for unix-like usage: A.pipe(B).pipe(C)
  return dest;
};

module.exports = Stream;

上述程式碼主要完成了三件事情:

  1. 通過在 Stream 的建構函式中呼叫 EE.call(this) 和利用 util.inherits(Stream, EE); 呼叫,讓 Sever 繼承自 EventEmitter;
  2. 通過 on 方法在資料來源 sourse 上註冊了 data、end、close、error 等事件的響應函式,在資料目的源 dest 上註冊了 drain、end、close、error 等事件的響應函式;
  3. 在完成初始化和響應函式註冊後,向資料目的源發出 pipe 事件。

ES6 中的使用方式

如上節所示,在 Node 中都是通過 util.inherits(Stream, EventEmitter); 來實現繼承,但是在 Node 的官方文件中,該種方式已不被推薦。更推薦的做法是通過 ES6 中的 class 和 extends 來實現繼承:

const EventEmitter = require('events');

class MyStream extends EventEmitter {
  write(data) {
    this.emit('data', data);
  }
}

const stream = new MyStream();

stream.on('data', (data) => {
  console.log(`Received data: "${data}"`);
});
stream.write('With ES6');


// output: Received data: "With ES6"

寫在最後

事件驅動、非非同步 I/O 的特點成就瞭如今的 Node,而 Node 中事件驅動依靠的就是 events.EventEmitter!需要說明的是,events 和 events.EventEmitter 其實指向的都是 EventEmitter,之所以會有 events.EventEmitter 只是為了保持對 Node 0.10.x 版本的相容。

參考連結

  1. Node.js Documentation - evnets