什么是流(steams)
流(stream)是 Node.js 中處理流式數(shù)據(jù)的抽象接口。
Streams 不是 Node.js 獨有的概念。它們是幾十年前在 Unix 操作系統(tǒng)中引入的。
它們能夠以一種有效的方式來處理文件的讀、寫,網(wǎng)絡(luò)通信或任何類型的端到端信息交換。
例如,當(dāng)你編寫了一段程序用來讀取文件時,傳統(tǒng)的方法是將文件從頭到尾讀入內(nèi)存,然后再進(jìn)行處理。而使用流的話,你就可以逐塊讀取它,處理其內(nèi)容而不將其全部保存在內(nèi)存中。
以如下代碼為例
const fs = require('fs');const rs = fs.createReadStream('test.md');let data = '';rs.on("data", function (chunk) {data += chunk;});rs.on("end", function() {console.log(data);});利用 createReadStream 創(chuàng)建一個讀取數(shù)據(jù)的流,來讀取 test.md 文件的內(nèi)容,此時監(jiān)聽 data 事件,它是在當(dāng)流將數(shù)據(jù)塊傳送給消費者后觸發(fā)。并在對應(yīng)的 eventHandler 中,拼接 chunk。在 end 事件中,打印到終端上。
之前說流,可以逐塊讀取文件內(nèi)容,那么這個塊,也就是 chunk 是什么?
一般情況下是 Buffer,修改 data 事件的 eventHandler 來驗證下
rs.on("data", function (chunk) {console.log("chunk", Buffer.isBuffer(chunk)) // log truedata += chunk;});流的工作方式可以具體的表述為,在內(nèi)存中準(zhǔn)備一段 Buffer,然后在 fs.read() 讀取時逐步從磁盤中將字節(jié)復(fù)制到 Buffer 中。
為什么要使用 Stream
利用 Stream 來處理數(shù)據(jù),主要是因為它的兩個優(yōu)點:
內(nèi)存效率:在夠處理數(shù)據(jù)之前,不需要占用大量內(nèi)存;
時間效率:處理數(shù)據(jù)花費的時間更少,因為流是逐塊來處理數(shù)據(jù),而不是等到整個數(shù)據(jù)有效負(fù)載才啟動。
首先內(nèi)存效率,與 fs.readFile 這種會緩沖整個文件相比,流式傳輸充分地利用 Buffer (超過 8kb)不受 V8 內(nèi)存控制的特點,利用堆外內(nèi)存完成高效地傳輸。相關(guān)驗證可以參考這篇博文,地址。
時間效率,與 fs.FileSync 相比,有些優(yōu)勢,但是與異步的 fs.readFile 相比,優(yōu)勢不大。
Node.js 中 Stream 的使用
首先用一張圖來了解下 Node.js 中有哪些內(nèi)置的 Stream 接口

圖中提供了一些 Node.js 原生的流的示例,有些是可讀、寫的流。 也有一些是可讀寫的流,如 TCP sockets、zlib 以及 crypto。
特別注意: 流的讀、寫與環(huán)境是密切相關(guān)的。例如 HTTP 響應(yīng)在客戶端上的可讀流,但它是服務(wù)器上的可寫流。同時還需要注意,stdio streams(stdin,stdout,stderr) 在子進(jìn)程上是相反的流。
使用一個例子來展示流的使用
首先利用如下腳本創(chuàng)建一個比較大的文件(大概 430 MB)
const fs = require('fs');const file = fs.createWriteStream('test.md');for(let i=0; i<= 1e6; i++) {file.write('hello world./n');}file.end();在當(dāng)前目錄下,啟動 http 服務(wù)
const http = require('http')const fs = require('fs')const server = http.createServer(function (req, res) {fs.readFile(__dirname + '/test.md', (err, data) => {res.end(data)})})server.listen(3000)得到的結(jié)果,如圖

const http = require('http')const fs = require('fs')const server = http.createServer((req, res) => {const stream = fs.createReadStream(__dirname + '/test.md')stream.pipe(res)})server.listen(3000)
時間減少了 2s 多。這可以解釋為,在讀取文件內(nèi)容,并且不需要改變內(nèi)容的場景下,流能夠完成只讀取 buffer,然后直接傳輸,不做額外的轉(zhuǎn)換,避免損耗,提高性能。
上述代碼中,應(yīng)用了 stream.pipe(...) 。它主要是對流進(jìn)行鏈?zhǔn)降毓艿啦僮鳎?br />
src.pipe(dest1).pipe(dest2)
這樣數(shù)據(jù)流會被自動管理。
如果可讀流發(fā)生錯誤,目標(biāo)可寫流不會自動關(guān)閉,需要手動關(guān)閉所有流以避免內(nèi)存泄漏。
通常,當(dāng)你使用 pipe 方法時,就不需要使用事件,但如果場景需要以更靈活、自定義的方式使用流,那么就要考慮事件。
Stream events
在上述例子中,我們使用了可讀流的 data 、end 事件來控制文件的讀取,它本質(zhì)上與 pipe 方法相同,例如
# readable.pipe(writable)readable.on('data', (chunk) => {writable.write(chunk);});readable.on('end', () => {writable.end();});只不過,使用 event 會更加靈活,可控。

圖中簡單羅列了可讀流、可寫流的相關(guān)事件、方法,其中最重要的是
可讀流:
可寫流:
流的不同類型
除了上面涉及到的可讀、寫流之后,還有 Duplex、Transform 兩類:
如何創(chuàng)建一個可讀流
這里只做簡單介紹,具體見 stream module。
const Stream = require('stream')const readableStream = new Stream.Readable()readableStream._read = (size) => {console.log('read', size)}利用 Stream 模塊初始化一個可讀流,然后向其中發(fā)送數(shù)據(jù)
readableStream.push('hi!')readableStream.push('ho!')如何創(chuàng)建一個可寫流
為了創(chuàng)建可寫流,需要擴(kuò)展了基本的 Writable 對象,并實現(xiàn)了它的 _write 方法。
const Stream = require('stream')const writableStream = new Stream.Writable()實現(xiàn) _write 方法:
writableStream._write = (chunk, encoding, next) => {console.log(chunk.toString())next()}結(jié)合上述例子實現(xiàn)
利用 readableStream 讀入數(shù)據(jù),并輸出到 writableStream
const Stream = require('stream')const readableStream = new Stream.Readable()readableStream._read = (size) => {console.log('read', size)}const writableStream = new Stream.Writable()writableStream._write = (chunk, encoding, next) => {console.log('write', chunk.toString())next()}readableStream.pipe(writableStream)readableStream.push('hi!')readableStream.push('ho!')/* log:read 16384write hi!write ho!*/以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持武林網(wǎng)。
新聞熱點
疑難解答