流“忙”
這裡就要提到一個網上的例子了。首先,我們需要先生成一個大檔案:
const fs = require('fs'); const file = fs.createWriteStream('./test.txt'); for(let i=0; i<= 1e6; i++) { file.write('Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.\n'); } file.end(); 複製程式碼
這就生成了一個大概 450MB 的檔案。第二步,我們需要啟動一個簡單的 Node 伺服器。
const fs = require('fs'); const http = require('http'); const server = http.createServer(); server.on('request', (req, res) => { fs.readFile('./test.txt', (err, data) => { if (err) throw err; res.end(data); }); }); server.listen(3000); 複製程式碼
把程式碼寫好之後,我在本地全域性安裝了一個 pm2,當然了,你也可以使用系統自帶工作管理員、活動監視器等等來監控記憶體消耗的變化,pm2 比較直觀好用,我就直接使用了 pm2。
這是初始狀態:

我們開啟命令列,使用 curl 來訪問伺服器: curl http://localhost:3000
。
這時候,我們可以通過 pm2 monit index.js
來檢視。
可以看到現在的記憶體消耗為:

可以看到,用這種方法是將整個檔案都放在了記憶體當中,如果檔案小還好,但是當資料量非常大的時候,就會造成記憶體不夠用的問題,並且當你將記憶體佔個七七八八也會影響到使用者訪問的速度,針對這種情況,我們使用流來解決這個問題。
首先,HTTP 響應物件也是一種可寫流,我們可以建立一個檔案的可讀流,通過 pipe 連線這兩個流,我們可以看下這樣的記憶體消耗。
const fs = require('fs'); const http = require('http'); const server = http.createServer(); server.on('request', (req, res) => { fs.createReadStream('./test.txt').pipe(res); }); server.listen(3000); 複製程式碼

