Skip to content

Commit

Permalink
Report worker metrics when WORKER_REPORTER_EMAIL env var is set
Browse files Browse the repository at this point in the history
  • Loading branch information
ericallam committed Oct 10, 2023
1 parent f54a9ff commit b737f7a
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 8 deletions.
1 change: 1 addition & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const EnvironmentSchema = z.object({
EXECUTION_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
WORKER_ENABLED: z.string().default("true"),
EXECUTION_WORKER_ENABLED: z.string().default("true"),
WORKER_REPORTER_EMAIL: z.string().email().optional(),
});

export type Environment = z.infer<typeof EnvironmentSchema>;
Expand Down
85 changes: 83 additions & 2 deletions apps/webapp/app/platform/zodWorker.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,12 @@ export type ZodWorkerDequeueOptions = {
};

const CLEANUP_TASK_NAME = "__cleanupOldJobs";
const REPORTER_TASK_NAME = "__reporter";

export type ZodWorkerCleanupOptions = {
frequencyExpression: string; // cron expression
ttl: number;
maxCount: number;
taskOptions?: CronItemOptions;
};

Expand All @@ -97,6 +99,7 @@ export type ZodWorkerOptions<TMessageCatalog extends MessageCatalogSchema> = {
tasks: ZodTasks<TMessageCatalog>;
recurringTasks?: ZodRecurringTasks;
cleanup?: ZodWorkerCleanupOptions;
reporter?: (subject: string, message: string) => Promise<void>;
};

export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
Expand All @@ -108,6 +111,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
#recurringTasks?: ZodRecurringTasks;
#runner?: GraphileRunner;
#cleanup: ZodWorkerCleanupOptions | undefined;
#reporter?: (subject: string, message: string) => Promise<void>;

constructor(options: ZodWorkerOptions<TMessageCatalog>) {
this.#name = options.name;
Expand All @@ -117,6 +121,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
this.#tasks = options.tasks;
this.#recurringTasks = options.recurringTasks;
this.#cleanup = options.cleanup;
this.#reporter = options.reporter;
}

get graphileWorkerSchema() {
Expand Down Expand Up @@ -356,6 +361,14 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
taskList[CLEANUP_TASK_NAME] = task;
}

if (this.#reporter) {
const task: Task = (payload, helpers) => {
return this.#handleReporter(payload, helpers);
};

taskList[REPORTER_TASK_NAME] = task;
}

return taskList;
}

Expand All @@ -371,6 +384,14 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
});
}

if (this.#reporter) {
cronItems.push({
pattern: "50 * * * *", // Every hour at 50 minutes past the hour
identifier: REPORTER_TASK_NAME,
task: REPORTER_TASK_NAME,
});
}

if (!this.#recurringTasks) {
return cronItems;
}
Expand Down Expand Up @@ -493,8 +514,9 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
});

const rawResults = await this.#prisma.$queryRawUnsafe(
`DELETE FROM ${this.graphileWorkerSchema}.jobs WHERE run_at < $1 AND locked_at IS NULL AND max_attempts = attempts RETURNING id`,
expirationDate
`WITH rows AS (SELECT id FROM ${this.graphileWorkerSchema}.jobs WHERE run_at < $1 AND locked_at IS NULL AND max_attempts = attempts ORDER BY run_at ASC LIMIT $2) DELETE FROM ${this.graphileWorkerSchema}.jobs WHERE id IN (SELECT id FROM rows) RETURNING id`,
expirationDate,
this.#cleanup.maxCount
);

const results = Array.isArray(rawResults) ? rawResults : [];
Expand All @@ -504,6 +526,65 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
expirationDate,
payload,
});

if (this.#reporter) {
await this.#reporter(
"Worker Queue Cleanup",
`Cleaned up ${results.length} jobs older than ${expirationDate.toISOString()}`
);
}
}

async #handleReporter(rawPayload: unknown, helpers: JobHelpers): Promise<void> {
if (!this.#reporter) {
return;
}

logger.debug("Received reporter task", {
payload: rawPayload,
});

const parsedPayload = RawCronPayloadSchema.safeParse(rawPayload);

if (!parsedPayload.success) {
throw new Error(
`Failed to parse cleanup task payload: ${JSON.stringify(parsedPayload.error)}`
);
}

const payload = parsedPayload.data;

// Subtract an hour from the payload._cron.ts
const startAt = new Date(payload._cron.ts.getTime() - 1000 * 60 * 60);

const schema = z.array(z.object({ count: z.coerce.number() }));

