diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index 45bd819bfdf74f..959fe1e0c90e4b 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -234,46 +234,23 @@ function done(stream, er, data) { } const from = require('internal/streams/from'); -const createReadableStreamAsyncIterator = require('internal/streams/async_iterator'); Transform.by = function by(asyncGeneratorFn, opts) { - let resume = null; - function next() { - if (resume) { - const doResume = resume; - resume = null; - doResume(); + let _resolve; + let _promise = new Promise((resolve) => _resolve = resolve); + return from(Duplex, asyncGeneratorFn(async function*() { + while (true) { + const { chunk, done, cb } = await _promise; + if (done) return cb(); + yield chunk; + _promise = new Promise((resolve) => _resolve = resolve); + cb(); } - } - const input = new Readable({ - objectMode: true, - autoDestroy: true, - read: next, - highWaterMark: 1, // TODO: Buffer here? - destroy (err, callback) { - next(); - callback(err); - } - }); - - const iterator = createReadableStreamAsyncIterator(input); - return from(Duplex, asyncGeneratorFn(iterator), { + }()), { objectMode: true, autoDestroy: true, ...opts, - write(chunk, encoding, callback) { - if (!input.push(chunk)) { - resume = callback; - } else { - callback(); - } - }, - final(callback) { - input.push(null); - resume = callback; - }, - destroy(err, callback) { - input.destroy(err, callback); - } + write: (chunk, encoding, cb) => _resolve({ chunk, done: false, cb }), + final: (cb) => _resolve({ done: true, cb }) }); };