NodeJS Stream(可讀流、可寫流) API解讀
流的介紹
在 NodeJS 中,我們對檔案的操作需要依賴核心模組fs
,fs
中有很基本 API 可以幫助我們讀寫佔用記憶體較小的檔案,如果是大檔案或記憶體不確定也可以通過open
、read
、write
、close
等方法對檔案進行操作,但是這樣操作檔案每一個步驟都要關心,非常繁瑣,fs
中提供了可讀流和可寫流,讓我們通過流來操作檔案,方便我們對檔案的讀取和寫入。
可讀流
1、createReadStream 建立可讀流
createReadStream
方法有兩個引數,第一個引數是讀取檔案的路徑,第二個引數為options
選項,其中有八個引數:
r null null 0o666 true 64 * 1024
createReadStream
的返回值為fs.ReadStream
物件,讀取檔案的資料在不指定encoding
時,預設為 Buffer。
建立可讀流
const fs = require("fs"); // 建立可讀流,讀取 1.txt 檔案 let rs = fs.creatReadStream("1.txt", { start: 0, end: 3, highWaterMark: 2 });複製程式碼
在建立可讀流後預設是不會讀取檔案內容的,讀取檔案時,可讀流有兩種狀態,暫停狀態和流動狀態。
注意:本篇的可寫流為流動模式,流動模式中有暫停狀態和流動狀態,而不是暫停模式,暫停模式是另一種可讀流readable
。
2、流動狀態
流動狀態的意思是,一旦開始讀取檔案,會按照highWaterMark
的值一次一次讀取,直到讀完為止,就像一個開啟的水龍頭,水不斷的流出,直到流乾,需要通過監聽data
事件觸發。
假如現在1.txt
檔案中的內容為0~9
十個數字,我們現在建立可讀流並用流動狀態讀取。
流動狀態
const fs = require("fs"); let rs = fs.createReadStream("1.txt", { start: 0, end: 3, highWaterMark: 2 }); // 讀取檔案 rs.on("data", data => { console.log(data); }); // 監聽讀取結束 rs.on("end", () => { console.log("讀完了"); }); // <Buffer 30 31> // <Buffer 32 33> // 讀完了複製程式碼
在上面程式碼中,返回的rs
物件監聽了兩個事件:
-
data:每次讀取
highWaterMark
個位元組,觸發一次data
事件,直到讀取完成,回撥的引數為每次讀取的 Buffer; - end:當讀取完成時觸發並執行回撥函式。
我們希望最後讀到的結果是完整的,所以我們需要把每一次讀到的結果在data
事件觸發時進行拼接,以前我們可能使用下面這種方式。
錯誤拼接資料的方式
const fs = require("fs"); let rs = fs.createReadStream("1.txt", { start: 0, end: 3, highWaterMark: 2 }); let str = ""; rs.on("data", data => { str += data; }); rs.on("end", () => { console.log(str); }); // 0123複製程式碼
在上面程式碼中如果讀取的檔案內容是中文,每次讀取的highWaterMark
為兩個位元組,不能組成一個完整的漢字,在每次讀取時進行+=
操作會預設呼叫toString
方法,這樣會導致最後讀取的結果是亂碼。
在以後通過流操作檔案時,大部分情況下都是在操作 Buffer,所以應該用下面這種方式來獲取最後讀取到的結果。
正確拼接資料的方式
const fs = require("fs"); let rs = fs.createReadStream("1.txt", { start: 0, end: 3, highWaterMark: 2 }); // 儲存每次讀取回來的 Buffer let bufArr = []; rs.on("data", data => { bufArr.push(data); }); rs.on("end", () => { console.log(Buffer.concat(bufArr).toString()); }); // 0123複製程式碼
3、暫停狀態
在流動狀態中,一旦開始讀取檔案,會不斷的觸發data
事件,直到讀完,暫停狀態是我們每讀取一次就直接暫停,不再繼續讀取,即不再觸發data
事件,除非我們主動控制繼續讀取,就像水龍頭開啟放水一次後馬上關上水龍頭,下次使用時再開啟。
類似於開關水龍頭的動作,也就是暫停和恢復讀取的動作,在可讀流返回的rs
物件上有兩個對應的方法,pause
和resume
。
在下面的場景中我們把建立可讀流的結尾位置更改成9
,在每次讀兩個位元組並暫停一秒後恢復讀取,直到讀完0~9
十個數字。
暫停狀態
const fs = require("fs"); let rs = fs.createReadStream("1.txt", { start: 0, end: 9, hithWaterMark: 2 }); let bufArr = []; rs.on("data", data => { bufArr.push(data); rs.pause(); // 暫停讀取 console.log("暫停", new Date()); setTimeout(() => { rs.resume(); // 恢復讀取 }, 1000) }); rs.on("end", () => { console.log(Buffer.concat(bufArr).toString()); }); // 暫停 2018-07-03T23:52:52.436Z // 暫停 2018-07-03T23:52:53.439Z // 暫停 2018-07-03T23:52:54.440Z // 暫停 2018-07-03T23:52:55.442Z // 暫停 2018-07-03T23:52:56.443Z // 0123456789複製程式碼
4、錯誤監聽
在通過可讀流讀取檔案時都是非同步讀取,在非同步讀取中如果遇到錯誤也可以通過非同步監聽到,可讀流返回值rs
物件可以通過error
事件來監聽錯誤,在讀取檔案出錯時觸發回撥函式,回撥函式引數為err
,即錯誤物件。
錯誤監聽
const fs = require("fs"); // 讀取一個不存在的檔案 let rs = fs.createReadStream("xxx.js", { highWarterMark: 2 }); let bufArr = []; rs.on("data", data => { bufArr.push(data); }); rs.on("err", err => { console.log(err); }); rs.on("end", () => { console.log(Buffer.concat(bufArr).toString()); }); // { Error: ENOENT: no such file or directory, open '......xxx.js' ......}複製程式碼
5、開啟和關閉檔案的監聽
流的適用性非常廣,不只是檔案讀寫,也可以用在http
中資料的請求和響應上,但是在針對檔案讀取返回的rs
上有兩個專有的事件用來監聽檔案的開啟與關閉。
open
事件用來監聽檔案的開啟,回撥函式在開啟檔案後執行,close
事件用來監聽檔案的關閉,如果建立的可讀流的autoClose
為true
,在自動關閉檔案時觸發,回撥函式在關閉檔案後執行。
開啟和關閉可讀流的監聽
const fs = require("fs"); let rs = fs.createReadStream("1.txt", { start: 0, end: 3, highWaterMark: 2 }); rs.on("open", () => { console.log("open"); }); rs.on("close", () => { console.log("close"); }); // open複製程式碼
在上面程式碼我們看出只要建立了可讀流就會開啟檔案觸發open
事件,因為預設為暫停狀態,沒有對檔案進行讀取,所以不會關閉檔案,即不會觸發close
事件。
暫停狀態
const fs = require("fs"); let rs = fs.createReadStream("1.txt", { start: 0, end: 3, hithWaterMark: 2 }); rs.on("open", () => { console.log("open"); }); rs.on("data", data => { console.log(data); }); rs.on("end", () => { console.log("end"); }); rs.on("close", () => { console.log("close"); }); // open // <Buffer 30 31> // <Buffer 32 33> // end // close複製程式碼
從上面例子執行的列印結果可以看出只有開始讀取檔案並讀完後,才會關閉檔案並觸發close
事件,end
事件的觸發要早於close
。
可寫流
1、createWriteStream 建立可寫流
createWriteStream
方法有兩個引數,第一個引數是讀取檔案的路徑,第二個引數為options
選項,其中有七個引數:
w utf8 null 0o666 true 16 * 1024
createWriteStream
返回值為fs.WriteStream
物件,第一次寫入時會真的寫入檔案中,繼續寫入,會寫入到快取中。
建立可寫流
const fs = require("fs"); // 建立可寫流,寫入 2.txt 檔案 let ws = fs.createWriteStream("2.txt", { start: 0, highWaterMark: 3 });複製程式碼
2、可寫流的 write 方法
在可寫流中將內容寫入檔案需要使用ws
的write
方法,引數為寫入的內容,返回值是一個布林值,代表highWaterMark
的值是否足夠當前的寫入,如果足夠,返回true
,否則返回false
,換種說法就是寫入內容的長度是否超出了highWaterMark
,超出返回false
。
write 方法寫入
const fs = require("fs"); let ws = fs.createWriteSteam("2.txt", { start: 0, highWaterMark: 3 }); let flag1 = ws.write("1"); console.log(flag1); let flag2 = ws.write("2"); console.log(flag2); let flag3 = ws.write("3"); console.log(flag3); // true // true // false複製程式碼
寫入不存在的檔案時會自動建立檔案,如果start
的值不是0
,在寫入不存在的檔案時預設找不到寫入的位置。
3、可寫流的 drain 事件
drain
意為 “吸乾”,當前寫入的內容已經大於等於了highWaterMark
,會觸發drain
事件,當內容全部從快取寫入檔案後,會執行回撥函式。
drain 事件
const fs = require("fs"); let ws = fs.createWriteStream("2.txt", { start: 0, highWaterMark: 3 }); let flag1 = ws.write("1"); console.log(flag1); let flag2 = ws.write("2"); console.log(flag2); let flag3 = ws.write("3"); console.log(flag3); ws.on("drain", () => { console.log("吸乾"); }); // true // true // false複製程式碼
4、可寫流的 end 方法
end
方法傳入的引數為最後寫入的內容,end
會將快取未寫入的內容清空寫入檔案,並關閉檔案。
end 方法
const fs = require("fs"); let ws = fs.createWriteStream("2.txt", { start: 0, highWaterMark: 3 }); let flag1 = ws.write("1"); console.log(flag1); let flag2 = ws.write("2"); console.log(flag2); let flag3 = ws.write("3"); console.log(flag3); ws.on("drain", () => { console.log("吸乾"); }); ws.end("寫完了"); // true // true // false複製程式碼
在呼叫end
方法後,即使再次寫入的值超出了highWaterMark
也不會再觸發drain
事件了,此時開啟2.txt
後發現檔案中的內容為 “123寫完了”。
常見報錯
const fs = require("fs"); let ws = fs.createWriteStream("2.txt", { start: 0, highWaterMark: 3 }); ws.write("1"); ws.end("寫完了"); ws.write("2"); // Error [ERR_STREAM_WRITE_AFTER_END]: write after end...複製程式碼
在呼叫end
方法後,不可以再呼叫write
方法寫入,否則會報一個很常見的錯誤write after end
,檔案原有內容會被清空,而且不會被寫入新內容。
可寫流與可讀流混合使用
可寫流和可讀流一般配合來使用,讀來的內容如果超出了可寫流的highWaterMark
,則呼叫可讀流的pause
暫停讀取,等待記憶體中的內容寫入檔案,未寫入的內容小於highWaterMark
時,呼叫可寫流的resume
恢復讀取,建立可寫流返回值的rs
上的pipe
方法是專門用來連線可讀流和可寫流的,可以將一個檔案讀來的內容通過流寫到另一個檔案中。
pipe 方法使用
const fs = require("fs"); // 建立可讀流和可寫流 let rs = fs.createReadStream("1.txt", { highWaterMark: 3 }); let ws = fs.createWriteStream("2.txt", { highWaterMark: 2 }); // 將 1.txt 的內容通過流寫入 2.txt 中 rs.pipe(ws);複製程式碼
通過上面的這種類似於管道的方式,將一個流從一個檔案輸送到了另一個檔案中,而且會根據讀流和寫流的highWaterMark
自由的控制寫入的 “節奏”,不用擔心記憶體的消耗。
總結
這篇是關於讀流和寫流的基本用法,在平時的開發當中,大多數的 API 都用不到,只有最後的pipe
用的最多,無論是在檔案的讀寫還是請求的響應,其他的 API 雖然用的少,但是作為一個合格的程式員一定要有所瞭解。