From 5f2c160ac0fd4f6aa76c227537c34329c6c9dfb2 Mon Sep 17 00:00:00 2001 From: McSick Date: Thu, 11 Jul 2024 11:56:12 -0400 Subject: [PATCH] feat: Add the ability to use span links when consuming a message amqp plugin (#1972) A config option was added that when set to true, it follows the spec behavior to link to the produce context instead of continuing the context. The default behavior will be as it is today, so one has to opt-in to this change via the config setting. * Add the ability to use span links in AMQP plugin * add default behavior * Add in tests for useLinks option * Updating tests to correct Semantic attributes --------- Co-authored-by: Jamie Danielson --- .../node/instrumentation-amqplib/README.md | 32 +- .../instrumentation-amqplib/src/amqplib.ts | 28 +- .../node/instrumentation-amqplib/src/types.ts | 4 + .../test/amqplib-callbacks.test.ts | 266 +++++++++++- .../test/amqplib-promise.test.ts | 405 ++++++++++++++++++ 5 files changed, 724 insertions(+), 11 deletions(-) diff --git a/plugins/node/instrumentation-amqplib/README.md b/plugins/node/instrumentation-amqplib/README.md index f2c7e90bd1..eb3553ce9d 100644 --- a/plugins/node/instrumentation-amqplib/README.md +++ b/plugins/node/instrumentation-amqplib/README.md @@ -40,6 +40,7 @@ registerInstrumentations({ // publishConfirmHook: (span: Span, publishConfirmedInto: PublishConfirmedInfo) => { }, // consumeHook: (span: Span, consumeInfo: ConsumeInfo) => { }, // consumeEndHook: (span: Span, consumeEndInfo: ConsumeEndInfo) => { }, + // useLinksForConsume: boolean, }), ], }) @@ -49,13 +50,14 @@ registerInstrumentations({ amqplib instrumentation has few options available to choose from. You can set the following: -| Options | Type | Description | -| --------------------------------- | ----------------------------------------- | -------------------------------------------------------------------------------------------------------------------------- | -| `publishHook` | `AmqplibPublishCustomAttributeFunction` | hook for adding custom attributes before publish message is sent. | -| `publishConfirmHook` | `AmqplibPublishConfirmCustomAttributeFunction` | hook for adding custom attributes after publish message is confirmed by the broker. | -| `consumeHook` | `AmqplibConsumeCustomAttributeFunction` | hook for adding custom attributes before consumer message is processed. | -| `consumeEndHook` | `AmqplibConsumeEndCustomAttributeFunction` | hook for adding custom attributes after consumer message is acked to server. | -| `consumeTimeoutMs` | `number` | read [Consume Timeout](#consume-timeout) below | +| Options | Type | Description | +| -------------------- | ---------------------------------------------- | ----------------------------------------------------------------------------------- | +| `publishHook` | `AmqplibPublishCustomAttributeFunction` | hook for adding custom attributes before publish message is sent. | +| `publishConfirmHook` | `AmqplibPublishConfirmCustomAttributeFunction` | hook for adding custom attributes after publish message is confirmed by the broker. | +| `consumeHook` | `AmqplibConsumeCustomAttributeFunction` | hook for adding custom attributes before consumer message is processed. | +| `consumeEndHook` | `AmqplibConsumeEndCustomAttributeFunction` | hook for adding custom attributes after consumer message is acked to server. | +| `consumeTimeoutMs` | `number` | read [Consume Timeout](#consume-timeout) below | +| `useLinksForConsume` | `boolean` | read [Links for Consume](#links-for-consume) below | ### Consume Timeout @@ -69,6 +71,22 @@ If timeout is not big enough, span might be closed with 'InstrumentationTimeout' Default is 1 minute +### Links for Consume + +By default, consume spans continue the trace where a message was produced. However, per the [spec](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/#consumer-spans), consume spans should be linked to the message's creation context. Setting to true, this will enable the behavior to follow the spec. + +Default is false + +## Running Tests Locally + +To run the tests locally, you need to have a RabbitMQ server running. You can use the following command to start a RabbitMQ server using Docker: + +```bash +npm run test:docker:run +``` + +By default, the tests that connect to RabbitMQ are skipped. To make sure these tests are run, you can set the `RUN_RABBIT_TESTS` environment variable to `true` + ## Semantic Conventions This package uses `@opentelemetry/semantic-conventions` version `1.22+`, which implements Semantic Convention [Version 1.7.0](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.7.0/semantic_conventions/README.md) diff --git a/plugins/node/instrumentation-amqplib/src/amqplib.ts b/plugins/node/instrumentation-amqplib/src/amqplib.ts index b5050f1396..54fcf43afa 100644 --- a/plugins/node/instrumentation-amqplib/src/amqplib.ts +++ b/plugins/node/instrumentation-amqplib/src/amqplib.ts @@ -22,6 +22,8 @@ import { SpanKind, SpanStatusCode, ROOT_CONTEXT, + Link, + Context, } from '@opentelemetry/api'; import { hrTime, @@ -414,8 +416,25 @@ export class AmqplibInstrumentation extends InstrumentationBase { } const headers = msg.properties.headers ?? {}; - const parentContext = propagation.extract(ROOT_CONTEXT, headers); + let parentContext: Context | undefined = propagation.extract( + ROOT_CONTEXT, + headers + ); const exchange = msg.fields?.exchange; + let links: Link[] | undefined; + if (self._config.useLinksForConsume) { + const parentSpanContext = parentContext + ? trace.getSpan(parentContext)?.spanContext() + : undefined; + parentContext = undefined; + if (parentSpanContext) { + links = [ + { + context: parentSpanContext, + }, + ]; + } + } const span = self.tracer.startSpan( `${queue} process`, { @@ -431,6 +450,7 @@ export class AmqplibInstrumentation extends InstrumentationBase { [SEMATTRS_MESSAGING_CONVERSATION_ID]: msg?.properties.correlationId, }, + links, }, parentContext ); @@ -457,8 +477,10 @@ export class AmqplibInstrumentation extends InstrumentationBase { // store the span on the message, so we can end it when user call 'ack' on it msg[MESSAGE_STORED_SPAN] = span; } - - context.with(trace.setSpan(parentContext, span), () => { + const setContext: Context = parentContext + ? parentContext + : ROOT_CONTEXT; + context.with(trace.setSpan(setContext, span), () => { onMessage.call(this, msg); }); diff --git a/plugins/node/instrumentation-amqplib/src/types.ts b/plugins/node/instrumentation-amqplib/src/types.ts index 2e82b471b5..55a80fdba3 100644 --- a/plugins/node/instrumentation-amqplib/src/types.ts +++ b/plugins/node/instrumentation-amqplib/src/types.ts @@ -96,10 +96,14 @@ export interface AmqplibInstrumentationConfig extends InstrumentationConfig { * Default is 1 minute */ consumeTimeoutMs?: number; + + /** option to use a span link for the consume message instead of continuing a trace */ + useLinksForConsume?: boolean; } export const DEFAULT_CONFIG: AmqplibInstrumentationConfig = { consumeTimeoutMs: 1000 * 60, // 1 minute + useLinksForConsume: false, }; // The following types are vendored from `@types/amqplib@0.10.1` - commit SHA: 4205e03127692a40b4871709a7134fe4e2ed5510 diff --git a/plugins/node/instrumentation-amqplib/test/amqplib-callbacks.test.ts b/plugins/node/instrumentation-amqplib/test/amqplib-callbacks.test.ts index 533b5d9942..a323cebee9 100644 --- a/plugins/node/instrumentation-amqplib/test/amqplib-callbacks.test.ts +++ b/plugins/node/instrumentation-amqplib/test/amqplib-callbacks.test.ts @@ -21,7 +21,9 @@ import { registerInstrumentationTesting, } from '@opentelemetry/contrib-test-utils'; -registerInstrumentationTesting(new AmqplibInstrumentation()); +const instrumentation = registerInstrumentationTesting( + new AmqplibInstrumentation() +); import * as amqpCallback from 'amqplib/callback_api'; import { @@ -401,4 +403,266 @@ describe('amqplib instrumentation callback model', () => { }); }); }); + + describe('channel with links config', () => { + let channel: amqpCallback.Channel; + beforeEach(done => { + instrumentation.setConfig({ + useLinksForConsume: true, + }); + conn.createChannel( + context.bind(context.active(), (err, c) => { + channel = c; + // install an error handler, otherwise when we have tests that create error on the channel, + // it throws and crash process + channel.on('error', () => {}); + channel.assertQueue( + queueName, + { durable: false }, + context.bind(context.active(), (err, ok) => { + channel.purgeQueue( + queueName, + context.bind(context.active(), (err, ok) => { + done(); + }) + ); + }) + ); + }) + ); + }); + + afterEach(done => { + try { + channel.close(err => { + done(); + }); + } catch {} + }); + + it('simple publish and consume from queue callback', done => { + const hadSpaceInBuffer = channel.sendToQueue( + queueName, + Buffer.from(msgPayload) + ); + expect(hadSpaceInBuffer).toBeTruthy(); + + asyncConsume( + channel, + queueName, + [msg => expect(msg.content.toString()).toEqual(msgPayload)], + { + noAck: true, + } + ).then(() => { + const [publishSpan, consumeSpan] = getTestSpans(); + + // assert publish span + expect(publishSpan.kind).toEqual(SpanKind.PRODUCER); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_SYSTEM]).toEqual( + 'rabbitmq' + ); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_DESTINATION]).toEqual( + '' + ); // according to spec: "This will be an empty string if the default exchange is used" + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_DESTINATION_KIND] + ).toEqual(MESSAGINGDESTINATIONKINDVALUES_TOPIC); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY] + ).toEqual(queueName); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL]).toEqual( + 'AMQP' + ); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL_VERSION] + ).toEqual('0.9.1'); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_URL]).toEqual( + censoredUrl + ); + expect(publishSpan.attributes[SEMATTRS_NET_PEER_NAME]).toEqual( + TEST_RABBITMQ_HOST + ); + expect(publishSpan.attributes[SEMATTRS_NET_PEER_PORT]).toEqual( + TEST_RABBITMQ_PORT + ); + + // assert consume span + expect(consumeSpan.kind).toEqual(SpanKind.CONSUMER); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_SYSTEM]).toEqual( + 'rabbitmq' + ); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_DESTINATION]).toEqual( + '' + ); // according to spec: "This will be an empty string if the default exchange is used" + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_DESTINATION_KIND] + ).toEqual(MESSAGINGDESTINATIONKINDVALUES_TOPIC); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY] + ).toEqual(queueName); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL]).toEqual( + 'AMQP' + ); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL_VERSION] + ).toEqual('0.9.1'); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_URL]).toEqual( + censoredUrl + ); + expect(consumeSpan.attributes[SEMATTRS_NET_PEER_NAME]).toEqual( + TEST_RABBITMQ_HOST + ); + expect(consumeSpan.attributes[SEMATTRS_NET_PEER_PORT]).toEqual( + TEST_RABBITMQ_PORT + ); + + // new trace should be created + expect(consumeSpan.spanContext().traceId).not.toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.parentSpanId).toBeUndefined(); + + // link back to publish span + expect(consumeSpan.links.length).toBe(1); + expect(consumeSpan.links[0].context.traceId).toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.links[0].context.spanId).toEqual( + publishSpan.spanContext().spanId + ); + + done(); + }); + }); + }); + + describe('confirm channel with links config', () => { + let confirmChannel: amqpCallback.ConfirmChannel; + beforeEach(done => { + instrumentation.setConfig({ + useLinksForConsume: true, + }); + conn.createConfirmChannel( + context.bind(context.active(), (err, c) => { + confirmChannel = c; + // install an error handler, otherwise when we have tests that create error on the channel, + // it throws and crash process + confirmChannel.on('error', () => {}); + confirmChannel.assertQueue( + queueName, + { durable: false }, + context.bind(context.active(), (err, ok) => { + confirmChannel.purgeQueue( + queueName, + context.bind(context.active(), (err, ok) => { + done(); + }) + ); + }) + ); + }) + ); + }); + + afterEach(done => { + try { + confirmChannel.close(err => { + done(); + }); + } catch {} + }); + + it('simple publish and consume from queue callback', done => { + asyncConfirmSend(confirmChannel, queueName, msgPayload).then(() => { + asyncConsume( + confirmChannel, + queueName, + [msg => expect(msg.content.toString()).toEqual(msgPayload)], + { + noAck: true, + } + ).then(() => { + const [publishSpan, consumeSpan] = getTestSpans(); + + // assert publish span + expect(publishSpan.kind).toEqual(SpanKind.PRODUCER); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_SYSTEM]).toEqual( + 'rabbitmq' + ); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_DESTINATION] + ).toEqual(''); // according to spec: "This will be an empty string if the default exchange is used" + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_DESTINATION_KIND] + ).toEqual(MESSAGINGDESTINATIONKINDVALUES_TOPIC); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY] + ).toEqual(queueName); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL]).toEqual( + 'AMQP' + ); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL_VERSION] + ).toEqual('0.9.1'); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_URL]).toEqual( + censoredUrl + ); + expect(publishSpan.attributes[SEMATTRS_NET_PEER_NAME]).toEqual( + TEST_RABBITMQ_HOST + ); + expect(publishSpan.attributes[SEMATTRS_NET_PEER_PORT]).toEqual( + TEST_RABBITMQ_PORT + ); + + // assert consume span + expect(consumeSpan.kind).toEqual(SpanKind.CONSUMER); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_SYSTEM]).toEqual( + 'rabbitmq' + ); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_DESTINATION] + ).toEqual(''); // according to spec: "This will be an empty string if the default exchange is used" + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_DESTINATION_KIND] + ).toEqual(MESSAGINGDESTINATIONKINDVALUES_TOPIC); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY] + ).toEqual(queueName); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL]).toEqual( + 'AMQP' + ); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL_VERSION] + ).toEqual('0.9.1'); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_URL]).toEqual( + censoredUrl + ); + expect(consumeSpan.attributes[SEMATTRS_NET_PEER_NAME]).toEqual( + TEST_RABBITMQ_HOST + ); + expect(consumeSpan.attributes[SEMATTRS_NET_PEER_PORT]).toEqual( + TEST_RABBITMQ_PORT + ); + + // new trace should be created + expect(consumeSpan.spanContext().traceId).not.toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.parentSpanId).toBeUndefined(); + + // link back to publish span + expect(consumeSpan.links.length).toBe(1); + expect(consumeSpan.links[0].context.traceId).toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.links[0].context.spanId).toEqual( + publishSpan.spanContext().spanId + ); + + done(); + }); + }); + }); + }); }); diff --git a/plugins/node/instrumentation-amqplib/test/amqplib-promise.test.ts b/plugins/node/instrumentation-amqplib/test/amqplib-promise.test.ts index 2205d76a5a..6df63296a1 100644 --- a/plugins/node/instrumentation-amqplib/test/amqplib-promise.test.ts +++ b/plugins/node/instrumentation-amqplib/test/amqplib-promise.test.ts @@ -1246,4 +1246,409 @@ describe('amqplib instrumentation promise model', () => { }); }); }); + describe('channel using links config', () => { + let channel: amqp.Channel & { [CHANNEL_CLOSED_IN_TEST]?: boolean }; + beforeEach(async () => { + endHookSpy = sinon.spy(); + instrumentation.setConfig({ + consumeEndHook: endHookSpy, + useLinksForConsume: true, + }); + + channel = await conn.createChannel(); + await channel.assertQueue(queueName, { durable: false }); + await channel.purgeQueue(queueName); + // install an error handler, otherwise when we have tests that create error on the channel, + // it throws and crash process + channel.on('error', (err: Error) => {}); + }); + afterEach(async () => { + if (!channel[CHANNEL_CLOSED_IN_TEST]) { + try { + await new Promise(resolve => { + channel.on('close', resolve); + channel.close(); + }); + } catch {} + } + }); + + it('simple publish and consume from queue', async () => { + const hadSpaceInBuffer = channel.sendToQueue( + queueName, + Buffer.from(msgPayload) + ); + expect(hadSpaceInBuffer).toBeTruthy(); + + await asyncConsume( + channel, + queueName, + [msg => expect(msg.content.toString()).toEqual(msgPayload)], + { + noAck: true, + } + ); + const [publishSpan, consumeSpan] = getTestSpans(); + + // assert publish span + expect(publishSpan.kind).toEqual(SpanKind.PRODUCER); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_SYSTEM]).toEqual( + 'rabbitmq' + ); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_DESTINATION]).toEqual( + '' + ); // according to spec: "This will be an empty string if the default exchange is used" + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_DESTINATION_KIND] + ).toEqual(MESSAGINGDESTINATIONKINDVALUES_TOPIC); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY] + ).toEqual(queueName); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL]).toEqual( + 'AMQP' + ); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL_VERSION] + ).toEqual('0.9.1'); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_URL]).toEqual( + censoredUrl + ); + expect(publishSpan.attributes[SEMATTRS_NET_PEER_NAME]).toEqual( + TEST_RABBITMQ_HOST + ); + expect(publishSpan.attributes[SEMATTRS_NET_PEER_PORT]).toEqual( + TEST_RABBITMQ_PORT + ); + + // assert consume span + expect(consumeSpan.kind).toEqual(SpanKind.CONSUMER); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_SYSTEM]).toEqual( + 'rabbitmq' + ); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_DESTINATION]).toEqual( + '' + ); // according to spec: "This will be an empty string if the default exchange is used" + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_DESTINATION_KIND] + ).toEqual(MESSAGINGDESTINATIONKINDVALUES_TOPIC); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY] + ).toEqual(queueName); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL]).toEqual( + 'AMQP' + ); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL_VERSION] + ).toEqual('0.9.1'); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_URL]).toEqual( + censoredUrl + ); + expect(consumeSpan.attributes[SEMATTRS_NET_PEER_NAME]).toEqual( + TEST_RABBITMQ_HOST + ); + expect(consumeSpan.attributes[SEMATTRS_NET_PEER_PORT]).toEqual( + TEST_RABBITMQ_PORT + ); + + // new trace should be created + expect(consumeSpan.spanContext().traceId).not.toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.parentSpanId).toBeUndefined(); + + // link back to publish span + expect(consumeSpan.links.length).toBe(1); + expect(consumeSpan.links[0].context.traceId).toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.links[0].context.spanId).toEqual( + publishSpan.spanContext().spanId + ); + + expectConsumeEndSpyStatus([EndOperation.AutoAck]); + }); + + describe('routing and exchange', () => { + it('topic exchange', async () => { + const exchangeName = 'topic exchange'; + const routingKey = 'topic.name.from.unittest'; + await channel.assertExchange(exchangeName, 'topic', { durable: false }); + + const { queue: queueName } = await channel.assertQueue('', { + durable: false, + }); + await channel.bindQueue(queueName, exchangeName, '#'); + + channel.publish(exchangeName, routingKey, Buffer.from(msgPayload)); + + await asyncConsume(channel, queueName, [null], { + noAck: true, + }); + + const [publishSpan, consumeSpan] = getTestSpans(); + + // assert publish span + expect(publishSpan.kind).toEqual(SpanKind.PRODUCER); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_SYSTEM]).toEqual( + 'rabbitmq' + ); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_DESTINATION]).toEqual( + exchangeName + ); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_DESTINATION_KIND] + ).toEqual(MESSAGINGDESTINATIONKINDVALUES_TOPIC); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY] + ).toEqual(routingKey); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL]).toEqual( + 'AMQP' + ); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL_VERSION] + ).toEqual('0.9.1'); + + // assert consume span + expect(consumeSpan.kind).toEqual(SpanKind.CONSUMER); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_SYSTEM]).toEqual( + 'rabbitmq' + ); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_DESTINATION]).toEqual( + exchangeName + ); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_DESTINATION_KIND] + ).toEqual(MESSAGINGDESTINATIONKINDVALUES_TOPIC); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY] + ).toEqual(routingKey); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL]).toEqual( + 'AMQP' + ); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL_VERSION] + ).toEqual('0.9.1'); + + // new trace should be created + expect(consumeSpan.spanContext().traceId).not.toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.parentSpanId).toBeUndefined(); + + // link back to publish span + expect(consumeSpan.links.length).toBe(1); + expect(consumeSpan.links[0].context.traceId).toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.links[0].context.spanId).toEqual( + publishSpan.spanContext().spanId + ); + }); + }); + }); + + describe('confirm channel links config', () => { + let confirmChannel: amqp.ConfirmChannel & { + [CHANNEL_CLOSED_IN_TEST]?: boolean; + }; + beforeEach(async () => { + endHookSpy = sinon.spy(); + instrumentation.setConfig({ + consumeEndHook: endHookSpy, + useLinksForConsume: true, + }); + + confirmChannel = await conn.createConfirmChannel(); + await confirmChannel.assertQueue(queueName, { durable: false }); + await confirmChannel.purgeQueue(queueName); + // install an error handler, otherwise when we have tests that create error on the channel, + // it throws and crash process + confirmChannel.on('error', (err: Error) => {}); + }); + afterEach(async () => { + if (!confirmChannel[CHANNEL_CLOSED_IN_TEST]) { + try { + await new Promise(resolve => { + confirmChannel.on('close', resolve); + confirmChannel.close(); + }); + } catch {} + } + }); + + it('simple publish with confirm and consume from queue', async () => { + await asyncConfirmSend(confirmChannel, queueName, msgPayload); + + await asyncConsume( + confirmChannel, + queueName, + [msg => expect(msg.content.toString()).toEqual(msgPayload)], + { + noAck: true, + } + ); + const [publishSpan, consumeSpan] = getTestSpans(); + + // assert publish span + expect(publishSpan.kind).toEqual(SpanKind.PRODUCER); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_SYSTEM]).toEqual( + 'rabbitmq' + ); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_DESTINATION]).toEqual( + '' + ); // according to spec: "This will be an empty string if the default exchange is used" + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_DESTINATION_KIND] + ).toEqual(MESSAGINGDESTINATIONKINDVALUES_TOPIC); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY] + ).toEqual(queueName); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL]).toEqual( + 'AMQP' + ); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL_VERSION] + ).toEqual('0.9.1'); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_URL]).toEqual( + censoredUrl + ); + expect(publishSpan.attributes[SEMATTRS_NET_PEER_NAME]).toEqual( + TEST_RABBITMQ_HOST + ); + expect(publishSpan.attributes[SEMATTRS_NET_PEER_PORT]).toEqual( + TEST_RABBITMQ_PORT + ); + + // assert consume span + expect(consumeSpan.kind).toEqual(SpanKind.CONSUMER); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_SYSTEM]).toEqual( + 'rabbitmq' + ); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_DESTINATION]).toEqual( + '' + ); // according to spec: "This will be an empty string if the default exchange is used" + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_DESTINATION_KIND] + ).toEqual(MESSAGINGDESTINATIONKINDVALUES_TOPIC); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY] + ).toEqual(queueName); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL]).toEqual( + 'AMQP' + ); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL_VERSION] + ).toEqual('0.9.1'); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_URL]).toEqual( + censoredUrl + ); + expect(consumeSpan.attributes[SEMATTRS_NET_PEER_NAME]).toEqual( + TEST_RABBITMQ_HOST + ); + expect(consumeSpan.attributes[SEMATTRS_NET_PEER_PORT]).toEqual( + TEST_RABBITMQ_PORT + ); + // new trace should be created + expect(consumeSpan.spanContext().traceId).not.toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.parentSpanId).toBeUndefined(); + + // link back to publish span + expect(consumeSpan.links.length).toBe(1); + expect(consumeSpan.links[0].context.traceId).toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.links[0].context.spanId).toEqual( + publishSpan.spanContext().spanId + ); + + expectConsumeEndSpyStatus([EndOperation.AutoAck]); + }); + + describe('routing and exchange', () => { + it('topic exchange', async () => { + const exchangeName = 'topic exchange'; + const routingKey = 'topic.name.from.unittest'; + await confirmChannel.assertExchange(exchangeName, 'topic', { + durable: false, + }); + + const { queue: queueName } = await confirmChannel.assertQueue('', { + durable: false, + }); + await confirmChannel.bindQueue(queueName, exchangeName, '#'); + + await asyncConfirmPublish( + confirmChannel, + exchangeName, + routingKey, + msgPayload + ); + + await asyncConsume(confirmChannel, queueName, [null], { + noAck: true, + }); + + const [publishSpan, consumeSpan] = getTestSpans(); + + // assert publish span + expect(publishSpan.kind).toEqual(SpanKind.PRODUCER); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_SYSTEM]).toEqual( + 'rabbitmq' + ); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_DESTINATION]).toEqual( + exchangeName + ); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_DESTINATION_KIND] + ).toEqual(MESSAGINGDESTINATIONKINDVALUES_TOPIC); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY] + ).toEqual(routingKey); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL]).toEqual( + 'AMQP' + ); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL_VERSION] + ).toEqual('0.9.1'); + + // assert consume span + expect(consumeSpan.kind).toEqual(SpanKind.CONSUMER); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_SYSTEM]).toEqual( + 'rabbitmq' + ); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_DESTINATION]).toEqual( + exchangeName + ); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_DESTINATION_KIND] + ).toEqual(MESSAGINGDESTINATIONKINDVALUES_TOPIC); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY] + ).toEqual(routingKey); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL]).toEqual( + 'AMQP' + ); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL_VERSION] + ).toEqual('0.9.1'); + + // new trace should be created + expect(consumeSpan.spanContext().traceId).not.toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.parentSpanId).toBeUndefined(); + + // link back to publish span + expect(consumeSpan.links.length).toBe(1); + expect(consumeSpan.links[0].context.traceId).toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.links[0].context.spanId).toEqual( + publishSpan.spanContext().spanId + ); + }); + }); + }); });