Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose EventSchemas in Inngest instances #657

Merged
merged 10 commits into from
Oct 21, 2024
Merged

Conversation

jpwilliams
Copy link
Member

@jpwilliams jpwilliams commented Jul 19, 2024

Summary

Exposes "runtime schemas" on an Inngest client, allowing middleware to use it to add custom validation using the schemas passed to EventSchemas.

We'll add this as a top-level option later, but this allows us to explore the functionality while we settle on APIs, default functionality, and supporting many validation libraries.

As an example, here is a custom middleware that makes use of this to add runtime validation to any Zod schemas that exist within your client's EventSchemas. It assumes you have zod installed.

# Use this PR to test
npm install inngest@pr-657
export const inngest = new Inngest({
  id: "my-app",
  schemas,
  middleware: [experimentalValidationMiddleware()],
});
import {
  InngestMiddleware,
  internalEvents,
  type EventPayload,
  type InngestFunction,
} from "inngest";
import { z, ZodType } from "zod";

/**
 * Experimental middleware that validates events using Zod schemas passed using
 * `EventSchemas.fromZod()`.
 */
export const experimentalValidationMiddleware = (opts?: {
  /**
   * Disallow events that don't have a schema defined.
   */
  disallowSchemalessEvents?: boolean;

  /**
   * Disallow events that have a schema defined, but the schema is unknown and
   * not handled in this code.
   */
  disallowUnknownSchemas?: boolean;

  /**
   * Disable validation of incoming events.
   */
  disableIncomingValidation?: boolean;

  /**
   * Disable validation of outgoing events using `inngest.send()` or
   * `step.sendEvent()`.
   */
  disableOutgoingValidation?: boolean;
}) => {
  const mw = new InngestMiddleware({
    name: "Inngest Experimental: Runtime schema validation",
    init({ client }) {
      /**
       * Given an `event`, validate it against its schema.
       */
      const validateEvent = async (
        event: EventPayload,
        potentialInvokeEvents: string[] = []
      ): Promise<EventPayload> => {
        let schemasToAttempt = new Set<string>([event.name]);
        let hasSchema = false;

        /**
         * Trust internal events; don't allow overwriting their typing.
         */
        if (event.name.startsWith("inngest/")) {
          if (event.name !== internalEvents.FunctionInvoked) {
            return event;
          }

          /**
           * If this is an `inngest/function.invoked` event, try validating the
           * payload against one of the function's schemas.
           */
          schemasToAttempt = new Set<string>(potentialInvokeEvents);

          hasSchema = Boolean(
            schemasToAttempt.intersection(
              new Set<string>(
                Object.keys(client["schemas"]?.["runtimeSchemas"] || {})
              )
            ).size
          );
        } else {
          hasSchema = Boolean(
            client["schemas"]?.["runtimeSchemas"][event.name]
          );
        }

        if (!hasSchema) {
          if (opts?.disallowSchemalessEvents) {
            throw new Error(
              `Event "${event.name}" has no schema defined; disallowing`
            );
          }

          return event;
        }

        const errors: Record<string, Error> = {};

        for (const schemaName of schemasToAttempt) {
          try {
            const schema = client["schemas"]?.["runtimeSchemas"][schemaName];

            /**
             * The schema could be a full Zod object.
             */
            if (helpers.isZodObject(schema)) {
              const { success, data, error } = await schema
                .passthrough()
                .safeParseAsync(event);

              if (success) {
                return data as unknown as EventPayload;
              }

              throw new Error(`${error.name}: ${error.message}`);
            }

            /**
             * The schema could also be a regular object with Zod objects inside.
             */
            if (helpers.isObject(schema)) {
              // It could be a partial schema; validate each field
              return await Object.keys(schema).reduce<Promise<EventPayload>>(
                async (acc, key) => {
                  const fieldSchema = schema[key];
                  const eventField = event[key as keyof EventPayload];

                  if (!helpers.isZodObject(fieldSchema) || !eventField) {
                    return acc;
                  }

                  const { success, data, error } = await fieldSchema
                    .passthrough()
                    .safeParseAsync(eventField);

                  if (success) {
                    return { ...(await acc), [key]: data };
                  }

                  throw new Error(`${error.name}: ${error.message}`);
                },
                Promise.resolve<EventPayload>({ ...event })
              );
            }

            /**
             * Didn't find anything? Throw or warn.
             *
             * We only allow this for assessing single schemas, as otherwise we're
             * assessing an invocation would could be multiple.
             */
            if (opts?.disallowUnknownSchemas && schemasToAttempt.size === 1) {
              throw new Error(
                `Event "${event.name}" has an unknown schema; disallowing`
              );
            } else {
              console.warn(
                "Unknown schema found; cannot validate, but allowing"
              );
            }
          } catch (err) {
            errors[schemaName] = err as Error;
          }
        }

        if (Object.keys(errors).length) {
          throw new Error(
            `Event "${event.name}" failed validation:\n\n${Object.keys(errors)
              .map((key) => `Using ${key}: ${errors[key].message}`)
              .join("\n\n")}`
          );
        }

        return event;
      };

      return {
        ...(opts?.disableIncomingValidation
          ? {}
          : {
              async onFunctionRun({ fn }) {
                const backupEvents = (
                  (fn.opts as InngestFunction.Options).triggers || []
                ).reduce<string[]>((acc, trigger) => {
                  if (trigger.event) {
                    return [...acc, trigger.event];
                  }

                  return acc;
                }, []);

                return {
                  async transformInput({ ctx: { events } }) {
                    const validatedEvents = await Promise.all(
                      events.map((event) => {
                        return validateEvent(event, backupEvents);
                      })
                    );

                    return {
                      ctx: {
                        event: validatedEvents[0],
                        events: validatedEvents,
                      } as {},
                    };
                  },
                };
              },
            }),

        ...(opts?.disableOutgoingValidation
          ? {}
          : {
              async onSendEvent() {
                return {
                  async transformInput({ payloads }) {
                    return {
                      payloads: await Promise.all(
                        payloads.map((payload) => {
                          return validateEvent(payload);
                        })
                      ),
                    };
                  },
                };
              },
            }),
      };
    },
  });

  return mw;
};