// Count the number of jobs that have been added since the startAt date and before the payload._cron.ts date
const rawAddedResults = await this.#prisma.$queryRawUnsafe(
`SELECT COUNT(*) FROM ${this.graphileWorkerSchema}.jobs WHERE created_at > $1 AND created_at < $2`,
startAt,
payload._cron.ts
);

const addedCountResults = schema.parse(rawAddedResults)[0];

// Count the total number of jobs in the jobs table
const rawTotalResults = await this.#prisma.$queryRawUnsafe(
`SELECT COUNT(*) FROM ${this.graphileWorkerSchema}.jobs`
);

const totalCountResults = schema.parse(rawTotalResults)[0];

logger.debug("Calculated metrics about the jobs table", {
rawAddedResults,
rawTotalResults,
payload,
});

await this.#reporter(
"Worker Queue Metrics",
`Added ${addedCountResults.count} jobs in the last hour, total jobs: ${totalCountResults.count}`
);
}

#logDebug(message: string, args?: any) {
Expand Down
6 changes: 5 additions & 1 deletion apps/webapp/app/services/email.server.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { DeliverEmail } from "emails";
import type { DeliverEmail, SendPlainTextOptions } from "emails";
import { EmailClient } from "emails";
import type { SendEmailOptions } from "remix-auth-email-link";
import { redirect } from "remix-typedjson";
Expand Down Expand Up @@ -27,6 +27,10 @@ export async function sendMagicLinkEmail(options: SendEmailOptions<AuthUser>): P
});
}

export async function sendPlainTextEmail(options: SendPlainTextOptions) {
return client.sendPlainText(options);
}

export async function scheduleWelcomeEmail(user: User) {
//delay for one minute in development, longer in production
const delay = process.env.NODE_ENV === "development" ? 1000 * 60 : 1000 * 60 * 22;
Expand Down
21 changes: 16 additions & 5 deletions apps/webapp/app/services/worker.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { z } from "zod";
import { prisma } from "~/db.server";
import { env } from "~/env.server";
import { ZodWorker } from "~/platform/zodWorker.server";
import { sendEmail } from "./email.server";
import { sendEmail, sendPlainTextEmail } from "./email.server";
import { IndexEndpointService } from "./endpoints/indexEndpoint.server";
import { RecurringEndpointIndexService } from "./endpoints/recurringEndpointIndex.server";
import { DeliverEventService } from "./events/deliverEvent.server";
Expand All @@ -21,6 +21,7 @@ import { DeliverHttpSourceRequestService } from "./sources/deliverHttpSourceRequ
import { PerformTaskOperationService } from "./tasks/performTaskOperation.server";
import { ProcessCallbackTimeoutService } from "./tasks/processCallbackTimeout";
import { addMissingVersionField } from "@trigger.dev/core";
import { logger } from "./logger.server";

const workerCatalog = {
indexEndpoint: z.object({
Expand Down Expand Up @@ -129,10 +130,20 @@ function getWorkerQueue() {
name: "workerQueue",
prisma,
cleanup: {
// cleanup once per hour
frequencyExpression: "0 * * * *",
// delete jobs that have been completed for more than 7 days
ttl: 7 * 24 * 60 * 60 * 1000,
frequencyExpression: "13,27,43 * * * *",
ttl: 7 * 24 * 60 * 60 * 1000, // 7 days
maxCount: 1000,
},
reporter: async (subject, message) => {
logger.info("workerQueue reporter", { workerMessage: message, subject });

if (env.WORKER_REPORTER_EMAIL) {
await sendPlainTextEmail({
to: env.WORKER_REPORTER_EMAIL,
subject: `workerQueue Report: ${subject}`,
text: message,
});
}
},
runnerOptions: {
connectionString: env.DATABASE_URL,
Expand Down
16 changes: 16 additions & 0 deletions packages/emails/src/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ export const DeliverEmailSchema = z

export type DeliverEmail = z.infer<typeof DeliverEmailSchema>;

export type SendPlainTextOptions = { to: string; subject: string; text: string };

export class EmailClient {
#client?: Resend;
#imagesBaseUrl: string;
Expand All @@ -66,6 +68,20 @@ export class EmailClient {
});
}

async sendPlainText(options: SendPlainTextOptions) {
if (this.#client) {
await this.#client.sendEmail({
from: this.#from,
to: options.to,
replyTo: this.#replyTo,
subject: options.subject,
text: options.text,
});

return;
}
}

#getTemplate(data: DeliverEmail): {
subject: string;
component: ReactElement;
Expand Down

0 comments on commit b737f7a

Please sign in to comment.