diff --git a/api/src/shared/infrastructure/jobs/JobQueue.js b/api/src/shared/infrastructure/jobs/JobQueue.js index a1fcd4070b6..757e4ec82d1 100644 --- a/api/src/shared/infrastructure/jobs/JobQueue.js +++ b/api/src/shared/infrastructure/jobs/JobQueue.js @@ -21,6 +21,14 @@ class JobQueue { }); } + scheduleCronJob({ name, cron, data, options }) { + return this.pgBoss.schedule(name, cron, data, options); + } + + unscheduleCronJob(name) { + return this.pgBoss.unschedule(name); + } + async stop() { await this.pgBoss.stop({ graceful: false, timeout: 1000, destroy: true }); } diff --git a/api/tests/tooling/jobs/expect-job.test.js b/api/tests/integration/tooling/jobs/expect-job.test.js similarity index 75% rename from api/tests/tooling/jobs/expect-job.test.js rename to api/tests/integration/tooling/jobs/expect-job.test.js index 351a3f0930b..417c359318e 100644 --- a/api/tests/tooling/jobs/expect-job.test.js +++ b/api/tests/integration/tooling/jobs/expect-job.test.js @@ -1,6 +1,8 @@ -import { JobRepository } from '../../../src/shared/infrastructure/repositories/jobs/job-repository.js'; -import { catchErr, expect, knex } from '../../test-helper.js'; +import PgBoss from 'pg-boss'; +import { JobQueue } from '../../../../src/shared/infrastructure/jobs/JobQueue.js'; +import { JobRepository } from '../../../../src/shared/infrastructure/repositories/jobs/job-repository.js'; +import { catchErr, expect, knex } from '../../../test-helper.js'; describe('Integration | Tooling | Expect Job', function () { describe('#withJobsCount', function () { it('succeeds when count of executed jobs is correct', async function () { @@ -48,9 +50,9 @@ describe('Integration | Tooling | Expect Job', function () { await expect('JobTest').to.have.been.performed.withJob({ name: 'JobTest', data: { foo: 'bar' }, - retrylimit: job.retryLimit, - retrydelay: job.retryDelay, - retrybackoff: job.retryBackoff, + retrylimit: job.retry.retryLimit, + retrydelay: job.retry.retryDelay, + retrybackoff: job.retry.retryBackoff, expirein: job.expireIn, }); }); @@ -180,4 +182,58 @@ describe('Integration | Tooling | Expect Job', function () { ); }); }); + + describe('cronJob helper', function () { + let pgBoss, jobQueue; + + beforeEach(async function () { + const pgBossInstance = new PgBoss(process.env.TEST_DATABASE_URL); + pgBoss = await pgBossInstance.start(); + + jobQueue = new JobQueue(pgBoss); + }); + + afterEach(async function () { + await jobQueue.stop(); + }); + + describe('#withCronJobsCount', function () { + it('succeeds when count of executed jobs is correct', async function () { + // given + const jobName = 'My_Job'; + // when + await jobQueue.scheduleCronJob({ + name: jobName, + cron: '*/5 * * * *', + data: { my_data: 'awesome_data' }, + options: { tz: 'Europe/Paris' }, + }); + + // then + await expect(jobName).to.have.been.schedule.withCronJobsCount(1); + }); + }); + + describe('#withCronJob', function () { + it('succeeds when count of executed jobs is correct', async function () { + // given + const jobName = 'My_Job'; + // when + await jobQueue.scheduleCronJob({ + name: jobName, + cron: '*/5 * * * *', + data: { my_data: 'awesome_data' }, + options: { tz: 'Europe/Paris' }, + }); + + // then + await expect(jobName).to.have.been.schedule.withCronJob({ + name: jobName, + cron: '*/5 * * * *', + data: { my_data: 'awesome_data' }, + options: { tz: 'Europe/Paris' }, + }); + }); + }); + }); }); diff --git a/api/tests/shared/integration/infrastructure/jobs/JobQueue_test.js b/api/tests/shared/integration/infrastructure/jobs/JobQueue_test.js index 860c7d36280..87092dad516 100644 --- a/api/tests/shared/integration/infrastructure/jobs/JobQueue_test.js +++ b/api/tests/shared/integration/infrastructure/jobs/JobQueue_test.js @@ -5,39 +5,88 @@ import { JobRepository } from '../../../../../src/shared/infrastructure/reposito import { expect } from '../../../../test-helper.js'; describe('Integration | Infrastructure | Jobs | JobQueue', function () { - it('executes job when a job is added to the queue', async function () { - const name = 'JobTest'; - const expectedParams = { jobParam: 1 }; - const job = new JobRepository({ name }); - await job.performAsync(expectedParams); - const pgBoss = new PgBoss(process.env.TEST_DATABASE_URL); + let pgBoss, jobQueue; + + beforeEach(async function () { + pgBoss = new PgBoss(process.env.TEST_DATABASE_URL); await pgBoss.start(); - const jobQueue = new JobQueue(pgBoss); + jobQueue = new JobQueue(pgBoss); + }); + + describe('register', function () { + it('executes job when a job is added to the queue', async function () { + // given + const name = 'JobTest'; + const expectedParams = { jobParam: 1 }; + const job = new JobRepository({ name }); + + // when + await job.performAsync(expectedParams); - const promise = new Promise((resolve, reject) => { - const handler = class { - get teamConcurrency() { - return 1; - } + // then + const promise = new Promise((resolve, reject) => { + const handler = class { + get teamConcurrency() { + return 1; + } - get teamSize() { - return 2; - } + get teamSize() { + return 2; + } - handle(params) { - try { - expect(params).to.deep.contains({ data: expectedParams }); - } catch (err) { - reject(err); + handle(params) { + try { + expect(params).to.deep.contains({ data: expectedParams }); + } catch (err) { + reject(err); + } + resolve(); } - resolve(); - } - }; + }; - jobQueue.register(name, handler); + jobQueue.register(name, handler); + }); + + return promise; }); + }); - return promise; + describe('cronJob', function () { + it('save schedule job', async function () { + // given + const name = 'CronJobTest'; + + // when + await jobQueue.scheduleCronJob({ + name, + cron: '*/5 * * * *', + data: { my_data: 'awesome_data' }, + options: { tz: 'Europe/Paris' }, + }); + + await expect(name).to.have.been.schedule.withCronJob({ + name, + cron: '*/5 * * * *', + data: { my_data: 'awesome_data' }, + options: { tz: 'Europe/Paris' }, + }); + }); + + it('remove schedule job', async function () { + // given + const name = 'CronJobTest'; + await jobQueue.scheduleCronJob({ + name, + cron: '*/5 * * * *', + data: { my_data: 'awesome_data' }, + options: { tz: 'Europe/Paris' }, + }); + + // when + await jobQueue.unscheduleCronJob(name); + + await expect(name).to.have.been.schedule.withCronJobsCount(0); + }); }); }); diff --git a/api/tests/tooling/jobs/expect-job.js b/api/tests/tooling/jobs/expect-job.js index b8c844f0943..9b9d8cbd222 100644 --- a/api/tests/tooling/jobs/expect-job.js +++ b/api/tests/tooling/jobs/expect-job.js @@ -5,6 +5,10 @@ export const jobChai = (knex) => (_chai, utils) => { return this; }); + utils.addProperty(Assertion.prototype, 'schedule', function () { + return this; + }); + Assertion.addMethod('withJobsCount', async function (expectedCount) { const jobName = this._obj; const jobs = await knex('pgboss.job').where({ name: jobName }); @@ -23,6 +27,27 @@ export const jobChai = (knex) => (_chai, utils) => { assert.deepInclude(jobs[0], jobData, `Job '${jobName}' was performed with a different payload`); }); + Assertion.addMethod('withCronJobsCount', async function (expectedCount) { + const jobName = this._obj; + const jobs = await knex('pgboss.schedule').where({ name: jobName }); + assert.strictEqual( + jobs.length, + expectedCount, + `expected ${jobName} to have been performed ${expectedCount} times, but it was performed ${jobs.length} times`, + ); + }); + + Assertion.addMethod('withCronJob', async function (jobData) { + await this.withCronJobsCount(1); + + const jobName = this._obj; + const job = await knex('pgboss.schedule') + .select('name', 'cron', 'data', 'options') + .where({ name: jobName }) + .first(); + assert.deepInclude(job, jobData, `Job '${jobName}' was schedule with a different payload`); + }); + Assertion.addMethod('withJobPayloads', async function (payloads) { await this.withJobsCount(payloads.length);