From b41797b2b8a11a7db4d3ec13c2452d824f45f82d Mon Sep 17 00:00:00 2001 From: Siim Kallas Date: Fri, 17 May 2024 11:02:19 +0300 Subject: [PATCH] feat: kafkajs instrumentation (#2089) * feat: kafkajs instrumentation * doc: update readme * update package-lock * update release-please * update release please config * prefer this._diag * remove moduleVersionAttributeName config option * use internal kafka message type * use generic messageinfo type * skip version update from compile step * remove kafkajs type from exports * add aspecto to copyright * upgrade to latest otel * add docs about semconv * NOTICE updates * specify supported version and remove unnecessary diag logs * allow for exact and case insensitive compare for propagation fields * update component owners * remove messaging.destination_kind attribute --------- Co-authored-by: Amir Blum --- .github/component_owners.yml | 2 + .release-please-manifest.json | 1 + eslint.config.js | 2 +- package-lock.json | 701 ++++++++++++++ .../instrumentation-kafkajs/.eslintignore | 1 + .../node/instrumentation-kafkajs/.eslintrc.js | 7 + plugins/node/instrumentation-kafkajs/LICENSE | 201 ++++ plugins/node/instrumentation-kafkajs/NOTICE | 8 + .../node/instrumentation-kafkajs/README.md | 76 ++ .../node/instrumentation-kafkajs/package.json | 64 ++ .../node/instrumentation-kafkajs/src/index.ts | 18 + .../src/instrumentation.ts | 407 ++++++++ .../instrumentation-kafkajs/src/propagator.ts | 43 + .../node/instrumentation-kafkajs/src/types.ts | 46 + .../test/DummyPropagation.ts | 64 ++ .../test/kafkajs.test.ts | 885 ++++++++++++++++++ .../instrumentation-kafkajs/tsconfig.json | 11 + release-please-config.json | 1 + 18 files changed, 2537 insertions(+), 1 deletion(-) create mode 100644 plugins/node/instrumentation-kafkajs/.eslintignore create mode 100644 plugins/node/instrumentation-kafkajs/.eslintrc.js create mode 100644 plugins/node/instrumentation-kafkajs/LICENSE create mode 100644 plugins/node/instrumentation-kafkajs/NOTICE create mode 100644 plugins/node/instrumentation-kafkajs/README.md create mode 100644 plugins/node/instrumentation-kafkajs/package.json create mode 100644 plugins/node/instrumentation-kafkajs/src/index.ts create mode 100644 plugins/node/instrumentation-kafkajs/src/instrumentation.ts create mode 100644 plugins/node/instrumentation-kafkajs/src/propagator.ts create mode 100644 plugins/node/instrumentation-kafkajs/src/types.ts create mode 100644 plugins/node/instrumentation-kafkajs/test/DummyPropagation.ts create mode 100644 plugins/node/instrumentation-kafkajs/test/kafkajs.test.ts create mode 100644 plugins/node/instrumentation-kafkajs/tsconfig.json diff --git a/.github/component_owners.yml b/.github/component_owners.yml index 210413659d..0565ca3edf 100644 --- a/.github/component_owners.yml +++ b/.github/component_owners.yml @@ -58,6 +58,8 @@ components: - henrinormak plugins/node/instrumentation-fs: - rauno56 + plugins/node/instrumentation-kafkajs: + - seemk plugins/node/instrumentation-lru-memoizer: - blumamir plugins/node/instrumentation-mongoose: diff --git a/.release-please-manifest.json b/.release-please-manifest.json index 68cedee9ce..5225c39db0 100644 --- a/.release-please-manifest.json +++ b/.release-please-manifest.json @@ -20,6 +20,7 @@ "plugins/node/instrumentation-cucumber": "0.6.0", "plugins/node/instrumentation-dataloader": "0.9.0", "plugins/node/instrumentation-fs": "0.12.0", + "plugins/node/instrumentation-kafkajs": "0.0.1", "plugins/node/instrumentation-lru-memoizer": "0.37.0", "plugins/node/instrumentation-mongoose": "0.38.1", "plugins/node/instrumentation-runtime-node": "0.4.0", diff --git a/eslint.config.js b/eslint.config.js index e342425670..71488af0c9 100644 --- a/eslint.config.js +++ b/eslint.config.js @@ -20,7 +20,7 @@ module.exports = { "no-shadow": "off", "node/no-deprecated-api": ["warn"], "header/header": ["error", "block", [{ - pattern: / \* Copyright The OpenTelemetry Authors[\r\n]+ \*[\r\n]+ \* Licensed under the Apache License, Version 2\.0 \(the \"License\"\);[\r\n]+ \* you may not use this file except in compliance with the License\.[\r\n]+ \* You may obtain a copy of the License at[\r\n]+ \*[\r\n]+ \* https:\/\/www\.apache\.org\/licenses\/LICENSE-2\.0[\r\n]+ \*[\r\n]+ \* Unless required by applicable law or agreed to in writing, software[\r\n]+ \* distributed under the License is distributed on an \"AS IS\" BASIS,[\r\n]+ \* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied\.[\r\n]+ \* See the License for the specific language governing permissions and[\r\n]+ \* limitations under the License\./gm, + pattern: / \* Copyright The OpenTelemetry Authors(, .+)*[\r\n]+ \*[\r\n]+ \* Licensed under the Apache License, Version 2\.0 \(the \"License\"\);[\r\n]+ \* you may not use this file except in compliance with the License\.[\r\n]+ \* You may obtain a copy of the License at[\r\n]+ \*[\r\n]+ \* https:\/\/www\.apache\.org\/licenses\/LICENSE-2\.0[\r\n]+ \*[\r\n]+ \* Unless required by applicable law or agreed to in writing, software[\r\n]+ \* distributed under the License is distributed on an \"AS IS\" BASIS,[\r\n]+ \* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied\.[\r\n]+ \* See the License for the specific language governing permissions and[\r\n]+ \* limitations under the License\./gm, template: `\n * Copyright The OpenTelemetry Authors\n *\n * Licensed under the Apache License, Version 2.0 (the "License");\n * you may not use this file except in compliance with the License.\n * You may obtain a copy of the License at\n *\n * https://www.apache.org/licenses/LICENSE-2.0\n *\n * Unless required by applicable law or agreed to in writing, software\n * distributed under the License is distributed on an "AS IS" BASIS,\n * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n * See the License for the specific language governing permissions and\n * limitations under the License.\n ` }]] diff --git a/package-lock.json b/package-lock.json index 0763fb489f..30b61cfd9a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -8851,6 +8851,10 @@ "resolved": "plugins/node/opentelemetry-instrumentation-ioredis", "link": true }, + "node_modules/@opentelemetry/instrumentation-kafkajs": { + "resolved": "plugins/node/instrumentation-kafkajs", + "link": true + }, "node_modules/@opentelemetry/instrumentation-knex": { "resolved": "plugins/node/opentelemetry-instrumentation-knex", "link": true @@ -21692,6 +21696,15 @@ "safe-buffer": "^5.0.1" } }, + "node_modules/kafkajs": { + "version": "2.2.4", + "resolved": "https://registry.npmjs.org/kafkajs/-/kafkajs-2.2.4.tgz", + "integrity": "sha512-j/YeapB1vfPT2iOIUn/vxdyKEuhuY2PxMBvf5JWux6iSaukAccrMtXEY/Lb7OvavDhOWME589bpLrEdnVHjfjA==", + "dev": true, + "engines": { + "node": ">=14.0.0" + } + }, "node_modules/kareem": { "version": "2.5.1", "resolved": "https://registry.npmjs.org/kareem/-/kareem-2.5.1.tgz", @@ -37042,6 +37055,414 @@ "@opentelemetry/api": "^1.3.0" } }, + "plugins/node/instrumentation-kafkajs": { + "name": "@opentelemetry/instrumentation-kafkajs", + "version": "0.0.1", + "license": "Apache-2.0", + "dependencies": { + "@opentelemetry/instrumentation": "^0.51.0", + "@opentelemetry/semantic-conventions": "^1.24.0" + }, + "devDependencies": { + "@opentelemetry/api": "^1.3.0", + "@opentelemetry/contrib-test-utils": "^0.38.0", + "@opentelemetry/sdk-trace-base": "^1.24.0", + "@types/mocha": "7.0.2", + "@types/node": "18.6.5", + "@types/sinon": "^10.0.11", + "kafkajs": "^2.2.4", + "mocha": "7.2.0", + "nyc": "15.1.0", + "rimraf": "5.0.5", + "sinon": "15.2.0", + "ts-mocha": "10.0.0", + "typescript": "4.4.4" + }, + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@opentelemetry/api": "^1.3.0" + } + }, + "plugins/node/instrumentation-kafkajs/node_modules/@opentelemetry/api-logs": { + "version": "0.50.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/api-logs/-/api-logs-0.50.0.tgz", + "integrity": "sha512-JdZuKrhOYggqOpUljAq4WWNi5nB10PmgoF0y2CvedLGXd0kSawb/UBnWT8gg1ND3bHCNHStAIVT0ELlxJJRqrA==", + "dependencies": { + "@opentelemetry/api": "^1.0.0" + }, + "engines": { + "node": ">=14" + } + }, + "plugins/node/instrumentation-kafkajs/node_modules/@opentelemetry/context-async-hooks": { + "version": "1.23.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/context-async-hooks/-/context-async-hooks-1.23.0.tgz", + "integrity": "sha512-wazGJZDRevibOJ+VgyrT+9+8sybZAxpZx2G7vy30OAtk92OpZCg7HgNxT11NUx0VBDWcRx1dOatMYGOVplQ7QA==", + "dev": true, + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@opentelemetry/api": ">=1.0.0 <1.9.0" + } + }, + "plugins/node/instrumentation-kafkajs/node_modules/@opentelemetry/contrib-test-utils": { + "version": "0.38.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/contrib-test-utils/-/contrib-test-utils-0.38.0.tgz", + "integrity": "sha512-wdFiZfkCIuQ93Z3JXs1lIjEIabIGxfpEqwdLVYXLhASd8JBRlqLQ4bg0/FRa8Nnd+0BZr3U6Rihg/y8WC3z1ig==", + "dev": true, + "dependencies": { + "@opentelemetry/core": "^1.0.0", + "@opentelemetry/exporter-jaeger": "^1.3.1", + "@opentelemetry/instrumentation": "^0.50.0", + "@opentelemetry/resources": "^1.8.0", + "@opentelemetry/sdk-node": "^0.50.0", + "@opentelemetry/sdk-trace-base": "^1.8.0", + "@opentelemetry/sdk-trace-node": "^1.8.0", + "@opentelemetry/semantic-conventions": "^1.0.0" + }, + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@opentelemetry/api": "^1.3.0" + } + }, + "plugins/node/instrumentation-kafkajs/node_modules/@opentelemetry/core": { + "version": "1.23.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/core/-/core-1.23.0.tgz", + "integrity": "sha512-hdQ/a9TMzMQF/BO8Cz1juA43/L5YGtCSiKoOHmrTEf7VMDAZgy8ucpWx3eQTnQ3gBloRcWtzvcrMZABC3PTSKQ==", + "dev": true, + "dependencies": { + "@opentelemetry/semantic-conventions": "1.23.0" + }, + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@opentelemetry/api": ">=1.0.0 <1.9.0" + } + }, + "plugins/node/instrumentation-kafkajs/node_modules/@opentelemetry/exporter-trace-otlp-grpc": { + "version": "0.50.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/exporter-trace-otlp-grpc/-/exporter-trace-otlp-grpc-0.50.0.tgz", + "integrity": "sha512-w/NF4TrwHxx+Uz1M0rCOSVr6KgcoQPv3zF9JRqcebY2euD7ddWnLP0hE8JavyA1uq4UchnMp9faAk9n7hTCePw==", + "dev": true, + "dependencies": { + "@grpc/grpc-js": "^1.7.1", + "@opentelemetry/core": "1.23.0", + "@opentelemetry/otlp-grpc-exporter-base": "0.50.0", + "@opentelemetry/otlp-transformer": "0.50.0", + "@opentelemetry/resources": "1.23.0", + "@opentelemetry/sdk-trace-base": "1.23.0" + }, + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@opentelemetry/api": "^1.0.0" + } + }, + "plugins/node/instrumentation-kafkajs/node_modules/@opentelemetry/exporter-trace-otlp-http": { + "version": "0.50.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/exporter-trace-otlp-http/-/exporter-trace-otlp-http-0.50.0.tgz", + "integrity": "sha512-L7OtIMT7MsFqkmhbQlPBGRXt7152VN5esHpQEJYIBFedOEo3Da+yHpu5ojMZtPzpIvSpB5Xr5lnJUjJCbkttCA==", + "dev": true, + "dependencies": { + "@opentelemetry/core": "1.23.0", + "@opentelemetry/otlp-exporter-base": "0.50.0", + "@opentelemetry/otlp-transformer": "0.50.0", + "@opentelemetry/resources": "1.23.0", + "@opentelemetry/sdk-trace-base": "1.23.0" + }, + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@opentelemetry/api": "^1.0.0" + } + }, + "plugins/node/instrumentation-kafkajs/node_modules/@opentelemetry/exporter-trace-otlp-proto": { + "version": "0.50.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/exporter-trace-otlp-proto/-/exporter-trace-otlp-proto-0.50.0.tgz", + "integrity": "sha512-vavD9Ow6yOLiD+ocuS/oeciCsXNdsN41aYUrEljNaLXogvnkfMhJ+JLAhOnRSpzlVtRp7Ciw2BYGdYSebR0OsA==", + "dev": true, + "dependencies": { + "@opentelemetry/core": "1.23.0", + "@opentelemetry/otlp-exporter-base": "0.50.0", + "@opentelemetry/otlp-proto-exporter-base": "0.50.0", + "@opentelemetry/otlp-transformer": "0.50.0", + "@opentelemetry/resources": "1.23.0", + "@opentelemetry/sdk-trace-base": "1.23.0" + }, + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@opentelemetry/api": "^1.0.0" + } + }, + "plugins/node/instrumentation-kafkajs/node_modules/@opentelemetry/exporter-zipkin": { + "version": "1.23.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/exporter-zipkin/-/exporter-zipkin-1.23.0.tgz", + "integrity": "sha512-2LOGvNUGONuIcWhynFaJorVyqv03uZkURScciLmOxvBf2lWTNPEj77br1dCpShIWBM+YlrH7Tc+JXAs+GC7DqA==", + "dev": true, + "dependencies": { + "@opentelemetry/core": "1.23.0", + "@opentelemetry/resources": "1.23.0", + "@opentelemetry/sdk-trace-base": "1.23.0", + "@opentelemetry/semantic-conventions": "1.23.0" + }, + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@opentelemetry/api": "^1.0.0" + } + }, + "plugins/node/instrumentation-kafkajs/node_modules/@opentelemetry/instrumentation": { + "version": "0.50.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/instrumentation/-/instrumentation-0.50.0.tgz", + "integrity": "sha512-bhGhbJiZKpuu7wTaSak4hyZcFPlnDeuSF/2vglze8B4w2LubcSbbOnkVTzTs5SXtzh4Xz8eRjaNnAm+u2GYufQ==", + "dependencies": { + "@opentelemetry/api-logs": "0.50.0", + "@types/shimmer": "^1.0.2", + "import-in-the-middle": "1.7.1", + "require-in-the-middle": "^7.1.1", + "semver": "^7.5.2", + "shimmer": "^1.2.1" + }, + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@opentelemetry/api": "^1.3.0" + } + }, + "plugins/node/instrumentation-kafkajs/node_modules/@opentelemetry/otlp-exporter-base": { + "version": "0.50.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/otlp-exporter-base/-/otlp-exporter-base-0.50.0.tgz", + "integrity": "sha512-JUmjmrCmE1/fc4LjCQMqLfudgSl5OpUkzx7iA94b4jgeODM7zWxUoVXL7/CT7fWf47Cn+pmKjMvTCSESqZZ3mA==", + "dev": true, + "dependencies": { + "@opentelemetry/core": "1.23.0" + }, + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@opentelemetry/api": "^1.0.0" + } + }, + "plugins/node/instrumentation-kafkajs/node_modules/@opentelemetry/otlp-grpc-exporter-base": { + "version": "0.50.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/otlp-grpc-exporter-base/-/otlp-grpc-exporter-base-0.50.0.tgz", + "integrity": "sha512-J500AczSD7xEsjXpwNzSh5HQqxW73PT3CCNsi1VEWCE+8UPgVfkHYIGRHGoch35DV+CMe1svbi7gAk3e5eCSVA==", + "dev": true, + "dependencies": { + "@grpc/grpc-js": "^1.7.1", + "@opentelemetry/core": "1.23.0", + "@opentelemetry/otlp-exporter-base": "0.50.0", + "protobufjs": "^7.2.3" + }, + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@opentelemetry/api": "^1.0.0" + } + }, + "plugins/node/instrumentation-kafkajs/node_modules/@opentelemetry/otlp-proto-exporter-base": { + "version": "0.50.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/otlp-proto-exporter-base/-/otlp-proto-exporter-base-0.50.0.tgz", + "integrity": "sha512-hlbn3eZbhxoK79Sq1ddj1f7qcx+PzsPQC/SFpJvaWgTaqacCbqJmpzWDKfRRCAC7iGX2Hj/sgpf8vysazqyMOw==", + "dev": true, + "dependencies": { + "@opentelemetry/core": "1.23.0", + "@opentelemetry/otlp-exporter-base": "0.50.0", + "protobufjs": "^7.2.3" + }, + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@opentelemetry/api": "^1.0.0" + } + }, + "plugins/node/instrumentation-kafkajs/node_modules/@opentelemetry/otlp-transformer": { + "version": "0.50.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/otlp-transformer/-/otlp-transformer-0.50.0.tgz", + "integrity": "sha512-s0sl1Yfqd5q1Kjrf6DqXPWzErL+XHhrXOfejh4Vc/SMTNqC902xDsC8JQxbjuramWt/+hibfguIvi7Ns8VLolA==", + "dev": true, + "dependencies": { + "@opentelemetry/api-logs": "0.50.0", + "@opentelemetry/core": "1.23.0", + "@opentelemetry/resources": "1.23.0", + "@opentelemetry/sdk-logs": "0.50.0", + "@opentelemetry/sdk-metrics": "1.23.0", + "@opentelemetry/sdk-trace-base": "1.23.0" + }, + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@opentelemetry/api": ">=1.3.0 <1.9.0" + } + }, + "plugins/node/instrumentation-kafkajs/node_modules/@opentelemetry/propagator-b3": { + "version": "1.23.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/propagator-b3/-/propagator-b3-1.23.0.tgz", + "integrity": "sha512-cZ6rl8y2bdxYQ4e+zP2CQ+QmuPebaLBLO1skjFpj3eEu7zar+6hBzUP3llMOUupkQeQSwXz+4c8dZ26OhYfG/g==", + "dev": true, + "dependencies": { + "@opentelemetry/core": "1.23.0" + }, + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@opentelemetry/api": ">=1.0.0 <1.9.0" + } + }, + "plugins/node/instrumentation-kafkajs/node_modules/@opentelemetry/propagator-jaeger": { + "version": "1.23.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/propagator-jaeger/-/propagator-jaeger-1.23.0.tgz", + "integrity": "sha512-6iArixfgIl3ZgzeltQ5jyiKbjZygM+MbM84pXi1HL0Qs4x4Ck5rM6wEtjhZffFnlDMWEkEqrnM0xF6bTfbiMAQ==", + "dev": true, + "dependencies": { + "@opentelemetry/core": "1.23.0" + }, + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@opentelemetry/api": ">=1.0.0 <1.9.0" + } + }, + "plugins/node/instrumentation-kafkajs/node_modules/@opentelemetry/resources": { + "version": "1.23.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/resources/-/resources-1.23.0.tgz", + "integrity": "sha512-iPRLfVfcEQynYGo7e4Di+ti+YQTAY0h5mQEUJcHlU9JOqpb4x965O6PZ+wMcwYVY63G96KtdS86YCM1BF1vQZg==", + "dev": true, + "dependencies": { + "@opentelemetry/core": "1.23.0", + "@opentelemetry/semantic-conventions": "1.23.0" + }, + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@opentelemetry/api": ">=1.0.0 <1.9.0" + } + }, + "plugins/node/instrumentation-kafkajs/node_modules/@opentelemetry/sdk-logs": { + "version": "0.50.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/sdk-logs/-/sdk-logs-0.50.0.tgz", + "integrity": "sha512-PeUEupBB29p9nlPNqXoa1PUWNLsZnxG0DCDj3sHqzae+8y76B/A5hvZjg03ulWdnvBLYpnJslqzylG9E0IL87g==", + "dev": true, + "dependencies": { + "@opentelemetry/core": "1.23.0", + "@opentelemetry/resources": "1.23.0" + }, + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@opentelemetry/api": ">=1.4.0 <1.9.0", + "@opentelemetry/api-logs": ">=0.39.1" + } + }, + "plugins/node/instrumentation-kafkajs/node_modules/@opentelemetry/sdk-metrics": { + "version": "1.23.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/sdk-metrics/-/sdk-metrics-1.23.0.tgz", + "integrity": "sha512-4OkvW6+wST4h6LFG23rXSTf6nmTf201h9dzq7bE0z5R9ESEVLERZz6WXwE7PSgg1gdjlaznm1jLJf8GttypFDg==", + "dev": true, + "dependencies": { + "@opentelemetry/core": "1.23.0", + "@opentelemetry/resources": "1.23.0", + "lodash.merge": "^4.6.2" + }, + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@opentelemetry/api": ">=1.3.0 <1.9.0" + } + }, + "plugins/node/instrumentation-kafkajs/node_modules/@opentelemetry/sdk-node": { + "version": "0.50.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/sdk-node/-/sdk-node-0.50.0.tgz", + "integrity": "sha512-LhIXHnvcnhRYcPwG9VG4G6lJ7x4ElYF6UYHHmXA7e4ZWzSUEFmAPfR1IBWv358aD1KwffcEBu7J6zeAR7lPZag==", + "dev": true, + "dependencies": { + "@opentelemetry/api-logs": "0.50.0", + "@opentelemetry/core": "1.23.0", + "@opentelemetry/exporter-trace-otlp-grpc": "0.50.0", + "@opentelemetry/exporter-trace-otlp-http": "0.50.0", + "@opentelemetry/exporter-trace-otlp-proto": "0.50.0", + "@opentelemetry/exporter-zipkin": "1.23.0", + "@opentelemetry/instrumentation": "0.50.0", + "@opentelemetry/resources": "1.23.0", + "@opentelemetry/sdk-logs": "0.50.0", + "@opentelemetry/sdk-metrics": "1.23.0", + "@opentelemetry/sdk-trace-base": "1.23.0", + "@opentelemetry/sdk-trace-node": "1.23.0", + "@opentelemetry/semantic-conventions": "1.23.0" + }, + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@opentelemetry/api": ">=1.3.0 <1.9.0" + } + }, + "plugins/node/instrumentation-kafkajs/node_modules/@opentelemetry/sdk-trace-base": { + "version": "1.23.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/sdk-trace-base/-/sdk-trace-base-1.23.0.tgz", + "integrity": "sha512-PzBmZM8hBomUqvCddF/5Olyyviayka44O5nDWq673np3ctnvwMOvNrsUORZjKja1zJbwEuD9niAGbnVrz3jwRQ==", + "dev": true, + "dependencies": { + "@opentelemetry/core": "1.23.0", + "@opentelemetry/resources": "1.23.0", + "@opentelemetry/semantic-conventions": "1.23.0" + }, + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@opentelemetry/api": ">=1.0.0 <1.9.0" + } + }, + "plugins/node/instrumentation-kafkajs/node_modules/@opentelemetry/sdk-trace-node": { + "version": "1.23.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/sdk-trace-node/-/sdk-trace-node-1.23.0.tgz", + "integrity": "sha512-dwnin5Go2r6VzJZkVc9JBPupssWp7j2EFto+S7qRkwQ00WDykWeq3x2Skk7I1Jr448FeBSvGCQVPgV5e6s6O3w==", + "dev": true, + "dependencies": { + "@opentelemetry/context-async-hooks": "1.23.0", + "@opentelemetry/core": "1.23.0", + "@opentelemetry/propagator-b3": "1.23.0", + "@opentelemetry/propagator-jaeger": "1.23.0", + "@opentelemetry/sdk-trace-base": "1.23.0", + "semver": "^7.5.2" + }, + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@opentelemetry/api": ">=1.0.0 <1.9.0" + } + }, + "plugins/node/instrumentation-kafkajs/node_modules/@opentelemetry/semantic-conventions": { + "version": "1.23.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/semantic-conventions/-/semantic-conventions-1.23.0.tgz", + "integrity": "sha512-MiqFvfOzfR31t8cc74CTP1OZfz7MbqpAnLCra8NqQoaHJX6ncIRTdYOQYBDQ2uFISDq0WY8Y9dDTWvsgzzBYRg==", + "engines": { + "node": ">=14" + } + }, "plugins/node/instrumentation-lru-memoizer": { "name": "@opentelemetry/instrumentation-lru-memoizer", "version": "0.37.0", @@ -46743,6 +47164,280 @@ "typescript": "4.4.4" } }, + "@opentelemetry/instrumentation-kafkajs": { + "version": "file:plugins/node/instrumentation-kafkajs", + "requires": { + "@opentelemetry/api": "^1.3.0", + "@opentelemetry/contrib-test-utils": "^0.38.0", + "@opentelemetry/instrumentation": "^0.51.0", + "@opentelemetry/sdk-trace-base": "^1.24.0", + "@opentelemetry/semantic-conventions": "^1.24.0", + "@types/mocha": "7.0.2", + "@types/node": "18.6.5", + "@types/sinon": "^10.0.11", + "kafkajs": "^2.2.4", + "mocha": "7.2.0", + "nyc": "15.1.0", + "rimraf": "5.0.5", + "sinon": "15.2.0", + "ts-mocha": "10.0.0", + "typescript": "4.4.4" + }, + "dependencies": { + "@opentelemetry/api-logs": { + "version": "0.50.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/api-logs/-/api-logs-0.50.0.tgz", + "integrity": "sha512-JdZuKrhOYggqOpUljAq4WWNi5nB10PmgoF0y2CvedLGXd0kSawb/UBnWT8gg1ND3bHCNHStAIVT0ELlxJJRqrA==", + "requires": { + "@opentelemetry/api": "^1.0.0" + } + }, + "@opentelemetry/context-async-hooks": { + "version": "1.23.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/context-async-hooks/-/context-async-hooks-1.23.0.tgz", + "integrity": "sha512-wazGJZDRevibOJ+VgyrT+9+8sybZAxpZx2G7vy30OAtk92OpZCg7HgNxT11NUx0VBDWcRx1dOatMYGOVplQ7QA==", + "dev": true, + "requires": {} + }, + "@opentelemetry/contrib-test-utils": { + "version": "0.38.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/contrib-test-utils/-/contrib-test-utils-0.38.0.tgz", + "integrity": "sha512-wdFiZfkCIuQ93Z3JXs1lIjEIabIGxfpEqwdLVYXLhASd8JBRlqLQ4bg0/FRa8Nnd+0BZr3U6Rihg/y8WC3z1ig==", + "dev": true, + "requires": { + "@opentelemetry/core": "^1.0.0", + "@opentelemetry/exporter-jaeger": "^1.3.1", + "@opentelemetry/instrumentation": "^0.50.0", + "@opentelemetry/resources": "^1.8.0", + "@opentelemetry/sdk-node": "^0.50.0", + "@opentelemetry/sdk-trace-base": "^1.8.0", + "@opentelemetry/sdk-trace-node": "^1.8.0", + "@opentelemetry/semantic-conventions": "^1.0.0" + } + }, + "@opentelemetry/core": { + "version": "1.23.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/core/-/core-1.23.0.tgz", + "integrity": "sha512-hdQ/a9TMzMQF/BO8Cz1juA43/L5YGtCSiKoOHmrTEf7VMDAZgy8ucpWx3eQTnQ3gBloRcWtzvcrMZABC3PTSKQ==", + "dev": true, + "requires": { + "@opentelemetry/semantic-conventions": "1.23.0" + } + }, + "@opentelemetry/exporter-trace-otlp-grpc": { + "version": "0.50.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/exporter-trace-otlp-grpc/-/exporter-trace-otlp-grpc-0.50.0.tgz", + "integrity": "sha512-w/NF4TrwHxx+Uz1M0rCOSVr6KgcoQPv3zF9JRqcebY2euD7ddWnLP0hE8JavyA1uq4UchnMp9faAk9n7hTCePw==", + "dev": true, + "requires": { + "@grpc/grpc-js": "^1.7.1", + "@opentelemetry/core": "1.23.0", + "@opentelemetry/otlp-grpc-exporter-base": "0.50.0", + "@opentelemetry/otlp-transformer": "0.50.0", + "@opentelemetry/resources": "1.23.0", + "@opentelemetry/sdk-trace-base": "1.23.0" + } + }, + "@opentelemetry/exporter-trace-otlp-http": { + "version": "0.50.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/exporter-trace-otlp-http/-/exporter-trace-otlp-http-0.50.0.tgz", + "integrity": "sha512-L7OtIMT7MsFqkmhbQlPBGRXt7152VN5esHpQEJYIBFedOEo3Da+yHpu5ojMZtPzpIvSpB5Xr5lnJUjJCbkttCA==", + "dev": true, + "requires": { + "@opentelemetry/core": "1.23.0", + "@opentelemetry/otlp-exporter-base": "0.50.0", + "@opentelemetry/otlp-transformer": "0.50.0", + "@opentelemetry/resources": "1.23.0", + "@opentelemetry/sdk-trace-base": "1.23.0" + } + }, + "@opentelemetry/exporter-trace-otlp-proto": { + "version": "0.50.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/exporter-trace-otlp-proto/-/exporter-trace-otlp-proto-0.50.0.tgz", + "integrity": "sha512-vavD9Ow6yOLiD+ocuS/oeciCsXNdsN41aYUrEljNaLXogvnkfMhJ+JLAhOnRSpzlVtRp7Ciw2BYGdYSebR0OsA==", + "dev": true, + "requires": { + "@opentelemetry/core": "1.23.0", + "@opentelemetry/otlp-exporter-base": "0.50.0", + "@opentelemetry/otlp-proto-exporter-base": "0.50.0", + "@opentelemetry/otlp-transformer": "0.50.0", + "@opentelemetry/resources": "1.23.0", + "@opentelemetry/sdk-trace-base": "1.23.0" + } + }, + "@opentelemetry/exporter-zipkin": { + "version": "1.23.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/exporter-zipkin/-/exporter-zipkin-1.23.0.tgz", + "integrity": "sha512-2LOGvNUGONuIcWhynFaJorVyqv03uZkURScciLmOxvBf2lWTNPEj77br1dCpShIWBM+YlrH7Tc+JXAs+GC7DqA==", + "dev": true, + "requires": { + "@opentelemetry/core": "1.23.0", + "@opentelemetry/resources": "1.23.0", + "@opentelemetry/sdk-trace-base": "1.23.0", + "@opentelemetry/semantic-conventions": "1.23.0" + } + }, + "@opentelemetry/instrumentation": { + "version": "0.50.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/instrumentation/-/instrumentation-0.50.0.tgz", + "integrity": "sha512-bhGhbJiZKpuu7wTaSak4hyZcFPlnDeuSF/2vglze8B4w2LubcSbbOnkVTzTs5SXtzh4Xz8eRjaNnAm+u2GYufQ==", + "requires": { + "@opentelemetry/api-logs": "0.50.0", + "@types/shimmer": "^1.0.2", + "import-in-the-middle": "1.7.1", + "require-in-the-middle": "^7.1.1", + "semver": "^7.5.2", + "shimmer": "^1.2.1" + } + }, + "@opentelemetry/otlp-exporter-base": { + "version": "0.50.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/otlp-exporter-base/-/otlp-exporter-base-0.50.0.tgz", + "integrity": "sha512-JUmjmrCmE1/fc4LjCQMqLfudgSl5OpUkzx7iA94b4jgeODM7zWxUoVXL7/CT7fWf47Cn+pmKjMvTCSESqZZ3mA==", + "dev": true, + "requires": { + "@opentelemetry/core": "1.23.0" + } + }, + "@opentelemetry/otlp-grpc-exporter-base": { + "version": "0.50.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/otlp-grpc-exporter-base/-/otlp-grpc-exporter-base-0.50.0.tgz", + "integrity": "sha512-J500AczSD7xEsjXpwNzSh5HQqxW73PT3CCNsi1VEWCE+8UPgVfkHYIGRHGoch35DV+CMe1svbi7gAk3e5eCSVA==", + "dev": true, + "requires": { + "@grpc/grpc-js": "^1.7.1", + "@opentelemetry/core": "1.23.0", + "@opentelemetry/otlp-exporter-base": "0.50.0", + "protobufjs": "^7.2.3" + } + }, + "@opentelemetry/otlp-proto-exporter-base": { + "version": "0.50.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/otlp-proto-exporter-base/-/otlp-proto-exporter-base-0.50.0.tgz", + "integrity": "sha512-hlbn3eZbhxoK79Sq1ddj1f7qcx+PzsPQC/SFpJvaWgTaqacCbqJmpzWDKfRRCAC7iGX2Hj/sgpf8vysazqyMOw==", + "dev": true, + "requires": { + "@opentelemetry/core": "1.23.0", + "@opentelemetry/otlp-exporter-base": "0.50.0", + "protobufjs": "^7.2.3" + } + }, + "@opentelemetry/otlp-transformer": { + "version": "0.50.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/otlp-transformer/-/otlp-transformer-0.50.0.tgz", + "integrity": "sha512-s0sl1Yfqd5q1Kjrf6DqXPWzErL+XHhrXOfejh4Vc/SMTNqC902xDsC8JQxbjuramWt/+hibfguIvi7Ns8VLolA==", + "dev": true, + "requires": { + "@opentelemetry/api-logs": "0.50.0", + "@opentelemetry/core": "1.23.0", + "@opentelemetry/resources": "1.23.0", + "@opentelemetry/sdk-logs": "0.50.0", + "@opentelemetry/sdk-metrics": "1.23.0", + "@opentelemetry/sdk-trace-base": "1.23.0" + } + }, + "@opentelemetry/propagator-b3": { + "version": "1.23.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/propagator-b3/-/propagator-b3-1.23.0.tgz", + "integrity": "sha512-cZ6rl8y2bdxYQ4e+zP2CQ+QmuPebaLBLO1skjFpj3eEu7zar+6hBzUP3llMOUupkQeQSwXz+4c8dZ26OhYfG/g==", + "dev": true, + "requires": { + "@opentelemetry/core": "1.23.0" + } + }, + "@opentelemetry/propagator-jaeger": { + "version": "1.23.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/propagator-jaeger/-/propagator-jaeger-1.23.0.tgz", + "integrity": "sha512-6iArixfgIl3ZgzeltQ5jyiKbjZygM+MbM84pXi1HL0Qs4x4Ck5rM6wEtjhZffFnlDMWEkEqrnM0xF6bTfbiMAQ==", + "dev": true, + "requires": { + "@opentelemetry/core": "1.23.0" + } + }, + "@opentelemetry/resources": { + "version": "1.23.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/resources/-/resources-1.23.0.tgz", + "integrity": "sha512-iPRLfVfcEQynYGo7e4Di+ti+YQTAY0h5mQEUJcHlU9JOqpb4x965O6PZ+wMcwYVY63G96KtdS86YCM1BF1vQZg==", + "dev": true, + "requires": { + "@opentelemetry/core": "1.23.0", + "@opentelemetry/semantic-conventions": "1.23.0" + } + }, + "@opentelemetry/sdk-logs": { + "version": "0.50.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/sdk-logs/-/sdk-logs-0.50.0.tgz", + "integrity": "sha512-PeUEupBB29p9nlPNqXoa1PUWNLsZnxG0DCDj3sHqzae+8y76B/A5hvZjg03ulWdnvBLYpnJslqzylG9E0IL87g==", + "dev": true, + "requires": { + "@opentelemetry/core": "1.23.0", + "@opentelemetry/resources": "1.23.0" + } + }, + "@opentelemetry/sdk-metrics": { + "version": "1.23.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/sdk-metrics/-/sdk-metrics-1.23.0.tgz", + "integrity": "sha512-4OkvW6+wST4h6LFG23rXSTf6nmTf201h9dzq7bE0z5R9ESEVLERZz6WXwE7PSgg1gdjlaznm1jLJf8GttypFDg==", + "dev": true, + "requires": { + "@opentelemetry/core": "1.23.0", + "@opentelemetry/resources": "1.23.0", + "lodash.merge": "^4.6.2" + } + }, + "@opentelemetry/sdk-node": { + "version": "0.50.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/sdk-node/-/sdk-node-0.50.0.tgz", + "integrity": "sha512-LhIXHnvcnhRYcPwG9VG4G6lJ7x4ElYF6UYHHmXA7e4ZWzSUEFmAPfR1IBWv358aD1KwffcEBu7J6zeAR7lPZag==", + "dev": true, + "requires": { + "@opentelemetry/api-logs": "0.50.0", + "@opentelemetry/core": "1.23.0", + "@opentelemetry/exporter-trace-otlp-grpc": "0.50.0", + "@opentelemetry/exporter-trace-otlp-http": "0.50.0", + "@opentelemetry/exporter-trace-otlp-proto": "0.50.0", + "@opentelemetry/exporter-zipkin": "1.23.0", + "@opentelemetry/instrumentation": "0.50.0", + "@opentelemetry/resources": "1.23.0", + "@opentelemetry/sdk-logs": "0.50.0", + "@opentelemetry/sdk-metrics": "1.23.0", + "@opentelemetry/sdk-trace-base": "1.23.0", + "@opentelemetry/sdk-trace-node": "1.23.0", + "@opentelemetry/semantic-conventions": "1.23.0" + } + }, + "@opentelemetry/sdk-trace-base": { + "version": "1.23.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/sdk-trace-base/-/sdk-trace-base-1.23.0.tgz", + "integrity": "sha512-PzBmZM8hBomUqvCddF/5Olyyviayka44O5nDWq673np3ctnvwMOvNrsUORZjKja1zJbwEuD9niAGbnVrz3jwRQ==", + "dev": true, + "requires": { + "@opentelemetry/core": "1.23.0", + "@opentelemetry/resources": "1.23.0", + "@opentelemetry/semantic-conventions": "1.23.0" + } + }, + "@opentelemetry/sdk-trace-node": { + "version": "1.23.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/sdk-trace-node/-/sdk-trace-node-1.23.0.tgz", + "integrity": "sha512-dwnin5Go2r6VzJZkVc9JBPupssWp7j2EFto+S7qRkwQ00WDykWeq3x2Skk7I1Jr448FeBSvGCQVPgV5e6s6O3w==", + "dev": true, + "requires": { + "@opentelemetry/context-async-hooks": "1.23.0", + "@opentelemetry/core": "1.23.0", + "@opentelemetry/propagator-b3": "1.23.0", + "@opentelemetry/propagator-jaeger": "1.23.0", + "@opentelemetry/sdk-trace-base": "1.23.0", + "semver": "^7.5.2" + } + }, + "@opentelemetry/semantic-conventions": { + "version": "1.23.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/semantic-conventions/-/semantic-conventions-1.23.0.tgz", + "integrity": "sha512-MiqFvfOzfR31t8cc74CTP1OZfz7MbqpAnLCra8NqQoaHJX6ncIRTdYOQYBDQ2uFISDq0WY8Y9dDTWvsgzzBYRg==" + } + } + }, "@opentelemetry/instrumentation-knex": { "version": "file:plugins/node/opentelemetry-instrumentation-knex", "requires": { @@ -58249,6 +58944,12 @@ "safe-buffer": "^5.0.1" } }, + "kafkajs": { + "version": "2.2.4", + "resolved": "https://registry.npmjs.org/kafkajs/-/kafkajs-2.2.4.tgz", + "integrity": "sha512-j/YeapB1vfPT2iOIUn/vxdyKEuhuY2PxMBvf5JWux6iSaukAccrMtXEY/Lb7OvavDhOWME589bpLrEdnVHjfjA==", + "dev": true + }, "kareem": { "version": "2.5.1", "resolved": "https://registry.npmjs.org/kareem/-/kareem-2.5.1.tgz", diff --git a/plugins/node/instrumentation-kafkajs/.eslintignore b/plugins/node/instrumentation-kafkajs/.eslintignore new file mode 100644 index 0000000000..378eac25d3 --- /dev/null +++ b/plugins/node/instrumentation-kafkajs/.eslintignore @@ -0,0 +1 @@ +build diff --git a/plugins/node/instrumentation-kafkajs/.eslintrc.js b/plugins/node/instrumentation-kafkajs/.eslintrc.js new file mode 100644 index 0000000000..f756f4488b --- /dev/null +++ b/plugins/node/instrumentation-kafkajs/.eslintrc.js @@ -0,0 +1,7 @@ +module.exports = { + "env": { + "mocha": true, + "node": true + }, + ...require('../../../eslint.config.js') +} diff --git a/plugins/node/instrumentation-kafkajs/LICENSE b/plugins/node/instrumentation-kafkajs/LICENSE new file mode 100644 index 0000000000..261eeb9e9f --- /dev/null +++ b/plugins/node/instrumentation-kafkajs/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/plugins/node/instrumentation-kafkajs/NOTICE b/plugins/node/instrumentation-kafkajs/NOTICE new file mode 100644 index 0000000000..5c83fd9609 --- /dev/null +++ b/plugins/node/instrumentation-kafkajs/NOTICE @@ -0,0 +1,8 @@ +[Based on the instrumentation written by Aspecto](https://github.com/aspecto-io/opentelemetry-ext-js/tree/master/packages/instrumentation-kafkajs). + +The library contains the following changes compared to the original: +* The codebase was converted to TypeScript. +* bufferTextMapGetter compares propagation headers in a case insensitive manner. +* Removed `moduleVersionAttributeName` configuration option. +* Changed the function signature of `producerHook` and `consumerHook`. +* Removed `messaging.destination_kind` attribute. diff --git a/plugins/node/instrumentation-kafkajs/README.md b/plugins/node/instrumentation-kafkajs/README.md new file mode 100644 index 0000000000..9f00d430e2 --- /dev/null +++ b/plugins/node/instrumentation-kafkajs/README.md @@ -0,0 +1,76 @@ +# OpenTelemetry `kafkajs` Instrumentation for Node.js + +[![NPM Published Version][npm-img]][npm-url] +[![Apache License][license-image]][license-image] + +This module provides automatic instrumentation for the [`kafkajs`](https://www.npmjs.com/package/kafkajs) package, which may be loaded using the [`@opentelemetry/sdk-trace-node`](https://github.com/open-telemetry/opentelemetry-js/tree/main/packages/opentelemetry-sdk-trace-node) package and is included in the [`@opentelemetry/auto-instrumentations-node`](https://www.npmjs.com/package/@opentelemetry/auto-instrumentations-node) bundle. + +If total installation size is not constrained, it is recommended to use the [`@opentelemetry/auto-instrumentations-node`](https://www.npmjs.com/package/@opentelemetry/auto-instrumentations-node) bundle with [@opentelemetry/sdk-node](`https://www.npmjs.com/package/@opentelemetry/sdk-node`) for the most seamless instrumentation experience. + +Compatible with OpenTelemetry JS API and SDK `1.0+`. + +## Installation + +```bash +npm install --save @opentelemetry/instrumentation-kafkajs +``` + +### Supported versions + +- `<3.0.0` + +## Usage + +```js +const { NodeTracerProvider } = require('@opentelemetry/sdk-trace-node'); +const { KafkaJsInstrumentation } = require('@opentelemetry/instrumentation-kafkajs'); +const { registerInstrumentations } = require('@opentelemetry/instrumentation'); + +const provider = new NodeTracerProvider(); +provider.register(); + +registerInstrumentations({ + instrumentations: [ + new KafkaJsInstrumentation({ + // see below for available configuration + }), + ], +}); +``` + +### Instrumentation Options + +You can set the following: + +| Options | Type | Description | +| ---------------------------- | -------------------------------------- | -------------------------------------------------------------------------------------------------------------------- | +| `producerHook` | `KafkaProducerCustomAttributeFunction` | Function called before a producer message is sent. Allows for adding custom attributes to the span. | +| `consumerHook` | `KafkaConsumerCustomAttributeFunction` | Function called before a consumer message is processed. Allows for adding custom attributes to the span. | + +## Semantic Conventions + +This package uses `@opentelemetry/semantic-conventions` version `1.24+`, which implements Semantic Convention [Version 1.7.0](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.7.0/semantic_conventions/README.md) + +Attributes collected: + +| Attribute | Short Description | +| -----------------------------| ----------------------------------------------------- | +| `messaging.system` | An identifier for the messaging system being used. | +| `messaging.destination` | The message destination name. | +| `messaging.operation` | A string identifying the kind of messaging operation. | + +## Useful links + +- For more information on OpenTelemetry, visit: +- For more about OpenTelemetry JavaScript: +- For help or feedback on this project, join us in [GitHub Discussions][discussions-url] + +## License + +Apache 2.0 - See [LICENSE][license-url] for more information. + +[discussions-url]: https://github.com/open-telemetry/opentelemetry-js/discussions +[license-url]: https://github.com/open-telemetry/opentelemetry-js-contrib/blob/main/LICENSE +[license-image]: https://img.shields.io/badge/license-Apache_2.0-green.svg?style=flat +[npm-url]: https://www.npmjs.com/package/@opentelemetry/instrumentation-kafkajs +[npm-img]: https://badge.fury.io/js/%40opentelemetry%2Finstrumentation-kafkajs.svg diff --git a/plugins/node/instrumentation-kafkajs/package.json b/plugins/node/instrumentation-kafkajs/package.json new file mode 100644 index 0000000000..7e6ff384e1 --- /dev/null +++ b/plugins/node/instrumentation-kafkajs/package.json @@ -0,0 +1,64 @@ +{ + "name": "@opentelemetry/instrumentation-kafkajs", + "version": "0.0.1", + "description": "OpenTelemetry automatic instrumentation package for kafkajs", + "main": "build/src/index.js", + "types": "build/src/index.d.ts", + "repository": "open-telemetry/opentelemetry-js-contrib", + "scripts": { + "test": "ts-mocha --require @opentelemetry/contrib-test-utils -p tsconfig.json 'test/**/*.test.ts'", + "tdd": "npm run test -- --watch-extensions ts --watch", + "clean": "rimraf build/*", + "lint": "eslint . --ext .ts", + "lint:fix": "eslint . --ext .ts --fix", + "precompile": "tsc --version && lerna run version:update --scope @opentelemetry/instrumentation-kafkajs --include-dependencies", + "prewatch": "npm run precompile", + "prepublishOnly": "npm run compile", + "version:update": "node ../../../scripts/version-update.js", + "compile": "tsc -p ." + }, + "keywords": [ + "kafkajs", + "instrumentation", + "nodejs", + "opentelemetry", + "profiling", + "tracing" + ], + "author": "OpenTelemetry Authors", + "license": "Apache-2.0", + "engines": { + "node": ">=14" + }, + "files": [ + "build/src/**/*.js", + "build/src/**/*.js.map", + "build/src/**/*.d.ts" + ], + "publishConfig": { + "access": "public" + }, + "peerDependencies": { + "@opentelemetry/api": "^1.3.0" + }, + "devDependencies": { + "@opentelemetry/api": "^1.3.0", + "@opentelemetry/contrib-test-utils": "^0.38.0", + "@opentelemetry/sdk-trace-base": "^1.24.0", + "@types/mocha": "7.0.2", + "@types/node": "18.6.5", + "@types/sinon": "^10.0.11", + "kafkajs": "^2.2.4", + "mocha": "7.2.0", + "nyc": "15.1.0", + "rimraf": "5.0.5", + "sinon": "15.2.0", + "ts-mocha": "10.0.0", + "typescript": "4.4.4" + }, + "dependencies": { + "@opentelemetry/instrumentation": "^0.51.0", + "@opentelemetry/semantic-conventions": "^1.24.0" + }, + "homepage": "https://github.com/open-telemetry/opentelemetry-js-contrib/tree/main/plugins/node/instrumentation-kafkajs#readme" +} diff --git a/plugins/node/instrumentation-kafkajs/src/index.ts b/plugins/node/instrumentation-kafkajs/src/index.ts new file mode 100644 index 0000000000..91c7641c95 --- /dev/null +++ b/plugins/node/instrumentation-kafkajs/src/index.ts @@ -0,0 +1,18 @@ +/* + * Copyright The OpenTelemetry Authors, Aspecto + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +export * from './instrumentation'; +export * from './types'; diff --git a/plugins/node/instrumentation-kafkajs/src/instrumentation.ts b/plugins/node/instrumentation-kafkajs/src/instrumentation.ts new file mode 100644 index 0000000000..09762e8e6f --- /dev/null +++ b/plugins/node/instrumentation-kafkajs/src/instrumentation.ts @@ -0,0 +1,407 @@ +/* + * Copyright The OpenTelemetry Authors, Aspecto + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { + SpanKind, + Span, + SpanStatusCode, + Context, + propagation, + Link, + trace, + context, + ROOT_CONTEXT, +} from '@opentelemetry/api'; +import { + MESSAGINGOPERATIONVALUES_PROCESS, + MESSAGINGOPERATIONVALUES_RECEIVE, + SEMATTRS_MESSAGING_SYSTEM, + SEMATTRS_MESSAGING_DESTINATION, + SEMATTRS_MESSAGING_OPERATION, +} from '@opentelemetry/semantic-conventions'; +import type * as kafkaJs from 'kafkajs'; +import type { + EachBatchHandler, + EachMessageHandler, + Producer, + RecordMetadata, + Message, + ConsumerRunConfig, + KafkaMessage, + Consumer, +} from 'kafkajs'; +import { KafkaJsInstrumentationConfig } from './types'; +import { VERSION } from './version'; +import { bufferTextMapGetter } from './propagator'; +import { + InstrumentationBase, + InstrumentationNodeModuleDefinition, + safeExecuteInTheMiddle, + isWrapped, +} from '@opentelemetry/instrumentation'; + +export class KafkaJsInstrumentation extends InstrumentationBase { + protected override _config!: KafkaJsInstrumentationConfig; + + constructor(config: KafkaJsInstrumentationConfig = {}) { + super('@opentelemetry/instrumentation-kafkajs', VERSION, config); + } + + protected init() { + const unpatch = (moduleExports: typeof kafkaJs) => { + if (isWrapped(moduleExports?.Kafka?.prototype.producer)) { + this._unwrap(moduleExports.Kafka.prototype, 'producer'); + } + if (isWrapped(moduleExports?.Kafka?.prototype.consumer)) { + this._unwrap(moduleExports.Kafka.prototype, 'consumer'); + } + }; + + const module = new InstrumentationNodeModuleDefinition( + 'kafkajs', + ['< 3'], + (moduleExports: typeof kafkaJs) => { + unpatch(moduleExports); + this._wrap( + moduleExports?.Kafka?.prototype, + 'producer', + this._getProducerPatch() + ); + this._wrap( + moduleExports?.Kafka?.prototype, + 'consumer', + this._getConsumerPatch() + ); + + return moduleExports; + }, + unpatch + ); + return module; + } + + private _getConsumerPatch() { + const instrumentation = this; + return (original: kafkaJs.Kafka['consumer']) => { + return function consumer( + this: kafkaJs.Kafka, + ...args: Parameters + ) { + const newConsumer: Consumer = original.apply(this, args); + + if (isWrapped(newConsumer.run)) { + instrumentation._unwrap(newConsumer, 'run'); + } + + instrumentation._wrap( + newConsumer, + 'run', + instrumentation._getConsumerRunPatch() + ); + + return newConsumer; + }; + }; + } + + private _getProducerPatch() { + const instrumentation = this; + return (original: kafkaJs.Kafka['producer']) => { + return function consumer( + this: kafkaJs.Kafka, + ...args: Parameters + ) { + const newProducer: Producer = original.apply(this, args); + + if (isWrapped(newProducer.sendBatch)) { + instrumentation._unwrap(newProducer, 'sendBatch'); + } + instrumentation._wrap( + newProducer, + 'sendBatch', + instrumentation._getProducerSendBatchPatch() + ); + + if (isWrapped(newProducer.send)) { + instrumentation._unwrap(newProducer, 'send'); + } + instrumentation._wrap( + newProducer, + 'send', + instrumentation._getProducerSendPatch() + ); + + return newProducer; + }; + }; + } + + private _getConsumerRunPatch() { + const instrumentation = this; + return (original: Consumer['run']) => { + return function run( + this: Consumer, + ...args: Parameters + ): ReturnType { + const config = args[0]; + if (config?.eachMessage) { + if (isWrapped(config.eachMessage)) { + instrumentation._unwrap(config, 'eachMessage'); + } + instrumentation._wrap( + config, + 'eachMessage', + instrumentation._getConsumerEachMessagePatch() + ); + } + if (config?.eachBatch) { + if (isWrapped(config.eachBatch)) { + instrumentation._unwrap(config, 'eachBatch'); + } + instrumentation._wrap( + config, + 'eachBatch', + instrumentation._getConsumerEachBatchPatch() + ); + } + return original.call(this, config); + }; + }; + } + + private _getConsumerEachMessagePatch() { + const instrumentation = this; + return (original: ConsumerRunConfig['eachMessage']) => { + return function eachMessage( + this: unknown, + ...args: Parameters + ): Promise { + const payload = args[0]; + const propagatedContext: Context = propagation.extract( + ROOT_CONTEXT, + payload.message.headers, + bufferTextMapGetter + ); + const span = instrumentation._startConsumerSpan( + payload.topic, + payload.message, + MESSAGINGOPERATIONVALUES_PROCESS, + propagatedContext + ); + + const eachMessagePromise = context.with( + trace.setSpan(propagatedContext, span), + () => { + return original!.apply(this, args); + } + ); + return instrumentation._endSpansOnPromise([span], eachMessagePromise); + }; + }; + } + + private _getConsumerEachBatchPatch() { + return (original: ConsumerRunConfig['eachBatch']) => { + const instrumentation = this; + return function eachBatch( + this: unknown, + ...args: Parameters + ): Promise { + const payload = args[0]; + // https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/semantic_conventions/messaging.md#topic-with-multiple-consumers + const receivingSpan = instrumentation._startConsumerSpan( + payload.batch.topic, + undefined, + MESSAGINGOPERATIONVALUES_RECEIVE, + ROOT_CONTEXT + ); + return context.with( + trace.setSpan(context.active(), receivingSpan), + () => { + const spans = payload.batch.messages.map( + (message: KafkaMessage) => { + const propagatedContext: Context = propagation.extract( + ROOT_CONTEXT, + message.headers, + bufferTextMapGetter + ); + const spanContext = trace + .getSpan(propagatedContext) + ?.spanContext(); + let origSpanLink: Link | undefined; + if (spanContext) { + origSpanLink = { + context: spanContext, + }; + } + return instrumentation._startConsumerSpan( + payload.batch.topic, + message, + MESSAGINGOPERATIONVALUES_PROCESS, + undefined, + origSpanLink + ); + } + ); + const batchMessagePromise: Promise = original!.apply( + this, + args + ); + spans.unshift(receivingSpan); + return instrumentation._endSpansOnPromise( + spans, + batchMessagePromise + ); + } + ); + }; + }; + } + + private _getProducerSendBatchPatch() { + const instrumentation = this; + return (original: Producer['sendBatch']) => { + return function sendBatch( + this: Producer, + ...args: Parameters + ): ReturnType { + const batch = args[0]; + const messages = batch.topicMessages || []; + const spans: Span[] = messages + .map(topicMessage => + topicMessage.messages.map(message => + instrumentation._startProducerSpan(topicMessage.topic, message) + ) + ) + .reduce((acc, val) => acc.concat(val), []); + + const origSendResult: Promise = original.apply( + this, + args + ); + return instrumentation._endSpansOnPromise(spans, origSendResult); + }; + }; + } + + private _getProducerSendPatch() { + const instrumentation = this; + return (original: Producer['send']) => { + return function send( + this: Producer, + ...args: Parameters + ): ReturnType { + const record = args[0]; + const spans: Span[] = record.messages.map(message => { + return instrumentation._startProducerSpan(record.topic, message); + }); + + const origSendResult: Promise = original.apply( + this, + args + ); + return instrumentation._endSpansOnPromise(spans, origSendResult); + }; + }; + } + + private _endSpansOnPromise( + spans: Span[], + sendPromise: Promise + ): Promise { + return Promise.resolve(sendPromise) + .catch(reason => { + let errorMessage: string; + if (typeof reason === 'string') errorMessage = reason; + else if ( + typeof reason === 'object' && + Object.prototype.hasOwnProperty.call(reason, 'message') + ) + errorMessage = reason.message; + + spans.forEach(span => + span.setStatus({ + code: SpanStatusCode.ERROR, + message: errorMessage, + }) + ); + + throw reason; + }) + .finally(() => { + spans.forEach(span => span.end()); + }); + } + + private _startConsumerSpan( + topic: string, + message: KafkaMessage | undefined, + operation: string, + context: Context | undefined, + link?: Link + ) { + const span = this.tracer.startSpan( + topic, + { + kind: SpanKind.CONSUMER, + attributes: { + [SEMATTRS_MESSAGING_SYSTEM]: 'kafka', + [SEMATTRS_MESSAGING_DESTINATION]: topic, + [SEMATTRS_MESSAGING_OPERATION]: operation, + }, + links: link ? [link] : [], + }, + context + ); + + if (this._config?.consumerHook && message) { + safeExecuteInTheMiddle( + () => this._config.consumerHook!(span, { topic, message }), + e => { + if (e) this._diag.error('consumerHook error', e); + }, + true + ); + } + + return span; + } + + private _startProducerSpan(topic: string, message: Message) { + const span = this.tracer.startSpan(topic, { + kind: SpanKind.PRODUCER, + attributes: { + [SEMATTRS_MESSAGING_SYSTEM]: 'kafka', + [SEMATTRS_MESSAGING_DESTINATION]: topic, + }, + }); + + message.headers = message.headers ?? {}; + propagation.inject(trace.setSpan(context.active(), span), message.headers); + + if (this._config?.producerHook) { + safeExecuteInTheMiddle( + () => this._config.producerHook!(span, { topic, message }), + e => { + if (e) this._diag.error('producerHook error', e); + }, + true + ); + } + + return span; + } +} diff --git a/plugins/node/instrumentation-kafkajs/src/propagator.ts b/plugins/node/instrumentation-kafkajs/src/propagator.ts new file mode 100644 index 0000000000..abe2b1ee8b --- /dev/null +++ b/plugins/node/instrumentation-kafkajs/src/propagator.ts @@ -0,0 +1,43 @@ +/* + * Copyright The OpenTelemetry Authors, Aspecto + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import { TextMapGetter } from '@opentelemetry/api'; + +/* +same as open telemetry's `defaultTextMapGetter`, +but also handle case where header is buffer, +adding toString() to make sure string is returned +*/ +export const bufferTextMapGetter: TextMapGetter = { + get(carrier, key) { + if (!carrier) { + return undefined; + } + + const keys = Object.keys(carrier); + + for (const carrierKey of keys) { + if (carrierKey === key || carrierKey.toLowerCase() === key) { + return carrier[carrierKey]?.toString(); + } + } + + return undefined; + }, + + keys(carrier) { + return carrier ? Object.keys(carrier) : []; + }, +}; diff --git a/plugins/node/instrumentation-kafkajs/src/types.ts b/plugins/node/instrumentation-kafkajs/src/types.ts new file mode 100644 index 0000000000..3c660ffdc8 --- /dev/null +++ b/plugins/node/instrumentation-kafkajs/src/types.ts @@ -0,0 +1,46 @@ +/* + * Copyright The OpenTelemetry Authors, Aspecto + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import { Span } from '@opentelemetry/api'; +import { InstrumentationConfig } from '@opentelemetry/instrumentation'; + +export interface KafkajsMessage { + key?: Buffer | string | null; + value: Buffer | string | null; + partition?: number; + headers?: Record; + timestamp?: string; +} + +export interface MessageInfo { + topic: string; + message: T; +} + +export interface KafkaProducerCustomAttributeFunction { + (span: Span, info: MessageInfo): void; +} + +export interface KafkaConsumerCustomAttributeFunction { + (span: Span, info: MessageInfo): void; +} + +export interface KafkaJsInstrumentationConfig extends InstrumentationConfig { + /** hook for adding custom attributes before producer message is sent */ + producerHook?: KafkaProducerCustomAttributeFunction; + + /** hook for adding custom attributes before consumer message is processed */ + consumerHook?: KafkaConsumerCustomAttributeFunction; +} diff --git a/plugins/node/instrumentation-kafkajs/test/DummyPropagation.ts b/plugins/node/instrumentation-kafkajs/test/DummyPropagation.ts new file mode 100644 index 0000000000..5daa6d65a7 --- /dev/null +++ b/plugins/node/instrumentation-kafkajs/test/DummyPropagation.ts @@ -0,0 +1,64 @@ +/* + * Copyright The OpenTelemetry Authors, Aspecto + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { + Context, + TraceFlags, + TextMapPropagator, + TextMapSetter, + TextMapGetter, + trace, +} from '@opentelemetry/api'; + +export class DummyPropagation implements TextMapPropagator { + static TRACE_CONTEXT_KEY = 'x-dummy-trace-id'; + static SPAN_CONTEXT_KEY = 'x-dummy-span-id'; + + extract(context: Context, carrier: unknown, getter: TextMapGetter) { + const extractedSpanContext = { + traceId: getter.get( + carrier, + DummyPropagation.TRACE_CONTEXT_KEY + ) as string, + spanId: getter.get(carrier, DummyPropagation.SPAN_CONTEXT_KEY) as string, + traceFlags: TraceFlags.SAMPLED, + }; + + if (!extractedSpanContext.traceId || !extractedSpanContext.spanId) + return context; + + return trace.setSpanContext(context, extractedSpanContext); + } + + inject(context: Context, carrier: unknown, setter: TextMapSetter): void { + const spanContext = trace.getSpanContext(context); + if (!spanContext) return; + + setter.set( + carrier, + DummyPropagation.TRACE_CONTEXT_KEY, + spanContext.traceId + ); + setter.set(carrier, DummyPropagation.SPAN_CONTEXT_KEY, spanContext.spanId); + } + + fields(): string[] { + return [ + DummyPropagation.TRACE_CONTEXT_KEY, + DummyPropagation.SPAN_CONTEXT_KEY, + ]; + } +} diff --git a/plugins/node/instrumentation-kafkajs/test/kafkajs.test.ts b/plugins/node/instrumentation-kafkajs/test/kafkajs.test.ts new file mode 100644 index 0000000000..258dddd7d0 --- /dev/null +++ b/plugins/node/instrumentation-kafkajs/test/kafkajs.test.ts @@ -0,0 +1,885 @@ +/* + * Copyright The OpenTelemetry Authors, Aspecto + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as assert from 'assert'; +import { KafkaJsInstrumentation, KafkaJsInstrumentationConfig } from '../src'; +import { ReadableSpan } from '@opentelemetry/sdk-trace-base'; +import { + propagation, + context, + SpanKind, + SpanStatusCode, + Baggage, +} from '@opentelemetry/api'; +import { + SEMATTRS_MESSAGING_SYSTEM, + SEMATTRS_MESSAGING_DESTINATION, + SEMATTRS_MESSAGING_OPERATION, +} from '@opentelemetry/semantic-conventions'; +import { + getTestSpans, + registerInstrumentationTesting, +} from '@opentelemetry/contrib-test-utils'; +import { W3CBaggagePropagator, CompositePropagator } from '@opentelemetry/core'; + +const instrumentation = registerInstrumentationTesting( + new KafkaJsInstrumentation() +); + +import * as kafkajs from 'kafkajs'; +import { + Kafka, + ProducerRecord, + RecordMetadata, + Producer, + ProducerBatch, + Message, + Consumer, + ConsumerRunConfig, + EachBatchPayload, + EachMessagePayload, + KafkaMessage, +} from 'kafkajs'; +import { DummyPropagation } from './DummyPropagation'; +import { bufferTextMapGetter } from '../src/propagator'; + +describe('instrumentation-kafkajs', () => { + propagation.setGlobalPropagator( + new CompositePropagator({ + propagators: [new DummyPropagation(), new W3CBaggagePropagator()], + }) + ); + + const kafka = new Kafka({ + clientId: 'unit-tests', + brokers: ['testing_mock_host:1234'], + }); + + let producer: Producer; + let messagesSent: Message[] = []; + + const patchProducerSend = (cb: () => Promise) => { + const origProducerFactory = kafkajs.Kafka.prototype.producer; + kafkajs.Kafka.prototype.producer = function (...args): Producer { + const producer = origProducerFactory.apply(this, args); + + producer.send = function (record: ProducerRecord) { + messagesSent.push(...record.messages); + return cb(); + }; + + producer.sendBatch = function (batch: ProducerBatch) { + batch.topicMessages?.forEach(topicMessages => + messagesSent.push(...topicMessages.messages) + ); + return cb(); + }; + + return producer; + }; + }; + + let consumer: Consumer; + let runConfig: ConsumerRunConfig | undefined; + + const storeRunConfig = () => { + const origConsumerFactory = kafkajs.Kafka.prototype.consumer; + kafkajs.Kafka.prototype.consumer = function (...args): Consumer { + const consumer: Consumer = origConsumerFactory.apply(this, args); + consumer.run = function (config?: ConsumerRunConfig): Promise { + runConfig = config; + return Promise.resolve(); + }; + return consumer; + }; + }; + + beforeEach(() => { + messagesSent = []; + }); + + describe('producer', () => { + const expectKafkaHeadersToMatchSpanContext = ( + kafkaMessage: Message, + span: ReadableSpan + ) => { + assert.strictEqual( + kafkaMessage.headers?.[DummyPropagation.TRACE_CONTEXT_KEY], + span.spanContext().traceId + ); + assert.strictEqual( + kafkaMessage.headers?.[DummyPropagation.SPAN_CONTEXT_KEY], + span.spanContext().spanId + ); + }; + + describe('successful send', () => { + beforeEach(async () => { + patchProducerSend(async (): Promise => { + return [ + { + topicName: 'topic-name-1', + partition: 0, + errorCode: 123, + offset: '18', + timestamp: '123456', + }, + ]; + }); + instrumentation.disable(); + instrumentation.enable(); + producer = kafka.producer(); + }); + + it('simple send create span with right attributes, pass return value correctly and propagate context', async () => { + const res: RecordMetadata[] = await producer.send({ + topic: 'topic-name-1', + messages: [ + { + value: 'testing message content', + }, + ], + }); + + assert.strictEqual(res.length, 1); + assert.strictEqual(res[0].topicName, 'topic-name-1'); + + const spans = getTestSpans(); + assert.strictEqual(spans.length, 1); + const span = spans[0]; + assert.strictEqual(span.kind, SpanKind.PRODUCER); + assert.strictEqual(span.name, 'topic-name-1'); + assert.strictEqual(span.status.code, SpanStatusCode.UNSET); + assert.strictEqual(span.attributes[SEMATTRS_MESSAGING_SYSTEM], 'kafka'); + assert.strictEqual( + span.attributes[SEMATTRS_MESSAGING_DESTINATION], + 'topic-name-1' + ); + + assert.strictEqual(messagesSent.length, 1); + expectKafkaHeadersToMatchSpanContext( + messagesSent[0], + span as ReadableSpan + ); + }); + + it('send two messages', async () => { + await producer.send({ + topic: 'topic-name-1', + messages: [ + { + value: 'message1', + }, + { + value: 'message2', + }, + ], + }); + + const spans = getTestSpans(); + assert.strictEqual(spans.length, 2); + assert.strictEqual(spans[0].name, 'topic-name-1'); + assert.strictEqual(spans[1].name, 'topic-name-1'); + + assert.strictEqual(messagesSent.length, 2); + expectKafkaHeadersToMatchSpanContext( + messagesSent[0], + spans[0] as ReadableSpan + ); + expectKafkaHeadersToMatchSpanContext( + messagesSent[1], + spans[1] as ReadableSpan + ); + }); + + it('send batch', async () => { + await producer.sendBatch({ + topicMessages: [ + { + topic: 'topic-name-1', + messages: [ + { + value: 'message1-1', + }, + { + value: 'message1-2', + }, + ], + }, + { + topic: 'topic-name-2', + messages: [ + { + value: 'message2-1', + }, + ], + }, + ], + }); + + const spans = getTestSpans(); + assert.strictEqual(spans.length, 3); + assert.strictEqual(spans[0].name, 'topic-name-1'); + assert.strictEqual(spans[1].name, 'topic-name-1'); + assert.strictEqual(spans[2].name, 'topic-name-2'); + + assert.strictEqual(messagesSent.length, 3); + for (let i = 0; i < 3; i++) { + expectKafkaHeadersToMatchSpanContext( + messagesSent[i], + spans[i] as ReadableSpan + ); + } + }); + }); + + describe('failed send', () => { + beforeEach(async () => { + patchProducerSend((): Promise => { + return Promise.reject( + new Error('error thrown from kafka client send') + ); + }); + instrumentation.disable(); + instrumentation.enable(); + producer = kafka.producer(); + }); + + it('error in send create failed span', async () => { + try { + await producer.send({ + topic: 'topic-name-1', + messages: [ + { + value: 'testing message content', + }, + ], + }); + } catch (err) {} + + const spans = getTestSpans(); + assert.strictEqual(spans.length, 1); + const span = spans[0]; + assert.strictEqual(span.status.code, SpanStatusCode.ERROR); + assert.strictEqual( + span.status.message, + 'error thrown from kafka client send' + ); + }); + + it('error in send with multiple messages create failed spans', async () => { + try { + await producer.send({ + topic: 'topic-name-1', + messages: [ + { + value: 'testing message content 1', + }, + { + value: 'testing message content 2', + }, + ], + }); + } catch (err) {} + + const spans = getTestSpans(); + assert.strictEqual(spans.length, 2); + spans.forEach(span => { + assert.strictEqual(span.status.code, SpanStatusCode.ERROR); + assert.strictEqual( + span.status.message, + 'error thrown from kafka client send' + ); + }); + }); + + it('error in sendBatch should set error to all spans', async () => { + try { + await producer.sendBatch({ + topicMessages: [ + { + topic: 'topic-name-1', + messages: [ + { + value: 'message1-1', + }, + { + value: 'message1-2', + }, + ], + }, + { + topic: 'topic-name-2', + messages: [ + { + value: 'message2-1', + }, + ], + }, + ], + }); + } catch (err) {} + + const spans = getTestSpans(); + assert.strictEqual(spans.length, 3); + spans.forEach(span => { + assert.strictEqual(span.status.code, SpanStatusCode.ERROR); + assert.strictEqual( + span.status.message, + 'error thrown from kafka client send' + ); + }); + }); + }); + + describe('producer hook successful', () => { + beforeEach(async () => { + patchProducerSend(async (): Promise => []); + + const config: KafkaJsInstrumentationConfig = { + producerHook: (span, info) => { + span.setAttribute( + 'attribute-from-hook', + info.message.value as string + ); + }, + }; + instrumentation.disable(); + instrumentation.setConfig(config); + instrumentation.enable(); + producer = kafka.producer(); + }); + + it('producer hook add span attribute with value from message', async () => { + await producer.send({ + topic: 'topic-name-1', + messages: [ + { + value: 'testing message content', + }, + ], + }); + + const spans = getTestSpans(); + assert.strictEqual(spans.length, 1); + const span = spans[0]; + assert.strictEqual( + span.attributes['attribute-from-hook'], + 'testing message content' + ); + }); + }); + + describe('producer hook throw, should still create span', () => { + beforeEach(async () => { + patchProducerSend(async (): Promise => []); + + const config: KafkaJsInstrumentationConfig = { + producerHook: (_span, _info) => { + throw new Error('error thrown from producer hook'); + }, + }; + instrumentation.disable(); + instrumentation.setConfig(config); + instrumentation.enable(); + producer = kafka.producer(); + }); + + it('producer hook add span attribute with value from message', async () => { + await producer.send({ + topic: 'topic-name-1', + messages: [ + { + value: 'testing message content', + }, + ], + }); + + const spans = getTestSpans(); + assert.strictEqual(spans.length, 1); + const span = spans[0]; + assert.strictEqual(span.status.code, SpanStatusCode.UNSET); + }); + }); + }); + + describe('consumer', () => { + const createKafkaMessage = (offset: string): KafkaMessage => { + return { + key: Buffer.from('message-key', 'utf8'), + value: Buffer.from('message content', 'utf8'), + timestamp: '1234', + size: 10, + attributes: 1, + offset: offset, + }; + }; + + const createEachMessagePayload = (): EachMessagePayload => { + return { + topic: 'topic-name-1', + partition: 0, + message: createKafkaMessage('123'), + heartbeat: async () => {}, + pause: () => () => {}, + }; + }; + + const createEachBatchPayload = (): EachBatchPayload => { + return { + batch: { + topic: 'topic-name-1', + partition: 1234, + highWatermark: '4567', + messages: [createKafkaMessage('124'), createKafkaMessage('125')], + }, + } as EachBatchPayload; + }; + + beforeEach(() => { + storeRunConfig(); + }); + + describe('successful eachMessage', () => { + beforeEach(async () => { + instrumentation.disable(); + instrumentation.enable(); + consumer = kafka.consumer({ + groupId: 'testing-group-id', + }); + }); + + it('consume eachMessage create span with expected attributes', async () => { + consumer.run({ + eachMessage: async ( + _payload: EachMessagePayload + ): Promise => {}, + }); + const payload: EachMessagePayload = createEachMessagePayload(); + await runConfig?.eachMessage!(payload); + + const spans = getTestSpans(); + assert.strictEqual(spans.length, 1); + const span = spans[0]; + assert.strictEqual(span.name, 'topic-name-1'); + assert.strictEqual(span.parentSpanId, undefined); + assert.strictEqual(span.kind, SpanKind.CONSUMER); + assert.strictEqual(span.status.code, SpanStatusCode.UNSET); + assert.strictEqual(span.attributes[SEMATTRS_MESSAGING_SYSTEM], 'kafka'); + assert.strictEqual( + span.attributes[SEMATTRS_MESSAGING_DESTINATION], + 'topic-name-1' + ); + assert.strictEqual( + span.attributes[SEMATTRS_MESSAGING_OPERATION], + 'process' + ); + }); + + it('consumer eachMessage with non promise return value', async () => { + consumer.run({ + // the usecase of kafkajs callback not returning promise + // is not typescript valid, but it might (and is) implemented in real life (nestjs) + // and does not break the library. + eachMessage: async (_payload: EachMessagePayload) => {}, + }); + const payload: EachMessagePayload = createEachMessagePayload(); + await runConfig?.eachMessage!(payload); + + const spans = getTestSpans(); + assert.strictEqual(spans.length, 1); + }); + }); + + describe('successful consumer hook', () => { + beforeEach(async () => { + const config: KafkaJsInstrumentationConfig = { + consumerHook: (span, info) => { + span.setAttribute( + 'attribute key from hook', + info.message.value!.toString() + ); + }, + }; + instrumentation.disable(); + instrumentation.setConfig(config); + instrumentation.enable(); + consumer = kafka.consumer({ + groupId: 'testing-group-id', + }); + consumer.run({ + eachMessage: async ( + _payload: EachMessagePayload + ): Promise => {}, + }); + }); + + it('consume hook adds attribute to span', async () => { + const payload: EachMessagePayload = createEachMessagePayload(); + await runConfig?.eachMessage!(payload); + + const spans = getTestSpans(); + assert.strictEqual(spans.length, 1); + const span = spans[0]; + assert.strictEqual( + span.attributes['attribute key from hook'], + payload.message.value?.toString() + ); + }); + }); + + describe('throwing consumer hook', () => { + beforeEach(async () => { + const config: KafkaJsInstrumentationConfig = { + consumerHook: (_span, _info) => { + throw new Error('error thrown from consumer hook'); + }, + }; + instrumentation.disable(); + instrumentation.setConfig(config); + instrumentation.enable(); + consumer = kafka.consumer({ + groupId: 'testing-group-id', + }); + consumer.run({ + eachMessage: async ( + _payload: EachMessagePayload + ): Promise => {}, + }); + }); + + it('consume hook adds attribute to span', async () => { + const payload: EachMessagePayload = createEachMessagePayload(); + await runConfig?.eachMessage!(payload); + + const spans = getTestSpans(); + // span should still be created + assert.strictEqual(spans.length, 1); + }); + }); + + describe('eachMessage throws', () => { + beforeEach(async () => { + instrumentation.disable(); + instrumentation.enable(); + consumer = kafka.consumer({ + groupId: 'testing-group-id', + }); + }); + + it('Error message written in the span status', async () => { + const errorToThrow = new Error( + 'error thrown from eachMessage callback' + ); + consumer.run({ + eachMessage: async (_payload: EachMessagePayload): Promise => { + throw errorToThrow; + }, + }); + + const payload: EachMessagePayload = createEachMessagePayload(); + let exception; + try { + await runConfig?.eachMessage!(payload); + } catch (e) { + exception = e; + } + assert.deepStrictEqual(exception, errorToThrow); + + const spans = getTestSpans(); + assert.strictEqual(spans.length, 1); + const span = spans[0]; + assert.strictEqual(span.status.code, SpanStatusCode.ERROR); + assert.strictEqual( + span.status.message, + 'error thrown from eachMessage callback' + ); + }); + + it('throwing object with no message', async () => { + const objectToThrow = { + nonMessageProperty: 'the thrown object has no `message` property', + }; + consumer.run({ + eachMessage: async (_payload: EachMessagePayload): Promise => { + throw objectToThrow; + }, + }); + + const payload: EachMessagePayload = createEachMessagePayload(); + let exception; + try { + await runConfig?.eachMessage!(payload); + } catch (e) { + exception = e; + } + assert.deepStrictEqual(exception, objectToThrow); + + const spans = getTestSpans(); + assert.strictEqual(spans.length, 1); + const span = spans[0]; + assert.strictEqual(span.status.code, SpanStatusCode.ERROR); + assert.strictEqual(span.status.message, undefined); + }); + + it('throwing non object', async () => { + consumer.run({ + eachMessage: async (_payload: EachMessagePayload): Promise => { + throw undefined; + }, + }); + + const payload: EachMessagePayload = createEachMessagePayload(); + let exception = null; + try { + await runConfig?.eachMessage!(payload); + } catch (e) { + exception = e; + } + assert.strictEqual(exception, undefined); + + const spans = getTestSpans(); + assert.strictEqual(spans.length, 1); + const span = spans[0]; + assert.strictEqual(span.status.code, SpanStatusCode.ERROR); + assert.strictEqual(span.status.message, undefined); + }); + }); + + describe('successful eachBatch', () => { + beforeEach(async () => { + instrumentation.disable(); + instrumentation.enable(); + consumer = kafka.consumer({ + groupId: 'testing-group-id', + }); + }); + + it('consume eachBatch create span with expected attributes', async () => { + consumer.run({ + eachBatch: async (_payload: EachBatchPayload): Promise => {}, + }); + const payload: EachBatchPayload = createEachBatchPayload(); + await runConfig?.eachBatch!(payload); + + const spans = getTestSpans(); + assert.strictEqual(spans.length, 3); + spans.forEach(span => { + assert.strictEqual(span.name, 'topic-name-1'); + assert.strictEqual(span.status.code, SpanStatusCode.UNSET); + assert.strictEqual( + span.attributes[SEMATTRS_MESSAGING_SYSTEM], + 'kafka' + ); + assert.strictEqual( + span.attributes[SEMATTRS_MESSAGING_DESTINATION], + 'topic-name-1' + ); + }); + + const [recvSpan, msg1Span, msg2Span] = spans; + + assert.strictEqual(recvSpan.parentSpanId, undefined); + assert.strictEqual( + recvSpan.attributes[SEMATTRS_MESSAGING_OPERATION], + 'receive' + ); + + assert.strictEqual( + msg1Span.parentSpanId, + recvSpan.spanContext().spanId + ); + assert.strictEqual( + msg1Span.attributes[SEMATTRS_MESSAGING_OPERATION], + 'process' + ); + + assert.strictEqual( + msg2Span.parentSpanId, + recvSpan.spanContext().spanId + ); + assert.strictEqual( + msg2Span.attributes[SEMATTRS_MESSAGING_OPERATION], + 'process' + ); + }); + + it('consumer eachBatch with non promise return value', async () => { + consumer.run({ + // the usecase of kafkajs callback not returning promise + // is not typescript valid, but it might (and is) implemented in real life (nestjs) + // and does not break the library. + eachBatch: async (_payload: EachBatchPayload) => { + return; + }, + }); + const payload: EachBatchPayload = createEachBatchPayload(); + await runConfig?.eachBatch!(payload); + + const spans = getTestSpans(); + assert.strictEqual(spans.length, 3); + }); + }); + }); + + describe('context propagation', () => { + beforeEach(() => { + patchProducerSend(async (): Promise => []); + storeRunConfig(); + instrumentation.disable(); + instrumentation.enable(); + producer = kafka.producer(); + consumer = kafka.consumer({ groupId: 'testing-group-id' }); + }); + + it('context injected in producer is extracted in consumer', async () => { + let callbackBaggage: Baggage | undefined; + consumer.run({ + eachMessage: async (_payload: EachMessagePayload): Promise => { + callbackBaggage = propagation.getBaggage(context.active()); + }, + }); + + await context.with( + propagation.setBaggage( + context.active(), + propagation.createBaggage({ foo: { value: 'bar' } }) + ), + async () => { + await producer.send({ + topic: 'topic-name-1', + messages: [ + { + value: 'testing message content', + }, + ], + }); + } + ); + + assert.strictEqual(messagesSent.length, 1); + const consumerPayload: EachMessagePayload = { + topic: 'topic-name-1', + partition: 0, + message: { + key: Buffer.alloc(0), + value: Buffer.alloc(0), + timestamp: '1234', + attributes: 0, + offset: '0', + headers: messagesSent[0].headers ?? {}, + }, + heartbeat: async () => {}, + pause: () => () => {}, + }; + await runConfig?.eachMessage!(consumerPayload); + + const spans = getTestSpans(); + assert.strictEqual(spans.length, 2); + const [producerSpan, consumerSpan] = spans; + assert.strictEqual( + consumerSpan.spanContext().traceId, + producerSpan.spanContext().traceId + ); + assert.strictEqual( + consumerSpan.parentSpanId, + producerSpan.spanContext().spanId + ); + assert.strictEqual(callbackBaggage!.getAllEntries().length, 1); + assert.strictEqual(callbackBaggage!.getEntry('foo')?.value, 'bar'); + }); + + it('context injected in producer is extracted as links in batch consumer', async () => { + consumer.run({ + eachBatch: async (_payload: EachBatchPayload): Promise => {}, + }); + + await producer.send({ + topic: 'topic-name-1', + messages: [ + { + value: 'testing message content', + }, + ], + }); + + assert.strictEqual(messagesSent.length, 1); + const consumerPayload: EachBatchPayload = { + batch: { + topic: 'topic-name-1', + partition: 0, + highWatermark: '1234', + messages: [ + { + key: Buffer.alloc(0), + value: Buffer.alloc(0), + timestamp: '1234', + size: 0, + attributes: 0, + offset: '0', + headers: messagesSent[0].headers, + }, + ], + }, + } as EachBatchPayload; + await runConfig?.eachBatch!(consumerPayload); + + const spans = getTestSpans(); + assert.strictEqual(spans.length, 3); + const [producerSpan, receivingSpan, processingSpan] = spans; + + // processing span should be the child of receiving span and link to relevant producer + assert.strictEqual( + processingSpan.spanContext().traceId, + receivingSpan.spanContext().traceId + ); + assert.strictEqual( + processingSpan.parentSpanId, + receivingSpan.spanContext().spanId + ); + assert.strictEqual(processingSpan.links.length, 1); + assert.strictEqual( + processingSpan.links[0].context.traceId, + producerSpan.spanContext().traceId + ); + assert.strictEqual( + processingSpan.links[0].context.spanId, + producerSpan.spanContext().spanId + ); + + // receiving span should start a new trace + assert.strictEqual(receivingSpan.parentSpanId, undefined); + assert.notStrictEqual( + receivingSpan.spanContext().traceId, + producerSpan.spanContext().traceId + ); + }); + }); + + describe('bufferTextMapGetter', () => { + it('is possible to retrieve keys case insensitively', () => { + assert.strictEqual( + bufferTextMapGetter.get( + { + 'X-B3-Trace-Id': '123', + }, + 'x-b3-trace-id' + ), + '123' + ); + }); + }); +}); diff --git a/plugins/node/instrumentation-kafkajs/tsconfig.json b/plugins/node/instrumentation-kafkajs/tsconfig.json new file mode 100644 index 0000000000..28be80d266 --- /dev/null +++ b/plugins/node/instrumentation-kafkajs/tsconfig.json @@ -0,0 +1,11 @@ +{ + "extends": "../../../tsconfig.base", + "compilerOptions": { + "rootDir": ".", + "outDir": "build" + }, + "include": [ + "src/**/*.ts", + "test/**/*.ts" + ] +} diff --git a/release-please-config.json b/release-please-config.json index 3c3537298e..bb6cacc61c 100644 --- a/release-please-config.json +++ b/release-please-config.json @@ -27,6 +27,7 @@ "plugins/node/instrumentation-cucumber": {}, "plugins/node/instrumentation-dataloader": {}, "plugins/node/instrumentation-fs": {}, + "plugins/node/instrumentation-kafkajs": {}, "plugins/node/instrumentation-lru-memoizer": {}, "plugins/node/instrumentation-mongoose": {}, "plugins/node/instrumentation-runtime-node": {},