可以看到在使用流的情況下,我們記憶體的消耗是十分少的。感受到流的魅力了麼?一起來學習吧。
從小學數學題開始
大家應該都記得,小學的時候經常碰到這樣的題:有一個 x 立方米的水池,一個進水管每秒進水 a 立方米,一個出水管每秒出水 b 立方米,問啥時候水池積滿?不知道大家小時候有沒有罵過這道題,一邊進一邊出這不是有病麼?哈哈,不扯遠了,繼續回到我們的流,這個題的模型其實和我們的流很相似,當然了,上面這個題還涉及到了流控的問題,下面再說。
在 Node.js 中,有四種基本的流型別:
- Readable: 可讀流,你可以把它想象成進水管,它將資料來源以流的方式一點一點的提供給下游。
- Writable: 可寫流,這是作為下游來消耗上游提供的資料,你可以把它想象成出水管。
- Duplex: 雙工流,即可讀也可寫。
- Transform: 繼承自 Duplex 的雙工流,它可以在寫入或者讀取資料的時候修改資料。
可讀流
說可讀流之前,先劃重點:
- read() 和 _read()
- flowing 和 pause 模式
- 事件 readable 和 data
重點畫出來了我們開始一點一點扒可讀流,首先,我們要說的就是 read
和 _read
這兩個函式,正常情況下,我們在使用可讀流的時候,需要提供一個 _read
方法,負責向緩衝區補充資料,它要從資料來源拉取資料,然後在這個方法中呼叫 push
方法把資料推到緩衝池中,當流結束時,我們需要 push(null)
,而 read(size) 函式是我們要消耗緩衝區的資料的時候使用的方法,不需要我們自己實現。
下面要說的是可讀流分兩種模式:flowing 和 pause,這兩種模式決定了資料流動的方式。
flowing 模式
在 flowing 模式下,資料會源源不斷的產生,每次都會觸發 data
事件 ,通過監聽這個事件來獲取資料。
那麼在什麼情況下會進入 flowing 模式呢?OK,扒原始碼,通過看可讀流的原始碼,知道在流的屬性裡有一個 flowing
的屬性,這個屬性初始化的時候為 null
。這時候是處於 pause
模式,我們在當前檔案全域性搜一下 flowing =
發現當我們呼叫 resume()
的時候會將這個標誌位設為 true
,這時候就處於 flowing
模式了,那麼還有沒有其他的方法呢?答案是肯定的,是有的,看原始碼:
Readable.prototype.on = function(ev, fn) { const res = Stream.prototype.on.call(this, ev, fn); const state = this._readableState; if(ev === 'data') { state.readableListening = this.listenerCount('readable') > 0; if (state.flowing !== false) this.resume(); } else if ('readable'){ //... } return res; } 複製程式碼
我們可以關注到,當我們監聽 data
事件的時候,因為當前初始化標誌位為 null
,所以會去呼叫 resume()
,這時候就會進入 flowing 模式,同時,當可讀流呼叫 pipe
的時候會去監聽 data
事件,也會進入 flowing
模式。
那麼當你監聽 data
事件進入 flowing
模式時,整個程式碼流程是什麼呢?

從這張圖,我們能看出 flowing
模式的一個大概流程,從初始化開始, flowing = null
,然後當我們監聽 data
事件,會去呼叫 this.resume()
,這時候就將 flowing
變為 true
,然後呼叫了 resume
,在這個函式裡,呼叫了 read(0)
去觸發 _read()
向緩衝區補充資料,這裡要提一點的是當我們呼叫 read(0)
的時候,不會破壞緩衝區的資料和狀態,並觸發 _read
去讀取資料到緩衝區。接下來就是不斷的迴圈往復,直到 push(null)
則流結束。
pause 模式
現在知道了 flowing
模式,那麼 pause
模式又是怎樣的呢?首先我們來看如何進入 pause
模式:
pause pause unpipe
一般來說我們很少會去使用到 pause
模式,在 pause
模式下,我們需要手動的呼叫 read()
函式去獲取資料。
readable 和 data
這兩個都是關於資料的事件,至於 end
事件,很簡單,就不多說了。
那麼 readable
事件代表了什麼呢? readable
只負責通知消費者流裡有新資料或者流讀完了,至於如何使用則是消費者自己的事情了,這時候 read()
就會返回新資料或者是 null。
至於 data
事件,我們看一下上面那張圖,這個事件是在流把資料傳遞給消費者的時候觸發的。
那麼我們同時監聽 data
和 readable
事件會怎麼樣呢?從上面的圖我們可以得知,當監聽 data
事件的時候,流直接將資料傳遞給了消費者,並沒有進入緩衝區,只會觸發 data
事件,而只有當資料消耗完成時 push(null)
會觸發 readable
事件。
可寫流
可寫流是作為下流來消耗上游的資料,那麼開始劃重點:
- _write 和 write
- finish 和 prefinishi 事件
和可讀流一樣,我們需要在初始化流的時候提供一個 _write()
方法,用來向底層寫資料,而 write()
方法是用來向可寫流提供資料的,注意在 _write
方法中的第三個引數在原始碼中是一個叫 onwrite
的方法,這是為了表明當前寫入資料已經完成了,可以開始寫入下面的資料了。可寫流的終止訊號是呼叫 end()
方法。
那麼可寫流是如何監聽流結束事件呢?答案是有兩個事件可以監聽,一個是 prefinish
,另一個是 finish
。
這兩個事件的區別是, finish
是在流的所有資料都寫入底層並且回撥函式都執行了才會觸發,而 prefinish
的觸發條件是所有的資料都寫入底層,這兩者之間還是有一定差異的。
Duplex 和 Tranform
Duplex 的程式碼量非常少,因為它同時繼承了可讀流和可寫流,它同時包含了這兩種流原型上的方法,同時包含了兩種流的屬性。所以我們既可以實現 _read
將它當成可讀流也可以實現 _write
將其當成可寫流來使用。
而 Transform 繼承了 Duplex,並且關聯了兩個快取池,我們向流中寫入資料,就能夠進行轉換,然後再讀取,那為什麼可以這樣操作呢?
我們去看看原始碼,Transform 自己實現了 _write
和 _read
方法,注意的是這裡使用的是同一個快取,我們來看這麼一段程式碼。
const { Transform } = require('stream') var transform = Transform({ transform: function (buf, _, next) { next(null, buf.toString().replace('a', 'b')) } }) // 'b' transform.pipe(process.stdout) transform.write('a') transform.end() 複製程式碼
上面的程式碼主要流程是這樣的,Transform 呼叫了繼承自可寫流的 write
方法,然後這個方法呼叫自己實現的 _write
將寫入的資料存到了 Transform 的快取中,然後將其轉換成 buffer,在其後 _read
函式被呼叫,在這個函式中呼叫了在初始化的時候傳入轉換函式 _transform
對資料進行轉換,在轉換過後就是 readable.pipe(writable)
的模式了。
還有一點是,Transform 還有一個 _flush
函式,在 prefinish
觸發時就會呼叫它,說明寫流結束了。
神器 pipe
在我們進行可寫流和可讀流的對接的時候我們要處理各種事件,以及流控的問題,就像我們在上面提到的那道題,如果讀流速度太快,而寫流速度慢,就會導致速度不匹配的問題,而 pipe
實現了一套背壓平衡機制來控制兩邊的速度。
那關於 pipe 的原始碼解析等等可以去看看這篇文章。
總結
在 Node 裡,流是非常重要的一個模組,它能夠很好的處理大檔案,以及對資料的處理能力。這次對流的學習也是收穫了不少東西,與君共勉!