Skip to content

Commit

Permalink
refactor(instr-amqplib): use exported strings for attributes (#2086)
Browse files Browse the repository at this point in the history
Refs: #2025
  • Loading branch information
david-luna authored Apr 15, 2024
1 parent 1af4901 commit 21745de
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 42 deletions.
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions plugins/node/instrumentation-amqplib/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,27 @@ The instrumentation's config `publishHook`, `publishConfirmHook`, `consumeHook`

The `moduleVersionAttributeName` config option is removed. To add the amqplib package version to spans, use the `moduleVersion` attribute in hook info for `publishHook` and `consumeHook` functions.

## Semantic Conventions

This package uses `@opentelemetry/semantic-conventions` version `1.22+`, which implements Semantic Convention [Version 1.7.0](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.7.0/semantic_conventions/README.md)

Attributes collected:

| Attribute | Short Description |
| -------------------------------- | ---------------------------------------------------------------------- |
| `messaging.destination` | The message destination name. |
| `messaging.destination_kind` | The kind of message destination. |
| `messaging.rabbitmq.routing_key` | RabbitMQ message routing key. |
| `messaging.operation` | A string identifying the kind of message consumption. |
| `messaging.message_id` | A value used by the messaging system as an identifier for the message. |
| `messaging.conversation_id` | The ID identifying the conversation to which the message belongs. |
| `messaging.protocol` | The name of the transport protocol. |
| `messaging.protocol_version` | The version of the transport protocol. |
| `messaging.system` | A string identifying the messaging system. |
| `messaging.url` | The connection string. |
| `net.peer.name` | Remote hostname or similar. |
| `net.peer.port` | Remote port number. |

## Useful links

- For more information on OpenTelemetry, visit: <https://opentelemetry.io/>
Expand Down
2 changes: 1 addition & 1 deletion plugins/node/instrumentation-amqplib/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
"dependencies": {
"@opentelemetry/core": "^1.8.0",
"@opentelemetry/instrumentation": "^0.50.0",
"@opentelemetry/semantic-conventions": "^1.0.0"
"@opentelemetry/semantic-conventions": "^1.22.0"
},
"devDependencies": {
"@opentelemetry/api": "^1.3.0",
Expand Down
41 changes: 21 additions & 20 deletions plugins/node/instrumentation-amqplib/src/amqplib.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,14 @@ import {
safeExecuteInTheMiddle,
} from '@opentelemetry/instrumentation';
import {
SemanticAttributes,
MessagingOperationValues,
MessagingDestinationKindValues,
SEMATTRS_MESSAGING_DESTINATION,
SEMATTRS_MESSAGING_DESTINATION_KIND,
MESSAGINGDESTINATIONKINDVALUES_TOPIC,
SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY,
SEMATTRS_MESSAGING_OPERATION,
MESSAGINGOPERATIONVALUES_PROCESS,
SEMATTRS_MESSAGING_MESSAGE_ID,
SEMATTRS_MESSAGING_CONVERSATION_ID,
} from '@opentelemetry/semantic-conventions';
import type {
Connection,
Expand Down Expand Up @@ -415,16 +420,13 @@ export class AmqplibInstrumentation extends InstrumentationBase {
kind: SpanKind.CONSUMER,
attributes: {
...channel?.connection?.[CONNECTION_ATTRIBUTES],
[SemanticAttributes.MESSAGING_DESTINATION]: exchange,
[SemanticAttributes.MESSAGING_DESTINATION_KIND]:
MessagingDestinationKindValues.TOPIC,
[SemanticAttributes.MESSAGING_RABBITMQ_ROUTING_KEY]:
msg.fields?.routingKey,
[SemanticAttributes.MESSAGING_OPERATION]:
MessagingOperationValues.PROCESS,
[SemanticAttributes.MESSAGING_MESSAGE_ID]:
msg?.properties.messageId,
[SemanticAttributes.MESSAGING_CONVERSATION_ID]:
[SEMATTRS_MESSAGING_DESTINATION]: exchange,
[SEMATTRS_MESSAGING_DESTINATION_KIND]:
MESSAGINGDESTINATIONKINDVALUES_TOPIC,
[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY]: msg.fields?.routingKey,
[SEMATTRS_MESSAGING_OPERATION]: MESSAGINGOPERATIONVALUES_PROCESS,
[SEMATTRS_MESSAGING_MESSAGE_ID]: msg?.properties.messageId,
[SEMATTRS_MESSAGING_CONVERSATION_ID]:
msg?.properties.correlationId,
},
},
Expand Down Expand Up @@ -636,13 +638,12 @@ export class AmqplibInstrumentation extends InstrumentationBase {
kind: SpanKind.PRODUCER,
attributes: {
...channel.connection[CONNECTION_ATTRIBUTES],
[SemanticAttributes.MESSAGING_DESTINATION]: exchange,
[SemanticAttributes.MESSAGING_DESTINATION_KIND]:
MessagingDestinationKindValues.TOPIC,
[SemanticAttributes.MESSAGING_RABBITMQ_ROUTING_KEY]: routingKey,
[SemanticAttributes.MESSAGING_MESSAGE_ID]: options?.messageId,
[SemanticAttributes.MESSAGING_CONVERSATION_ID]:
options?.correlationId,
[SEMATTRS_MESSAGING_DESTINATION]: exchange,
[SEMATTRS_MESSAGING_DESTINATION_KIND]:
MESSAGINGDESTINATIONKINDVALUES_TOPIC,
[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY]: routingKey,
[SEMATTRS_MESSAGING_MESSAGE_ID]: options?.messageId,
[SEMATTRS_MESSAGING_CONVERSATION_ID]: options?.correlationId,
},
}
);
Expand Down
45 changes: 26 additions & 19 deletions plugins/node/instrumentation-amqplib/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,17 @@ import {
diag,
HrTime,
Span,
SpanAttributes,
SpanAttributeValue,
Attributes,
AttributeValue,
} from '@opentelemetry/api';
import { SemanticAttributes } from '@opentelemetry/semantic-conventions';
import {
SEMATTRS_MESSAGING_PROTOCOL,
SEMATTRS_MESSAGING_PROTOCOL_VERSION,
SEMATTRS_MESSAGING_SYSTEM,
SEMATTRS_MESSAGING_URL,
SEMATTRS_NET_PEER_NAME,
SEMATTRS_NET_PEER_PORT,
} from '@opentelemetry/semantic-conventions';
import type * as amqp from 'amqplib';

export const MESSAGE_STORED_SPAN: unique symbol = Symbol(
Expand All @@ -41,9 +48,9 @@ export const CONNECTION_ATTRIBUTES: unique symbol = Symbol(
export type InstrumentationPublishChannel = (
| amqp.Channel
| amqp.ConfirmChannel
) & { connection: { [CONNECTION_ATTRIBUTES]: SpanAttributes } };
) & { connection: { [CONNECTION_ATTRIBUTES]: Attributes } };
export type InstrumentationConsumeChannel = amqp.Channel & {
connection: { [CONNECTION_ATTRIBUTES]: SpanAttributes };
connection: { [CONNECTION_ATTRIBUTES]: Attributes };
[CHANNEL_SPANS_NOT_ENDED]?: {
msg: amqp.ConsumeMessage;
timeOfConsume: HrTime;
Expand Down Expand Up @@ -93,9 +100,9 @@ const getHostname = (hostnameFromUrl: string | undefined): string => {
const extractConnectionAttributeOrLog = (
url: string | amqp.Options.Connect,
attributeKey: string,
attributeValue: SpanAttributeValue,
attributeValue: AttributeValue,
nameForLog: string
): SpanAttributes => {
): Attributes => {
if (attributeValue) {
return { [attributeKey]: attributeValue };
} else {
Expand All @@ -111,11 +118,11 @@ const extractConnectionAttributeOrLog = (

export const getConnectionAttributesFromServer = (
conn: amqp.Connection['connection']
): SpanAttributes => {
): Attributes => {
const product = conn.serverProperties.product?.toLowerCase?.();
if (product) {
return {
[SemanticAttributes.MESSAGING_SYSTEM]: product,
[SEMATTRS_MESSAGING_SYSTEM]: product,
};
} else {
return {};
Expand All @@ -124,9 +131,9 @@ export const getConnectionAttributesFromServer = (

export const getConnectionAttributesFromUrl = (
url: string | amqp.Options.Connect
): SpanAttributes => {
const attributes: SpanAttributes = {
[SemanticAttributes.MESSAGING_PROTOCOL_VERSION]: '0.9.1', // this is the only protocol supported by the instrumented library
): Attributes => {
const attributes: Attributes = {
[SEMATTRS_MESSAGING_PROTOCOL_VERSION]: '0.9.1', // this is the only protocol supported by the instrumented library
};

url = url || 'amqp://localhost';
Expand All @@ -137,7 +144,7 @@ export const getConnectionAttributesFromUrl = (
Object.assign(attributes, {
...extractConnectionAttributeOrLog(
url,
SemanticAttributes.MESSAGING_PROTOCOL,
SEMATTRS_MESSAGING_PROTOCOL,
protocol,
'protocol'
),
Expand All @@ -147,7 +154,7 @@ export const getConnectionAttributesFromUrl = (
Object.assign(attributes, {
...extractConnectionAttributeOrLog(
url,
SemanticAttributes.NET_PEER_NAME,
SEMATTRS_NET_PEER_NAME,
hostname,
'hostname'
),
Expand All @@ -157,22 +164,22 @@ export const getConnectionAttributesFromUrl = (
Object.assign(attributes, {
...extractConnectionAttributeOrLog(
url,
SemanticAttributes.NET_PEER_PORT,
SEMATTRS_NET_PEER_PORT,
port,
'port'
),
});
} else {
const censoredUrl = censorPassword(url);
attributes[SemanticAttributes.MESSAGING_URL] = censoredUrl;
attributes[SEMATTRS_MESSAGING_URL] = censoredUrl;
try {
const urlParts = new URL(censoredUrl);

const protocol = getProtocol(urlParts.protocol);
Object.assign(attributes, {
...extractConnectionAttributeOrLog(
censoredUrl,
SemanticAttributes.MESSAGING_PROTOCOL,
SEMATTRS_MESSAGING_PROTOCOL,
protocol,
'protocol'
),
Expand All @@ -182,7 +189,7 @@ export const getConnectionAttributesFromUrl = (
Object.assign(attributes, {
...extractConnectionAttributeOrLog(
censoredUrl,
SemanticAttributes.NET_PEER_NAME,
SEMATTRS_NET_PEER_NAME,
hostname,
'hostname'
),
Expand All @@ -195,7 +202,7 @@ export const getConnectionAttributesFromUrl = (
Object.assign(attributes, {
...extractConnectionAttributeOrLog(
censoredUrl,
SemanticAttributes.NET_PEER_PORT,
SEMATTRS_NET_PEER_PORT,
port,
'port'
),
Expand Down

0 comments on commit 21745de

Please sign in to comment.