Skip to content

Commit

Permalink
feat: Add the ability to use span links when consuming a message amqp…
Browse files Browse the repository at this point in the history
… plugin (#1972)

A config option was added that when set to true, it follows the spec behavior to link to the produce context instead of continuing the context. The default behavior will be as it is today, so one has to opt-in to this change via the config setting.

* Add the ability to use span links in AMQP plugin

* add default behavior

* Add in tests for useLinks option

* Updating tests to correct Semantic attributes

---------

Co-authored-by: Jamie Danielson <[email protected]>
  • Loading branch information
McSick and JamieDanielson authored Jul 11, 2024
1 parent 2c32e58 commit 5f2c160
Show file tree
Hide file tree
Showing 5 changed files with 724 additions and 11 deletions.
32 changes: 25 additions & 7 deletions plugins/node/instrumentation-amqplib/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ registerInstrumentations({
// publishConfirmHook: (span: Span, publishConfirmedInto: PublishConfirmedInfo) => { },
// consumeHook: (span: Span, consumeInfo: ConsumeInfo) => { },
// consumeEndHook: (span: Span, consumeEndInfo: ConsumeEndInfo) => { },
// useLinksForConsume: boolean,
}),
],
})
Expand All @@ -49,13 +50,14 @@ registerInstrumentations({

amqplib instrumentation has few options available to choose from. You can set the following:

| Options | Type | Description |
| --------------------------------- | ----------------------------------------- | -------------------------------------------------------------------------------------------------------------------------- |
| `publishHook` | `AmqplibPublishCustomAttributeFunction` | hook for adding custom attributes before publish message is sent. |
| `publishConfirmHook` | `AmqplibPublishConfirmCustomAttributeFunction` | hook for adding custom attributes after publish message is confirmed by the broker. |
| `consumeHook` | `AmqplibConsumeCustomAttributeFunction` | hook for adding custom attributes before consumer message is processed. |
| `consumeEndHook` | `AmqplibConsumeEndCustomAttributeFunction` | hook for adding custom attributes after consumer message is acked to server. |
| `consumeTimeoutMs` | `number` | read [Consume Timeout](#consume-timeout) below |
| Options | Type | Description |
| -------------------- | ---------------------------------------------- | ----------------------------------------------------------------------------------- |
| `publishHook` | `AmqplibPublishCustomAttributeFunction` | hook for adding custom attributes before publish message is sent. |
| `publishConfirmHook` | `AmqplibPublishConfirmCustomAttributeFunction` | hook for adding custom attributes after publish message is confirmed by the broker. |
| `consumeHook` | `AmqplibConsumeCustomAttributeFunction` | hook for adding custom attributes before consumer message is processed. |
| `consumeEndHook` | `AmqplibConsumeEndCustomAttributeFunction` | hook for adding custom attributes after consumer message is acked to server. |
| `consumeTimeoutMs` | `number` | read [Consume Timeout](#consume-timeout) below |
| `useLinksForConsume` | `boolean` | read [Links for Consume](#links-for-consume) below |

### Consume Timeout

Expand All @@ -69,6 +71,22 @@ If timeout is not big enough, span might be closed with 'InstrumentationTimeout'

Default is 1 minute

### Links for Consume

By default, consume spans continue the trace where a message was produced. However, per the [spec](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/#consumer-spans), consume spans should be linked to the message's creation context. Setting to true, this will enable the behavior to follow the spec.

Default is false

## Running Tests Locally

To run the tests locally, you need to have a RabbitMQ server running. You can use the following command to start a RabbitMQ server using Docker:

```bash
npm run test:docker:run
```

By default, the tests that connect to RabbitMQ are skipped. To make sure these tests are run, you can set the `RUN_RABBIT_TESTS` environment variable to `true`

## Semantic Conventions

This package uses `@opentelemetry/semantic-conventions` version `1.22+`, which implements Semantic Convention [Version 1.7.0](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.7.0/semantic_conventions/README.md)
Expand Down
28 changes: 25 additions & 3 deletions plugins/node/instrumentation-amqplib/src/amqplib.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import {
SpanKind,
SpanStatusCode,
ROOT_CONTEXT,
Link,
Context,
} from '@opentelemetry/api';
import {
hrTime,
Expand Down Expand Up @@ -414,8 +416,25 @@ export class AmqplibInstrumentation extends InstrumentationBase {
}

const headers = msg.properties.headers ?? {};
const parentContext = propagation.extract(ROOT_CONTEXT, headers);
let parentContext: Context | undefined = propagation.extract(
ROOT_CONTEXT,
headers
);
const exchange = msg.fields?.exchange;
let links: Link[] | undefined;
if (self._config.useLinksForConsume) {
const parentSpanContext = parentContext
? trace.getSpan(parentContext)?.spanContext()
: undefined;
parentContext = undefined;
if (parentSpanContext) {
links = [
{
context: parentSpanContext,
},
];
}
}
const span = self.tracer.startSpan(
`${queue} process`,
{
Expand All @@ -431,6 +450,7 @@ export class AmqplibInstrumentation extends InstrumentationBase {
[SEMATTRS_MESSAGING_CONVERSATION_ID]:
msg?.properties.correlationId,
},
links,
},
parentContext
);
Expand All @@ -457,8 +477,10 @@ export class AmqplibInstrumentation extends InstrumentationBase {
// store the span on the message, so we can end it when user call 'ack' on it
msg[MESSAGE_STORED_SPAN] = span;
}

context.with(trace.setSpan(parentContext, span), () => {
const setContext: Context = parentContext
? parentContext
: ROOT_CONTEXT;
context.with(trace.setSpan(setContext, span), () => {
onMessage.call(this, msg);
});

Expand Down
4 changes: 4 additions & 0 deletions plugins/node/instrumentation-amqplib/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,14 @@ export interface AmqplibInstrumentationConfig extends InstrumentationConfig {
* Default is 1 minute
*/
consumeTimeoutMs?: number;

/** option to use a span link for the consume message instead of continuing a trace */
useLinksForConsume?: boolean;
}

export const DEFAULT_CONFIG: AmqplibInstrumentationConfig = {
consumeTimeoutMs: 1000 * 60, // 1 minute
useLinksForConsume: false,
};

// The following types are vendored from `@types/[email protected]` - commit SHA: 4205e03127692a40b4871709a7134fe4e2ed5510
Expand Down
Loading

0 comments on commit 5f2c160

Please sign in to comment.