Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Patching Mechanism with AWS SDK telemetry improvements #13

Merged
merged 11 commits into from
Aug 15, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ export const AWS_ATTRIBUTE_KEYS: { [key: string]: string } = {
// Used for JavaScript workaround - attribute for pre-calculated value of isLocalRoot
AWS_IS_LOCAL_ROOT: 'aws.is.local.root',

// Naming divergence from Java/Python
// AWS_#_NAME attributes are not supported in JavaScript as they are not part of the Semantic Conventions.
// TODO:Move to Semantic Conventions when these attributes are added.
AWS_S3_BUCKET: 'aws.s3.bucket',
AWS_SQS_QUEUE_URL: 'aws.sqs.queue.url',
AWS_SQS_QUEUE_NAME: 'aws.sqs.queue.name',
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

import { AttributeValue } from '@opentelemetry/api';
import { Instrumentation } from '@opentelemetry/instrumentation';
import { AwsSdkInstrumentationConfig, NormalizedRequest } from '@opentelemetry/instrumentation-aws-sdk';
import { SEMATTRS_MESSAGING_URL } from '@opentelemetry/semantic-conventions';
import { AWS_ATTRIBUTE_KEYS } from '../aws-attribute-keys';
import { SqsUrlParser } from '../sqs-url-parser';
import { RequestMetadata } from '../third-party/otel/aws/services/ServiceExtension';
import { KinesisServiceExtension } from './aws/services/kinesis';
import { S3ServiceExtension } from './aws/services/s3';
Expand All @@ -30,31 +27,47 @@ export function applyInstrumentationPatches(instrumentations: Instrumentation[])
if (services) {
services.set('S3', new S3ServiceExtension());
services.set('Kinesis', new KinesisServiceExtension());
const sqsServiceExtension: any = services.get('SQS');
// It is not expected that `sqsServiceExtension` is undefined
if (sqsServiceExtension) {
const requestPreSpanHook = sqsServiceExtension.requestPreSpanHook;
// Save original `requestPreSpanHook` under a similar name, to be invoked by the patched hook
sqsServiceExtension._requestPreSpanHook = requestPreSpanHook;
// The patched hook will populate the 'aws.sqs.queue.url' and 'aws.sqs.queue.name' attributes according to spec
// from the 'messaging.url' attribute
const patchedRequestPreSpanHook = (
request: NormalizedRequest,
_config: AwsSdkInstrumentationConfig
): RequestMetadata => {
const requestMetadata: RequestMetadata = sqsServiceExtension._requestPreSpanHook(request, _config);
// It is not expected that `requestMetadata.spanAttributes` can possibly be undefined, but still be careful anyways
if (requestMetadata.spanAttributes) {
const queueUrl: AttributeValue | undefined = requestMetadata.spanAttributes[SEMATTRS_MESSAGING_URL];
requestMetadata.spanAttributes[AWS_ATTRIBUTE_KEYS.AWS_SQS_QUEUE_URL] = queueUrl;
requestMetadata.spanAttributes[AWS_ATTRIBUTE_KEYS.AWS_SQS_QUEUE_NAME] =
SqsUrlParser.getQueueName(queueUrl);
}
return requestMetadata;
};
sqsServiceExtension.requestPreSpanHook = patchedRequestPreSpanHook;
}
patchSqsServiceExtension(services.get('SQS'));
}
}
});
}

