Node.js 的 steam 已经实现了可以保证数据平滑流动的背压机制。
通过 drain 事件和 highWaterMark 可以很好的控制数据写入速度和控制内存消耗。
pipe 方法实现了这个背压机制。
数据读写时可能存在的问题下面示例是将 A 文件内容拷贝到 B 文件中。
const fs = require('fs') const rs = fs.createReadStream('A.txt') const ws = fs.createWriteStream('B.txt') rs.on('data', chunk => { ws.write(chunk) })
需要注意的是,数据从磁盘中读取出来速度,远远大于数据写入磁盘的速度,也就是消费者的速度往往跟不上生产者的速度。
这样就会出现产能过剩。
而 Writeable 的内部又维护了一个队列,在它不能实时的去消费由上游传输的数据时,它就会尝试把当前不能消化的数据先缓存到队列中。
但是队列的内存大小设置了上限,因此读写的过程中如果不实现一个背压机制,很可能会出现内存溢出、GC频繁调用、其它进程变慢的场景。
基于这种场景,就需要提供一种可以让数据生产者和消费者之间平滑流动的机制,这就是背压机制。
数据读取流程数据的读取 *** 作分为三个部分:
- 底层数据
- 缓冲区:缓存读取的数据。Readable 默认空间上限是16KB,在文件可读流中为 64KB。一旦读取流的缓冲区大小达到 highWaterMark 就会暂停从底层数据源读取数据,直到缓冲区的数据被消费。
- 消费者:通过主动调用 read 方法或监听 data 事件来消费数据
数据读取分为两种模式:
- 流动模式
- 暂停模式(默认)
从消费者的视角来看,可读流就相当于一个水池,里面装满了要消费的数据。
但是池子外部有一个水龙头开关,如果处于流动模式,就相当于一直放水,直到放完位置。
如果在这个过程中,用水的人跟不上放水的速度,不可能拿着1升的杯子去接10升的水。
所以这时用水的人在受不了的情况下,就要想办法告诉可读流要停一停,等自己先消化一下。
此时可读流就可以去调用 pause 方法将流动模式切换为暂停模式,放水的阀门就会被关闭。
现在用水的人就可以先慢慢消化之前缓冲的水资源,等到都消费完之后,就可以告诉可读流可以继续放水了。
数据写入流程数据的写入 *** 作分为三个部分:
- 生产者
- 缓存区
- 底层资源
数据从上游的生产者传递过来,然后可写流调用 write() 方法消费数据。
在可写流内部同样也有一片内存空间作为缓存队列,同样具有水位线(highWaterMark)。
如果某个时刻缓冲区的数据超过了水位线,就说明当前无法再消费更多的水资源。
此时 write() 方法调用后就会返回 false 给上游的生产者,让他暂停放水,等可写流将缓存的数据消费完,就会再触发 drain 事件通知上游的生产者可以继续放水了。
此时可读流就可以调用 resume 方法再次打开阀门。
如此往复就可以保证数据的平滑流动,既不会出现内存被撑爆的情况,也不会存在某一时刻无水可用。
这就是 pipe() 方法内部的实现原理。
模拟 pipe 中的背压机制实际应用中很少自定义背压机制,除非要对每个获取的数据进行单独的处理。一般情况下使用 pipe 即可。模拟背压机制是为了更好的理解它的原理。
const fs = require('fs') const rs = fs.createReadStream('test.txt', { highWaterMark: 3 }) const ws = fs.createWriteStream('test1.txt', { highWaterMark: 1 }) let flag = true rs.on('data', chunk => { flag = ws.write(chunk, () => { console.log('ok') }) if (!flag) { rs.pause() } }) ws.on('drain', () => { rs.resume() })
使用 pipe 方法的话就相当于:
const fs = require('fs') const rs = fs.createReadStream('test.txt', { highWaterMark: 3 }) const ws = fs.createWriteStream('test1.txt', { highWaterMark: 1 }) rs.pipe(ws)模拟文件可读流
参考原生文件可读流的使用,实现自定义可读流,以理解数据生产的流程以及文件可读流的一些原理。
创建可读流和文件打开const fs = require('fs') const EventEmitter = require('events') class MyFileReadStream extends EventEmitter { constructor(path, options = {}) { super() this.fd = null this.path = path this.flags = options.flags || 'r' // 文件打开模式,默认可读模式 this.mode = options.mode || 438 // 权限位,默认438 (wr权限) this.autoClose = options.autoClose || true // 读取完是否自动关闭文件 this.start = options.start || 0 // 读取的起始位置 this.end = options.end // 读取的结束位置 this.highWaterMark = options.highWaterMark || 64 * 1024 // 缓存区的水位线(B) this.open() } open() { // 原生 open 方法打开指定位置上的文件 fs.open(this.path, this.flags, this.mode, (err, fd) => { if (err) { return this.emit('error', err) } this.fd = fd this.emit('open', fd) }) } } const rs = new MyFileReadStream('test.txt') rs.on('open', fd => { console.log('open', fd) }) rs.on('error', err => { console.log('error', err) })读取数据和关闭文件
const fs = require('fs') const EventEmitter = require('events') class MyFileReadStream extends EventEmitter { constructor(path, options = {}) { super() this.fd = null this.path = path this.flags = options.flags || 'r' // 文件打开模式,默认可读模式 this.mode = options.mode || 438 // 权限位,默认438 (wr权限) this.autoClose = options.autoClose || true // 读取完是否自动关闭文件 this.start = options.start || 0 // 读取的起始位置 this.end = options.end // 读取的结束位置 this.highWaterMark = options.highWaterMark || 64 * 1024 // 缓存区的水位线(B) this.readOffset = 0 // 每次读取的起始位置 this.open() // 注册事件触发事件 this.on('newListener', type => { if (type === 'data') { this.read() } }) } open() { // 原生 open 方法打开指定位置上的文件 fs.open(this.path, this.flags, this.mode, (err, fd) => { if (err) { return this.emit('error', err) } this.fd = fd this.emit('open', fd) }) } read() { // 注册 data 事件是同步代码, // 注册 data 事件的时机可能早于 fs.open 的回调,此时 fd 还未赋值 if (typeof this.fd !== 'number') { return this.once('open', this.read) } // 申请指定大小的缓存空间 const buf = Buffer.alloc(this.highWaterMark) // 原生 read 方法读取文件数据 fs.read(this.fd, buf, 0, this.highWaterMark, this.readOffset, (err, readBytes) => { if (readBytes) { // 更新偏移量 this.readOffset += readBytes // 触发 data 事件 this.emit('data', buf.slice(0, readBytes)) // 继续读取 this.read() } else { // 没有数据可读 // 先触发 end 再触发 close this.emit('end') if (this.autoClose) { this.close() } } }) } close() { fs.close(this.fd, () => { this.emit('close') }) } } const rs = new MyFileReadStream('test.txt') // rs.on('open', fd => { // console.log('open', fd) // }) // rs.on('error', err => { // console.log('error', err) // }) // 与原生 fs 一样,如果不绑定 data 事件就不会触发 end 和 close 事件 rs.on('end', () => { console.log('end') }) rs.on('close', () => { console.log('close') }) rs.on('data', chunk => { console.log(chunk) })控制每次读取的数据量
const fs = require('fs') const EventEmitter = require('events') class MyFileReadStream extends EventEmitter { constructor(path, options = {}) { super() this.fd = null this.path = path this.flags = options.flags || 'r' // 文件打开模式,默认可读模式 this.mode = options.mode || 438 // 权限位,默认438 (wr权限) this.autoClose = options.autoClose || true // 读取完是否自动关闭文件 this.start = options.start || 0 // 读取的起始位置 this.end = options.end // 读取的结束位置(包含结束位置的数据) this.highWaterMark = options.highWaterMark || 64 * 1024 // 缓存区的水位线(B) this.readOffset = 0 // 每次读取的起始位置 this.open() // 注册事件触发事件 this.on('newListener', type => { if (type === 'data') { this.read() } }) } open() { // 原生 open 方法打开指定位置上的文件 fs.open(this.path, this.flags, this.mode, (err, fd) => { if (err) { return this.emit('error', err) } this.fd = fd this.emit('open', fd) }) } read() { // 注册 data 事件是同步代码, // 注册 data 事件的时机可能早于 fs.open 的回调,此时 fd 还未赋值 if (typeof this.fd !== 'number') { return this.once('open', this.read) } // 申请指定大小的缓存空间 const buf = Buffer.alloc(this.highWaterMark) // 要读取的数据量 const howMuchToRead = this.end ? Math.min(this.end - this.readOffset + 1, this.highWaterMark) : this.highWaterMark // 原生 read 方法读取文件数据 fs.read(this.fd, buf, 0, howMuchToRead, this.readOffset, (err, readBytes) => { console.log(readBytes) if (readBytes) { // 更新偏移量 this.readOffset += readBytes // 触发 data 事件 this.emit('data', buf.slice(0, readBytes)) // 继续读取 this.read() } else { // 没有数据可读 // 先触发 end 再触发 close this.emit('end') if (this.autoClose) { this.close() } } }) } close() { fs.close(this.fd, () => { this.emit('close') }) } } const rs = new MyFileReadStream('test.txt', { highWaterMark: 3, end: 7 }) // rs.on('open', fd => { // console.log('open', fd) // }) // rs.on('error', err => { // console.log('error', err) // }) // 与原生 fs 一样,如果不绑定 data 事件就不会触发 end 和 close 事件 // rs.on('end', () => { // console.log('end') // }) // rs.on('close', () => { // console.log('close') // }) rs.on('data', chunk => { console.log(chunk) })可读流解决的问题
如果使用 fs 的文件读写 API 实现文件复制就会出现下面的嵌套:
fs.open('A.txt', (err, readFd) => { fs.open('B.txt', (err, writeFd) => { // 需要管理缓存、读取位置、长度等参数 fs.read(readFd, ..., chunk => { fs.write(writeFd, chunk, () => {}) // 循环读取 next() }) }) })
流 *** 作优雅的代替了这种嵌套。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)