Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix probe response sig with streaming #707

Merged
merged 3 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/tame-lamps-wonder.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"inngest": patch
---

Fix probe response sig with streaming
136 changes: 87 additions & 49 deletions packages/inngest/src/components/InngestCommHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,17 @@ export class InngestCommHandler<
* or not. Takes into account the user's preference and the platform's
* capabilities.
*/
private shouldStream(actions: HandlerResponseWithErrors): boolean {
private async shouldStream(
actions: HandlerResponseWithErrors
): Promise<boolean> {
const rawProbe = await actions.queryStringWithDefaults(
"testing for probe",
queryKeys.Probe
);
if (rawProbe !== undefined) {
return false;
}

// We must be able to stream responses to continue.
if (!actions.transformStreamingResponse) {
return false;
Expand Down Expand Up @@ -626,35 +636,55 @@ export class InngestCommHandler<
* This helps us provide high quality errors about what's going wrong for
* each access without having to wrap every access in a try/catch.
*/
const actions: HandlerResponseWithErrors = Object.entries(
rawActions
).reduce((acc, [key, value]) => {
if (typeof value !== "function") {
return acc;
}
const promisifiedActions: ActionHandlerResponseWithErrors =
Object.entries(rawActions).reduce((acc, [key, value]) => {
if (typeof value !== "function") {
return acc;
}

return {
...acc,
[key]: (reason: string, ...args: unknown[]) => {
const errMessage = [
`Failed calling \`${key}\` from serve handler`,
reason,
]
.filter(Boolean)
.join(" when ");

const fn = () =>
(value as (...args: unknown[]) => unknown)(...args);

return runAsPromise(fn)
.catch(rethrowError(errMessage))
.catch((err) => {
this.log("error", err);
throw err;
});
},
};
}, {} as HandlerResponseWithErrors);
return {
...acc,
[key]: (reason: string, ...args: unknown[]) => {
const errMessage = [
`Failed calling \`${key}\` from serve handler`,
reason,
]
.filter(Boolean)
.join(" when ");

const fn = () =>
(value as (...args: unknown[]) => unknown)(...args);

return runAsPromise(fn)
.catch(rethrowError(errMessage))
.catch((err) => {
this.log("error", err);
throw err;
});
},
};
}, {} as ActionHandlerResponseWithErrors);

/**
* Mapped promisified handlers from userland `serve()` function mixed in
* with some helpers.
*/
const actions: HandlerResponseWithErrors = {
...promisifiedActions,
queryStringWithDefaults: async (
reason: string,
key: string
): Promise<string | undefined> => {
const url = await actions.url(reason);

const ret =
(await actions.queryString?.(reason, key, url)) ||
url.searchParams.get(key) ||
undefined;

return ret;
},
};

const [env, expectedServerKind] = await Promise.all([
actions.env?.("starting to handle request"),
Expand Down Expand Up @@ -809,7 +839,7 @@ export class InngestCommHandler<
};
};

if (this.shouldStream(actions)) {
if (await this.shouldStream(actions)) {
const method = await actions.method("starting streaming response");

if (method === "POST") {
Expand Down Expand Up @@ -916,18 +946,6 @@ export class InngestCommHandler<
try {
const url = await actions.url("starting to handle request");

const getQuerystring = async (
reason: string,
key: string
): Promise<string | undefined> => {
const ret =
(await actions.queryString?.(reason, key, url)) ||
url.searchParams.get(key) ||
undefined;

return ret;
};

if (method === "POST") {
const validationResult = await signatureValidation;
if (!validationResult.success) {
Expand All @@ -941,7 +959,7 @@ export class InngestCommHandler<
};
}

const rawProbe = await getQuerystring(
const rawProbe = await actions.queryStringWithDefaults(
"testing for probe",
queryKeys.Probe
);
Expand Down Expand Up @@ -980,7 +998,7 @@ export class InngestCommHandler<
return probeActions[probe]();
}

const fnId = await getQuerystring(
const fnId = await actions.queryStringWithDefaults(
"processing run request",
queryKeys.FnId
);
Expand All @@ -990,8 +1008,10 @@ export class InngestCommHandler<
}

const stepId =
(await getQuerystring("processing run request", queryKeys.StepId)) ||
null;
(await actions.queryStringWithDefaults(
"processing run request",
queryKeys.StepId
)) || null;

const { version, result } = this.runStep({
functionId: fnId,
Expand Down Expand Up @@ -1148,7 +1168,7 @@ export class InngestCommHandler<
event_key_hash: this.hashedEventKey ?? null,
extra: {
...introspection.extra,
is_streaming: this.shouldStream(actions),
is_streaming: await this.shouldStream(actions),
},
framework: this.frameworkName,
sdk_language: "js",
Expand Down Expand Up @@ -1179,7 +1199,7 @@ export class InngestCommHandler<
}

if (method === "PUT") {
let deployId = await getQuerystring(
let deployId = await actions.queryStringWithDefaults(
"processing deployment request",
queryKeys.DeployId
);
Expand Down Expand Up @@ -1872,7 +1892,7 @@ export interface ActionResponse<
* This enables us to provide accurate errors for each access without having to
* wrap every access in a try/catch.
*/
export type HandlerResponseWithErrors = {
export type ActionHandlerResponseWithErrors = {
[K in keyof HandlerResponse]: NonNullable<HandlerResponse[K]> extends (
...args: infer Args
) => infer R
Expand All @@ -1881,3 +1901,21 @@ export type HandlerResponseWithErrors = {
: (errMessage: string, ...args: Args) => Promise<R>
: HandlerResponse[K];
};

/**
* A version of {@link ActionHandlerResponseWithErrors} that includes helper
* functions that provide sensible defaults on top of the direct access given
* from the bare response.
*/
export interface HandlerResponseWithErrors
extends ActionHandlerResponseWithErrors {
/**
* Fetch a query string value from the request. If no `querystring` action
* has been provided by the `serve()` handler, this will fall back to using
* the provided URL to fetch the query string instead.
*/
queryStringWithDefaults: (
reason: string,
key: string
) => Promise<string | undefined>;
}
Loading