/*
* This patch extends the existing upstream extension for SQS. Extensions allow for custom logic for adding
* service-specific information to spans, such as attributes. Specifically, we are adding logic to add
* `aws.sqs.queue.url` and `aws.sqs.queue.name` attributes, to be used to generate RemoteTarget and achieve parity
* with the Java/Python instrumentation.
*
* Callout that today, the upstream logic adds `messaging.url` and `messaging.destination` but we feel that
* `aws.sqs` is more in line with existing AWS Semantic Convention attributes like `AWS_S3_BUCKET`, etc.
*
* @param sqsServiceExtension SQS Service Extension obtained the service extension list from the AWS SDK OTel Instrumentation
*/
function patchSqsServiceExtension(sqsServiceExtension: any): void {
// It is not expected that `sqsServiceExtension` is undefined
if (sqsServiceExtension) {
const requestPreSpanHook = sqsServiceExtension.requestPreSpanHook;
// Save original `requestPreSpanHook` under a similar name, to be invoked by the patched hook
sqsServiceExtension._requestPreSpanHook = requestPreSpanHook;
// The patched hook will populate the 'aws.sqs.queue.url' and 'aws.sqs.queue.name' attributes according to spec
// from the 'messaging.url' attribute
const patchedRequestPreSpanHook = (
request: NormalizedRequest,
_config: AwsSdkInstrumentationConfig
): RequestMetadata => {
const requestMetadata: RequestMetadata = sqsServiceExtension._requestPreSpanHook(request, _config);
// It is not expected that `requestMetadata.spanAttributes` can possibly be undefined, but still be careful anyways
if (requestMetadata.spanAttributes) {
if (request.commandInput?.QueueUrl) {
requestMetadata.spanAttributes[AWS_ATTRIBUTE_KEYS.AWS_SQS_QUEUE_URL] = request.commandInput.QueueUrl;
}
if (request.commandInput?.QueueName) {
requestMetadata.spanAttributes[AWS_ATTRIBUTE_KEYS.AWS_SQS_QUEUE_NAME] = request.commandInput.QueueName;
}
}
return requestMetadata;
};
sqsServiceExtension.requestPreSpanHook = patchedRequestPreSpanHook;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,14 @@ describe('Kinesis', () => {
secretAccessKey: 'abcde',
},
});

nock(`https://kinesis.${region}.amazonaws.com`).post('/').reply(200, {});
});

