1. 程式人生 > >NodeJS原始碼分析(3)

NodeJS原始碼分析(3)

Node Stream模組

Stream在平時業務開發時很少用到, 但是很多模組都是基於stream實現的,引用官方文件的解釋:

流(stream)在 Node.js 中是處理流資料的抽象介面(abstract interface)。

stream 模組提供了基礎的 API 。使用這些 API 可以很容易地來構建實現流介面的物件。

流可以是可讀的、可寫的,或是可讀寫的。所有的流都是 EventEmitter 的例項。

為什麼應該使用Stream

先來看一段程式碼:

這段程式碼有什麼問題, 看似是沒有問題的。

如果data.txt檔案體積非常大,nodejs讀入記憶體當中,然後全部取出

這樣會對效能造成很大影響

var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    fs.readFile(__dirname + '/data.txt', function (err, data) {
        res.end(data);
    });
});
server.listen(8000);

經過優化後代碼如下:

這段將data.txt一段一段的傳送到使用者端

這樣減少了很多的伺服器壓力

var http = require
('http'); var fs = require('fs'); var server = http.createServer(function (req, res) { let stream = fs.createReadStream(__dirname + '/data.txt');//創造可讀流 stream.pipe(res);//將可讀流寫入response }); server.listen(8000);

管道流Pipe

管道提供了一個輸出流到輸入流的機制, 從獲取到資料傳入另外一個流
無論哪一種流,都會使用.pipe()方法來實現輸入和輸出。

讀取input.txt檔案流

hello world

var fs = require("fs");

// 建立一個可讀流
var readerStream = fs.createReadStream('input.txt');

// 建立一個可寫流
var writerStream = fs.createWriteStream('output.txt');

// 管道讀寫操作
// 讀取 input.txt 檔案內容,並將內容寫入到 output.txt 檔案中
readerStream.pipe(writerStream);

console.log("程式執行完畢");

檢視輸出檔案流output.txt

hello world

如果多檔案還可進行鏈式操作:

程式碼如下:

a.pipe(b);
b.pipe(c);
c.pipe(d);

//上面程式碼等價於這樣
a.pipe(b).pipe(c).pipe(d)

readable 可讀操作

Readable流可以產出資料流,你可以將這些資料傳送到一個writable,transform, duplex,並呼叫pipe()方法:

var Readable = require('stream').Readable;

var rs = new Readable;
rs.push('beep ');
rs.push('boop\n');
rs.push(null);

rs.pipe(process.stdout); //輸出: beep boop

在上面的程式碼中rs.push(null)的作用是告訴rs輸出資料應該結束了。

var Readable = require('stream').Readable;
var rs = Readable();

var c = 97;
rs._read = function () {
    rs.push(String.fromCharCode(c++));
    if (c > 'z'.charCodeAt(0)) rs.push(null);
};

rs.pipe(process.stdout);//輸出 abcdefghijklmnopqrstuvwxyz

還可以通過監聽事件readable,觸發時手工讀取chunk資料:

一旦註冊了readable事件,必須手工讀取read資料,否則資料就會流失

var Read = require('stream').Readable;
var r = new Read();

r.push('hello');
r.push('world');
r.push(null);

r.on('readable', function () {
    var chunk = r.read();
    console.log('get data by readable event: ', chunk.toString())
});

// get data by readable event:  hello world!

注意:process.stdout之前已經將內容推送進readable流rs中,但是所有的資料依然是可寫的

Readable Stream的模式

Readable Stream 存在兩種模式(flowing mode 與 paused mode),
這兩種模式決定了chunk資料流動的方式—自動流動還是手工流動。那如何觸發這兩種模式呢:

flowing mode: 註冊事件data、呼叫resume方法、呼叫pipe方法

paused mode: 呼叫pause方法(沒有pipe方法)、移除data事件 && unpipe所有pipe

// data事件觸發flowing mode
Readable.prototype.on = function(ev, fn) {
    ...
    if (ev === 'data' && false !== this._readableState.flowing) {
        this.resume();
      }
      ...
}

// resume觸發flowing mode
Readable.prototype.resume = function() {
    var state = this._readableState;
    if (!state.flowing) {
           debug('resume');
           state.flowing = true;
    resume(this, state);
  }
  return this;
}

// pipe方法觸發flowing模式
Readable.prototype.resume = function() {
    if (!state.flowing) {
        this.resume()
    }
}

結論

兩種方式取決於一個flowing欄位:true –> flowing mode;false –> paused mode

三種方式最後均是通過resume方法,將state.flowing = true