From 8d59d9ffb97014755e42530e1ac7e37863045ebf Mon Sep 17 00:00:00 2001 From: paulpascal Date: Tue, 12 Nov 2024 03:42:51 +0000 Subject: [PATCH] fix: address feedback and fix noticed issued on move contact --- src/config/config-worker.ts | 6 +++ src/lib/queues.ts | 9 ++-- src/worker/move-contact-worker.ts | 77 ++++++++++++++++++--------- test/integration/move-contact.spec.ts | 5 +- 4 files changed, 67 insertions(+), 30 deletions(-) diff --git a/src/config/config-worker.ts b/src/config/config-worker.ts index 693451e0..ae6ee3f9 100644 --- a/src/config/config-worker.ts +++ b/src/config/config-worker.ts @@ -12,6 +12,12 @@ export const WorkerConfig = { port: Number(environment.REDIS_PORT), }, moveContactQueue: 'MOVE_CONTACT_QUEUE', + defaultJobOptions: { + attempts: 3, // Max retries for a failed job + backoff: { + type: 'custom', + }, + } }; const assertRedisConfig = () => { diff --git a/src/lib/queues.ts b/src/lib/queues.ts index 2b68de2a..398495d4 100644 --- a/src/lib/queues.ts +++ b/src/lib/queues.ts @@ -1,5 +1,5 @@ import { v4 } from 'uuid'; -import { JobsOptions, Queue, ConnectionOptions } from 'bullmq'; +import { JobsOptions, Queue, ConnectionOptions, DefaultJobOptions } from 'bullmq'; import { WorkerConfig } from '../config/config-worker'; export interface IQueue { @@ -17,9 +17,9 @@ export class BullQueue implements IQueue { public readonly name: string; public readonly bullQueue: Queue; - constructor(queueName: string, connection: ConnectionOptions) { + constructor(queueName: string, connection: ConnectionOptions, defaultJobOptions?: DefaultJobOptions) { this.name = queueName; - this.bullQueue = new Queue(queueName, { connection }); + this.bullQueue = new Queue(queueName, { connection, defaultJobOptions }); } public async add(jobParams: JobParams): Promise { @@ -37,5 +37,6 @@ export class BullQueue implements IQueue { export const getMoveContactQueue = () => new BullQueue( WorkerConfig.moveContactQueue, - WorkerConfig.redisConnection + WorkerConfig.redisConnection, + WorkerConfig.defaultJobOptions ); diff --git a/src/worker/move-contact-worker.ts b/src/worker/move-contact-worker.ts index 0b18da1f..55e6aa85 100644 --- a/src/worker/move-contact-worker.ts +++ b/src/worker/move-contact-worker.ts @@ -1,6 +1,6 @@ import axios from 'axios'; import { spawn } from 'child_process'; -import { Worker, Job, DelayedError, ConnectionOptions } from 'bullmq'; +import { Worker, Job, DelayedError, ConnectionOptions, MinimalJob } from 'bullmq'; import { DateTime } from 'luxon'; import Auth from '../lib/authentication'; @@ -25,7 +25,13 @@ export class MoveContactWorker { this.worker = new Worker( queueName, this.handleJob, - { connection, concurrency: this.MAX_CONCURRENCY } + { + connection, + concurrency: this.MAX_CONCURRENCY, + settings: { + backoffStrategy: this.handleRetryBackoff, + } + } ); } @@ -40,16 +46,17 @@ export class MoveContactWorker { const jobData: MoveContactData = job.data; // Ensure server availability - if (await this.shouldPostpone(jobData)) { - await this.postpone(job, processingToken); + const { shouldPostpone, reason } = await this.shouldPostpone(jobData); + if (shouldPostpone) { + await this.postpone(job, reason, processingToken); throw new DelayedError(); } - const result = await this.moveContact(jobData); + const result = await this.moveContact(job); if (!result.success) { - job.log(`[${new Date().toISOString()}]: ${result.message}`); const errorMessage = `Job ${job.id} failed with the following error: ${result.message}`; console.error(errorMessage); + this.jobLogWithTimestamp(job, errorMessage); throw new Error(errorMessage); } @@ -57,23 +64,38 @@ export class MoveContactWorker { return true; }; - private static async shouldPostpone(jobData: MoveContactData): Promise { + private static handleRetryBackoff = ( + attemptsMade: number, type: string | undefined, err: Error | undefined, job: MinimalJob | undefined + ): number => { + const {retryTimeFormatted} = this.computeRetryTime(); + const fullMessage = `Job ${job?.id} will be retried ${attemptsMade + 1} time at ${retryTimeFormatted}. Due to failure: ${type}: ${err?.message}`; + this.jobLogWithTimestamp(job, fullMessage); + return this.DELAY_IN_MILLIS; + }; + + private static async shouldPostpone(jobData: MoveContactData): Promise<{ shouldPostpone: boolean; reason: string }> { try { const { instanceUrl } = jobData; const response = await axios.get(`${instanceUrl}/api/v2/monitoring`); const sentinelBacklog = response.data.sentinel?.backlog; console.log(`Sentinel backlog at ${sentinelBacklog} of ${this.MAX_SENTINEL_BACKLOG}`); - return sentinelBacklog > this.MAX_SENTINEL_BACKLOG; + return { shouldPostpone: sentinelBacklog > this.MAX_SENTINEL_BACKLOG, reason: `Sentinel backlog too high at ${sentinelBacklog}` }; } catch (err: any) { const errorMessage = err.response?.data?.error?.message || err.response?.error || err?.message; console.error('Error fetching monitoring data:', errorMessage); - return true; + + // Handle server unavailability (HTTP 500 errors) + if (err.response?.status === 500) { + console.log('Server error encountered, postponing job...'); + return { shouldPostpone: true, reason: `Server error encountered: ${errorMessage}` }; + } + return { shouldPostpone: false, reason: '' }; } } - private static async moveContact(jobData: MoveContactData): Promise { + private static async moveContact(job: Job): Promise { try { - const { contactId, parentId, instanceUrl, sessionToken } = jobData; + const { contactId, parentId, instanceUrl, sessionToken } = job.data as MoveContactData; if (!sessionToken) { return { success: false, message: 'Missing session token' }; @@ -86,7 +108,7 @@ export class MoveContactWorker { const args = this.buildCommandArgs(instanceUrl, token, contactId, parentId); this.logCommand(command, args); - await this.executeCommand(command, args); + await this.executeCommand(command, args, job); return { success: true, message: `Job processing completed.` }; } catch (error) { @@ -112,7 +134,7 @@ export class MoveContactWorker { console.log('Executing command:', `${command} ${maskedArgs.join(' ')}`); } - private static async executeCommand(command: string, args: string[]): Promise { + private static async executeCommand(command: string, args: string[], job: Job): Promise { return new Promise((resolve, reject) => { const chtProcess = spawn(command, args); let lastOutput = ''; @@ -123,12 +145,13 @@ export class MoveContactWorker { }, this.MAX_TIMEOUT_IN_MILLIS); chtProcess.stdout.on('data', data => { - console.log(`cht-conf: ${data}`); lastOutput = data.toString(); + this.jobLogWithTimestamp(job, `cht-conf output: ${data.toString()}`); }); chtProcess.stderr.on('data', error => { - console.error(`cht-conf error: ${error}`); + lastOutput = error.toString(); + this.jobLogWithTimestamp(job, `cht-conf error: ${error.toString()}`); }); chtProcess.on('close', code => { @@ -141,22 +164,28 @@ export class MoveContactWorker { chtProcess.on('error', error => { clearTimeout(timeout); - console.log(error); + this.jobLogWithTimestamp(job, `cht-conf process error: ${error.toString()}`); reject(error); }); }); } - private static async postpone(job: Job, processingToken?: string): Promise { - // Calculate the retry time using luxon + private static async postpone(job: Job, retryMessage: string, processingToken?: string): Promise { + const { retryTimeFormatted, retryTime } = this.computeRetryTime(); + this.jobLogWithTimestamp(job, `Job ${job.id} postponed until ${retryTimeFormatted}. Reason: ${retryMessage}.`); + await job.moveToDelayed(retryTime.toMillis(), processingToken); + } + + private static computeRetryTime(): { retryTime: DateTime; retryTimeFormatted: string } { const retryTime = DateTime.now().plus({ milliseconds: this.DELAY_IN_MILLIS }); const retryTimeFormatted = retryTime.toLocaleString(DateTime.TIME_SIMPLE); - - // Delayed this job by DELAY_IN_MILLIS, using the current worker processing token - await job.moveToDelayed(retryTime.toMillis(), processingToken); + return { retryTime, retryTimeFormatted }; + } - const retryMessage = `Job ${job.id} postponed until ${retryTimeFormatted}. Reason was sentinel backlog.`; - job.log(`[${new Date().toISOString()}]: ${retryMessage}`); - console.log(retryMessage); + private static jobLogWithTimestamp(job: Job|MinimalJob|undefined, message: string): void { + const timestamp = new Date().toISOString(); + const fullMessage = `[${timestamp}] ${message}`; + job?.log(fullMessage); + console.log(fullMessage); } } diff --git a/test/integration/move-contact.spec.ts b/test/integration/move-contact.spec.ts index d4b348bb..ab5469dd 100644 --- a/test/integration/move-contact.spec.ts +++ b/test/integration/move-contact.spec.ts @@ -20,6 +20,7 @@ describe('integration/move-contact', function () { const queueName = 'move_contact_queue'; const connection = { host: '127.0.0.1', port: 6363 }; + const defaultJobOptions = { attempts: 3, backoff: { type: 'custom' } }; let sandbox: sinon.SinonSandbox; let addStub: sinon.SinonStub; @@ -32,7 +33,7 @@ describe('integration/move-contact', function () { beforeEach(async () => { sandbox = sinon.createSandbox(); - moveContactQueue = new BullQueue(queueName, connection); + moveContactQueue = new BullQueue(queueName, connection, defaultJobOptions); addStub = sandbox.stub(moveContactQueue, 'add'); handleJobStub = sandbox.stub(MoveContactWorker as any, 'handleJob'); @@ -116,7 +117,7 @@ describe('integration/move-contact', function () { await new Promise(resolve => setTimeout(resolve, 1000)); // Check if the job has failed - const job = await moveContactQueue['bullQueue'].getJob(jobId) as Job; + const job = await moveContactQueue['bullQueue'].getJob(jobId) as unknown as Job; expect(await job.getState()).to.equal('failed'); }); });