Skip to content

feat: allow actions to run after other action #1027

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 39 commits into from
Mar 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
eec0167
feat: add action success events to db
tefkah Mar 4, 2025
4ce4eb8
feat: setup basic chain actions
tefkah Mar 5, 2025
67a8e29
chore: merge
tefkah Mar 6, 2025
d455437
fix: remove unique constraint
tefkah Mar 6, 2025
ec99078
fix: fix types for actioninstancescontext
tefkah Mar 6, 2025
3ce58ce
fix: ignore error
tefkah Mar 6, 2025
e2ba8e0
fix: fix type errors
tefkah Mar 6, 2025
d4bff98
feat: progress
tefkah Mar 10, 2025
0cd8be8
chore: remove logs
tefkah Mar 10, 2025
cb16088
chore: fix type errors
tefkah Mar 10, 2025
a5b4552
fix: make work
tefkah Mar 10, 2025
29b5048
dev: allow rules to be seeded
tefkah Mar 10, 2025
4dadc0d
dev: add tests
tefkah Mar 10, 2025
74e95c7
Merge branch 'main' into tfk/rule-after-action
tefkah Mar 10, 2025
91774c5
fix: update seed scripts
tefkah Mar 10, 2025
076387e
Merge branch 'tfk/rule-after-action' of https://github.com/pubpub/pla…
tefkah Mar 10, 2025
af205db
fix: correctly fail actions if they return clientException, and check…
tefkah Mar 10, 2025
eadb6c2
feat: add max stack depth protection at rule run- and creation time
tefkah Mar 10, 2025
a34cef8
fix: fix type errors
tefkah Mar 10, 2025
a3516d6
fix: fix test
tefkah Mar 10, 2025
e173c15
dev: add chained rules to croccroc seed
tefkah Mar 10, 2025
ca92e0f
dev: add sequential rule test
tefkah Mar 10, 2025
2d83030
dev: add better logs for failing action trigger in flock
tefkah Mar 10, 2025
f4da9e0
chore: make minio startup faster
tefkah Mar 10, 2025
6d9465a
fix: make jobs work when self hosting
tefkah Mar 10, 2025
58555c3
fix: set pubpub url correctly for jobs during tests
tefkah Mar 10, 2025
8f6f390
fix: remove some logs and comments
tefkah Mar 10, 2025
c91ba97
Merge branch 'main' into tfk/rule-after-action
tefkah Mar 10, 2025
a13d506
chore: remove some comments
tefkah Mar 10, 2025
6621b11
chore: clean up rules a bit
tefkah Mar 10, 2025
56a4bc3
fix: remove commented out 'use server'
tefkah Mar 10, 2025
bf6cd9a
chore: merge
tefkah Mar 11, 2025
7d048da
Merge branch 'main' into tfk/rule-after-action
tefkah Mar 11, 2025
5621ede
Merge branch 'main' into tfk/rule-after-action
tefkah Mar 13, 2025
4ddab90
fix: use new action seed syntax
tefkah Mar 13, 2025
322c039
chore: also stub sentry
tefkah Mar 13, 2025
e7fbafe
chore: merge
tefkah Mar 17, 2025
fcf8bf0
refactor: rename triggering/watchedAction(instance) to sourceAction
tefkah Mar 17, 2025
cbc3d98
fix: revert platform-migrations changes
tefkah Mar 17, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 33 additions & 2 deletions core/actions/_lib/rules.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { z } from "zod";

import type { ActionInstances } from "db/public";
import { Event } from "db/public";

import { defineRule } from "~/actions/types";
Expand Down Expand Up @@ -37,9 +38,39 @@ export const pubEnteredStage = defineRule({
});
export type PubEnteredStage = typeof pubEnteredStage;

export type Rules = PubInStageForDuration | PubLeftStage | PubEnteredStage;
export const actionSucceeded = defineRule({
event: Event.actionSucceeded,
display: {
base: "a specific action succeeds",
withConfig: (actionInstance: ActionInstances) => `${actionInstance.name} succeeds`,
},
});
export type ActionSucceeded = typeof actionSucceeded;

export const actionFailed = defineRule({
event: Event.actionFailed,
display: {
base: "a specific action fails",
withConfig: (actionInstance) => `${actionInstance.name} fails`,
},
});
export type ActionFailed = typeof actionFailed;

export type Rules =
| PubInStageForDuration
| PubLeftStage
| PubEnteredStage
| ActionSucceeded
| ActionFailed;

export type SchedulableEvent =
| Event.pubInStageForDuration
| Event.actionFailed
| Event.actionSucceeded;

export type RuleForEvent<E extends Event> = E extends E ? Extract<Rules, { event: E }> : never;

export type RuleForEvent<E extends Event> = Extract<Rules, { event: E }>;
export type SchedulableRule = RuleForEvent<SchedulableEvent>;

export type RuleConfig<Rule extends Rules = Rules> = Rule extends Rule
? NonNullable<Rule["additionalConfig"]>["_input"]
Expand Down
18 changes: 10 additions & 8 deletions core/actions/_lib/runActionInstance.db.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,29 +29,29 @@ const pubTriggerTestSeed = async () => {
},
stages: {
Submission: {
actions: [
{
actions: {
"1": {
action: Action.log,
config: {
debounce: 1,
},
},
{
"2": {
action: Action.email,
config: {
recipient: "[email protected]",
recipientEmail: "[email protected]",
body: "Hello",
subject: "Test",
},
},
{
"3": {
action: Action.googleDriveImport,
config: {
docUrl: "https://docs.google.com/document/d/1234567890",
outputField: `${slugName}:title`,
},
},
],
},
},
},
pubs: [
Expand Down Expand Up @@ -83,9 +83,10 @@ describe("runActionInstance", () => {
pubId: pubs[0].id,
event: Event.pubEnteredStage,
communityId: community.id,
stack: [],
});

expect(result).toEqual({
expect(result).toMatchObject({
success: true,
report: "Logged out some data, check your console.",
data: {},
Expand All @@ -101,7 +102,7 @@ describe("runActionInstance", () => {
expect(actionRuns).toHaveLength(1);

expect(actionRuns[0].status).toEqual(ActionRunStatus.success);
expect(actionRuns[0].result, "Action run should be successfully created").toEqual({
expect(actionRuns[0].result, "Action run should be successfully created").toMatchObject({
success: true,
report: "Logged out some data, check your console.",
data: {},
Expand Down Expand Up @@ -133,6 +134,7 @@ describe("runActionInstance", () => {
docUrl: fakeDocURL,
},
communityId: community.id,
stack: [],
});

expect(result).toEqual({
Expand Down
86 changes: 66 additions & 20 deletions core/actions/_lib/runActionInstance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,24 @@ import { hydratePubValues } from "~/lib/fields/utils";
import { createLastModifiedBy } from "~/lib/lastModifiedBy";
import { getPubsWithRelatedValues } from "~/lib/server";
import { autoRevalidate } from "~/lib/server/cache/autoRevalidate";
import { isClientException } from "~/lib/serverActions";
import { MAX_STACK_DEPTH } from "~/lib/server/rules";
import { isClientExceptionOptions } from "~/lib/serverActions";
import { getActionByName } from "../api";
import { getActionRunByName } from "./getRuns";
import { resolveWithPubfields } from "./resolvePubfields";
import { scheduleActionInstances } from "./scheduleActionInstance";

export type ActionInstanceRunResult = ClientException | ClientExceptionOptions | ActionSuccess;
export type ActionInstanceRunResult = (ClientException | ClientExceptionOptions | ActionSuccess) & {
stack: ActionRunsId[];
};

export type RunActionInstanceArgs = {
pubId: PubsId;
communityId: CommunitiesId;
actionInstanceId: ActionInstancesId;
actionInstanceArgs?: Record<string, unknown>;
stack: ActionRunsId[];
scheduledActionRunId?: ActionRunsId;
} & ({ event: Event } | { userId: UsersId });

const _runActionInstance = async (
Expand All @@ -40,6 +46,8 @@ const _runActionInstance = async (
): Promise<ActionInstanceRunResult> => {
const isActionUserInitiated = "userId" in args;

const stack = [...args.stack, args.actionRunId];

const pubPromise = getPubsWithRelatedValues(
{
pubId: args.pubId,
Expand Down Expand Up @@ -83,6 +91,7 @@ const _runActionInstance = async (
return {
error: "Pub not found",
cause: pubResult.reason,
stack,
};
}

Expand All @@ -91,6 +100,7 @@ const _runActionInstance = async (
return {
error: "Action instance not found",
cause: actionInstanceResult.reason,
stack,
};
}

Expand All @@ -107,12 +117,14 @@ const _runActionInstance = async (
return {
error: `Pub ${args.pubId} is not in stage ${actionInstance.stageId}, even though the action instance is.
This most likely happened because the pub was moved before the time the action was scheduled to run.`,
stack,
};
}

if (!actionInstance.action) {
return {
error: "Action not found",
stack,
};
}

Expand All @@ -122,6 +134,7 @@ const _runActionInstance = async (
if (!actionRun || !action) {
return {
error: "Action not found",
stack,
};
}

Expand All @@ -131,6 +144,7 @@ const _runActionInstance = async (
const err = {
error: "Invalid config",
cause: parsedConfig.error,
stack,
};
if (args.actionInstanceArgs) {
// Check if the args passed can substitute for missing or invalid config
Expand All @@ -150,6 +164,7 @@ const _runActionInstance = async (
title: "Invalid pub config",
cause: parsedArgs.error,
error: "The action was run with invalid parameters",
stack,
};
}

Expand Down Expand Up @@ -203,42 +218,64 @@ const _runActionInstance = async (
actionRunId: args.actionRunId,
});

return result;
if (isClientExceptionOptions(result)) {
await scheduleActionInstances({
pubId: args.pubId,
stageId: actionInstance.stageId,
event: Event.actionFailed,
stack,
sourceActionInstanceId: actionInstance.id,
});
return { ...result, stack };
}

await scheduleActionInstances({
pubId: args.pubId,
stageId: actionInstance.stageId,
event: Event.actionSucceeded,
stack,
sourceActionInstanceId: actionInstance.id,
});

return { ...result, stack };
} catch (error) {
captureException(error);
logger.error(error);

await scheduleActionInstances({
pubId: args.pubId,
stageId: actionInstance.stageId,
event: Event.actionFailed,
stack,
sourceActionInstanceId: actionInstance.id,
});

return {
title: "Failed to run action",
error: error.message,
stack,
};
}
};

export async function runActionInstance(args: RunActionInstanceArgs, trx = db) {
if (args.stack.length > MAX_STACK_DEPTH) {
throw new Error(
`Action instance stack depth of ${args.stack.length} exceeds the maximum allowed depth of ${MAX_STACK_DEPTH}`
);
}

const isActionUserInitiated = "userId" in args;

// we need to first create the action run,
// in case the action modifies the pub and needs to pass the lastModifiedBy field
// which in this case would be `action-run:<action-run-id>`

const actionRuns = await autoRevalidate(
trx
.with(
"existingScheduledActionRun",
(db) =>
db
.selectFrom("action_runs")
.selectAll()
.where("actionInstanceId", "=", args.actionInstanceId)
.where("pubId", "=", args.pubId)
.where("status", "=", ActionRunStatus.scheduled)
// this should be guaranteed to be unique, as only one actionInstance should be scheduled per pub
)
.insertInto("action_runs")
.values((eb) => ({
id:
isActionUserInitiated || args.event !== Event.pubInStageForDuration
? undefined
: eb.selectFrom("existingScheduledActionRun").select("id"),
id: args.scheduledActionRunId,
Comment on lines -225 to +278
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since im passing around the scheduledActionRunId anyway, i don't need to look it up anymore (which was flaky).

what this was doing at first was incorrect anyway. it was based on the (then already) invalid assumption that there would only be one action run scheduled per action instance per stage per pub.
basically, when we just had the pubEnteredStage rule for a specific actionInstanceId, i would

  • schedule that actionInstance when a pub entered a stage
  • create a scheduled ActionRun
  • when the action is run, i look up the scheduled ActionRun for that actionInstance and pub, and update it.

however this was very finicky. if for instance, you move a pub in and out of a stage twice, you'd end up with two scheduled actioninstances for the same pub id. then this query would fail, as eb.selectFrom would return two actions.

this assumption would be even more incorrect now, as one actioninstance now can have many scheduled actionruns (Log1->Log2 and Log1->Log3)

even if i limited this, there was no guarantee i would pick out the correct action run.

therefore, passing around the scheduledActionRunId was the way to go

(btw the reason i am creating/updating the actionrun in the first place is bc i need it to set the lastModifiedBy if the action run ends up modifying a pub)

actionInstanceId: args.actionInstanceId,
pubId: args.pubId,
userId: isActionUserInitiated ? args.userId : null,
Expand All @@ -252,6 +289,7 @@ export async function runActionInstance(args: RunActionInstanceArgs, trx = db) {
.where("action_instances.id", "=", args.actionInstanceId),
params: args,
event: isActionUserInitiated ? undefined : args.event,
sourceActionRunId: args.stack.at(-1),
}))
.returningAll()
// conflict should only happen if a scheduled action is excecuted
Expand All @@ -269,6 +307,7 @@ export async function runActionInstance(args: RunActionInstanceArgs, trx = db) {
title: "Action run failed",
error: `Multiple scheduled action runs found for pub ${args.pubId} and action instance ${args.actionInstanceId}. This should never happen.`,
cause: `Multiple scheduled action runs found for pub ${args.pubId} and action instance ${args.actionInstanceId}. This should never happen.`,
stack: args.stack,
};

await autoRevalidate(
Expand All @@ -294,11 +333,16 @@ export async function runActionInstance(args: RunActionInstanceArgs, trx = db) {

const result = await _runActionInstance({ ...args, actionRunId: actionRun.id });

const status = isClientException(result) ? ActionRunStatus.failure : ActionRunStatus.success;
const status = isClientExceptionOptions(result)
? ActionRunStatus.failure
: ActionRunStatus.success;

// update the action run with the result
await autoRevalidate(
trx.updateTable("action_runs").set({ status, result }).where("id", "=", actionRun.id)
trx
.updateTable("action_runs")
.set({ status, result })
.where("id", "=", args.scheduledActionRunId ?? actionRun.id)
).executeTakeFirstOrThrow(
() =>
new Error(
Expand All @@ -314,6 +358,7 @@ export const runInstancesForEvent = async (
stageId: StagesId,
event: Event,
communityId: CommunitiesId,
stack: ActionRunsId[],
trx = db
) => {
const instances = await trx
Expand All @@ -335,6 +380,7 @@ export const runInstancesForEvent = async (
communityId,
actionInstanceId: instance.actionInstanceId,
event,
stack,
},
trx
),
Expand Down
Loading
Loading