From 2c32e5869ef9b6d582ba4da02623a030309bcaf3 Mon Sep 17 00:00:00 2001 From: David Luna Date: Wed, 10 Jul 2024 01:07:21 +0200 Subject: [PATCH 1/2] fix(instr-express): fix handler patching for already patched router (#2294) * fix(instr-express): fix handler patching for already patched router --------- Co-authored-by: Abhijeet Prasad Co-authored-by: Jamie Danielson --- .../src/instrumentation.ts | 9 ++-- .../test/express.test.ts | 41 +++++++++++++++++++ 2 files changed, 47 insertions(+), 3 deletions(-) diff --git a/plugins/node/opentelemetry-instrumentation-express/src/instrumentation.ts b/plugins/node/opentelemetry-instrumentation-express/src/instrumentation.ts index c3f5602cac..7aeb400ab2 100644 --- a/plugins/node/opentelemetry-instrumentation-express/src/instrumentation.ts +++ b/plugins/node/opentelemetry-instrumentation-express/src/instrumentation.ts @@ -315,7 +315,11 @@ export class ExpressInstrumentation extends InstrumentationBase { // some properties holding metadata and state so we need to proxy them // through through patched function // ref: https://github.com/open-telemetry/opentelemetry-js-contrib/issues/1950 - Object.keys(original).forEach(key => { + // Also some apps/libs do their own patching before OTEL and have these properties + // in the proptotype. So we use a `for...in` loop to get own properties and also + // any enumerable prop in the prototype chain + // ref: https://github.com/open-telemetry/opentelemetry-js-contrib/issues/2271 + for (const key in original) { Object.defineProperty(patched, key, { get() { return original[key]; @@ -324,8 +328,7 @@ export class ExpressInstrumentation extends InstrumentationBase { original[key] = value; }, }); - }); - + } return patched; }); } diff --git a/plugins/node/opentelemetry-instrumentation-express/test/express.test.ts b/plugins/node/opentelemetry-instrumentation-express/test/express.test.ts index e13c13a424..43b5da1324 100644 --- a/plugins/node/opentelemetry-instrumentation-express/test/express.test.ts +++ b/plugins/node/opentelemetry-instrumentation-express/test/express.test.ts @@ -517,6 +517,47 @@ describe('ExpressInstrumentation', () => { } ); }); + + it('should keep the handle properties even if router is patched before instrumentation does it', async () => { + const rootSpan = tracer.startSpan('rootSpan'); + let routerLayer: { name: string; handle: { stack: any[] } }; + + const expressApp = express(); + const router = express.Router(); + const CustomRouter: (...p: Parameters) => void = ( + req, + res, + next + ) => router(req, res, next); + router.use('/:slug', (req, res, next) => { + const stack = req.app._router.stack as any[]; + routerLayer = stack.find(router => router.name === 'CustomRouter'); + return res.status(200).end('bar'); + }); + // The patched router now has express router's own properties in its prototype so + // they are not accessible through `Object.keys(...)` + // https://github.com/TryGhost/Ghost/blob/fefb9ec395df8695d06442b6ecd3130dae374d94/ghost/core/core/frontend/web/site.js#L192 + Object.setPrototypeOf(CustomRouter, router); + expressApp.use(CustomRouter); + + const httpServer = await createServer(expressApp); + server = httpServer.server; + port = httpServer.port; + await context.with( + trace.setSpan(context.active(), rootSpan), + async () => { + const response = await httpRequest.get( + `http://localhost:${port}/foo` + ); + assert.strictEqual(response, 'bar'); + rootSpan.end(); + assert.ok( + routerLayer.handle.stack.length === 1, + 'router layer stack is accessible' + ); + } + ); + }); }); describe('Disabling plugin', () => { From 5f2c160ac0fd4f6aa76c227537c34329c6c9dfb2 Mon Sep 17 00:00:00 2001 From: McSick Date: Thu, 11 Jul 2024 11:56:12 -0400 Subject: [PATCH 2/2] feat: Add the ability to use span links when consuming a message amqp plugin (#1972) A config option was added that when set to true, it follows the spec behavior to link to the produce context instead of continuing the context. The default behavior will be as it is today, so one has to opt-in to this change via the config setting. * Add the ability to use span links in AMQP plugin * add default behavior * Add in tests for useLinks option * Updating tests to correct Semantic attributes --------- Co-authored-by: Jamie Danielson --- .../node/instrumentation-amqplib/README.md | 32 +- .../instrumentation-amqplib/src/amqplib.ts | 28 +- .../node/instrumentation-amqplib/src/types.ts | 4 + .../test/amqplib-callbacks.test.ts | 266 +++++++++++- .../test/amqplib-promise.test.ts | 405 ++++++++++++++++++ 5 files changed, 724 insertions(+), 11 deletions(-) diff --git a/plugins/node/instrumentation-amqplib/README.md b/plugins/node/instrumentation-amqplib/README.md index f2c7e90bd1..eb3553ce9d 100644 --- a/plugins/node/instrumentation-amqplib/README.md +++ b/plugins/node/instrumentation-amqplib/README.md @@ -40,6 +40,7 @@ registerInstrumentations({ // publishConfirmHook: (span: Span, publishConfirmedInto: PublishConfirmedInfo) => { }, // consumeHook: (span: Span, consumeInfo: ConsumeInfo) => { }, // consumeEndHook: (span: Span, consumeEndInfo: ConsumeEndInfo) => { }, + // useLinksForConsume: boolean, }), ], }) @@ -49,13 +50,14 @@ registerInstrumentations({ amqplib instrumentation has few options available to choose from. You can set the following: -| Options | Type | Description | -| --------------------------------- | ----------------------------------------- | -------------------------------------------------------------------------------------------------------------------------- | -| `publishHook` | `AmqplibPublishCustomAttributeFunction` | hook for adding custom attributes before publish message is sent. | -| `publishConfirmHook` | `AmqplibPublishConfirmCustomAttributeFunction` | hook for adding custom attributes after publish message is confirmed by the broker. | -| `consumeHook` | `AmqplibConsumeCustomAttributeFunction` | hook for adding custom attributes before consumer message is processed. | -| `consumeEndHook` | `AmqplibConsumeEndCustomAttributeFunction` | hook for adding custom attributes after consumer message is acked to server. | -| `consumeTimeoutMs` | `number` | read [Consume Timeout](#consume-timeout) below | +| Options | Type | Description | +| -------------------- | ---------------------------------------------- | ----------------------------------------------------------------------------------- | +| `publishHook` | `AmqplibPublishCustomAttributeFunction` | hook for adding custom attributes before publish message is sent. | +| `publishConfirmHook` | `AmqplibPublishConfirmCustomAttributeFunction` | hook for adding custom attributes after publish message is confirmed by the broker. | +| `consumeHook` | `AmqplibConsumeCustomAttributeFunction` | hook for adding custom attributes before consumer message is processed. | +| `consumeEndHook` | `AmqplibConsumeEndCustomAttributeFunction` | hook for adding custom attributes after consumer message is acked to server. | +| `consumeTimeoutMs` | `number` | read [Consume Timeout](#consume-timeout) below | +| `useLinksForConsume` | `boolean` | read [Links for Consume](#links-for-consume) below | ### Consume Timeout @@ -69,6 +71,22 @@ If timeout is not big enough, span might be closed with 'InstrumentationTimeout' Default is 1 minute +### Links for Consume + +By default, consume spans continue the trace where a message was produced. However, per the [spec](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/#consumer-spans), consume spans should be linked to the message's creation context. Setting to true, this will enable the behavior to follow the spec. + +Default is false + +## Running Tests Locally + +To run the tests locally, you need to have a RabbitMQ server running. You can use the following command to start a RabbitMQ server using Docker: + +```bash +npm run test:docker:run +``` + +By default, the tests that connect to RabbitMQ are skipped. To make sure these tests are run, you can set the `RUN_RABBIT_TESTS` environment variable to `true` + ## Semantic Conventions This package uses `@opentelemetry/semantic-conventions` version `1.22+`, which implements Semantic Convention [Version 1.7.0](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.7.0/semantic_conventions/README.md) diff --git a/plugins/node/instrumentation-amqplib/src/amqplib.ts b/plugins/node/instrumentation-amqplib/src/amqplib.ts index b5050f1396..54fcf43afa 100644 --- a/plugins/node/instrumentation-amqplib/src/amqplib.ts +++ b/plugins/node/instrumentation-amqplib/src/amqplib.ts @@ -22,6 +22,8 @@ import { SpanKind, SpanStatusCode, ROOT_CONTEXT, + Link, + Context, } from '@opentelemetry/api'; import { hrTime, @@ -414,8 +416,25 @@ export class AmqplibInstrumentation extends InstrumentationBase { } const headers = msg.properties.headers ?? {}; - const parentContext = propagation.extract(ROOT_CONTEXT, headers); + let parentContext: Context | undefined = propagation.extract( + ROOT_CONTEXT, + headers + ); const exchange = msg.fields?.exchange; + let links: Link[] | undefined; + if (self._config.useLinksForConsume) { + const parentSpanContext = parentContext + ? trace.getSpan(parentContext)?.spanContext() + : undefined; + parentContext = undefined; + if (parentSpanContext) { + links = [ + { + context: parentSpanContext, + }, + ]; + } + } const span = self.tracer.startSpan( `${queue} process`, { @@ -431,6 +450,7 @@ export class AmqplibInstrumentation extends InstrumentationBase { [SEMATTRS_MESSAGING_CONVERSATION_ID]: msg?.properties.correlationId, }, + links, }, parentContext ); @@ -457,8 +477,10 @@ export class AmqplibInstrumentation extends InstrumentationBase { // store the span on the message, so we can end it when user call 'ack' on it msg[MESSAGE_STORED_SPAN] = span; } - - context.with(trace.setSpan(parentContext, span), () => { + const setContext: Context = parentContext + ? parentContext + : ROOT_CONTEXT; + context.with(trace.setSpan(setContext, span), () => { onMessage.call(this, msg); }); diff --git a/plugins/node/instrumentation-amqplib/src/types.ts b/plugins/node/instrumentation-amqplib/src/types.ts index 2e82b471b5..55a80fdba3 100644 --- a/plugins/node/instrumentation-amqplib/src/types.ts +++ b/plugins/node/instrumentation-amqplib/src/types.ts @@ -96,10 +96,14 @@ export interface AmqplibInstrumentationConfig extends InstrumentationConfig { * Default is 1 minute */ consumeTimeoutMs?: number; + + /** option to use a span link for the consume message instead of continuing a trace */ + useLinksForConsume?: boolean; } export const DEFAULT_CONFIG: AmqplibInstrumentationConfig = { consumeTimeoutMs: 1000 * 60, // 1 minute + useLinksForConsume: false, }; // The following types are vendored from `@types/amqplib@0.10.1` - commit SHA: 4205e03127692a40b4871709a7134fe4e2ed5510 diff --git a/plugins/node/instrumentation-amqplib/test/amqplib-callbacks.test.ts b/plugins/node/instrumentation-amqplib/test/amqplib-callbacks.test.ts index 533b5d9942..a323cebee9 100644 --- a/plugins/node/instrumentation-amqplib/test/amqplib-callbacks.test.ts +++ b/plugins/node/instrumentation-amqplib/test/amqplib-callbacks.test.ts @@ -21,7 +21,9 @@ import { registerInstrumentationTesting, } from '@opentelemetry/contrib-test-utils'; -registerInstrumentationTesting(new AmqplibInstrumentation()); +const instrumentation = registerInstrumentationTesting( + new AmqplibInstrumentation() +); import * as amqpCallback from 'amqplib/callback_api'; import { @@ -401,4 +403,266 @@ describe('amqplib instrumentation callback model', () => { }); }); }); + + describe('channel with links config', () => { + let channel: amqpCallback.Channel; + beforeEach(done => { + instrumentation.setConfig({ + useLinksForConsume: true, + }); + conn.createChannel( + context.bind(context.active(), (err, c) => { + channel = c; + // install an error handler, otherwise when we have tests that create error on the channel, + // it throws and crash process + channel.on('error', () => {}); + channel.assertQueue( + queueName, + { durable: false }, + context.bind(context.active(), (err, ok) => { + channel.purgeQueue( + queueName, + context.bind(context.active(), (err, ok) => { + done(); + }) + ); + }) + ); + }) + ); + }); + + afterEach(done => { + try { + channel.close(err => { + done(); + }); + } catch {} + }); + + it('simple publish and consume from queue callback', done => { + const hadSpaceInBuffer = channel.sendToQueue( + queueName, + Buffer.from(msgPayload) + ); + expect(hadSpaceInBuffer).toBeTruthy(); + + asyncConsume( + channel, + queueName, + [msg => expect(msg.content.toString()).toEqual(msgPayload)], + { + noAck: true, + } + ).then(() => { + const [publishSpan, consumeSpan] = getTestSpans(); + + // assert publish span + expect(publishSpan.kind).toEqual(SpanKind.PRODUCER); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_SYSTEM]).toEqual( + 'rabbitmq' + ); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_DESTINATION]).toEqual( + '' + ); // according to spec: "This will be an empty string if the default exchange is used" + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_DESTINATION_KIND] + ).toEqual(MESSAGINGDESTINATIONKINDVALUES_TOPIC); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY] + ).toEqual(queueName); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL]).toEqual( + 'AMQP' + ); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL_VERSION] + ).toEqual('0.9.1'); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_URL]).toEqual( + censoredUrl + ); + expect(publishSpan.attributes[SEMATTRS_NET_PEER_NAME]).toEqual( + TEST_RABBITMQ_HOST + ); + expect(publishSpan.attributes[SEMATTRS_NET_PEER_PORT]).toEqual( + TEST_RABBITMQ_PORT + ); + + // assert consume span + expect(consumeSpan.kind).toEqual(SpanKind.CONSUMER); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_SYSTEM]).toEqual( + 'rabbitmq' + ); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_DESTINATION]).toEqual( + '' + ); // according to spec: "This will be an empty string if the default exchange is used" + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_DESTINATION_KIND] + ).toEqual(MESSAGINGDESTINATIONKINDVALUES_TOPIC); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY] + ).toEqual(queueName); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL]).toEqual( + 'AMQP' + ); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL_VERSION] + ).toEqual('0.9.1'); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_URL]).toEqual( + censoredUrl + ); + expect(consumeSpan.attributes[SEMATTRS_NET_PEER_NAME]).toEqual( + TEST_RABBITMQ_HOST + ); + expect(consumeSpan.attributes[SEMATTRS_NET_PEER_PORT]).toEqual( + TEST_RABBITMQ_PORT + ); + + // new trace should be created + expect(consumeSpan.spanContext().traceId).not.toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.parentSpanId).toBeUndefined(); + + // link back to publish span + expect(consumeSpan.links.length).toBe(1); + expect(consumeSpan.links[0].context.traceId).toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.links[0].context.spanId).toEqual( + publishSpan.spanContext().spanId + ); + + done(); + }); + }); + }); + + describe('confirm channel with links config', () => { + let confirmChannel: amqpCallback.ConfirmChannel; + beforeEach(done => { + instrumentation.setConfig({ + useLinksForConsume: true, + }); + conn.createConfirmChannel( + context.bind(context.active(), (err, c) => { + confirmChannel = c; + // install an error handler, otherwise when we have tests that create error on the channel, + // it throws and crash process + confirmChannel.on('error', () => {}); + confirmChannel.assertQueue( + queueName, + { durable: false }, + context.bind(context.active(), (err, ok) => { + confirmChannel.purgeQueue( + queueName, + context.bind(context.active(), (err, ok) => { + done(); + }) + ); + }) + ); + }) + ); + }); + + afterEach(done => { + try { + confirmChannel.close(err => { + done(); + }); + } catch {} + }); + + it('simple publish and consume from queue callback', done => { + asyncConfirmSend(confirmChannel, queueName, msgPayload).then(() => { + asyncConsume( + confirmChannel, + queueName, + [msg => expect(msg.content.toString()).toEqual(msgPayload)], + { + noAck: true, + } + ).then(() => { + const [publishSpan, consumeSpan] = getTestSpans(); + + // assert publish span + expect(publishSpan.kind).toEqual(SpanKind.PRODUCER); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_SYSTEM]).toEqual( + 'rabbitmq' + ); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_DESTINATION] + ).toEqual(''); // according to spec: "This will be an empty string if the default exchange is used" + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_DESTINATION_KIND] + ).toEqual(MESSAGINGDESTINATIONKINDVALUES_TOPIC); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY] + ).toEqual(queueName); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL]).toEqual( + 'AMQP' + ); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL_VERSION] + ).toEqual('0.9.1'); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_URL]).toEqual( + censoredUrl + ); + expect(publishSpan.attributes[SEMATTRS_NET_PEER_NAME]).toEqual( + TEST_RABBITMQ_HOST + ); + expect(publishSpan.attributes[SEMATTRS_NET_PEER_PORT]).toEqual( + TEST_RABBITMQ_PORT + ); + + // assert consume span + expect(consumeSpan.kind).toEqual(SpanKind.CONSUMER); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_SYSTEM]).toEqual( + 'rabbitmq' + ); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_DESTINATION] + ).toEqual(''); // according to spec: "This will be an empty string if the default exchange is used" + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_DESTINATION_KIND] + ).toEqual(MESSAGINGDESTINATIONKINDVALUES_TOPIC); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY] + ).toEqual(queueName); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL]).toEqual( + 'AMQP' + ); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL_VERSION] + ).toEqual('0.9.1'); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_URL]).toEqual( + censoredUrl + ); + expect(consumeSpan.attributes[SEMATTRS_NET_PEER_NAME]).toEqual( + TEST_RABBITMQ_HOST + ); + expect(consumeSpan.attributes[SEMATTRS_NET_PEER_PORT]).toEqual( + TEST_RABBITMQ_PORT + ); + + // new trace should be created + expect(consumeSpan.spanContext().traceId).not.toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.parentSpanId).toBeUndefined(); + + // link back to publish span + expect(consumeSpan.links.length).toBe(1); + expect(consumeSpan.links[0].context.traceId).toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.links[0].context.spanId).toEqual( + publishSpan.spanContext().spanId + ); + + done(); + }); + }); + }); + }); }); diff --git a/plugins/node/instrumentation-amqplib/test/amqplib-promise.test.ts b/plugins/node/instrumentation-amqplib/test/amqplib-promise.test.ts index 2205d76a5a..6df63296a1 100644 --- a/plugins/node/instrumentation-amqplib/test/amqplib-promise.test.ts +++ b/plugins/node/instrumentation-amqplib/test/amqplib-promise.test.ts @@ -1246,4 +1246,409 @@ describe('amqplib instrumentation promise model', () => { }); }); }); + describe('channel using links config', () => { + let channel: amqp.Channel & { [CHANNEL_CLOSED_IN_TEST]?: boolean }; + beforeEach(async () => { + endHookSpy = sinon.spy(); + instrumentation.setConfig({ + consumeEndHook: endHookSpy, + useLinksForConsume: true, + }); + + channel = await conn.createChannel(); + await channel.assertQueue(queueName, { durable: false }); + await channel.purgeQueue(queueName); + // install an error handler, otherwise when we have tests that create error on the channel, + // it throws and crash process + channel.on('error', (err: Error) => {}); + }); + afterEach(async () => { + if (!channel[CHANNEL_CLOSED_IN_TEST]) { + try { + await new Promise(resolve => { + channel.on('close', resolve); + channel.close(); + }); + } catch {} + } + }); + + it('simple publish and consume from queue', async () => { + const hadSpaceInBuffer = channel.sendToQueue( + queueName, + Buffer.from(msgPayload) + ); + expect(hadSpaceInBuffer).toBeTruthy(); + + await asyncConsume( + channel, + queueName, + [msg => expect(msg.content.toString()).toEqual(msgPayload)], + { + noAck: true, + } + ); + const [publishSpan, consumeSpan] = getTestSpans(); + + // assert publish span + expect(publishSpan.kind).toEqual(SpanKind.PRODUCER); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_SYSTEM]).toEqual( + 'rabbitmq' + ); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_DESTINATION]).toEqual( + '' + ); // according to spec: "This will be an empty string if the default exchange is used" + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_DESTINATION_KIND] + ).toEqual(MESSAGINGDESTINATIONKINDVALUES_TOPIC); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY] + ).toEqual(queueName); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL]).toEqual( + 'AMQP' + ); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL_VERSION] + ).toEqual('0.9.1'); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_URL]).toEqual( + censoredUrl + ); + expect(publishSpan.attributes[SEMATTRS_NET_PEER_NAME]).toEqual( + TEST_RABBITMQ_HOST + ); + expect(publishSpan.attributes[SEMATTRS_NET_PEER_PORT]).toEqual( + TEST_RABBITMQ_PORT + ); + + // assert consume span + expect(consumeSpan.kind).toEqual(SpanKind.CONSUMER); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_SYSTEM]).toEqual( + 'rabbitmq' + ); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_DESTINATION]).toEqual( + '' + ); // according to spec: "This will be an empty string if the default exchange is used" + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_DESTINATION_KIND] + ).toEqual(MESSAGINGDESTINATIONKINDVALUES_TOPIC); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY] + ).toEqual(queueName); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL]).toEqual( + 'AMQP' + ); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL_VERSION] + ).toEqual('0.9.1'); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_URL]).toEqual( + censoredUrl + ); + expect(consumeSpan.attributes[SEMATTRS_NET_PEER_NAME]).toEqual( + TEST_RABBITMQ_HOST + ); + expect(consumeSpan.attributes[SEMATTRS_NET_PEER_PORT]).toEqual( + TEST_RABBITMQ_PORT + ); + + // new trace should be created + expect(consumeSpan.spanContext().traceId).not.toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.parentSpanId).toBeUndefined(); + + // link back to publish span + expect(consumeSpan.links.length).toBe(1); + expect(consumeSpan.links[0].context.traceId).toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.links[0].context.spanId).toEqual( + publishSpan.spanContext().spanId + ); + + expectConsumeEndSpyStatus([EndOperation.AutoAck]); + }); + + describe('routing and exchange', () => { + it('topic exchange', async () => { + const exchangeName = 'topic exchange'; + const routingKey = 'topic.name.from.unittest'; + await channel.assertExchange(exchangeName, 'topic', { durable: false }); + + const { queue: queueName } = await channel.assertQueue('', { + durable: false, + }); + await channel.bindQueue(queueName, exchangeName, '#'); + + channel.publish(exchangeName, routingKey, Buffer.from(msgPayload)); + + await asyncConsume(channel, queueName, [null], { + noAck: true, + }); + + const [publishSpan, consumeSpan] = getTestSpans(); + + // assert publish span + expect(publishSpan.kind).toEqual(SpanKind.PRODUCER); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_SYSTEM]).toEqual( + 'rabbitmq' + ); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_DESTINATION]).toEqual( + exchangeName + ); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_DESTINATION_KIND] + ).toEqual(MESSAGINGDESTINATIONKINDVALUES_TOPIC); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY] + ).toEqual(routingKey); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL]).toEqual( + 'AMQP' + ); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL_VERSION] + ).toEqual('0.9.1'); + + // assert consume span + expect(consumeSpan.kind).toEqual(SpanKind.CONSUMER); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_SYSTEM]).toEqual( + 'rabbitmq' + ); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_DESTINATION]).toEqual( + exchangeName + ); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_DESTINATION_KIND] + ).toEqual(MESSAGINGDESTINATIONKINDVALUES_TOPIC); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY] + ).toEqual(routingKey); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL]).toEqual( + 'AMQP' + ); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL_VERSION] + ).toEqual('0.9.1'); + + // new trace should be created + expect(consumeSpan.spanContext().traceId).not.toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.parentSpanId).toBeUndefined(); + + // link back to publish span + expect(consumeSpan.links.length).toBe(1); + expect(consumeSpan.links[0].context.traceId).toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.links[0].context.spanId).toEqual( + publishSpan.spanContext().spanId + ); + }); + }); + }); + + describe('confirm channel links config', () => { + let confirmChannel: amqp.ConfirmChannel & { + [CHANNEL_CLOSED_IN_TEST]?: boolean; + }; + beforeEach(async () => { + endHookSpy = sinon.spy(); + instrumentation.setConfig({ + consumeEndHook: endHookSpy, + useLinksForConsume: true, + }); + + confirmChannel = await conn.createConfirmChannel(); + await confirmChannel.assertQueue(queueName, { durable: false }); + await confirmChannel.purgeQueue(queueName); + // install an error handler, otherwise when we have tests that create error on the channel, + // it throws and crash process + confirmChannel.on('error', (err: Error) => {}); + }); + afterEach(async () => { + if (!confirmChannel[CHANNEL_CLOSED_IN_TEST]) { + try { + await new Promise(resolve => { + confirmChannel.on('close', resolve); + confirmChannel.close(); + }); + } catch {} + } + }); + + it('simple publish with confirm and consume from queue', async () => { + await asyncConfirmSend(confirmChannel, queueName, msgPayload); + + await asyncConsume( + confirmChannel, + queueName, + [msg => expect(msg.content.toString()).toEqual(msgPayload)], + { + noAck: true, + } + ); + const [publishSpan, consumeSpan] = getTestSpans(); + + // assert publish span + expect(publishSpan.kind).toEqual(SpanKind.PRODUCER); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_SYSTEM]).toEqual( + 'rabbitmq' + ); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_DESTINATION]).toEqual( + '' + ); // according to spec: "This will be an empty string if the default exchange is used" + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_DESTINATION_KIND] + ).toEqual(MESSAGINGDESTINATIONKINDVALUES_TOPIC); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY] + ).toEqual(queueName); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL]).toEqual( + 'AMQP' + ); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL_VERSION] + ).toEqual('0.9.1'); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_URL]).toEqual( + censoredUrl + ); + expect(publishSpan.attributes[SEMATTRS_NET_PEER_NAME]).toEqual( + TEST_RABBITMQ_HOST + ); + expect(publishSpan.attributes[SEMATTRS_NET_PEER_PORT]).toEqual( + TEST_RABBITMQ_PORT + ); + + // assert consume span + expect(consumeSpan.kind).toEqual(SpanKind.CONSUMER); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_SYSTEM]).toEqual( + 'rabbitmq' + ); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_DESTINATION]).toEqual( + '' + ); // according to spec: "This will be an empty string if the default exchange is used" + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_DESTINATION_KIND] + ).toEqual(MESSAGINGDESTINATIONKINDVALUES_TOPIC); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY] + ).toEqual(queueName); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL]).toEqual( + 'AMQP' + ); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL_VERSION] + ).toEqual('0.9.1'); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_URL]).toEqual( + censoredUrl + ); + expect(consumeSpan.attributes[SEMATTRS_NET_PEER_NAME]).toEqual( + TEST_RABBITMQ_HOST + ); + expect(consumeSpan.attributes[SEMATTRS_NET_PEER_PORT]).toEqual( + TEST_RABBITMQ_PORT + ); + // new trace should be created + expect(consumeSpan.spanContext().traceId).not.toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.parentSpanId).toBeUndefined(); + + // link back to publish span + expect(consumeSpan.links.length).toBe(1); + expect(consumeSpan.links[0].context.traceId).toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.links[0].context.spanId).toEqual( + publishSpan.spanContext().spanId + ); + + expectConsumeEndSpyStatus([EndOperation.AutoAck]); + }); + + describe('routing and exchange', () => { + it('topic exchange', async () => { + const exchangeName = 'topic exchange'; + const routingKey = 'topic.name.from.unittest'; + await confirmChannel.assertExchange(exchangeName, 'topic', { + durable: false, + }); + + const { queue: queueName } = await confirmChannel.assertQueue('', { + durable: false, + }); + await confirmChannel.bindQueue(queueName, exchangeName, '#'); + + await asyncConfirmPublish( + confirmChannel, + exchangeName, + routingKey, + msgPayload + ); + + await asyncConsume(confirmChannel, queueName, [null], { + noAck: true, + }); + + const [publishSpan, consumeSpan] = getTestSpans(); + + // assert publish span + expect(publishSpan.kind).toEqual(SpanKind.PRODUCER); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_SYSTEM]).toEqual( + 'rabbitmq' + ); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_DESTINATION]).toEqual( + exchangeName + ); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_DESTINATION_KIND] + ).toEqual(MESSAGINGDESTINATIONKINDVALUES_TOPIC); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY] + ).toEqual(routingKey); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL]).toEqual( + 'AMQP' + ); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL_VERSION] + ).toEqual('0.9.1'); + + // assert consume span + expect(consumeSpan.kind).toEqual(SpanKind.CONSUMER); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_SYSTEM]).toEqual( + 'rabbitmq' + ); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_DESTINATION]).toEqual( + exchangeName + ); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_DESTINATION_KIND] + ).toEqual(MESSAGINGDESTINATIONKINDVALUES_TOPIC); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY] + ).toEqual(routingKey); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL]).toEqual( + 'AMQP' + ); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL_VERSION] + ).toEqual('0.9.1'); + + // new trace should be created + expect(consumeSpan.spanContext().traceId).not.toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.parentSpanId).toBeUndefined(); + + // link back to publish span + expect(consumeSpan.links.length).toBe(1); + expect(consumeSpan.links[0].context.traceId).toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.links[0].context.spanId).toEqual( + publishSpan.spanContext().spanId + ); + }); + }); + }); });