From b2b5cb073e7e26b8eee1df1b9ec1d469ca343b70 Mon Sep 17 00:00:00 2001 From: Bastian Krol Date: Wed, 10 Jul 2024 14:12:28 +0200 Subject: [PATCH] test: add test for kafkajs tracing --- eslint.config.js | 1 + src/init.ts | 4 +- src/util/kafkajs.ts | 6 + test/apps/empty-event-loop/package.json | 1 + test/apps/express-typescript/package.json | 1 + test/apps/kafkajs/app.ts | 129 +++++++++++ test/apps/kafkajs/package-lock.json | 205 ++++++++++++++++++ test/apps/kafkajs/package.json | 18 ++ .../collector/CollectorChildProcessWrapper.ts | 36 ++- test/collector/Sink.ts | 11 + test/collector/index.ts | 9 +- test/integration/ChildProcessWrapper.ts | 14 +- test/integration/constants.ts | 4 + test/integration/kafka_test.ts | 99 +++++++++ test/integration/test.ts | 66 ++---- 15 files changed, 543 insertions(+), 61 deletions(-) create mode 100644 src/util/kafkajs.ts create mode 100644 test/apps/kafkajs/app.ts create mode 100644 test/apps/kafkajs/package-lock.json create mode 100644 test/apps/kafkajs/package.json create mode 100644 test/integration/constants.ts create mode 100644 test/integration/kafka_test.ts diff --git a/eslint.config.js b/eslint.config.js index 94db26c..03591fa 100644 --- a/eslint.config.js +++ b/eslint.config.js @@ -45,6 +45,7 @@ module.exports = tsEsLint.config( 2, ], 'mocha/no-exclusive-tests': 'error', + 'no-case-declarations': 'off', 'simpleImportSort/exports': 'error', 'unusedImports/no-unused-imports': 'error', }, diff --git a/src/init.ts b/src/init.ts index 6262fb8..879253f 100644 --- a/src/init.ts +++ b/src/init.ts @@ -13,12 +13,12 @@ import { PeriodicExportingMetricReader } from '@opentelemetry/sdk-metrics'; import { NodeSDK, NodeSDKConfiguration } from '@opentelemetry/sdk-node'; import { BatchSpanProcessor, SpanProcessor } from '@opentelemetry/sdk-trace-base'; import { ConsoleSpanExporter } from '@opentelemetry/sdk-trace-node'; -import { KafkaJsInstrumentation } from '@opentelemetry/instrumentation-kafkajs'; import PodUidDetector from './detectors/node/opentelemetry-resource-detector-kubernetes-pod'; import ServiceNameFallbackDetector from './detectors/node/opentelemetry-resource-detector-service-name-fallback'; import { FileSpanExporter } from './util/FileSpanExporter'; import { hasOptedIn, hasOptedOut, parseNumericEnvironmentVariableWithDefault } from './util/environment'; +import { kafkaJsInstrumentation } from './util/kafkajs'; const logPrefix = 'Dash0 OpenTelemetry distribution for Node.js:'; const debugOutput = hasOptedIn('DASH0_DEBUG'); @@ -67,7 +67,7 @@ const configuration: Partial = { instrumentations: [ // getNodeAutoInstrumentations(createInstrumentationConfig()), - new KafkaJsInstrumentation(), + kafkaJsInstrumentation, ], resource: resource(), resourceDetectors: resourceDetectors(), diff --git a/src/util/kafkajs.ts b/src/util/kafkajs.ts new file mode 100644 index 0000000..0e7c627 --- /dev/null +++ b/src/util/kafkajs.ts @@ -0,0 +1,6 @@ +// SPDX-FileCopyrightText: Copyright 2024 Dash0 Inc. +// SPDX-License-Identifier: Apache-2.0 + +import { KafkaJsInstrumentation } from '@opentelemetry/instrumentation-kafkajs'; + +export const kafkaJsInstrumentation = new KafkaJsInstrumentation(); diff --git a/test/apps/empty-event-loop/package.json b/test/apps/empty-event-loop/package.json index 4e824d9..02f89b5 100644 --- a/test/apps/empty-event-loop/package.json +++ b/test/apps/empty-event-loop/package.json @@ -1,6 +1,7 @@ { "name": "dash0-app-under-test-empty-event-loop", "version": "1.0.0", + "private": true, "description": "", "main": "app.ts", "scripts": { diff --git a/test/apps/express-typescript/package.json b/test/apps/express-typescript/package.json index 83b99fd..7fad4ea 100644 --- a/test/apps/express-typescript/package.json +++ b/test/apps/express-typescript/package.json @@ -1,6 +1,7 @@ { "name": "dash0-app-under-test-express-typescript", "version": "1.0.0", + "private": true, "description": "", "main": "app.ts", "scripts": { diff --git a/test/apps/kafkajs/app.ts b/test/apps/kafkajs/app.ts new file mode 100644 index 0000000..5534278 --- /dev/null +++ b/test/apps/kafkajs/app.ts @@ -0,0 +1,129 @@ +// SPDX-FileCopyrightText: Copyright 2024 Dash0 Inc. +// SPDX-License-Identifier: Apache-2.0 + +import { inspect } from 'node:util'; +import * as kafkajs from 'kafkajs'; +import { + Consumer, + ConsumerRunConfig, + EachMessagePayload, + Kafka, + KafkaJSError, + KafkaMessage, + Producer, + RecordMetadata, +} from 'kafkajs'; +import { IpcRequest } from '../../util/ipc'; +import sendToParentProcess, { sendReadyToParentProcess } from '../../util/sendToParentProcess'; + +// for testing purposes, we need a reference to the kafakjs instrumentation +import { kafkaJsInstrumentation } from '../../../src/util/kafkajs'; + +const kafka = new Kafka({ + clientId: 'dash0-kafkajs-tests', + brokers: ['dummy:1302'], +}); + +let producer: Producer; +let consumer: Consumer; +let runConfig: ConsumerRunConfig | undefined; + +process.on('message', async message => { + const ipcRequest = message; + const command = ipcRequest.command; + const id = ipcRequest.id; + switch (command) { + case 'produce-message': + await produceMessage(id); + break; + case 'consume-message': + await consumeMessage(id); + break; + default: + const errorMsg = `Unknown message: ${inspect(message)}`; + sendToParentProcess({ id, error: errorMsg }); + console.error(errorMsg); + } +}); + +(function initKafka() { + producer = kafka.producer(); + + getRunConfig(); + + // Since we patch the consumer, we need to disable/enable the instrumentation to make sure it the instrumentation is + // still applied, see + // https://github.com/open-telemetry/opentelemetry-js-contrib/blob/2c32e5869ef9b6d582ba4da02623a030309bcaf3/plugins/node/instrumentation-kafkajs/test/kafkajs.test.ts#L142-L143. + kafkaJsInstrumentation.disable(); + kafkaJsInstrumentation.enable(); + + consumer = kafka.consumer({ + groupId: 'testing-group-id', + }); + + sendReadyToParentProcess(); +})(); + +function getRunConfig() { + const origConsumerFactory = kafkajs.Kafka.prototype.consumer; + kafkajs.Kafka.prototype.consumer = function (...args): Consumer { + const consumer: Consumer = origConsumerFactory.apply(this, args); + consumer.run = function (config?: ConsumerRunConfig): Promise { + runConfig = config; + return Promise.resolve(); + }; + return consumer; + }; +} + +async function produceMessage(id: number) { + try { + const res: RecordMetadata[] = await producer.send({ + topic: 'test-topic', + messages: [ + { + value: 'testing message content', + }, + ], + }); + sendToParentProcess({ id, ok: true, res }); + } catch (err) { + if (err instanceof KafkaJSError && err.message === 'The producer is disconnected') { + // This is expected, since we are not starting an actual Kafka broker for the integration tests, hence we are + // also not connected to any broker. + sendToParentProcess({ id, ok: true }); + } else { + sendToParentProcess({ id, error: err }); + } + } +} + +async function consumeMessage(id: number) { + consumer.run({ + eachMessage: async (): Promise => {}, + }); + const payload = createPayload(); + await runConfig?.eachMessage!(payload); + sendToParentProcess({ id, ok: true }); +} + +function createPayload(): EachMessagePayload { + return { + topic: 'test-topic', + partition: 0, + message: createMessage('456'), + heartbeat: async () => {}, + pause: () => () => {}, + }; +} + +function createMessage(offset: string): KafkaMessage { + return { + key: Buffer.from('message-key', 'utf8'), + value: Buffer.from('message content', 'utf8'), + timestamp: '1234', + size: 10, + attributes: 1, + offset: offset, + }; +} diff --git a/test/apps/kafkajs/package-lock.json b/test/apps/kafkajs/package-lock.json new file mode 100644 index 0000000..e9fbefb --- /dev/null +++ b/test/apps/kafkajs/package-lock.json @@ -0,0 +1,205 @@ +{ + "name": "dash0-app-under-test-kafkajs", + "version": "1.0.0", + "lockfileVersion": 3, + "requires": true, + "packages": { + "": { + "name": "dash0-app-under-test-kafkajs", + "version": "1.0.0", + "license": "Apache-2.0", + "dependencies": { + "@types/node": "^20.12.7", + "kafkajs": "^2.2.4", + "ts-node": "^10.9.2", + "typescript": "^5.4.5" + } + }, + "node_modules/@cspotcode/source-map-support": { + "version": "0.8.1", + "resolved": "https://registry.npmjs.org/@cspotcode/source-map-support/-/source-map-support-0.8.1.tgz", + "integrity": "sha512-IchNf6dN4tHoMFIn/7OE8LWZ19Y6q/67Bmf6vnGREv8RSbBVb9LPJxEcnwrcwX6ixSvaiGoomAUvu4YSxXrVgw==", + "dependencies": { + "@jridgewell/trace-mapping": "0.3.9" + }, + "engines": { + "node": ">=12" + } + }, + "node_modules/@jridgewell/resolve-uri": { + "version": "3.1.2", + "resolved": "https://registry.npmjs.org/@jridgewell/resolve-uri/-/resolve-uri-3.1.2.tgz", + "integrity": "sha512-bRISgCIjP20/tbWSPWMEi54QVPRZExkuD9lJL+UIxUKtwVJA8wW1Trb1jMs1RFXo1CBTNZ/5hpC9QvmKWdopKw==", + "engines": { + "node": ">=6.0.0" + } + }, + "node_modules/@jridgewell/sourcemap-codec": { + "version": "1.5.0", + "resolved": "https://registry.npmjs.org/@jridgewell/sourcemap-codec/-/sourcemap-codec-1.5.0.tgz", + "integrity": "sha512-gv3ZRaISU3fjPAgNsriBRqGWQL6quFx04YMPW/zD8XMLsU32mhCCbfbO6KZFLjvYpCZ8zyDEgqsgf+PwPaM7GQ==" + }, + "node_modules/@jridgewell/trace-mapping": { + "version": "0.3.9", + "resolved": "https://registry.npmjs.org/@jridgewell/trace-mapping/-/trace-mapping-0.3.9.tgz", + "integrity": "sha512-3Belt6tdc8bPgAtbcmdtNJlirVoTmEb5e2gC94PnkwEW9jI6CAHUeoG85tjWP5WquqfavoMtMwiG4P926ZKKuQ==", + "dependencies": { + "@jridgewell/resolve-uri": "^3.0.3", + "@jridgewell/sourcemap-codec": "^1.4.10" + } + }, + "node_modules/@tsconfig/node10": { + "version": "1.0.11", + "resolved": "https://registry.npmjs.org/@tsconfig/node10/-/node10-1.0.11.tgz", + "integrity": "sha512-DcRjDCujK/kCk/cUe8Xz8ZSpm8mS3mNNpta+jGCA6USEDfktlNvm1+IuZ9eTcDbNk41BHwpHHeW+N1lKCz4zOw==" + }, + "node_modules/@tsconfig/node12": { + "version": "1.0.11", + "resolved": "https://registry.npmjs.org/@tsconfig/node12/-/node12-1.0.11.tgz", + "integrity": "sha512-cqefuRsh12pWyGsIoBKJA9luFu3mRxCA+ORZvA4ktLSzIuCUtWVxGIuXigEwO5/ywWFMZ2QEGKWvkZG1zDMTag==" + }, + "node_modules/@tsconfig/node14": { + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/@tsconfig/node14/-/node14-1.0.3.tgz", + "integrity": "sha512-ysT8mhdixWK6Hw3i1V2AeRqZ5WfXg1G43mqoYlM2nc6388Fq5jcXyr5mRsqViLx/GJYdoL0bfXD8nmF+Zn/Iow==" + }, + "node_modules/@tsconfig/node16": { + "version": "1.0.4", + "resolved": "https://registry.npmjs.org/@tsconfig/node16/-/node16-1.0.4.tgz", + "integrity": "sha512-vxhUy4J8lyeyinH7Azl1pdd43GJhZH/tP2weN8TntQblOY+A0XbT8DJk1/oCPuOOyg/Ja757rG0CgHcWC8OfMA==" + }, + "node_modules/@types/node": { + "version": "20.14.10", + "resolved": "https://registry.npmjs.org/@types/node/-/node-20.14.10.tgz", + "integrity": "sha512-MdiXf+nDuMvY0gJKxyfZ7/6UFsETO7mGKF54MVD/ekJS6HdFtpZFBgrh6Pseu64XTb2MLyFPlbW6hj8HYRQNOQ==", + "dependencies": { + "undici-types": "~5.26.4" + } + }, + "node_modules/acorn": { + "version": "8.12.1", + "resolved": "https://registry.npmjs.org/acorn/-/acorn-8.12.1.tgz", + "integrity": "sha512-tcpGyI9zbizT9JbV6oYE477V6mTlXvvi0T0G3SNIYE2apm/G5huBa1+K89VGeovbg+jycCrfhl3ADxErOuO6Jg==", + "bin": { + "acorn": "bin/acorn" + }, + "engines": { + "node": ">=0.4.0" + } + }, + "node_modules/acorn-walk": { + "version": "8.3.3", + "resolved": "https://registry.npmjs.org/acorn-walk/-/acorn-walk-8.3.3.tgz", + "integrity": "sha512-MxXdReSRhGO7VlFe1bRG/oI7/mdLV9B9JJT0N8vZOhF7gFRR5l3M8W9G8JxmKV+JC5mGqJ0QvqfSOLsCPa4nUw==", + "dependencies": { + "acorn": "^8.11.0" + }, + "engines": { + "node": ">=0.4.0" + } + }, + "node_modules/arg": { + "version": "4.1.3", + "resolved": "https://registry.npmjs.org/arg/-/arg-4.1.3.tgz", + "integrity": "sha512-58S9QDqG0Xx27YwPSt9fJxivjYl432YCwfDMfZ+71RAqUrZef7LrKQZ3LHLOwCS4FLNBplP533Zx895SeOCHvA==" + }, + "node_modules/create-require": { + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/create-require/-/create-require-1.1.1.tgz", + "integrity": "sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==" + }, + "node_modules/diff": { + "version": "4.0.2", + "resolved": "https://registry.npmjs.org/diff/-/diff-4.0.2.tgz", + "integrity": "sha512-58lmxKSA4BNyLz+HHMUzlOEpg09FV+ev6ZMe3vJihgdxzgcwZ8VoEEPmALCZG9LmqfVoNMMKpttIYTVG6uDY7A==", + "engines": { + "node": ">=0.3.1" + } + }, + "node_modules/kafkajs": { + "version": "2.2.4", + "resolved": "https://registry.npmjs.org/kafkajs/-/kafkajs-2.2.4.tgz", + "integrity": "sha512-j/YeapB1vfPT2iOIUn/vxdyKEuhuY2PxMBvf5JWux6iSaukAccrMtXEY/Lb7OvavDhOWME589bpLrEdnVHjfjA==", + "engines": { + "node": ">=14.0.0" + } + }, + "node_modules/make-error": { + "version": "1.3.6", + "resolved": "https://registry.npmjs.org/make-error/-/make-error-1.3.6.tgz", + "integrity": "sha512-s8UhlNe7vPKomQhC1qFelMokr/Sc3AgNbso3n74mVPA5LTZwkB9NlXf4XPamLxJE8h0gh73rM94xvwRT2CVInw==" + }, + "node_modules/ts-node": { + "version": "10.9.2", + "resolved": "https://registry.npmjs.org/ts-node/-/ts-node-10.9.2.tgz", + "integrity": "sha512-f0FFpIdcHgn8zcPSbf1dRevwt047YMnaiJM3u2w2RewrB+fob/zePZcrOyQoLMMO7aBIddLcQIEK5dYjkLnGrQ==", + "dependencies": { + "@cspotcode/source-map-support": "^0.8.0", + "@tsconfig/node10": "^1.0.7", + "@tsconfig/node12": "^1.0.7", + "@tsconfig/node14": "^1.0.0", + "@tsconfig/node16": "^1.0.2", + "acorn": "^8.4.1", + "acorn-walk": "^8.1.1", + "arg": "^4.1.0", + "create-require": "^1.1.0", + "diff": "^4.0.1", + "make-error": "^1.1.1", + "v8-compile-cache-lib": "^3.0.1", + "yn": "3.1.1" + }, + "bin": { + "ts-node": "dist/bin.js", + "ts-node-cwd": "dist/bin-cwd.js", + "ts-node-esm": "dist/bin-esm.js", + "ts-node-script": "dist/bin-script.js", + "ts-node-transpile-only": "dist/bin-transpile.js", + "ts-script": "dist/bin-script-deprecated.js" + }, + "peerDependencies": { + "@swc/core": ">=1.2.50", + "@swc/wasm": ">=1.2.50", + "@types/node": "*", + "typescript": ">=2.7" + }, + "peerDependenciesMeta": { + "@swc/core": { + "optional": true + }, + "@swc/wasm": { + "optional": true + } + } + }, + "node_modules/typescript": { + "version": "5.5.3", + "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.5.3.tgz", + "integrity": "sha512-/hreyEujaB0w76zKo6717l3L0o/qEUtRgdvUBvlkhoWeOVMjMuHNHk0BRBzikzuGDqNmPQbg5ifMEqsHLiIUcQ==", + "bin": { + "tsc": "bin/tsc", + "tsserver": "bin/tsserver" + }, + "engines": { + "node": ">=14.17" + } + }, + "node_modules/undici-types": { + "version": "5.26.5", + "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-5.26.5.tgz", + "integrity": "sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA==" + }, + "node_modules/v8-compile-cache-lib": { + "version": "3.0.1", + "resolved": "https://registry.npmjs.org/v8-compile-cache-lib/-/v8-compile-cache-lib-3.0.1.tgz", + "integrity": "sha512-wa7YjyUGfNZngI/vtK0UHAN+lgDCxBPCylVXGp0zu59Fz5aiGtNXaq3DhIov063MorB+VfufLh3JlF2KdTK3xg==" + }, + "node_modules/yn": { + "version": "3.1.1", + "resolved": "https://registry.npmjs.org/yn/-/yn-3.1.1.tgz", + "integrity": "sha512-Ux4ygGWsu2c7isFWe8Yu1YluJmqVhxqK2cLXNQA5AcC3QfbGNpM7fu0Y8b/z16pXLnFxZYvWhd3fhBY9DLmC6Q==", + "engines": { + "node": ">=6" + } + } + } +} diff --git a/test/apps/kafkajs/package.json b/test/apps/kafkajs/package.json new file mode 100644 index 0000000..b2b2743 --- /dev/null +++ b/test/apps/kafkajs/package.json @@ -0,0 +1,18 @@ +{ + "name": "dash0-app-under-test-kafkajs", + "version": "1.0.0", + "private": true, + "description": "", + "main": "app.ts", + "scripts": { + "test": "echo \"Error: no test specified\" && exit 1" + }, + "author": "Bastian Krol ", + "license": "Apache-2.0", + "dependencies": { + "@types/node": "^20.12.7", + "kafkajs": "^2.2.4", + "ts-node": "^10.9.2", + "typescript": "^5.4.5" + } +} diff --git a/test/collector/CollectorChildProcessWrapper.ts b/test/collector/CollectorChildProcessWrapper.ts index b626e99..31dec88 100644 --- a/test/collector/CollectorChildProcessWrapper.ts +++ b/test/collector/CollectorChildProcessWrapper.ts @@ -2,7 +2,6 @@ // SPDX-License-Identifier: Apache-2.0 import ChildProcessWrapper, { ChildProcessWrapperOptions } from '../integration/ChildProcessWrapper'; -import { collector } from '../integration/rootHooks'; import { OpenTelemetryData, Stats } from './Sink'; export default class CollectorChildProcessWrapper extends ChildProcessWrapper { @@ -11,34 +10,55 @@ export default class CollectorChildProcessWrapper extends ChildProcessWrapper { } async fetchStats() { - return await collector().sendRequest({ command: 'stats' }); + return await this.sendIpcRequest({ command: 'stats' }); } async hasTraces() { - const stats = await collector().fetchStats(); + const stats = await this.fetchStats(); return stats.traces >= 1; } + async fetchTraces() { + if (!(await this.hasTraces())) { + throw new Error('The collector never received any spans.'); + } + return (await this.fetchTelemetry()).traces; + } + async hasMetrics() { - const stats = await collector().fetchStats(); + const stats = await this.fetchStats(); return stats.metrics >= 1; } + async fetchMetrics() { + if (!(await this.hasMetrics())) { + throw new Error('The collector never received any metrics.'); + } + return (await this.fetchTelemetry()).metrics; + } + async hasLogs() { - const stats = await collector().fetchStats(); + const stats = await this.fetchStats(); return stats.logs >= 1; } + async fetchLogRecords() { + if (!(await this.hasLogs())) { + throw new Error('The collector never received any log records.'); + } + return (await this.fetchTelemetry()).logs; + } + async hasTelemetry() { - const stats = await collector().fetchStats(); + const stats = await this.fetchStats(); return stats.traces >= 1 || stats.metrics >= 1 || stats.logs >= 1; } async fetchTelemetry() { - return await collector().sendRequest({ command: 'telemetry' }); + return await this.sendIpcRequest({ command: 'telemetry' }); } async clear() { - await collector().sendRequest({ command: 'clear' }); + await this.sendIpcRequest({ command: 'clear' }); } } diff --git a/test/collector/Sink.ts b/test/collector/Sink.ts index f7976fe..b1849fd 100644 --- a/test/collector/Sink.ts +++ b/test/collector/Sink.ts @@ -34,6 +34,17 @@ export default class Sink { if (this.telemetry.traces.length > Sink.MAX_ITEMS) { throw new Error('Too many traces, please clear the mock collector between test runs.'); } + traces.resource_spans?.forEach(resourceSpan => { + resourceSpan.scope_spans?.forEach(scopeSpan => { + scopeSpan.spans?.forEach(span => { + if (typeof span.kind === 'number') { + // Enumerations in protobuf are 1-based, while the JS constants are 0-based; we convert the raw protobuf + // data back to JS OTel conventions. + span.kind--; + } + }); + }); + }); this.telemetry.traces.push(traces); } diff --git a/test/collector/index.ts b/test/collector/index.ts index 2a6445b..9c2d93b 100644 --- a/test/collector/index.ts +++ b/test/collector/index.ts @@ -29,16 +29,19 @@ function registerIpcMessageListener() { const id = ipcRequest.id; switch (command) { case 'stats': - sendToParentProcess({ id, ...sink.stats() }); + sendToParentProcess({ id, ok: true, ...sink.stats() }); break; case 'telemetry': - sendToParentProcess({ id, ...sink.getTelemetry() }); + sendToParentProcess({ id, ok: true, ...sink.getTelemetry() }); break; case 'clear': sink.clear(); + sendToParentProcess({ id, ok: true }); break; default: - console.error(`Unknown message: ${inspect(message)}`); + const errorMsg = `Unknown message: ${inspect(message)}`; + sendToParentProcess({ id, error: errorMsg }); + console.error(errorMsg); } }); } diff --git a/test/integration/ChildProcessWrapper.ts b/test/integration/ChildProcessWrapper.ts index a4d5e48..671f036 100644 --- a/test/integration/ChildProcessWrapper.ts +++ b/test/integration/ChildProcessWrapper.ts @@ -1,6 +1,7 @@ // SPDX-FileCopyrightText: Copyright 2024 Dash0 Inc. // SPDX-License-Identifier: Apache-2.0 +import { inspect } from 'node:util'; import { ChildProcess, fork, ForkOptions } from 'node:child_process'; import EventEmitter from 'node:events'; import fs from 'node:fs/promises'; @@ -138,15 +139,20 @@ export default class ChildProcessWrapper { }); } - async sendRequest(message: object): Promise { - return new Promise(resolve => { + async sendIpcRequest(message: object): Promise { + return new Promise((resolve, reject) => { const requestId = this.nextIpcRequestId++; - this.childProcess?.send({ + const messageWithId = { id: requestId, ...message, - }); + }; + this.childProcess?.send(messageWithId); const eventName = String(requestId); this.responseEmitter.once(eventName, response => { + if (!response.ok) { + reject(new Error(`IPC request ${inspect(messageWithId)} did not succeed: ${inspect(response)}`)); + return; + } resolve(response); }); }); diff --git a/test/integration/constants.ts b/test/integration/constants.ts new file mode 100644 index 0000000..49087eb --- /dev/null +++ b/test/integration/constants.ts @@ -0,0 +1,4 @@ +// SPDX-FileCopyrightText: Copyright 2024 Dash0 Inc. +// SPDX-License-Identifier: Apache-2.0 + +export const skipWhenNodeJsVersionIsSmallerThan = '18.0.0'; diff --git a/test/integration/kafka_test.ts b/test/integration/kafka_test.ts new file mode 100644 index 0000000..97b3ee7 --- /dev/null +++ b/test/integration/kafka_test.ts @@ -0,0 +1,99 @@ +// SPDX-FileCopyrightText: Copyright 2024 Dash0 Inc. +// SPDX-License-Identifier: Apache-2.0 + +import { SpanKind } from '@opentelemetry/api'; +import { expect } from 'chai'; +import semver from 'semver'; + +import { expectResourceAttribute, expectSpanAttribute } from '../util/expectAttribute'; +import { expectMatchingSpan } from '../util/expectMatchingSpan'; +import runCommand from '../util/runCommand'; +import waitUntil from '../util/waitUntil'; +import ChildProcessWrapper, { defaultAppConfiguration } from './ChildProcessWrapper'; +import { collector } from './rootHooks'; +import { skipWhenNodeJsVersionIsSmallerThan } from './constants'; + +const appConfiguration = defaultAppConfiguration(0); +delete appConfiguration.env!.PORT; +appConfiguration.env!.KAFKAJS_NO_PARTITIONER_WARNING = '1'; +appConfiguration.path = 'test/apps/kafkajs'; + +describe('kafka tracing', () => { + let appUnderTest: ChildProcessWrapper; + + before(async function () { + if (semver.lt(process.version, skipWhenNodeJsVersionIsSmallerThan)) { + this.skip(); + return; + } + + await runCommand('npm ci', 'test/apps/kafkajs'); + + appUnderTest = new ChildProcessWrapper(appConfiguration); + await appUnderTest.start(); + }); + + beforeEach(async function () { + collector().clear(); + }); + + after(async () => { + await appUnderTest.stop(); + }); + + it('should capture producer spans', async () => { + await waitUntil(async () => { + const traces = await produceMessageAndFetchTraceData(); + expectMatchingSpan( + traces, + [ + resource => expectResourceAttribute(resource, 'telemetry.sdk.name', 'opentelemetry'), + resource => expectResourceAttribute(resource, 'telemetry.sdk.language', 'nodejs'), + resource => expectResourceAttribute(resource, 'telemetry.distro.name', 'dash0-nodejs'), + ], + [ + span => expect(span.kind).to.equal(SpanKind.PRODUCER, 'span kind should be producer'), + span => expectSpanAttribute(span, 'messaging.system', 'kafka'), + span => expectSpanAttribute(span, 'messaging.destination', 'test-topic'), + ], + ); + }); + }); + + it('should capture consumer spans', async () => { + await waitUntil(async () => { + const traces = await consumeMessageAndFetchTraceData(); + expectMatchingSpan( + traces, + [ + resource => expectResourceAttribute(resource, 'telemetry.sdk.name', 'opentelemetry'), + resource => expectResourceAttribute(resource, 'telemetry.sdk.language', 'nodejs'), + resource => expectResourceAttribute(resource, 'telemetry.distro.name', 'dash0-nodejs'), + ], + [ + span => expect(span.kind).to.equal(SpanKind.CONSUMER, 'span kind should be consumer'), + span => expectSpanAttribute(span, 'messaging.system', 'kafka'), + span => expectSpanAttribute(span, 'messaging.destination', 'test-topic'), + ], + ); + }); + }); + + async function produceMessageAndFetchTraceData() { + await produceMessage(); + return collector().fetchTraces(); + } + + async function produceMessage() { + await appUnderTest.sendIpcRequest({ command: 'produce-message' }); + } + + async function consumeMessageAndFetchTraceData() { + await consumeMessage(); + return collector().fetchTraces(); + } + + async function consumeMessage() { + await appUnderTest.sendIpcRequest({ command: 'consume-message' }); + } +}); diff --git a/test/integration/test.ts b/test/integration/test.ts index db7a2e8..a172d18 100644 --- a/test/integration/test.ts +++ b/test/integration/test.ts @@ -22,8 +22,7 @@ import runCommand from '../util/runCommand'; import waitUntil from '../util/waitUntil'; import ChildProcessWrapper, { defaultAppConfiguration } from './ChildProcessWrapper'; import { collector } from './rootHooks'; - -const skipWhenNodeJsVersionIsSmallerThan = '18.0.0'; +import { skipWhenNodeJsVersionIsSmallerThan } from './constants'; const { fail } = expect; @@ -60,7 +59,7 @@ describe('attach', () => { it('should attach via --require and capture spans', async () => { await waitUntil(async () => { - const traces = await sendRequestAndWaitForTraceData(); + const traces = await sendHttpRequestAndFetchTraceData(); expectMatchingSpan( traces, [ @@ -79,7 +78,7 @@ describe('attach', () => { it('should attach via --require and capture metrics', async () => { await waitUntil(async () => { - const metrics = await sendRequestAndWaitForMetrics(); + const metrics = await sendHttpRequestAndFetchMetrics(); expectMatchingMetric( metrics, [ @@ -106,7 +105,7 @@ describe('attach', () => { it('should attach via --require and capture logs', async () => { await waitUntil(async () => { - const logs = await sendRequestAndWaitForLogRecords(); + const logs = await sendHttpRequestAndFetchLogRecords(); expectMatchingLogRecord( logs, [ @@ -146,7 +145,7 @@ describe('attach', () => { it('should attach via --require and detect the pod uid', async () => { await waitUntil(async () => { - const traces = await sendRequestAndWaitForTraceData(); + const traces = await sendHttpRequestAndFetchTraceData(); expectMatchingSpan( traces, [resource => expectResourceAttribute(resource, 'k8s.pod.uid', 'f57400dc-94ce-4806-a52e-d2726f448f15')], @@ -174,7 +173,7 @@ describe('attach', () => { it('should attach via --require and derive a service name from the package.json file', async () => { await waitUntil(async () => { - const traces = await sendRequestAndWaitForTraceData(); + const traces = await sendHttpRequestAndFetchTraceData(); expectMatchingSpan( traces, [ @@ -209,7 +208,7 @@ describe('attach', () => { // (because the top level beforeHook is executed after this suite's before hook). await appUnderTest.start(); await waitUntil(async () => { - const traces = await waitForTraceData(); + const traces = await collector().fetchTraces(); expectMatchingSpan( traces, [ @@ -246,7 +245,7 @@ describe('attach', () => { await appUnderTest.start(); await appUnderTest.stop(); await waitUntil(async () => { - const traces = await waitForTraceData(); + const traces = await collector().fetchTraces(); expectMatchingSpan( traces, [ @@ -264,7 +263,7 @@ describe('attach', () => { await appUnderTest.start(); await appUnderTest.stop('SIGINT'); await waitUntil(async () => { - const traces = await waitForTraceData(); + const traces = await collector().fetchTraces(); expectMatchingSpan( traces, [ @@ -300,7 +299,7 @@ describe('attach', () => { it('should flush telemetry before process exit due to empty event loop', async () => { await appUnderTest.start(); await waitUntil(async () => { - const traces = await waitForTraceData(); + const traces = await collector().fetchTraces(); expectMatchingSpan( traces, [ @@ -353,7 +352,7 @@ describe('attach', () => { // }); await waitUntil(async () => { - await sendRequestAndVerifyResponse(); + await sendHttpRequestAndVerifyResponse(); const spanFile = await verifyFileHasBeenCreated(spanFilename); const spans = []; for await (const line of spanFile.readLines()) { @@ -396,7 +395,7 @@ describe('attach', () => { it('should do nothing if disabled', async () => { await delay(1000); - await sendRequestAndVerifyResponse(); + await sendHttpRequestAndVerifyResponse(); await delay(2000); if (await collector().hasTelemetry()) { @@ -405,49 +404,28 @@ describe('attach', () => { }); }); - async function sendRequestAndWaitForTraceData() { - await sendRequestAndVerifyResponse(); - return waitForTraceData(); + async function sendHttpRequestAndFetchTraceData() { + await sendHttpRequestAndVerifyResponse(); + return collector().fetchTraces(); } - async function sendRequestAndWaitForMetrics() { - await sendRequestAndVerifyResponse(); - return waitForMetrics(); + async function sendHttpRequestAndFetchMetrics() { + await sendHttpRequestAndVerifyResponse(); + return collector().fetchMetrics(); } - async function sendRequestAndWaitForLogRecords() { - await sendRequestAndVerifyResponse(); - return waitForLogRecords(); + async function sendHttpRequestAndFetchLogRecords() { + await sendHttpRequestAndVerifyResponse(); + return collector().fetchLogRecords(); } - async function sendRequestAndVerifyResponse() { + async function sendHttpRequestAndVerifyResponse() { const response = await fetch(`http://localhost:${appPort}/ohai`); expect(response.status).to.equal(200); const responsePayload = await response.json(); expect(responsePayload).to.deep.equal({ message: 'We make Observability easy for every developer.' }); } - async function waitForTraceData() { - if (!(await collector().hasTraces())) { - throw new Error('The collector never received any spans.'); - } - return (await collector().fetchTelemetry()).traces; - } - - async function waitForMetrics() { - if (!(await collector().hasMetrics())) { - throw new Error('The collector never received any metrics.'); - } - return (await collector().fetchTelemetry()).metrics; - } - - async function waitForLogRecords() { - if (!(await collector().hasLogs())) { - throw new Error('The collector never received any log records.'); - } - return (await collector().fetchTelemetry()).logs; - } - async function verifyFileHasBeenCreated(filename: string): Promise { let file; try {