diff --git a/packages/client-common/__tests__/integration/abort_request.test.ts b/packages/client-common/__tests__/integration/abort_request.test.ts index 4ee91c7d..92b1543a 100644 --- a/packages/client-common/__tests__/integration/abort_request.test.ts +++ b/packages/client-common/__tests__/integration/abort_request.test.ts @@ -13,22 +13,6 @@ describe('abort request', () => { }) describe('select', () => { - it('cancels a select query before it is sent', async () => { - const controller = new AbortController() - const selectPromise = client.query({ - query: 'SELECT sleep(3)', - format: 'CSV', - abort_signal: controller.signal, - }) - controller.abort() - - await expectAsync(selectPromise).toBeRejectedWith( - jasmine.objectContaining({ - message: jasmine.stringMatching('The user aborted a request'), - }), - ) - }) - it('cancels a select query after it is sent', async () => { const controller = new AbortController() const selectPromise = client.query({ diff --git a/packages/client-common/src/utils/sleep.ts b/packages/client-common/src/utils/sleep.ts index f5b17f23..5ed82bc0 100644 --- a/packages/client-common/src/utils/sleep.ts +++ b/packages/client-common/src/utils/sleep.ts @@ -1,3 +1,7 @@ -export async function sleep(ms: number) { - await new Promise((resolve) => setTimeout(resolve, ms)) +export async function sleep(ms: number): Promise { + await new Promise((resolve) => + setTimeout(() => { + resolve(void 0) + }, ms), + ) } diff --git a/packages/client-node/__tests__/integration/node_abort_request.test.ts b/packages/client-node/__tests__/integration/node_abort_request.test.ts index f681def6..afed9784 100644 --- a/packages/client-node/__tests__/integration/node_abort_request.test.ts +++ b/packages/client-node/__tests__/integration/node_abort_request.test.ts @@ -5,7 +5,7 @@ import { createTestClient, guid } from '@test/utils' import type Stream from 'stream' import { makeObjectStream } from '../utils/stream' -describe('[Node.js] abort request streaming', () => { +describe('[Node.js] abort request', () => { let client: ClickHouseClient beforeEach(() => { @@ -16,6 +16,23 @@ describe('[Node.js] abort request streaming', () => { await client.close() }) + it('cancels a select query before it is sent', async () => { + const controller = new AbortController() + const selectPromise = client.query({ + query: 'SELECT sleep(3)', + format: 'CSV', + abort_signal: controller.signal, + }) + controller.abort() + + await expectAsync(selectPromise).toBeRejectedWith( + jasmine.objectContaining({ + // this happens even before we instantiate the request and its listeners, so that is just a plain AbortError + name: 'AbortError', + }), + ) + }) + it('cancels a select query while reading response', async () => { const controller = new AbortController() const selectPromise = client @@ -136,7 +153,8 @@ describe('[Node.js] abort request streaming', () => { await expectAsync(insertPromise).toBeRejectedWith( jasmine.objectContaining({ - message: jasmine.stringMatching('The user aborted a request'), + // this happens even before we instantiate the request and its listeners, so that is just a plain AbortError + name: 'AbortError', }), ) }) diff --git a/packages/client-node/__tests__/integration/node_client.test.ts b/packages/client-node/__tests__/integration/node_client.test.ts index 994891a5..601d6805 100644 --- a/packages/client-node/__tests__/integration/node_client.test.ts +++ b/packages/client-node/__tests__/integration/node_client.test.ts @@ -144,7 +144,7 @@ describe('[Node.js] Client', () => { const selectPromise = client.query({ query: 'SELECT * FROM system.numbers LIMIT 5', }) - emitResponseBody(clientRequest, 'hi') + await emitResponseBody(clientRequest, 'hi') await selectPromise } diff --git a/packages/client-node/__tests__/unit/node_client.test.ts b/packages/client-node/__tests__/unit/node_client.test.ts index de42c3fd..70aaedc5 100644 --- a/packages/client-node/__tests__/unit/node_client.test.ts +++ b/packages/client-node/__tests__/unit/node_client.test.ts @@ -9,7 +9,7 @@ import * as c from '../../src/connection/create_connection' describe('[Node.js] createClient', () => { it('throws on incorrect "url" config value', () => { - expect(() => createClient({ url: 'foo' })).toThrow( + expect(() => createClient({ url: 'foobar' })).toThrow( jasmine.objectContaining({ message: jasmine.stringContaining('ClickHouse URL is malformed.'), }), diff --git a/packages/client-node/__tests__/unit/node_connection.test.ts b/packages/client-node/__tests__/unit/node_connection.test.ts index 440c5cc9..5c63be08 100644 --- a/packages/client-node/__tests__/unit/node_connection.test.ts +++ b/packages/client-node/__tests__/unit/node_connection.test.ts @@ -52,7 +52,7 @@ describe('[Node.js] Connection', () => { query: 'SELECT * FROM system.numbers LIMIT 5', }) const responseBody1 = 'foobar' - emitResponseBody(request1, responseBody1) + await emitResponseBody(request1, responseBody1) const queryResult1 = await selectPromise1 const request2 = stubClientRequest() @@ -62,7 +62,7 @@ describe('[Node.js] Connection', () => { query: 'SELECT * FROM system.numbers LIMIT 5', }) const responseBody2 = 'qaz' - emitResponseBody(request2, responseBody2) + await emitResponseBody(request2, responseBody2) const queryResult2 = await selectPromise2 await assertConnQueryResult(queryResult1, responseBody1) @@ -93,7 +93,7 @@ describe('[Node.js] Connection', () => { query_id, }) const responseBody = 'foobar' - emitResponseBody(request, responseBody) + await emitResponseBody(request, responseBody) const { stream } = await selectPromise expect(await getAsText(stream)).toBe(responseBody) @@ -119,7 +119,7 @@ describe('[Node.js] Connection', () => { query: 'SELECT * FROM system.numbers LIMIT 5', }) const responseBody1 = 'foobar' - emitResponseBody(request1, responseBody1) + await emitResponseBody(request1, responseBody1) const queryResult1 = await execPromise1 const request2 = stubClientRequest() @@ -129,7 +129,7 @@ describe('[Node.js] Connection', () => { query: 'SELECT * FROM system.numbers LIMIT 5', }) const responseBody2 = 'qaz' - emitResponseBody(request2, responseBody2) + await emitResponseBody(request2, responseBody2) const queryResult2 = await execPromise2 await assertConnQueryResult(queryResult1, responseBody1) @@ -162,7 +162,7 @@ describe('[Node.js] Connection', () => { query_id, }) const responseBody = 'foobar' - emitResponseBody(request, responseBody) + await emitResponseBody(request, responseBody) const { stream } = await execPromise expect(await getAsText(stream)).toBe(responseBody) @@ -187,7 +187,7 @@ describe('[Node.js] Connection', () => { const cmdPromise = adapter.command({ query: 'SELECT * FROM system.numbers LIMIT 5', }) - emitResponseBody(request1, 'Ok.') + await emitResponseBody(request1, 'Ok.') const { query_id } = await cmdPromise const request2 = stubClientRequest() @@ -196,7 +196,7 @@ describe('[Node.js] Connection', () => { const cmdPromise2 = adapter.command({ query: 'SELECT * FROM system.numbers LIMIT 5', }) - emitResponseBody(request2, 'Ok.') + await emitResponseBody(request2, 'Ok.') const { query_id: query_id2 } = await cmdPromise2 expect(query_id).not.toEqual(query_id2) @@ -223,7 +223,7 @@ describe('[Node.js] Connection', () => { query: 'SELECT * FROM system.numbers LIMIT 5', query_id, }) - emitResponseBody(request, 'Ok.') + await emitResponseBody(request, 'Ok.') const { query_id: result_query_id } = await cmdPromise expect(httpRequestStub).toHaveBeenCalledTimes(1) @@ -250,7 +250,7 @@ describe('[Node.js] Connection', () => { values: 'foobar', }) const responseBody1 = 'foobar' - emitResponseBody(request1, responseBody1) + await emitResponseBody(request1, responseBody1) const { query_id: queryId1 } = await insertPromise1 const request2 = stubClientRequest() @@ -261,7 +261,7 @@ describe('[Node.js] Connection', () => { values: 'foobar', }) const responseBody2 = 'qaz' - emitResponseBody(request2, responseBody2) + await emitResponseBody(request2, responseBody2) const { query_id: queryId2 } = await insertPromise2 assertQueryId(queryId1) @@ -293,7 +293,7 @@ describe('[Node.js] Connection', () => { query_id, }) const responseBody = 'foobar' - emitResponseBody(request, responseBody) + await emitResponseBody(request, responseBody) await insertPromise const [url] = httpRequestStub.calls.mostRecent().args diff --git a/packages/client-node/__tests__/unit/node_connection_compression.test.ts b/packages/client-node/__tests__/unit/node_connection_compression.test.ts index b03b14be..52b469e4 100644 --- a/packages/client-node/__tests__/unit/node_connection_compression.test.ts +++ b/packages/client-node/__tests__/unit/node_connection_compression.test.ts @@ -13,10 +13,15 @@ import { } from '../utils/http_stubs' describe('Node.js Connection compression', () => { + let httpRequestStub: jasmine.Spy + beforeEach(() => { + httpRequestStub = spyOn(Http, 'request') + }) + describe('response decompression', () => { it('hints ClickHouse server to send a gzip compressed response if compress_request: true', async () => { const request = stubClientRequest() - const httpRequestStub = spyOn(Http, 'request').and.returnValue(request) + httpRequestStub.and.returnValue(request) const adapter = buildHttpConnection({ compression: { @@ -41,7 +46,8 @@ describe('Node.js Connection compression', () => { it('does not send a compression algorithm hint if compress_request: false', async () => { const request = stubClientRequest() - const httpRequestStub = spyOn(Http, 'request').and.returnValue(request) + httpRequestStub.and.returnValue(request) + const adapter = buildHttpConnection({ compression: { decompress_response: false, @@ -54,7 +60,7 @@ describe('Node.js Connection compression', () => { }) const responseBody = 'foobar' - emitResponseBody(request, responseBody) + await emitResponseBody(request, responseBody) const queryResult = await selectPromise await assertConnQueryResult(queryResult, responseBody) @@ -66,7 +72,8 @@ describe('Node.js Connection compression', () => { it('uses request-specific settings over config settings', async () => { const request = stubClientRequest() - const httpRequestStub = spyOn(Http, 'request').and.returnValue(request) + httpRequestStub.and.returnValue(request) + const adapter = buildHttpConnection({ compression: { decompress_response: false, @@ -94,7 +101,8 @@ describe('Node.js Connection compression', () => { it('decompresses a gzip response', async () => { const request = stubClientRequest() - spyOn(Http, 'request').and.returnValue(request) + httpRequestStub.and.returnValue(request) + const adapter = buildHttpConnection({ compression: { decompress_response: true, @@ -115,7 +123,7 @@ describe('Node.js Connection compression', () => { it('throws on an unexpected encoding', async () => { const request = stubClientRequest() - spyOn(Http, 'request').and.returnValue(request) + httpRequestStub.and.returnValue(request) const adapter = buildHttpConnection({ compression: { decompress_response: true, @@ -138,7 +146,7 @@ describe('Node.js Connection compression', () => { it('provides decompression error to a stream consumer', async () => { const request = stubClientRequest() - spyOn(Http, 'request').and.returnValue(request) + httpRequestStub.and.returnValue(request) const adapter = buildHttpConnection({ compression: { decompress_response: true, @@ -151,6 +159,7 @@ describe('Node.js Connection compression', () => { }) // No GZIP encoding for the body here + await sleep(0) request.emit( 'response', buildIncomingMessage({ @@ -196,13 +205,12 @@ describe('Node.js Connection compression', () => { next() }, final() { - Zlib.unzip(chunks, (err, result) => { + Zlib.unzip(chunks, (_err, result) => { finalResult = result }) }, }) as ClientRequest - - const httpRequestStub = spyOn(Http, 'request').and.returnValue(request) + httpRequestStub.and.returnValue(request) void adapter.insert({ query: 'INSERT INTO insert_compression_table', @@ -210,9 +218,10 @@ describe('Node.js Connection compression', () => { }) // trigger stream pipeline + await sleep(0) request.emit('socket', socketStub) - await sleep(100) + expect(finalResult!.toString('utf8')).toEqual(values) expect(httpRequestStub).toHaveBeenCalledTimes(1) const calledWith = httpRequestStub.calls.mostRecent().args[1] diff --git a/packages/client-node/__tests__/utils/http_stubs.ts b/packages/client-node/__tests__/utils/http_stubs.ts index e5c79a9a..4304dedd 100644 --- a/packages/client-node/__tests__/utils/http_stubs.ts +++ b/packages/client-node/__tests__/utils/http_stubs.ts @@ -1,5 +1,5 @@ import { LogWriter } from '@clickhouse/client-common' -import { TestLogger } from '@test/utils' +import { sleep, TestLogger } from '@test/utils' import { randomUUID } from '@test/utils/guid' import type Http from 'http' import type { ClientRequest } from 'http' @@ -63,10 +63,11 @@ export function stubClientRequest(): ClientRequest { return request } -export function emitResponseBody( +export async function emitResponseBody( request: Http.ClientRequest, body: string | Buffer | undefined, ) { + await sleep(0) request.emit( 'response', buildIncomingMessage({ @@ -80,6 +81,7 @@ export async function emitCompressedBody( body: string | Buffer, encoding = 'gzip', ) { + await sleep(0) const compressedBody = await gzip(body) request.emit( 'response', diff --git a/packages/client-web/__tests__/integration/web_abort_request.test.ts b/packages/client-web/__tests__/integration/web_abort_request.test.ts index 9a7acaa2..426fee70 100644 --- a/packages/client-web/__tests__/integration/web_abort_request.test.ts +++ b/packages/client-web/__tests__/integration/web_abort_request.test.ts @@ -1,7 +1,7 @@ import type { ClickHouseClient, Row } from '@clickhouse/client-common' import { createTestClient } from '@test/utils' -describe('[Web] abort request streaming', () => { +describe('[Web] abort request', () => { let client: ClickHouseClient beforeEach(() => { @@ -12,6 +12,23 @@ describe('[Web] abort request streaming', () => { await client.close() }) + // a slightly different assertion vs the same Node.js test + it('cancels a select query before it is sent', async () => { + const controller = new AbortController() + const selectPromise = client.query({ + query: 'SELECT sleep(3)', + format: 'CSV', + abort_signal: controller.signal, + }) + controller.abort() + + await expectAsync(selectPromise).toBeRejectedWith( + jasmine.objectContaining({ + message: jasmine.stringMatching('The user aborted a request'), + }), + ) + }) + it('cancels a select query while reading response', async () => { const controller = new AbortController() const selectPromise = client