From 6b64967334f5afbc0de104eb26d962b6e7cb9aef Mon Sep 17 00:00:00 2001 From: Ani Ravi <5902976+aniravi24@users.noreply.github.com> Date: Fri, 25 Oct 2024 21:54:30 -0400 Subject: [PATCH] refactor(sidetrack): poll each queue separately --- packages/client-prisma/test/index.test.ts | 4 +- packages/pg-migrate/src/migration.ts | 2 +- packages/sidetrack/src/effect.ts | 120 ++++++++++++---------- packages/sidetrack/src/index.ts | 2 +- packages/sidetrack/src/types.ts | 20 ++-- packages/sidetrack/test/index.test.ts | 50 +++++---- typedoc.json | 2 +- 7 files changed, 111 insertions(+), 89 deletions(-) diff --git a/packages/client-prisma/test/index.test.ts b/packages/client-prisma/test/index.test.ts index 989639bc..f82cee3a 100644 --- a/packages/client-prisma/test/index.test.ts +++ b/packages/client-prisma/test/index.test.ts @@ -72,9 +72,7 @@ describe("jobs", () => { dbClient: usePrisma(new PrismaClient()), queues: { test: { - options: { - maxAttempts: 2, - }, + maxAttempts: 2, run: async (_payload) => { throw new Error("failure"); }, diff --git a/packages/pg-migrate/src/migration.ts b/packages/pg-migrate/src/migration.ts index abfaa56c..a1a5d5b2 100644 --- a/packages/pg-migrate/src/migration.ts +++ b/packages/pg-migrate/src/migration.ts @@ -86,7 +86,7 @@ export class Migration implements RunMigration { static async create( name: string, directory: string, - _language?: CreateOptions | "js" | "sql" | "ts" , + _language?: CreateOptions | "js" | "sql" | "ts", _ignorePattern?: string, _filenameFormat?: FilenameFormat, ) { diff --git a/packages/sidetrack/src/effect.ts b/packages/sidetrack/src/effect.ts index 3407fcfa..64b5f50d 100644 --- a/packages/sidetrack/src/effect.ts +++ b/packages/sidetrack/src/effect.ts @@ -6,9 +6,9 @@ import * as Fiber from "effect/Fiber"; import * as Layer from "effect/Layer"; import { fromIterableWith } from "effect/Record"; import * as Record from "effect/Record"; -import * as Ref from "effect/Ref"; import * as Schedule from "effect/Schedule"; import * as Stream from "effect/Stream"; +import * as Supervisor from "effect/Supervisor"; import pg from "pg"; import { SidetrackDatabaseClient, usePg } from "./client"; @@ -17,6 +17,7 @@ import SidetrackCronJobs from "./models/generated/public/SidetrackCronJobs"; import SidetrackJobs from "./models/generated/public/SidetrackJobs"; import SidetrackJobStatusEnum from "./models/generated/public/SidetrackJobStatusEnum"; import { + PollingInterval, SidetrackCancelJobOptions, SidetrackCronJobOptions, SidetrackDeactivateCronScheduleOptions, @@ -127,6 +128,16 @@ export interface SidetrackService { }; } +const pollingIntervalMs = ( + pollingInterval?: PollingInterval, + defaultValue = 2000, +) => + Duration.isDuration(pollingInterval) + ? pollingInterval + : pollingInterval + ? Duration.millis(pollingInterval) + : Duration.millis(defaultValue); + export const createSidetrackServiceTag = < Queues extends SidetrackQueuesGenericType, >() => @@ -144,6 +155,7 @@ export function makeLayer( !layerOptions.dbClient && databaseOptions ? new pg.Pool(databaseOptions) : undefined; + const dbClient: SidetrackDatabaseClient = layerOptions.dbClient ?? (pool @@ -178,25 +190,22 @@ export function makeLayer( ? (globalPayloadTransformer.deserialize(payload) as Queues[K]) : payload; - const pollingIntervalMs = Duration.isDuration(layerOptions.pollingInterval) - ? layerOptions.pollingInterval - : layerOptions.pollingInterval - ? Duration.millis(layerOptions.pollingInterval) - : Duration.millis(2000); + const pollingInterval = pollingIntervalMs(layerOptions.pollingInterval); - const pollingFiber = Ref.unsafeMake>( - Fiber.void, - ); + const pollingSupervisor = Supervisor.unsafeTrack(); - // TODO should we use a hashmap or supervisor? We'll need to convert this layer into an Effect - const cronFibers = Ref.unsafeMake( - Record.empty>(), - ); + const cronSupervisor = Supervisor.unsafeTrack(); + /** + * Each queue is polled separately for new jobs, and the polling interval can be configured per queue + */ const startPolling = () => - Effect.promise(() => - dbClient.execute( - `WITH next_jobs AS ( + Effect.forEach( + Record.toEntries(queues), + ([queueName, queue]) => + Effect.promise(() => + dbClient.execute( + `WITH next_jobs AS ( SELECT id FROM @@ -204,6 +213,7 @@ export function makeLayer( WHERE (status = 'scheduled' or status = 'retrying') AND scheduled_at <= NOW() + AND queue = $1 ORDER BY scheduled_at FOR UPDATE SKIP LOCKED @@ -220,27 +230,35 @@ export function makeLayer( FROM next_jobs ) RETURNING *`, - ), - ).pipe( - Effect.map((result) => result.rows), - Effect.flatMap((result) => - Effect.forEach( - result, - (job) => - executeJobRunner(job).pipe( - Effect.catchAllCause(Effect.logError), - Effect.forkDaemon, + [queueName], + ), + ).pipe( + Effect.map((result) => result.rows), + Effect.flatMap((result) => + Effect.forEach( + result, + (job) => + executeJobRunner(job).pipe( + Effect.catchAllCause(Effect.logError), + Effect.forkDaemon, + ), + { + concurrency: "inherit", + }, ), - { - concurrency: "inherit", - }, + ), + Effect.repeat( + Schedule.spaced( + queue.pollingInterval + ? pollingIntervalMs(queue.pollingInterval) + : pollingInterval, + ), + ), + Effect.catchAllCause(Effect.logError), + Effect.supervised(pollingSupervisor), + Effect.fork, ), - ), - // TODO customize polling and decrease polling time potentially? - Effect.repeat(Schedule.spaced(pollingIntervalMs)), - Effect.catchAllCause(Effect.logError), - Effect.forkDaemon, - Effect.flatMap((fiber) => Ref.update(pollingFiber, () => fiber)), + { concurrency: "inherit" }, ); const start = () => @@ -276,19 +294,17 @@ export function makeLayer( ).pipe(Effect.asVoid); const stop = () => - Effect.all([ - Ref.get(pollingFiber).pipe( - Effect.flatMap((fiber) => Fiber.interrupt(fiber)), - ), - Ref.get(cronFibers).pipe( - Effect.flatMap((fibers) => - Effect.all( - Record.values(fibers).map((fiber) => Fiber.interrupt(fiber)), - { concurrency: "inherit" }, - ), + Effect.all( + [ + Effect.flatMap(pollingSupervisor.value, (fibers) => + Fiber.interruptAll(fibers), ), - ), - ]).pipe(Effect.asVoid); + Effect.flatMap(cronSupervisor.value, (fibers) => + Fiber.interruptAll(fibers), + ), + ], + { concurrency: "inherit" }, + ).pipe(Effect.asVoid); const executeJobRunner = ( job: SidetrackJobs, @@ -391,7 +407,7 @@ export function makeLayer( [ queueName, payloadSerializer(queueName, payload), - queues[queueName].options?.maxAttempts, + queues[queueName]?.maxAttempts, options?.scheduledAt, options?.uniqueKey, ], @@ -464,12 +480,8 @@ export function makeLayer( ), Stream.catchAllCause(Effect.logError), Stream.runDrain, - Effect.forkDaemon, - Effect.flatMap((fiber) => - Ref.update(cronFibers, (fibers) => - Record.set(fibers, cronJob.id, fiber), - ), - ), + Effect.supervised(cronSupervisor), + Effect.fork, ); }; diff --git a/packages/sidetrack/src/index.ts b/packages/sidetrack/src/index.ts index 717c485d..993337bf 100644 --- a/packages/sidetrack/src/index.ts +++ b/packages/sidetrack/src/index.ts @@ -174,7 +174,7 @@ export class SidetrackTest< * Test utility to get a list of jobs */ async listJobs( - options?: SidetrackListJobsOptions , + options?: SidetrackListJobsOptions, ) { return this.customRunPromise( Effect.flatMap(this.sidetrackService, (service) => diff --git a/packages/sidetrack/src/types.ts b/packages/sidetrack/src/types.ts index d0868a6f..71fe4f00 100644 --- a/packages/sidetrack/src/types.ts +++ b/packages/sidetrack/src/types.ts @@ -59,6 +59,8 @@ export interface SidetrackRunJobsOptions< queue?: K | K[] | undefined; } +export type PollingInterval = Duration.Duration | number; + export interface SidetrackOptions { databaseOptions?: { connectionString: string; @@ -69,7 +71,7 @@ export interface SidetrackOptions { * Number of milliseconds to wait between polling for new jobs * Alternatively, pass in an Effect.Duration of any duration */ - pollingInterval?: Duration.Duration | number; + pollingInterval?: PollingInterval; queues: SidetrackQueues; } @@ -78,17 +80,19 @@ export class SidetrackJobRunError { constructor(readonly error: unknown) {} } -export type SidetrackJob = Omit< - SidetrackJobs, - "payload" -> & { payload: Payload }; +export type SidetrackJob = Omit & { + payload: Payload; +}; export type SidetrackQueues> = { [K in keyof Queues]: { - options?: { - maxAttempts?: number; - }; + maxAttempts?: number; payloadTransformer?: SidetrackPayloadTransformer; + /** + * Number of milliseconds to wait between polling for new jobs + * Alternatively, pass in an Effect.Duration of any duration + */ + pollingInterval?: PollingInterval; run: ( payload: Queues[K], context: { job: SidetrackJob }, diff --git a/packages/sidetrack/test/index.test.ts b/packages/sidetrack/test/index.test.ts index 8d1a2cf7..58714863 100644 --- a/packages/sidetrack/test/index.test.ts +++ b/packages/sidetrack/test/index.test.ts @@ -54,7 +54,9 @@ describe.concurrent("jobs", () => { }, }); - const job = await sidetrack.insertJob("test", { id: "string" }); + const job = await sidetrack.insertJob("test", { + id: "accepts a database client", + }); expect((await sidetrack.getJob(job.id)).status).toBe("scheduled"); }); }); @@ -67,12 +69,12 @@ describe.concurrent("jobs", () => { test: { run: async (_payload, { job }) => { expect(job.status).toBe("running"); - expect(job.payload).toMatchObject({ id: "hello success" }); + expect(job.payload).toMatchObject({ id: "run job succeeds" }); }, }, }, }); - const job = await sidetrack.insertJob("test", { id: "hello success" }); + const job = await sidetrack.insertJob("test", { id: "run job succeeds" }); await sidetrack.runJob(job.id); expect((await sidetrack.getJob(job.id)).status).toBe("completed"); }); @@ -90,7 +92,7 @@ describe.concurrent("jobs", () => { }, }, }); - const job = await sidetrack.insertJob("test", { id: "hello fail" }); + const job = await sidetrack.insertJob("test", { id: "run job fails" }); await sidetrack.runJob(job.id); expect((await sidetrack.getJob(job.id)).status).toBe("failed"); }); @@ -102,14 +104,14 @@ describe.concurrent("jobs", () => { dbClient: usePg(client), queues: { test: { - options: { maxAttempts: 2 }, + maxAttempts: 2, run: async (_payload) => { throw new Error("Hello failed"); }, }, }, }); - let job = await sidetrack.insertJob("test", { id: "hello fail" }); + let job = await sidetrack.insertJob("test", { id: "job gets retried" }); await sidetrack.runJob(job.id); expect((await sidetrack.getJob(job.id)).status).toBe("retrying"); await sidetrack.runJob(job.id); @@ -131,7 +133,7 @@ describe.concurrent("jobs", () => { }, }, }); - let job = await sidetrack.insertJob("test", { id: "hello fail" }); + let job = await sidetrack.insertJob("test", { id: "job gets cancelled" }); await sidetrack.runJob(job.id); expect((await sidetrack.getJob(job.id)).status).toBe("failed"); @@ -157,7 +159,7 @@ describe.concurrent("jobs", () => { }, }); - let job = await sidetrack.insertJob("test", { id: "hello fail" }); + let job = await sidetrack.insertJob("test", { id: "job gets deleted" }); await sidetrack.runJob(job.id); expect((await sidetrack.getJob(job.id)).status).toBe("failed"); @@ -183,7 +185,7 @@ describe.concurrent("jobs", () => { }, }); - const job = await sidetrack.insertJob("test", { id: "hello fail" }); + const job = await sidetrack.insertJob("test", { id: "run job works" }); expect((await sidetrack.getJob(job.id)).status).toBe("scheduled"); @@ -207,7 +209,7 @@ describe.concurrent("jobs", () => { }, }); - const job = await sidetrack.insertJob("test", { id: "hello world" }); + const job = await sidetrack.insertJob("test", { id: "run queue works" }); expect((await sidetrack.getJob(job.id)).status).toBe("scheduled"); @@ -237,9 +239,9 @@ describe.concurrent("jobs", () => { }, }); - await sidetrack.insertJob("one", { id: "hello world" }); - await sidetrack.insertJob("one", { id: "hello universe" }); - await sidetrack.insertJob("two", { id: "hello universe" }); + await sidetrack.insertJob("one", { id: "list job works one first" }); + await sidetrack.insertJob("one", { id: "list job works one second" }); + await sidetrack.insertJob("two", { id: "list job works two" }); expect((await sidetrack.listJobs({ queue: ["one", "two"] })).length).toBe( 3, @@ -262,8 +264,8 @@ describe.concurrent("jobs", () => { }, }); - await sidetrack.insertJob("one", { id: "hello world" }); - await sidetrack.insertJob("one", { id: "hello universe" }); + await sidetrack.insertJob("one", { id: "list job status works first" }); + await sidetrack.insertJob("one", { id: "list job status works second" }); expect((await sidetrack.listJobStatuses()).scheduled).toBe(2); }); @@ -328,20 +330,24 @@ describe.concurrent("jobs", () => { }>({ dbClient: usePg(client), payloadTransformer: { - serialize: (payload) => { - if ((payload as any).date instanceof Date) { + deserialize: (payload) => { + // eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-member-access + if (typeof (payload as any).date === "string") { return { ...payload, - date: (payload as any).date.toISOString(), + // eslint-disable-next-line @typescript-eslint/no-unsafe-argument, @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-member-access + date: new Date((payload as any).date), }; } return payload; }, - deserialize: (payload) => { - if (typeof (payload as any).date === "string") { + serialize: (payload) => { + // eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-member-access + if ((payload as any).date instanceof Date) { return { ...payload, - date: new Date((payload as any).date), + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-call, @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-member-access + date: (payload as any).date.toISOString(), }; } return payload; @@ -352,6 +358,7 @@ describe.concurrent("jobs", () => { run: async (payload, { job }) => { expect(payload.date).toBeInstanceOf(Date); // The original payload is serialized, so the job payload is a string + // eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-member-access expect((job.payload as any).date).toBeTypeOf("string"); return payload; }, @@ -363,6 +370,7 @@ describe.concurrent("jobs", () => { const job = await sidetrack.insertJob("test", { date }); const retrievedJob = await sidetrack.getJob(job.id); + // eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-member-access expect((retrievedJob.payload as any).date).toBeTypeOf("string"); await sidetrack.runJob(job.id); diff --git a/typedoc.json b/typedoc.json index 006d1bc1..b959eb45 100644 --- a/typedoc.json +++ b/typedoc.json @@ -1,9 +1,9 @@ { "entryPointStrategy": "packages", "entryPoints": ["packages/*"], + "excludeInternal": true, "includeVersion": true, "name": "Sidetrack", "out": "docs", - "excludeInternal": true, "plugin": ["typedoc-github-theme"] }