Skip to content

Commit 773765f

Browse files
committed
Clean up finished graphile_worker.jobs rows that last ran more than 7 days ago
1 parent 476e2f0 commit 773765f

File tree

4 files changed

+88
-5
lines changed

4 files changed

+88
-5
lines changed

apps/webapp/app/platform/zodWorker.server.ts

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import { run as graphileRun, parseCronItems } from "graphile-worker";
1414
import omit from "lodash.omit";
1515
import { z } from "zod";
1616
import { PrismaClient, PrismaClientOrTransaction } from "~/db.server";
17-
import { logger } from "~/services/logger.server";
17+
import { workerLogger as logger } from "~/services/logger.server";
1818

1919
export interface MessageCatalogSchema {
2020
[key: string]: z.ZodFirstPartySchemaTypes | z.ZodDiscriminatedUnion<any, any>;
@@ -81,13 +81,22 @@ export type ZodWorkerDequeueOptions = {
8181
tx?: PrismaClientOrTransaction;
8282
};
8383

84+
const CLEANUP_TASK_NAME = "__cleanupOldJobs";
85+
86+
export type ZodWorkerCleanupOptions = {
87+
frequencyExpression: string; // cron expression
88+
ttl: number;
89+
taskOptions?: CronItemOptions;
90+
};
91+
8492
export type ZodWorkerOptions<TMessageCatalog extends MessageCatalogSchema> = {
8593
name: string;
8694
runnerOptions: RunnerOptions;
8795
prisma: PrismaClient;
8896
schema: TMessageCatalog;
8997
tasks: ZodTasks<TMessageCatalog>;
9098
recurringTasks?: ZodRecurringTasks;
99+
cleanup?: ZodWorkerCleanupOptions;
91100
};
92101

93102
export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
@@ -98,6 +107,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
98107
#tasks: ZodTasks<TMessageCatalog>;
99108
#recurringTasks?: ZodRecurringTasks;
100109
#runner?: GraphileRunner;
110+
#cleanup: ZodWorkerCleanupOptions | undefined;
101111

102112
constructor(options: ZodWorkerOptions<TMessageCatalog>) {
103113
this.#name = options.name;
@@ -106,6 +116,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
106116
this.#runnerOptions = options.runnerOptions;
107117
this.#tasks = options.tasks;
108118
this.#recurringTasks = options.recurringTasks;
119+
this.#cleanup = options.cleanup;
109120
}
110121

111122
get graphileWorkerSchema() {
@@ -337,12 +348,29 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
337348
taskList[key] = task;
338349
}
339350

351+
if (this.#cleanup) {
352+
const task: Task = (payload, helpers) => {
353+
return this.#handleCleanup(payload, helpers);
354+
};
355+
356+
taskList[CLEANUP_TASK_NAME] = task;
357+
}
358+
340359
return taskList;
341360
}
342361

343362
#createCronItemsFromRecurringTasks() {
344363
const cronItems: CronItem[] = [];
345364

365+
if (this.#cleanup) {
366+
cronItems.push({
367+
pattern: this.#cleanup.frequencyExpression,
368+
identifier: CLEANUP_TASK_NAME,
369+
task: CLEANUP_TASK_NAME,
370+
options: this.#cleanup.taskOptions,
371+
});
372+
}
373+
346374
if (!this.#recurringTasks) {
347375
return cronItems;
348376
}
@@ -434,6 +462,50 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
434462
}
435463
}
436464

