Skip to content

Commit

Permalink
test(sidetrack): test multiple queues and crons to make sure things a…
Browse files Browse the repository at this point in the history
…re running in parallel
  • Loading branch information
aniravi24 committed Feb 14, 2025
1 parent 7b8957c commit 6790194
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 35 deletions.
117 changes: 82 additions & 35 deletions packages/sidetrack/test/cron.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,70 +103,117 @@ describe("cron jobs", () => {
});
});

it("executes cron jobs on schedule", async () => {
it("executes multiple cron jobs and processes their jobs", async () => {
const client = await pool.connect();
let cronJobId: string | undefined;
const cronJobIds: string[] = [];
let jobIds: string[] = [];

const sidetrackInstance = new SidetrackTest<{
test: { message: string };
queue1: { description: string; sequence: number };
queue2: { description: string; sequence: number };
}>({
dbClient: usePg(client),
queues: {
test: {
queue1: {
run: (payload) => Promise.resolve(payload),
},
queue2: {
run: (payload) => Promise.resolve(payload),
},
},
});

try {
// Schedule a cron job to run every second
const cronJob = await sidetrackInstance.scheduleCron(
"test",
"* * * * * *", // every second (6-part cron expression)
{ message: "cron execution test" },
// Schedule two cron jobs
const cronJob1 = await sidetrackInstance.scheduleCron(
"queue1",
"* * * * * *", // every second
{
description: "First cron job running every second",
sequence: 1,
},
);
cronJobId = cronJob.id;
cronJobIds.push(cronJob1.id);

// Verify the cron job was inserted correctly
const cronResult = await client.query(
"SELECT * FROM sidetrack_cron_jobs WHERE id = $1",
[cronJobId],
const cronJob2 = await sidetrackInstance.scheduleCron(
"queue2",
"* * * * * *", // every second
{
description: "Second cron job running every second",
sequence: 2,
},
);
expect(cronResult.rows.length).toBe(1);
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
expect(cronResult.rows[0].queue).toBe("test");
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
expect(cronResult.rows[0].cron_expression).toBe("* * * * * *");
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
expect(cronResult.rows[0].payload).toEqual({
message: "cron execution test",
});
cronJobIds.push(cronJob2.id);

// Start the service
await sidetrackInstance.start();

// Wait for a couple of executions
// Wait for jobs to be created and processed
await new Promise((resolve) => setTimeout(resolve, 3000));

// List all jobs created by the cron
const jobs = await sidetrackInstance.listJobs({ queue: ["test"] });
jobIds = jobs.map((job) => job.id);
// Get jobs from both queues
const queue1Jobs = await sidetrackInstance.listJobs({
queue: ["queue1"],
});
const queue2Jobs = await sidetrackInstance.listJobs({
queue: ["queue2"],
});

jobIds = [
...queue1Jobs.map((job) => job.id),
...queue2Jobs.map((job) => job.id),
];

// Should have at least 2 jobs created
expect(jobs.length).toBeGreaterThanOrEqual(2);
// Verify jobs were created
expect(queue1Jobs.length).toBeGreaterThanOrEqual(1);
expect(queue2Jobs.length).toBeGreaterThanOrEqual(1);

// Jobs should have the correct payload
expect(jobs[0].payload).toEqual({ message: "cron execution test" });
// Verify payloads
expect(queue1Jobs[0].payload).toEqual({
description: "First cron job running every second",
sequence: 1,
});
expect(queue2Jobs[0].payload).toEqual({
description: "Second cron job running every second",
sequence: 2,
});

// At least some jobs should be completed
// Verify at least one job from each queue was completed
expect(
jobs.some((job) => job.status === SidetrackJobStatusEnum.completed),
queue1Jobs.some(
(job) => job.status === SidetrackJobStatusEnum.completed,
),
).toBe(true);
expect(
queue2Jobs.some(
(job) => job.status === SidetrackJobStatusEnum.completed,
),
).toBe(true);

// Find completed jobs
const completedQueue1Job = queue1Jobs.find(
(job) => job.status === SidetrackJobStatusEnum.completed,
);
const completedQueue2Job = queue2Jobs.find(
(job) => job.status === SidetrackJobStatusEnum.completed,
);

// Verify jobs ran close together (within 1.5s)
if (
completedQueue1Job?.attempted_at &&
completedQueue2Job?.attempted_at
) {
const timeDiff = Math.abs(
completedQueue1Job.attempted_at.getTime() -
completedQueue2Job.attempted_at.getTime(),
);
expect(timeDiff).toBeLessThan(1500);
}
} finally {
// Clean up
await sidetrackInstance.stop();

if (cronJobId) {
// Clean up cron jobs
for (const cronJobId of cronJobIds) {
try {
await client.query("DELETE FROM sidetrack_cron_jobs WHERE id = $1", [
cronJobId,
Expand Down
65 changes: 65 additions & 0 deletions packages/sidetrack/test/poll.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,71 @@ describe("polling", () => {
}
});

it("processes jobs from multiple queues concurrently", async () => {
const client = await pool.connect();
const jobIds: string[] = [];
const sidetrack = new SidetrackTest<{
queue1: { description: string; value: number };
queue2: { description: string; value: string };
}>({
dbClient: usePg(client),
pollingInterval: 100,
queues: {
queue1: {
run: async (payload) => Promise.resolve(payload),
},
queue2: {
run: async (payload) => Promise.resolve(payload),
},
},
});

try {
await sidetrack.start();

// Insert jobs to both queues
const job1 = await sidetrack.insertJob("queue1", {
description: "Job from queue 1",
value: 42,
});
jobIds.push(job1.id);

const job2 = await sidetrack.insertJob("queue2", {
description: "Job from queue 2",
value: "test",
});
jobIds.push(job2.id);

// Wait for jobs to be processed
await new Promise((resolve) => setTimeout(resolve, 200));

// Verify both jobs were processed
const processedJob1 = await sidetrack.getJob(job1.id);
const processedJob2 = await sidetrack.getJob(job2.id);

expect(processedJob1.status).toBe("completed");
expect(processedJob2.status).toBe("completed");

// Verify jobs from different queues can be processed around the same time
const job1Time = processedJob1.attempted_at!.getTime();
const job2Time = processedJob2.attempted_at!.getTime();
expect(Math.abs(job1Time - job2Time)).toBeLessThan(100); // Should be processed within the same polling interval
} finally {
await sidetrack.stop();

for (const jobId of jobIds) {
try {
await client.query("DELETE FROM sidetrack_jobs WHERE id = $1", [
jobId,
]);
} catch (e) {
console.error("Failed to clean up job:", e);
}
}
client.release();
}
});

it("stops processing jobs when polling is stopped", async () => {
const client = await pool.connect();
let jobId: string | undefined;
Expand Down

0 comments on commit 6790194

Please sign in to comment.