From 2bcc517e563a404a9545c6cfda1edb768d56d2d4 Mon Sep 17 00:00:00 2001 From: Lucas Marshall Date: Tue, 19 Dec 2023 15:45:23 -0800 Subject: [PATCH] fix: handle pagination errors correctly (#2069) --- packages/core/remotes/utils/paginator.ts | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/packages/core/remotes/utils/paginator.ts b/packages/core/remotes/utils/paginator.ts index c6c34736c9..fa60ab9e36 100644 --- a/packages/core/remotes/utils/paginator.ts +++ b/packages/core/remotes/utils/paginator.ts @@ -1,5 +1,6 @@ import type { Readable } from 'stream'; import { PassThrough } from 'stream'; +import { logger } from '../../lib/logger'; type PaginatorImpl = (cursor?: string) => Promise; @@ -62,7 +63,7 @@ export async function paginator( response = await pageFetcher(cursor); } catch (e: any) { if (e.problemType === 'SG_TERMINAL_TOO_MANY_REQUESTS_ERROR') { - passThrough.emit('error', e); + passThrough.destroy(e); return; } throw e; @@ -72,13 +73,17 @@ export async function paginator( cursor = getNextCursorFromPage(response); readable.pipe(passThrough, { end: index === lastIndex && !cursor }); - readable.on('error', (err) => passThrough.emit('error', err)); + readable.on('error', (err) => passThrough.destroy(err)); await new Promise((resolve) => readable.on('end', resolve)); } while (cursor); } })().catch((err) => { - passThrough.emit('error', err); + passThrough.destroy(err); + }); + + passThrough.on('error', (err) => { + logger.error({ err }, 'Error in paginator stream'); }); return passThrough;