Skip to content

Commit

Permalink
feat: do not run jobs when pub is moved and unschedule jobs when rule…
Browse files Browse the repository at this point in the history
… is removed
  • Loading branch information
tefkah committed May 28, 2024
1 parent e9f4f83 commit 6b2e107
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 25 deletions.
20 changes: 20 additions & 0 deletions core/actions/_lib/runActionInstance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,26 @@ const _runActionInstance = async ({
};
}

const pubInStage = await db
.selectFrom("PubsInStages")
.where("pubId", "=", pubId)
.where("stageId", "=", actionInstance.stageId)
.selectAll()
.executeTakeFirst();

if (!pubInStage) {
logger.warn({
msg: `Pub ${pubId} is not in stage ${actionInstance.stageId}, even though the action instance is.
This most likely happened because the pub was moved before the time the action was scheduled to run.`,
pubId,
actionInstanceId,
});
return {
error: `Pub ${pubId} is not in stage ${actionInstance.stageId}, even though the action instance is.
This most likely happened because the pub was moved before the time the action was scheduled to run.`,
};
}

logger.info(actionInstance.action);
const action = getActionByName(actionInstance.action);

Expand Down
30 changes: 27 additions & 3 deletions core/actions/_lib/scheduleActionInstance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import { jsonArrayFrom } from "kysely/helpers/postgres";

import { logger } from "logger";

import type { ActionInstancesId } from "~/kysely/types/public/ActionInstances";
import type { PubsId } from "~/kysely/types/public/Pubs";
import type { Rules } from "~/kysely/types/public/Rules";
import type { StagesId } from "~/kysely/types/public/Stages";
import { db } from "~/kysely/database";
import Event from "~/kysely/types/public/Event";
import { addDuration } from "~/lib/dates";
import { getJobsClient } from "~/lib/server/jobs";
import { getJobsClient, getScheduledActionJobKey } from "~/lib/server/jobs";

export const scheduleActionInstances = async ({
pubId,
Expand Down Expand Up @@ -47,8 +48,8 @@ export const scheduleActionInstances = async ({
.execute();

if (!instances.length) {
logger.warn({
msg: `No action instances found for stage ${stageId}`,
logger.debug({
msg: `No action instances found for stage ${stageId}. Most likely this is because a Pub is moved into a stage without action instances.`,
pubId,
stageId,
instances,
Expand Down Expand Up @@ -94,3 +95,26 @@ export const scheduleActionInstances = async ({

return results;
};

export const unscheduleAction = async ({
actionInstanceId,
stageId,
pubId,
}: {
actionInstanceId: ActionInstancesId;
stageId: StagesId;
pubId: PubsId;
}) => {
const jobKey = getScheduledActionJobKey({ stageId, actionInstanceId, pubId });
try {
const jobsClient = await getJobsClient();
await jobsClient.unscheduleJob(jobKey);
logger.debug({ msg: "Unscheduled action", actionInstanceId, stageId, pubId });
} catch (error) {
logger.error(error);
return {
error: "Failed to unschedule action",
cause: error,
};
}
};
63 changes: 59 additions & 4 deletions core/app/c/[communitySlug]/stages/manage/actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,21 @@
import type { Action as PrismaAction } from "@prisma/client";

import { revalidateTag } from "next/cache";
import { captureException } from "@sentry/nextjs";

import { logger } from "logger";

import type { CreateRuleSchema } from "./components/panel/StagePanelRuleCreator";
import type Action from "~/kysely/types/public/Action";
import type Event from "~/kysely/types/public/Event";
import type { CommunitiesId } from "~/kysely/types/public/Communities";
import type { RulesId } from "~/kysely/types/public/Rules";
import { unscheduleAction } from "~/actions/_lib/scheduleActionInstance";
import { humanReadableEvent } from "~/actions/api";
import { db } from "~/kysely/database";
import { type ActionInstancesId } from "~/kysely/types/public/ActionInstances";
import { CommunitiesId } from "~/kysely/types/public/Communities";
import Event from "~/kysely/types/public/Event";
import { defineServerAction } from "~/lib/server/defineServerAction";
import prisma from "~/prisma/db";
import { CreateRuleSchema } from "./components/panel/StagePanelRuleCreator";

async function deleteStages(stageIds: string[]) {
await prisma.stage.deleteMany({
Expand Down Expand Up @@ -281,7 +283,60 @@ export const deleteRule = defineServerAction(async function deleteRule(
communityId: string
) {
try {
await db.deleteFrom("rules").where("id", "=", ruleId).executeTakeFirst();
const deletedRule = await db
.deleteFrom("rules")
.where("id", "=", ruleId)
.returningAll()
.executeTakeFirst();

if (!deletedRule) {
return {
error: "Failed to delete rule",
cause: `Rule with id ${ruleId} not found`,
};
}

if (deletedRule.event !== Event.pubInStageForDuration) {
return;
}

const actionInstance = await db
.selectFrom("action_instances")
.select(["id", "action", "stage_id"])
.where("id", "=", deletedRule.action_instance_id)
.executeTakeFirst();

if (!actionInstance) {
// something is wrong here
captureException(
new Error(
`Action instance not found for rule ${ruleId} while trying to unschedule jobs`
)
);
return;
}

const pubsInStage = await db
.selectFrom("PubsInStages")
.select(["pubId", "stageId"])
.where("stageId", "=", actionInstance.stage_id)
.execute();

if (!pubsInStage) {
// we don't need to unschedule any jobs, as there are no pubs this rule could have been applied to
return;
}

logger.debug(`Unscheduling jobs for rule ${ruleId}`);
await Promise.all(
pubsInStage.map(async (pubInStage) =>
unscheduleAction({
actionInstanceId: actionInstance.id,
pubId: pubInStage.pubId,
stageId: pubInStage.stageId,
})
)
);
} catch (error) {
logger.error(error);
return {
Expand Down
30 changes: 25 additions & 5 deletions core/lib/server/jobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,26 @@ import { logger } from "logger";

import type { ClientExceptionOptions } from "../serverActions";
import { env } from "../env/env.mjs";
import { ClientException } from "../serverActions";

import "date-fns";

import type { Interval } from "~/actions/_lib/rules";
import type { ActionInstancesId } from "~/kysely/types/public/ActionInstances";
import type { PubsId } from "~/kysely/types/public/Pubs";
import type { StagesId } from "~/kysely/types/public/Stages";
import Event from "~/kysely/types/public/Event";
import { addDuration } from "../dates";

export const getScheduledActionJobKey = ({
stageId,
actionInstanceId,
pubId,
}: {
stageId: StagesId;
actionInstanceId: ActionInstancesId;
pubId: PubsId;
}) => `scheduled-action-${stageId}-${actionInstanceId}-${pubId}`;

export type JobsClient = {
scheduleEmail(
instanceId: string,
Expand All @@ -23,11 +35,11 @@ export type JobsClient = {
): Promise<Job>;
unscheduleJob(jobKey: string): Promise<void>;
scheduleAction(options: {
actionInstanceId: string;
stageId: string;
actionInstanceId: ActionInstancesId;
stageId: StagesId;
duration: number;
interval: Interval;
pubId: string;
pubId: PubsId;
}): Promise<Job | ClientExceptionOptions>;
};

Expand Down Expand Up @@ -57,13 +69,19 @@ export const makeJobsClient = async (): Promise<JobsClient> => {
return job;
},
async unscheduleJob(jobKey: string) {
logger.info({ msg: `Unscheduling email with key: ${jobKey}`, job: { key: jobKey } });
logger.info({ msg: `Unscheduling job with key: ${jobKey}`, job: { key: jobKey } });
await workerUtils.withPgClient(async (pg) => {
await pg.query(`SELECT graphile_worker.remove_job($1);`, [jobKey]);
});

logger.info({
msg: `Successfully unscheduled job with key: ${jobKey}`,
job: { key: jobKey },
});
},
async scheduleAction({ actionInstanceId, stageId, duration, interval, pubId }) {
const runAt = addDuration({ duration, interval });
const jobKey = getScheduledActionJobKey({ stageId, actionInstanceId, pubId });

logger.info({
msg: `Scheduling action with key: ${actionInstanceId} to run at ${runAt}`,
Expand All @@ -88,6 +106,8 @@ export const makeJobsClient = async (): Promise<JobsClient> => {
},
{
runAt,
jobKey,
jobKeyMode: "replace",
}
);

Expand Down
7 changes: 6 additions & 1 deletion core/next.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,12 @@ const nextConfig = {
},
experimental: {
instrumentationHook: true,
serverComponentsExternalPackages: ["@aws-sdk"],
serverComponentsExternalPackages: [
"@aws-sdk",
// without this here, next will sort of implode and no longer compile and serve pages properly
// if graphile-worker is used in server actions
"graphile-worker",
],
},
// open telemetry cries a lot during build, don't think it's serious
// https://github.com/open-telemetry/opentelemetry-js/issues/4173
Expand Down
2 changes: 1 addition & 1 deletion core/pages/api/v0/[...ts-rest].ts
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ const internalRouter = createNextRoute(api.internal, {

return {
status: 200,
body: actionScheduleResults,
body: actionScheduleResults ?? [],
};
},
});
Expand Down
7 changes: 6 additions & 1 deletion jobs/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ const makeTaskList = (client: Client<{}>) => {
if ("operation" in payload && payload.operation === "INSERT") {
const { stageId, pubId } = payload.new;

eventLogger.info({ msg: "Attempting to schedule action", stageId, pubId });
schedulePromise = apiClient.scheduleAction({
params: { stageId },
body: { pubId },
Expand Down Expand Up @@ -143,7 +144,11 @@ const makeTaskList = (client: Client<{}>) => {
event,
...extra,
});
} else if (scheduleResult.value !== null) {
} else if (
scheduleResult.value &&
scheduleResult.value.status === 200 &&
scheduleResult.value.body?.length > 0
) {
eventLogger.info({
msg: "Action scheduled",
results: scheduleResult.value,
Expand Down
18 changes: 8 additions & 10 deletions packages/contracts/src/resources/internal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,14 @@ export const internalApi = contract.router(
pubId: z.string(),
}),
responses: {
200: z
.array(
z.object({
actionInstanceName: z.string(),
actionInstanceId: z.string(),
runAt: z.string(),
result: z.any(),
})
)
.optional(),
200: z.array(
z.object({
actionInstanceName: z.string(),
actionInstanceId: z.string(),
runAt: z.string(),
result: z.any(),
})
),
},
},
triggerAction: {
Expand Down

0 comments on commit 6b2e107

Please sign in to comment.