Skip to content

Commit

Permalink
fixup: simplify
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Dec 16, 2019
1 parent 2fe96e4 commit b291878
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 121 deletions.
136 changes: 37 additions & 99 deletions lib/_stream_transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ const {
ERR_TRANSFORM_WITH_LENGTH_0
} = require('internal/errors').codes;
const Duplex = require('_stream_duplex');
const Readable = require('_stream_readable');
const AsyncIteratorPrototype = ObjectGetPrototypeOf(
ObjectGetPrototypeOf(async function* () {}).prototype);

Expand Down Expand Up @@ -232,110 +233,47 @@ function done(stream, er, data) {
return stream.push(null);
}

function SourceIterator(asyncGeneratorFn, opts) {
const source = this;
const result = asyncGeneratorFn(this);
if (typeof result[Symbol.asyncIterator] !== 'function') {
throw new ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE('asyncGeneratorFn');
}
const iter = result[Symbol.asyncIterator]();
if (typeof iter.next !== 'function') {
throw new ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE('asyncGeneratorFn');
}
const from = require('internal/streams/from');
const createReadableStreamAsyncIterator = require('internal/streams/async_iterator');

this[kSourceIteratorPull] = null;
this[kSourceIteratorChunk] = null;
this[kSourceIteratorResolve] = null;
this[kSourceIteratorStream] = new Transform({
Transform.by = function by(asyncGeneratorFn, opts) {
let resume = null;
function next() {
if (resume) {
const doResume = resume;
resume = null;
doResume();
}
}
const input = new Readable({
objectMode: true,
...opts,
transform(chunk, encoding, cb) {
source.encoding = encoding;
if (source[kSourceIteratorResolve] === null) {
source[kSourceIteratorChunk] = chunk;
source[kSourceIteratorPull] = cb;
return;
}
source[kSourceIteratorResolve]({ value: chunk, done: false });
source[kSourceIteratorResolve] = null;
cb(null);
autoDestroy: true,
read: next,
highWaterMark: 1, // TODO: Buffer here?
destroy (err, callback) {
next();
callback(err);
}
});
this.encoding = this[kSourceIteratorStream]._transformState.writeencoding;
this[kSourceIteratorGrabResolve] = (resolve) => {
this[kSourceIteratorResolve] = resolve;
};
const first = iter.next();
this[kSourceIteratorPump](iter, first);
}

SourceIterator.prototype[Symbol.asyncIterator] = function() {
return this;
};

ObjectSetPrototypeOf(SourceIterator.prototype, AsyncIteratorPrototype);

SourceIterator.prototype.next = function next() {
if (this[kSourceIteratorPull] === null || this[kSourceIteratorChunk] === null)
return new Promise(this[kSourceIteratorGrabResolve]);

this[kSourceIteratorPull](null);
const result = Promise.resolve({
value: this[kSourceIteratorChunk],
done: false
});
this[kSourceIteratorChunk] = null;
this[kSourceIteratorPull] = null;
return result;
};

SourceIterator.prototype[kSourceIteratorPump] = async function pump(iter, p) {
const stream = this[kSourceIteratorStream];
try {
stream.removeListener('prefinish', prefinish);
stream.on('prefinish', () => {
if (this[kSourceIteratorResolve] !== null) {
this[kSourceIteratorResolve]({ value: undefined, done: true });
}
});
let next = await p;
while (true) {
const { done, value } = next;
if (done) {
if (value !== undefined) stream.push(value);

// In the event of an early return we explicitly
// discard any buffered state
if (stream._writableState.length > 0) {
const { length } = stream._writableState;
const { transforming } = stream._transformState;
stream._writableState.length = 0;
stream._transformState.transforming = false;
prefinish.call(stream);
stream._writableState.length = length;
stream._transformState.transforming = transforming;
} else {
prefinish.call(stream);
}
break;
const iterator = createReadableStreamAsyncIterator(input);
return from(Duplex, asyncGeneratorFn(iterator), {
objectMode: true,
autoDestroy: true,
...opts,
write(chunk, encoding, callback) {
if (!input.push(chunk)) {
resume = callback;
} else {
callback();
}
stream.push(value);
next = await iter.next();
},
final(callback) {
input.push(null);
resume = callback;
},
destroy(err, callback) {
input.destroy(err, callback);
}
} catch (err) {
process.nextTick(() => stream.destroy(err));
} finally {
this[kSourceIteratorPull] = null;
this[kSourceIteratorChunk] = null;
this[kSourceIteratorResolve] = null;
this[kSourceIteratorStream] = null;
}
};


Transform.by = function by(asyncGeneratorFn, opts) {
const source = new SourceIterator(asyncGeneratorFn, opts);
const stream = source[kSourceIteratorStream];

return stream;
});
};
43 changes: 21 additions & 22 deletions test/parallel/test-transform-by.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const { Readable, Transform } = require('stream');
const { strictEqual } = require('assert');

async function transformBy() {
const readable = Readable.from('test');
const readable = Readable.from('test'.split(''));
async function * mapper(source) {
for await (const chunk of source) {
yield chunk.toUpperCase();
Expand All @@ -21,7 +21,7 @@ async function transformBy() {
}

async function transformByFuncReturnsObjectWithSymbolAsyncIterator() {
const readable = Readable.from('test');
const readable = Readable.from('test'.split(''));
const mapper = (source) => ({
[Symbol.asyncIterator]() {
return {
Expand Down Expand Up @@ -55,8 +55,7 @@ transformByObjReturnedWSymbolAsyncIteratorWithNonPromiseReturningNext() {
});

expectsError(() => Transform.by(mapper), {
message: 'asyncGeneratorFn must return an async iterable',
code: 'ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE',
code: 'ERR_INVALID_ARG_TYPE',
type: TypeError
});
}
Expand All @@ -69,8 +68,7 @@ async function transformByObjReturnedWSymbolAsyncIteratorWithNoNext() {
});

expectsError(() => Transform.by(mapper), {
message: 'asyncGeneratorFn must return an async iterable',
code: 'ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE',
code: 'ERR_INVALID_ARG_TYPE',
type: TypeError
});
}
Expand All @@ -91,14 +89,13 @@ async function transformByFuncReturnsObjectWithoutSymbolAsyncIterator() {
const mapper = () => ({});

expectsError(() => Transform.by(mapper), {
message: 'asyncGeneratorFn must return an async iterable',
code: 'ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE',
code: 'ERR_INVALID_ARG_TYPE',
type: TypeError
});
}

async function transformByEncoding() {
const readable = Readable.from('test');
const readable = Readable.from('test'.split(''));
async function * mapper(source) {
for await (const chunk of source) {
strictEqual(source.encoding, 'ascii');
Expand All @@ -116,7 +113,7 @@ async function transformByEncoding() {
}

async function transformBySourceIteratorCompletes() {
const readable = Readable.from('test');
const readable = Readable.from('test'.split(''));
const mustReach = mustCall();
async function * mapper(source) {
for await (const chunk of source) {
Expand All @@ -134,7 +131,7 @@ async function transformBySourceIteratorCompletes() {
}

async function transformByYieldPlusReturn() {
const readable = Readable.from('test');
const readable = Readable.from('test'.split(''));
async function * mapper(source) {
for await (const chunk of source) {
yield chunk.toUpperCase();
Expand All @@ -151,7 +148,7 @@ async function transformByYieldPlusReturn() {
}

async function transformByReturnEndsStream() {
const readable = Readable.from('test');
const readable = Readable.from('test'.split(''));
async function * mapper(source) {
for await (const chunk of source) {
yield chunk.toUpperCase();
Expand All @@ -170,7 +167,7 @@ async function transformByReturnEndsStream() {
}

async function transformByOnData() {
const readable = Readable.from('test');
const readable = Readable.from('test'.split(''));
async function * mapper(source) {
for await (const chunk of source) {
yield chunk.toUpperCase();
Expand All @@ -191,7 +188,7 @@ async function transformByOnData() {
}

async function transformByOnDataNonObject() {
const readable = Readable.from('test', { objectMode: false });
const readable = Readable.from('test'.split(''), { objectMode: false });
async function * mapper(source) {
for await (const chunk of source) {
yield chunk.toString().toUpperCase();
Expand All @@ -212,7 +209,7 @@ async function transformByOnDataNonObject() {
}

async function transformByOnErrorAndDestroyed() {
const stream = Readable.from('test').pipe(Transform.by(
const stream = Readable.from('test'.split('')).pipe(Transform.by(
async function * mapper(source) {
for await (const chunk of source) {
if (chunk === 'e') throw new Error('kaboom');
Expand All @@ -230,7 +227,7 @@ async function transformByOnErrorAndDestroyed() {
}

async function transformByErrorTryCatchAndDestroyed() {
const stream = Readable.from('test').pipe(Transform.by(
const stream = Readable.from('test'.split('')).pipe(Transform.by(
async function * mapper(source) {
for await (const chunk of source) {
if (chunk === 'e') throw new Error('kaboom');
Expand All @@ -250,7 +247,7 @@ async function transformByErrorTryCatchAndDestroyed() {
}

async function transformByOnErrorAndTryCatchAndDestroyed() {
const stream = Readable.from('test').pipe(Transform.by(
const stream = Readable.from('test'.split('')).pipe(Transform.by(
async function * mapper(source) {
for await (const chunk of source) {
if (chunk === 'e') throw new Error('kaboom');
Expand Down Expand Up @@ -286,17 +283,19 @@ async function transformByThrowPriorToForAwait() {
strictEqual(err.message, 'kaboom');
}));

read.pipe(stream);
read.pipe(stream).resume();
}

Promise.all([
transformBy(),
transformByFuncReturnsObjectWithSymbolAsyncIterator(),
transformByObjReturnedWSymbolAsyncIteratorWithNonPromiseReturningNext(),
transformByObjReturnedWSymbolAsyncIteratorWithNoNext(),
transformByObjReturnedWSymbolAsyncIteratorThatIsNotFunction(),
// NOTE: These should be handled by Readable.from.
// transformByObjReturnedWSymbolAsyncIteratorWithNonPromiseReturningNext(),
// transformByObjReturnedWSymbolAsyncIteratorWithNoNext(),
// transformByObjReturnedWSymbolAsyncIteratorThatIsNotFunction(),
transformByFuncReturnsObjectWithoutSymbolAsyncIterator(),
transformByEncoding(),
// NOTE: This doesn't make sense for iterable? Is it consistent with Readable.from?
// transformByEncoding(),
transformBySourceIteratorCompletes(),
transformByYieldPlusReturn(),
transformByReturnEndsStream(),
Expand Down

0 comments on commit b291878

Please sign in to comment.