From 6790194657a70c22054b1e1b1440e6f465e5f39e Mon Sep 17 00:00:00 2001 From: Ani Ravi <5902976+aniravi24@users.noreply.github.com> Date: Fri, 14 Feb 2025 01:06:01 -0500 Subject: [PATCH] test(sidetrack): test multiple queues and crons to make sure things are running in parallel --- packages/sidetrack/test/cron.test.ts | 117 +++++++++++++++++++-------- packages/sidetrack/test/poll.test.ts | 65 +++++++++++++++ 2 files changed, 147 insertions(+), 35 deletions(-) diff --git a/packages/sidetrack/test/cron.test.ts b/packages/sidetrack/test/cron.test.ts index 0143bd60..66e03a84 100644 --- a/packages/sidetrack/test/cron.test.ts +++ b/packages/sidetrack/test/cron.test.ts @@ -103,70 +103,117 @@ describe("cron jobs", () => { }); }); - it("executes cron jobs on schedule", async () => { + it("executes multiple cron jobs and processes their jobs", async () => { const client = await pool.connect(); - let cronJobId: string | undefined; + const cronJobIds: string[] = []; let jobIds: string[] = []; + const sidetrackInstance = new SidetrackTest<{ - test: { message: string }; + queue1: { description: string; sequence: number }; + queue2: { description: string; sequence: number }; }>({ dbClient: usePg(client), queues: { - test: { + queue1: { + run: (payload) => Promise.resolve(payload), + }, + queue2: { run: (payload) => Promise.resolve(payload), }, }, }); try { - // Schedule a cron job to run every second - const cronJob = await sidetrackInstance.scheduleCron( - "test", - "* * * * * *", // every second (6-part cron expression) - { message: "cron execution test" }, + // Schedule two cron jobs + const cronJob1 = await sidetrackInstance.scheduleCron( + "queue1", + "* * * * * *", // every second + { + description: "First cron job running every second", + sequence: 1, + }, ); - cronJobId = cronJob.id; + cronJobIds.push(cronJob1.id); - // Verify the cron job was inserted correctly - const cronResult = await client.query( - "SELECT * FROM sidetrack_cron_jobs WHERE id = $1", - [cronJobId], + const cronJob2 = await sidetrackInstance.scheduleCron( + "queue2", + "* * * * * *", // every second + { + description: "Second cron job running every second", + sequence: 2, + }, ); - expect(cronResult.rows.length).toBe(1); - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access - expect(cronResult.rows[0].queue).toBe("test"); - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access - expect(cronResult.rows[0].cron_expression).toBe("* * * * * *"); - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access - expect(cronResult.rows[0].payload).toEqual({ - message: "cron execution test", - }); + cronJobIds.push(cronJob2.id); // Start the service await sidetrackInstance.start(); - // Wait for a couple of executions + // Wait for jobs to be created and processed await new Promise((resolve) => setTimeout(resolve, 3000)); - // List all jobs created by the cron - const jobs = await sidetrackInstance.listJobs({ queue: ["test"] }); - jobIds = jobs.map((job) => job.id); + // Get jobs from both queues + const queue1Jobs = await sidetrackInstance.listJobs({ + queue: ["queue1"], + }); + const queue2Jobs = await sidetrackInstance.listJobs({ + queue: ["queue2"], + }); + + jobIds = [ + ...queue1Jobs.map((job) => job.id), + ...queue2Jobs.map((job) => job.id), + ]; - // Should have at least 2 jobs created - expect(jobs.length).toBeGreaterThanOrEqual(2); + // Verify jobs were created + expect(queue1Jobs.length).toBeGreaterThanOrEqual(1); + expect(queue2Jobs.length).toBeGreaterThanOrEqual(1); - // Jobs should have the correct payload - expect(jobs[0].payload).toEqual({ message: "cron execution test" }); + // Verify payloads + expect(queue1Jobs[0].payload).toEqual({ + description: "First cron job running every second", + sequence: 1, + }); + expect(queue2Jobs[0].payload).toEqual({ + description: "Second cron job running every second", + sequence: 2, + }); - // At least some jobs should be completed + // Verify at least one job from each queue was completed expect( - jobs.some((job) => job.status === SidetrackJobStatusEnum.completed), + queue1Jobs.some( + (job) => job.status === SidetrackJobStatusEnum.completed, + ), ).toBe(true); + expect( + queue2Jobs.some( + (job) => job.status === SidetrackJobStatusEnum.completed, + ), + ).toBe(true); + + // Find completed jobs + const completedQueue1Job = queue1Jobs.find( + (job) => job.status === SidetrackJobStatusEnum.completed, + ); + const completedQueue2Job = queue2Jobs.find( + (job) => job.status === SidetrackJobStatusEnum.completed, + ); + + // Verify jobs ran close together (within 1.5s) + if ( + completedQueue1Job?.attempted_at && + completedQueue2Job?.attempted_at + ) { + const timeDiff = Math.abs( + completedQueue1Job.attempted_at.getTime() - + completedQueue2Job.attempted_at.getTime(), + ); + expect(timeDiff).toBeLessThan(1500); + } } finally { - // Clean up await sidetrackInstance.stop(); - if (cronJobId) { + // Clean up cron jobs + for (const cronJobId of cronJobIds) { try { await client.query("DELETE FROM sidetrack_cron_jobs WHERE id = $1", [ cronJobId, diff --git a/packages/sidetrack/test/poll.test.ts b/packages/sidetrack/test/poll.test.ts index 5bd2955d..03e163a1 100644 --- a/packages/sidetrack/test/poll.test.ts +++ b/packages/sidetrack/test/poll.test.ts @@ -94,6 +94,71 @@ describe("polling", () => { } }); + it("processes jobs from multiple queues concurrently", async () => { + const client = await pool.connect(); + const jobIds: string[] = []; + const sidetrack = new SidetrackTest<{ + queue1: { description: string; value: number }; + queue2: { description: string; value: string }; + }>({ + dbClient: usePg(client), + pollingInterval: 100, + queues: { + queue1: { + run: async (payload) => Promise.resolve(payload), + }, + queue2: { + run: async (payload) => Promise.resolve(payload), + }, + }, + }); + + try { + await sidetrack.start(); + + // Insert jobs to both queues + const job1 = await sidetrack.insertJob("queue1", { + description: "Job from queue 1", + value: 42, + }); + jobIds.push(job1.id); + + const job2 = await sidetrack.insertJob("queue2", { + description: "Job from queue 2", + value: "test", + }); + jobIds.push(job2.id); + + // Wait for jobs to be processed + await new Promise((resolve) => setTimeout(resolve, 200)); + + // Verify both jobs were processed + const processedJob1 = await sidetrack.getJob(job1.id); + const processedJob2 = await sidetrack.getJob(job2.id); + + expect(processedJob1.status).toBe("completed"); + expect(processedJob2.status).toBe("completed"); + + // Verify jobs from different queues can be processed around the same time + const job1Time = processedJob1.attempted_at!.getTime(); + const job2Time = processedJob2.attempted_at!.getTime(); + expect(Math.abs(job1Time - job2Time)).toBeLessThan(100); // Should be processed within the same polling interval + } finally { + await sidetrack.stop(); + + for (const jobId of jobIds) { + try { + await client.query("DELETE FROM sidetrack_jobs WHERE id = $1", [ + jobId, + ]); + } catch (e) { + console.error("Failed to clean up job:", e); + } + } + client.release(); + } + }); + it("stops processing jobs when polling is stopped", async () => { const client = await pool.connect(); let jobId: string | undefined;