diff --git a/src/utils/lock.js b/src/utils/lock.js index 82dfd6ec1..ffb24bc65 100644 --- a/src/utils/lock.js +++ b/src/utils/lock.js @@ -39,10 +39,12 @@ module.exports = class Lock { } this[PRIVATE.WAITING].add(tryToAcquire) - timeoutId = setTimeout( - () => reject(new KafkaJSLockTimeout(this[PRIVATE.TIMEOUT_ERROR_MESSAGE]())), - this[PRIVATE.TIMEOUT] - ) + timeoutId = setTimeout(() => { + // The message should contain the number of waiters _including_ this one + const error = new KafkaJSLockTimeout(this[PRIVATE.TIMEOUT_ERROR_MESSAGE]()) + this[PRIVATE.WAITING].delete(tryToAcquire) + reject(error) + }, this[PRIVATE.TIMEOUT]) }) } diff --git a/src/utils/lock.spec.js b/src/utils/lock.spec.js index fe35c195b..dc47002c1 100644 --- a/src/utils/lock.spec.js +++ b/src/utils/lock.spec.js @@ -40,6 +40,26 @@ describe('Utils > Lock', () => { ).rejects.toHaveProperty('message', 'Timeout while acquiring lock (2 waiting locks)') }) + it('allows lock to be acquired after timeout', async () => { + const lock = new Lock({ timeout: 60 }) + const resource = jest.fn() + const callResource = async () => { + await lock.acquire() + try { + resource(Date.now()) + await sleep(100) + } finally { + lock.release() + } + } + + await expect( + Promise.all([callResource(), callResource(), callResource()]) + ).rejects.toHaveProperty('message', 'Timeout while acquiring lock (2 waiting locks)') + + await expect(callResource()).resolves.toBeUndefined() + }) + describe('with a description', () => { it('throws an error with the configured description if the lock cannot be acquired within a period', async () => { const lock = new Lock({ timeout: 60, description: 'My test mock' })