From 6b6421b39da6250b77ea0254584a82e59183d239 Mon Sep 17 00:00:00 2001 From: Aaron Harper Date: Tue, 10 Sep 2024 11:55:30 -0400 Subject: [PATCH 1/3] Fix streaming breaks probes --- .../src/components/InngestCommHandler.ts | 27 ++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/packages/inngest/src/components/InngestCommHandler.ts b/packages/inngest/src/components/InngestCommHandler.ts index f7427b06..4c2a8f53 100644 --- a/packages/inngest/src/components/InngestCommHandler.ts +++ b/packages/inngest/src/components/InngestCommHandler.ts @@ -551,7 +551,28 @@ export class InngestCommHandler< * or not. Takes into account the user's preference and the platform's * capabilities. */ - private shouldStream(actions: HandlerResponseWithErrors): boolean { + private async shouldStream( + actions: HandlerResponseWithErrors + ): Promise { + const getQuerystring = async ( + reason: string, + key: string + ): Promise => { + const url = await actions.url("starting to handle request"); + + const ret = + (await actions.queryString?.(reason, key, url)) || + url.searchParams.get(key) || + undefined; + + return ret; + }; + + const rawProbe = await getQuerystring("testing for probe", queryKeys.Probe); + if (rawProbe !== undefined) { + return false; + } + // We must be able to stream responses to continue. if (!actions.transformStreamingResponse) { return false; @@ -809,7 +830,7 @@ export class InngestCommHandler< }; }; - if (this.shouldStream(actions)) { + if (await this.shouldStream(actions)) { const method = await actions.method("starting streaming response"); if (method === "POST") { @@ -1148,7 +1169,7 @@ export class InngestCommHandler< event_key_hash: this.hashedEventKey ?? null, extra: { ...introspection.extra, - is_streaming: this.shouldStream(actions), + is_streaming: await this.shouldStream(actions), }, framework: this.frameworkName, sdk_language: "js", From 084be8513e8f9a4de3cfcc297a9350a1459053de Mon Sep 17 00:00:00 2001 From: Aaron Harper Date: Tue, 10 Sep 2024 11:56:15 -0400 Subject: [PATCH 2/3] Add changeset --- .changeset/tame-lamps-wonder.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/tame-lamps-wonder.md diff --git a/.changeset/tame-lamps-wonder.md b/.changeset/tame-lamps-wonder.md new file mode 100644 index 00000000..538882a5 --- /dev/null +++ b/.changeset/tame-lamps-wonder.md @@ -0,0 +1,5 @@ +--- +"inngest": patch +--- + +Fix probe response sig with streaming From 8d51836d758350ec2974c4453cbc2b43fc1df432 Mon Sep 17 00:00:00 2001 From: Jack Williams <1736957+jpwilliams@users.noreply.github.com> Date: Thu, 12 Sep 2024 12:52:04 +0000 Subject: [PATCH 3/3] Refactor `getQueryString()` to have the helper sit in a single location --- .../src/components/InngestCommHandler.ts | 139 ++++++++++-------- 1 file changed, 78 insertions(+), 61 deletions(-) diff --git a/packages/inngest/src/components/InngestCommHandler.ts b/packages/inngest/src/components/InngestCommHandler.ts index 4c2a8f53..205a6e3c 100644 --- a/packages/inngest/src/components/InngestCommHandler.ts +++ b/packages/inngest/src/components/InngestCommHandler.ts @@ -554,21 +554,10 @@ export class InngestCommHandler< private async shouldStream( actions: HandlerResponseWithErrors ): Promise { - const getQuerystring = async ( - reason: string, - key: string - ): Promise => { - const url = await actions.url("starting to handle request"); - - const ret = - (await actions.queryString?.(reason, key, url)) || - url.searchParams.get(key) || - undefined; - - return ret; - }; - - const rawProbe = await getQuerystring("testing for probe", queryKeys.Probe); + const rawProbe = await actions.queryStringWithDefaults( + "testing for probe", + queryKeys.Probe + ); if (rawProbe !== undefined) { return false; } @@ -647,35 +636,55 @@ export class InngestCommHandler< * This helps us provide high quality errors about what's going wrong for * each access without having to wrap every access in a try/catch. */ - const actions: HandlerResponseWithErrors = Object.entries( - rawActions - ).reduce((acc, [key, value]) => { - if (typeof value !== "function") { - return acc; - } + const promisifiedActions: ActionHandlerResponseWithErrors = + Object.entries(rawActions).reduce((acc, [key, value]) => { + if (typeof value !== "function") { + return acc; + } - return { - ...acc, - [key]: (reason: string, ...args: unknown[]) => { - const errMessage = [ - `Failed calling \`${key}\` from serve handler`, - reason, - ] - .filter(Boolean) - .join(" when "); - - const fn = () => - (value as (...args: unknown[]) => unknown)(...args); - - return runAsPromise(fn) - .catch(rethrowError(errMessage)) - .catch((err) => { - this.log("error", err); - throw err; - }); - }, - }; - }, {} as HandlerResponseWithErrors); + return { + ...acc, + [key]: (reason: string, ...args: unknown[]) => { + const errMessage = [ + `Failed calling \`${key}\` from serve handler`, + reason, + ] + .filter(Boolean) + .join(" when "); + + const fn = () => + (value as (...args: unknown[]) => unknown)(...args); + + return runAsPromise(fn) + .catch(rethrowError(errMessage)) + .catch((err) => { + this.log("error", err); + throw err; + }); + }, + }; + }, {} as ActionHandlerResponseWithErrors); + + /** + * Mapped promisified handlers from userland `serve()` function mixed in + * with some helpers. + */ + const actions: HandlerResponseWithErrors = { + ...promisifiedActions, + queryStringWithDefaults: async ( + reason: string, + key: string + ): Promise => { + const url = await actions.url(reason); + + const ret = + (await actions.queryString?.(reason, key, url)) || + url.searchParams.get(key) || + undefined; + + return ret; + }, + }; const [env, expectedServerKind] = await Promise.all([ actions.env?.("starting to handle request"), @@ -937,18 +946,6 @@ export class InngestCommHandler< try { const url = await actions.url("starting to handle request"); - const getQuerystring = async ( - reason: string, - key: string - ): Promise => { - const ret = - (await actions.queryString?.(reason, key, url)) || - url.searchParams.get(key) || - undefined; - - return ret; - }; - if (method === "POST") { const validationResult = await signatureValidation; if (!validationResult.success) { @@ -962,7 +959,7 @@ export class InngestCommHandler< }; } - const rawProbe = await getQuerystring( + const rawProbe = await actions.queryStringWithDefaults( "testing for probe", queryKeys.Probe ); @@ -1001,7 +998,7 @@ export class InngestCommHandler< return probeActions[probe](); } - const fnId = await getQuerystring( + const fnId = await actions.queryStringWithDefaults( "processing run request", queryKeys.FnId ); @@ -1011,8 +1008,10 @@ export class InngestCommHandler< } const stepId = - (await getQuerystring("processing run request", queryKeys.StepId)) || - null; + (await actions.queryStringWithDefaults( + "processing run request", + queryKeys.StepId + )) || null; const { version, result } = this.runStep({ functionId: fnId, @@ -1200,7 +1199,7 @@ export class InngestCommHandler< } if (method === "PUT") { - let deployId = await getQuerystring( + let deployId = await actions.queryStringWithDefaults( "processing deployment request", queryKeys.DeployId ); @@ -1893,7 +1892,7 @@ export interface ActionResponse< * This enables us to provide accurate errors for each access without having to * wrap every access in a try/catch. */ -export type HandlerResponseWithErrors = { +export type ActionHandlerResponseWithErrors = { [K in keyof HandlerResponse]: NonNullable extends ( ...args: infer Args ) => infer R @@ -1902,3 +1901,21 @@ export type HandlerResponseWithErrors = { : (errMessage: string, ...args: Args) => Promise : HandlerResponse[K]; }; + +/** + * A version of {@link ActionHandlerResponseWithErrors} that includes helper + * functions that provide sensible defaults on top of the direct access given + * from the bare response. + */ +export interface HandlerResponseWithErrors + extends ActionHandlerResponseWithErrors { + /** + * Fetch a query string value from the request. If no `querystring` action + * has been provided by the `serve()` handler, this will fall back to using + * the provided URL to fetch the query string instead. + */ + queryStringWithDefaults: ( + reason: string, + key: string + ) => Promise; +}