diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 78c0a01b20..f2f2d791fe 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -144,7 +144,7 @@ export default class RedisCommandsQueue { if (this.#maxLength && this.#toWrite.length + this.#waitingForReply.length >= this.#maxLength) { return Promise.reject(new Error('The queue is full')); } else if (options?.abortSignal?.aborted) { - return Promise.reject(new AbortError()); + return Promise.reject(new AbortError(options?.abortSignal?.reason)); } return new Promise((resolve, reject) => { @@ -165,7 +165,7 @@ export default class RedisCommandsQueue { signal, listener: () => { this.#toWrite.remove(node); - value.reject(new AbortError()); + value.reject(new AbortError(signal.reason)); } }; signal.addEventListener('abort', value.abort.listener, { once: true }); diff --git a/packages/client/lib/client/index.spec.ts b/packages/client/lib/client/index.spec.ts index e9ed7cdd4c..9da00c721e 100644 --- a/packages/client/lib/client/index.spec.ts +++ b/packages/client/lib/client/index.spec.ts @@ -1,7 +1,7 @@ import { strict as assert } from 'node:assert'; import testUtils, { GLOBAL, waitTillBeenCalled } from '../test-utils'; import RedisClient, { RedisClientOptions, RedisClientType } from '.'; -import { AbortError, ClientClosedError, ClientOfflineError, ConnectionTimeoutError, CommandTimeoutError, DisconnectsClientError, ErrorReply, MultiErrorReply, SocketClosedUnexpectedlyError, WatchError } from '../errors'; +import { AbortError, ClientClosedError, ClientOfflineError, ConnectionTimeoutError, DisconnectsClientError, ErrorReply, MultiErrorReply, WatchError } from '../errors'; import { defineScript } from '../lua-script'; import { spy } from 'sinon'; import { once } from 'node:events'; @@ -264,16 +264,21 @@ describe('Client', () => { ); }, GLOBAL.SERVERS.OPEN); - testUtils.testWithClient('AbortError with timeout', client => { - const controller = new AbortController(); - controller.abort(); + testUtils.testWithClient('rejects with AbortError - respects given abortSignal', client => { - return assert.rejects( - client.sendCommand(['PING'], { - abortSignal: controller.signal - }), + const promise = client.sendCommand(['PING'], { + abortSignal: AbortSignal.abort("my reason") + }) + + assert.rejects( + promise, AbortError ); + + promise.catch((error: unknown) => { + assert.ok((error as string).includes("my reason")); + }); + }, { ...GLOBAL.SERVERS.OPEN, clientOptions: { @@ -282,19 +287,20 @@ describe('Client', () => { }); }); - testUtils.testWithClient('CommandTimeoutError', async client => { - const promise = assert.rejects(client.sendCommand(['PING']), AbortError); + + testUtils.testWithClient('rejects with AbortError on commandTimeout timer', async client => { const start = process.hrtime.bigint(); + const promise = client.ping(); - while (process.hrtime.bigint() - start < 50_000_000) { - // block the event loop for 50ms, to make sure the connection will timeout - } + while (process.hrtime.bigint() - start < 10_000_000) { + // block the event loop for 10ms, to make sure the connection will timeout + }; - await promise; + assert.rejects(promise, AbortError); }, { ...GLOBAL.SERVERS.OPEN, clientOptions: { - commandTimeout: 50, + commandTimeout: 10, } }); diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 5ecdeeb76c..0863d01c92 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -4,7 +4,7 @@ import { BasicAuth, CredentialsError, CredentialsProvider, StreamingCredentialsP import RedisCommandsQueue, { CommandOptions } from './commands-queue'; import { EventEmitter } from 'node:events'; import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander'; -import { ClientClosedError, ClientOfflineError, AbortError, DisconnectsClientError, WatchError } from '../errors'; +import { ClientClosedError, ClientOfflineError, DisconnectsClientError, WatchError } from '../errors'; import { URL } from 'node:url'; import { TcpSocketConnectOpts } from 'node:net'; import { PUBSUB_TYPE, PubSubType, PubSubListener, PubSubTypeListeners, ChannelListeners } from './pub-sub'; @@ -530,7 +530,7 @@ export default class RedisClient< async #handshake(chainId: symbol, asap: boolean) { const promises = []; const commandsWithErrorHandlers = await this.#getHandshakeCommands(); - + if (asap) commandsWithErrorHandlers.reverse() for (const { cmd, errorHandler } of commandsWithErrorHandlers) { @@ -636,7 +636,7 @@ export default class RedisClient< // since they could be connected to an older version that doesn't support them. } }); - + commands.push({ cmd: [ 'CLIENT', @@ -893,15 +893,13 @@ export default class RedisClient< return Promise.reject(new ClientOfflineError()); } - let controller: AbortController; if (this._self.#options?.commandTimeout) { - controller = new AbortController() - let abortSignal = controller.signal; + let abortSignal = AbortSignal.timeout(this._self.#options?.commandTimeout); if (options?.abortSignal) { abortSignal = AbortSignal.any([ abortSignal, - options.abortSignal - ]); + options.abortSignal + ]); } options = { ...options, @@ -911,23 +909,7 @@ export default class RedisClient< const promise = this._self.#queue.addCommand(args, options); this._self.#scheduleWrite(); - if (!this._self.#options?.commandTimeout) { - return promise; - } - - return new Promise((resolve, reject) => { - const timeoutId = setTimeout(() => { - controller.abort(); - reject(new AbortError()); - }, this._self.#options?.commandTimeout) - promise.then(result => { - clearInterval(timeoutId); - resolve(result) - }).catch(error => { - clearInterval(timeoutId); - reject(error) - }); - }) + return promise; } async SELECT(db: number): Promise { diff --git a/packages/client/lib/errors.ts b/packages/client/lib/errors.ts index db37ec1a9b..eeed0e3bb3 100644 --- a/packages/client/lib/errors.ts +++ b/packages/client/lib/errors.ts @@ -1,6 +1,6 @@ export class AbortError extends Error { - constructor() { - super('The command was aborted'); + constructor(message = '') { + super(`The command was aborted: ${message}`); } }