国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 編程 > JavaScript > 正文

淺談Node.js:理解stream

2019-11-19 18:37:57
字體:
供稿:網(wǎng)友

Stream在node.js中是一個抽象的接口,基于EventEmitter,也是一種Buffer的高級封裝,用來處理流數(shù)據(jù)。流模塊便是提供各種API讓我們可以很簡單的使用Stream。

流分為四種類型,如下所示:

  • Readable,可讀流
  • Writable,可寫流
  • Duplex,讀寫流
  • Transform,擴展的Duplex,可修改寫入的數(shù)據(jù)

1、Readable可讀流

通過stream.Readable可創(chuàng)建一個可讀流,它有兩種模式:暫停和流動。

在流動模式下,將自動從下游系統(tǒng)讀取數(shù)據(jù)并使用data事件輸出;暫停模式下,必須顯示調(diào)用stream.read()方法讀取數(shù)據(jù),并觸發(fā)data事件。

所有的可讀流最開始都是暫停模式,可以通過以下方法切換到流動模式:

  • 監(jiān)聽'data'事件
  • 調(diào)用stream.resume()方法
  • 調(diào)用stream.pipe()方法將數(shù)據(jù)輸出到一個可寫流Writable

同樣地,也可以切換到暫停模式,有兩種方法:

  • 如果沒有設(shè)置pipe目標(biāo),調(diào)用stream.pause()方法即可。
  • 如果設(shè)置了pipe目標(biāo),則需要移除所有的data監(jiān)聽和調(diào)用stream.unpipe()方法

在Readable對象中有一個_readableSate的對象,通過該對象可以得知流當(dāng)前處于什么模式,如下所示:

  • readable._readableState.flowing = null,沒有數(shù)據(jù)消費者,流不產(chǎn)生數(shù)據(jù)
  • readable._readableState.flowing = true,處于流動模式
  • readable._readableState.flowing = false,處于暫停模式

為什么使用流取數(shù)據(jù)

對于小文件,使用fs.readFile()方法讀取數(shù)據(jù)更方便,但需要讀取大文件的時候,比如幾G大小的文件,使用該方法將消耗大量的內(nèi)存,甚至使程序崩潰。這種情況下,使用流來處理是更合適的,采用分段讀取,便不會造成內(nèi)存的'爆倉'問題。

data事件

在stream提供數(shù)據(jù)塊給消費者時觸發(fā),有可能是切換到流動模式的時候,也有可能是調(diào)用readable.read()方法且有有效數(shù)據(jù)塊的時候,使用如下所示:

const fs = require('fs');const rs = fs.createReadStream('./appbak.js');var chunkArr = [],  chunkLen = 0;rs.on('data',(chunk)=>{  chunkArr.push(chunk);  chunkLen+=chunk.length;});rs.on('end',(chunk)=>{  console.log(Buffer.concat(chunkArr,chunkLen).toString());});

readable事件

當(dāng)流中有可用數(shù)據(jù)能被讀取時觸發(fā),分為兩種,新的可用的數(shù)據(jù)和到達流的末尾,前者stream.read()方法返回可用數(shù)據(jù),后者返回null,如下所示:

const rs = fs.createReadStream('./appbak.js');var chunkArr = [],  chunkLen = 0;rs.on('readable',()=>{  var chunk = null;  //這里需要判斷是否到了流的末尾  if((chunk = rs.read()) !== null){    chunkArr.push(chunk);    chunkLen+=chunk.length;  }});rs.on('end',(chunk)=>{  console.log(Buffer.concat(chunkArr,chunkLen).toString());});

pause和resume方法

stream.pause()方法讓流進入暫停模式,并停止'data'事件觸發(fā),stream.resume()方法使流進入流動模式,并恢復(fù)'data'事件觸發(fā),也可以用來消費所有數(shù)據(jù),如下所示:

const rs = fs.createReadStream('./下載.png');rs.on('data',(chunk)=>{  console.log(`接收到${chunk.length}字節(jié)數(shù)據(jù)...`);  rs.pause();  console.log(`數(shù)據(jù)接收將暫停1.5秒.`);  setTimeout(()=>{    rs.resume();  },1000);});rs.on('end',(chunk)=>{  console.log(`數(shù)據(jù)接收完畢`);});

pipe(destination[, options])方法

pipe()方法綁定一個可寫流到可讀流上,并自動切換到流動模式,將所有數(shù)據(jù)輸出到可寫流,以及做好了數(shù)據(jù)流的管理,不會發(fā)生數(shù)據(jù)丟失的問題,使用如下所示:

const rs = fs.createReadStream('./app.js');rs.pipe(process.stdout);

以上介紹了多種可讀流的數(shù)據(jù)消費的方法,但對于一個可讀流,最好只選擇其中的一種,推薦使用pipe()方法。

2、Writable可寫流

所有的可寫流都是基于stream.Writable類創(chuàng)建的,創(chuàng)建之后便可將數(shù)據(jù)寫入該流中。

write(chunk[, encoding][, callback])方法

write()方法向可寫流中寫入數(shù)據(jù),參數(shù)含義:

  • chunk,字符串或buffer
  • encoding,若chunk為字符串,則是chunk的編碼
  • callback,當(dāng)前chunk數(shù)據(jù)寫入磁盤時的回調(diào)函數(shù)

該方法的返回值為布爾值,如果為false,則表示需要寫入的數(shù)據(jù)塊被緩存并且此時緩存的大小超出highWaterMark閥值,否則為true。

 使用如下所示:

const ws = fs.createWriteStream('./test.txt');ws.write('nihao','utf8',()=>{process.stdout.write('this chunk is flushed.');});ws.end('done.')

背壓機制

如果可寫流的寫入速度跟不上可讀流的讀取速度,write方法添加的數(shù)據(jù)將被緩存,逐漸增多,導(dǎo)致占用大量內(nèi)存。我們希望的是消耗一個數(shù)據(jù),再去讀取一個數(shù)據(jù),這樣內(nèi)存就維持在一個水平上。如何做到這一點?可以利用write方法的返回值來判斷可寫流的緩存狀態(tài)和'drain'事件,及時切換可讀流的模式,如下所示:

function copy(src,dest){  src = path.resolve(src);  dest = path.resolve(dest);  const rs = fs.createReadStream(src);  const ws = fs.createWriteStream(dest);  console.log('正在復(fù)制中...');  const stime = +new Date();  rs.on('data',(chunk)=>{    if(null === ws.write(chunk)){      rs.pause();    }  });  ws.on('drain',()=>{    rs.resume();  });  rs.on('end',()=>{    const etime = +new Date();    console.log(`已完成,用時:${(etime-stime)/1000}秒`);    ws.end();  });  function calcProgress(){      }}copy('./CSS權(quán)威指南 第3版.pdf','./javascript.pdf');

drain事件

如果Writable.write()方法返回false,則drain事件將會被觸發(fā),上面的背壓機制已經(jīng)使用了該事件。

finish事件

在調(diào)用stream.end()方法之后且所有緩存區(qū)的數(shù)據(jù)都被寫入到下游系統(tǒng),就會觸發(fā)該事件,如下所示:

const ws = fs.createWriteStream('./alphabet.txt');const alphabetStr = 'abcdefghijklmnopqrstuvwxyz';ws.on('finish',()=>{  console.log('done.');});for(let letter of alphabetStr.split()){  ws.write(letter);}ws.end();//必須調(diào)用

end([chunk][, encoding][, callback])方法

end()方法被調(diào)用之后,便不能再調(diào)用stream.write()方法寫入數(shù)據(jù),負(fù)責(zé)將拋出錯誤。

3、Duplex讀寫流

Duplex流同時實現(xiàn)了Readable與Writable類的接口,既是可讀流,也是可寫流。例如'zlib streams'、'crypto streams'、'TCP sockets'等都是Duplex流。

4、Transform流

Duplex流的擴展,區(qū)別在于,Transform流自動將寫入端的數(shù)據(jù)變換后添加到可讀端。例如:'zlib streams'、'crypto streams'等都是Transform流。

5、四種流的實現(xiàn)

stream模塊提供的API可以讓我們很簡單的實現(xiàn)流,該模塊使用require('stream')引用,我們只要繼承四種流中的一個基類(stream.Writable, stream.Readable, stream.Duplex, or stream.Transform),然后實現(xiàn)它的接口就可以了,需要實現(xiàn)的接口如下所示:

| Use-case | Class | Method(s) to implement |
 | ------------- |-------------| -----|
 | Reading only | Readable | _read |
 | Writing only | Writable | _write, _writev |
 | Reading and writing | Duplex | _read, _write, _writev |
 | Operate on written data, then read the result | Transform | _transform, _flush |

Readable流實現(xiàn)

如上所示,我們只要繼承Readable類并實現(xiàn)_read接口即可,,如下所示:

const Readable = require('stream').Readable;const util = require('util');const alphabetArr = 'abcdefghijklmnopqrstuvwxyz'.split();/*function AbReadable(){  if(!this instanceof AbReadable){    return new AbReadable();  }  Readable.call(this);}util.inherits(AbReadable,Readable);AbReadable.prototype._read = function(){  if(!alphabetArr.length){    this.push(null);  }else{    this.push(alphabetArr.shift());  }};const abReadable = new AbReadable();abReadable.pipe(process.stdout);*//*class AbReadable extends Readable{  constructor(){    super();  }  _read(){    if(!alphabetArr.length){      this.push(null);    }else{      this.push(alphabetArr.shift());    }  }}const abReadable = new AbReadable();abReadable.pipe(process.stdout);*//*const abReadable = new Readable({  read(){    if(!alphabetArr.length){      this.push(null);    }else{      this.push(alphabetArr.shift());    }  }});abReadable.pipe(process.stdout);*/const abReadable = Readable();abReadable._read = function(){  if (!alphabetArr.length) {    this.push(null);  } else {    this.push(alphabetArr.shift());  }}abReadable.pipe(process.stdout);

以上代碼使用了四種方法創(chuàng)建一個Readable可讀流,必須實現(xiàn)_read()方法,以及用到了readable.push()方法,該方法的作用是將指定的數(shù)據(jù)添加到讀取隊列。

Writable流實現(xiàn)

我們只要繼承Writable類并實現(xiàn)_write或_writev接口,如下所示(只使用兩種方法):

/*class MyWritable extends Writable{  constructor(){    super();  }  _write(chunk,encoding,callback){    process.stdout.write(chunk);    callback();  }}const myWritable = new MyWritable();*/const myWritable = new Writable({  write(chunk,encoding,callback){    process.stdout.write(chunk);    callback();  }});myWritable.on('finish',()=>{  process.stdout.write('done');})myWritable.write('a');myWritable.write('b');myWritable.write('c');myWritable.end();

Duplex流實現(xiàn)

實現(xiàn)Duplex流,需要繼承Duplex類,并實現(xiàn)_read和_write接口,如下所示:

class MyDuplex extends Duplex{  constructor(){    super();    this.source = [];  }  _read(){    if (!this.source.length) {      this.push(null);    } else {      this.push(this.source.shift());    }  }  _write(chunk,encoding,cb){    this.source.push(chunk);    cb();  }}const myDuplex = new MyDuplex();myDuplex.on('finish',()=>{  process.stdout.write('write done.')});myDuplex.on('end',()=>{  process.stdout.write('read done.')});myDuplex.write('/na/n');myDuplex.write('c/n');myDuplex.end('b/n');myDuplex.pipe(process.stdout);

上面的代碼實現(xiàn)了_read()方法,可作為可讀流來使用,同時實現(xiàn)了_write()方法,又可作為可寫流來使用。

Transform流實現(xiàn)

實現(xiàn)Transform流,需要繼承Transform類,并實現(xiàn)_transform接口,如下所示:

class MyTransform extends Transform{  constructor(){    super();  }  _transform(chunk, encoding, callback){    chunk = (chunk+'').toUpperCase();    callback(null,chunk);  }}const myTransform = new MyTransform();myTransform.write('hello world!');myTransform.end();myTransform.pipe(process.stdout);

上面代碼中的_transform()方法,其第一個參數(shù),要么為error,要么為null,第二個參數(shù)將被自動轉(zhuǎn)發(fā)給readable.push()方法,因此該方法也可以使用如下寫法:

_transform(chunk, encoding, callback){  chunk = (chunk+'').toUpperCase()  this.push(chunk)  callback();}

Object Mode流實現(xiàn)

我們知道流中的數(shù)據(jù)默認(rèn)都是Buffer類型,可讀流的數(shù)據(jù)進入流中便被轉(zhuǎn)換成buffer,然后被消耗,可寫流寫入數(shù)據(jù)時,底層調(diào)用也將其轉(zhuǎn)化為buffer。但將構(gòu)造函數(shù)的objectMode選擇設(shè)置為true,便可產(chǎn)生原樣的數(shù)據(jù),如下所示:

const rs = Readable();rs.push('a');rs.push('b');rs.push(null);rs.on('data',(chunk)=>{console.log(chunk);});//<Buffer 61>與<Buffer 62>const rs1 = Readable({objectMode:!0});rs1.push('a');rs1.push('b');rs1.push(null);rs1.on('data',(chunk)=>{console.log(chunk);});//a與b

下面利用Transform流實現(xiàn)一個簡單的CSS壓縮工具,如下所示:

function minify(src,dest){  const transform = new Transform({    transform(chunk,encoding,cb){      cb(null,(chunk.toString()).replace(/[/s/r/n/t]/g,''));    }  });  fs.createReadStream(src,{encoding:'utf8'}).pipe(transform).pipe(fs.createWriteStream(dest));}minify('./reset.css','./reset.min.css');

以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持武林網(wǎng)。

發(fā)表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發(fā)表
主站蜘蛛池模板: 夏津县| 桃园市| 饶平县| 赤壁市| 南皮县| 桐梓县| 周至县| 金川县| 瓮安县| 六枝特区| 盐津县| 土默特右旗| 衢州市| 河池市| 方山县| 镇安县| 株洲县| 沙湾县| 太和县| 满城县| 丽江市| 凤山市| 绥宁县| 昌图县| 高台县| 灵台县| 尉氏县| 连平县| 金乡县| 刚察县| 南雄市| 房产| 澜沧| 扎兰屯市| 滁州市| 罗定市| 韶山市| 长宁县| 尉氏县| 广州市| 双江|