Skip to content

Commit

Permalink
feat: store event payload, inputs and outputs of plugins in chain state
Browse files Browse the repository at this point in the history
  • Loading branch information
whilefoo committed Feb 29, 2024
1 parent b5fd06d commit d769841
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 64 deletions.
2 changes: 1 addition & 1 deletion src/github/github-event-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { customOctokit } from "./github-client";
import { GitHubContext, SimplifiedContext } from "./github-context";
import { createAppAuth } from "@octokit/auth-app";
import { CloudflareKV } from "./utils/cloudflare-kv";
import { PluginChainState } from "./types/plugin-state-chain";
import { PluginChainState } from "./types/plugin";

export type Options = {
webhookSecret: string;
Expand Down
51 changes: 16 additions & 35 deletions src/github/handlers/index.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { EmitterWebhookEvent, EmitterWebhookEventName } from "@octokit/webhooks";
import { EmitterWebhookEvent } from "@octokit/webhooks";
import { GitHubEventHandler } from "../github-event-handler";
import { getConfig } from "../utils/config";
import { issueCommentCreated } from "./issue-comment/created";
import { repositoryDispatch } from "./repository-dispatch";
import { dispatchWorkflow, getDefaultBranch } from "../utils/workflow-dispatch";
import { DelegatedComputeInputs } from "../types/plugin";

function tryCatchWrapper(fn: (event: EmitterWebhookEvent) => unknown) {
return async (event: EmitterWebhookEvent) => {
Expand Down Expand Up @@ -36,7 +37,7 @@ async function handleEvent(event: EmitterWebhookEvent, eventHandler: InstanceTyp
return;
}

const pluginChains = config.plugins[context.key];
const pluginChains = config.plugins[context.key].concat(config.plugins["*"]);

if (pluginChains.length === 0) {
console.log(`No handler found for event ${event.name}`);
Expand All @@ -48,15 +49,24 @@ async function handleEvent(event: EmitterWebhookEvent, eventHandler: InstanceTyp
const { plugin, with: settings } = pluginChain.uses[0];
console.log(`Calling handler for event ${event.name}`);

const id = crypto.randomUUID();
await eventHandler.pluginChainState.put(id, {
const stateId = crypto.randomUUID();

const state = {
eventId: context.id,
eventName: context.key,
event: event,
currentPlugin: 0,
pluginChain: pluginChain.uses,
});
outputs: new Array(pluginChain.uses.length),
inputs: new Array(pluginChain.uses.length),
};

const ref = plugin.ref ?? (await getDefaultBranch(context, plugin.owner, plugin.repo));
const token = await eventHandler.getToken(event.payload.installation.id);
const inputs = new DelegatedComputeInputs(id, context.key, event, settings, token, ref);
const inputs = new DelegatedComputeInputs(stateId, context.key, event, settings, token, ref);

state.inputs[0] = inputs;
await eventHandler.pluginChainState.put(stateId, state);

await dispatchWorkflow(context, {
owner: plugin.owner,
Expand All @@ -67,32 +77,3 @@ async function handleEvent(event: EmitterWebhookEvent, eventHandler: InstanceTyp
});
}
}

class DelegatedComputeInputs<T extends EmitterWebhookEventName = EmitterWebhookEventName> {
public id: string;
public eventName: T;
public event: EmitterWebhookEvent<T>;
public settings: unknown;
public authToken: string;
public ref: string;

constructor(id: string, eventName: T, event: EmitterWebhookEvent<T>, settings: unknown, authToken: string, ref: string) {
this.id = id;
this.eventName = eventName;
this.event = event;
this.settings = settings;
this.authToken = authToken;
this.ref = ref;
}

public getInputs() {
return {
id: this.id,
eventName: this.eventName,
event: JSON.stringify(this.event),
settings: JSON.stringify(this.settings),
authToken: this.authToken,
ref: this.ref,
};
}
}
39 changes: 17 additions & 22 deletions src/github/handlers/repository-dispatch.ts
Original file line number Diff line number Diff line change
@@ -1,34 +1,36 @@
import { StaticDecode, Type } from "@sinclair/typebox";
import { GitHubContext } from "../github-context";
import { dispatchWorkflow, getDefaultBranch } from "../utils/workflow-dispatch";
import { Value } from "@sinclair/typebox/value";
import { DelegatedComputeInputs, PluginOutput, pluginOutputSchema } from "../types/plugin";

export async function repositoryDispatch(context: GitHubContext<"repository_dispatch">) {
console.log("Repository dispatch event received", context.payload.client_payload);

const pluginOutput = context.payload.client_payload as PluginOutput;

if (!Value.Check(pluginOutputSchema, pluginOutput)) {
if (!Value.Decode(pluginOutputSchema, pluginOutput)) {
const errors = [...Value.Errors(pluginOutputSchema, pluginOutput)];
console.error("Invalid environment variables", errors);
throw new Error("Invalid environment variables");
}

const state = await context.eventHandler.pluginChainState.get(pluginOutput.id);
const state = await context.eventHandler.pluginChainState.get(pluginOutput.stateId);
if (!state) {
console.error("No state found for plugin chain");
return;
}

if (!("installation" in state.event.payload) || state.event.payload.installation?.id === undefined) {
console.error("No installation found");
return;
}

const currentPlugin = state.pluginChain[state.currentPlugin];
if (currentPlugin.plugin.owner !== context.payload.repository.owner.login || currentPlugin.plugin.repo !== context.payload.repository.name) {
console.error("Plugin chain state does not match payload");
return;
}

state.currentPlugin++;
await context.eventHandler.pluginChainState.put(pluginOutput.id, state);

const nextPlugin = state.pluginChain[state.currentPlugin];
if (!nextPlugin) {
console.log("No more plugins to call");
Expand All @@ -37,27 +39,20 @@ export async function repositoryDispatch(context: GitHubContext<"repository_disp

console.log("Dispatching next plugin", nextPlugin);

const inputs = {
...pluginOutput.output,
id: pluginOutput.id,
settings: JSON.stringify(nextPlugin.with),
ref: nextPlugin.plugin.ref ?? (await getDefaultBranch(context, nextPlugin.plugin.owner, nextPlugin.plugin.repo)),
};
const token = await context.eventHandler.getToken(state.event.payload.installation.id);
const ref = nextPlugin.plugin.ref ?? (await getDefaultBranch(context, nextPlugin.plugin.owner, nextPlugin.plugin.repo));
const inputs = new DelegatedComputeInputs(pluginOutput.stateId, state.eventName, state.event, nextPlugin.with, token, ref);

state.outputs[state.currentPlugin] = pluginOutput;
state.currentPlugin++;
state.inputs[state.currentPlugin] = inputs;
await context.eventHandler.pluginChainState.put(pluginOutput.stateId, state);

await dispatchWorkflow(context, {
owner: nextPlugin.plugin.owner,
repository: nextPlugin.plugin.repo,
ref: nextPlugin.plugin.ref,
workflowId: nextPlugin.plugin.workflowId,
inputs: inputs,
inputs: inputs.getInputs(),
});
}

const pluginOutputSchema = Type.Object({
id: Type.String(),
owner: Type.String(),
repo: Type.String(),
output: Type.Any(),
});

type PluginOutput = StaticDecode<typeof pluginOutputSchema>;
6 changes: 0 additions & 6 deletions src/github/types/plugin-state-chain.ts

This file was deleted.

70 changes: 70 additions & 0 deletions src/github/types/plugin.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import { EmitterWebhookEvent, EmitterWebhookEventName } from "@octokit/webhooks";
import { PluginChain } from "./config";
import { StaticDecode, Type } from "@sinclair/typebox";

function jsonString() {
return Type.Transform(Type.String())
.Decode((value) => JSON.parse(value) as unknown)
.Encode((value) => JSON.stringify(value));
}

export const pluginOutputSchema = Type.Object({
stateId: Type.String(),
output: jsonString(),
});

export type PluginOutput = StaticDecode<typeof pluginOutputSchema>;

export class DelegatedComputeInputs<T extends EmitterWebhookEventName = EmitterWebhookEventName> {
public stateId: string;
public eventName: T;
public event: EmitterWebhookEvent<T>;
public settings: unknown;
public authToken: string;
public ref: string;

constructor(stateId: string, eventName: T, event: EmitterWebhookEvent<T>, settings: unknown, authToken: string, ref: string) {
this.stateId = stateId;
this.eventName = eventName;
this.event = event;
this.settings = settings;
this.authToken = authToken;
this.ref = ref;
}

public getInputs() {
return {
stateId: this.stateId,
eventName: this.eventName,
event: JSON.stringify(this.event),
settings: JSON.stringify(this.settings),
authToken: this.authToken,
ref: this.ref,
};
}
}

export type PluginChainState<T extends EmitterWebhookEventName = EmitterWebhookEventName> = {
eventId: string;
eventName: T;
event: EmitterWebhookEvent<T>;
currentPlugin: number;
pluginChain: PluginChain;
inputs: DelegatedComputeInputs[];
outputs: PluginOutput[];
};

// convert top level properties to string
export function convertToString(obj: Record<string, unknown>): Record<string, string> {
const newObj: Record<string, string> = {};
for (let i = 0; i < Object.keys(obj).length; i++) {
const key = Object.keys(obj)[i];
const val = obj[key];
if (typeof val === "string") {
newObj[key] = val;
} else {
newObj[key] = JSON.stringify(val);
}
}
return newObj;
}

0 comments on commit d769841

Please sign in to comment.