Skip to content

Commit

Permalink
fixup: more simplify
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Dec 16, 2019
1 parent b291878 commit 62d6326
Showing 1 changed file with 12 additions and 35 deletions.
47 changes: 12 additions & 35 deletions lib/_stream_transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -234,46 +234,23 @@ function done(stream, er, data) {
}

const from = require('internal/streams/from');
const createReadableStreamAsyncIterator = require('internal/streams/async_iterator');

Transform.by = function by(asyncGeneratorFn, opts) {
let resume = null;
function next() {
if (resume) {
const doResume = resume;
resume = null;
doResume();
let _resolve;
let _promise = new Promise((resolve) => _resolve = resolve);
return from(Duplex, asyncGeneratorFn(async function*() {
while (true) {
const { chunk, done, cb } = await _promise;
if (done) return cb();
yield chunk;
_promise = new Promise((resolve) => _resolve = resolve);
cb();
}
}
const input = new Readable({
objectMode: true,
autoDestroy: true,
read: next,
highWaterMark: 1, // TODO: Buffer here?
destroy (err, callback) {
next();
callback(err);
}
});

const iterator = createReadableStreamAsyncIterator(input);
return from(Duplex, asyncGeneratorFn(iterator), {
}()), {
objectMode: true,
autoDestroy: true,
...opts,
write(chunk, encoding, callback) {
if (!input.push(chunk)) {
resume = callback;
} else {
callback();
}
},
final(callback) {
input.push(null);
resume = callback;
},
destroy(err, callback) {
input.destroy(err, callback);
}
write: (chunk, encoding, cb) => _resolve({ chunk, done: false, cb }),
final: (cb) => _resolve({ done: true, cb })
});
};

0 comments on commit 62d6326

Please sign in to comment.