From 691d0d3ac4b76f5152bb54e1d22d63468e185be5 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Sat, 1 Feb 2025 15:49:01 -0600 Subject: [PATCH 1/2] fix(worker): process job when closing and it was move to active --- src/classes/worker.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/classes/worker.ts b/src/classes/worker.ts index 58bd3257a7..8da73a4170 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -819,7 +819,7 @@ will never work with more accuracy than 1ms. */ fetchNextCallback = () => true, jobsInProgress: Set<{ job: Job; ts: number }>, ): Promise> { - if (!job || this.closing || this.paused) { + if (!job) { return; } From 697521b64d4062849f726a732e19b81bf5d8a8d9 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Sat, 1 Feb 2025 18:00:02 -0600 Subject: [PATCH 2/2] fix(worker): wait for pending jobs when closing --- src/classes/worker.ts | 11 +++++++++-- src/commands/addDelayedJob-6.lua | 2 +- src/commands/addParentJob-4.lua | 2 +- src/commands/addPrioritizedJob-8.lua | 2 +- src/commands/addStandardJob-8.lua | 2 +- src/commands/includes/deduplicateJob.lua | 18 +++++++++--------- tests/test_events.ts | 18 ++++++++++++++++-- 7 files changed, 38 insertions(+), 17 deletions(-) diff --git a/src/classes/worker.ts b/src/classes/worker.ts index 8da73a4170..21f012807a 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -189,6 +189,7 @@ export class Worker< private stalledCheckStopper?: () => void; private waiting: Promise | null = null; + private waitingRun: Promise = Promise.resolve(); private _repeat: Repeat; // To be deprecated in v6 in favor of Job Scheduler protected _jobScheduler: JobScheduler; @@ -431,6 +432,12 @@ export class Worker< } async run() { + this.waitingRun = this.waitRun(); + + await this.waitingRun; + } + + async waitRun() { if (!this.processFn) { throw new Error('No process function is defined.'); } @@ -535,8 +542,8 @@ export class Worker< } } + await asyncFifoQueue.waitAll(); this.running = false; - return await asyncFifoQueue.waitAll(); } catch (error) { this.running = false; throw error; @@ -1164,7 +1171,7 @@ will never work with more accuracy than 1ms. */ } if (this.asyncFifoQueue) { - await this.asyncFifoQueue.waitAll(); + await this.waitingRun; } reconnect && (await this.blockingConnection.reconnect()); diff --git a/src/commands/addDelayedJob-6.lua b/src/commands/addDelayedJob-6.lua index f615fe20f6..d12fd4c660 100644 --- a/src/commands/addDelayedJob-6.lua +++ b/src/commands/addDelayedJob-6.lua @@ -88,7 +88,7 @@ else end end -local deduplicationJobId = deduplicateJob(args[1], opts['de'], +local deduplicationJobId = deduplicateJob(opts['de'], jobId, deduplicationKey, eventsKey, maxEvents) if deduplicationJobId then return deduplicationJobId diff --git a/src/commands/addParentJob-4.lua b/src/commands/addParentJob-4.lua index bde421fca8..f4b324d328 100644 --- a/src/commands/addParentJob-4.lua +++ b/src/commands/addParentJob-4.lua @@ -81,7 +81,7 @@ else end end -local deduplicationJobId = deduplicateJob(args[1], opts['de'], +local deduplicationJobId = deduplicateJob(opts['de'], jobId, deduplicationKey, eventsKey, maxEvents) if deduplicationJobId then return deduplicationJobId diff --git a/src/commands/addPrioritizedJob-8.lua b/src/commands/addPrioritizedJob-8.lua index 4fa6dbc616..fc992102c6 100644 --- a/src/commands/addPrioritizedJob-8.lua +++ b/src/commands/addPrioritizedJob-8.lua @@ -90,7 +90,7 @@ else end end -local deduplicationJobId = deduplicateJob(args[1], opts['de'], +local deduplicationJobId = deduplicateJob(opts['de'], jobId, deduplicationKey, eventsKey, maxEvents) if deduplicationJobId then return deduplicationJobId diff --git a/src/commands/addStandardJob-8.lua b/src/commands/addStandardJob-8.lua index 7005e91af7..134ea9f45e 100644 --- a/src/commands/addStandardJob-8.lua +++ b/src/commands/addStandardJob-8.lua @@ -94,7 +94,7 @@ else end end -local deduplicationJobId = deduplicateJob(args[1], opts['de'], +local deduplicationJobId = deduplicateJob(opts['de'], jobId, deduplicationKey, eventsKey, maxEvents) if deduplicationJobId then return deduplicationJobId diff --git a/src/commands/includes/deduplicateJob.lua b/src/commands/includes/deduplicateJob.lua index ff873e599c..ef1245bdbc 100644 --- a/src/commands/includes/deduplicateJob.lua +++ b/src/commands/includes/deduplicateJob.lua @@ -1,24 +1,24 @@ --[[ - Function to debounce a job. + Function to deduplicate a job. ]] -local function deduplicateJob(prefixKey, deduplicationOpts, jobId, deduplicationKey, eventsKey, maxEvents) +local function deduplicateJob(deduplicationOpts, jobId, deduplicationKey, eventsKey, maxEvents) local deduplicationId = deduplicationOpts and deduplicationOpts['id'] if deduplicationId then local ttl = deduplicationOpts['ttl'] local deduplicationKeyExists if ttl then - deduplicationKeyExists = not rcall('SET', deduplicationKey, jobId, 'PX', ttl, 'NX') + deduplicationKeyExists = rcall('SET', deduplicationKey, jobId, 'PX', ttl, 'NX') else - deduplicationKeyExists = not rcall('SET', deduplicationKey, jobId, 'NX') + deduplicationKeyExists = rcall('SET', deduplicationKey, jobId, 'NX') end - if deduplicationKeyExists then - local currentDebounceJobId = rcall('GET', deduplicationKey) + if deduplicationKeyExists == false then + local currentDeduplicatedJobId = rcall('GET', deduplicationKey) rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", - "debounced", "jobId", currentDebounceJobId, "debounceId", deduplicationId) + "debounced", "jobId", currentDeduplicatedJobId, "debounceId", deduplicationId) rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", - "deduplicated", "jobId", currentDebounceJobId, "deduplicationId", deduplicationId) - return currentDebounceJobId + "deduplicated", "jobId", currentDeduplicatedJobId, "deduplicationId", deduplicationId) + return currentDeduplicatedJobId end end end diff --git a/tests/test_events.ts b/tests/test_events.ts index 3ec45cc9ce..9c5c01d5b0 100644 --- a/tests/test_events.ts +++ b/tests/test_events.ts @@ -587,9 +587,15 @@ describe('events', function () { ); let debouncedCounter = 0; - queueEvents.on('debounced', ({ jobId }) => { - debouncedCounter++; + const debounced = new Promise(resolve => { + queueEvents.on('debounced', () => { + debouncedCounter++; + if (debouncedCounter == 2) { + resolve(); + } + }); }); + await job.remove(); await queue.add(testName, { foo: 'bar' }, { debounce: { id: 'a1' } }); @@ -602,6 +608,11 @@ describe('events', function () { { debounce: { id: 'a1' } }, ); await secondJob.remove(); + await debounced; + + const getDeboundedJobId = await queue.getDebounceJobId('a1'); + + expect(getDeboundedJobId).to.be.null; expect(debouncedCounter).to.be.equal(2); }); @@ -822,6 +833,9 @@ describe('events', function () { await secondJob.remove(); await deduplication; + const getDeduplicationJobId = await queue.getDeduplicationJobId('a1'); + + expect(getDeduplicationJobId).to.be.null; expect(deduplicatedCounter).to.be.equal(2); }); });