From 773765f82dd5d98dd61d718e8633f89a9a025a37 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Tue, 10 Oct 2023 15:47:53 +0100 Subject: [PATCH] Cleanup finished graphile_worker.jobs last run more than 7 days ago --- apps/webapp/app/platform/zodWorker.server.ts | 74 ++++++++++++++++++- .../app/routes/api.v1.runs.$runId.tasks.ts | 2 +- apps/webapp/app/services/logger.server.ts | 7 ++ apps/webapp/app/services/worker.server.ts | 10 ++- 4 files changed, 88 insertions(+), 5 deletions(-) diff --git a/apps/webapp/app/platform/zodWorker.server.ts b/apps/webapp/app/platform/zodWorker.server.ts index ea3cafe4b9..d73df16274 100644 --- a/apps/webapp/app/platform/zodWorker.server.ts +++ b/apps/webapp/app/platform/zodWorker.server.ts @@ -14,7 +14,7 @@ import { run as graphileRun, parseCronItems } from "graphile-worker"; import omit from "lodash.omit"; import { z } from "zod"; import { PrismaClient, PrismaClientOrTransaction } from "~/db.server"; -import { logger } from "~/services/logger.server"; +import { workerLogger as logger } from "~/services/logger.server"; export interface MessageCatalogSchema { [key: string]: z.ZodFirstPartySchemaTypes | z.ZodDiscriminatedUnion; @@ -81,6 +81,14 @@ export type ZodWorkerDequeueOptions = { tx?: PrismaClientOrTransaction; }; +const CLEANUP_TASK_NAME = "__cleanupOldJobs"; + +export type ZodWorkerCleanupOptions = { + frequencyExpression: string; // cron expression + ttl: number; + taskOptions?: CronItemOptions; +}; + export type ZodWorkerOptions = { name: string; runnerOptions: RunnerOptions; @@ -88,6 +96,7 @@ export type ZodWorkerOptions = { schema: TMessageCatalog; tasks: ZodTasks; recurringTasks?: ZodRecurringTasks; + cleanup?: ZodWorkerCleanupOptions; }; export class ZodWorker { @@ -98,6 +107,7 @@ export class ZodWorker { #tasks: ZodTasks; #recurringTasks?: ZodRecurringTasks; #runner?: GraphileRunner; + #cleanup: ZodWorkerCleanupOptions | undefined; constructor(options: ZodWorkerOptions) { this.#name = options.name; @@ -106,6 +116,7 @@ 
export class ZodWorker { this.#runnerOptions = options.runnerOptions; this.#tasks = options.tasks; this.#recurringTasks = options.recurringTasks; + this.#cleanup = options.cleanup; } get graphileWorkerSchema() { @@ -337,12 +348,29 @@ export class ZodWorker { taskList[key] = task; } + if (this.#cleanup) { + const task: Task = (payload, helpers) => { + return this.#handleCleanup(payload, helpers); + }; + + taskList[CLEANUP_TASK_NAME] = task; + } + return taskList; } #createCronItemsFromRecurringTasks() { const cronItems: CronItem[] = []; + if (this.#cleanup) { + cronItems.push({ + pattern: this.#cleanup.frequencyExpression, + identifier: CLEANUP_TASK_NAME, + task: CLEANUP_TASK_NAME, + options: this.#cleanup.taskOptions, + }); + } + if (!this.#recurringTasks) { return cronItems; } @@ -434,6 +462,50 @@ export class ZodWorker { } } + async #handleCleanup(rawPayload: unknown, helpers: JobHelpers): Promise { + if (!this.#cleanup) { + return; + } + + const job = helpers.job; + + logger.debug("Received cleanup task", { + payload: rawPayload, + job, + }); + + const parsedPayload = RawCronPayloadSchema.safeParse(rawPayload); + + if (!parsedPayload.success) { + throw new Error( + `Failed to parse cleanup task payload: ${JSON.stringify(parsedPayload.error)}` + ); + } + + const payload = parsedPayload.data; + + // Subtract this.#cleanup.ttl from payload._cron.ts to get the cutoff; jobs with run_at before it are eligible for deletion + const expirationDate = new Date(payload._cron.ts.getTime() - this.#cleanup.ttl); + + logger.debug("Cleaning up old jobs", { + expirationDate, + payload, + }); + + const rawResults = await this.#prisma.$queryRawUnsafe( + `DELETE FROM ${this.graphileWorkerSchema}.jobs WHERE run_at < $1 AND locked_at IS NULL AND max_attempts = attempts RETURNING id`, + expirationDate + ); + + const results = Array.isArray(rawResults) ? 
rawResults : []; + + logger.debug("Cleaned up old jobs", { + count: results.length, + expirationDate, + payload, + }); + } + #logDebug(message: string, args?: any) { logger.debug(`[worker][${this.#name}] ${message}`, args); } diff --git a/apps/webapp/app/routes/api.v1.runs.$runId.tasks.ts b/apps/webapp/app/routes/api.v1.runs.$runId.tasks.ts index 1e4dc77362..871a48d92e 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runId.tasks.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runId.tasks.ts @@ -305,7 +305,7 @@ export class RunTaskService { { id: task.id, }, - { tx, runAt: task.delayUntil ?? undefined } + { tx, runAt: task.delayUntil ?? undefined, jobKey: `operation:${task.id}` } ); } else if (task.status === "WAITING" && callbackUrl && taskBody.callback) { if (taskBody.callback.timeoutInSeconds > 0) { diff --git a/apps/webapp/app/services/logger.server.ts b/apps/webapp/app/services/logger.server.ts index fdc886479e..75bb91ca14 100644 --- a/apps/webapp/app/services/logger.server.ts +++ b/apps/webapp/app/services/logger.server.ts @@ -8,3 +8,10 @@ export const logger = new Logger( ["examples", "output", "connectionString", "payload"], sensitiveDataReplacer ); + +export const workerLogger = new Logger( + "worker", + (process.env.APP_LOG_LEVEL ?? 
"debug") as LogLevel, + ["examples", "output", "connectionString"], + sensitiveDataReplacer +); diff --git a/apps/webapp/app/services/worker.server.ts b/apps/webapp/app/services/worker.server.ts index 07493aab51..9cc428b754 100644 --- a/apps/webapp/app/services/worker.server.ts +++ b/apps/webapp/app/services/worker.server.ts @@ -128,6 +128,12 @@ function getWorkerQueue() { return new ZodWorker({ name: "workerQueue", prisma, + cleanup: { + // cleanup once per hour + frequencyExpression: "0 * * * *", + // delete jobs that have been completed for more than 7 days + ttl: 7 * 24 * 60 * 60 * 1000, + }, runnerOptions: { connectionString: env.DATABASE_URL, concurrency: env.WORKER_CONCURRENCY, @@ -229,6 +235,7 @@ function getWorkerQueue() { deliverHttpSourceRequest: { priority: 1, // smaller number = higher priority maxAttempts: 14, + queueName: (payload) => `sources:${payload.id}`, handler: async (payload, job) => { const service = new DeliverHttpSourceRequestService(); @@ -255,7 +262,6 @@ function getWorkerQueue() { }, performTaskOperation: { priority: 0, // smaller number = higher priority - queueName: (payload) => `tasks:${payload.id}`, maxAttempts: 3, handler: async (payload, job) => { const service = new PerformTaskOperationService(); @@ -264,7 +270,6 @@ function getWorkerQueue() { }, }, scheduleEmail: { - queueName: "internal-queue", priority: 100, maxAttempts: 3, handler: async (payload, job) => { @@ -291,7 +296,6 @@ function getWorkerQueue() { }, refreshOAuthToken: { priority: 8, // smaller number = higher priority - queueName: "internal-queue", maxAttempts: 7, handler: async (payload, job) => { await integrationAuthRepository.refreshConnection({