Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: address feedback and fix noticed issued on move contact #223

Merged
merged 3 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ dist
src/package.json
.eslintcache
.DS_Store
upload-docs*
6 changes: 6 additions & 0 deletions src/config/config-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ export const WorkerConfig = {
port: Number(environment.REDIS_PORT),
},
moveContactQueue: 'MOVE_CONTACT_QUEUE',
defaultJobOptions: {
attempts: 3, // Max retries for a failed job
backoff: {
type: 'custom',
},
}
};

const assertRedisConfig = () => {
Expand Down
9 changes: 5 additions & 4 deletions src/lib/queues.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { v4 } from 'uuid';
import { JobsOptions, Queue, ConnectionOptions } from 'bullmq';
import { JobsOptions, Queue, ConnectionOptions, DefaultJobOptions } from 'bullmq';
import { WorkerConfig } from '../config/config-worker';

export interface IQueue {
Expand All @@ -17,9 +17,9 @@ export class BullQueue implements IQueue {
public readonly name: string;
public readonly bullQueue: Queue;

constructor(queueName: string, connection: ConnectionOptions) {
constructor(queueName: string, connection: ConnectionOptions, defaultJobOptions?: DefaultJobOptions) {
this.name = queueName;
this.bullQueue = new Queue(queueName, { connection });
this.bullQueue = new Queue(queueName, { connection, defaultJobOptions });
}

public async add(jobParams: JobParams): Promise<string> {
Expand All @@ -37,5 +37,6 @@ export class BullQueue implements IQueue {

export const getMoveContactQueue = () => new BullQueue(
WorkerConfig.moveContactQueue,
WorkerConfig.redisConnection
WorkerConfig.redisConnection,
WorkerConfig.defaultJobOptions
);
7 changes: 6 additions & 1 deletion src/liquid/place/move_form.html
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,12 @@ <h1 class="subtitle">To</h1>
<script type="text/javascript">
function handleResponse(event) {
const response = event?.detail?.xhr?.responseText;
if (response && response.includes('data-success="true"')) {
const parser = new DOMParser();
const doc = parser.parseFromString(response, 'text/html');

// Find the element that indicates success
const successElement = doc.querySelector('[data-success="true"]');
if (successElement) {
bulmaToast.toast({
duration: 5000,
dismissible: true,
Expand Down
87 changes: 63 additions & 24 deletions src/worker/move-contact-worker.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import axios from 'axios';
import { spawn } from 'child_process';
import { Worker, Job, DelayedError, ConnectionOptions } from 'bullmq';
import { Worker, Job, DelayedError, ConnectionOptions, MinimalJob } from 'bullmq';
import { DateTime } from 'luxon';

import Auth from '../lib/authentication';
Expand All @@ -12,6 +12,10 @@ export interface MoveContactData {
instanceUrl: string;
}

export interface PostponeReason {
reason: string;
}

export type JobResult = { success: boolean; message: string };

export class MoveContactWorker {
Expand All @@ -25,7 +29,13 @@ export class MoveContactWorker {
this.worker = new Worker(
queueName,
this.handleJob,
{ connection, concurrency: this.MAX_CONCURRENCY }
{
connection,
concurrency: this.MAX_CONCURRENCY,
settings: {
backoffStrategy: this.handleRetryBackoff,
}
}
);
}

Expand All @@ -40,40 +50,62 @@ export class MoveContactWorker {
const jobData: MoveContactData = job.data;

// Ensure server availability
if (await this.shouldPostpone(jobData)) {
await this.postpone(job, processingToken);
const shouldPostpone = await this.shouldPostpone(jobData);
if (shouldPostpone) {
await this.postpone(job, shouldPostpone.reason, processingToken);
throw new DelayedError();
}

const result = await this.moveContact(jobData);
const result = await this.moveContact(job);
if (!result.success) {
job.log(`[${new Date().toISOString()}]: ${result.message}`);
const errorMessage = `Job ${job.id} failed with the following error: ${result.message}`;
console.error(errorMessage);
this.logWithTimestamp(job, errorMessage);
throw new Error(errorMessage);
}

console.log(`Job completed successfully: ${job.id}`);
return true;
};

private static async shouldPostpone(jobData: MoveContactData): Promise<boolean> {
private static handleRetryBackoff = (
attemptsMade: number, type: string | undefined, err: Error | undefined, job: MinimalJob | undefined
): number => {
const {retryTimeFormatted} = this.computeRetryTime();

const fullMessage = `Job ${job?.id} will retry at ${retryTimeFormatted}.\
Attempt Number: ${attemptsMade + 1}. Due to failure: ${type}: ${err?.message}`;

this.logWithTimestamp(job, fullMessage);
return this.DELAY_IN_MILLIS;
};

private static async shouldPostpone(jobData: MoveContactData): Promise<PostponeReason | undefined> {
try {
const { instanceUrl } = jobData;
const response = await axios.get(`${instanceUrl}/api/v2/monitoring`);
const sentinelBacklog = response.data.sentinel?.backlog;
console.log(`Sentinel backlog at ${sentinelBacklog} of ${this.MAX_SENTINEL_BACKLOG}`);
return sentinelBacklog > this.MAX_SENTINEL_BACKLOG;

return sentinelBacklog > this.MAX_SENTINEL_BACKLOG
? { reason: `Sentinel backlog too high at ${sentinelBacklog}` }
: undefined;
} catch (err: any) {
const errorMessage = err.response?.data?.error?.message || err.response?.error || err?.message;
console.error('Error fetching monitoring data:', errorMessage);
return true;

// Handle server unavailability (HTTP 500 errors)
if (err.response?.status === 500) {
console.log('Server error encountered, postponing job...');
return { reason: `Server error encountered: ${errorMessage}` };
}
return undefined;
}
}

private static async moveContact(jobData: MoveContactData): Promise<JobResult> {
private static async moveContact(job: Job): Promise<JobResult> {
try {
const { contactId, parentId, instanceUrl, sessionToken } = jobData;
const { contactId, parentId, instanceUrl, sessionToken } = job.data as MoveContactData;

if (!sessionToken) {
return { success: false, message: 'Missing session token' };
Expand All @@ -86,7 +118,7 @@ export class MoveContactWorker {
const args = this.buildCommandArgs(instanceUrl, token, contactId, parentId);

this.logCommand(command, args);
await this.executeCommand(command, args);
await this.executeCommand(command, args, job);

return { success: true, message: `Job processing completed.` };
} catch (error) {
Expand All @@ -112,7 +144,7 @@ export class MoveContactWorker {
console.log('Executing command:', `${command} ${maskedArgs.join(' ')}`);
}

private static async executeCommand(command: string, args: string[]): Promise<void> {
private static async executeCommand(command: string, args: string[], job: Job): Promise<void> {
return new Promise((resolve, reject) => {
const chtProcess = spawn(command, args);
let lastOutput = '';
Expand All @@ -123,12 +155,13 @@ export class MoveContactWorker {
}, this.MAX_TIMEOUT_IN_MILLIS);

chtProcess.stdout.on('data', data => {
console.log(`cht-conf: ${data}`);
lastOutput = data.toString();
this.logWithTimestamp(job, `cht-conf output: ${data.toString()}`);
});

chtProcess.stderr.on('data', error => {
console.error(`cht-conf error: ${error}`);
lastOutput = error.toString();
this.logWithTimestamp(job, `cht-conf error: ${error.toString()}`);
});

chtProcess.on('close', code => {
Expand All @@ -141,22 +174,28 @@ export class MoveContactWorker {

chtProcess.on('error', error => {
clearTimeout(timeout);
console.log(error);
this.logWithTimestamp(job, `cht-conf process error: ${error.toString()}`);
reject(error);
});
});
}

private static async postpone(job: Job, processingToken?: string): Promise<void> {
// Calculate the retry time using luxon
private static async postpone(job: Job, retryMessage: string, processingToken?: string): Promise<void> {
const { retryTimeFormatted, retryTime } = this.computeRetryTime();
this.logWithTimestamp(job, `Job ${job.id} postponed until ${retryTimeFormatted}. Reason: ${retryMessage}.`);
await job.moveToDelayed(retryTime.toMillis(), processingToken);
}

private static computeRetryTime(): { retryTime: DateTime; retryTimeFormatted: string } {
const retryTime = DateTime.now().plus({ milliseconds: this.DELAY_IN_MILLIS });
const retryTimeFormatted = retryTime.toLocaleString(DateTime.TIME_SIMPLE);

// Delayed this job by DELAY_IN_MILLIS, using the current worker processing token
await job.moveToDelayed(retryTime.toMillis(), processingToken);
return { retryTime, retryTimeFormatted };
}

const retryMessage = `Job ${job.id} postponed until ${retryTimeFormatted}. Reason was sentinel backlog.`;
job.log(`[${new Date().toISOString()}]: ${retryMessage}`);
console.log(retryMessage);
private static logWithTimestamp(job: Job|MinimalJob|undefined, message: string): void {
const timestamp = DateTime.now().toISO();
const fullMessage = `[${timestamp}] ${message}`;
job?.log(fullMessage);
console.log(fullMessage);
}
}
3 changes: 2 additions & 1 deletion test/integration/move-contact.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ describe('integration/move-contact', function () {

const queueName = 'move_contact_queue';
const connection = { host: '127.0.0.1', port: 6363 };
const defaultJobOptions = { attempts: 3, backoff: { type: 'custom' } };

let sandbox: sinon.SinonSandbox;
let addStub: sinon.SinonStub;
Expand All @@ -32,7 +33,7 @@ describe('integration/move-contact', function () {

beforeEach(async () => {
sandbox = sinon.createSandbox();
moveContactQueue = new BullQueue(queueName, connection);
moveContactQueue = new BullQueue(queueName, connection, defaultJobOptions);
addStub = sandbox.stub(moveContactQueue, 'add');

handleJobStub = sandbox.stub(MoveContactWorker as any, 'handleJob');
Expand Down