From 837379cb4a035894850d028039642aa689035a53 Mon Sep 17 00:00:00 2001 From: Nick Clark Date: Mon, 17 Jun 2019 13:37:07 -0600 Subject: [PATCH 1/2] Pass errors from onClose to onError --- src/index.js | 15 +++++++++++---- src/test/index.test.js | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 43 insertions(+), 4 deletions(-) diff --git a/src/index.js b/src/index.js index b44f75b..8584a1c 100644 --- a/src/index.js +++ b/src/index.js @@ -11,7 +11,7 @@ function callbackToAsyncIterator( listener: ((arg: CallbackInput) => any) => Promise, options?: { onError?: (err: Error) => void, - onClose?: (arg?: ?ReturnVal) => void, + onClose?: (arg?: ?ReturnVal) => Promise | void, buffering?: boolean, } = {} ) { @@ -54,7 +54,14 @@ function callbackToAsyncIterator( pullQueue.forEach(resolve => resolve({ value: undefined, done: true })); pullQueue = []; pushQueue = []; - onClose && onClose(listenerReturnValue); + if (onClose) { + try { + const closeRet = onClose(listenerReturnValue); + if (closeRet) closeRet.catch(e => onError(e)); + } catch (e) { + onError(e); + } + } } } @@ -66,7 +73,7 @@ function callbackToAsyncIterator( emptyQueue(); return Promise.resolve({ value: undefined, done: true }); }, - throw(error) { + throw(error: Error) { emptyQueue(); onError(error); return Promise.reject(error); @@ -84,7 +91,7 @@ function callbackToAsyncIterator( return() { return Promise.reject(err); }, - throw(error) { + throw(error: Error) { return Promise.reject(error); }, [$$asyncIterator]() { diff --git a/src/test/index.test.js b/src/test/index.test.js index dc07222..a07cbd5 100644 --- a/src/test/index.test.js +++ b/src/test/index.test.js @@ -95,6 +95,38 @@ describe('options', () => { }); }); + it('should call onError with an error thrown by a non async onClose', async () => { + const error = new Error('Bla bla'); + const listener = (cb: () => void) => Promise.resolve(); + + expect.assertions(1); + const iter = asyncify(listener, { + onClose: () => { + throw error; + }, + onError: err => { + expect(err).toEqual(error); + }, + }); + await iter.return(); + }); + + it('should call onError with an error thrown by an async onClose', async () => { + const error = new Error('Bla bla'); + const listener = (cb: () => void) => Promise.resolve(); + + expect.assertions(1); + const iter = asyncify(listener, { + onClose: async () => { + throw error; + }, + onError: err => { + expect(err).toEqual(error); + }, + }); + await iter.return(); + }); + it('should call onClose with the return value from the listener', async () => { const returnValue = 'asdf'; const listener = (cb: () => void) => From 0d672b5b6ed3f1f3491cdcd981391a95d78f1d6c Mon Sep 17 00:00:00 2001 From: Nick Clark Date: Mon, 17 Jun 2019 13:50:25 -0600 Subject: [PATCH 2/2] Ensure onClose is only called once we receive a value from the listener --- src/index.js | 8 ++++++++ src/test/index.test.js | 18 ++++++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/src/index.js b/src/index.js index 8584a1c..7e79ddf 100644 --- a/src/index.js +++ b/src/index.js @@ -21,10 +21,14 @@ function callbackToAsyncIterator( let pushQueue = []; let listening = true; let listenerReturnValue; + let listenerReturnedValue = false; + let closingWaitingOnListenerReturnValue = false; // Start listener listener(value => pushValue(value)) .then(a => { listenerReturnValue = a; + listenerReturnedValue = true; + if (closingWaitingOnListenerReturnValue) emptyQueue(); }) .catch(err => { onError(err); @@ -49,6 +53,10 @@ function callbackToAsyncIterator( } function emptyQueue() { + if (onClose && !listenerReturnedValue) { + closingWaitingOnListenerReturnValue = true; + return; + } if (listening) { listening = false; pullQueue.forEach(resolve => resolve({ value: undefined, done: true })); diff --git a/src/test/index.test.js b/src/test/index.test.js index a07cbd5..fd19f6e 100644 --- a/src/test/index.test.js +++ b/src/test/index.test.js @@ -145,6 +145,24 @@ describe('options', () => { await iter.return(); }); + it('should call onClose with the return value from an listener only after the promise resolves', async () => { + const returnValue = 'asdf'; + const listener = (cb: () => void) => + new Promise(res => { + res(returnValue); + }); + + expect.hasAssertions(); + const iter = asyncify(listener, { + onClose: val => { + expect(val).toEqual(returnValue); + }, + }); + // Wait a tick so that the promise resolves with the return value + iter.return(); + await new Promise(res => setTimeout(res, 10)); + }); + describe('buffering', () => { it('should not buffer incoming values if disabled', async () => { const listener = (cb: (arg: number) => void) =>