diff --git a/packages/deployment/src/queue/BullQueue.ts b/packages/deployment/src/queue/BullQueue.ts index 7b8188aa5..c6c6ac744 100644 --- a/packages/deployment/src/queue/BullQueue.ts +++ b/packages/deployment/src/queue/BullQueue.ts @@ -21,6 +21,10 @@ export interface BullQueueConfig { retryAttempts?: number; } +interface BullWorker extends Closeable { + get worker(): Worker; +} + /** * TaskQueue implementation for BullMQ */ @@ -30,6 +34,9 @@ export class BullQueue { private activePromise?: Promise; + private activeWorkers: Record = {}; + private activeJobs = 0; + public createWorker( name: string, executor: (data: TaskPayload) => Promise, @@ -42,6 +49,7 @@ export class BullQueue // This is by far not optimal - since it still picks up 1 task per queue but waits until // computing them, so that leads to bad performance over multiple workers. // For that we need to restructure tasks to be flowing through a single queue however + this.activeJobs += 1; // TODO Use worker.pause() while (this.activePromise !== undefined) { @@ -54,10 +62,27 @@ export class BullQueue }); this.activePromise = promise; + // Pause all other workers + const workersToPause = Object.entries(this.activeWorkers).filter( + ([key]) => key !== name + ); + await Promise.all( + workersToPause.map(([, workerToPause]) => + workerToPause.worker.pause(true) + ) + ); + const result = await executor(job.data); this.activePromise = undefined; void resOutside(); + this.activeJobs -= 1; + if (this.activeJobs === 0) { + Object.entries(this.activeWorkers).forEach(([, resumingWorker]) => + resumingWorker.worker.resume() + ); + } + return result; }, { @@ -76,11 +101,16 @@ export class BullQueue log.error(error); }); - return { + const instantiatedWorker = { async close() { await worker.close(); }, + get worker() { + return worker; + }, }; + this.activeWorkers[name] = instantiatedWorker; + return instantiatedWorker; } public async getQueue(queueName: string): Promise {