diff --git a/packages/sidetrack/src/effect.ts b/packages/sidetrack/src/effect.ts index 350dfca2..cd4c5996 100644 --- a/packages/sidetrack/src/effect.ts +++ b/packages/sidetrack/src/effect.ts @@ -286,9 +286,15 @@ export function makeLayer( queue, payload, current_attempt, - max_attempts - ) VALUES ('scheduled', $1, $2, 0, $3) RETURNING *`, - [queueName, payload, queues[queueName].options?.maxAttempts ?? 1], + max_attempts, + scheduled_at + ) VALUES ('scheduled', $1, $2, 0, $3, $4) RETURNING *`, + [ + queueName, + payload, + queues[queueName].options?.maxAttempts ?? 1, + options?.scheduledAt ?? new Date(), + ], ), ).pipe(Effect.map((result) => result.rows[0])); diff --git a/packages/sidetrack/test/index.test.ts b/packages/sidetrack/test/index.test.ts index fbf6c4bb..452b7522 100644 --- a/packages/sidetrack/test/index.test.ts +++ b/packages/sidetrack/test/index.test.ts @@ -275,4 +275,64 @@ describe("jobs", () => { (await sidetrack.listJobStatuses()).scheduled, ).toBeGreaterThanOrEqual(2); }); + + it("job insertion with scheduledAt option works", async () => { + const sidetrack = new SidetrackTest<{ + scheduled: { message: string }; + }>({ + databaseOptions: { + connectionString: process.env["DATABASE_URL"]!, + }, + queues: { + scheduled: { + handler: async (payload) => { + return payload; + }, + }, + }, + }); + + const futureDate = new Date(Date.now() + 60000); // 1 minute in the future + await sidetrack.insertJob( + "scheduled", + { message: "Future job" }, + { scheduledAt: futureDate }, + ); + + // Check that the job is inserted but not yet run + const jobsBeforeSchedule = await sidetrack.listJobs({ + queue: ["scheduled"], + }); + expect(jobsBeforeSchedule.length).toBe(1); + expect(jobsBeforeSchedule[0].status).toBe("scheduled"); + expect(jobsBeforeSchedule[0].scheduled_at).toEqual(futureDate); + + // TODO support time travel + // Wait for the scheduled time to pass + // await new Promise((resolve) => setTimeout(resolve, 61000)); + + // Run jobs to process the scheduled job + await sidetrack.runJobs({ queue: ["scheduled"] }); + + // Check that the job has been processed + const jobsAfterSchedule = await sidetrack.listJobs({ + queue: ["scheduled"], + }); + expect(jobsAfterSchedule.length).toBe(1); + expect(jobsAfterSchedule[0].status).toBe("scheduled"); + expect(jobsAfterSchedule[0].payload).toEqual({ message: "Future job" }); + + await sidetrack.runJobs({ + queue: ["scheduled"], + includeFutureJobs: true, + }); + + // Check that the job has been processed + const jobsAfterRun = await sidetrack.listJobs({ + queue: ["scheduled"], + }); + expect(jobsAfterRun.length).toBe(1); + expect(jobsAfterRun[0].status).toBe("completed"); + expect(jobsAfterRun[0].payload).toEqual({ message: "Future job" }); + }); });