Skip to content

Commit

Permalink
tech(api): add method to remove job
Browse files Browse the repository at this point in the history
  • Loading branch information
xav-car committed Sep 24, 2024
1 parent 81d61b7 commit 2e22e9e
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 30 deletions.
8 changes: 8 additions & 0 deletions api/src/shared/infrastructure/jobs/JobQueue.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ class JobQueue {
});
}

scheduleCronJob({ name, cron, data, options }) {
return this.pgBoss.schedule(name, cron, data, options);
}

unscheduleCronJob(name) {
return this.pgBoss.unschedule(name);
}

async stop() {
await this.pgBoss.stop({ graceful: false, timeout: 1000, destroy: true });
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { JobRepository } from '../../../src/shared/infrastructure/repositories/jobs/job-repository.js';
import { catchErr, expect, knex } from '../../test-helper.js';
import PgBoss from 'pg-boss';

import { JobQueue } from '../../../../src/shared/infrastructure/jobs/JobQueue.js';
import { JobRepository } from '../../../../src/shared/infrastructure/repositories/jobs/job-repository.js';
import { catchErr, expect, knex } from '../../../test-helper.js';
describe('Integration | Tooling | Expect Job', function () {
describe('#withJobsCount', function () {
it('succeeds when count of executed jobs is correct', async function () {
Expand Down Expand Up @@ -48,9 +50,9 @@ describe('Integration | Tooling | Expect Job', function () {
await expect('JobTest').to.have.been.performed.withJob({
name: 'JobTest',
data: { foo: 'bar' },
retrylimit: job.retryLimit,
retrydelay: job.retryDelay,
retrybackoff: job.retryBackoff,
retrylimit: job.retry.retryLimit,
retrydelay: job.retry.retryDelay,
retrybackoff: job.retry.retryBackoff,
expirein: job.expireIn,
});
});
Expand Down Expand Up @@ -180,4 +182,58 @@ describe('Integration | Tooling | Expect Job', function () {
);
});
});

describe('cronJob helper', function () {
let pgBoss, jobQueue;

beforeEach(async function () {
const pgBossInstance = new PgBoss(process.env.TEST_DATABASE_URL);
pgBoss = await pgBossInstance.start();

jobQueue = new JobQueue(pgBoss);
});

afterEach(async function () {
await jobQueue.stop();
});

describe('#withCronJobsCount', function () {
it('succeeds when count of executed jobs is correct', async function () {
// given
const jobName = 'My_Job';
// when
await jobQueue.scheduleCronJob({
name: jobName,
cron: '*/5 * * * *',
data: { my_data: 'awesome_data' },
options: { tz: 'Europe/Paris' },
});

// then
await expect(jobName).to.have.been.schedule.withCronJobsCount(1);
});
});

describe('#withCronJob', function () {
it('succeeds when count of executed jobs is correct', async function () {
// given
const jobName = 'My_Job';
// when
await jobQueue.scheduleCronJob({
name: jobName,
cron: '*/5 * * * *',
data: { my_data: 'awesome_data' },
options: { tz: 'Europe/Paris' },
});

// then
await expect(jobName).to.have.been.schedule.withCronJob({
name: jobName,
cron: '*/5 * * * *',
data: { my_data: 'awesome_data' },
options: { tz: 'Europe/Paris' },
});
});
});
});
});
99 changes: 74 additions & 25 deletions api/tests/shared/integration/infrastructure/jobs/JobQueue_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,39 +5,88 @@ import { JobRepository } from '../../../../../src/shared/infrastructure/reposito
import { expect } from '../../../../test-helper.js';

describe('Integration | Infrastructure | Jobs | JobQueue', function () {
it('executes job when a job is added to the queue', async function () {
const name = 'JobTest';
const expectedParams = { jobParam: 1 };
const job = new JobRepository({ name });
await job.performAsync(expectedParams);
const pgBoss = new PgBoss(process.env.TEST_DATABASE_URL);
let pgBoss, jobQueue;

beforeEach(async function () {
pgBoss = new PgBoss(process.env.TEST_DATABASE_URL);
await pgBoss.start();

const jobQueue = new JobQueue(pgBoss);
jobQueue = new JobQueue(pgBoss);
});

describe('register', function () {
it('executes job when a job is added to the queue', async function () {
// given
const name = 'JobTest';
const expectedParams = { jobParam: 1 };
const job = new JobRepository({ name });

// when
await job.performAsync(expectedParams);

const promise = new Promise((resolve, reject) => {
const handler = class {
get teamConcurrency() {
return 1;
}
// then
const promise = new Promise((resolve, reject) => {
const handler = class {
get teamConcurrency() {
return 1;
}

get teamSize() {
return 2;
}
get teamSize() {
return 2;
}

handle(params) {
try {
expect(params).to.deep.contains({ data: expectedParams });
} catch (err) {
reject(err);
handle(params) {
try {
expect(params).to.deep.contains({ data: expectedParams });
} catch (err) {
reject(err);
}
resolve();
}
resolve();
}
};
};

jobQueue.register(name, handler);
jobQueue.register(name, handler);
});

return promise;
});
});

return promise;
describe('cronJob', function () {
it('save schedule job', async function () {
// given
const name = 'CronJobTest';

// when
await jobQueue.scheduleCronJob({
name,
cron: '*/5 * * * *',
data: { my_data: 'awesome_data' },
options: { tz: 'Europe/Paris' },
});

await expect(name).to.have.been.schedule.withCronJob({
name,
cron: '*/5 * * * *',
data: { my_data: 'awesome_data' },
options: { tz: 'Europe/Paris' },
});
});

it('remove schedule job', async function () {
// given
const name = 'CronJobTest';
await jobQueue.scheduleCronJob({
name,
cron: '*/5 * * * *',
data: { my_data: 'awesome_data' },
options: { tz: 'Europe/Paris' },
});

// when
await jobQueue.unscheduleCronJob(name);

await expect(name).to.have.been.schedule.withCronJobsCount(0);
});
});
});
25 changes: 25 additions & 0 deletions api/tests/tooling/jobs/expect-job.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ export const jobChai = (knex) => (_chai, utils) => {
return this;
});

utils.addProperty(Assertion.prototype, 'schedule', function () {
return this;
});

Assertion.addMethod('withJobsCount', async function (expectedCount) {
const jobName = this._obj;
const jobs = await knex('pgboss.job').where({ name: jobName });
Expand All @@ -23,6 +27,27 @@ export const jobChai = (knex) => (_chai, utils) => {
assert.deepInclude(jobs[0], jobData, `Job '${jobName}' was performed with a different payload`);
});

Assertion.addMethod('withCronJobsCount', async function (expectedCount) {
const jobName = this._obj;
const jobs = await knex('pgboss.schedule').where({ name: jobName });
assert.strictEqual(
jobs.length,
expectedCount,
`expected ${jobName} to have been performed ${expectedCount} times, but it was performed ${jobs.length} times`,
);
});

Assertion.addMethod('withCronJob', async function (jobData) {
await this.withCronJobsCount(1);

const jobName = this._obj;
const job = await knex('pgboss.schedule')
.select('name', 'cron', 'data', 'options')
.where({ name: jobName })
.first();
assert.deepInclude(job, jobData, `Job '${jobName}' was schedule with a different payload`);
});

Assertion.addMethod('withJobPayloads', async function (payloads) {
await this.withJobsCount(payloads.length);

Expand Down

0 comments on commit 2e22e9e

Please sign in to comment.