diff --git a/plugins/node/instrumentation-amqplib/README.md b/plugins/node/instrumentation-amqplib/README.md index f2c7e90bd1..eb3553ce9d 100644 --- a/plugins/node/instrumentation-amqplib/README.md +++ b/plugins/node/instrumentation-amqplib/README.md @@ -40,6 +40,7 @@ registerInstrumentations({ // publishConfirmHook: (span: Span, publishConfirmedInto: PublishConfirmedInfo) => { }, // consumeHook: (span: Span, consumeInfo: ConsumeInfo) => { }, // consumeEndHook: (span: Span, consumeEndInfo: ConsumeEndInfo) => { }, + // useLinksForConsume: boolean, }), ], }) @@ -49,13 +50,14 @@ registerInstrumentations({ amqplib instrumentation has few options available to choose from. You can set the following: -| Options | Type | Description | -| --------------------------------- | ----------------------------------------- | -------------------------------------------------------------------------------------------------------------------------- | -| `publishHook` | `AmqplibPublishCustomAttributeFunction` | hook for adding custom attributes before publish message is sent. | -| `publishConfirmHook` | `AmqplibPublishConfirmCustomAttributeFunction` | hook for adding custom attributes after publish message is confirmed by the broker. | -| `consumeHook` | `AmqplibConsumeCustomAttributeFunction` | hook for adding custom attributes before consumer message is processed. | -| `consumeEndHook` | `AmqplibConsumeEndCustomAttributeFunction` | hook for adding custom attributes after consumer message is acked to server. | -| `consumeTimeoutMs` | `number` | read [Consume Timeout](#consume-timeout) below | +| Options | Type | Description | +| -------------------- | ---------------------------------------------- | ----------------------------------------------------------------------------------- | +| `publishHook` | `AmqplibPublishCustomAttributeFunction` | hook for adding custom attributes before publish message is sent. | +| `publishConfirmHook` | `AmqplibPublishConfirmCustomAttributeFunction` | hook for adding custom attributes after publish message is confirmed by the broker. | +| `consumeHook` | `AmqplibConsumeCustomAttributeFunction` | hook for adding custom attributes before consumer message is processed. | +| `consumeEndHook` | `AmqplibConsumeEndCustomAttributeFunction` | hook for adding custom attributes after consumer message is acked to server. | +| `consumeTimeoutMs` | `number` | read [Consume Timeout](#consume-timeout) below | +| `useLinksForConsume` | `boolean` | read [Links for Consume](#links-for-consume) below | ### Consume Timeout @@ -69,6 +71,22 @@ If timeout is not big enough, span might be closed with 'InstrumentationTimeout' Default is 1 minute +### Links for Consume + +By default, consume spans continue the trace where a message was produced. However, per the [spec](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/#consumer-spans), consume spans should be linked to the message's creation context. Setting to true, this will enable the behavior to follow the spec. + +Default is false + +## Running Tests Locally + +To run the tests locally, you need to have a RabbitMQ server running. You can use the following command to start a RabbitMQ server using Docker: + +```bash +npm run test:docker:run +``` + +By default, the tests that connect to RabbitMQ are skipped. To make sure these tests are run, you can set the `RUN_RABBIT_TESTS` environment variable to `true` + ## Semantic Conventions This package uses `@opentelemetry/semantic-conventions` version `1.22+`, which implements Semantic Convention [Version 1.7.0](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.7.0/semantic_conventions/README.md) diff --git a/plugins/node/instrumentation-amqplib/src/amqplib.ts b/plugins/node/instrumentation-amqplib/src/amqplib.ts index b5050f1396..54fcf43afa 100644 --- a/plugins/node/instrumentation-amqplib/src/amqplib.ts +++ b/plugins/node/instrumentation-amqplib/src/amqplib.ts @@ -22,6 +22,8 @@ import { SpanKind, SpanStatusCode, ROOT_CONTEXT, + Link, + Context, } from '@opentelemetry/api'; import { hrTime, @@ -414,8 +416,25 @@ export class AmqplibInstrumentation extends InstrumentationBase { } const headers = msg.properties.headers ?? {}; - const parentContext = propagation.extract(ROOT_CONTEXT, headers); + let parentContext: Context | undefined = propagation.extract( + ROOT_CONTEXT, + headers + ); const exchange = msg.fields?.exchange; + let links: Link[] | undefined; + if (self._config.useLinksForConsume) { + const parentSpanContext = parentContext + ? trace.getSpan(parentContext)?.spanContext() + : undefined; + parentContext = undefined; + if (parentSpanContext) { + links = [ + { + context: parentSpanContext, + }, + ]; + } + } const span = self.tracer.startSpan( `${queue} process`, { @@ -431,6 +450,7 @@ export class AmqplibInstrumentation extends InstrumentationBase { [SEMATTRS_MESSAGING_CONVERSATION_ID]: msg?.properties.correlationId, }, + links, }, parentContext ); @@ -457,8 +477,10 @@ export class AmqplibInstrumentation extends InstrumentationBase { // store the span on the message, so we can end it when user call 'ack' on it msg[MESSAGE_STORED_SPAN] = span; } - - context.with(trace.setSpan(parentContext, span), () => { + const setContext: Context = parentContext + ? parentContext + : ROOT_CONTEXT; + context.with(trace.setSpan(setContext, span), () => { onMessage.call(this, msg); }); diff --git a/plugins/node/instrumentation-amqplib/src/types.ts b/plugins/node/instrumentation-amqplib/src/types.ts index 2e82b471b5..55a80fdba3 100644 --- a/plugins/node/instrumentation-amqplib/src/types.ts +++ b/plugins/node/instrumentation-amqplib/src/types.ts @@ -96,10 +96,14 @@ export interface AmqplibInstrumentationConfig extends InstrumentationConfig { * Default is 1 minute */ consumeTimeoutMs?: number; + + /** option to use a span link for the consume message instead of continuing a trace */ + useLinksForConsume?: boolean; } export const DEFAULT_CONFIG: AmqplibInstrumentationConfig = { consumeTimeoutMs: 1000 * 60, // 1 minute + useLinksForConsume: false, }; // The following types are vendored from `@types/amqplib@0.10.1` - commit SHA: 4205e03127692a40b4871709a7134fe4e2ed5510 diff --git a/plugins/node/instrumentation-amqplib/test/amqplib-callbacks.test.ts b/plugins/node/instrumentation-amqplib/test/amqplib-callbacks.test.ts index 533b5d9942..a323cebee9 100644 --- a/plugins/node/instrumentation-amqplib/test/amqplib-callbacks.test.ts +++ b/plugins/node/instrumentation-amqplib/test/amqplib-callbacks.test.ts @@ -21,7 +21,9 @@ import { registerInstrumentationTesting, } from '@opentelemetry/contrib-test-utils'; -registerInstrumentationTesting(new AmqplibInstrumentation()); +const instrumentation = registerInstrumentationTesting( + new AmqplibInstrumentation() +); import * as amqpCallback from 'amqplib/callback_api'; import { @@ -401,4 +403,266 @@ describe('amqplib instrumentation callback model', () => { }); }); }); + + describe('channel with links config', () => { + let channel: amqpCallback.Channel; + beforeEach(done => { + instrumentation.setConfig({ + useLinksForConsume: true, + }); + conn.createChannel( + context.bind(context.active(), (err, c) => { + channel = c; + // install an error handler, otherwise when we have tests that create error on the channel, + // it throws and crash process + channel.on('error', () => {}); + channel.assertQueue( + queueName, + { durable: false }, + context.bind(context.active(), (err, ok) => { + channel.purgeQueue( + queueName, + context.bind(context.active(), (err, ok) => { + done(); + }) + ); + }) + ); + }) + ); + }); + + afterEach(done => { + try { + channel.close(err => { + done(); + }); + } catch {} + }); + + it('simple publish and consume from queue callback', done => { + const hadSpaceInBuffer = channel.sendToQueue( + queueName, + Buffer.from(msgPayload) + ); + expect(hadSpaceInBuffer).toBeTruthy(); + + asyncConsume( + channel, + queueName, + [msg => expect(msg.content.toString()).toEqual(msgPayload)], + { + noAck: true, + } + ).then(() => { + const [publishSpan, consumeSpan] = getTestSpans(); + + // assert publish span + expect(publishSpan.kind).toEqual(SpanKind.PRODUCER); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_SYSTEM]).toEqual( + 'rabbitmq' + ); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_DESTINATION]).toEqual( + '' + ); // according to spec: "This will be an empty string if the default exchange is used" + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_DESTINATION_KIND] + ).toEqual(MESSAGINGDESTINATIONKINDVALUES_TOPIC); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY] + ).toEqual(queueName); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL]).toEqual( + 'AMQP' + ); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL_VERSION] + ).toEqual('0.9.1'); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_URL]).toEqual( + censoredUrl + ); + expect(publishSpan.attributes[SEMATTRS_NET_PEER_NAME]).toEqual( + TEST_RABBITMQ_HOST + ); + expect(publishSpan.attributes[SEMATTRS_NET_PEER_PORT]).toEqual( + TEST_RABBITMQ_PORT + ); + + // assert consume span + expect(consumeSpan.kind).toEqual(SpanKind.CONSUMER); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_SYSTEM]).toEqual( + 'rabbitmq' + ); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_DESTINATION]).toEqual( + '' + ); // according to spec: "This will be an empty string if the default exchange is used" + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_DESTINATION_KIND] + ).toEqual(MESSAGINGDESTINATIONKINDVALUES_TOPIC); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY] + ).toEqual(queueName); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL]).toEqual( + 'AMQP' + ); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL_VERSION] + ).toEqual('0.9.1'); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_URL]).toEqual( + censoredUrl + ); + expect(consumeSpan.attributes[SEMATTRS_NET_PEER_NAME]).toEqual( + TEST_RABBITMQ_HOST + ); + expect(consumeSpan.attributes[SEMATTRS_NET_PEER_PORT]).toEqual( + TEST_RABBITMQ_PORT + ); + + // new trace should be created + expect(consumeSpan.spanContext().traceId).not.toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.parentSpanId).toBeUndefined(); + + // link back to publish span + expect(consumeSpan.links.length).toBe(1); + expect(consumeSpan.links[0].context.traceId).toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.links[0].context.spanId).toEqual( + publishSpan.spanContext().spanId + ); + + done(); + }); + }); + }); + + describe('confirm channel with links config', () => { + let confirmChannel: amqpCallback.ConfirmChannel; + beforeEach(done => { + instrumentation.setConfig({ + useLinksForConsume: true, + }); + conn.createConfirmChannel( + context.bind(context.active(), (err, c) => { + confirmChannel = c; + // install an error handler, otherwise when we have tests that create error on the channel, + // it throws and crash process + confirmChannel.on('error', () => {}); + confirmChannel.assertQueue( + queueName, + { durable: false }, + context.bind(context.active(), (err, ok) => { + confirmChannel.purgeQueue( + queueName, + context.bind(context.active(), (err, ok) => { + done(); + }) + ); + }) + ); + }) + ); + }); + + afterEach(done => { + try { + confirmChannel.close(err => { + done(); + }); + } catch {} + }); + + it('simple publish and consume from queue callback', done => { + asyncConfirmSend(confirmChannel, queueName, msgPayload).then(() => { + asyncConsume( + confirmChannel, + queueName, + [msg => expect(msg.content.toString()).toEqual(msgPayload)], + { + noAck: true, + } + ).then(() => { + const [publishSpan, consumeSpan] = getTestSpans(); + + // assert publish span + expect(publishSpan.kind).toEqual(SpanKind.PRODUCER); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_SYSTEM]).toEqual( + 'rabbitmq' + ); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_DESTINATION] + ).toEqual(''); // according to spec: "This will be an empty string if the default exchange is used" + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_DESTINATION_KIND] + ).toEqual(MESSAGINGDESTINATIONKINDVALUES_TOPIC); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY] + ).toEqual(queueName); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL]).toEqual( + 'AMQP' + ); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL_VERSION] + ).toEqual('0.9.1'); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_URL]).toEqual( + censoredUrl + ); + expect(publishSpan.attributes[SEMATTRS_NET_PEER_NAME]).toEqual( + TEST_RABBITMQ_HOST + ); + expect(publishSpan.attributes[SEMATTRS_NET_PEER_PORT]).toEqual( + TEST_RABBITMQ_PORT + ); + + // assert consume span + expect(consumeSpan.kind).toEqual(SpanKind.CONSUMER); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_SYSTEM]).toEqual( + 'rabbitmq' + ); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_DESTINATION] + ).toEqual(''); // according to spec: "This will be an empty string if the default exchange is used" + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_DESTINATION_KIND] + ).toEqual(MESSAGINGDESTINATIONKINDVALUES_TOPIC); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY] + ).toEqual(queueName); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL]).toEqual( + 'AMQP' + ); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL_VERSION] + ).toEqual('0.9.1'); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_URL]).toEqual( + censoredUrl + ); + expect(consumeSpan.attributes[SEMATTRS_NET_PEER_NAME]).toEqual( + TEST_RABBITMQ_HOST + ); + expect(consumeSpan.attributes[SEMATTRS_NET_PEER_PORT]).toEqual( + TEST_RABBITMQ_PORT + ); + + // new trace should be created + expect(consumeSpan.spanContext().traceId).not.toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.parentSpanId).toBeUndefined(); + + // link back to publish span + expect(consumeSpan.links.length).toBe(1); + expect(consumeSpan.links[0].context.traceId).toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.links[0].context.spanId).toEqual( + publishSpan.spanContext().spanId + ); + + done(); + }); + }); + }); + }); }); diff --git a/plugins/node/instrumentation-amqplib/test/amqplib-promise.test.ts b/plugins/node/instrumentation-amqplib/test/amqplib-promise.test.ts index 2205d76a5a..6df63296a1 100644 --- a/plugins/node/instrumentation-amqplib/test/amqplib-promise.test.ts +++ b/plugins/node/instrumentation-amqplib/test/amqplib-promise.test.ts @@ -1246,4 +1246,409 @@ describe('amqplib instrumentation promise model', () => { }); }); }); + describe('channel using links config', () => { + let channel: amqp.Channel & { [CHANNEL_CLOSED_IN_TEST]?: boolean }; + beforeEach(async () => { + endHookSpy = sinon.spy(); + instrumentation.setConfig({ + consumeEndHook: endHookSpy, + useLinksForConsume: true, + }); + + channel = await conn.createChannel(); + await channel.assertQueue(queueName, { durable: false }); + await channel.purgeQueue(queueName); + // install an error handler, otherwise when we have tests that create error on the channel, + // it throws and crash process + channel.on('error', (err: Error) => {}); + }); + afterEach(async () => { + if (!channel[CHANNEL_CLOSED_IN_TEST]) { + try { + await new Promise(resolve => { + channel.on('close', resolve); + channel.close(); + }); + } catch {} + } + }); + + it('simple publish and consume from queue', async () => { + const hadSpaceInBuffer = channel.sendToQueue( + queueName, + Buffer.from(msgPayload) + ); + expect(hadSpaceInBuffer).toBeTruthy(); + + await asyncConsume( + channel, + queueName, + [msg => expect(msg.content.toString()).toEqual(msgPayload)], + { + noAck: true, + } + ); + const [publishSpan, consumeSpan] = getTestSpans(); + + // assert publish span + expect(publishSpan.kind).toEqual(SpanKind.PRODUCER); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_SYSTEM]).toEqual( + 'rabbitmq' + ); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_DESTINATION]).toEqual( + '' + ); // according to spec: "This will be an empty string if the default exchange is used" + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_DESTINATION_KIND] + ).toEqual(MESSAGINGDESTINATIONKINDVALUES_TOPIC); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY] + ).toEqual(queueName); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL]).toEqual( + 'AMQP' + ); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL_VERSION] + ).toEqual('0.9.1'); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_URL]).toEqual( + censoredUrl + ); + expect(publishSpan.attributes[SEMATTRS_NET_PEER_NAME]).toEqual( + TEST_RABBITMQ_HOST + ); + expect(publishSpan.attributes[SEMATTRS_NET_PEER_PORT]).toEqual( + TEST_RABBITMQ_PORT + ); + + // assert consume span + expect(consumeSpan.kind).toEqual(SpanKind.CONSUMER); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_SYSTEM]).toEqual( + 'rabbitmq' + ); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_DESTINATION]).toEqual( + '' + ); // according to spec: "This will be an empty string if the default exchange is used" + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_DESTINATION_KIND] + ).toEqual(MESSAGINGDESTINATIONKINDVALUES_TOPIC); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY] + ).toEqual(queueName); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL]).toEqual( + 'AMQP' + ); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL_VERSION] + ).toEqual('0.9.1'); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_URL]).toEqual( + censoredUrl + ); + expect(consumeSpan.attributes[SEMATTRS_NET_PEER_NAME]).toEqual( + TEST_RABBITMQ_HOST + ); + expect(consumeSpan.attributes[SEMATTRS_NET_PEER_PORT]).toEqual( + TEST_RABBITMQ_PORT + ); + + // new trace should be created + expect(consumeSpan.spanContext().traceId).not.toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.parentSpanId).toBeUndefined(); + + // link back to publish span + expect(consumeSpan.links.length).toBe(1); + expect(consumeSpan.links[0].context.traceId).toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.links[0].context.spanId).toEqual( + publishSpan.spanContext().spanId + ); + + expectConsumeEndSpyStatus([EndOperation.AutoAck]); + }); + + describe('routing and exchange', () => { + it('topic exchange', async () => { + const exchangeName = 'topic exchange'; + const routingKey = 'topic.name.from.unittest'; + await channel.assertExchange(exchangeName, 'topic', { durable: false }); + + const { queue: queueName } = await channel.assertQueue('', { + durable: false, + }); + await channel.bindQueue(queueName, exchangeName, '#'); + + channel.publish(exchangeName, routingKey, Buffer.from(msgPayload)); + + await asyncConsume(channel, queueName, [null], { + noAck: true, + }); + + const [publishSpan, consumeSpan] = getTestSpans(); + + // assert publish span + expect(publishSpan.kind).toEqual(SpanKind.PRODUCER); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_SYSTEM]).toEqual( + 'rabbitmq' + ); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_DESTINATION]).toEqual( + exchangeName + ); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_DESTINATION_KIND] + ).toEqual(MESSAGINGDESTINATIONKINDVALUES_TOPIC); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY] + ).toEqual(routingKey); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL]).toEqual( + 'AMQP' + ); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL_VERSION] + ).toEqual('0.9.1'); + + // assert consume span + expect(consumeSpan.kind).toEqual(SpanKind.CONSUMER); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_SYSTEM]).toEqual( + 'rabbitmq' + ); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_DESTINATION]).toEqual( + exchangeName + ); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_DESTINATION_KIND] + ).toEqual(MESSAGINGDESTINATIONKINDVALUES_TOPIC); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY] + ).toEqual(routingKey); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL]).toEqual( + 'AMQP' + ); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL_VERSION] + ).toEqual('0.9.1'); + + // new trace should be created + expect(consumeSpan.spanContext().traceId).not.toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.parentSpanId).toBeUndefined(); + + // link back to publish span + expect(consumeSpan.links.length).toBe(1); + expect(consumeSpan.links[0].context.traceId).toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.links[0].context.spanId).toEqual( + publishSpan.spanContext().spanId + ); + }); + }); + }); + + describe('confirm channel links config', () => { + let confirmChannel: amqp.ConfirmChannel & { + [CHANNEL_CLOSED_IN_TEST]?: boolean; + }; + beforeEach(async () => { + endHookSpy = sinon.spy(); + instrumentation.setConfig({ + consumeEndHook: endHookSpy, + useLinksForConsume: true, + }); + + confirmChannel = await conn.createConfirmChannel(); + await confirmChannel.assertQueue(queueName, { durable: false }); + await confirmChannel.purgeQueue(queueName); + // install an error handler, otherwise when we have tests that create error on the channel, + // it throws and crash process + confirmChannel.on('error', (err: Error) => {}); + }); + afterEach(async () => { + if (!confirmChannel[CHANNEL_CLOSED_IN_TEST]) { + try { + await new Promise(resolve => { + confirmChannel.on('close', resolve); + confirmChannel.close(); + }); + } catch {} + } + }); + + it('simple publish with confirm and consume from queue', async () => { + await asyncConfirmSend(confirmChannel, queueName, msgPayload); + + await asyncConsume( + confirmChannel, + queueName, + [msg => expect(msg.content.toString()).toEqual(msgPayload)], + { + noAck: true, + } + ); + const [publishSpan, consumeSpan] = getTestSpans(); + + // assert publish span + expect(publishSpan.kind).toEqual(SpanKind.PRODUCER); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_SYSTEM]).toEqual( + 'rabbitmq' + ); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_DESTINATION]).toEqual( + '' + ); // according to spec: "This will be an empty string if the default exchange is used" + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_DESTINATION_KIND] + ).toEqual(MESSAGINGDESTINATIONKINDVALUES_TOPIC); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY] + ).toEqual(queueName); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL]).toEqual( + 'AMQP' + ); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL_VERSION] + ).toEqual('0.9.1'); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_URL]).toEqual( + censoredUrl + ); + expect(publishSpan.attributes[SEMATTRS_NET_PEER_NAME]).toEqual( + TEST_RABBITMQ_HOST + ); + expect(publishSpan.attributes[SEMATTRS_NET_PEER_PORT]).toEqual( + TEST_RABBITMQ_PORT + ); + + // assert consume span + expect(consumeSpan.kind).toEqual(SpanKind.CONSUMER); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_SYSTEM]).toEqual( + 'rabbitmq' + ); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_DESTINATION]).toEqual( + '' + ); // according to spec: "This will be an empty string if the default exchange is used" + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_DESTINATION_KIND] + ).toEqual(MESSAGINGDESTINATIONKINDVALUES_TOPIC); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY] + ).toEqual(queueName); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL]).toEqual( + 'AMQP' + ); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL_VERSION] + ).toEqual('0.9.1'); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_URL]).toEqual( + censoredUrl + ); + expect(consumeSpan.attributes[SEMATTRS_NET_PEER_NAME]).toEqual( + TEST_RABBITMQ_HOST + ); + expect(consumeSpan.attributes[SEMATTRS_NET_PEER_PORT]).toEqual( + TEST_RABBITMQ_PORT + ); + // new trace should be created + expect(consumeSpan.spanContext().traceId).not.toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.parentSpanId).toBeUndefined(); + + // link back to publish span + expect(consumeSpan.links.length).toBe(1); + expect(consumeSpan.links[0].context.traceId).toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.links[0].context.spanId).toEqual( + publishSpan.spanContext().spanId + ); + + expectConsumeEndSpyStatus([EndOperation.AutoAck]); + }); + + describe('routing and exchange', () => { + it('topic exchange', async () => { + const exchangeName = 'topic exchange'; + const routingKey = 'topic.name.from.unittest'; + await confirmChannel.assertExchange(exchangeName, 'topic', { + durable: false, + }); + + const { queue: queueName } = await confirmChannel.assertQueue('', { + durable: false, + }); + await confirmChannel.bindQueue(queueName, exchangeName, '#'); + + await asyncConfirmPublish( + confirmChannel, + exchangeName, + routingKey, + msgPayload + ); + + await asyncConsume(confirmChannel, queueName, [null], { + noAck: true, + }); + + const [publishSpan, consumeSpan] = getTestSpans(); + + // assert publish span + expect(publishSpan.kind).toEqual(SpanKind.PRODUCER); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_SYSTEM]).toEqual( + 'rabbitmq' + ); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_DESTINATION]).toEqual( + exchangeName + ); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_DESTINATION_KIND] + ).toEqual(MESSAGINGDESTINATIONKINDVALUES_TOPIC); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY] + ).toEqual(routingKey); + expect(publishSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL]).toEqual( + 'AMQP' + ); + expect( + publishSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL_VERSION] + ).toEqual('0.9.1'); + + // assert consume span + expect(consumeSpan.kind).toEqual(SpanKind.CONSUMER); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_SYSTEM]).toEqual( + 'rabbitmq' + ); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_DESTINATION]).toEqual( + exchangeName + ); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_DESTINATION_KIND] + ).toEqual(MESSAGINGDESTINATIONKINDVALUES_TOPIC); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY] + ).toEqual(routingKey); + expect(consumeSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL]).toEqual( + 'AMQP' + ); + expect( + consumeSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL_VERSION] + ).toEqual('0.9.1'); + + // new trace should be created + expect(consumeSpan.spanContext().traceId).not.toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.parentSpanId).toBeUndefined(); + + // link back to publish span + expect(consumeSpan.links.length).toBe(1); + expect(consumeSpan.links[0].context.traceId).toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.links[0].context.spanId).toEqual( + publishSpan.spanContext().spanId + ); + }); + }); + }); });