Node.js Streams 基礎總結
前段時間遇到專案上需要請求資源方獲取opus編碼的音訊檔案,然後置入ogg容器中傳輸給前端標準化播放器進行播放的需求。流程模式是,通過服務上建立的socket連線不斷接收資源方傳送的檔案塊。而前端請求中層服務是HTTP請求。
一個簡單的需求,在Node.js服務中,比較適合處理方式是使用Stream,通過pipe不同的加解密流以及最後的HTTP responses傳輸給前端標準格式的檔案。由於用到很多流處理方式,所以在此總結一下Node.js Streams的模組使用基礎。
1. Node.js Stream 使用場景
Stream就是資料資訊的一個傳輸集合。適合進行大檔案和連續的傳輸檔案塊的處理,但不僅限於此。比如我們要將一個檔案進行加密然後再壓縮再傳輸的,通過讓資料在不同的Stream中傳輸處理,無論在書寫還是處理效率上都很有優勢。
2. Stream到底是什麼
Node.js中,Buffer是非常重要的一個模組。在很多資料處理中會有所應用。它進行記憶體分配不是通過V8實現的,而是分配的外部對外記憶體,主要原因可能是V8的垃圾回收機制過於影響效能。Buffer,效能部分是C++實現,而於Node.js進行了非效能方面的實現以及開放呼叫。
簡單理解,Stream其實就是Buffer的一個更高階封裝加另外實現。但Stream和Buffer在使用中,還是具有不同。比如,Buffer在分配完成讀取所有資料後,才能進行使用,而Stream只要建立,當消費者需要使用時便可以使用。同時,資料可以進入緩衝區,而這個緩衝區,其實就是Buffer。
不難看出,在這樣的實現下,在處理過程中,特別是大檔案處理,Stream的佔用記憶體會更低,而處理效率會更高。
我們通過兩段程式碼來檢視兩者的區別
// 使用Buffer拷貝test.file大檔案大小 556641 KB const fs = require('fs') const time = Date.now() fs.readFile('./test.file', (err, buffer) => { fs.writeFile('./test.buffer.file', buffer, err => { console.log('memory use: ', process.memoryUsage()) console.log('buffer', Date.now() - time) console.log('finish...') }) }) 複製程式碼
// 使用Stream拷貝test.file大檔案大小 556641 KB const fs = require('fs') const time = Date.now() fs.createReadStream('./test.file') .pipe(fs.createWriteStream('./test.stream.file')) .on('finish', () => { console.log('memory use: ', process.memoryUsage()) console.log('stream', Date.now() - time) console.log('finish...') }) 複製程式碼
以下是輸出結果: (只是一個簡單的測試,但是區別還是比較明顯的)


