目录
可写流Writable
_write方法
write方法
end方法
write方法执行流程
两个问题的思考
drain事件
数据消费者
前面介绍了stream模块的可读流stream.Readable,通过可读流的工作模式,我们可以知道可读流其实是数据生产者,而且必须要有消费者通过监听data事件或者监听readable事件调用read方法去消费数据,才能触发可读流进行数据生产。即消费拉动生产。
我们需要搞清楚上面程序中,谁是消费者。
监听data事件?data事件回调函数?console.log?
其实纯粹的消费者定义,应该是 单纯消费数据的人。消费者不应该 *** 心数据的生产工作,而是只关注数据的消费。所以上面console.log算是一个消费者。
那么监听data事件算是什么呢?
监听data事件能够取出可读流对象缓冲区中的数据给自身回调函数作为参数,如果没有消费者消费,则数据被丢弃。
监听data事件的行为像是一个配送员:
消费者网上下单到某商店购买商品,被一个配送员接单,然后配送员到达商店,催出商店给货,商店库存发现商品不够就回去工厂调货,调到货后,加入商店库存,或者直接交给配送员,配送员开开心心的送货,结果发现是个假单,没有消费者,而配送员无法消费商品,所以直接丢了。
console.log 才是真正的数据消费者,我们知道console.log可以将入参转为字符串后在控制台打印。而控制台打印算是标准输出流。所以其实真正消费者是process.stdout。而process.stdout就是一个可写流对象。那么可写流对象如何进行数据消费呢?我们又如何创建一个自定义的可写流对象呢?
可写流Writable在stream模块下,有一个Writable抽象类,它有一个抽象方法_write,我们一般用自定义类继承stream.Writable,并重写_write方法。
_write方法在可写流对象中,有一个缓冲区用于存储将被消费的数据,而_write的作用就是接收缓冲区中的数据并消费
write方法那么可写流对象的缓冲区的中数据来自于哪呢?Writable原型上有一个write方法,我们通过可写流对象调用write方法可以向缓冲区中写入数据。
write方法可以执行多次,即表示可以向可写流缓冲区中写入多次
write方法每次写入都一个返回值true或者false,
true表示写入本次数据后,缓冲区数据还没有达到highWaterMark水位线,不存在内存溢出风险,
false表示写入本次数据后,缓冲区数据量已经超过了highWaterMark水位线,继续写入可能会造成内存溢出风险了,但是并不会阻止write继续写入数据。
另外write方法的参数只能是字符串,Buffer对象,字节数组
end方法那么可写流对象如何知道不会再有数据写入了呢?即如何标志写入结束呢?
Writable原型上有一个end方法,当可写流对象调用end方法后,标志可写流对象停止写入数据,之后再调用write方法就会报错。
end方法参数和write方法一致。但是end方法返回值是当前可写流对象。
write方法执行流程Writable.prototype.write = function(chunk, encoding, cb) { return _write(this, chunk, encoding, cb) === true; };
function _write(stream, chunk, encoding, cb) { const state = stream._writableState; if (typeof encoding === 'function') { cb = encoding; encoding = state.defaultEncoding; } else { if (!encoding) encoding = state.defaultEncoding; else if (encoding !== 'buffer' && !Buffer.isEncoding(encoding)) throw new ERR_UNKNOWN_ENCODING(encoding); if (typeof cb !== 'function') cb = nop; } if (chunk === null) { throw new ERR_STREAM_NULL_VALUES(); } else if (!state.objectMode) { if (typeof chunk === 'string') { if (state.decodeStrings !== false) { chunk = Buffer.from(chunk, encoding); encoding = 'buffer'; } } else if (chunk instanceof Buffer) { encoding = 'buffer'; } else if (Stream._isUint8Array(chunk)) { chunk = Stream._uint8ArrayToBuffer(chunk); encoding = 'buffer'; } else { throw new ERR_INVALID_ARG_TYPE( 'chunk', ['string', 'Buffer', 'Uint8Array'], chunk); } } let err; if (state.ending) { err = new ERR_STREAM_WRITE_AFTER_END(); } else if (state.destroyed) { err = new ERR_STREAM_DESTROYED('write'); } if (err) { process.nextTick(cb, err); errorOrDestroy(stream, err, true); return err; } state.pendingcb++; return writeOrBuffer(stream, state, chunk, encoding, cb); }
// If we're already writing something, then just put this // in the queue, and wait our turn. Otherwise, call _write // If we return false, then we need a drain event, so set that flag. function writeOrBuffer(stream, state, chunk, encoding, callback) { const len = state.objectMode ? 1 : chunk.length; state.length += len; // stream._write resets state.length const ret = state.length < state.highWaterMark; // We must ensure that previous needDrain will not be reset to false. if (!ret) state.needDrain = true; if (state.writing || state.corked || state.errored || !state.constructed) { state.buffered.push({ chunk, encoding, callback }); if (state.allBuffers && encoding !== 'buffer') { state.allBuffers = false; } if (state.allNoop && callback !== nop) { state.allNoop = false; } } else { state.writelen = len; state.writecb = callback; state.writing = true; state.sync = true; stream._write(chunk, encoding, state.onwrite); state.sync = false; } // Return false if errored or destroyed in order to break // any synchronous while(stream.write(data)) loops. return ret && !state.errored && !state.destroyed; }
走读以上代码,我们可以分析出:write方法写入的数据{chunk, encoding, callback}可能会被缓存在state.buffered中,即可写流对象缓冲区中,也可能直接被传递给可写流对象重写的_write方法直接消费。即上面WriteOrBuffer方法的逻辑。
WriteOrBuffer方法中如果满足下面条件
if (state.writing || state.corked || state.errored || !state.constructed)
则,write方法写入的数据会被缓存在state.buffered中,否则,直接传给_write消费。
所以默认情况下,上面判断条件为false,即write写入的数据不会被缓存到state.buffered中,而是会被_write消费。
而在write方法执行过程中,只有state.writing存在被置为true的机会,就是_write方法执行时
但是当_write回调执行时,即state.onwrite执行时,state.writing会被修改为false。
那必然存在一种情况:如果_write的回调state.onwrite是异步执行的,则state.writing的true就不会在同步代码执行时被置为false。比如下面,我们将在重写的_write中,将callback(对应state.onwrite)加入异步任务队列中
则_write执行后,state.writing还是为true,所以下一次write可以进入缓存流程
而这也是保证多次write的话,write的内容按顺序消费的逻辑。后面的write的数据{chunk, encoding, callback}必须要等前面的write的数据被消费掉才能被消费。
但是state.buffered缓冲区是有大小限制的,同时为了防止内存溢出,state.buffered存在水位线highWaterMark用来实时比较,当state.buffered中的数据量state.length超过了highWaterMark,write方法就会返回false,来通知调用write的人。
此时,虽然可以继续用write向state.buffered中继续添加数据,但是很不保险,随时有内存溢出的风险。所以推荐的做法是,当write返回false时,暂停后续write,先将state.buffered中的数据消费完。
两个问题的思考此时有两个问题:
1、state.buffered缓冲区中的数据如何消费?
2、state.buffered缓冲区中数据消费完了,如何通知外界(生产者)进行后续write?
我们知道write流程中,只有_write可以消费数据,且第一次write必然走非缓存,即会触发_write,第二次write可能会走缓存。那么第二次write缓存的数据如何消费呢?
第一次write必走非缓存的_write,而此时传递给_write的回调并不是我们传入的callback,而是state.onwrite。下面是onwrite的代码。
function onwrite(stream, er) { const state = stream._writableState; const sync = state.sync; const cb = state.writecb; if (typeof cb !== 'function') { errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK()); return; } state.writing = false; state.writecb = null; state.length -= state.writelen; state.writelen = 0; if (er) { // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 er.stack; // eslint-disable-line no-unused-expressions if (!state.errored) { state.errored = er; } // In case of duplex streams we need to notify the readable side of the // error. if (stream._readableState && !stream._readableState.errored) { stream._readableState.errored = er; } if (sync) { process.nextTick(onwriteError, stream, state, er, cb); } else { onwriteError(stream, state, er, cb); } } else { if (state.buffered.length > state.bufferedIndex) { clearBuffer(stream, state); } if (sync) { // It is a common case that the callback passed to .write() is always // the same. In that case, we do not schedule a new nextTick(), but // rather just increase a counter, to improve performance and avoid // memory allocations. if (state.afterWriteTickInfo !== null && state.afterWriteTickInfo.cb === cb) { state.afterWriteTickInfo.count++; } else { state.afterWriteTickInfo = { count: 1, cb, stream, state }; process.nextTick(afterWriteTick, state.afterWriteTickInfo); } } else { afterWrite(stream, state, 1, cb); } } }
state.onwrite如果被传入_write后同步执行,则state.writing会同步变为false,则第二次write会继续走非缓存消费。
state.onwrite如果被传入_write后异步执行,则state.writing在同步执行过程中继续保持true,则第二次write会走缓存。
所以第二次write数据进state.buffered缓冲区的话,则说明state.onwrite是异步执行的。
当同步代码执行完后,state.onwrite会被从异步任务队列中被事件循环捞出来执行。
通过debug可以发现onwrite之后进入了clearBuffer方法,通过名字就可以看出clearBuffer适用于清空state.buffered的。而clearBuffer会调用doWrite,doWrite会调用_write进行数据消费。
所以state.buffered缓冲区中的数据虽然是同步加入的,但是都是异步被消费的。验证如下:
drain事件而当state.buffered中数据被清空完,即state.onwrite执行完前,都会执行afterWrite方法
function afterWrite(stream, state, count, cb) { const needDrain = !state.ending && !stream.destroyed && state.length === 0 && state.needDrain; if (needDrain) { state.needDrain = false; stream.emit('drain'); } while (count-- > 0) { state.pendingcb--; cb(); } if (state.destroyed) { errorBuffer(state); } finishMaybe(stream, state); }
在afterWrite方法会判断state.needDrain,若为true(且可写流非结束态)则触发drain事件。
需要注意的是,state.needDrain是在缓冲区已经清空后,作为是否触发drain事件的前提判断,并不是state.needDrain触发的缓冲区清空行为。
state.needDrain是当 缓冲区数据量超过水位线时被置为true的。
所以整体流程应该是:
第一次write写入的数据{chunk, encoding, callback},被同步消费,即同步调用_write,但是传入_write的参数是{chunk, encoding, state.onwrite},即回调参数已经被改为了state.onwrite,该回调方法如果在_write中被异步执行,则会导致第二次write写入的数据会被加入state.buffered,即可写流的缓冲区中,等待异步消费。
当同步代码执行完后,异步执行第一次_write的state.onwrite,该方法会触发清空state.buffered数据去给_write消费的 *** 作clearBuffer,而当state.onwrite执行完前,会调用afterWrite,判断是否需要触发drain事件,当state.needDrain为true时,且可写流非结束态时,就会触发drain事件。
而drain事件正是可写流通知外界可以继续write的方式
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)