最新公告
  • 欢迎您光临起源地模板网,本站秉承服务宗旨 履行“站长”责任,销售只是起点 服务永无止境!立即加入钻石VIP
  • 彻底掌握 Node.js 四大流,解决爆缓冲区的“背压”问题

    正文概述 掘金(zxg_神说要有光)   2021-08-26   451

    把一个东西从 A 搬到 B 该怎么搬呢?

    抬起来,移动到目的地,放下不就行了么。

    那如果这个东西有一吨重呢?

    那就一部分一部分的搬。

    其实 IO 也就是搬东西,包括网络的 IO、文件的 IO,如果数据量少,那么直接传送全部内容就行了,但如果内容特别多,一次性加载到内存会崩溃,而且速度也慢,这时候就可以一部分一部分的处理,这就是流的思想。

    各种语言基本都实现了 stream 的 api,Node.js 也是,stream api 是比较常用的,下面我们就来探究一下 stream。

    本文会回答以下问题:

    • Node.js 的 4 种 stream 是什么
    • 生成器如何与 Readable Stream 结合
    • stream 的暂停和流动
    • 什么是背压问题,如何解决

    Node.js 的 4种 stream

    流的直观感受

    从一个地方流到另一个地方,显然有流出的一方和流入的一方,流出的一方就是可读流(readable),而流入的一方就是可写流(writable)。

    彻底掌握 Node.js 四大流,解决爆缓冲区的“背压”问题

    当然,也有的流既可以流入又可以流出,这种叫做双工流(duplex)

    彻底掌握 Node.js 四大流,解决爆缓冲区的“背压”问题

    既然可以流入又可以流出,那么是不是可以对流入的内容做下转换再流出呢,这种流叫做转换流(transform)

    彻底掌握 Node.js 四大流,解决爆缓冲区的“背压”问题

    duplex 流的流入和流出内容不需要相关,而 transform 流的流入和流出是相关的,这是两者的区别。

    流的 api

    Node.js 提供的 stream 就是上面介绍的那 4 种:

    const stream = require('stream');
    
    // 可读流
    const Readable = stream.Readable;
    // 可写流
    const Writable = stream.Writable;
    // 双工流
    const Duplex = stream.Duplex;
    // 转换流
    const Transform = stream.Transform;
    

    它们都有要实现的方法:

    • Readable 需要实现 _read 方法来返回内容
    • Writable 需要实现 _write 方法来接受内容
    • Duplex 需要实现 _read 和 _write 方法来接受和返回内容
    • Transform 需要实现 _transform 方法来把接受的内容转换之后返回

    我们分别来看一下:

    Readable

    Readable 要实现 _read 方法,通过 push 返回具体的数据。

    const Stream = require('stream');
    
    const readableStream = Stream.Readable();
    
    readableStream._read = function() {
        this.push('阿门阿前一棵葡萄树,');
        this.push('阿东阿东绿的刚发芽,');
        this.push('阿东背着那重重的的壳呀,');
        this.push('一步一步地往上爬。')
        this.push(null);
    }
    
    readableStream.on('data', (data)=> {
        console.log(data.toString())
    });
    
    readableStream.on('end', () => {
        console.log('done~');
    });
    

    当 push 一个 null 时,就代表结束流。

    执行效果如下:

    彻底掌握 Node.js 四大流,解决爆缓冲区的“背压”问题

    创建 Readable 也可以通过继承的方式:

    const Stream = require('stream');
    
    class ReadableDong extends Stream.Readable {
    
        constructor() {
            super();
        }
    
        _read() {
            this.push('阿门阿前一棵葡萄树,');
            this.push('阿东阿东绿的刚发芽,');
            this.push('阿东背着那重重的的壳呀,');
            this.push('一步一步地往上爬。')
            this.push(null);
        }
    
    }
    
    const readableStream = new ReadableDong();
    
    readableStream.on('data', (data)=> {
        console.log(data.toString())
    });
    
    readableStream.on('end', () => {
        console.log('done~');
    });
    

    可读流是生成内容的,那么很自然可以和生成器结合:

    const Stream = require('stream');
    
    class ReadableDong extends Stream.Readable {
    
        constructor(iterator) {
            super();
            this.iterator = iterator;
        }
    
        _read() {
            const next = this.iterator.next();
            if(next.done) {
                return this.push(null);
            } else {
                this.push(next.value)
            }
        }
    
    }
    
    function *songGenerator() {
        yield '阿门阿前一棵葡萄树,';
        yield '阿东阿东绿的刚发芽,';
        yield '阿东背着那重重的的壳呀,';
        yield '一步一步地往上爬。';
    }
    
    const songIterator = songGenerator();
    
    const readableStream = new ReadableDong(songIterator);
    
    readableStream.on('data', (data)=> {
        console.log(data.toString())
    });
    
    readableStream.on('end', () => {
        console.log('done~');
    });
    

    这就是可读流,通过实现 _read 方法来返回内容。

    Writable

    Writable 要实现 _write 方法,接收写入的内容。

    const Stream = require('stream');
    
    const writableStream = Stream.Writable();
    
    writableStream._write = function (data, enc, next) {
       console.log(data.toString());
       // 每秒写一次
       setTimeout(() => {
           next();
       }, 1000);
    }
    
    writableStream.on('finish', () => console.log('done~'));
    
    writableStream.write('阿门阿前一棵葡萄树,');
    writableStream.write('阿东阿东绿的刚发芽,');
    writableStream.write('阿东背着那重重的的壳呀,');
    writableStream.write('一步一步地往上爬。');
    writableStream.end();
    

    接收写入的内容,打印出来,并且调用 next 来处理下一个写入的内容,这里调用 next 是异步的,可以控制频率。

    跑了一下,确实可以正常的处理写入的内容:

    彻底掌握 Node.js 四大流,解决爆缓冲区的“背压”问题

    这就是可写流,通过实现 _write 方法来处理写入的内容。

    Duplex

    Duplex 是可读可写,同时实现 _read 和 _write 就可以了

    const Stream = require('stream');
    
    var duplexStream = Stream.Duplex();
    
    duplexStream._read = function () {
        this.push('阿门阿前一棵葡萄树,');
        this.push('阿东阿东绿的刚发芽,');
        this.push('阿东背着那重重的的壳呀,');
        this.push('一步一步地往上爬。')
        this.push(null);
    }
    
    duplexStream._write = function (data, enc, next) {
        console.log(data.toString());
        next();
    }
    
    duplexStream.on('data', data => console.log(data.toString()));
    duplexStream.on('end', data => console.log('read done~'));
    
    duplexStream.write('阿门阿前一棵葡萄树,');
    duplexStream.write('阿东阿东绿的刚发芽,');
    duplexStream.write('阿东背着那重重的的壳呀,');
    duplexStream.write('一步一步地往上爬。');
    duplexStream.end();
    
    duplexStream.on('finish', data => console.log('write done~'));
    

    整合了 Readable 流和 Writable 流的功能,这就是双工流 Duplex。

    彻底掌握 Node.js 四大流,解决爆缓冲区的“背压”问题

    Transform

    Duplex 流虽然可读可写,但是两者之间没啥关联,而有的时候需要对流入的内容做转换之后流出,这时候就需要转换流 Transform。

    Transform 流要实现 _transform 的 api,我们实现下对内容做反转的转换流:

    const Stream = require('stream');
    
    class TransformReverse extends Stream.Transform {
    
      constructor() {
        super()
      }
    
      _transform(buf, enc, next) {
        const res = buf.toString().split('').reverse().join('');
        this.push(res)
        next()
      }
    }
    
    var transformStream = new TransformReverse();
    
    transformStream.on('data', data => console.log(data.toString()))
    transformStream.on('end', data => console.log('read done~'));
    
    transformStream.write('阿门阿前一棵葡萄树');
    transformStream.write('阿东阿东绿的刚发芽');
    transformStream.write('阿东背着那重重的的壳呀');
    transformStream.write('一步一步地往上爬');
    transformStream.end()
    
    transformStream.on('finish', data => console.log('write done~'));
    

    跑了一下,效果如下:

    彻底掌握 Node.js 四大流,解决爆缓冲区的“背压”问题

    流的暂停和流动

    我们从 Readable 流中获取内容,然后流入 Writable 流,两边分别做 _read 和 _write 的实现,就实现了流动。

    彻底掌握 Node.js 四大流,解决爆缓冲区的“背压”问题

    背压

    但是 read 和 write 都是异步的,如果两者速率不一致呢?

    如果 Readable 读入数据的速率大于 Writable 写入速度的速率,这样就会积累一些数据在缓冲区,如果缓冲的数据过多,就会爆掉,会丢失数据。

    而如果 Readable 读入数据的速率小于 Writable 写入速度的速率呢?那没关系,最多就是中间有段空闲时期。

    这种读入速率大于写入速率的现象叫做“背压”,或者“负压”。也很好理解,写入段压力比较大,写不进去了,会爆缓冲区,导致数据丢失。

    这个缓冲区大小可以通过 readableHighWaterMark 和 writableHightWaterMark 来查看,是 16k。

    彻底掌握 Node.js 四大流,解决爆缓冲区的“背压”问题

    解决背压

    怎么解决这种读写速率不一致的问题呢?

    当没写完的时候,暂停读就行了。这样就不会读入的数据越来越多,驻留在缓冲区。

    readable stream 有个 readableFlowing 的属性,代表是否自动读入数据,默认为 true,也就是自动读入数据,然后监听 data 事件就可以拿到了。

    当 readableFlowing 设置为 false 就不会自动读了,需要手动通过 read 来读入。

    readableStream.readableFlowing = false;
    
    let data;
    while((data = readableStream.read()) != null) {
        console.log(data.toString());
    }
    

    但自己手动 read 比较麻烦,我们依然可以用自动流入的方式,调用 pause 和 resume 来暂停和恢复就行了。

    当调用 writable stream 的 write 方法的时候会返回一个 boolean 值代表是写入了目标还是放在了缓冲区:

    • true: 数据已经写入目标
    • false:目标不可写入,暂时放在缓冲区

    我们可以判断返回 false 的时候就 pause,然后等缓冲区清空了就 resume:

    const rs = fs.createReadStream(src);
    const ws = fs.createWriteStream(dst);
    
    rs.on('data', function (chunk) {
        if (ws.write(chunk) === false) {
            rs.pause();
        }
    });
    
    rs.on('end', function () {
        ws.end();
    });
    
    ws.on('drain', function () {
        rs.resume();
    });
    

    这样就能达到根据写入速率暂停和恢复读入速率的功能,解决了背压问题。

    pipe 有背压问题么?

    平时我们经常会用 pipe 来直接把 Readable 流对接到 Writable 流,但是好像也没遇到过背压问题,其实是 pipe 内部已经做了读入速率的动态调节了。

    const rs = fs.createReadStream(src);
    const ws = fs.createWriteStream(dst);
    
    rs.pipe(ws);
    

    总结

    流是传输数据时常见的思想,就是一部分一部分的传输内容,是文件读写、网络通信的基础概念。

    Node.js 也提供了 stream 的 api,包括 Readable 可读流、Writable 可写流、Duplex 双工流、Transform 转换流。它们分别实现 _read、_write、_read + _write、_transform 方法,来做数据的返回和处理。

    创建 Readable 对象既可以直接调用 Readable api 创建,然后重写 _read 方法,也可以继承 Readable 实现一个子类,之后实例化。其他流同理。(Readable 可以很容易的和 generator 结合)

    当读入的速率大于写入速率的时候就会出现“背压”现象,会爆缓冲区导致数据丢失,解决的方式是根据 write 的速率来动态 pause 和 resume 可读流的速率。pipe 就没有这个问题,因为内部做了处理。

    流是掌握 IO 绕不过去的一个概念,而背压问题也是流很常见的问题,遇到了数据丢失可以考虑是否发生了背压。希望这篇文章能够帮大家理清思路,真正掌握 stream!


    起源地下载网 » 彻底掌握 Node.js 四大流,解决爆缓冲区的“背压”问题

    常见问题FAQ

    免费下载或者VIP会员专享资源能否直接商用?
    本站所有资源版权均属于原作者所有,这里所提供资源均只能用于参考学习用,请勿直接商用。若由于商用引起版权纠纷,一切责任均由使用者承担。更多说明请参考 VIP介绍。
    提示下载完但解压或打开不了?
    最常见的情况是下载不完整: 可对比下载完压缩包的与网盘上的容量,若小于网盘提示的容量则是这个原因。这是浏览器下载的bug,建议用百度网盘软件或迅雷下载。若排除这种情况,可在对应资源底部留言,或 联络我们.。
    找不到素材资源介绍文章里的示例图片?
    对于PPT,KEY,Mockups,APP,网页模版等类型的素材,文章内用于介绍的图片通常并不包含在对应可供下载素材包内。这些相关商业图片需另外购买,且本站不负责(也没有办法)找到出处。 同样地一些字体文件也是这种情况,但部分素材会在素材包内有一份字体下载链接清单。
    模板不会安装或需要功能定制以及二次开发?
    请QQ联系我们

    发表评论

    还没有评论,快来抢沙发吧!

    如需帝国cms功能定制以及二次开发请联系我们

    联系作者

    请选择支付方式

    ×
    迅虎支付宝
    迅虎微信
    支付宝当面付
    余额支付
    ×
    微信扫码支付 0 元