describe('DescribeStream', () => {
it('adds Stream Name', async () => {
const dummyStreamName: string = 'dummy-stream-name';

nock(`https://kinesis.${region}.amazonaws.com`).post('/').reply(200, {});

await kinesis
.describeStream({
StreamName: dummyStreamName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ describe('S3', () => {
secretAccessKey: 'abcde',
},
});

nock(`https://s3.${region}.amazonaws.com`).post('/').reply(200, {});
});

describe('ListObjects', () => {
Expand Down
thpierce marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,11 @@ describe('InstrumentationPatchTest', () => {
const services: Map<string, any> = extractServicesFromAwsSdkInstrumentation(unpatchedAwsSdkInstrumentation);
expect(() => doExtractSqsAttributes(services)).not.toThrow();

const sqsAttributes: Attributes = doExtractSqsAttributes(services);
let sqsAttributes: Attributes = doExtractSqsAttributes(services, false);
expect(sqsAttributes[AWS_ATTRIBUTE_KEYS.AWS_SQS_QUEUE_URL]).toBeUndefined();
expect(sqsAttributes[AWS_ATTRIBUTE_KEYS.AWS_SQS_QUEUE_NAME]).toBeUndefined();

sqsAttributes = doExtractSqsAttributes(services, true);
expect(sqsAttributes[AWS_ATTRIBUTE_KEYS.AWS_SQS_QUEUE_URL]).toBeUndefined();
expect(sqsAttributes[AWS_ATTRIBUTE_KEYS.AWS_SQS_QUEUE_NAME]).toBeUndefined();
});
Expand All @@ -97,9 +101,15 @@ describe('InstrumentationPatchTest', () => {
it('SQS with patching', () => {
const patchedAwsSdkInstrumentation: AwsInstrumentation = extractAwsSdkInstrumentation(PATCHED_INSTRUMENTATIONS);
const services: Map<string, any> = extractServicesFromAwsSdkInstrumentation(patchedAwsSdkInstrumentation);
const sqsAttributes: Attributes = doExtractSqsAttributes(services);
const sqsAttributes: Attributes = doExtractSqsAttributes(services, false);
expect(sqsAttributes[AWS_ATTRIBUTE_KEYS.AWS_SQS_QUEUE_URL]).toEqual(_QUEUE_URL);
});
thpierce marked this conversation as resolved.
Show resolved Hide resolved

it('SQS with patching if Queue Name was available (but is not)', () => {
const patchedAwsSdkInstrumentation: AwsInstrumentation = extractAwsSdkInstrumentation(PATCHED_INSTRUMENTATIONS);
const services: Map<string, any> = extractServicesFromAwsSdkInstrumentation(patchedAwsSdkInstrumentation);
const sqsAttributes: Attributes = doExtractSqsAttributes(services, true);
expect(sqsAttributes[AWS_ATTRIBUTE_KEYS.AWS_SQS_QUEUE_URL]).toEqual(_QUEUE_URL);
thpierce marked this conversation as resolved.
Show resolved Hide resolved
expect(sqsAttributes[AWS_ATTRIBUTE_KEYS.AWS_SQS_QUEUE_NAME]).toEqual(_QUEUE_NAME);
});

function extractAwsSdkInstrumentation(instrumentations: Instrumentation[]): AwsInstrumentation {
Expand Down Expand Up @@ -144,7 +154,10 @@ describe('InstrumentationPatchTest', () => {
return doExtractAttributes(services, serviceName, params);
}

function doExtractSqsAttributes(services: Map<string, ServiceExtension>): Attributes {
function doExtractSqsAttributes(
services: Map<string, ServiceExtension>,
includeQueueName: boolean = false
): Attributes {
const serviceName: string = 'SQS';
const params: NormalizedRequest = {
serviceName: serviceName,
Expand All @@ -153,6 +166,9 @@ describe('InstrumentationPatchTest', () => {
QueueUrl: _QUEUE_URL,
},
};
if (includeQueueName) {
params.commandInput.QueueName = _QUEUE_NAME;
}
return doExtractAttributes(services, serviceName, params);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import { spawnSync, SpawnSyncReturns } from 'child_process';

// The OpenTelemetry Authors code
describe('RegisterPatch', function () {
it('Correctly applies AWS SDK Patches and generates expected attributes for S3 Client call', () => {
it('Correctly applies AWS SDK Patches and generates expected attributes for S3, Kinesis, and SQS Client calls', () => {
const proc: SpawnSyncReturns<Buffer> = spawnSync(
process.execPath,
['--require', '../build/src/register.js', './test-app/app-aws-sdk-s3-call.js'],
['--require', '../build/src/register.js', './test-app/app-aws-sdk-client-calls.js'],
{
cwd: __dirname,
timeout: 10000,
Expand Down Expand Up @@ -57,45 +57,6 @@ describe('RegisterPatch', function () {
proc.stdout.includes("'aws.remote.resource.identifier': 'test-bucket-not-exists'"),
'console span output in stdout - validate aws.remote.resource.identifier'
);
});

it('Correctly applies AWS SDK Patches and generates expected attributes for Kinesis Client call', () => {
const proc: SpawnSyncReturns<Buffer> = spawnSync(
process.execPath,
['--require', '../build/src/register.js', './test-app/app-aws-sdk-kinesis-call.js'],
{
cwd: __dirname,
timeout: 10000,
killSignal: 'SIGKILL', // SIGTERM is not sufficient to terminate some hangs
env: Object.assign({}, process.env, {
OTEL_NODE_RESOURCE_DETECTORS: 'none',
OTEL_TRACES_EXPORTER: 'console',
// nx (used by lerna run) defaults `FORCE_COLOR=true`, which in
// node v18.17.0, v20.3.0 and later results in ANSI color escapes
// in the ConsoleSpanExporter output that is checked below.
FORCE_COLOR: '0',

OTEL_LOG_LEVEL: 'ALL',
OTEL_TRACES_SAMPLER: 'always_on',
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: 'http://localhost:4316/v1/traces',
OTEL_RESOURCE_ATTRIBUTES: 'service.name=test-adot-sdk-ec2-service-name',
OTEL_AWS_APPLICATION_SIGNALS_ENABLED: 'true',
OTEL_NODE_DISABLED_INSTRUMENTATIONS: 'fs',
}),
}
);
assert.ifError(proc.error);
assert.equal(proc.status, 0, `proc.status (${proc.status})`);
assert.equal(proc.signal, null, `proc.signal (${proc.signal})`);

assert.ok(proc.stdout.includes('AWS Distro of OpenTelemetry automatic instrumentation started successfully'));
assert.ok(proc.stdout.includes("Environment variable OTEL_EXPORTER_OTLP_PROTOCOL is set to 'http/protobuf'"));
assert.ok(proc.stdout.includes("Environment variable OTEL_PROPAGATORS is set to 'xray,tracecontext,b3,b3multi'"));

assert.ok(
proc.stdout.includes("'service.name': 'test-adot-sdk-ec2-service-name'"),
'console span output in stdout - validate service.name'
);

assert.ok(
proc.stdout.includes("'aws.kinesis.stream.name': 'my-kinesis-stream'"),
Expand All @@ -109,59 +70,11 @@ describe('RegisterPatch', function () {
proc.stdout.includes("'aws.remote.resource.identifier': 'my-kinesis-stream'"),
'console span output in stdout - validate aws.remote.resource.identifier'
);
});

it('Correctly applies AWS SDK Patches and generates expected attributes for SQS Client call', () => {
const proc: SpawnSyncReturns<Buffer> = spawnSync(
process.execPath,
['--require', '../build/src/register.js', './test-app/app-aws-sdk-sqs-call.js'],
{
cwd: __dirname,
timeout: 10000,
killSignal: 'SIGKILL', // SIGTERM is not sufficient to terminate some hangs
env: Object.assign({}, process.env, {
OTEL_NODE_RESOURCE_DETECTORS: 'none',
OTEL_TRACES_EXPORTER: 'console',
// nx (used by lerna run) defaults `FORCE_COLOR=true`, which in
// node v18.17.0, v20.3.0 and later results in ANSI color escapes
// in the ConsoleSpanExporter output that is checked below.
FORCE_COLOR: '0',

OTEL_LOG_LEVEL: 'ALL',
OTEL_TRACES_SAMPLER: 'always_on',
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: 'http://localhost:4316/v1/traces',
OTEL_RESOURCE_ATTRIBUTES: 'service.name=test-adot-sdk-ec2-service-name',
OTEL_AWS_APPLICATION_SIGNALS_ENABLED: 'true',
OTEL_NODE_DISABLED_INSTRUMENTATIONS: 'fs',
}),
}
);
assert.ifError(proc.error);
assert.equal(proc.status, 0, `proc.status (${proc.status})`);
assert.equal(proc.signal, null, `proc.signal (${proc.signal})`);

assert.ok(proc.stdout.includes('AWS Distro of OpenTelemetry automatic instrumentation started successfully'));
assert.ok(proc.stdout.includes("Environment variable OTEL_EXPORTER_OTLP_PROTOCOL is set to 'http/protobuf'"));
assert.ok(proc.stdout.includes("Environment variable OTEL_PROPAGATORS is set to 'xray,tracecontext,b3,b3multi'"));

assert.ok(
proc.stdout.includes("'service.name': 'test-adot-sdk-ec2-service-name'"),
'console span output in stdout - validate service.name'
);

assert.ok(
proc.stdout.includes("'aws.sqs.queue.name': 'sqs-queue-name'"),
'console span output in stdout - validate aws.sqs.queue.name'
);
assert.ok(
proc.stdout.includes("'aws.sqs.queue.url': 'https://sqs.us-east-1.amazonaws.com/012345678910/sqs-queue-name'"),
'console span output in stdout - validate aws.sqs.queue.url'
);

assert.ok(
proc.stdout.includes("'messaging.url': 'https://sqs.us-east-1.amazonaws.com/012345678910/sqs-queue-name'"),
'console span output in stdout - validate messaging.url'
);
assert.ok(
proc.stdout.includes("'aws.remote.resource.type': 'AWS::SQS::Queue'"),
'console span output in stdout - validate aws.remote.resource.type'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

// Used in register.patch.test.ts to mimic a JS app using SQS client of AWS SDK for JS (v3).
const { S3Client, ListObjectsCommand } = require("@aws-sdk/client-s3");
const { KinesisClient, ListStreamsCommand } = require('@aws-sdk/client-kinesis');
const { SQSClient, GetQueueAttributesCommand } = require("@aws-sdk/client-sqs");

const s3Client = new S3Client({});
const bucketName = "test-bucket-not-exists";

const kinesisClient = new KinesisClient({});
const streamName = "my-kinesis-stream";

const sqsClient = new SQSClient({});
const queueUrl = "https://sqs.us-east-1.amazonaws.com/012345678910/sqs-queue-name";

const awsSdkClientSendPromises = [
s3Client.send(
new ListObjectsCommand({
Bucket: bucketName
})
),
kinesisClient.send(
new ListStreamsCommand({
StreamName: streamName,
})
),
sqsClient.send(
new GetQueueAttributesCommand({
QueueUrl: queueUrl
})
),
]

Promise.all(awsSdkClientSendPromises).catch(e => {
console.error("Exception thrown", e.message);
});

This file was deleted.

This file was deleted.

This file was deleted.

Loading