從前面兩篇文章,我們了解到。想要把 Readable 的數據寫到 Writable,就必須先手動的將數據讀入內存,然后寫入 Writable。換句話說,每次傳遞數據時,都需要寫如下的模板代碼
readable.on('readable', (err) => { if(err) throw err writable.write(readable.read())})為了方便使用,Node.js 提供了 pipe() 方法,讓我們可以優雅的傳遞數據
readable.pipe(writable)
現在,就讓我們來看看它是如何實現的吧
pipe
首先需要先調用 Readable 的 pipe() 方法
// lib/_stream_readable.jsReadable.prototype.pipe = function(dest, pipeOpts) { var src = this; var state = this._readableState; // 記錄 Writable switch (state.pipesCount) { case 0: state.pipes = dest; break; case 1: state.pipes = [state.pipes, dest]; break; default: state.pipes.push(dest); break; } state.pipesCount += 1; // ... src.once('end', endFn); dest.on('unpipe', onunpipe); // ... dest.on('drain', ondrain); // ... src.on('data', ondata); // ... // 保證 error 事件觸發時,onerror 首先被執行 prependListener(dest, 'error', onerror); // ... dest.once('close', onclose); // ... dest.once('finish', onfinish); // ... // 觸發 Writable 的 pipe 事件 dest.emit('pipe', src); // 將 Readable 改為 flow 模式 if (!state.flowing) { debug('pipe resume'); src.resume(); } return dest;};執行 pipe() 函數時,首先將 Writable 記錄到 state.pipes 中,然后綁定相關事件,最后如果 Readable 不是 flow 模式,就調用 resume() 將 Readable 改為 flow 模式
傳遞數據
Readable 從數據源獲取到數據后,觸發 data 事件,執行 ondata()
ondata() 相關代碼:
// lib/_stream_readable.js // 防止在 dest.write(chunk) 內調用 src.push(chunk) 造成 awaitDrain 重復增加,awaitDrain 不能清零,Readable 卡住的情況 // 詳情見 https://github.com/nodejs/node/issues/7278 var increasedAwaitDrain = false; function ondata(chunk) { debug('ondata'); increasedAwaitDrain = false; var ret = dest.write(chunk); if (false === ret && !increasedAwaitDrain) { // 防止在 dest.write() 內調用 src.unpipe(dest),導致 awaitDrain 不能清零,Readable 卡住的情況 if (((state.pipesCount === 1 && state.pipes === dest) || (state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1) ) && !cleanedUp) { debug('false write response, pause', src._readableState.awaitDrain); src._readableState.awaitDrain++; increasedAwaitDrain = true; } // 進入 pause 模式 src.pause(); } }在 ondata(chunk) 函數內,通過 dest.write(chunk) 將數據寫入 Writable
此時,在 _write() 內部可能會調用 src.push(chunk) 或使其 unpipe,這會導致 awaitDrain 多次增加,不能清零,Readable 卡住
當不能再向 Writable 寫入數據時,Readable 會進入 pause 模式,直到所有的 drain 事件觸發
新聞熱點
疑難解答
圖片精選