1. 程式人生 > >極簡 Node.js 入門 - 4.5 雙工流

極簡 Node.js 入門 - 4.5 雙工流

> 極簡 Node.js 入門系列教程:[https://www.yuque.com/sunluyong/node](https://www.yuque.com/sunluyong/node) > > 本文更佳閱讀體驗:[https://www.yuque.com/sunluyong/node/duplex-and-transform](https://www.yuque.com/sunluyong/node/duplex-and-transform) 雙工流就是同時實現了 Readable 和 Writable 的流,即可以作為上游生產資料,又可以作為下游消費資料,這樣可以處於資料流動管道的中間部分,即 ```javascript rs.pipe(rws1).pipe(rws2).pipe(rws3).pipe(ws); ``` 在 NodeJS 中雙工流常用的有兩種 1. Duplex
1. Transform
## Duplex ### 實現 Duplex 和 Readable、Writable 實現方法類似,實現 Duplex 流非常簡單,但 Duplex 同時實現了 Readable 和 Writable, NodeJS 不支援多繼承,所以我們需要繼承 Duplex 類 1. 繼承 Duplex 類
1. 實現 _read() 方法
1. 實現 _write() 方法
相信看過前面章節後對 _read()、_write() 方法的實現不會陌生,和 Readable、Writable 完全一樣 ```javascript const Duplex = require('stream').Duplex; const myDuplex = new Duplex({ read(size) { // ... }, write(chunk, encoding, callback) { // ... } }); ``` ### 建構函式引數 Duplex 例項內同時包含可讀流和可寫流,在例項化 Duplex 類的時候可以傳遞幾個引數 - readableObjectMode : 可讀流是否設定為 ObjectMode,預設 false
- writableObjectMode : 可寫流是否設定為 ObjectMode,預設 false
- allowHalfOpen : 預設 true, 設定成 false 的話,當寫入端結束的時,流會自動的結束讀取端
### 小例子 瞭解了 Readable 和 Writable 之後看 Duplex 非常簡單,直接用一個官網的例子 ```javascript const Duplex = require('stream').Duplex; const kSource = Symbol('source'); class MyDuplex extends Duplex { constructor(source, options) { super(options); this[kSource] = source; } _write(chunk, encoding, callback) { // The underlying source only deals with strings if (Buffer.isBuffer(chunk)) chunk = chunk.toString(); this[kSource].writeSomeData(chunk); callback(); } _read(size) { this[kSource].fetchSomeData(size, (data, encoding) =>
{ this.push(Buffer.from(data, encoding)); }); } } ``` 當然這是不能執行的虛擬碼,但是 Duplex 的作用可見一斑,進可以生產資料,又可以消費資料,所以才可以處於資料流動管道的中間環節,Node.js 中常見的 Duplex 流有 - Tcp Scoket
- Zlib
- Crypto
## Transform Transform 同樣是雙工流,看起來和 Duplex 重複了,但兩者有一個重要的區別:Duplex 雖然同時具備可讀流和可寫流,但兩者是相對獨立的;Transform 的可讀流的資料會經過一定的處理過程自動進入可寫流

雖然會從可讀流進入可寫流,但並不意味這兩者的資料量相同,上面說的一定的處理邏輯會決定如果 tranform 可讀流,然後放入可寫流,transform 原義即為轉變,很貼切的描述了 Transform 流作用

最常見的壓縮、解壓縮用的 zlib 即為 Transform 流,壓縮、解壓前後的資料量明顯不同,而流的作用就是輸入一個 zip 包,輸入一個解壓檔案或反過來。我們平時用的大部分雙工流都是 Transform。 ### 實現 Tranform Tranform 類內部繼承了 Duplex 並實現了 writable.write() 和 readable._read() 方法,自定義一個 Transform 流,只需要三個步驟 1. 繼承 Transform 類
1. 實現 _transform() 方法
1. 實現 _flush() 方法(可以不實現)
_transform(chunk, encoding, callback) 方法用來接收資料,併產生輸出,引數我們已經很熟悉了,和 Writable 一樣, chunk 預設是 Buffer,除非 decodeStrings 被設定為 false

在 _transform() 方法內部可以呼叫 this.push(data) 生產資料,交給可寫流,也可以不呼叫,意味著輸入不會產生輸出

當資料處理完了必須呼叫 callback(err, data) ,第一個引數用於傳遞錯誤資訊,第二個引數可以省略,如果被傳入了,效果和 this.push(data) 一樣 ```javascript transform.prototype._transform = function (data, encoding, callback) { this.push(data); callback(); }; transform.prototype._transform = function (data, encoding, callback) { callback(null, data); }; ``` 有些時候,transform 操作可能需要在流的最後多寫入可寫流一些資料。例如, Zlib流會儲存一些內部狀態,以便優化壓縮輸出。在這種情況下,可以使用_flush()方法,它會在所有寫入資料被消費、觸發 'end'之前被呼叫 ## Transform 事件 Transform 流有兩個常用的事件 1. 來自 Writable 的 finish
1. 來自 Readable 的 end
當呼叫 transform.end() 並且資料被 _transform() 處理完後會觸發 finish,呼叫_flush後,所有的資料輸出完畢,觸發end事件 ## 對比 瞭解了 Readable 和 Writable 之後,理解雙工流十分自然,但兩者的區別會讓一些初學者困惑,簡單的區分:Duplex 的可讀流和可寫流之間並沒有直接關係,Transform 中可讀流的資料會經過處理後自動放入可寫流中。
看兩個簡單的例子就能直觀瞭解到 Duplex 和 Transform 的區別 #### TCP socket net 模組可以用來建立 socket,socket 在 NodeJS 中是一個典型的 Duplex,看一個 TCP 客戶端的例子 ```javascript var net = require('net'); //建立客戶端 var client = net.connect({port: 1234}, function() { console.log('已連線到伺服器'); client.write('Hi!'); }); //data事件監聽。收到資料後,斷開連線 client.on('data', function(data) { console.log(data.toString()); client.end(); }); //end事件監聽,斷開連線時會被觸發 client.on('end', function() { console.log('已與伺服器斷開連線'); }); ``` 可以看到 client 就是一個 Duplex,可寫流用於向伺服器傳送訊息,可讀流用於接受伺服器訊息,兩個流內的資料並沒有直接的關係。 #### gulp gulp 非常擅長處理程式碼本地構建流程,看一段官網的示例程式碼 ```javascript function css() { return src('client/templates/*.less') .pipe(less()) .pipe(minifyCSS()) .pipe(dest('build/css')) } ``` 其中 less() 和 minify() 就是典型的 Transform,處理流程大概是 ``` .less 原始碼檔案 -> less() -> css 檔案 -> minify -> 壓縮後的 css ``` 可以看出來,less() 和 minify() 都是對輸入資料做了些特殊處理,然後交給了輸出資料。這樣簡單的對比就能看出 Duplex 和 Transform 的區別,在平時使用的時候,當一個流同時面向生產者和消費者服務的時候會選擇 Duplex,當只是對資料讀取後需要對之做一些轉換工作的時候就會選擇使用 Tranform ## through2 日常資料處理 Transform 使用非常頻繁,通過社群模組 [through2](https://www.npmjs.com/package/through2) 模組可以方便封裝一個 Transform 流,API 十分簡潔 ```javascript through2([ options, ] [ transformFunction ] [, flushFunction ]) ``` 1. options 中 `objectMode` 設定為 true 後可以使用物件模式,也可以直接使用 through2.obj() 方法達到同樣效果 1. transformFunction:用於處理資料轉換部分,有三個引數 1. chunk: 當前處理的資料 Buffer 1. encoding:使用的編碼 1. callback: 3. flushFunction:可選項,在流結束之前呼叫,可以用來處理一些沒有完成或者需要收尾的工作
看個官網示例,實現功能 > 讀取 ex.txt 內容,把字元 a 轉成字元 z,輸出到檔案 out.txt 中 ```javascript fs.createReadStream('ex.txt') .pipe(through2(function (chunk, encoding, callback) { for (var i = 0; i < chunk.length; i++) if (chunk[i] == 97) { chunk[i] = 122 // swap 'a' for 'z' } this.push(chunk); // 內容放到輸出流,必須在 callback 之前呼叫,可以呼叫多次 callback(); // 最後不要忘記呼叫 })) .pipe(fs.createWriteStream('out.txt')) .on('finish', () => doSomethingSpecial()); ``` 瞭解了這五個章節的內容可以簡單使用 Node.js 流了,處於理解成本只介紹了常用功能,全部功能可以閱讀 [Node.js Stream API](http://nodejs.cn/api/stream.html) ## pipeline ```javascript const { pipeline } = require('stream'); ``` pipeline 方法的作用類似於鏈式呼叫 pipe() 方法 ,在管道內傳輸多個流,在管道任務結束後提供回撥
**_stream.pipeline(source[, ...transforms], destination, callback)_** 1. source:可讀流 1. ...tranforms:雙工流 1. destination:可寫流 1. callback:當管道完全地完成時呼叫 ```javascript const { pipeline } = require('stream'); const fs = require('fs'); const zlib = require('zlib'); pipeline( fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'), (err) => { if (err) { console.error('管道傳送失敗', err); } else { console.log('管道傳送成功'); } }