Node 深入Stream(1)
1. 流的概念
- 流是一組有序的,有起點和終點的位元組資料傳輸手段
- 它不關心檔案的整體內容,只關注是否從檔案中讀到了資料,以及讀到資料之後的處理
- 流是一個抽象介面,被 Node 中的很多物件所實現。比如HTTP 伺服器request和response物件都是流。
2.可讀流createReadStream
實現了stream.Readable
介面的物件,將物件資料讀取為流資料,當監聽data事件後,開始發射資料
fs.createReadStream = function(path, options) { return new ReadStream(path, options); }; util.inherits(ReadStream, Readable); 複製程式碼
2.1 建立可讀流
var rs = fs.createReadStream(path,[options]); 複製程式碼
- path讀取檔案的路徑
-
options
- flags開啟檔案要做的操作,預設為'r'
- encoding預設為null
- start開始讀取的索引位置
- end結束讀取的索引位置(包括結束位置)
- highWaterMark讀取快取區預設的大小64kb
如果指定utf8編碼highWaterMark要大於3個位元組
2.2 監聽data事件
流切換到流動模式,資料會被儘可能快的讀出
rs.on('data', function (data) { console.log(data); }); 複製程式碼
2.3 監聽end事件
該事件會在讀完資料後被觸發
rs.on('end', function () { console.log('讀取完成'); }); 複製程式碼
2.4 監聽error事件
rs.on('error', function (err) { console.log(err); }); 複製程式碼
2.5 監聽open事件
rs.on('open', function () { console.log(err); }); 複製程式碼
2.6 監聽close事件
rs.on('close', function () { console.log(err); }); 複製程式碼
2.7 設定編碼
與指定{encoding:'utf8'}效果相同,設定編碼
rs.setEncoding('utf8'); 複製程式碼
2.8 暫停和恢復觸發data
通過pause()方法和resume()方法
rs.on('data', function (data) { rs.pause(); console.log(data); }); setTimeout(function () { rs.resume(); },2000); 複製程式碼
3.可寫流createWriteStream
實現了stream.Writable介面的物件來將流資料寫入到物件中
fs.createWriteStream = function(path, options) { return new WriteStream(path, options); }; util.inherits(WriteStream, Writable); 複製程式碼
3.1 建立可寫流
var ws = fs.createWriteStream(path,[options]); 複製程式碼
- path寫入的檔案路徑
-
options
- flags開啟檔案要做的操作,預設為'w'
- encoding預設為utf8
- highWaterMark寫入快取區的預設大小16kb
3.2 write方法
ws.write(chunk,[encoding],[callback]); 複製程式碼
- chunk寫入的資料buffer/string
- encoding編碼格式chunk為字串時有用,可選
- callback 寫入成功後的回撥
返回值為布林值,系統快取區滿時為false,未滿時為true
3.3 end方法
ws.end(chunk,[encoding],[callback]); 複製程式碼
表明接下來沒有資料要被寫入 Writable 通過傳入可選的 chunk 和 encoding 引數,可以在關閉流之前再寫入一段資料 如果傳入了可選的 callback 函式,它將作為 'finish' 事件的回撥函式
3.4 drain方法
- 當一個流不處在 drain 的狀態, 對 write() 的呼叫會快取資料塊, 並且返回 false。 一旦所有當前所有快取的資料塊都排空了(被作業系統接受來進行輸出), 那麼 'drain' 事件就會被觸發
-
建議, 一旦 write() 返回 false, 在 'drain' 事件觸發前, 不能寫入任何資料塊
let fs = require('fs'); let ws = fs.createWriteStream('./2.txt',{ flags:'w', encoding:'utf8', highWaterMark:3 }); let i = 10; function write(){ letflag = true; while(i&&flag){ flag = ws.write("1"); i--; console.log(flag); } } write(); ws.on('drain',()=>{ console.log("drain"); write(); }); 複製程式碼
3.5 finish方法
在呼叫了 stream.end() 方法,且緩衝區資料都已經傳給底層系統之後, 'finish' 事件將被觸發。
var writer = fs.createWriteStream('./2.txt'); for (let i = 0; i < 100; i++) { writer.write(`hello, ${i}!\n`); } writer.end('結束\n'); writer.on('finish', () => { console.error('所有的寫入已經完成!'); }); 複製程式碼
4.pipe方法
4.1 pipe方法的原理
var fs = require('fs'); var ws = fs.createWriteStream('./2.txt'); var rs = fs.createReadStream('./1.txt'); rs.on('data', function (data) { var flag = ws.write(data); if(!flag) rs.pause(); }); ws.on('drain', function () { rs.resume(); }); rs.on('end', function () { ws.end(); }); 複製程式碼
4.2 pipe用法
readStream.pipe(writeStream); var from = fs.createReadStream('./1.txt'); var to = fs.createWriteStream('./2.txt'); from.pipe(to); 複製程式碼
將資料的滯留量限制到一個可接受的水平,以使得不同速度的來源和目標不會淹沒可用記憶體。
4.3 unpipe用法
- readable.unpipe()方法將之前通過stream.pipe()方法繫結的流分離
-
如果 destination 沒有傳入, 則所有繫結的流都會被分離.
let fs = require('fs'); var from = fs.createReadStream('./1.txt'); var to = fs.createWriteStream('./2.txt'); from.pipe(to); setTimeout(() => { console.log('關閉向2.txt的寫入'); from.unpipe(writable); console.log('手工關閉檔案流'); to.end(); }, 1000); 複製程式碼
4.4 cork
呼叫 writable.cork() 方法將強制所有寫入資料都存放到記憶體中的緩衝區裡。 直到呼叫 stream.uncork() 或 stream.end() 方法時,緩衝區裡的資料才會被輸出。
4.5 uncork
writable.uncork()將輸出在stream.cork()
方法被呼叫之後緩衝在記憶體中的所有資料。
stream.cork(); stream.write('1'); stream.write('2'); process.nextTick(() => stream.uncork()); 複製程式碼
5. 簡單實現
5.1 可讀流的簡單實現
let fs = require('fs'); let ReadStream = require('./ReadStream'); let rs = ReadStream('./1.txt', { flags: 'r', encoding: 'utf8', start: 3, end: 7, highWaterMark: 3 }); rs.on('open', function () { console.log("open"); }); rs.on('data', function (data) { console.log(data); }); rs.on('end', function () { console.log("end"); }); rs.on('close', function () { console.log("close"); }); /** open 456 789 end close **/ 複製程式碼
let fs = require('fs'); let EventEmitter = require('events'); class WriteStream extends EventEmitter { constructor(path, options) { super(path, options); this.path = path; this.fd = options.fd; this.flags = options.flags || 'r'; this.encoding = options.encoding; this.start = options.start || 0; this.pos = this.start; this.end = options.end; this.flowing = false; this.autoClose = true; this.highWaterMark = options.highWaterMark || 64 * 1024; this.buffer = Buffer.alloc(this.highWaterMark); this.length = 0; this.on('newListener', (type, listener) => { if (type == 'data') { this.flowing = true; this.read(); } }); this.on('end', () => { if (this.autoClose) { this.destroy(); } }); this.open(); } read() { if (typeof this.fd != 'number') { return this.once('open', () => this.read()); } let n = this.end ? Math.min(this.end - this.pos, this.highWaterMark) : this.highWaterMark; fs.read(this.fd,this.buffer,0,n,this.pos,(err,bytesRead)=>{ if(err){ return; } if(bytesRead){ let data = this.buffer.slice(0,bytesRead); data = this.encoding?data.toString(this.encoding):data; this.emit('data',data); this.pos += bytesRead; if(this.end && this.pos > this.end){ return this.emit('end'); } if(this.flowing) this.read(); }else{ this.emit('end'); } }) } open() { fs.open(this.path, this.flags, this.mode, (err, fd) => { if (err) return this.emit('error', err); this.fd = fd; this.emit('open', fd); }) } end() { if (this.autoClose) { this.destroy(); } } destroy() { fs.close(this.fd, () => { this.emit('close'); }) } } module.exports = WriteStream; 複製程式碼
5.2 可寫流的簡單實現
let fs = require('fs'); let FileWriteStream = require('./FileWriteStream'); let ws = FileWriteStream('./2.txt',{ flags:'w', encoding:'utf8', highWaterMark:3 }); let i = 10; function write(){ letflag = true; while(i&&flag){ flag = ws.write("1",'utf8',(function(i){ return function(){ console.log(i); } })(i)); i--; console.log(flag); } } write(); ws.on('drain',()=>{ console.log("drain"); write(); }); /** 10 9 8 drain 7 6 5 drain 4 3 2 drain 1 **/ 複製程式碼
let fs = require('fs'); let EventEmitter = require('events'); class WriteStream extendsEventEmitter{ constructor(path, options) { super(path, options); this.path = path; this.fd = options.fd; this.flags = options.flags || 'w'; this.mode = options.mode || 0o666; this.encoding = options.encoding; this.start = options.start || 0; this.pos = this.start; this.writing = false; this.autoClose = true; this.highWaterMark = options.highWaterMark || 16 * 1024; this.buffers = []; this.length = 0; this.open(); } open() { fs.open(this.path, this.flags, this.mode, (err, fd) => { if (err) return this.emit('error', err); this.fd = fd; this.emit('open', fd); }) } write(chunk, encoding, cb) { if (typeof encoding == 'function') { cb = encoding; encoding = null; } chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, this.encoding || 'utf8'); let len = chunk.length; this.length += len; let ret = this.length < this.highWaterMark; if (this.writing) { this.buffers.push({ chunk, encoding, cb, }); } else { this.writing = true; this._write(chunk, encoding,this.clearBuffer.bind(this)); } return ret; } _write(chunk, encoding, cb) { if (typeof this.fd != 'number') { return this.once('open', () => this._write(chunk, encoding, cb)); } fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, written) => { if (err) { if (this.autoClose) { this.destroy(); } return this.emit('error', err); } this.length -= written; this.pos += written; cb && cb(); }); } clearBuffer() { let data = this.buffers.shift(); if (data) { this._write(data.chunk, data.encoding, this.clearBuffer.bind(this)) } else { this.writing = false; this.emit('drain'); } } end() { if (this.autoClose) { this.emit('end'); this.destroy(); } } destroy() { fs.close(this.fd, () => { this.emit('close'); }) } } module.exports = WriteStream; 複製程式碼
5.3 pipe
let fs = require('fs'); let ReadStream = require('./ReadStream'); let rs = ReadStream('./1.txt', { flags: 'r', encoding: 'utf8', highWaterMark: 3 }); let FileWriteStream = require('./WriteStream'); let ws = FileWriteStream('./2.txt',{ flags:'w', encoding:'utf8', highWaterMark:3 }); rs.pipe(ws); 複製程式碼
ReadStream.prototype.pipe = function (dest) { this.on('data', (data)=>{ let flag = dest.write(data); if(!flag){ this.pause(); } }); dest.on('drain', ()=>{ this.resume(); }); this.on('end', ()=>{ dest.end(); }); } ReadStream.prototype.pause = function(){ this.flowing = false; } ReadStream.prototype.resume = function(){ this.flowing = true; this.read(); } 複製程式碼
5.4 暫停模式
let fs =require('fs'); let ReadStream2 = require('./ReadStream2'); let rs = new ReadStream2('./1.txt',{ start:3, end:8, encoding:'utf8', highWaterMark:3 }); rs.on('readable',function () { console.log('readable'); console.log('rs.buffer.length',rs.length); let d = rs.read(1); console.log(d); console.log('rs.buffer.length',rs.length); setTimeout(()=>{ console.log('rs.buffer.length',rs.length); },500) }); 複製程式碼
`
let fs = require('fs'); let EventEmitter = require('events'); class ReadStream extends EventEmitter { constructor(path, options) { super(path, options); this.path = path; this.highWaterMark = options.highWaterMark || 64 * 1024; this.buffer = Buffer.alloc(this.highWaterMark); this.flags = options.flags || 'r'; this.encoding = options.encoding; this.mode = options.mode || 0o666; this.start = options.start || 0; this.end = options.end; this.pos = this.start; this.autoClose = options.autoClose || true; this.bytesRead = 0; this.closed = false; this.flowing; this.needReadable = false; this.length = 0; this.buffers = []; this.on('end', function () { if (this.autoClose) { this.destroy(); } }); this.on('newListener', (type) => { if (type == 'data') { this.flowing = true; this.read(); } if (type == 'readable') { this.read(0); } }); this.open(); }
open() { fs.open(this.path, this.flags, this.mode, (err, fd) => { if (err) { if (this.autoClose) { this.destroy(); return this.emit('error', err); } } this.fd = fd; this.emit('open'); }); } read(n) { if (typeof this.fd != 'number') { return this.once('open', () => this.read()); } n = parseInt(n,10); if(n != n){ n = this.length; } if(this.length ==0) this.needReadable = true; let ret; if (0<n < this.length) { ret = Buffer.alloc(n); let b ; let index = 0; while(null != (b = this.buffers.shift())){ for(let i=0;i<b.length;i++){ ret[index++] = b[i]; if(index == ret.length){ this.length -= n; b = b.slice(i+1); this.buffers.unshift(b); break; } } } if (this.encoding) ret = ret.toString(this.encoding); } let _read = () => { let m = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark; fs.read(this.fd, this.buffer, 0, m, this.pos, (err, bytesRead) => { if (err) { return } let data; if (bytesRead > 0) { data = this.buffer.slice(0, bytesRead); this.pos += bytesRead; this.length += bytesRead; if (this.end && this.pos > this.end) { if(this.needReadable){ this.emit('readable'); } this.emit('end'); } else { this.buffers.push(data); if(this.needReadable){ this.emit('readable'); this.needReadable = false; } } } else { if(this.needReadable) { this.emit('readable'); } return this.emit('end'); } }) } if (this.length == 0 || (this.length < this.highWaterMark)) { _read(0); } return ret; } destroy() { fs.close(this.fd, (err) => { this.emit('close'); }); } pause() { this.flowing = false; } resume() { this.flowing = true; this.read(); } pipe(dest) { this.on('data', (data) => { let flag = dest.write(data); if (!flag) this.pause(); }); dest.on('drain', () => { this.resume(); }); this.on('end', () => { dest.end(); }); }複製程式碼