如果需要,兩者之間也可以進行轉換
// stream to buffer function streamToBuffer(stream, cb) { let buffers = []; stream.on('error', function(err) { console.log(err) }) stream.on('data', function(data) { buffers.push(data) }) stream.on('end', function() { cb(buffers) }) } 複製程式碼
// buffer to stream var stream = require('stream') function bufferToStream(buffer) { var stream = new stream.Duplex() stream.push(buffer)// 讀入 stream.push(null)// null 代表讀入結束 return stream } 複製程式碼
3. Stream 分類和基礎用法
Node.js中,Stream有4種類型
-
Readable: 可讀流
- HTTP responses, on the client
- HTTP requests, on the server
- fs read streams
- zlib streams
- crypto streams
- TCP sockets
- child process stdout and stderr
- process.stdin
-
Writable: 可寫流
- HTTP requests, on the client
- HTTP responses, on the server
- fs write streams
- zlib streams
- crypto streams
- TCP sockets
- child process stdin
- process.stdout
- process.stderr
-
Duplex: 可讀可寫
- TCP sockets
- zlib streams
- crypto streams
-
Tranform: 讀寫過程中處理
- zlib streams
- crypto streams
雖然,上述Duplex和Tranform分類中包含了同樣的Node.js實現的庫,但是其實兩種Stream存在區別。Duplex,可讀可寫,而在讀寫上的操作是相互獨立存在的,互相不影響。而Tranform,讀寫是統一的,也就是說,Tranform中讀入的資料經過處理,會直接寫入。二者都是繼承了Readable和Writable後進行實現的。
基本使用
// 檔案系統下 const fs = require('fs') var rStream = fs.createReadStream(file) var wStream = fs.createWriteStream(file) // 直接使用stream模組 const stream = require('stream') var rStream = new stream.Readable() var wStream = new stream.Writable() var dStream = new stream.Duplex() var tStream = new stream.Transform() // 讀寫流的事件監聽 rOrWStream.on('open', function() { // 監聽開啟 }) rOrWStream.on('data', function(data) { // 監聽資料 }) rOrWStream.on('error', function(error) { // 監聽讀寫錯誤 }) rOrWStream.on('end', function(end) { // 監聽讀取或寫入結束 }) rOrWStream.on('close', function() { // 監聽關閉流 }) 複製程式碼
stream模組中,不同型別stream提供了多種的方法可以呼叫,不在此過多贅述,可以檢視官方文件。
其中比較特別的是,在stream中,引數highWaterMark設定值,是stream儲存的最大值,觸發了stream儲存設定的highWaterMark後,Writable和Readable兩者表現有些許不同。
const fs = require('fs') var rStream = fs.createReadStream(file, { flags: 'r',// 指定用什麼模式開啟檔案,’w’代表寫,’r’代表讀,類似的還有’r+’、’w+’、’a’等 encoding: 'utf8',// 編碼格式 autoClose: true,// 是否發生錯誤或結束時自動關閉 highWaterMark: 9,// 單位KB,不設定預設為16KB start:0,// 開始讀取範圍 end:0// 結束讀取範圍,從檔案中讀取一個位元組範圍,而不是整個檔案 }) 複製程式碼
Writable觸發後,不能繼續寫入,呼叫Writable write方法返回false,而當快取區可以繼續寫入資料的時候,是會觸發'drain'事件。
Readable,存在三種狀態
- readable.readableFlowing === null
- readable.readableFlowing === false
- readable.readableFlowing === true
null表示沒有提供消費流資料的機制,所以流不會產生資料。監聽 'data' 事件、呼叫pipe、resume方法都會使狀態切換到 true,可讀流開始主動地產生資料並觸發事件。 呼叫pause、unpipe,或接收到背壓(也就是快取區達到highWaterMark值,如上writable同樣),則狀態會被設為 false,暫時停止事件流動但不會停止資料的生成。 在這個狀態下,為 'data' 事件繫結監聽器不會使狀態切換到 true。
4. 定製化Stream
Stream中的pipe方法猶如管道一樣,讓資料可以連續通過不同的流處理,比如
const fs = require('fs') var rStream = fs.createReadStream(file) var wStream = fs.createWriteStream(renamefile) rStream.pipe(wStream) 複製程式碼
在具體的開發中,常常需要對資料進行處理,我們需要重寫模組中的方式
用例 | 類 | 方法 |
只讀流 | Readable | _read |
只寫流 | Writable | _write _writev _final |
可讀可寫 | Duplex | _read _write _writev _final |
讀入處理,寫入 | Transform | _transform _flush _final |
寫法有很多種,以Writable為例
// prototype繼承 var stream = require('stream') var util = require('util') function MyStream () { stream.Writable.call(this) } util.inherits(MyStream, stream.Writable) MyStream.prototype._write = function (chunk, encoding, callback) { console.log(chunk.toString()) callback() } var myStream = new MyStream() process.stdin.pipe(myStream) 複製程式碼
// 使用例項 var stream = require('stream') var myStream = new stream.Writable() myStream._write = function (chunk, encoding, callback) { console.log(chunk.toString()) callback() } process.stdin.pipe(myStream) 複製程式碼
// 使用Constructor API var myStream = new stream.Writable({ _write: function(chunk, encoding, callback) { console.log(chunk.toString()) callback() } }) 複製程式碼
// ES6類寫法, Node 4+ class MyStream extends stream.Writable { _write(chunk, enc, callback) { console.log(chunk.toString()) callback() } } var myStream = new MyStream() 複製程式碼
以下以ES6的寫法為主
Writable
const { Writable } = require('stream') class OutStream extends Writable { constructor(option) { super() this.encode = option.encode } _write(chunk, enc = this.encode, next) { console.log(chunk.toString()) next && next() } } const outStream = new OutStream({ encode: 'utf-8', }) process.stdin.pipe(outStream) 複製程式碼
Readable
const { Readable } = require('stream') class InStream extends Readable { constructor() { super() } _read(size) { // size就是highWaterMark值 } } const inStream = new InStream() inStream.push('ABCDEFG')// push讀入 inStream.push('HIJKLMN') inStream.push(null)// null表示已經無資料 inStream.pipe(process.stdout) 複製程式碼
可以重寫 _read
class InStream extends Readable { constructor() { super() } // _read會持續觸發 _read(size) { this.push(this.num++) if(this.num > 20) { this.push(null) // 當num > 20,push(null)結束讀入 } } } const inStream = new InStream() inStream.num = 0 inStream.pipe(process.stdout) 複製程式碼
Duplex
const { Duplex } = require('stream') class IoStream extends Duplex { constructor(option) { super() let op = option || {} this.encode = option.encode } // 同時重寫 _write _read _write(chunk, enc, next) { console.log(chunk.toString()) next && next() } _read(size) { this.push(this.num++) if(this.num > 20) { this.push(null) } } } const iostream = new IoStream({ encode: 'utf-8', }) iostream.num = 0 process.stdin.pipe(iostream).pipe(process.stdout) 複製程式碼
Transform
const { Transform } = require('stream') class MyTransform extends Transform { constructor() { super() } // 重寫 _transform 讀入處理後寫入 _transform(chunk, enc, next) { this.push(chunk.toString().toUpperCase()) next && next() } } const mytransfrom = new MyTransform() process.stdin.pipe(mytransfrom).pipe(process.stdout) 複製程式碼
5. objectMode
有時我們需要處理的不僅僅是字串,還包括特殊資料。
比如,如果有一個需要和C++伺服器通訊或互通訊息的Node.js伺服器,通過一個cache儲存二進位制資料,要求存入的是C++ struct結構資料。這時候,需要通過Node.js的Object處理實現同樣結構資料。而同時又需要流處理的話,則需要使用到Stream的objectMode。
通過Transform進行簡單介紹
const { Transform } = require('stream') class CreateArray extends Transform { constructor() { // 開啟objectMode模式 super({ readableObjectMode: true, writableObjectMode: true }) } _transform(chunk, enc, next) { // 可以直接push Object資料 this.push(chunk.toString().trim().split(',')) next && next() } } class ObjToString extends Transform { constructor() { super({ readableObjectMode: true, writableObjectMode: true }) } _transform(chunk, enc, next) { // chunk可以是Object資料 this.push(JSON.stringify(chunk)) next && next() } } const createArray = new CreateArray() const objtostring = new ObjToString() process.stdin.pipe(createArray).pipe(objtostring).pipe(process.stdout) 複製程式碼
6. 小結
Stream是Node.js中很重要的模組,處理資料高效,在專案中需要更靈活的使用。