Skip to content

Commit

Permalink
fix(sidetrack): use provided scheduledAt on insertJob
Browse files Browse the repository at this point in the history
  • Loading branch information
aniravi24 committed Oct 11, 2024
1 parent 1c31afd commit dd503d1
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 3 deletions.
12 changes: 9 additions & 3 deletions packages/sidetrack/src/effect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -286,9 +286,15 @@ export function makeLayer<Queues extends SidetrackQueuesGenericType>(
queue,
payload,
current_attempt,
max_attempts
) VALUES ('scheduled', $1, $2, 0, $3) RETURNING *`,
[queueName, payload, queues[queueName].options?.maxAttempts ?? 1],
max_attempts,
scheduled_at
) VALUES ('scheduled', $1, $2, 0, $3, $4) RETURNING *`,
[
queueName,
payload,
queues[queueName].options?.maxAttempts ?? 1,
options?.scheduledAt ?? new Date(),
],
),
).pipe(Effect.map((result) => result.rows[0]));

Expand Down
60 changes: 60 additions & 0 deletions packages/sidetrack/test/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -275,4 +275,64 @@ describe("jobs", () => {
(await sidetrack.listJobStatuses()).scheduled,
).toBeGreaterThanOrEqual(2);
});

it("job insertion with scheduledAt option works", async () => {
const sidetrack = new SidetrackTest<{
scheduled: { message: string };
}>({
databaseOptions: {
connectionString: process.env["DATABASE_URL"]!,
},
queues: {
scheduled: {
handler: async (payload) => {
return payload;
},
},
},
});

const futureDate = new Date(Date.now() + 60000); // 1 minute in the future
await sidetrack.insertJob(
"scheduled",
{ message: "Future job" },
{ scheduledAt: futureDate },
);

// Check that the job is inserted but not yet run
const jobsBeforeSchedule = await sidetrack.listJobs({
queue: ["scheduled"],
});
expect(jobsBeforeSchedule.length).toBe(1);
expect(jobsBeforeSchedule[0].status).toBe("scheduled");
expect(jobsBeforeSchedule[0].scheduled_at).toEqual(futureDate);

// TODO support time travel
// Wait for the scheduled time to pass
// await new Promise((resolve) => setTimeout(resolve, 61000));

// Run jobs to process the scheduled job
await sidetrack.runJobs({ queue: ["scheduled"] });

// Check that the job has been processed
const jobsAfterSchedule = await sidetrack.listJobs({
queue: ["scheduled"],
});
expect(jobsAfterSchedule.length).toBe(1);
expect(jobsAfterSchedule[0].status).toBe("scheduled");
expect(jobsAfterSchedule[0].payload).toEqual({ message: "Future job" });

await sidetrack.runJobs({
queue: ["scheduled"],
includeFutureJobs: true,
});

// Check that the job has been processed
const jobsAfterRun = await sidetrack.listJobs({
queue: ["scheduled"],
});
expect(jobsAfterRun.length).toBe(1);
expect(jobsAfterRun[0].status).toBe("completed");
expect(jobsAfterRun[0].payload).toEqual({ message: "Future job" });
});
});

0 comments on commit dd503d1

Please sign in to comment.