在node中,只要涉及到檔案IO的場景一般都會涉及到一個類-Stream。Stream是對IO裝置的抽象表示,其在JAVA中也有涉及,主要體現在四個類-InputStream、Reader、OutputStream、Writer,其中InputStream和OutputStream類針對位元組資料進行讀寫;Reader和Writer針對字元資料讀寫。同時Java中有多種針對這四種類型的擴充套件類,如節點流、緩衝流和轉換流等。比較而言,node中Stream型別也和Java中的類似,同樣提供了支援位元組和字元讀寫的Readable和Writeable類,也存在轉換流Transform類,本文主要分析node中Readable和Writeable的實現機制,從底層的角度更好的理解Readable和Writeable實現機制,解讀在讀寫過程中發生的一些重要事件。
Readable類
Readable對應於Java中的InputStream和Reader兩個類,針對Readable設定encode編碼可完成內部資料由Buffer到字元的轉換。Readable Stream有兩種模式,即flowing和paused模式。這兩種模式對於使用者而言區別在於是否需要手動呼叫Readable.prototype.read(n),讀取緩衝區的資料。查詢node API文件可知觸發flowing模式有三種方式:
- 偵聽data事件
- readable.resume()
- readable.pipe()
而觸發paused模式同樣有幾種方式: - 移除data事件
- readable.pause()
- readable.unpipe()
可能這樣講解大家仍不明白Readable Stream這兩種模式的區別,那麼下文從更深層次分析兩種模式的機制。
深入Readable的實現
Readable繼承EventEmitter,大家也都知道。但是相信大家應該不怎麼熟悉Readable的例項屬性_readableState。該屬性是一個ReadableState型別的物件,儲存了Readable例項的重要資訊,如讀取模式(是否為物件模式)、highWaterMark(緩衝區存放的最大位元組數)、緩衝區、flowing模式等。在Readable的實現中,處處使用ReadableState物件記錄當前讀取狀態,並設定緩衝區保證讀操作的順利進行。
首先需要針對Readable.prototype.read方法進行特別解讀:
if (n === 0 &&
state.needReadable &&
(state.length >= state.highWaterMark || state.ended)) {
debug('read: emitReadable', state.length, state.ended);
if (state.length === 0 && state.ended)
endReadable(this);
else
emitReadable(this);
return null;
}
當讀入的資料為0時,執行emitReadable操作。這意味著,針對Readable Stream執行read(0)方法會觸發readable事件,但是不會讀當前緩衝區。因此使用read(0)可以完成一些比較巧妙的事情,如在readable處理函式中可以使用read(0)觸發下一次readable事件,可選的操作讀緩衝區。
繼續分析程式碼,如果讀入的資料並不是0,則計算讀取緩衝區的具體位元組數,
n = howMuchToRead(n, state);
function howMuchToRead(n, state) {
if (state.length === 0 && state.ended)
return 0;
if (state.objectMode)
return n === 0 ? 0 : 1;
if (n === null || isNaN(n)) {
// only flow one buffer at a time
if (state.flowing && state.buffer.length)
return state.buffer[0].length;
// 若是paused狀態,則讀全部的緩衝區
else
return state.length;
}
if (n <= 0)
return 0;
if (n > state.highWaterMark)
state.highWaterMark = computeNewHighWaterMark(n);
// don't have that much. return null, unless we've ended.
if (n > state.length) {
if (!state.ended) {
state.needReadable = true;
return 0;
} else {
return state.length;
}
}
return n;
}
針對物件模式的讀取,每次只讀一個;對於處在flowing模式下的讀取,每次只讀緩衝區中第一個buffer的長度;在paused模式下則讀取全部緩衝區的長度;若讀取的位元組數大於設定的緩衝區最大值,則適當擴大緩衝區的大小(預設為16k,最大為8m);若讀取的長度大於當前緩衝區的大小,設定needReadable屬性並準備資料等待下一次讀取。
接下來,判斷是否需要準備資料。在這裡,依賴於needReadable的值,
var doRead = state.needReadable;
debug('need readable', doRead);
if (state.length === 0 || state.length - n < state.highWaterMark) {
doRead = true;
debug('length less than watermark', doRead);
}
// reading, then it's unnecessary.
if (state.ended || state.reading) {
doRead = false;
debug('reading or ended', doRead);
}
如果當前緩衝區為空,或者緩衝區並未超出我們設定的最大值,那麼就可以繼續準備資料;如果此時正在準備資料或者已經結束讀取,那麼就放棄準備資料。一旦doRead為true,那麼進入準備資料階段,
if (doRead) {
debug('do read');
state.reading = true;
state.sync = true;
// if the length is currently zero, then we *need* a readable event.
if (state.length === 0)
state.needReadable = true;
// call internal read method
// 預設Readable未實現_read,丟擲Error
// 針對自定義的Readable子類,_read可修改state.buffer的數量,進行預處理,
// 然後由下面的fromList讀出去快取中的相關資料
this._read(state.highWaterMark);
state.sync = false;
}
接下來設定相關的標誌位,進行_read處理。針對這個私有方法_read,文件上有特殊說明,自定義的Readable實現類需要實現這個方法,在該方法中手動新增資料到Readable物件的讀緩衝區,然後進行Readable的讀取。可以理解為_read函式為讀取資料前的準備工作(準備資料),針對的是流的實現者而言。
if (doRead && !state.reading)
n = howMuchToRead(nOrig, state);
var ret;
if (n > 0)
ret = fromList(n, state);
else
ret = null;
if (ret === null) {
state.needReadable = true;
n = 0;
}
state.length -= n;
if (state.length === 0 && !state.ended)
state.needReadable = true;
if (nOrig !== n && state.ended && state.length === 0)
endReadable(this);
// flowing模式下的資料讀取依賴於 read函式
// data事件觸發的次數,依賴於howMuchToRead計算的次數
if (ret !== null)
this.emit('data', ret);
一旦在_read中更新了緩衝區,那麼我們需要重新計算(消費者,即可寫流)讀取的位元組數。fromList方法完成了讀緩衝區的slice,如果是objectMode下的讀,則只讀緩衝區的第一個物件;針對未傳引數的read方法而言,預設讀取全部緩衝區等等。從讀緩衝區讀取完資料之後設定相關flag,如needReadable,最終,觸發data事件,結束!
上節提到,設定data事件的執行函式會進入flowing模式的讀,而上文看到正是read方法觸發了data事件,而預設條件下Readable處於paused狀態,因此在paused狀態讀取資料需要手動執行read函式,每次read讀取完畢觸發一次data事件。從這點看出,flowing和paused狀態區別在於是否需要手動執行read()來獲取資料。flowing狀態下,我們無需執行read,僅需要設定data事件處理函式或者設定導流目標pipe;而在paused狀態下,不僅僅是簡單的執行read方法,因為讀緩衝區的內容時刻在改變,一旦讀緩衝區又有新資料,簡單執行read()就沒法滿足需求(因為我們無法知道是否又有新資料到來),因此需要偵聽讀緩衝區的相關事件,即readable事件,在該事件處理函式中進行read相關資料。
那麼,什麼情況下會觸發readable事件呢?在實現_read私有方法中,我們使用stream.push(chunk)或stream.unshift(chunk)方法注入資料到讀緩衝區,那麼push和unshift方法都實現了下面的邏輯,
if (state.flowing && state.length === 0 && !state.sync) {
stream.emit('data', chunk);
stream.read(0);
} else {
// update the buffer info.
state.length += state.objectMode ? 1 : chunk.length;
if (addToFront)
state.buffer.unshift(chunk);
else
state.buffer.push(chunk);
if (state.needReadable)
emitReadable(stream);
}
function emitReadable(stream) {
var state = stream._readableState;
state.needReadable = false;
if (!state.emittedReadable) {
debug('emitReadable', state.flowing);
state.emittedReadable = true;
if (state.sync)
process.nextTick(emitReadable_, stream);
else
emitReadable_(stream);
}
}
function emitReadable_(stream) {
debug('emit readable');
stream.emit('readable');
flow(stream);
}
// 在flowing狀態下,自動讀取流(替代paused狀態下手動read)
function flow(stream) {
var state = stream._readableState;
debug('flow', state.flowing);
if (state.flowing) {
do {
var chunk = stream.read();
} while (null !== chunk && state.flowing);
}
}
一旦處於flowing模式並且當前緩衝區沒有資料,那麼就立即將預處理的push(unshift)資料傳遞給data事件處理函式,並執行stream.read(0)。前文已經交代過,read(0)僅僅用來觸發readable事件,並不讀取緩衝區,這就是觸發readable的第一種情況。
第二種則是第一種情況之外的所有情景,即根據操作(push、unshift)的不同將資料插入讀緩衝區的不同位置。最後執行emitReadable函式,觸發readable事件。針對emitReadable函式,它的作用就是非同步觸發readable事件,並執行flow函式。flow函式則針對flowing狀態的Readable做自適應讀取,免去了手動執行read函式和何時執行read函式的苦惱。
這樣,對於Readable的實現者,一旦在_read函式插入有效資料到讀緩衝區,都會觸發readable事件,在paused狀態下,設定readable事件處理函式並手動執行read函式,便可完成資料的讀取;而在flowing狀態下,通過設定data事件處理函式或者定義pipe目標流同樣可以實現讀取。
既然pipe同樣可以觸發Readable進入flowing狀態,那麼pipe方法具體做了什麼呢?其實pipe針對Readable和Writeable做了限流,首先針對Readable的data事件進行偵聽,並執行Writeable的write函式,當Writeable的寫緩衝區大於一個臨界值(highWaterMark),導致write函式返回false(此時意味著Writeable無法匹配Readable的速度,Writeable的寫緩衝區已經滿了),此時,pipe修改了Readable模式,執行pause方法,進入paused模式,停止讀取讀緩衝區。而同時Writeable開始重新整理寫緩衝區,重新整理完畢後非同步觸發drain事件,在該事件處理函式中,設定Readable為flowing狀態,並繼續執行flow函式不停的重新整理讀緩衝區,這樣就完成了pipe限流。需要注意的是,Readable和Writeable各自維護了一個緩衝區,在實現的上有區別:Readable的緩衝區是一個數組,存放Buffer、String和Object型別;而Writeable則是一個有向連結串列,依次存放需要寫入的資料。
Writeable解讀
Writeable對應Java的OutputStream和Writer類,實現位元組和字元資料的寫。與Readable類似,Writeable的例項物件同樣維護了一個狀態物件-WriteableState,記錄了當前輸出流的狀態資訊,如寫緩衝區的最大值(hightWaterMark)、緩衝區(有向連結串列)和緩衝區長度等資訊。在本節中,主要分析輸出流的關鍵方法write和事件drain,並解析輸出流的實現者需要實現的方法_write和write的關係。
function write
----------------------------
if (state.ended)
writeAfterEnd(this, cb);
else if (validChunk(this, state, chunk, cb)) {
state.pendingcb++;
ret = writeOrBuffer(this, state, chunk, encoding, cb);
}
return ret;
在write方法中,判斷寫入資料的格式並執行writeOrBuffer函式,並返回執行結果,該返回值標示當前寫緩衝區是否已滿。真正執行寫入邏輯的是writeOrBuffer函式,該函式的作用在於重新整理或者更新寫緩衝區,下面看看主要做了什麼,
function writeOrBuffer(stream, state, chunk, encoding, cb) {
chunk = decodeChunk(state, chunk, encoding);
if (chunk instanceof Buffer)
encoding = 'buffer';
var len = state.objectMode ? 1 : chunk.length;
state.length += len;
// 如果快取的長度大於highWaterMark,需要重新整理緩衝,所以設定needDrain標誌
var ret = state.length < state.highWaterMark;
// we must ensure that previous needDrain will not be reset to false.
if (!ret)
state.needDrain = true;
// 快取未處理的寫請求,在clearBuffer中執行快取
// 由此看出,Readable和Writeable都有快取,Readable 中快取的方式是陣列(項為Buffer,字串或物件),Writeable的
// 快取則是物件連結串列
if (state.writing || state.corked) {
var last = state.lastBufferedRequest;
state.lastBufferedRequest = new WriteReq(chunk, encoding, cb);
if (last) {
last.next = state.lastBufferedRequest;
} else {
state.bufferedRequest = state.lastBufferedRequest;
}
state.bufferedRequestCount += 1;
} else {
doWrite(stream, state, false, len, chunk, encoding, cb);
}
return ret;
}
writeOrBuffer首先針對資料進行編碼,字串轉換成Buffer型別,如果設定了Writeable的ObjectMode模式則仍為Object型別;接下來更新寫緩衝區的長度,並判斷寫緩衝區長度是否超過設定的Writeable的最大值(預設16k),如果超過超過則ret=false並更新WriteableState的屬性needDrain=true。ret的結果其實就是write方法返回值,因此一旦write返回值為false,意味著當前寫緩衝區已滿,需要停止繼續寫入資料。
在Readable的pipe方法中,涉及到了Writeable的drain事件。該事件的觸發意味著寫緩衝區已可以繼續快取資料,可見drain事件與寫緩衝區嚴格相關。繼續分析writeOrBuffer函式,若當前輸出流正在寫資料,那麼則當前資料快取至寫緩衝區(建立WriteReq物件);否則執行doWrite函式,重新整理緩衝區。
function doWrite(stream, state, writev, len, chunk, encoding, cb) {
state.writelen = len;
state.writecb = cb;
state.writing = true;
state.sync = true;
if (writev)
stream._writev(chunk, state.onwrite);
else
stream._write(chunk, encoding, state.onwrite);
state.sync = false;
}
doWrite函式設定了需要寫入資料的長度、寫入狀態等資訊,並執行輸出流實現者需要實現的_write函式。在_write函式中,針對資料流向做最後的處理,這裡分析_write函式的具體實現。_write函式有三個引數,分別為chunk,encoding和state.onwrite回撥函式,對該回調函式稍後分析,先著重講解_write函式的實現。在node的fs模組中,可以通過fs.createWriteStream建立Writeable例項,通過執行
var writeStream = fs.createWriteStream('./output',{decodeStrings: false});
console.log(writeStream._write.toString());
-----------------輸出-----------------
function (data, encoding, cb) {
if (!(data instanceof Buffer))
return this.emit('error', new Error('Invalid data'));
if (typeof this.fd !== 'number')
return this.once('open', function() {
this._write(data, encoding, cb);
});
var self = this;
fs.write(this.fd, data, 0, data.length, this.pos, function(er, bytes) {
if (er) {
self.destroy();
return cb(er);
}
self.bytesWritten += bytes;
cb();
});
if (this.pos !== undefined)
this.pos += data.length;
}
看出,在_write實現中,只接受Buffer型別的資料,接著執行fs.write操作,寫入到對應檔案描述符fd對應的檔案中,寫入成功或失敗後執行回撥函式,即state.onwrite函式。
function onwrite(stream, er) {
var state = stream._writableState;
var sync = state.sync;
var cb = state.writecb;
onwriteStateUpdate(state);
// 預設未重寫_write方法,會收到er值
if (er)
onwriteError(stream, state, sync, er, cb);
else {
// Check if we're actually ready to finish, but don't emit yet
var finished = needFinish(state);
// 寫快取的資料
if (!finished &&
!state.corked &&
!state.bufferProcessing &&
state.bufferedRequest) {
clearBuffer(stream, state);
}
// 非同步觸發drain事件
if (sync) {
process.nextTick(afterWrite, stream, state, finished, cb);
} else {
afterWrite(stream, state, finished, cb);
}
}
}
在state.onwrite函式中主要工作有兩個:
- 寫緩衝區的資料
- 寫完緩衝區的資料後,非同步觸發drain事件
第一步,在clearBuffer函式中,就是取出寫緩衝區(有向連結串列)的第一個WriteReq物件,執行doWrite函式,寫入緩衝區的第一個資料;這樣迴圈往復最終清空寫緩衝區,重置一些標誌位。
第二步,非同步執行afterWrite函式,觸發drain事件,並判斷是否寫操作完畢觸發“finish”事件。這裡之所以強調非同步觸發drain事件,是因為為了保證先獲得write()返回值為false,給使用者繫結drain處理函式的時隙,然後再觸發drain事件。
至此,Writeable的重要流程已全部走通。可以看出來,在核心的write()中,判斷寫緩衝區是否已滿並返回該值,在適當條件下快取資料或呼叫_write()寫資料,在Writeable實現者需要實現的** _write() 中,主要任務是資料寫入方向控制,完成最基本的任務**。
總結
對比Readable的read()和_read(),我總結了下這四個函式在“讀寫過程”中的執行順序與關係,如下圖所示: