diff --git a/core/actions/_lib/runActionInstance.ts b/core/actions/_lib/runActionInstance.ts index dd0355572..9bc1dea11 100644 --- a/core/actions/_lib/runActionInstance.ts +++ b/core/actions/_lib/runActionInstance.ts @@ -72,6 +72,26 @@ const _runActionInstance = async ({ }; } + const pubInStage = await db + .selectFrom("PubsInStages") + .where("pubId", "=", pubId) + .where("stageId", "=", actionInstance.stageId) + .selectAll() + .executeTakeFirst(); + + if (!pubInStage) { + logger.warn({ + msg: `Pub ${pubId} is not in stage ${actionInstance.stageId}, even though the action instance is. + This most likely happened because the pub was moved before the time the action was scheduled to run.`, + pubId, + actionInstanceId, + }); + return { + error: `Pub ${pubId} is not in stage ${actionInstance.stageId}, even though the action instance is. + This most likely happened because the pub was moved before the time the action was scheduled to run.`, + }; + } + logger.info(actionInstance.action); const action = getActionByName(actionInstance.action); diff --git a/core/actions/_lib/scheduleActionInstance.ts b/core/actions/_lib/scheduleActionInstance.ts index 46a317e95..364576d5b 100644 --- a/core/actions/_lib/scheduleActionInstance.ts +++ b/core/actions/_lib/scheduleActionInstance.ts @@ -4,13 +4,14 @@ import { jsonArrayFrom } from "kysely/helpers/postgres"; import { logger } from "logger"; +import type { ActionInstancesId } from "~/kysely/types/public/ActionInstances"; import type { PubsId } from "~/kysely/types/public/Pubs"; import type { Rules } from "~/kysely/types/public/Rules"; import type { StagesId } from "~/kysely/types/public/Stages"; import { db } from "~/kysely/database"; import Event from "~/kysely/types/public/Event"; import { addDuration } from "~/lib/dates"; -import { getJobsClient } from "~/lib/server/jobs"; +import { getJobsClient, getScheduledActionJobKey } from "~/lib/server/jobs"; export const scheduleActionInstances = async ({ pubId, @@ -47,8 +48,8 @@ export const scheduleActionInstances = async ({ .execute(); if (!instances.length) { - logger.warn({ - msg: `No action instances found for stage ${stageId}`, + logger.debug({ + msg: `No action instances found for stage ${stageId}. Most likely this is because a Pub is moved into a stage without action instances.`, pubId, stageId, instances, @@ -94,3 +95,26 @@ export const scheduleActionInstances = async ({ return results; }; + +export const unscheduleAction = async ({ + actionInstanceId, + stageId, + pubId, +}: { + actionInstanceId: ActionInstancesId; + stageId: StagesId; + pubId: PubsId; +}) => { + const jobKey = getScheduledActionJobKey({ stageId, actionInstanceId, pubId }); + try { + const jobsClient = await getJobsClient(); + await jobsClient.unscheduleJob(jobKey); + logger.debug({ msg: "Unscheduled action", actionInstanceId, stageId, pubId }); + } catch (error) { + logger.error(error); + return { + error: "Failed to unschedule action", + cause: error, + }; + } +}; diff --git a/core/app/c/[communitySlug]/stages/manage/actions.ts b/core/app/c/[communitySlug]/stages/manage/actions.ts index eaf25c381..9a4d27aec 100644 --- a/core/app/c/[communitySlug]/stages/manage/actions.ts +++ b/core/app/c/[communitySlug]/stages/manage/actions.ts @@ -3,19 +3,21 @@ import type { Action as PrismaAction } from "@prisma/client"; import { revalidateTag } from "next/cache"; +import { captureException } from "@sentry/nextjs"; import { logger } from "logger"; +import type { CreateRuleSchema } from "./components/panel/StagePanelRuleCreator"; import type Action from "~/kysely/types/public/Action"; -import type Event from "~/kysely/types/public/Event"; +import type { CommunitiesId } from "~/kysely/types/public/Communities"; import type { RulesId } from "~/kysely/types/public/Rules"; +import { unscheduleAction } from "~/actions/_lib/scheduleActionInstance"; import { humanReadableEvent } from "~/actions/api"; import { db } from "~/kysely/database"; import { type ActionInstancesId } from "~/kysely/types/public/ActionInstances"; -import { CommunitiesId } from "~/kysely/types/public/Communities"; +import Event from "~/kysely/types/public/Event"; import { defineServerAction } from "~/lib/server/defineServerAction"; import prisma from "~/prisma/db"; -import { CreateRuleSchema } from "./components/panel/StagePanelRuleCreator"; async function deleteStages(stageIds: string[]) { await prisma.stage.deleteMany({ @@ -281,7 +283,60 @@ export const deleteRule = defineServerAction(async function deleteRule( communityId: string ) { try { - await db.deleteFrom("rules").where("id", "=", ruleId).executeTakeFirst(); + const deletedRule = await db + .deleteFrom("rules") + .where("id", "=", ruleId) + .returningAll() + .executeTakeFirst(); + + if (!deletedRule) { + return { + error: "Failed to delete rule", + cause: `Rule with id ${ruleId} not found`, + }; + } + + if (deletedRule.event !== Event.pubInStageForDuration) { + return; + } + + const actionInstance = await db + .selectFrom("action_instances") + .select(["id", "action", "stage_id"]) + .where("id", "=", deletedRule.action_instance_id) + .executeTakeFirst(); + + if (!actionInstance) { + // something is wrong here + captureException( + new Error( + `Action instance not found for rule ${ruleId} while trying to unschedule jobs` + ) + ); + return; + } + + const pubsInStage = await db + .selectFrom("PubsInStages") + .select(["pubId", "stageId"]) + .where("stageId", "=", actionInstance.stage_id) + .execute(); + + if (!pubsInStage) { + // we don't need to unschedule any jobs, as there are no pubs this rule could have been applied to + return; + } + + logger.debug(`Unscheduling jobs for rule ${ruleId}`); + await Promise.all( + pubsInStage.map(async (pubInStage) => + unscheduleAction({ + actionInstanceId: actionInstance.id, + pubId: pubInStage.pubId, + stageId: pubInStage.stageId, + }) + ) + ); } catch (error) { logger.error(error); return { diff --git a/core/lib/server/jobs.ts b/core/lib/server/jobs.ts index 1916433dd..be47d867a 100644 --- a/core/lib/server/jobs.ts +++ b/core/lib/server/jobs.ts @@ -7,14 +7,26 @@ import { logger } from "logger"; import type { ClientExceptionOptions } from "../serverActions"; import { env } from "../env/env.mjs"; -import { ClientException } from "../serverActions"; import "date-fns"; import type { Interval } from "~/actions/_lib/rules"; +import type { ActionInstancesId } from "~/kysely/types/public/ActionInstances"; +import type { PubsId } from "~/kysely/types/public/Pubs"; +import type { StagesId } from "~/kysely/types/public/Stages"; import Event from "~/kysely/types/public/Event"; import { addDuration } from "../dates"; +export const getScheduledActionJobKey = ({ + stageId, + actionInstanceId, + pubId, +}: { + stageId: StagesId; + actionInstanceId: ActionInstancesId; + pubId: PubsId; +}) => `scheduled-action-${stageId}-${actionInstanceId}-${pubId}`; + export type JobsClient = { scheduleEmail( instanceId: string, @@ -23,11 +35,11 @@ export type JobsClient = { ): Promise; unscheduleJob(jobKey: string): Promise; scheduleAction(options: { - actionInstanceId: string; - stageId: string; + actionInstanceId: ActionInstancesId; + stageId: StagesId; duration: number; interval: Interval; - pubId: string; + pubId: PubsId; }): Promise; }; @@ -57,13 +69,19 @@ export const makeJobsClient = async (): Promise => { return job; }, async unscheduleJob(jobKey: string) { - logger.info({ msg: `Unscheduling email with key: ${jobKey}`, job: { key: jobKey } }); + logger.info({ msg: `Unscheduling job with key: ${jobKey}`, job: { key: jobKey } }); await workerUtils.withPgClient(async (pg) => { await pg.query(`SELECT graphile_worker.remove_job($1);`, [jobKey]); }); + + logger.info({ + msg: `Successfully unscheduled job with key: ${jobKey}`, + job: { key: jobKey }, + }); }, async scheduleAction({ actionInstanceId, stageId, duration, interval, pubId }) { const runAt = addDuration({ duration, interval }); + const jobKey = getScheduledActionJobKey({ stageId, actionInstanceId, pubId }); logger.info({ msg: `Scheduling action with key: ${actionInstanceId} to run at ${runAt}`, @@ -88,6 +106,8 @@ export const makeJobsClient = async (): Promise => { }, { runAt, + jobKey, + jobKeyMode: "replace", } ); diff --git a/core/next.config.mjs b/core/next.config.mjs index f5933f6f5..18a41e218 100644 --- a/core/next.config.mjs +++ b/core/next.config.mjs @@ -38,7 +38,12 @@ const nextConfig = { }, experimental: { instrumentationHook: true, - serverComponentsExternalPackages: ["@aws-sdk"], + serverComponentsExternalPackages: [ + "@aws-sdk", + // without this here, next will sort of implode and no longer compile and serve pages properly + // if graphile-worker is used in server actions + "graphile-worker", + ], }, // open telemetry cries a lot during build, don't think it's serious // https://github.com/open-telemetry/opentelemetry-js/issues/4173 diff --git a/core/pages/api/v0/[...ts-rest].ts b/core/pages/api/v0/[...ts-rest].ts index 24a6c8a95..52184462f 100644 --- a/core/pages/api/v0/[...ts-rest].ts +++ b/core/pages/api/v0/[...ts-rest].ts @@ -192,7 +192,7 @@ const internalRouter = createNextRoute(api.internal, { return { status: 200, - body: actionScheduleResults, + body: actionScheduleResults ?? [], }; }, }); diff --git a/jobs/index.ts b/jobs/index.ts index b71490024..96bf9bb09 100644 --- a/jobs/index.ts +++ b/jobs/index.ts @@ -96,6 +96,7 @@ const makeTaskList = (client: Client<{}>) => { if ("operation" in payload && payload.operation === "INSERT") { const { stageId, pubId } = payload.new; + eventLogger.info({ msg: "Attempting to schedule action", stageId, pubId }); schedulePromise = apiClient.scheduleAction({ params: { stageId }, body: { pubId }, @@ -143,7 +144,11 @@ const makeTaskList = (client: Client<{}>) => { event, ...extra, }); - } else if (scheduleResult.value !== null) { + } else if ( + scheduleResult.value && + scheduleResult.value.status === 200 && + scheduleResult.value.body?.length > 0 + ) { eventLogger.info({ msg: "Action scheduled", results: scheduleResult.value, diff --git a/packages/contracts/src/resources/internal.ts b/packages/contracts/src/resources/internal.ts index 3c4b6fa27..9a54572d5 100644 --- a/packages/contracts/src/resources/internal.ts +++ b/packages/contracts/src/resources/internal.ts @@ -17,16 +17,14 @@ export const internalApi = contract.router( pubId: z.string(), }), responses: { - 200: z - .array( - z.object({ - actionInstanceName: z.string(), - actionInstanceId: z.string(), - runAt: z.string(), - result: z.any(), - }) - ) - .optional(), + 200: z.array( + z.object({ + actionInstanceName: z.string(), + actionInstanceId: z.string(), + runAt: z.string(), + result: z.any(), + }) + ), }, }, triggerAction: {