Skip to content

Commit

Permalink
test: add test for kafkajs tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
basti1302 committed Jul 10, 2024
1 parent c3daa4e commit b2b5cb0
Show file tree
Hide file tree
Showing 15 changed files with 543 additions and 61 deletions.
1 change: 1 addition & 0 deletions eslint.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ module.exports = tsEsLint.config(
2,
],
'mocha/no-exclusive-tests': 'error',
'no-case-declarations': 'off',
'simpleImportSort/exports': 'error',
'unusedImports/no-unused-imports': 'error',
},
Expand Down
4 changes: 2 additions & 2 deletions src/init.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ import { PeriodicExportingMetricReader } from '@opentelemetry/sdk-metrics';
import { NodeSDK, NodeSDKConfiguration } from '@opentelemetry/sdk-node';
import { BatchSpanProcessor, SpanProcessor } from '@opentelemetry/sdk-trace-base';
import { ConsoleSpanExporter } from '@opentelemetry/sdk-trace-node';
import { KafkaJsInstrumentation } from '@opentelemetry/instrumentation-kafkajs';

import PodUidDetector from './detectors/node/opentelemetry-resource-detector-kubernetes-pod';
import ServiceNameFallbackDetector from './detectors/node/opentelemetry-resource-detector-service-name-fallback';
import { FileSpanExporter } from './util/FileSpanExporter';
import { hasOptedIn, hasOptedOut, parseNumericEnvironmentVariableWithDefault } from './util/environment';
import { kafkaJsInstrumentation } from './util/kafkajs';

const logPrefix = 'Dash0 OpenTelemetry distribution for Node.js:';
const debugOutput = hasOptedIn('DASH0_DEBUG');
Expand Down Expand Up @@ -67,7 +67,7 @@ const configuration: Partial<NodeSDKConfiguration> = {
instrumentations: [
//
getNodeAutoInstrumentations(createInstrumentationConfig()),
new KafkaJsInstrumentation(),
kafkaJsInstrumentation,
],
resource: resource(),
resourceDetectors: resourceDetectors(),
Expand Down
6 changes: 6 additions & 0 deletions src/util/kafkajs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// SPDX-FileCopyrightText: Copyright 2024 Dash0 Inc.
// SPDX-License-Identifier: Apache-2.0

import { KafkaJsInstrumentation } from '@opentelemetry/instrumentation-kafkajs';

export const kafkaJsInstrumentation = new KafkaJsInstrumentation();
1 change: 1 addition & 0 deletions test/apps/empty-event-loop/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"name": "dash0-app-under-test-empty-event-loop",
"version": "1.0.0",
"private": true,
"description": "",
"main": "app.ts",
"scripts": {
Expand Down
1 change: 1 addition & 0 deletions test/apps/express-typescript/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"name": "dash0-app-under-test-express-typescript",
"version": "1.0.0",
"private": true,
"description": "",
"main": "app.ts",
"scripts": {
Expand Down
129 changes: 129 additions & 0 deletions test/apps/kafkajs/app.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// SPDX-FileCopyrightText: Copyright 2024 Dash0 Inc.
// SPDX-License-Identifier: Apache-2.0

import { inspect } from 'node:util';
import * as kafkajs from 'kafkajs';
import {
Consumer,
ConsumerRunConfig,
EachMessagePayload,
Kafka,
KafkaJSError,
KafkaMessage,
Producer,
RecordMetadata,
} from 'kafkajs';
import { IpcRequest } from '../../util/ipc';
import sendToParentProcess, { sendReadyToParentProcess } from '../../util/sendToParentProcess';

// for testing purposes, we need a reference to the kafakjs instrumentation
import { kafkaJsInstrumentation } from '../../../src/util/kafkajs';

const kafka = new Kafka({
clientId: 'dash0-kafkajs-tests',
brokers: ['dummy:1302'],
});

let producer: Producer;
let consumer: Consumer;
let runConfig: ConsumerRunConfig | undefined;

process.on('message', async message => {
const ipcRequest = <IpcRequest>message;
const command = ipcRequest.command;
const id = ipcRequest.id;
switch (command) {
case 'produce-message':
await produceMessage(id);
break;
case 'consume-message':
await consumeMessage(id);
break;
default:
const errorMsg = `Unknown message: ${inspect(message)}`;
sendToParentProcess({ id, error: errorMsg });
console.error(errorMsg);
}
});

(function initKafka() {
producer = kafka.producer();

getRunConfig();

// Since we patch the consumer, we need to disable/enable the instrumentation to make sure it the instrumentation is
// still applied, see
// https://github.com/open-telemetry/opentelemetry-js-contrib/blob/2c32e5869ef9b6d582ba4da02623a030309bcaf3/plugins/node/instrumentation-kafkajs/test/kafkajs.test.ts#L142-L143.
kafkaJsInstrumentation.disable();
kafkaJsInstrumentation.enable();

consumer = kafka.consumer({
groupId: 'testing-group-id',
});

sendReadyToParentProcess();
})();

function getRunConfig() {
const origConsumerFactory = kafkajs.Kafka.prototype.consumer;
kafkajs.Kafka.prototype.consumer = function (...args): Consumer {
const consumer: Consumer = origConsumerFactory.apply(this, args);
consumer.run = function (config?: ConsumerRunConfig): Promise<void> {
runConfig = config;
return Promise.resolve();
};
return consumer;
};
}

async function produceMessage(id: number) {
try {
const res: RecordMetadata[] = await producer.send({
topic: 'test-topic',
messages: [
{
value: 'testing message content',
},
],
});
sendToParentProcess({ id, ok: true, res });
} catch (err) {
if (err instanceof KafkaJSError && err.message === 'The producer is disconnected') {
// This is expected, since we are not starting an actual Kafka broker for the integration tests, hence we are
// also not connected to any broker.
sendToParentProcess({ id, ok: true });
} else {
sendToParentProcess({ id, error: err });
}
}
}

async function consumeMessage(id: number) {
consumer.run({
eachMessage: async (): Promise<void> => {},
});
const payload = createPayload();
await runConfig?.eachMessage!(payload);
sendToParentProcess({ id, ok: true });
}

function createPayload(): EachMessagePayload {
return {
topic: 'test-topic',
partition: 0,
message: createMessage('456'),
heartbeat: async () => {},
pause: () => () => {},
};
}

function createMessage(offset: string): KafkaMessage {
return {
key: Buffer.from('message-key', 'utf8'),
value: Buffer.from('message content', 'utf8'),
timestamp: '1234',
size: 10,
attributes: 1,
offset: offset,
};
}
205 changes: 205 additions & 0 deletions test/apps/kafkajs/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit b2b5cb0

Please sign in to comment.