diff --git a/plugins/node/opentelemetry-instrumentation-ioredis/src/instrumentation.ts b/plugins/node/opentelemetry-instrumentation-ioredis/src/instrumentation.ts index 94f8b850ed..0753518de3 100644 --- a/plugins/node/opentelemetry-instrumentation-ioredis/src/instrumentation.ts +++ b/plugins/node/opentelemetry-instrumentation-ioredis/src/instrumentation.ts @@ -49,7 +49,11 @@ export class IORedisInstrumentation extends InstrumentationBase { new InstrumentationNodeModuleDefinition( 'ioredis', ['>1', '<6'], - (moduleExports, moduleVersion?: string) => { + (module, moduleVersion?: string) => { + const moduleExports = + module[Symbol.toStringTag] === 'Module' + ? module.default // ESM + : module; // CommonJS diag.debug('Applying patch for ioredis'); if (isWrapped(moduleExports.prototype.sendCommand)) { this._unwrap(moduleExports.prototype, 'sendCommand'); @@ -67,10 +71,14 @@ export class IORedisInstrumentation extends InstrumentationBase { 'connect', this._patchConnection() ); - return moduleExports; + return module; }, - moduleExports => { - if (moduleExports === undefined) return; + module => { + if (module === undefined) return; + const moduleExports = + module[Symbol.toStringTag] === 'Module' + ? module.default // ESM + : module; // CommonJS diag.debug('Removing patch for ioredis'); this._unwrap(moduleExports.prototype, 'sendCommand'); this._unwrap(moduleExports.prototype, 'connect'); @@ -84,17 +92,17 @@ export class IORedisInstrumentation extends InstrumentationBase { */ private _patchSendCommand(moduleVersion?: string) { return (original: Function) => { - return this.traceSendCommand(original, moduleVersion); + return this._traceSendCommand(original, moduleVersion); }; } private _patchConnection() { return (original: Function) => { - return this.traceConnection(original); + return this._traceConnection(original); }; } - private traceSendCommand = (original: Function, moduleVersion?: string) => { + private _traceSendCommand(original: Function, moduleVersion?: string) { const instrumentation = this; return function (this: RedisInterface, cmd?: IORedisCommand) { if (arguments.length < 1 || typeof cmd !== 'object') { @@ -178,9 +186,9 @@ export class IORedisInstrumentation extends InstrumentationBase { throw error; } }; - }; + } - private traceConnection = (original: Function) => { + private _traceConnection(original: Function) { const instrumentation = this; return function (this: RedisInterface) { const config = @@ -213,5 +221,5 @@ export class IORedisInstrumentation extends InstrumentationBase { throw error; } }; - }; + } } diff --git a/plugins/node/opentelemetry-instrumentation-mongodb/src/instrumentation.ts b/plugins/node/opentelemetry-instrumentation-mongodb/src/instrumentation.ts index 587c0fe185..3fb4d2c75a 100644 --- a/plugins/node/opentelemetry-instrumentation-mongodb/src/instrumentation.ts +++ b/plugins/node/opentelemetry-instrumentation-mongodb/src/instrumentation.ts @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +import { AsyncResource } from 'async_hooks'; import { context, @@ -42,6 +43,7 @@ import { MongoInternalTopology, WireProtocolInternal, V4Connection, + V4ConnectionPool, } from './internal-types'; import { V4Connect, V4Session } from './internal-types'; import { VERSION } from './version'; @@ -76,6 +78,8 @@ export class MongoDBInstrumentation extends InstrumentationBase { const { v4PatchConnect, v4UnpatchConnect } = this._getV4ConnectPatches(); const { v4PatchConnection, v4UnpatchConnection } = this._getV4ConnectionPatches(); + const { v4PatchConnectionPool, v4UnpatchConnectionPool } = + this._getV4ConnectionPoolPatches(); const { v4PatchSessions, v4UnpatchSessions } = this._getV4SessionsPatches(); return [ @@ -105,6 +109,12 @@ export class MongoDBInstrumentation extends InstrumentationBase { v4PatchConnection, v4UnpatchConnection ), + new InstrumentationNodeModuleFile( + 'mongodb/lib/cmap/connection_pool.js', + ['4.*', '5.*'], + v4PatchConnectionPool, + v4UnpatchConnectionPool + ), new InstrumentationNodeModuleFile( 'mongodb/lib/cmap/connect.js', ['4.*', '5.*'], @@ -268,6 +278,35 @@ export class MongoDBInstrumentation extends InstrumentationBase { }; } + private _getV4ConnectionPoolPatches() { + return { + v4PatchConnectionPool: (moduleExports: any, moduleVersion?: string) => { + diag.debug(`Applying patch for mongodb@${moduleVersion}`); + const poolPrototype = moduleExports.ConnectionPool.prototype; + + if (isWrapped(poolPrototype.checkOut)) { + this._unwrap(poolPrototype, 'checkOut'); + } + + this._wrap( + poolPrototype, + 'checkOut', + this._getV4ConnectionPoolCheckOut() + ); + return moduleExports; + }, + v4UnpatchConnectionPool: ( + moduleExports?: any, + moduleVersion?: string + ) => { + diag.debug(`Removing internal patch for mongodb@${moduleVersion}`); + if (moduleExports === undefined) return; + + this._unwrap(moduleExports.ConnectionPool.prototype, 'checkOut'); + }, + }; + } + private _getV4ConnectPatches() { return { v4PatchConnect: (moduleExports: any, moduleVersion?: string) => { @@ -288,6 +327,17 @@ export class MongoDBInstrumentation extends InstrumentationBase { }; } + // This patch will become unnecessary once + // https://jira.mongodb.org/browse/NODE-5639 is done. + private _getV4ConnectionPoolCheckOut() { + return (original: V4ConnectionPool['checkOut']) => { + return function patchedCheckout(this: unknown, callback: any) { + const patchedCallback = AsyncResource.bind(callback); + return original.call(this, patchedCallback); + }; + }; + } + private _getV4ConnectCommand() { const instrumentation = this; diff --git a/plugins/node/opentelemetry-instrumentation-mongodb/src/internal-types.ts b/plugins/node/opentelemetry-instrumentation-mongodb/src/internal-types.ts index 03131aa12a..5cb4119de5 100644 --- a/plugins/node/opentelemetry-instrumentation-mongodb/src/internal-types.ts +++ b/plugins/node/opentelemetry-instrumentation-mongodb/src/internal-types.ts @@ -184,6 +184,13 @@ export type V4Connection = { ): void; }; +// https://github.com/mongodb/node-mongodb-native/blob/v4.2.2/src/cmap/connection_pool.ts +export type V4ConnectionPool = { + // Instrumentation just cares about carrying the async context so + // types of callback params are not needed + checkOut: (callback: (error: any, connection: any) => void) => void; +}; + // https://github.com/mongodb/node-mongodb-native/blob/v4.2.2/src/cmap/connect.ts export type V4Connect = { connect: (options: any, callback: any) => void; diff --git a/plugins/node/opentelemetry-instrumentation-mongodb/test/mongodb-v4.test.ts b/plugins/node/opentelemetry-instrumentation-mongodb/test/mongodb-v4.test.ts index 18ba6bc9ef..724e33cffd 100644 --- a/plugins/node/opentelemetry-instrumentation-mongodb/test/mongodb-v4.test.ts +++ b/plugins/node/opentelemetry-instrumentation-mongodb/test/mongodb-v4.test.ts @@ -237,6 +237,44 @@ describe('MongoDBInstrumentation-Tracing-v4', () => { }); }); }); + + it('should create child spans for concurrent cursor operations', done => { + const queries = [{ a: 1 }, { a: 2 }, { a: 3 }]; + const tasks = queries.map((query, idx) => { + return new Promise((resolve, reject) => { + process.nextTick(() => { + const span = trace + .getTracer('default') + .startSpan(`findRootSpan ${idx}`); + context.with(trace.setSpan(context.active(), span), () => { + collection + .find(query) + .toArray() + .then(() => { + resolve(span.end()); + }) + .catch(reject); + }); + }); + }); + }); + + Promise.all(tasks) + .then(() => { + const spans = getTestSpans(); + const roots = spans.filter(s => s.name.startsWith('findRootSpan')); + + roots.forEach(root => { + const rootId = root.spanContext().spanId; + const children = spans.filter(s => s.parentSpanId === rootId); + assert.strictEqual(children.length, 1); + }); + done(); + }) + .catch(err => { + done(err); + }); + }); }); /** Should intercept command */ diff --git a/plugins/node/opentelemetry-instrumentation-mongodb/test/mongodb-v5.test.ts b/plugins/node/opentelemetry-instrumentation-mongodb/test/mongodb-v5.test.ts index bd8271cb8d..e9f80ad6c5 100644 --- a/plugins/node/opentelemetry-instrumentation-mongodb/test/mongodb-v5.test.ts +++ b/plugins/node/opentelemetry-instrumentation-mongodb/test/mongodb-v5.test.ts @@ -243,6 +243,44 @@ describe('MongoDBInstrumentation-Tracing-v5', () => { }); }); }); + + it('should create child spans for concurrent cursor operations', done => { + const queries = [{ a: 1 }, { a: 2 }, { a: 3 }]; + const tasks = queries.map((query, idx) => { + return new Promise((resolve, reject) => { + process.nextTick(() => { + const span = trace + .getTracer('default') + .startSpan(`findRootSpan ${idx}`); + context.with(trace.setSpan(context.active(), span), () => { + collection + .find(query) + .toArray() + .then(() => { + resolve(span.end()); + }) + .catch(reject); + }); + }); + }); + }); + + Promise.all(tasks) + .then(() => { + const spans = getTestSpans(); + const roots = spans.filter(s => s.name.startsWith('findRootSpan')); + + roots.forEach(root => { + const rootId = root.spanContext().spanId; + const children = spans.filter(s => s.parentSpanId === rootId); + assert.strictEqual(children.length, 1); + }); + done(); + }) + .catch(err => { + done(err); + }); + }); }); /** Should intercept command */ diff --git a/plugins/node/opentelemetry-instrumentation-pg/src/instrumentation.ts b/plugins/node/opentelemetry-instrumentation-pg/src/instrumentation.ts index f47b1e81b0..8d7914c51b 100644 --- a/plugins/node/opentelemetry-instrumentation-pg/src/instrumentation.ts +++ b/plugins/node/opentelemetry-instrumentation-pg/src/instrumentation.ts @@ -66,7 +66,11 @@ export class PgInstrumentation extends InstrumentationBase { const modulePG = new InstrumentationNodeModuleDefinition( 'pg', ['8.*'], - moduleExports => { + (module: any) => { + const moduleExports: typeof pgTypes = + module[Symbol.toStringTag] === 'Module' + ? module.default // ESM + : module; // CommonJS if (isWrapped(moduleExports.Client.prototype.query)) { this._unwrap(moduleExports.Client.prototype, 'query'); } @@ -87,9 +91,13 @@ export class PgInstrumentation extends InstrumentationBase { this._getClientConnectPatch() as any ); - return moduleExports; + return module; }, - moduleExports => { + (module: any) => { + const moduleExports: typeof pgTypes = + module[Symbol.toStringTag] === 'Module' + ? module.default // ESM + : module; // CommonJS if (isWrapped(moduleExports.Client.prototype.query)) { this._unwrap(moduleExports.Client.prototype, 'query'); }