465+
async #handleCleanup(rawPayload: unknown, helpers: JobHelpers): Promise<void> {
466+
if (!this.#cleanup) {
467+
return;
468+
}
469+
470+
const job = helpers.job;
471+
472+
logger.debug("Received cleanup task", {
473+
payload: rawPayload,
474+
job,
475+
});
476+
477+
const parsedPayload = RawCronPayloadSchema.safeParse(rawPayload);
478+
479+
if (!parsedPayload.success) {
480+
throw new Error(
481+
`Failed to parse cleanup task payload: ${JSON.stringify(parsedPayload.error)}`
482+
);
483+
}
484+
485+
const payload = parsedPayload.data;
486+
487+
// Subtract this.#cleanup.ttl from payload._cron.ts to get the cutoff date; jobs with run_at before it are eligible for deletion
488+
const expirationDate = new Date(payload._cron.ts.getTime() - this.#cleanup.ttl);
489+
490+
logger.debug("Cleaning up old jobs", {
491+
expirationDate,
492+
payload,
493+
});
494+
495+
const rawResults = await this.#prisma.$queryRawUnsafe(
496+
`DELETE FROM ${this.graphileWorkerSchema}.jobs WHERE run_at < $1 AND locked_at IS NULL AND max_attempts = attempts RETURNING id`,
497+
expirationDate
498+
);
499+
500+
const results = Array.isArray(rawResults) ? rawResults : [];
501+
502+
logger.debug("Cleaned up old jobs", {
503+
count: results.length,
504+
expirationDate,
505+
payload,
506+
});
507+
}
508+
437509
#logDebug(message: string, args?: any) {
438510
logger.debug(`[worker][${this.#name}] ${message}`, args);
439511
}

apps/webapp/app/routes/api.v1.runs.$runId.tasks.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ export class RunTaskService {
305305
{
306306
id: task.id,
307307
},
308-
{ tx, runAt: task.delayUntil ?? undefined }
308+
{ tx, runAt: task.delayUntil ?? undefined, jobKey: `operation:${task.id}` }
309309
);
310310
} else if (task.status === "WAITING" && callbackUrl && taskBody.callback) {
311311
if (taskBody.callback.timeoutInSeconds > 0) {

apps/webapp/app/services/logger.server.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,10 @@ export const logger = new Logger(
88
["examples", "output", "connectionString", "payload"],
99
sensitiveDataReplacer
1010
);
11+
12+
export const workerLogger = new Logger(
13+
"worker",
14+
(process.env.APP_LOG_LEVEL ?? "debug") as LogLevel,
15+
["examples", "output", "connectionString"],
16+
sensitiveDataReplacer
17+
);

apps/webapp/app/services/worker.server.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,12 @@ function getWorkerQueue() {
128128
return new ZodWorker({
129129
name: "workerQueue",
130130
prisma,
131+
cleanup: {
132+
// cleanup once per hour
133+
frequencyExpression: "0 * * * *",
134+
// delete unlocked jobs that have used all their attempts and whose run_at is more than 7 days old
135+
ttl: 7 * 24 * 60 * 60 * 1000,
136+
},
131137
runnerOptions: {
132138
connectionString: env.DATABASE_URL,
133139
concurrency: env.WORKER_CONCURRENCY,
@@ -229,6 +235,7 @@ function getWorkerQueue() {
229235
deliverHttpSourceRequest: {
230236
priority: 1, // smaller number = higher priority
231237
maxAttempts: 14,
238+
queueName: (payload) => `sources:${payload.id}`,
232239
handler: async (payload, job) => {
233240
const service = new DeliverHttpSourceRequestService();
234241

@@ -255,7 +262,6 @@ function getWorkerQueue() {
255262
},
256263
performTaskOperation: {
257264
priority: 0, // smaller number = higher priority
258-
queueName: (payload) => `tasks:${payload.id}`,
259265
maxAttempts: 3,
260266
handler: async (payload, job) => {
261267
const service = new PerformTaskOperationService();
@@ -264,7 +270,6 @@ function getWorkerQueue() {
264270
},
265271
},
266272
scheduleEmail: {
267-
queueName: "internal-queue",
268273
priority: 100,
269274
maxAttempts: 3,
270275
handler: async (payload, job) => {
@@ -291,7 +296,6 @@ function getWorkerQueue() {
291296
},
292297
refreshOAuthToken: {
293298
priority: 8, // smaller number = higher priority
294-
queueName: "internal-queue",
295299
maxAttempts: 7,
296300
handler: async (payload, job) => {
297301
await integrationAuthRepository.refreshConnection({

0 commit comments

Comments
 (0)