const helpers = {
  isZodObject: (value: unknown): value is z.ZodObject<any> => {
    return value instanceof ZodType && value._def.typeName === "ZodObject";
  },

  isObject: (value: unknown): value is Record<string, any> => {
    return typeof value === "object" && value !== null && !Array.isArray(value);
  },
};

Checklist

  • Added a docs PR that references this PR N/A
  • Added unit/integration tests N/A
  • Added changesets if applicable

Related

We'll do this internally later, but storing them means this can also be handled by middleware.
@jpwilliams jpwilliams added prerelease/inngest Create snapshot releases for a PR for the "inngest" package. ⬆️ improvement Performance, reliability, or usability improvements labels Jul 19, 2024
@jpwilliams jpwilliams self-assigned this Jul 19, 2024
Copy link

changeset-bot bot commented Jul 19, 2024

🦋 Changeset detected

Latest commit: 942e3e1

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 1 package
Name Type
inngest Patch

Not sure what this means? Click here to learn what changesets are.

Click here if you're a maintainer who wants to add another changeset to this PR

@inngest-release-bot inngest-release-bot added the 📦 inngest Affects the `inngest` package label Jul 19, 2024
@inngest-release-bot
Copy link
Collaborator

inngest-release-bot commented Jul 19, 2024

A user has added the prerelease/inngest label, so this PR will be published to npm with the tag pr-657. It will be updated with the latest changes as you push commits to this PR.

You can install this prerelease version with:

npm install inngest@pr-657

The last release was built and published from 942e3e1.

@jpwilliams jpwilliams merged commit 7ca9537 into main Oct 21, 2024
48 checks passed
@jpwilliams jpwilliams deleted the feat/expose-eventschemas branch October 21, 2024 16:16
jpwilliams pushed a commit that referenced this pull request Oct 21, 2024
This PR was opened by the [Changesets
release](https://github.com/changesets/action) GitHub action. When
you're ready to do a release, you can merge this and the packages will
be published to npm automatically. If you're not ready to do a release
yet, that's fine, whenever you add more changesets to main, this PR will
be updated.


# Releases
## @inngest/[email protected]

### Minor Changes

- [#704](#704)
[`9438960`](9438960)
Thanks [@jpwilliams](https://github.com/jpwilliams)! - Refactor
`@inngest/test` to have a much simpler public API

## [email protected]

### Patch Changes

- [#657](#657)
[`7ca9537`](7ca9537)
Thanks [@jpwilliams](https://github.com/jpwilliams)! - Expose
`EventSchemas` in `Inngest` instances

- [#311](#311)
[`a53356a`](a53356a)
Thanks [@jpwilliams](https://github.com/jpwilliams)! - Add streaming
capabilities to `"inngest/cloudflare"` handler

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
⬆️ improvement Performance, reliability, or usability improvements 📦 inngest Affects the `inngest` package prerelease/inngest Create snapshot releases for a PR for the "inngest" package.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants