Node.js stream模块(二)可写流

Node.js stream模块(二)可写流,第1张

Node.js stream模块(二)可写流

目录

数据消费

可写流Writable

_write方法

write方法

end方法 

write方法执行流程

两个问题的思考

drain事件


数据消费者

前面介绍了stream模块的可读流stream.Readable,通过可读流的工作模式,我们可以知道可读流其实是数据生产者,而且必须要有消费者通过监听data事件或者监听readable事件调用read方法去消费数据,才能触发可读流进行数据生产。即消费拉动生产。

我们需要搞清楚上面程序中,谁是消费者。

监听data事件?data事件回调函数?console.log?

其实纯粹的消费者定义,应该是 单纯消费数据的人。消费者不应该 *** 心数据的生产工作,而是只关注数据的消费。所以上面console.log算是一个消费者。

那么监听data事件算是什么呢?

监听data事件能够取出可读流对象缓冲区中的数据给自身回调函数作为参数,如果没有消费者消费,则数据被丢弃。

监听data事件的行为像是一个配送员:

消费者网上下单到某商店购买商品,被一个配送员接单,然后配送员到达商店,催出商店给货,商店库存发现商品不够就回去工厂调货,调到货后,加入商店库存,或者直接交给配送员,配送员开开心心的送货,结果发现是个假单,没有消费者,而配送员无法消费商品,所以直接丢了。

console.log 才是真正的数据消费者,我们知道console.log可以将入参转为字符串后在控制台打印。而控制台打印算是标准输出流。所以其实真正消费者是process.stdout。而process.stdout就是一个可写流对象。那么可写流对象如何进行数据消费呢?我们又如何创建一个自定义的可写流对象呢?

可写流Writable

在stream模块下,有一个Writable抽象类,它有一个抽象方法_write,我们一般用自定义类继承stream.Writable,并重写_write方法。

_write方法

在可写流对象中,有一个缓冲区用于存储将被消费的数据,而_write的作用就是接收缓冲区中的数据并消费

write方法

那么可写流对象的缓冲区的中数据来自于哪呢?Writable原型上有一个write方法,我们通过可写流对象调用write方法可以向缓冲区中写入数据。

 

 write方法可以执行多次,即表示可以向可写流缓冲区中写入多次

write方法每次写入都一个返回值true或者false,

true表示写入本次数据后,缓冲区数据还没有达到highWaterMark水位线,不存在内存溢出风险,

false表示写入本次数据后,缓冲区数据量已经超过了highWaterMark水位线,继续写入可能会造成内存溢出风险了,但是并不会阻止write继续写入数据。

另外write方法的参数只能是字符串,Buffer对象,字节数组

end方法 

那么可写流对象如何知道不会再有数据写入了呢?即如何标志写入结束呢?

Writable原型上有一个end方法,当可写流对象调用end方法后,标志可写流对象停止写入数据,之后再调用write方法就会报错。

end方法参数和write方法一致。但是end方法返回值是当前可写流对象。

write方法执行流程
Writable.prototype.write = function(chunk, encoding, cb) {
  return _write(this, chunk, encoding, cb) === true;
};
function _write(stream, chunk, encoding, cb) {
  const state = stream._writableState;

  if (typeof encoding === 'function') {
    cb = encoding;
    encoding = state.defaultEncoding;
  } else {
    if (!encoding)
      encoding = state.defaultEncoding;
    else if (encoding !== 'buffer' && !Buffer.isEncoding(encoding))
      throw new ERR_UNKNOWN_ENCODING(encoding);
    if (typeof cb !== 'function')
      cb = nop;
  }

  if (chunk === null) {
    throw new ERR_STREAM_NULL_VALUES();
  } else if (!state.objectMode) {
    if (typeof chunk === 'string') {
      if (state.decodeStrings !== false) {
        chunk = Buffer.from(chunk, encoding);
        encoding = 'buffer';
      }
    } else if (chunk instanceof Buffer) {
      encoding = 'buffer';
    } else if (Stream._isUint8Array(chunk)) {
      chunk = Stream._uint8ArrayToBuffer(chunk);
      encoding = 'buffer';
    } else {
      throw new ERR_INVALID_ARG_TYPE(
        'chunk', ['string', 'Buffer', 'Uint8Array'], chunk);
    }
  }

  let err;
  if (state.ending) {
    err = new ERR_STREAM_WRITE_AFTER_END();
  } else if (state.destroyed) {
    err = new ERR_STREAM_DESTROYED('write');
  }

  if (err) {
    process.nextTick(cb, err);
    errorOrDestroy(stream, err, true);
    return err;
  }
  state.pendingcb++;
  return writeOrBuffer(stream, state, chunk, encoding, cb);
}
// If we're already writing something, then just put this
// in the queue, and wait our turn.  Otherwise, call _write
// If we return false, then we need a drain event, so set that flag.
function writeOrBuffer(stream, state, chunk, encoding, callback) {
  const len = state.objectMode ? 1 : chunk.length;

  state.length += len;

  // stream._write resets state.length
  const ret = state.length < state.highWaterMark;
  // We must ensure that previous needDrain will not be reset to false.
  if (!ret)
    state.needDrain = true;

  if (state.writing || state.corked || state.errored || !state.constructed) {
    state.buffered.push({ chunk, encoding, callback });
    if (state.allBuffers && encoding !== 'buffer') {
      state.allBuffers = false;
    }
    if (state.allNoop && callback !== nop) {
      state.allNoop = false;
    }
  } else {
    state.writelen = len;
    state.writecb = callback;
    state.writing = true;
    state.sync = true;
    stream._write(chunk, encoding, state.onwrite);
    state.sync = false;
  }

  // Return false if errored or destroyed in order to break
  // any synchronous while(stream.write(data)) loops.
  return ret && !state.errored && !state.destroyed;
}

走读以上代码,我们可以分析出:write方法写入的数据{chunk, encoding, callback}可能会被缓存在state.buffered中,即可写流对象缓冲区中,也可能直接被传递给可写流对象重写的_write方法直接消费。即上面WriteOrBuffer方法的逻辑。

WriteOrBuffer方法中如果满足下面条件

if (state.writing || state.corked || state.errored || !state.constructed)

则,write方法写入的数据会被缓存在state.buffered中,否则,直接传给_write消费。

可读流对象_writableState下的属性初值(可读流对象刚创建时)writingfalsecorked0errorednullconstructedtrue

所以默认情况下,上面判断条件为false,即write写入的数据不会被缓存到state.buffered中,而是会被_write消费。

而在write方法执行过程中,只有state.writing存在被置为true的机会,就是_write方法执行时

但是当_write回调执行时,即state.onwrite执行时,state.writing会被修改为false。

 那必然存在一种情况:如果_write的回调state.onwrite是异步执行的,则state.writing的true就不会在同步代码执行时被置为false。比如下面,我们将在重写的_write中,将callback(对应state.onwrite)加入异步任务队列中

则_write执行后,state.writing还是为true,所以下一次write可以进入缓存流程 

而这也是保证多次write的话,write的内容按顺序消费的逻辑。后面的write的数据{chunk, encoding, callback}必须要等前面的write的数据被消费掉才能被消费。

但是state.buffered缓冲区是有大小限制的,同时为了防止内存溢出,state.buffered存在水位线highWaterMark用来实时比较,当state.buffered中的数据量state.length超过了highWaterMark,write方法就会返回false,来通知调用write的人。

此时,虽然可以继续用write向state.buffered中继续添加数据,但是很不保险,随时有内存溢出的风险。所以推荐的做法是,当write返回false时,暂停后续write,先将state.buffered中的数据消费完。

两个问题的思考

此时有两个问题:

1、state.buffered缓冲区中的数据如何消费?

2、state.buffered缓冲区中数据消费完了,如何通知外界(生产者)进行后续write?

我们知道write流程中,只有_write可以消费数据,且第一次write必然走非缓存,即会触发_write,第二次write可能会走缓存。那么第二次write缓存的数据如何消费呢?

第一次write必走非缓存的_write,而此时传递给_write的回调并不是我们传入的callback,而是state.onwrite。下面是onwrite的代码。

function onwrite(stream, er) {
  const state = stream._writableState;
  const sync = state.sync;
  const cb = state.writecb;

  if (typeof cb !== 'function') {
    errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK());
    return;
  }

  state.writing = false;
  state.writecb = null;
  state.length -= state.writelen;
  state.writelen = 0;

  if (er) {
    // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
    er.stack; // eslint-disable-line no-unused-expressions

    if (!state.errored) {
      state.errored = er;
    }

    // In case of duplex streams we need to notify the readable side of the
    // error.
    if (stream._readableState && !stream._readableState.errored) {
      stream._readableState.errored = er;
    }

    if (sync) {
      process.nextTick(onwriteError, stream, state, er, cb);
    } else {
      onwriteError(stream, state, er, cb);
    }
  } else {
    if (state.buffered.length > state.bufferedIndex) {
      clearBuffer(stream, state);
    }

    if (sync) {
      // It is a common case that the callback passed to .write() is always
      // the same. In that case, we do not schedule a new nextTick(), but
      // rather just increase a counter, to improve performance and avoid
      // memory allocations.
      if (state.afterWriteTickInfo !== null &&
          state.afterWriteTickInfo.cb === cb) {
        state.afterWriteTickInfo.count++;
      } else {
        state.afterWriteTickInfo = { count: 1, cb, stream, state };
        process.nextTick(afterWriteTick, state.afterWriteTickInfo);
      }
    } else {
      afterWrite(stream, state, 1, cb);
    }
  }
}

state.onwrite如果被传入_write后同步执行,则state.writing会同步变为false,则第二次write会继续走非缓存消费。

state.onwrite如果被传入_write后异步执行,则state.writing在同步执行过程中继续保持true,则第二次write会走缓存。

所以第二次write数据进state.buffered缓冲区的话,则说明state.onwrite是异步执行的。

当同步代码执行完后,state.onwrite会被从异步任务队列中被事件循环捞出来执行。

通过debug可以发现onwrite之后进入了clearBuffer方法,通过名字就可以看出clearBuffer适用于清空state.buffered的。而clearBuffer会调用doWrite,doWrite会调用_write进行数据消费。

所以state.buffered缓冲区中的数据虽然是同步加入的,但是都是异步被消费的。验证如下:

drain事件

 而当state.buffered中数据被清空完,即state.onwrite执行完前,都会执行afterWrite方法

function afterWrite(stream, state, count, cb) {
  const needDrain = !state.ending && !stream.destroyed && state.length === 0 &&
    state.needDrain;
  if (needDrain) {
    state.needDrain = false;
    stream.emit('drain');
  }

  while (count-- > 0) {
    state.pendingcb--;
    cb();
  }

  if (state.destroyed) {
    errorBuffer(state);
  }

  finishMaybe(stream, state);
}

在afterWrite方法会判断state.needDrain,若为true(且可写流非结束态)则触发drain事件。

需要注意的是,state.needDrain是在缓冲区已经清空后,作为是否触发drain事件的前提判断,并不是state.needDrain触发的缓冲区清空行为。

state.needDrain是当 缓冲区数据量超过水位线时被置为true的。

所以整体流程应该是:

第一次write写入的数据{chunk, encoding, callback},被同步消费,即同步调用_write,但是传入_write的参数是{chunk, encoding, state.onwrite},即回调参数已经被改为了state.onwrite,该回调方法如果在_write中被异步执行,则会导致第二次write写入的数据会被加入state.buffered,即可写流的缓冲区中,等待异步消费。

当同步代码执行完后,异步执行第一次_write的state.onwrite,该方法会触发清空state.buffered数据去给_write消费的 *** 作clearBuffer,而当state.onwrite执行完前,会调用afterWrite,判断是否需要触发drain事件,当state.needDrain为true时,且可写流非结束态时,就会触发drain事件。

而drain事件正是可写流通知外界可以继续write的方式

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5712357.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-18

发表评论

登录后才能评论

评论列表(0条)

保存