From a464bdb5cbb7856b7a08dac3ff48132948b65792 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Tue, 8 Oct 2024 05:30:29 -0700 Subject: [PATCH 1/4] feat(observability): trace Transaction (#2122) This change adds observability tracing for Transaction along with tests. Updates #2079 Built from PR #2087 Updates #2114 --- observability-test/spanner.ts | 263 ++++++++++- observability-test/transaction.ts | 737 ++++++++++++++++++++++++++++++ src/database.ts | 32 +- src/transaction.ts | 643 +++++++++++++++----------- 4 files changed, 1388 insertions(+), 287 deletions(-) create mode 100644 observability-test/transaction.ts diff --git a/observability-test/spanner.ts b/observability-test/spanner.ts index bf3d93538..3e9cc295b 100644 --- a/observability-test/spanner.ts +++ b/observability-test/spanner.ts @@ -37,6 +37,9 @@ const { const {ObservabilityOptions} = require('../src/instrument'); +const selectSql = 'SELECT 1'; +const updateSql = 'UPDATE FOO SET BAR=1 WHERE BAZ=2'; + /** A simple result set for SELECT 1. */ function createSelect1ResultSet(): protobuf.ResultSet { const fields = [ @@ -85,8 +88,6 @@ async function setup( ); }); - const selectSql = 'SELECT 1'; - const updateSql = 'UPDATE FOO SET BAR=1 WHERE BAZ=2'; spannerMock.putStatementResult( selectSql, mock.StatementResult.resultSet(createSelect1ResultSet()) @@ -205,7 +206,6 @@ describe('EndToEnd', () => { traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); - assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); const actualSpanNames: string[] = []; const actualEventNames: string[] = []; @@ -216,14 +216,19 @@ describe('EndToEnd', () => { }); }); - const expectedSpanNames = ['CloudSpanner.Database.getSnapshot']; + const expectedSpanNames = [ + 'CloudSpanner.Snapshot.begin', + 'CloudSpanner.Database.getSnapshot', + 'CloudSpanner.Snapshot.runStream', + 'CloudSpanner.Snapshot.run', + ]; assert.deepStrictEqual( actualSpanNames, expectedSpanNames, `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` ); - const expectedEventNames = []; + const expectedEventNames = ['Begin Transaction']; assert.deepStrictEqual( actualEventNames, expectedEventNames, @@ -279,7 +284,6 @@ describe('EndToEnd', () => { .on('end', () => { traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); - assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); const actualSpanNames: string[] = []; const actualEventNames: string[] = []; @@ -290,7 +294,10 @@ describe('EndToEnd', () => { }); }); - const expectedSpanNames = ['CloudSpanner.Database.runStream']; + const expectedSpanNames = [ + 'CloudSpanner.Snapshot.runStream', + 'CloudSpanner.Database.runStream', + ]; assert.deepStrictEqual( actualSpanNames, expectedSpanNames, @@ -313,7 +320,6 @@ describe('EndToEnd', () => { traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); - assert.strictEqual(spans.length, 2, 'Exactly 2 spans expected'); // Sort the spans by duration. spans.sort((spanA, spanB) => { @@ -330,6 +336,7 @@ describe('EndToEnd', () => { }); const expectedSpanNames = [ + 'CloudSpanner.Snapshot.runStream', 'CloudSpanner.Database.runStream', 'CloudSpanner.Database.run', ]; @@ -375,7 +382,6 @@ describe('EndToEnd', () => { traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); - assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); const actualEventNames: string[] = []; const actualSpanNames: string[] = []; @@ -386,7 +392,11 @@ describe('EndToEnd', () => { }); }); - const expectedSpanNames = ['CloudSpanner.Database.runTransaction']; + const expectedSpanNames = [ + 'CloudSpanner.Database.runTransaction', + 'CloudSpanner.Snapshot.runStream', + 'CloudSpanner.Snapshot.run', + ]; assert.deepStrictEqual( actualSpanNames, expectedSpanNames, @@ -413,7 +423,6 @@ describe('EndToEnd', () => { traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); - assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); const actualEventNames: string[] = []; const actualSpanNames: string[] = []; @@ -424,14 +433,21 @@ describe('EndToEnd', () => { }); }); - const expectedSpanNames = ['CloudSpanner.Database.writeAtLeastOnce']; + const expectedSpanNames = [ + 'CloudSpanner.Transaction.commit', + 'CloudSpanner.Database.writeAtLeastOnce', + ]; assert.deepStrictEqual( actualSpanNames, expectedSpanNames, `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` ); - const expectedEventNames = ['Using Session']; + const expectedEventNames = [ + 'Starting Commit', + 'Commit Done', + 'Using Session', + ]; assert.deepStrictEqual( actualEventNames, expectedEventNames, @@ -522,6 +538,226 @@ describe('ObservabilityOptions injection and propagation', async () => { done(); }); + afterEach(async () => { + await injectedTracerProvider.forceFlush(); + injectedTraceExporter.reset(); + }); + + let database: Database; + beforeEach(() => { + const instance = spanner.instance('instance'); + database = instance.database('db'); + }); + + describe('Transaction', () => { + const traceExporter = injectedTraceExporter; + + it('run', done => { + database.getTransaction((err, tx) => { + assert.ifError(err); + + tx!.run('SELECT 1', (err, rows) => { + traceExporter.forceFlush(); + + const spans = traceExporter.getFinishedSpans(); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = [ + 'CloudSpanner.Database.getTransaction', + 'CloudSpanner.Snapshot.runStream', + 'CloudSpanner.Snapshot.run', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = [ + 'Requesting 25 sessions', + 'Creating 25 sessions', + 'Requested for 25 sessions returned 25', + 'Acquiring session', + 'Waiting for a session to become available', + 'Acquired session', + 'Using Session', + 'Transaction Creation Done', + ]; + assert.strictEqual( + actualEventNames.every(value => expectedEventNames.includes(value)), + true, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + }); + }); + + it('Transaction.begin+Dml.runUpdate', done => { + database.getTransaction((err, tx) => { + assert.ifError(err); + + // Firstly erase the prior spans so that we can have only Transaction spans. + traceExporter.reset(); + + tx!.begin(); + tx!.runUpdate(updateSql, (err, rowCount) => { + assert.ifError(err); + + traceExporter.forceFlush(); + + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 4); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = [ + 'CloudSpanner.Snapshot.begin', + 'CloudSpanner.Snapshot.runStream', + 'CloudSpanner.Snapshot.run', + 'CloudSpanner.Dml.runUpdate', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = [ + 'Begin Transaction', + 'Transaction Creation Done', + ]; + assert.strictEqual( + actualEventNames.every(value => expectedEventNames.includes(value)), + true, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + }); + }); + + it('runStream', done => { + let rowCount = 0; + database.getTransaction((err, tx) => { + assert.ifError(err); + tx! + .runStream(selectSql) + .on('data', () => rowCount++) + .on('error', assert.ifError) + .on('stats', _stats => {}) + .on('end', () => { + tx!.end(); + + traceExporter.forceFlush(); + + const spans = traceExporter.getFinishedSpans(); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = [ + 'CloudSpanner.Database.getTransaction', + 'CloudSpanner.Snapshot.runStream', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = ['Using Session']; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + }); + }); + + it('rollback', done => { + database.getTransaction((err, tx) => { + assert.ifError(err); + + // Firstly erase the prior spans so that we can have only Transaction spans. + traceExporter.reset(); + + tx!.begin(); + + tx!.runUpdate(updateSql, async (err, rowCount) => { + assert.ifError(err); + tx!.rollback(err => { + traceExporter.forceFlush(); + + const spans = traceExporter.getFinishedSpans(); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = [ + 'CloudSpanner.Snapshot.begin', + 'CloudSpanner.Snapshot.runStream', + 'CloudSpanner.Snapshot.run', + 'CloudSpanner.Dml.runUpdate', + 'CloudSpanner.Transaction.rollback', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = [ + 'Begin Transaction', + 'Transaction Creation Done', + ]; + assert.strictEqual( + actualEventNames.every(value => + expectedEventNames.includes(value) + ), + true, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + }); + }); + }); + }); + it('Propagates spans to the injected not global TracerProvider', done => { const instance = spanner.instance('instance'); const database = instance.database('database'); @@ -558,6 +794,7 @@ describe('ObservabilityOptions injection and propagation', async () => { }); const expectedSpanNames = [ + 'CloudSpanner.Snapshot.runStream', 'CloudSpanner.Database.runStream', 'CloudSpanner.Database.run', ]; diff --git a/observability-test/transaction.ts b/observability-test/transaction.ts new file mode 100644 index 000000000..7d1795e49 --- /dev/null +++ b/observability-test/transaction.ts @@ -0,0 +1,737 @@ +/*! + * Copyright 2024 Google LLC. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as assert from 'assert'; +import {before, beforeEach, afterEach, describe, it} from 'mocha'; +import {EventEmitter} from 'events'; +import * as proxyquire from 'proxyquire'; +import * as sinon from 'sinon'; + +import {codec} from '../src/codec'; +const { + AlwaysOnSampler, + NodeTracerProvider, + InMemorySpanExporter, +} = require('@opentelemetry/sdk-trace-node'); +// eslint-disable-next-line n/no-extraneous-require +const {SpanStatusCode} = require('@opentelemetry/api'); +const { + ReadableSpan, + SimpleSpanProcessor, +} = require('@opentelemetry/sdk-trace-base'); + +describe('Transaction', () => { + const sandbox = sinon.createSandbox(); + + const REQUEST = sandbox.stub(); + const REQUEST_STREAM = sandbox.stub(); + const SESSION_NAME = 'session-123'; + + const SPANNER = { + routeToLeaderEnabled: true, + directedReadOptions: {}, + }; + + const INSTANCE = { + parent: SPANNER, + }; + + const DATABASE = { + formattedName_: 'formatted-database-name', + parent: INSTANCE, + }; + + const SESSION = { + parent: DATABASE, + formattedName_: SESSION_NAME, + request: REQUEST, + requestStream: REQUEST_STREAM, + }; + + const PARTIAL_RESULT_STREAM = sandbox.stub(); + const PROMISIFY_ALL = sandbox.stub(); + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + let Snapshot; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + let Dml; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + let Transaction; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + let PartitionedDml; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + let transaction; + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + let snapshot; + + before(() => { + const txns = proxyquire('../src/transaction', { + '@google-cloud/promisify': {promisifyAll: PROMISIFY_ALL}, + './codec': {codec}, + './partial-result-stream': {partialResultStream: PARTIAL_RESULT_STREAM}, + }); + + Snapshot = txns.Snapshot; + Dml = txns.Dml; + Transaction = txns.Transaction; + PartitionedDml = txns.PartitionedDml; + }); + + let traceExporter: typeof InMemorySpanExporter; + let tracerProvider: typeof NodeTracerProvider; + + beforeEach(() => { + traceExporter = new InMemorySpanExporter(); + const sampler = new AlwaysOnSampler(); + + tracerProvider = new NodeTracerProvider({ + sampler: sampler, + exporter: traceExporter, + }); + tracerProvider.addSpanProcessor(new SimpleSpanProcessor(traceExporter)); + + const SNAPSHOT_OPTIONS = {a: 'b', c: 'd'}; + sandbox.stub(Snapshot, 'encodeTimestampBounds').returns(SNAPSHOT_OPTIONS); + snapshot = new Snapshot(SESSION); + snapshot._observabilityOptions = {tracerProvider: tracerProvider}; + + transaction = new Transaction(SESSION); + transaction._observabilityOptions = {tracerProvider: tracerProvider}; + }); + + afterEach(async () => { + sandbox.restore(); + await tracerProvider.forceFlush(); + traceExporter.reset(); + }); + + after(async () => { + await tracerProvider.shutdown(); + }); + + interface spanExportResults { + spans: (typeof ReadableSpan)[]; + spanNames: string[]; + spanEventNames: string[]; + } + + function extractExportedSpans(): spanExportResults { + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + + // Sort the spans by startTime. + spans.sort((spanA, spanB) => { + spanA.startTime < spanB.startTime; + }); + + const spanNames: string[] = []; + const eventNames: string[] = []; + spans.forEach(span => { + spanNames.push(span.name); + span.events.forEach(event => { + eventNames.push(event.name); + }); + }); + + return { + spans: spans, + spanNames: spanNames, + spanEventNames: eventNames, + } as spanExportResults; + } + + describe('Snapshot', () => { + describe('begin', () => { + const BEGIN_RESPONSE = { + id: Buffer.from('transaction-id-123'), + }; + + it('without error', done => { + REQUEST.callsFake((_, callback) => callback(null, BEGIN_RESPONSE)); + + snapshot.begin((err, resp) => { + assert.ifError(err); + assert.strictEqual(resp, BEGIN_RESPONSE); + + const exportResults = extractExportedSpans(); + const actualSpanNames = exportResults.spanNames; + const actualEventNames = exportResults.spanEventNames; + + const expectedSpanNames = ['CloudSpanner.Snapshot.begin']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = ['Begin Transaction']; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + }); + + it('with error', done => { + const fakeError = new Error('begin.error'); + + REQUEST.callsFake((_, callback) => callback(fakeError)); + + snapshot.begin(err => { + assert.strictEqual(err, fakeError); + + const exportResults = extractExportedSpans(); + const actualSpanNames = exportResults.spanNames; + const actualEventNames = exportResults.spanEventNames; + + const expectedSpanNames = ['CloudSpanner.Snapshot.begin']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = ['Begin Transaction']; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + // Ensure that the final span that got retries did not error. + const spans = exportResults.spans; + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.ERROR, + firstSpan.status.code, + 'Unexpected an span status code' + ); + assert.strictEqual( + 'begin.error', + firstSpan.status.message, + 'Unexpected span status message' + ); + + done(); + }); + }); + }); + + describe('read', () => { + const TABLE = 'my-table-123'; + + let fakeStream; + let stub; + + beforeEach(() => { + fakeStream = new EventEmitter(); + stub = sandbox.stub(snapshot, 'createReadStream').returns(fakeStream); + }); + + it('with error', done => { + const fakeError = new Error('read.error'); + + snapshot.read(TABLE, {}, err => { + assert.strictEqual(err, fakeError); + + const exportResults = extractExportedSpans(); + const actualSpanNames = exportResults.spanNames; + const actualEventNames = exportResults.spanEventNames; + + const expectedSpanNames = ['CloudSpanner.Snapshot.read']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = []; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + // Ensure that the final span that got retries did not error. + const spans = exportResults.spans; + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.ERROR, + firstSpan.status.code, + 'Unexpected an span status code' + ); + assert.strictEqual( + 'read.error', + firstSpan.status.message, + 'Unexpected span status message' + ); + + done(); + }); + + fakeStream.emit('error', fakeError); + }); + + it('without error', done => { + const fakeRows = [{a: 'b'}, {c: 'd'}, {e: 'f'}]; + + snapshot.read(TABLE, {}, (err, rows) => { + assert.ifError(err); + assert.deepStrictEqual(rows, fakeRows); + + const exportResults = extractExportedSpans(); + const actualSpanNames = exportResults.spanNames; + const actualEventNames = exportResults.spanEventNames; + + const expectedSpanNames = ['CloudSpanner.Snapshot.read']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = []; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + // Ensure that the final span that got retries did not error. + const spans = exportResults.spans; + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.UNSET, + firstSpan.status.code, + 'Unexpected an span status code' + ); + assert.strictEqual( + undefined, + firstSpan.status.message, + 'Unexpected span status message' + ); + + done(); + }); + + fakeRows.forEach(row => fakeStream.emit('data', row)); + fakeStream.emit('end'); + }); + }); + + describe('run', () => { + const QUERY = 'SELET * FROM `MyTable`'; + + let fakeStream; + let stub; + + beforeEach(() => { + fakeStream = new EventEmitter(); + stub = sandbox.stub(snapshot, 'runStream').returns(fakeStream); + }); + + it('without error', done => { + const fakeRows = [{a: 'b'}, {c: 'd'}, {e: 'f'}]; + + snapshot.run(QUERY, (err, rows) => { + assert.ifError(err); + assert.deepStrictEqual(rows, fakeRows); + + const exportResults = extractExportedSpans(); + const actualSpanNames = exportResults.spanNames; + const actualEventNames = exportResults.spanEventNames; + + const expectedSpanNames = ['CloudSpanner.Snapshot.run']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = []; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + // Ensure that the final span that got retries did not error. + const spans = exportResults.spans; + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.UNSET, + firstSpan.status.code, + 'Unexpected an span status code' + ); + assert.strictEqual( + undefined, + firstSpan.status.message, + 'Unexpected span status message' + ); + done(); + }); + + fakeRows.forEach(row => fakeStream.emit('data', row)); + fakeStream.emit('end'); + }); + + it('with errors', done => { + const fakeError = new Error('run.error'); + + snapshot.run(QUERY, err => { + assert.strictEqual(err, fakeError); + + const exportResults = extractExportedSpans(); + const actualSpanNames = exportResults.spanNames; + const actualEventNames = exportResults.spanEventNames; + + const expectedSpanNames = ['CloudSpanner.Snapshot.run']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = []; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + // Ensure that the final span that got retries did not error. + const spans = exportResults.spans; + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.ERROR, + firstSpan.status.code, + 'Unexpected an span status code' + ); + assert.strictEqual( + 'run.error', + firstSpan.status.message, + 'Unexpected span status message' + ); + + done(); + }); + + fakeStream.emit('error', fakeError); + }); + }); + + describe('runStream', () => { + const QUERY = { + sql: 'SELECT * FROM `MyTable`', + }; + + beforeEach(() => { + PARTIAL_RESULT_STREAM.callsFake(makeRequest => makeRequest()); + }); + + it('with error', done => { + REQUEST_STREAM.resetHistory(); + + const fakeQuery = Object.assign({}, QUERY, { + params: {a: undefined}, + }); + + const stream = snapshot.runStream(fakeQuery); + stream.on('error', error => { + assert.strictEqual( + error.message, + 'Value of type undefined not recognized.' + ); + + const exportResults = extractExportedSpans(); + const actualSpanNames = exportResults.spanNames; + const actualEventNames = exportResults.spanEventNames; + + const expectedSpanNames = ['CloudSpanner.Snapshot.runStream']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = ['exception']; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + // Ensure that the final span that got retries did not error. + const spans = exportResults.spans; + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.ERROR, + firstSpan.status.code, + 'Unexpected an span status code' + ); + assert.strictEqual( + 'Value of type undefined not recognized.', + firstSpan.status.message, + 'Unexpected span status message' + ); + + done(); + }); + assert.ok(!REQUEST_STREAM.called, 'No request should be made'); + }); + }); + }); + + describe('rollback', () => { + const ID = 'transaction-id-0xdedabeef'; + + beforeEach(() => { + transaction.id = ID; + }); + + it('error with unset `id`', done => { + const expectedError = new Error( + 'Transaction ID is unknown, nothing to rollback.' + ); + delete transaction.id; + + transaction.rollback(err => { + assert.deepStrictEqual(err, expectedError); + + const exportResults = extractExportedSpans(); + const actualSpanNames = exportResults.spanNames; + const actualEventNames = exportResults.spanEventNames; + + const expectedSpanNames = ['CloudSpanner.Transaction.rollback']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = []; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + // Ensure that the final span that got retries did not error. + const spans = exportResults.spans; + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.ERROR, + firstSpan.status.code, + 'Unexpected an span status code' + ); + assert.strictEqual( + expectedError.message, + firstSpan.status.message, + 'Unexpected span status message' + ); + + done(); + }); + }); + + it('with request error', done => { + const fakeError = new Error('our request error'); + transaction.request = (config, callback) => { + callback(fakeError); + }; + + transaction.rollback(err => { + assert.deepStrictEqual(err, fakeError); + + const exportResults = extractExportedSpans(); + const actualSpanNames = exportResults.spanNames; + const actualEventNames = exportResults.spanEventNames; + + const expectedSpanNames = ['CloudSpanner.Transaction.rollback']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = []; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + // Ensure that the final span that got retries did not error. + const spans = exportResults.spans; + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.ERROR, + firstSpan.status.code, + 'Unexpected span status code' + ); + assert.strictEqual( + 'our request error', + firstSpan.status.message, + 'Unexpected span status message' + ); + + done(); + }); + }); + + it('with no error', done => { + transaction.request = (config, callback) => { + callback(null); + }; + + transaction.rollback(err => { + assert.ifError(err); + + const exportResults = extractExportedSpans(); + const actualSpanNames = exportResults.spanNames; + const actualEventNames = exportResults.spanEventNames; + + const expectedSpanNames = ['CloudSpanner.Transaction.rollback']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = []; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + // Ensure that the final span that got retries did not error. + const spans = exportResults.spans; + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.UNSET, + firstSpan.status.code, + 'Unexpected span status code' + ); + assert.strictEqual( + undefined, + firstSpan.status.message, + 'Unexpected span status message' + ); + + done(); + }); + }); + }); + + describe('commit', () => { + it('without error', done => { + const id = 'transaction-id-123'; + const transactionTag = 'bar'; + transaction.id = id; + transaction.requestOptions = {transactionTag}; + + transaction.request = (config, callback) => { + callback(null, {}); + }; + + transaction.commit(err => { + assert.ifError(err); + + const exportResults = extractExportedSpans(); + const actualSpanNames = exportResults.spanNames; + const actualEventNames = exportResults.spanEventNames; + + const expectedSpanNames = ['CloudSpanner.Transaction.commit']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = ['Starting Commit', 'Commit Done']; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + // Ensure that the final span that got retries did not error. + const spans = exportResults.spans; + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.UNSET, + firstSpan.status.code, + 'Unexpected span status code' + ); + assert.strictEqual( + undefined, + firstSpan.status.message, + 'Unexpected span status message' + ); + + done(); + }); + }); + + it('with generic error', () => { + const fakeError = new Error('commit.error'); + transaction.request = (config, callback) => { + callback(fakeError, {}); + }; + + transaction.commit(err => { + assert.strictEqual(err, fakeError); + + const exportResults = extractExportedSpans(); + const actualSpanNames = exportResults.spanNames; + const actualEventNames = exportResults.spanEventNames; + + const expectedSpanNames = ['CloudSpanner.Transaction.commit']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = ['Starting Commit', 'Commit failed']; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + // Ensure that the final span that got retries did not error. + const spans = exportResults.spans; + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.ERROR, + firstSpan.status.code, + 'Unexpected span status code' + ); + assert.strictEqual( + fakeError.message, + firstSpan.status.message, + 'Unexpected span status message' + ); + }); + }); + }); +}); diff --git a/src/database.ts b/src/database.ts index 917bcabcf..aad10f111 100644 --- a/src/database.ts +++ b/src/database.ts @@ -828,8 +828,8 @@ class Database extends common.GrpcServiceObject { ? (optionsOrCallback as TimestampBounds) : {}; - const q = {opts: this._observabilityOptions}; - return startTrace('Database.createBatchTransaction', q, span => { + const traceConfig = {opts: this._observabilityOptions}; + return startTrace('Database.createBatchTransaction', traceConfig, span => { this.pool_.getSession((err, session) => { if (err) { setSpanError(span, err); @@ -1875,8 +1875,8 @@ class Database extends common.GrpcServiceObject { delete (gaxOpts as GetSessionsOptions).pageToken; } - const q = {opts: this._observabilityOptions}; - return startTrace('Database.getSessions', q, span => { + const traceConfig = {opts: this._observabilityOptions}; + return startTrace('Database.getSessions', traceConfig, span => { this.request< google.spanner.v1.ISession, google.spanner.v1.IListSessionsResponse @@ -2058,8 +2058,8 @@ class Database extends common.GrpcServiceObject { ? (optionsOrCallback as TimestampBounds) : {}; - const q = {opts: this._observabilityOptions}; - return startTrace('Database.getSnapshot', q, span => { + const traceConfig = {opts: this._observabilityOptions}; + return startTrace('Database.getSnapshot', traceConfig, span => { this.pool_.getSession((err, session) => { if (err) { setSpanError(span, err); @@ -2159,8 +2159,8 @@ class Database extends common.GrpcServiceObject { ? (optionsOrCallback as GetTransactionOptions) : {}; - const q = {opts: this._observabilityOptions}; - return startTrace('Database.getTransaction', q, span => { + const traceConfig = {opts: this._observabilityOptions}; + return startTrace('Database.getTransaction', traceConfig, span => { this.pool_.getSession((err, session, transaction) => { if (options.requestOptions) { transaction!.requestOptions = Object.assign( @@ -2786,8 +2786,8 @@ class Database extends common.GrpcServiceObject { ? (optionsOrCallback as TimestampBounds) : {}; - const q = {sql: query, opts: this._observabilityOptions}; - return startTrace('Database.run', q, span => { + const traceConfig = {sql: query, opts: this._observabilityOptions}; + return startTrace('Database.run', traceConfig, span => { this.runStream(query, options) .on('error', err => { setSpanError(span, err); @@ -3007,8 +3007,8 @@ class Database extends common.GrpcServiceObject { options?: TimestampBounds ): PartialResultStream { const proxyStream: Transform = through.obj(); - const q = {sql: query, opts: this._observabilityOptions}; - return startTrace('Database.runStream', q, span => { + const traceConfig = {sql: query, opts: this._observabilityOptions}; + return startTrace('Database.runStream', traceConfig, span => { this.pool_.getSession((err, session) => { if (err) { setSpanError(span, err); @@ -3185,8 +3185,8 @@ class Database extends common.GrpcServiceObject { ? (optionsOrRunFn as RunTransactionOptions) : {}; - const q = {opts: this._observabilityOptions}; - startTrace('Database.runTransaction', q, span => { + const traceConfig = {opts: this._observabilityOptions}; + startTrace('Database.runTransaction', traceConfig, span => { this.pool_.getSession((err, session?, transaction?) => { if (err) { setSpanError(span, err); @@ -3578,8 +3578,8 @@ class Database extends common.GrpcServiceObject { ? (optionsOrCallback as CallOptions) : {}; - const q = {opts: this._observabilityOptions}; - return startTrace('Database.writeAtLeastOnce', q, span => { + const traceConfig = {opts: this._observabilityOptions}; + return startTrace('Database.writeAtLeastOnce', traceConfig, span => { this.pool_.getSession((err, session?, transaction?) => { if (err && isSessionNotFoundError(err as grpc.ServiceError)) { span.addEvent('No session available', { diff --git a/src/transaction.ts b/src/transaction.ts index ca96864e1..7e0dbb40b 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -22,7 +22,7 @@ import {EventEmitter} from 'events'; import {grpc, CallOptions, ServiceError, Status, GoogleError} from 'google-gax'; import * as is from 'is'; import {common as p} from 'protobufjs'; -import {Readable, PassThrough} from 'stream'; +import {finished, Readable, PassThrough, Stream} from 'stream'; import {codec, Json, JSONOptions, Type, Value} from './codec'; import { @@ -46,7 +46,12 @@ import IQueryOptions = google.spanner.v1.ExecuteSqlRequest.IQueryOptions; import IRequestOptions = google.spanner.v1.IRequestOptions; import {Database, Spanner} from '.'; import ReadLockMode = google.spanner.v1.TransactionOptions.ReadWrite.ReadLockMode; -import {ObservabilityOptions} from './instrument'; +import { + ObservabilityOptions, + startTrace, + setSpanError, + setSpanErrorAndException, +} from './instrument'; export type Rows = Array; const RETRY_INFO_TYPE = 'type.googleapis.com/google.rpc.retryinfo'; @@ -351,6 +356,7 @@ export class Snapshot extends EventEmitter { }; this._waitingRequests = []; this._inlineBeginStarted = false; + this._observabilityOptions = session._observabilityOptions; } /** @@ -416,9 +422,6 @@ export class Snapshot extends EventEmitter { options, }; - const span = getActiveOrNoopSpan(); - span.addEvent('Begin Transaction'); - // Only hand crafted read-write transactions will be able to set a // transaction tag for the BeginTransaction RPC. Also, this.requestOptions // is only set in the constructor of Transaction, which is the constructor @@ -436,26 +439,34 @@ export class Snapshot extends EventEmitter { addLeaderAwareRoutingHeader(headers); } - this.request( - { - client: 'SpannerClient', - method: 'beginTransaction', - reqOpts, - gaxOpts, - headers: headers, - }, - ( - err: null | grpc.ServiceError, - resp: spannerClient.spanner.v1.ITransaction - ) => { - if (err) { - callback!(err, resp); - return; + const traceConfig = {opts: this._observabilityOptions}; + return startTrace('Snapshot.begin', traceConfig, span => { + span.addEvent('Begin Transaction'); + + this.request( + { + client: 'SpannerClient', + method: 'beginTransaction', + reqOpts, + gaxOpts, + headers: headers, + }, + ( + err: null | grpc.ServiceError, + resp: spannerClient.spanner.v1.ITransaction + ) => { + if (err) { + setSpanError(span, err); + span.end(); + callback!(err, resp); + return; + } + this._update(resp); + span.end(); + callback!(null, resp); } - this._update(resp); - callback!(null, resp); - } - ); + ); + }); } /** @@ -706,31 +717,56 @@ export class Snapshot extends EventEmitter { }); }; - return partialResultStream(this._wrapWithIdWaiter(makeRequest), { - json, - jsonOptions, - maxResumeRetries, - columnsMetadata, - gaxOptions, - }) - ?.on('response', response => { - if (response.metadata && response.metadata!.transaction && !this.id) { - this._update(response.metadata!.transaction); - } - }) - .on('error', err => { - const isServiceError = err && typeof err === 'object' && 'code' in err; - if ( - !this.id && - this._useInRunner && - !( - isServiceError && - (err as grpc.ServiceError).code === grpc.status.ABORTED - ) - ) { - this.begin(); + const traceConfig = {tableName: table, opts: this._observabilityOptions}; + return startTrace('Snapshot.createReadStream', traceConfig, span => { + const resultStream = partialResultStream( + this._wrapWithIdWaiter(makeRequest), + { + json, + jsonOptions, + maxResumeRetries, + columnsMetadata, + gaxOptions, } - }); + ) + ?.on('response', response => { + if (response.metadata && response.metadata!.transaction && !this.id) { + this._update(response.metadata!.transaction); + } + }) + .on('error', err => { + const isServiceError = + err && typeof err === 'object' && 'code' in err; + if ( + !this.id && + this._useInRunner && + !( + isServiceError && + (err as grpc.ServiceError).code === grpc.status.ABORTED + ) + ) { + this.begin(); + } + setSpanError(span, err); + }) + .on('end', err => { + if (err) { + setSpanError(span, err); + } + span.end(); + }); + + if (resultStream instanceof Stream) { + finished(resultStream, err => { + if (err) { + setSpanError(span, err); + } + span.end(); + }); + } + + return resultStream; + }); } /** @@ -925,10 +961,21 @@ export class Snapshot extends EventEmitter { callback = cb as ReadCallback; } - this.createReadStream(table, request) - .on('error', callback!) - .on('data', row => rows.push(row)) - .on('end', () => callback!(null, rows)); + const traceConfig = {tableName: table, opts: this._observabilityOptions}; + return startTrace('Snapshot.read', traceConfig, span => { + this.createReadStream(table, request) + .on('error', err => { + const e = err as grpc.ServiceError; + setSpanError(span, e); + span.end(); + callback!(e, null); + }) + .on('data', row => rows.push(row)) + .on('end', () => { + span.end(); + callback!(null, rows); + }); + }); } /** @@ -1018,19 +1065,29 @@ export class Snapshot extends EventEmitter { let stats: google.spanner.v1.ResultSetStats; let metadata: google.spanner.v1.ResultSetMetadata; - this.runStream(query) - .on('error', callback!) - .on('response', response => { - if (response.metadata) { - metadata = response.metadata; - if (metadata.transaction && !this.id) { - this._update(metadata.transaction); + const traceConfig = {sql: query, opts: this._observabilityOptions}; + startTrace('Snapshot.run', traceConfig, span => { + return this.runStream(query) + .on('error', (err, rows, stats, metadata) => { + setSpanError(span, err); + span.end(); + callback!(err, rows, stats, metadata); + }) + .on('response', response => { + if (response.metadata) { + metadata = response.metadata; + if (metadata.transaction && !this.id) { + this._update(metadata.transaction); + } } - } - }) - .on('data', row => rows.push(row)) - .on('stats', _stats => (stats = _stats)) - .on('end', () => callback!(null, rows, stats, metadata)); + }) + .on('data', row => rows.push(row)) + .on('stats', _stats => (stats = _stats)) + .on('end', () => { + span.end(); + callback!(null, rows, stats, metadata); + }); + }); } /** @@ -1201,51 +1258,78 @@ export class Snapshot extends EventEmitter { addLeaderAwareRoutingHeader(headers); } - const makeRequest = (resumeToken?: ResumeToken): Readable => { - if (!reqOpts || (this.id && !reqOpts.transaction.id)) { - try { - sanitizeRequest(); - } catch (e) { - const errorStream = new PassThrough(); - setImmediate(() => errorStream.destroy(e as Error)); - return errorStream; + const traceConfig = {opts: this._observabilityOptions, ...query}; + return startTrace('Snapshot.runStream', traceConfig, span => { + const makeRequest = (resumeToken?: ResumeToken): Readable => { + if (!reqOpts || (this.id && !reqOpts.transaction.id)) { + try { + sanitizeRequest(); + } catch (e) { + const errorStream = new PassThrough(); + setSpanErrorAndException(span, e as Error); + span.end(); + setImmediate(() => errorStream.destroy(e as Error)); + return errorStream; + } } - } - - return this.requestStream({ - client: 'SpannerClient', - method: 'executeStreamingSql', - reqOpts: Object.assign({}, reqOpts, {resumeToken}), - gaxOpts: gaxOptions, - headers: headers, - }); - }; - return partialResultStream(this._wrapWithIdWaiter(makeRequest), { - json, - jsonOptions, - maxResumeRetries, - columnsMetadata, - gaxOptions, - }) - .on('response', response => { - if (response.metadata && response.metadata!.transaction && !this.id) { - this._update(response.metadata!.transaction); - } - }) - .on('error', err => { - const isServiceError = err && typeof err === 'object' && 'code' in err; - if ( - !this.id && - this._useInRunner && - !( - isServiceError && - (err as grpc.ServiceError).code === grpc.status.ABORTED - ) - ) { - this.begin(); + return this.requestStream({ + client: 'SpannerClient', + method: 'executeStreamingSql', + reqOpts: Object.assign({}, reqOpts, {resumeToken}), + gaxOpts: gaxOptions, + headers: headers, + }); + }; + + const resultStream = partialResultStream( + this._wrapWithIdWaiter(makeRequest), + { + json, + jsonOptions, + maxResumeRetries, + columnsMetadata, + gaxOptions, } - }); + ) + .on('response', response => { + if (response.metadata && response.metadata!.transaction && !this.id) { + this._update(response.metadata!.transaction); + } + }) + .on('error', err => { + setSpanError(span, err as Error); + const isServiceError = + err && typeof err === 'object' && 'code' in err; + if ( + !this.id && + this._useInRunner && + !( + isServiceError && + (err as grpc.ServiceError).code === grpc.status.ABORTED + ) + ) { + this.begin(); + } + }) + .on('end', err => { + if (err) { + setSpanError(span, err as Error); + } + span.end(); + }); + + if (resultStream instanceof Stream) { + finished(resultStream, err => { + if (err) { + setSpanError(span, err); + } + span.end(); + }); + } + + return resultStream; + }); } /** @@ -1543,22 +1627,30 @@ export class Dml extends Snapshot { query = {sql: query} as ExecuteSqlRequest; } - this.run( - query, - ( - err: null | grpc.ServiceError, - rows: Rows, - stats: spannerClient.spanner.v1.ResultSetStats - ) => { - let rowCount = 0; - - if (stats && stats.rowCount) { - rowCount = Math.floor(stats[stats.rowCount] as number); - } + const traceConfig = {opts: this._observabilityOptions, ...query}; + return startTrace('Dml.runUpdate', traceConfig, span => { + this.run( + query, + ( + err: null | grpc.ServiceError, + rows: Rows, + stats: spannerClient.spanner.v1.ResultSetStats + ) => { + let rowCount = 0; + + if (stats && stats.rowCount) { + rowCount = Math.floor(stats[stats.rowCount] as number); + } - callback!(err, rowCount); - } - ); + if (err) { + setSpanError(span, err); + } + + span.end(); + callback!(err, rowCount); + } + ); + }); } } @@ -1812,57 +1904,64 @@ export class Transaction extends Dml { addLeaderAwareRoutingHeader(headers); } - this.request( - { - client: 'SpannerClient', - method: 'executeBatchDml', - reqOpts, - gaxOpts, - headers: headers, - }, - ( - err: null | grpc.ServiceError, - resp: spannerClient.spanner.v1.ExecuteBatchDmlResponse - ) => { - let batchUpdateError: BatchUpdateError; - - if (err) { - const rowCounts: number[] = []; - batchUpdateError = Object.assign(err, {rowCounts}); - callback!(batchUpdateError, rowCounts, resp); - return; - } + const traceConfig = {opts: this._observabilityOptions}; + return startTrace('Transaction.batchUpdate', traceConfig, span => { + this.request( + { + client: 'SpannerClient', + method: 'executeBatchDml', + reqOpts, + gaxOpts, + headers: headers, + }, + ( + err: null | grpc.ServiceError, + resp: spannerClient.spanner.v1.ExecuteBatchDmlResponse + ) => { + let batchUpdateError: BatchUpdateError; + + if (err) { + const rowCounts: number[] = []; + batchUpdateError = Object.assign(err, {rowCounts}); + setSpanError(span, batchUpdateError); + span.end(); + callback!(batchUpdateError, rowCounts, resp); + return; + } - const {resultSets, status} = resp; - for (const resultSet of resultSets) { - if (!this.id && resultSet.metadata?.transaction) { - this._update(resultSet.metadata.transaction); + const {resultSets, status} = resp; + for (const resultSet of resultSets) { + if (!this.id && resultSet.metadata?.transaction) { + this._update(resultSet.metadata.transaction); + } + } + const rowCounts: number[] = resultSets.map(({stats}) => { + return ( + (stats && + Number( + stats[ + (stats as spannerClient.spanner.v1.ResultSetStats).rowCount! + ] + )) || + 0 + ); + }); + + if (status && status.code !== 0) { + const error = new Error(status.message!); + batchUpdateError = Object.assign(error, { + code: status.code, + metadata: Transaction.extractKnownMetadata(status.details!), + rowCounts, + }) as BatchUpdateError; + setSpanError(span, batchUpdateError); } - } - const rowCounts: number[] = resultSets.map(({stats}) => { - return ( - (stats && - Number( - stats[ - (stats as spannerClient.spanner.v1.ResultSetStats).rowCount! - ] - )) || - 0 - ); - }); - if (status && status.code !== 0) { - const error = new Error(status.message!); - batchUpdateError = Object.assign(error, { - code: status.code, - metadata: Transaction.extractKnownMetadata(status.details!), - rowCounts, - }) as BatchUpdateError; + span.end(); + callback!(batchUpdateError!, rowCounts, resp); } - - callback!(batchUpdateError!, rowCounts, resp); - } - ); + ); + }); } private static extractKnownMetadata( @@ -1975,69 +2074,82 @@ export class Transaction extends Dml { const requestOptions = (options as CommitOptions).requestOptions; const reqOpts: CommitRequest = {mutations, session, requestOptions}; - const span = getActiveOrNoopSpan(); - - if (this.id) { - reqOpts.transactionId = this.id as Uint8Array; - } else if (!this._useInRunner) { - reqOpts.singleUseTransaction = this._options; - } else { - this.begin().then(() => this.commit(options, callback), callback); - return; - } - - if ( - 'returnCommitStats' in options && - (options as CommitOptions).returnCommitStats - ) { - reqOpts.returnCommitStats = (options as CommitOptions).returnCommitStats; - } - if ( - 'maxCommitDelay' in options && - (options as CommitOptions).maxCommitDelay - ) { - reqOpts.maxCommitDelay = (options as CommitOptions).maxCommitDelay; - } - reqOpts.requestOptions = Object.assign( - requestOptions || {}, - this.requestOptions - ); + const traceConfig = {opts: this._observabilityOptions}; + return startTrace('Transaction.commit', traceConfig, span => { + if (this.id) { + reqOpts.transactionId = this.id as Uint8Array; + } else if (!this._useInRunner) { + reqOpts.singleUseTransaction = this._options; + } else { + this.begin().then(() => { + this.commit(options, (err, resp) => { + if (err) { + setSpanError(span, err); + } + span.end(); + callback(err, resp); + }); + }, callback); + return; + } - const headers = this.resourceHeader_; - if (this._getSpanner().routeToLeaderEnabled) { - addLeaderAwareRoutingHeader(headers); - } + if ( + 'returnCommitStats' in options && + (options as CommitOptions).returnCommitStats + ) { + reqOpts.returnCommitStats = ( + options as CommitOptions + ).returnCommitStats; + } + if ( + 'maxCommitDelay' in options && + (options as CommitOptions).maxCommitDelay + ) { + reqOpts.maxCommitDelay = (options as CommitOptions).maxCommitDelay; + } + reqOpts.requestOptions = Object.assign( + requestOptions || {}, + this.requestOptions + ); - span.addEvent('Starting Commit'); + const headers = this.resourceHeader_; + if (this._getSpanner().routeToLeaderEnabled) { + addLeaderAwareRoutingHeader(headers); + } - this.request( - { - client: 'SpannerClient', - method: 'commit', - reqOpts, - gaxOpts: gaxOpts, - headers: headers, - }, - (err: null | Error, resp: spannerClient.spanner.v1.ICommitResponse) => { - this.end(); + span.addEvent('Starting Commit'); + + this.request( + { + client: 'SpannerClient', + method: 'commit', + reqOpts, + gaxOpts: gaxOpts, + headers: headers, + }, + (err: null | Error, resp: spannerClient.spanner.v1.ICommitResponse) => { + this.end(); + + if (err) { + span.addEvent('Commit failed'); + setSpanError(span, err); + } else { + span.addEvent('Commit Done'); + } - if (err) { - span.addEvent('Commit failed'); - } else { - span.addEvent('Commit Done'); - } + if (resp && resp.commitTimestamp) { + this.commitTimestampProto = resp.commitTimestamp; + this.commitTimestamp = new PreciseDate( + resp.commitTimestamp as DateStruct + ); + } + err = Transaction.decorateCommitError(err as ServiceError, mutations); - if (resp && resp.commitTimestamp) { - this.commitTimestampProto = resp.commitTimestamp; - this.commitTimestamp = new PreciseDate( - resp.commitTimestamp as DateStruct - ); + span.end(); + callback!(err as ServiceError | null, resp); } - err = Transaction.decorateCommitError(err as ServiceError, mutations); - - callback!(err as ServiceError | null, resp); - } - ); + ); + }); } /** @@ -2328,40 +2440,48 @@ export class Transaction extends Dml { const callback = typeof gaxOptionsOrCallback === 'function' ? gaxOptionsOrCallback : cb!; - if (!this.id) { - callback!( - new Error( + const traceConfig = {opts: this._observabilityOptions}; + return startTrace('Transaction.rollback', traceConfig, span => { + if (!this.id) { + const err = new Error( 'Transaction ID is unknown, nothing to rollback.' - ) as ServiceError - ); - return; - } + ) as ServiceError; + setSpanError(span, err); + span.end(); + callback!(err); + return; + } - const session = this.session.formattedName_!; - const transactionId = this.id; - const reqOpts: spannerClient.spanner.v1.IRollbackRequest = { - session, - transactionId, - }; + const session = this.session.formattedName_!; + const transactionId = this.id; + const reqOpts: spannerClient.spanner.v1.IRollbackRequest = { + session, + transactionId, + }; - const headers = this.resourceHeader_; - if (this._getSpanner().routeToLeaderEnabled) { - addLeaderAwareRoutingHeader(headers); - } - - this.request( - { - client: 'SpannerClient', - method: 'rollback', - reqOpts, - gaxOpts, - headers: headers, - }, - (err: null | ServiceError) => { - this.end(); - callback!(err); + const headers = this.resourceHeader_; + if (this._getSpanner().routeToLeaderEnabled) { + addLeaderAwareRoutingHeader(headers); } - ); + + this.request( + { + client: 'SpannerClient', + method: 'rollback', + reqOpts, + gaxOpts, + headers: headers, + }, + (err: null | ServiceError) => { + if (err) { + setSpanError(span, err); + } + span.end(); + this.end(); + callback!(err); + } + ); + }); } /** @@ -2813,9 +2933,16 @@ export class PartitionedDml extends Dml { query: string | ExecuteSqlRequest, callback?: RunUpdateCallback ): void | Promise { - super.runUpdate(query, (err, count) => { - this.end(); - callback!(err, count); + const traceConfig = {sql: query, opts: this._observabilityOptions}; + return startTrace('PartitionedDml.runUpdate', traceConfig, span => { + super.runUpdate(query, (err, count) => { + if (err) { + setSpanError(span, err); + } + this.end(); + span.end(); + callback!(err, count); + }); }); } } From f489c9479fa5402f0c960cf896fd3be0e946f182 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Thu, 10 Oct 2024 02:44:26 -0700 Subject: [PATCH 2/4] feat: (observability) trace Database.batchCreateSessions + SessionPool.createSessions (#2145) This change adds tracing for Database.batchCreateSessions as well as SessionPool.createSessions which was raised as a big need. This change is a premise to finishing up tracing Transaction. While here, also folded in the async/await fix to avoid day+ long code review lag and then 3+ hours just to run tests per PR: OpenTelemetry cannot work correctly for async/await if there isn't a set AsyncHooksManager, but we should not burden our customers with this type of specialist knowledge, their code should just work and this change performs such a check. Later on we shall file a feature request with the OpenTelemetry-JS API group to give us a hook to detect if we've got a live asyncHooksManager instead of this mandatory comparison to ROOT_CONTEXT each time. Fixes #2146 Updates #2079 Spun out of PR #2122 Supersedes PR #2147 --- observability-test/database.ts | 112 +++++- observability-test/session-pool.ts | 222 +++++++++++ observability-test/spanner.ts | 583 ++++++++++++++++++++++++----- observability-test/transaction.ts | 5 +- package.json | 2 +- src/database.ts | 60 +-- src/instance.ts | 1 + src/instrument.ts | 27 ++ src/session-pool.ts | 84 +++-- src/table.ts | 1 + test/spanner.ts | 71 ++-- 11 files changed, 982 insertions(+), 186 deletions(-) create mode 100644 observability-test/session-pool.ts diff --git a/observability-test/database.ts b/observability-test/database.ts index cbcc73572..d4dcea825 100644 --- a/observability-test/database.ts +++ b/observability-test/database.ts @@ -375,6 +375,115 @@ describe('Database', () => { }); }); + describe('batchCreateSessions', () => { + it('without error', done => { + const ARGS = [null, [{}]]; + database.request = (config, callback) => { + callback(...ARGS); + }; + + database.batchCreateSessions(10, (err, sessions) => { + assert.ifError(err); + assert.ok(sessions); + + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = ['CloudSpanner.Database.batchCreateSessions']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that the span didn't encounter an error. + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.UNSET, + firstSpan.status.code, + 'Unexpected span status code' + ); + assert.strictEqual( + undefined, + firstSpan.status.message, + 'Mismatched span status message' + ); + + // We don't expect events. + const expectedEventNames = []; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + }); + + it('with error', done => { + const ARGS = [new Error('batchCreateSessions.error'), null]; + database.request = (config, callback) => { + callback(...ARGS); + }; + + database.batchCreateSessions(10, (err, sessions) => { + assert.ok(err); + assert.ok(!sessions); + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = ['CloudSpanner.Database.batchCreateSessions']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that the span actually produced an error that was recorded. + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.ERROR, + firstSpan.status.code, + 'Expected an ERROR span status' + ); + assert.strictEqual( + 'batchCreateSessions.error', + firstSpan.status.message, + 'Mismatched span status message' + ); + + // We don't expect events. + const expectedEventNames = []; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + }); + }); + describe('getSnapshot', () => { let fakePool: FakeSessionPool; let fakeSession: FakeSession; @@ -409,7 +518,7 @@ describe('Database', () => { getSessionStub.callsFake(callback => callback(fakeError, null)); - database.getSnapshot((err, snapshot) => { + database.getSnapshot(err => { assert.strictEqual(err, fakeError); traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); @@ -1027,7 +1136,6 @@ describe('Database', () => { }); it('with error on null mutation should catch thrown error', done => { - const fakeError = new Error('err'); try { database.writeAtLeastOnce(null, (err, res) => {}); } catch (err) { diff --git a/observability-test/session-pool.ts b/observability-test/session-pool.ts new file mode 100644 index 000000000..e92b42b0a --- /dev/null +++ b/observability-test/session-pool.ts @@ -0,0 +1,222 @@ +/*! + * Copyright 2024 Google LLC. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as assert from 'assert'; +import {before, beforeEach, afterEach, describe, it} from 'mocha'; +import * as extend from 'extend'; +import PQueue from 'p-queue'; +import * as proxyquire from 'proxyquire'; +import * as sinon from 'sinon'; +import stackTrace = require('stack-trace'); +const { + AlwaysOnSampler, + NodeTracerProvider, + InMemorySpanExporter, +} = require('@opentelemetry/sdk-trace-node'); +// eslint-disable-next-line n/no-extraneous-require +const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base'); +// eslint-disable-next-line n/no-extraneous-require +const {SpanStatusCode} = require('@opentelemetry/api'); + +import {Database} from '../src/database'; +import {Session} from '../src/session'; +import * as sp from '../src/session-pool'; + +let pQueueOverride: typeof PQueue | null = null; + +function FakePQueue(options) { + return new (pQueueOverride || PQueue)(options); +} + +FakePQueue.default = FakePQueue; + +class FakeTransaction { + options; + constructor(options?) { + this.options = options; + } + async begin(): Promise {} +} + +const fakeStackTrace = extend({}, stackTrace); + +describe('SessionPool', () => { + let sessionPool: sp.SessionPool; + // tslint:disable-next-line variable-name + let SessionPool: typeof sp.SessionPool; + + function noop() {} + const DATABASE = { + batchCreateSessions: noop, + databaseRole: 'parent_role', + } as unknown as Database; + + const sandbox = sinon.createSandbox(); + const shouldNotBeCalled = sandbox.stub().throws('Should not be called.'); + + const createSession = (name = 'id', props?): Session => { + props = props || {}; + + return Object.assign(new Session(DATABASE, name), props, { + create: sandbox.stub().resolves(), + delete: sandbox.stub().resolves(), + keepAlive: sandbox.stub().resolves(), + transaction: sandbox.stub().returns(new FakeTransaction()), + }); + }; + + before(() => { + SessionPool = proxyquire('../src/session-pool.js', { + 'p-queue': FakePQueue, + 'stack-trace': fakeStackTrace, + }).SessionPool; + }); + + afterEach(() => { + pQueueOverride = null; + sandbox.restore(); + }); + + const traceExporter = new InMemorySpanExporter(); + const sampler = new AlwaysOnSampler(); + const provider = new NodeTracerProvider({ + sampler: sampler, + exporter: traceExporter, + }); + provider.addSpanProcessor(new SimpleSpanProcessor(traceExporter)); + + beforeEach(() => { + DATABASE.session = createSession; + DATABASE._observabilityOptions = { + tracerProvider: provider, + }; + sessionPool = new SessionPool(DATABASE); + sessionPool._observabilityOptions = DATABASE._observabilityOptions; + traceExporter.reset(); + }); + + describe('_createSessions', () => { + const OPTIONS = 3; + it('on exception from Database.batchCreateSessions', async () => { + const ourException = new Error('this fails intentionally'); + const stub = sandbox + .stub(DATABASE, 'batchCreateSessions') + .throws(ourException); + const releaseStub = sandbox.stub(sessionPool, 'release'); + + assert.rejects(async () => { + await sessionPool._createSessions(OPTIONS); + }, ourException); + + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = ['CloudSpanner.SessionPool.createSessions']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = [ + 'Requesting 3 sessions', + 'Creating 3 sessions', + 'Requested for 3 sessions returned 0', + 'exception', + ]; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.ERROR, + firstSpan.status.code, + 'Unexpected an span status code' + ); + assert.strictEqual( + ourException.message, + firstSpan.status.message, + 'Unexpected span status message' + ); + }); + + it('without error', async () => { + const RESPONSE = [[{}, {}, {}]]; + + const stub = sandbox + .stub(DATABASE, 'batchCreateSessions') + .resolves(RESPONSE); + const releaseStub = sandbox.stub(sessionPool, 'release'); + + await sessionPool._createSessions(OPTIONS); + assert.strictEqual(sessionPool.size, 3); + + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = ['CloudSpanner.SessionPool.createSessions']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = [ + 'Requesting 3 sessions', + 'Creating 3 sessions', + 'Requested for 3 sessions returned 3', + ]; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.UNSET, + firstSpan.status.code, + 'Unexpected an span status code' + ); + assert.strictEqual( + undefined, + firstSpan.status.message, + 'Unexpected span status message' + ); + }); + }); +}); diff --git a/observability-test/spanner.ts b/observability-test/spanner.ts index 3e9cc295b..322fcc1b9 100644 --- a/observability-test/spanner.ts +++ b/observability-test/spanner.ts @@ -36,6 +36,7 @@ const { } = require('@opentelemetry/context-async-hooks'); const {ObservabilityOptions} = require('../src/instrument'); +import {SessionPool} from '../src/session-pool'; const selectSql = 'SELECT 1'; const updateSql = 'UPDATE FOO SET BAR=1 WHERE BAZ=2'; @@ -113,65 +114,61 @@ async function setup( } describe('EndToEnd', () => { - describe('Database', () => { - let server: grpc.Server; - let spanner: Spanner; - let database: Database; - let spannerMock: mock.MockSpanner; - let traceExporter: typeof InMemorySpanExporter; + let server: grpc.Server; + let spanner: Spanner; + let database: Database; + let spannerMock: mock.MockSpanner; + let traceExporter: typeof InMemorySpanExporter; - const contextManager = new AsyncHooksContextManager(); - setGlobalContextManager(contextManager); + const contextManager = new AsyncHooksContextManager(); + setGlobalContextManager(contextManager); - afterEach(() => { - disableContextAndManager(contextManager); + afterEach(() => { + disableContextAndManager(contextManager); + }); + + beforeEach(async () => { + traceExporter = new InMemorySpanExporter(); + const sampler = new AlwaysOnSampler(); + const provider = new NodeTracerProvider({ + sampler: sampler, + exporter: traceExporter, }); + provider.addSpanProcessor(new SimpleSpanProcessor(traceExporter)); - beforeEach(async () => { - traceExporter = new InMemorySpanExporter(); - const sampler = new AlwaysOnSampler(); - const provider = new NodeTracerProvider({ - sampler: sampler, - exporter: traceExporter, - }); - const setupResult = await setup({ - tracerProvider: provider, - enableExtendedTracing: false, - }); - spanner = setupResult.spanner; - server = setupResult.server; - spannerMock = setupResult.spannerMock; - - const selectSql = 'SELECT 1'; - const updateSql = 'UPDATE FOO SET BAR=1 WHERE BAZ=2'; - spannerMock.putStatementResult( - selectSql, - mock.StatementResult.resultSet(createSelect1ResultSet()) - ); - spannerMock.putStatementResult( - updateSql, - mock.StatementResult.updateCount(1) - ); + const setupResult = await setup({ + tracerProvider: provider, + enableExtendedTracing: false, + }); - provider.addSpanProcessor(new SimpleSpanProcessor(traceExporter)); + spanner = setupResult.spanner; + server = setupResult.server; + spannerMock = setupResult.spannerMock; - const instance = spanner.instance('instance'); - database = instance.database('database'); - }); + const instance = spanner.instance('instance'); + database = instance.database('database'); + + // To deflake expectations of session creation, let's + // issue out a warm-up request request that'll ensure + // that the SessionPool is created deterministically. + const [rows] = await database.run('SELECT 1'); + // Clear out any present traces to make a clean slate for testing. + traceExporter.forceFlush(); + traceExporter.reset(); + }); - afterEach(() => { - traceExporter.reset(); - spannerMock.resetRequests(); - spanner.close(); - server.tryShutdown(() => {}); - }); + afterEach(() => { + traceExporter.reset(); + spannerMock.resetRequests(); + spanner.close(); + server.tryShutdown(() => {}); + }); + describe('Database', () => { it('getSessions', async () => { const [rows] = await database.getSessions(); - traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); - assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); const actualSpanNames: string[] = []; const actualEventNames: string[] = []; @@ -228,7 +225,13 @@ describe('EndToEnd', () => { `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` ); - const expectedEventNames = ['Begin Transaction']; + const expectedEventNames = [ + 'Begin Transaction', + 'Transaction Creation Done', + 'Acquiring session', + 'Cache hit: has usable session', + 'Acquired session', + ]; assert.deepStrictEqual( actualEventNames, expectedEventNames, @@ -247,7 +250,6 @@ describe('EndToEnd', () => { traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); - assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); const actualEventNames: string[] = []; const actualSpanNames: string[] = []; @@ -265,7 +267,12 @@ describe('EndToEnd', () => { `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` ); - const expectedEventNames = ['Using Session']; + const expectedEventNames = [ + 'Acquiring session', + 'Cache hit: has usable session', + 'Acquired session', + 'Using Session', + ]; assert.deepStrictEqual( actualEventNames, expectedEventNames, @@ -304,7 +311,12 @@ describe('EndToEnd', () => { `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` ); - const expectedEventNames = ['Using Session']; + const expectedEventNames = [ + 'Acquiring session', + 'Cache hit: has usable session', + 'Acquired session', + 'Using Session', + ]; assert.deepStrictEqual( actualEventNames, expectedEventNames, @@ -366,7 +378,12 @@ describe('EndToEnd', () => { 'Expected that RunStream has a defined spanId' ); - const expectedEventNames = ['Using Session']; + const expectedEventNames = [ + 'Acquiring session', + 'Cache hit: has usable session', + 'Acquired session', + 'Using Session', + ]; assert.deepStrictEqual( actualEventNames, expectedEventNames, @@ -403,7 +420,12 @@ describe('EndToEnd', () => { `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` ); - const expectedEventNames = []; + const expectedEventNames = [ + 'Acquiring session', + 'Cache hit: has usable session', + 'Acquired session', + 'Transaction Creation Done', + ]; assert.deepStrictEqual( actualEventNames, expectedEventNames, @@ -446,6 +468,9 @@ describe('EndToEnd', () => { const expectedEventNames = [ 'Starting Commit', 'Commit Done', + 'Acquiring session', + 'Cache hit: has usable session', + 'Acquired session', 'Using Session', ]; assert.deepStrictEqual( @@ -457,50 +482,146 @@ describe('EndToEnd', () => { done(); }); }); + + it('batchCreateSessions', done => { + database.batchCreateSessions(5, (err, sessions) => { + assert.ifError(err); + + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + + const actualEventNames: string[] = []; + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = ['CloudSpanner.Database.batchCreateSessions']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = []; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + }); }); }); -describe('ObservabilityOptions injection and propagation', async () => { - const globalTraceExporter = new InMemorySpanExporter(); - const globalTracerProvider = new NodeTracerProvider({ - sampler: new AlwaysOnSampler(), - exporter: globalTraceExporter, +describe('SessionPool', async () => { + const traceExporter = new InMemorySpanExporter(); + const sampler = new AlwaysOnSampler(); + const provider = new NodeTracerProvider({ + sampler: sampler, + exporter: traceExporter, }); - globalTracerProvider.addSpanProcessor( - new SimpleSpanProcessor(globalTraceExporter) - ); - globalTracerProvider.register(); + provider.addSpanProcessor(new SimpleSpanProcessor(traceExporter)); - const injectedTraceExporter = new InMemorySpanExporter(); - const injectedTracerProvider = new NodeTracerProvider({ - sampler: new AlwaysOnSampler(), - exporter: injectedTraceExporter, + const setupResult = await setup({ + tracerProvider: provider, + enableExtendedTracing: false, }); - injectedTracerProvider.addSpanProcessor( - new SimpleSpanProcessor(injectedTraceExporter) - ); - const observabilityOptions: typeof ObservabilityOptions = { - tracerProvider: injectedTracerProvider, - enableExtendedTracing: true, - }; - - const setupResult = await setup(observabilityOptions); const spanner = setupResult.spanner; const server = setupResult.server; const spannerMock = setupResult.spannerMock; + const instance = spanner.instance('instance'); after(async () => { - globalTraceExporter.reset(); - injectedTraceExporter.reset(); - await globalTracerProvider.shutdown(); - await injectedTracerProvider.shutdown(); + traceExporter.reset(); + await provider.shutdown(); spannerMock.resetRequests(); spanner.close(); server.tryShutdown(() => {}); }); - it('Passed into Spanner, Instance and Database', done => { + it('_createSessions', async () => { + // The first invocation of new SessionPool shall implicitly happen in here. + const database = instance.database('database'); + await database.run('SELECT 1'); + + await provider.forceFlush(); + traceExporter.reset(); + + // Explicitly invoking new SessionPool. + const sessionPool = new SessionPool(database); + + const OPTIONS = 3; + await sessionPool._createSessions(OPTIONS); + + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = [ + 'CloudSpanner.Database.batchCreateSessions', + 'CloudSpanner.SessionPool.createSessions', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = [ + 'Requesting 3 sessions', + 'Creating 3 sessions', + 'Requested for 3 sessions returned 3', + ]; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + }); +}); + +describe('ObservabilityOptions injection and propagation', async () => { + it('Passed into Spanner, Instance and Database', async () => { + const traceExporter = new InMemorySpanExporter(); + const tracerProvider = new NodeTracerProvider({ + sampler: new AlwaysOnSampler(), + exporter: traceExporter, + }); + tracerProvider.addSpanProcessor(new SimpleSpanProcessor(traceExporter)); + + const observabilityOptions: typeof ObservabilityOptions = { + tracerProvider: tracerProvider, + enableExtendedTracing: true, + }; + + const setupResult = await setup(observabilityOptions); + const spanner = setupResult.spanner; + const server = setupResult.server; + const spannerMock = setupResult.spannerMock; + + after(async () => { + traceExporter.reset(); + await tracerProvider.shutdown(); + spannerMock.resetRequests(); + spanner.close(); + server.tryShutdown(() => {}); + }); + // Ensure that the same observability configuration is set on the Spanner client. assert.deepStrictEqual(spanner._observabilityOptions, observabilityOptions); @@ -534,23 +655,50 @@ describe('ObservabilityOptions injection and propagation', async () => { databaseByConstructor._observabilityOptions, observabilityOptions ); - - done(); }); - afterEach(async () => { - await injectedTracerProvider.forceFlush(); - injectedTraceExporter.reset(); - }); + describe('Transaction', async () => { + const traceExporter = new InMemorySpanExporter(); + const tracerProvider = new NodeTracerProvider({ + sampler: new AlwaysOnSampler(), + exporter: traceExporter, + }); + tracerProvider.addSpanProcessor(new SimpleSpanProcessor(traceExporter)); + + const observabilityOptions: typeof ObservabilityOptions = { + tracerProvider: tracerProvider, + enableExtendedTracing: true, + }; + const setupResult = await setup(observabilityOptions); + const spanner = setupResult.spanner; + const server = setupResult.server; + const spannerMock = setupResult.spannerMock; + + after(async () => { + traceExporter.reset(); + await tracerProvider.shutdown(); + spannerMock.resetRequests(); + spanner.close(); + server.tryShutdown(() => {}); + }); - let database: Database; - beforeEach(() => { - const instance = spanner.instance('instance'); - database = instance.database('db'); - }); + let database: Database; + beforeEach(async () => { + const instance = spanner.instance('instance'); + database = instance.database('database'); - describe('Transaction', () => { - const traceExporter = injectedTraceExporter; + // To deflake expectations of session creation, let's + // issue out a warm-up request request that'll ensure + // that the SessionPool is created deterministically. + const [rows] = await database.run('SELECT 1'); + // Clear out any present traces to make a clean slate for testing. + traceExporter.forceFlush(); + traceExporter.reset(); + }); + + afterEach(() => { + spannerMock.resetRequests(); + }); it('run', done => { database.getTransaction((err, tx) => { @@ -582,11 +730,8 @@ describe('ObservabilityOptions injection and propagation', async () => { ); const expectedEventNames = [ - 'Requesting 25 sessions', - 'Creating 25 sessions', - 'Requested for 25 sessions returned 25', 'Acquiring session', - 'Waiting for a session to become available', + 'Cache hit: has usable session', 'Acquired session', 'Using Session', 'Transaction Creation Done', @@ -643,7 +788,7 @@ describe('ObservabilityOptions injection and propagation', async () => { 'Begin Transaction', 'Transaction Creation Done', ]; - assert.strictEqual( + assert.deepStrictEqual( actualEventNames.every(value => expectedEventNames.includes(value)), true, `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` @@ -689,7 +834,12 @@ describe('ObservabilityOptions injection and propagation', async () => { `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` ); - const expectedEventNames = ['Using Session']; + const expectedEventNames = [ + 'Acquiring session', + 'Cache hit: has usable session', + 'Acquired session', + 'Using Session', + ]; assert.deepStrictEqual( actualEventNames, expectedEventNames, @@ -758,7 +908,43 @@ describe('ObservabilityOptions injection and propagation', async () => { }); }); - it('Propagates spans to the injected not global TracerProvider', done => { + it('Propagates spans to the injected not global TracerProvider', async () => { + const globalTraceExporter = new InMemorySpanExporter(); + const globalTracerProvider = new NodeTracerProvider({ + sampler: new AlwaysOnSampler(), + exporter: globalTraceExporter, + }); + globalTracerProvider.addSpanProcessor( + new SimpleSpanProcessor(globalTraceExporter) + ); + globalTracerProvider.register(); + + const injectedTraceExporter = new InMemorySpanExporter(); + const injectedTracerProvider = new NodeTracerProvider({ + sampler: new AlwaysOnSampler(), + exporter: injectedTraceExporter, + }); + injectedTracerProvider.addSpanProcessor( + new SimpleSpanProcessor(injectedTraceExporter) + ); + + const observabilityOptions: typeof ObservabilityOptions = { + tracerProvider: injectedTracerProvider, + enableExtendedTracing: true, + }; + const setupResult = await setup(observabilityOptions); + const spanner = setupResult.spanner; + const server = setupResult.server; + const spannerMock = setupResult.spannerMock; + + after(async () => { + injectedTraceExporter.reset(); + await injectedTracerProvider.shutdown(); + spannerMock.resetRequests(); + spanner.close(); + server.tryShutdown(() => {}); + }); + const instance = spanner.instance('instance'); const database = instance.database('database'); @@ -794,6 +980,8 @@ describe('ObservabilityOptions injection and propagation', async () => { }); const expectedSpanNames = [ + 'CloudSpanner.Database.batchCreateSessions', + 'CloudSpanner.SessionPool.createSessions', 'CloudSpanner.Snapshot.runStream', 'CloudSpanner.Database.runStream', 'CloudSpanner.Database.run', @@ -805,6 +993,9 @@ describe('ObservabilityOptions injection and propagation', async () => { ); const expectedEventNames = [ + 'Requesting 25 sessions', + 'Creating 25 sessions', + 'Requested for 25 sessions returned 25', 'Acquiring session', 'Waiting for a session to become available', 'Acquired session', @@ -815,7 +1006,199 @@ describe('ObservabilityOptions injection and propagation', async () => { expectedEventNames, `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` ); + }); + }); +}); + +describe('E2E traces with async/await', async () => { + let server: grpc.Server; + let spanner: Spanner; + let database: Database; + let spannerMock: mock.MockSpanner; + let traceExporter: typeof InMemorySpanExporter; + let provider: typeof TracerProvider; + let observabilityOptions: typeof ObservabilityOptions; + + beforeEach(async () => { + traceExporter = new InMemorySpanExporter(); + provider = new NodeTracerProvider({ + sampler: new AlwaysOnSampler(), + exporter: traceExporter, + }); + provider.addSpanProcessor(new SimpleSpanProcessor(traceExporter)); + + observabilityOptions = { + tracerProvider: provider, + enableExtendedTracing: true, + }; + const setupResult = await setup(observabilityOptions); + spanner = setupResult.spanner; + server = setupResult.server; + spannerMock = setupResult.spannerMock; + }); + + afterEach(async () => { + traceExporter.reset(); + provider.shutdown(); + spannerMock.resetRequests(); + spanner.close(); + server.tryShutdown(() => {}); + }); + + function assertAsyncAwaitExpectations() { + // See https://github.com/googleapis/nodejs-spanner/issues/2146. + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = [ + 'CloudSpanner.Database.batchCreateSessions', + 'CloudSpanner.SessionPool.createSessions', + 'CloudSpanner.Snapshot.runStream', + 'CloudSpanner.Database.runStream', + 'CloudSpanner.Database.run', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // We need to ensure a strict relationship between the spans. + // runSpan -------------------| + // |-runStream ----------| + const runStreamSpan = spans[spans.length - 2]; + const runSpan = spans[spans.length - 1]; + assert.ok( + runSpan.spanContext().traceId, + 'Expected that runSpan has a defined traceId' + ); + assert.ok( + runStreamSpan.spanContext().traceId, + 'Expected that runStreamSpan has a defined traceId' + ); + assert.deepStrictEqual( + runStreamSpan.parentSpanId, + runSpan.spanContext().spanId, + `Expected that runSpan(spanId=${runSpan.spanContext().spanId}) is the parent to runStreamSpan(parentSpanId=${runStreamSpan.parentSpanId})` + ); + assert.deepStrictEqual( + runSpan.spanContext().traceId, + runStreamSpan.spanContext().traceId, + 'Expected that both spans share a traceId' + ); + assert.ok( + runStreamSpan.spanContext().spanId, + 'Expected that runStreamSpan has a defined spanId' + ); + assert.ok( + runSpan.spanContext().spanId, + 'Expected that runSpan has a defined spanId' + ); + + const databaseBatchCreateSessionsSpan = spans[0]; + assert.strictEqual( + databaseBatchCreateSessionsSpan.name, + 'CloudSpanner.Database.batchCreateSessions' + ); + const sessionPoolCreateSessionsSpan = spans[1]; + assert.strictEqual( + sessionPoolCreateSessionsSpan.name, + 'CloudSpanner.SessionPool.createSessions' + ); + assert.ok( + sessionPoolCreateSessionsSpan.spanContext().traceId, + 'Expecting a defined sessionPoolCreateSessions traceId' + ); + assert.deepStrictEqual( + sessionPoolCreateSessionsSpan.spanContext().traceId, + databaseBatchCreateSessionsSpan.spanContext().traceId, + 'Expected the same traceId' + ); + assert.deepStrictEqual( + databaseBatchCreateSessionsSpan.parentSpanId, + sessionPoolCreateSessionsSpan.spanContext().spanId, + 'Expected that sessionPool.createSessions is the parent to db.batchCreassionSessions' + ); + + // Assert that despite all being exported, SessionPool.createSessions + // is not in the same trace as runStream, createSessions is invoked at + // Spanner Client instantiation, thus before database.run is invoked. + assert.notEqual( + sessionPoolCreateSessionsSpan.spanContext().traceId, + runSpan.spanContext().traceId, + 'Did not expect the same traceId' + ); + + // Finally check for the collective expected event names. + const expectedEventNames = [ + 'Requesting 25 sessions', + 'Creating 25 sessions', + 'Requested for 25 sessions returned 25', + 'Acquiring session', + 'Waiting for a session to become available', + 'Acquired session', + 'Using Session', + ]; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + } + + it('async/await correctly parents trace spans', async () => { + // See https://github.com/googleapis/nodejs-spanner/issues/2146. + async function main() { + const instance = spanner.instance('testing'); + const database = instance.database('db-1'); + + const query = { + sql: selectSql, + }; + + const [rows] = await database.run(query); + + rows.forEach(row => { + const json = row.toJSON(); + }); + + provider.forceFlush(); + } + + await main(); + assertAsyncAwaitExpectations(); + }); + + it('callback correctly parents trace spans', done => { + function main(onComplete) { + const instance = spanner.instance('testing'); + const database = instance.database('db-1'); + + const query = { + sql: selectSql, + }; + + database.run(query, (err, rows) => { + rows.forEach(row => { + const json = row.toJSON(); + }); + + provider.forceFlush(); + onComplete(); + }); + } + main(() => { + assertAsyncAwaitExpectations(); done(); }); }); diff --git a/observability-test/transaction.ts b/observability-test/transaction.ts index 7d1795e49..ad1fc0b47 100644 --- a/observability-test/transaction.ts +++ b/observability-test/transaction.ts @@ -178,7 +178,10 @@ describe('Transaction', () => { `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` ); - const expectedEventNames = ['Begin Transaction']; + const expectedEventNames = [ + 'Begin Transaction', + 'Transaction Creation Done', + ]; assert.deepStrictEqual( actualEventNames, expectedEventNames, diff --git a/package.json b/package.json index ac111fc84..27ed2f743 100644 --- a/package.json +++ b/package.json @@ -60,6 +60,7 @@ "@google-cloud/promisify": "^4.0.0", "@grpc/proto-loader": "^0.7.0", "@opentelemetry/api": "^1.9.0", + "@opentelemetry/context-async-hooks": "^1.26.0", "@opentelemetry/semantic-conventions": "^1.25.1", "@types/big.js": "^6.0.0", "@types/stack-trace": "0.0.33", @@ -85,7 +86,6 @@ "through2": "^4.0.0" }, "devDependencies": { - "@opentelemetry/context-async-hooks": "^1.25.1", "@opentelemetry/sdk-trace-base": "^1.26.0", "@opentelemetry/sdk-trace-node": "^1.26.0", "@types/concat-stream": "^2.0.0", diff --git a/src/database.ts b/src/database.ts index aad10f111..9f67bf01b 100644 --- a/src/database.ts +++ b/src/database.ts @@ -450,6 +450,11 @@ class Database extends common.GrpcServiceObject { typeof poolOptions === 'function' ? new (poolOptions as SessionPoolConstructor)(this, null) : new SessionPool(this, poolOptions); + const sessionPoolInstance = this.pool_ as SessionPool; + if (sessionPoolInstance) { + sessionPoolInstance._observabilityOptions = + instance._observabilityOptions; + } if (typeof poolOptions === 'object') { this.databaseRole = poolOptions.databaseRole || null; } @@ -459,6 +464,7 @@ class Database extends common.GrpcServiceObject { [CLOUD_RESOURCE_HEADER]: this.formattedName_, }; this.request = instance.request; + this._observabilityOptions = instance._observabilityOptions; // eslint-disable-next-line @typescript-eslint/no-explicit-any this.requestStream = instance.requestStream as any; this.pool_.on('error', this.emit.bind(this, 'error')); @@ -467,7 +473,6 @@ class Database extends common.GrpcServiceObject { Object.assign({}, queryOptions), Database.getEnvironmentQueryOptions() ); - this._observabilityOptions = instance._observabilityOptions; } /** * @typedef {array} SetDatabaseMetadataResponse @@ -677,30 +682,36 @@ class Database extends common.GrpcServiceObject { addLeaderAwareRoutingHeader(headers); } - this.request( - { - client: 'SpannerClient', - method: 'batchCreateSessions', - reqOpts, - gaxOpts: options.gaxOptions, - headers: headers, - }, - (err, resp) => { - if (err) { - callback!(err, null, resp!); - return; - } + const traceConfig = {opts: this._observabilityOptions}; + startTrace('Database.batchCreateSessions', traceConfig, span => { + this.request( + { + client: 'SpannerClient', + method: 'batchCreateSessions', + reqOpts, + gaxOpts: options.gaxOptions, + headers: headers, + }, + (err, resp) => { + if (err) { + setSpanError(span, err); + span.end(); + callback!(err, null, resp!); + return; + } - const sessions = (resp!.session || []).map(metadata => { - const session = this.session(metadata.name!); - session._observabilityOptions = this._observabilityOptions; - session.metadata = metadata; - return session; - }); + const sessions = (resp!.session || []).map(metadata => { + const session = this.session(metadata.name!); + session._observabilityOptions = this._observabilityOptions; + session.metadata = metadata; + return session; + }); - callback!(null, sessions, resp!); - } - ); + span.end(); + callback!(null, sessions, resp!); + } + ); + }); } /** @@ -2177,6 +2188,7 @@ class Database extends common.GrpcServiceObject { if (!err) { span.addEvent('Using Session', {'session.id': session?.id}); + transaction!._observabilityOptions = this._observabilityOptions; this._releaseOnEnd(session!, transaction!, span); } else if (isSessionNotFoundError(err as grpc.ServiceError)) { span.addEvent('No session available', { @@ -3206,6 +3218,8 @@ class Database extends common.GrpcServiceObject { runFn!(err as grpc.ServiceError); return; } + + transaction!._observabilityOptions = this._observabilityOptions; if (options.optimisticLock) { transaction!.useOptimisticLock(); } diff --git a/src/instance.ts b/src/instance.ts index b72f24622..4986e3ecd 100644 --- a/src/instance.ts +++ b/src/instance.ts @@ -1363,6 +1363,7 @@ class Instance extends common.GrpcServiceObject { databases = rowDatabases.map(database => { const databaseInstance = self.database(database.name!, {min: 0}); databaseInstance.metadata = database; + databaseInstance._observabilityOptions = this._observabilityOptions; return databaseInstance; }); } diff --git a/src/instrument.ts b/src/instrument.ts index 6cad7bc4a..99b260bf4 100644 --- a/src/instrument.ts +++ b/src/instrument.ts @@ -26,8 +26,10 @@ import { import { Span, SpanStatusCode, + context, trace, INVALID_SPAN_CONTEXT, + ROOT_CONTEXT, SpanAttributes, TimeInput, TracerProvider, @@ -93,6 +95,29 @@ interface traceConfig { const SPAN_NAMESPACE_PREFIX = 'CloudSpanner'; // TODO: discuss & standardize this prefix. export {SPAN_NAMESPACE_PREFIX, traceConfig}; +const { + AsyncHooksContextManager, +} = require('@opentelemetry/context-async-hooks'); + +/* + * This function ensures that async/await works correctly by + * checking if context.active() returns an invalid/unset context + * and if so, sets a global AsyncHooksContextManager otherwise + * spans resulting from async/await invocations won't be correctly + * associated in their respective hierarchies. + */ +function ensureInitialContextManagerSet() { + if (context.active() === ROOT_CONTEXT) { + // If no active context was set previously, trace context propagation cannot + // function correctly with async/await for OpenTelemetry + // See {@link https://opentelemetry.io/docs/languages/js/context/#active-context} + context.disable(); // Disable any prior contextManager. + const contextManager = new AsyncHooksContextManager(); + contextManager.enable(); + context.setGlobalContextManager(contextManager); + } +} + /** * startTrace begins an active span in the current active context * and passes it back to the set callback function. Each span will @@ -111,6 +136,8 @@ export function startTrace( config = {} as traceConfig; } + ensureInitialContextManagerSet(); + return getTracer(config.opts?.tracerProvider).startActiveSpan( SPAN_NAMESPACE_PREFIX + '.' + spanNameSuffix, {kind: SpanKind.CLIENT}, diff --git a/src/session-pool.ts b/src/session-pool.ts index 09300ecde..71be508da 100644 --- a/src/session-pool.ts +++ b/src/session-pool.ts @@ -24,7 +24,12 @@ import {Transaction} from './transaction'; import {NormalCallback} from './common'; import {GoogleError, grpc, ServiceError} from 'google-gax'; import trace = require('stack-trace'); -import {getActiveOrNoopSpan} from './instrument'; +import { + ObservabilityOptions, + getActiveOrNoopSpan, + setSpanErrorAndException, + startTrace, +} from './instrument'; /** * @callback SessionPoolCloseCallback @@ -353,6 +358,7 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { _pingHandle!: NodeJS.Timer; _requests: PQueue; _traces: Map; + _observabilityOptions?: ObservabilityOptions; /** * Formats stack trace objects into Node-like stack trace. @@ -485,6 +491,7 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { }); this._traces = new Map(); + this._observabilityOptions = database._observabilityOptions; } /** @@ -738,9 +745,6 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { * @emits SessionPool#createError */ async _createSessions(amount: number): Promise { - const span = getActiveOrNoopSpan(); - span.addEvent(`Requesting ${amount} sessions`); - const labels = this.options.labels!; const databaseRole = this.options.databaseRole!; @@ -752,41 +756,51 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { let nReturned = 0; const nRequested: number = amount; - // while we can request as many sessions be created as we want, the backend - // will return at most 100 at a time, hence the need for a while loop. - while (amount > 0) { - let sessions: Session[] | null = null; - - span.addEvent(`Creating ${amount} sessions`); + const traceConfig = {opts: this._observabilityOptions}; + return startTrace('SessionPool.createSessions', traceConfig, async span => { + span.addEvent(`Requesting ${amount} sessions`); + + // while we can request as many sessions be created as we want, the backend + // will return at most 100 at a time, hence the need for a while loop. + while (amount > 0) { + let sessions: Session[] | null = null; + + span.addEvent(`Creating ${amount} sessions`); + + try { + [sessions] = await this.database.batchCreateSessions({ + count: amount, + labels: labels, + databaseRole: databaseRole, + }); + + amount -= sessions.length; + nReturned += sessions.length; + } catch (e) { + this._pending -= amount; + this.emit('createError', e); + span.addEvent( + `Requested for ${nRequested} sessions returned ${nReturned}` + ); + setSpanErrorAndException(span, e as Error); + span.end(); + throw e; + } - try { - [sessions] = await this.database.batchCreateSessions({ - count: amount, - labels: labels, - databaseRole: databaseRole, + sessions.forEach((session: Session) => { + setImmediate(() => { + this._inventory.borrowed.add(session); + this._pending -= 1; + this.release(session); + }); }); - - amount -= sessions.length; - nReturned += sessions.length; - } catch (e) { - this._pending -= amount; - this.emit('createError', e); - span.addEvent( - `Requested for ${nRequested} sessions returned ${nReturned}` - ); - throw e; } - sessions.forEach((session: Session) => { - setImmediate(() => { - this._inventory.borrowed.add(session); - this._pending -= 1; - this.release(session); - }); - }); - } - - span.addEvent(`Requested for ${nRequested} sessions returned ${nReturned}`); + span.addEvent( + `Requested for ${nRequested} sessions returned ${nReturned}` + ); + span.end(); + }); } /** diff --git a/src/table.ts b/src/table.ts index a435b7a40..74d0e0375 100644 --- a/src/table.ts +++ b/src/table.ts @@ -1082,6 +1082,7 @@ class Table { ): void { const traceConfig: traceConfig = { opts: this._observabilityOptions, + tableName: this.name, }; startTrace('Table.' + method, traceConfig, span => { diff --git a/test/spanner.ts b/test/spanner.ts index fc4e11b91..032d18493 100644 --- a/test/spanner.ts +++ b/test/spanner.ts @@ -4999,7 +4999,7 @@ describe('Spanner with mock server', () => { // and tests the database/instance suffix is an iteration of // each afresh invocation of newTestDatabase, which has been // causing test flakes. - it('Check for span annotations', () => { + it('Check for span annotations', done => { const exporter = new InMemorySpanExporter(); const provider = new NodeTracerProvider({ sampler: new AlwaysOnSampler(), @@ -5013,45 +5013,68 @@ describe('Spanner with mock server', () => { }); const opts: typeof ObservabilityOptions = {tracerProvider: provider}; - startTrace('aSpan', {opts: opts}, span => { + startTrace('aSpan', {opts: opts}, async span => { + instance._observabilityOptions = opts; const database = newTestDatabase(); database._observabilityOptions = opts; - async function runIt() { - const query = { - sql: 'SELECT 1', - }; - - const [rows] = await database.run(query); - assert.strictEqual(rows.length, 1); - } + const query = { + sql: 'SELECT 1', + }; - runIt(); + const [rows] = await database.run(query); + assert.strictEqual(rows.length, 1); span.end(); + exporter.forceFlush(); const spans = exporter.getFinishedSpans(); - assert.strictEqual(spans.length, 1, 'Exactly 1 span'); - const span0 = spans[0]; - const events = span0.events; - // Sort the events by earliest time of occurence. - events.sort((evtA, evtB) => { - return evtA.time < evtB.time; + // Sort the spans by startTime. + spans.sort((spanA, spanB) => { + spanA.startTime < spanB.startTime; }); - const gotEventNames: string[] = []; - events.forEach(event => { - gotEventNames.push(event.name); + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); }); - const wantEventNames = ['Requesting 25 sessions', 'Creating 25 sessions']; + const expectedSpanNames = [ + 'CloudSpanner.Database.batchCreateSessions', + 'CloudSpanner.SessionPool.createSessions', + 'CloudSpanner.Snapshot.runStream', + 'CloudSpanner.Database.runStream', + 'CloudSpanner.Database.run', + 'CloudSpanner.aSpan', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = [ + 'Requesting 25 sessions', + 'Creating 25 sessions', + 'Requested for 25 sessions returned 25', + 'Acquiring session', + 'Waiting for a session to become available', + 'Acquired session', + 'Using Session', + ]; assert.deepEqual( - gotEventNames, - wantEventNames, - `Mismatched events\n\tGot: ${gotEventNames}\n\tWant: ${wantEventNames}` + actualEventNames, + expectedEventNames, + `Mismatched events\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` ); + + done(); }); }); }); From 0342e74721a0684d8195a6299c3a634eefc2b522 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Thu, 10 Oct 2024 05:58:26 -0700 Subject: [PATCH 3/4] feat: (observability) propagate database name for every span generated to aid in quick debugging (#2155) With this change customers shall always be able to identify which database is being connected to. Updates #2079 ## Exhibit Screenshot 2024-10-08 at 10 52 56 PM --- observability-test/database.ts | 41 +++++++++++-- observability-test/helper.ts | 15 +++++ observability-test/spanner.ts | 51 +++++++++++++++- observability-test/table.ts | 20 ++++++- observability-test/transaction.ts | 8 +++ src/batch-transaction.ts | 8 +++ src/database.ts | 99 +++++++++++++++++-------------- src/session-pool.ts | 7 ++- src/table.ts | 6 ++ src/transaction.ts | 60 +++++++++++++++---- 10 files changed, 253 insertions(+), 62 deletions(-) diff --git a/observability-test/database.ts b/observability-test/database.ts index d4dcea825..fce3ef743 100644 --- a/observability-test/database.ts +++ b/observability-test/database.ts @@ -39,6 +39,7 @@ import {Instance, Spanner} from '../src'; import * as pfy from '@google-cloud/promisify'; import {grpc} from 'google-gax'; import {MockError} from '../test/mockserver/mockspanner'; +const {generateWithAllSpansHaveDBName} = require('./helper'); const fakePfy = extend({}, pfy, { promisifyAll(klass, options) { @@ -235,16 +236,20 @@ describe('Database', () => { DatabaseCached = Object.assign({}, Database); }); + const withAllSpansHaveDBName = generateWithAllSpansHaveDBName( + INSTANCE.formattedName_ + '/databases/' + NAME + ); + beforeEach(() => { fakeCodec.encode = util.noop; extend(Database, DatabaseCached); - database = new Database(INSTANCE, NAME, POOL_OPTIONS); - database.parent = INSTANCE; - database.databaseRole = 'parent_role'; - database._observabilityOptions = { + INSTANCE._observabilityOptions = { tracerProvider: provider, enableExtendedTracing: false, }; + database = new Database(INSTANCE, NAME, POOL_OPTIONS); + database.parent = INSTANCE; + database.databaseRole = 'parent_role'; const gaxOpts = {}; const options: { a: string; @@ -285,6 +290,8 @@ describe('Database', () => { const spans = traceExporter.getFinishedSpans(); assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + withAllSpansHaveDBName(spans); + const actualSpanNames: string[] = []; const actualEventNames: string[] = []; spans.forEach(span => { @@ -333,6 +340,7 @@ describe('Database', () => { traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + withAllSpansHaveDBName(spans); const actualSpanNames: string[] = []; const actualEventNames: string[] = []; @@ -523,6 +531,7 @@ describe('Database', () => { traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + withAllSpansHaveDBName(spans); const actualSpanNames: string[] = []; const actualEventNames: string[] = []; @@ -604,6 +613,7 @@ describe('Database', () => { const spans = traceExporter.getFinishedSpans(); assert.strictEqual(spans.length, 2, 'Exactly 2 spans expected'); + withAllSpansHaveDBName(spans); const actualSpanNames: string[] = []; const actualEventNames: string[] = []; @@ -706,6 +716,8 @@ describe('Database', () => { const spans = traceExporter.getFinishedSpans(); assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + withAllSpansHaveDBName(spans); + const actualEventNames: string[] = []; const actualSpanNames: string[] = []; spans.forEach(span => { @@ -771,6 +783,7 @@ describe('Database', () => { assert.strictEqual(resp, RESPONSE); const spans = traceExporter.getFinishedSpans(); assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + withAllSpansHaveDBName(spans); const actualEventNames: string[] = []; const actualSpanNames: string[] = []; @@ -836,6 +849,8 @@ describe('Database', () => { const spans = traceExporter.getFinishedSpans(); assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + withAllSpansHaveDBName(spans); + const actualSpanNames: string[] = []; const actualEventNames: string[] = []; spans.forEach(span => { @@ -911,6 +926,8 @@ describe('Database', () => { const spans = traceExporter.getFinishedSpans(); assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + withAllSpansHaveDBName(spans); + const actualEventNames: string[] = []; const actualSpanNames: string[] = []; spans.forEach(span => { @@ -958,6 +975,8 @@ describe('Database', () => { assert.strictEqual(transaction, fakeTransaction); const spans = traceExporter.getFinishedSpans(); + withAllSpansHaveDBName(spans); + assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); const actualEventNames: string[] = []; const actualSpanNames: string[] = []; @@ -1037,6 +1056,7 @@ describe('Database', () => { const spans = traceExporter.getFinishedSpans(); assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + withAllSpansHaveDBName(spans); const actualEventNames: string[] = []; const actualSpanNames: string[] = []; @@ -1091,6 +1111,7 @@ describe('Database', () => { const spans = traceExporter.getFinishedSpans(); assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + withAllSpansHaveDBName(spans); const actualEventNames: string[] = []; const actualSpanNames: string[] = []; @@ -1148,6 +1169,8 @@ describe('Database', () => { const spans = traceExporter.getFinishedSpans(); assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + withAllSpansHaveDBName(spans); + const actualSpanNames: string[] = []; const actualEventNames: string[] = []; spans.forEach(span => { @@ -1222,6 +1245,8 @@ describe('Database', () => { const spans = traceExporter.getFinishedSpans(); assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + withAllSpansHaveDBName(spans); + const actualSpanNames: string[] = []; const actualEventNames: string[] = []; spans.forEach(span => { @@ -1273,6 +1298,8 @@ describe('Database', () => { const spans = traceExporter.getFinishedSpans(); assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + withAllSpansHaveDBName(spans); + const actualSpanNames: string[] = []; const actualEventNames: string[] = []; spans.forEach(span => { @@ -1376,6 +1403,8 @@ describe('Database', () => { const spans = traceExporter.getFinishedSpans(); assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + withAllSpansHaveDBName(spans); + const actualEventNames: string[] = []; const actualSpanNames: string[] = []; spans.forEach(span => { @@ -1427,6 +1456,8 @@ describe('Database', () => { const spans = traceExporter.getFinishedSpans(); assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + withAllSpansHaveDBName(spans); + const actualEventNames: string[] = []; const actualSpanNames: string[] = []; spans.forEach(span => { @@ -1491,6 +1522,8 @@ describe('Database', () => { const spans = traceExporter.getFinishedSpans(); assert.strictEqual(spans.length, 2, 'Exactly 1 span expected'); + withAllSpansHaveDBName(spans); + const actualSpanNames: string[] = []; const actualEventNames: string[] = []; spans.forEach(span => { diff --git a/observability-test/helper.ts b/observability-test/helper.ts index 342a413ee..b6d429d32 100644 --- a/observability-test/helper.ts +++ b/observability-test/helper.ts @@ -15,6 +15,9 @@ */ import {ContextManager, context} from '@opentelemetry/api'; +import * as assert from 'assert'; +const {ReadableSpan} = require('@opentelemetry/sdk-trace-base'); +import {SEMATTRS_DB_NAME} from '@opentelemetry/semantic-conventions'; /** * This utility exists as a test helper because mocha has builtin "context" @@ -32,3 +35,15 @@ export function disableContextAndManager(manager: ContextManager) { manager.disable(); context.disable(); } + +export function generateWithAllSpansHaveDBName(dbName: String): Function { + return function (spans: (typeof ReadableSpan)[]) { + spans.forEach(span => { + assert.deepStrictEqual( + span.attributes[SEMATTRS_DB_NAME], + dbName, + `Span ${span.name} has mismatched DB_NAME` + ); + }); + }; +} diff --git a/observability-test/spanner.ts b/observability-test/spanner.ts index 322fcc1b9..93300a1ad 100644 --- a/observability-test/spanner.ts +++ b/observability-test/spanner.ts @@ -30,7 +30,11 @@ const { } = require('@opentelemetry/sdk-trace-node'); // eslint-disable-next-line n/no-extraneous-require const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base'); -const {disableContextAndManager, setGlobalContextManager} = require('./helper'); +const { + disableContextAndManager, + generateWithAllSpansHaveDBName, + setGlobalContextManager, +} = require('./helper'); const { AsyncHooksContextManager, } = require('@opentelemetry/context-async-hooks'); @@ -168,7 +172,13 @@ describe('EndToEnd', () => { it('getSessions', async () => { const [rows] = await database.getSessions(); + const withAllSpansHaveDBName = generateWithAllSpansHaveDBName( + database.formattedName_ + ); + traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + withAllSpansHaveDBName(spans); const actualSpanNames: string[] = []; const actualEventNames: string[] = []; @@ -195,6 +205,10 @@ describe('EndToEnd', () => { }); it('getSnapshot', done => { + const withAllSpansHaveDBName = generateWithAllSpansHaveDBName( + database.formattedName_ + ); + database.getSnapshot((err, transaction) => { assert.ifError(err); @@ -203,6 +217,7 @@ describe('EndToEnd', () => { traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); + withAllSpansHaveDBName(spans); const actualSpanNames: string[] = []; const actualEventNames: string[] = []; @@ -244,12 +259,16 @@ describe('EndToEnd', () => { }); it('getTransaction', done => { + const withAllSpansHaveDBName = generateWithAllSpansHaveDBName( + database.formattedName_ + ); database.getTransaction((err, transaction) => { assert.ifError(err); assert.ok(transaction); traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); + withAllSpansHaveDBName(spans); const actualEventNames: string[] = []; const actualSpanNames: string[] = []; @@ -284,6 +303,9 @@ describe('EndToEnd', () => { }); it('runStream', done => { + const withAllSpansHaveDBName = generateWithAllSpansHaveDBName( + database.formattedName_ + ); database .runStream('SELECT 1') .on('data', row => {}) @@ -291,6 +313,7 @@ describe('EndToEnd', () => { .on('end', () => { traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); + withAllSpansHaveDBName(spans); const actualSpanNames: string[] = []; const actualEventNames: string[] = []; @@ -328,10 +351,14 @@ describe('EndToEnd', () => { }); it('run', async () => { + const withAllSpansHaveDBName = generateWithAllSpansHaveDBName( + database.formattedName_ + ); const [rows] = await database.run('SELECT 1'); traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); + withAllSpansHaveDBName(spans); // Sort the spans by duration. spans.sort((spanA, spanB) => { @@ -392,6 +419,9 @@ describe('EndToEnd', () => { }); it('runTransaction', done => { + const withAllSpansHaveDBName = generateWithAllSpansHaveDBName( + database.formattedName_ + ); database.runTransaction((err, transaction) => { assert.ifError(err); transaction!.run('SELECT 1', (err, rows) => { @@ -399,6 +429,7 @@ describe('EndToEnd', () => { traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); + withAllSpansHaveDBName(spans); const actualEventNames: string[] = []; const actualSpanNames: string[] = []; @@ -438,6 +469,9 @@ describe('EndToEnd', () => { }); it('writeAtLeastOnce', done => { + const withAllSpansHaveDBName = generateWithAllSpansHaveDBName( + database.formattedName_ + ); const blankMutations = new MutationSet(); database.writeAtLeastOnce(blankMutations, (err, response) => { assert.ifError(err); @@ -445,6 +479,7 @@ describe('EndToEnd', () => { traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); + withAllSpansHaveDBName(spans); const actualEventNames: string[] = []; const actualSpanNames: string[] = []; @@ -700,6 +735,11 @@ describe('ObservabilityOptions injection and propagation', async () => { spannerMock.resetRequests(); }); + const db = spanner.instance('instance').database('database'); + const withAllSpansHaveDBName = generateWithAllSpansHaveDBName( + db.formattedName_ + ); + it('run', done => { database.getTransaction((err, tx) => { assert.ifError(err); @@ -708,6 +748,7 @@ describe('ObservabilityOptions injection and propagation', async () => { traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); + withAllSpansHaveDBName(spans); const actualSpanNames: string[] = []; const actualEventNames: string[] = []; @@ -761,6 +802,7 @@ describe('ObservabilityOptions injection and propagation', async () => { traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); + withAllSpansHaveDBName(spans); assert.strictEqual(spans.length, 4); const actualSpanNames: string[] = []; @@ -814,6 +856,7 @@ describe('ObservabilityOptions injection and propagation', async () => { traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); + withAllSpansHaveDBName(spans); const actualSpanNames: string[] = []; const actualEventNames: string[] = []; @@ -866,6 +909,7 @@ describe('ObservabilityOptions injection and propagation', async () => { traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); + withAllSpansHaveDBName(spans); const actualSpanNames: string[] = []; const actualEventNames: string[] = []; @@ -948,6 +992,10 @@ describe('ObservabilityOptions injection and propagation', async () => { const instance = spanner.instance('instance'); const database = instance.database('database'); + const withAllSpansHaveDBName = generateWithAllSpansHaveDBName( + database.formattedName_ + ); + database.run('SELECT 1', (err, rows) => { assert.ifError(err); @@ -970,6 +1018,7 @@ describe('ObservabilityOptions injection and propagation', async () => { spansFromInjected.sort((spanA, spanB) => { spanA.startTime < spanB.startTime; }); + withAllSpansHaveDBName(spansFromInjected); const actualSpanNames: string[] = []; const actualEventNames: string[] = []; spansFromInjected.forEach(span => { diff --git a/observability-test/table.ts b/observability-test/table.ts index 86f6145f9..558312c6c 100644 --- a/observability-test/table.ts +++ b/observability-test/table.ts @@ -31,6 +31,7 @@ import {SpanStatusCode} from '@opentelemetry/api'; // eslint-disable-next-line n/no-extraneous-require const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base'); +const {generateWithAllSpansHaveDBName} = require('./helper'); const fakePfy = extend({}, pfy, { promisifyAll(klass, options) { @@ -67,6 +68,7 @@ describe('Table', () => { let transaction: FakeTransaction; const DATABASE = { + formattedName_: 'formatted-db-name', runTransaction: (opts, callback) => callback(null, transaction), getSnapshot: (options, callback) => callback(null, transaction), }; @@ -100,6 +102,10 @@ describe('Table', () => { traceExporter.reset(); }); + const withAllSpansHaveDBName = generateWithAllSpansHaveDBName( + DATABASE.formattedName_ + ); + function getExportedSpans(minCount: number) { traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); @@ -215,7 +221,10 @@ describe('Table', () => { assert.ifError(err); assert.strictEqual(stub.callCount, 1); - const actualSpanNames = spanNames(getExportedSpans(1)); + const gotSpans = getExportedSpans(1); + withAllSpansHaveDBName(gotSpans); + + const actualSpanNames = spanNames(gotSpans); const expectedSpanNames = ['CloudSpanner.Table.upsert']; assert.deepStrictEqual( @@ -238,6 +247,8 @@ describe('Table', () => { assert.strictEqual(err, fakeError); const gotSpans = getExportedSpans(1); + withAllSpansHaveDBName(gotSpans); + const gotSpanStatus = gotSpans[0].status; const wantSpanStatus = { code: SpanStatusCode.ERROR, @@ -270,7 +281,10 @@ describe('Table', () => { assert.ifError(err); assert.strictEqual(stub.callCount, 1); - const actualSpanNames = spanNames(getExportedSpans(1)); + const gotSpans = getExportedSpans(1); + withAllSpansHaveDBName(gotSpans); + + const actualSpanNames = spanNames(gotSpans); const expectedSpanNames = ['CloudSpanner.Table.replace']; assert.deepStrictEqual( actualSpanNames, @@ -302,6 +316,8 @@ describe('Table', () => { `mismatch in span status:\n\tGot: ${JSON.stringify(gotSpanStatus)}\n\tWant: ${JSON.stringify(wantSpanStatus)}` ); + withAllSpansHaveDBName(gotSpans); + const actualSpanNames = spanNames(gotSpans); const expectedSpanNames = ['CloudSpanner.Table.replace']; assert.deepStrictEqual( diff --git a/observability-test/transaction.ts b/observability-test/transaction.ts index ad1fc0b47..550f85fc3 100644 --- a/observability-test/transaction.ts +++ b/observability-test/transaction.ts @@ -32,6 +32,7 @@ const { ReadableSpan, SimpleSpanProcessor, } = require('@opentelemetry/sdk-trace-base'); +const {generateWithAllSpansHaveDBName} = require('./helper'); describe('Transaction', () => { const sandbox = sinon.createSandbox(); @@ -54,6 +55,10 @@ describe('Transaction', () => { parent: INSTANCE, }; + const withAllSpansHaveDBName = generateWithAllSpansHaveDBName( + DATABASE.formattedName_ + ); + const SESSION = { parent: DATABASE, formattedName_: SESSION_NAME, @@ -723,6 +728,7 @@ describe('Transaction', () => { // Ensure that the final span that got retries did not error. const spans = exportResults.spans; + const firstSpan = spans[0]; assert.strictEqual( SpanStatusCode.ERROR, @@ -734,6 +740,8 @@ describe('Transaction', () => { firstSpan.status.message, 'Unexpected span status message' ); + + withAllSpansHaveDBName(spans); }); }); }); diff --git a/src/batch-transaction.ts b/src/batch-transaction.ts index 403f7dd6e..d182d4429 100644 --- a/src/batch-transaction.ts +++ b/src/batch-transaction.ts @@ -140,6 +140,7 @@ class BatchTransaction extends Snapshot { const traceConfig: traceConfig = { sql: query, opts: this._observabilityOptions, + dbName: this.getDBName(), }; return startTrace( 'BatchTransaction.createQueryPartitions', @@ -170,6 +171,11 @@ class BatchTransaction extends Snapshot { } ); } + + protected getDBName(): string { + return (this.session.parent as Database).formattedName_; + } + /** * Generic create partition method. Handles common parameters used in both * {@link BatchTransaction#createQueryPartitions} and {@link @@ -183,6 +189,7 @@ class BatchTransaction extends Snapshot { createPartitions_(config, callback) { const traceConfig: traceConfig = { opts: this._observabilityOptions, + dbName: this.getDBName(), }; return startTrace( @@ -260,6 +267,7 @@ class BatchTransaction extends Snapshot { createReadPartitions(options, callback) { const traceConfig: traceConfig = { opts: this._observabilityOptions, + dbName: this.getDBName(), }; return startTrace( diff --git a/src/database.ts b/src/database.ts index 9f67bf01b..9a2b703a0 100644 --- a/src/database.ts +++ b/src/database.ts @@ -109,6 +109,7 @@ import { startTrace, setSpanError, setSpanErrorAndException, + traceConfig, } from './instrument'; export type GetDatabaseRolesCallback = RequestCallback< @@ -344,7 +345,8 @@ class Database extends common.GrpcServiceObject { databaseDialect?: EnumKey< typeof databaseAdmin.spanner.admin.database.v1.DatabaseDialect > | null; - _observabilityOptions?: ObservabilityOptions; + _observabilityOptions?: ObservabilityOptions; // TODO: exmaine if we can remove it + private _traceConfig: traceConfig; constructor( instance: Instance, name: string, @@ -460,6 +462,12 @@ class Database extends common.GrpcServiceObject { } this.formattedName_ = formattedName_; this.instance = instance; + this._observabilityOptions = instance._observabilityOptions; + this._traceConfig = { + opts: this._observabilityOptions, + dbName: this.formattedName_, + }; + this.resourceHeader_ = { [CLOUD_RESOURCE_HEADER]: this.formattedName_, }; @@ -682,8 +690,7 @@ class Database extends common.GrpcServiceObject { addLeaderAwareRoutingHeader(headers); } - const traceConfig = {opts: this._observabilityOptions}; - startTrace('Database.batchCreateSessions', traceConfig, span => { + startTrace('Database.batchCreateSessions', this._traceConfig, span => { this.request( { client: 'SpannerClient', @@ -702,7 +709,7 @@ class Database extends common.GrpcServiceObject { const sessions = (resp!.session || []).map(metadata => { const session = this.session(metadata.name!); - session._observabilityOptions = this._observabilityOptions; + session._observabilityOptions = this._traceConfig!.opts; session.metadata = metadata; return session; }); @@ -749,7 +756,7 @@ class Database extends common.GrpcServiceObject { const id = identifier.transaction; const transaction = new BatchTransaction(session, options); transaction.id = id; - transaction._observabilityOptions = this._observabilityOptions; + transaction._observabilityOptions = this._traceConfig!.opts; transaction.readTimestamp = identifier.timestamp as PreciseDate; return transaction; } @@ -838,36 +845,41 @@ class Database extends common.GrpcServiceObject { typeof optionsOrCallback === 'object' ? (optionsOrCallback as TimestampBounds) : {}; - - const traceConfig = {opts: this._observabilityOptions}; - return startTrace('Database.createBatchTransaction', traceConfig, span => { - this.pool_.getSession((err, session) => { - if (err) { - setSpanError(span, err); - span.end(); - callback!(err as ServiceError, null, undefined); - return; - } - const transaction = this.batchTransaction({session: session!}, options); - this._releaseOnEnd(session!, transaction, span); - transaction.begin((err, resp) => { + return startTrace( + 'Database.createBatchTransaction', + this._traceConfig, + span => { + this.pool_.getSession((err, session) => { if (err) { setSpanError(span, err); - if (isSessionNotFoundError(err)) { - span.addEvent('No session available', { - 'session.id': session?.id, - }); - } span.end(); - callback!(err, null, resp!); + callback!(err as ServiceError, null, undefined); return; } - span.addEvent('Using Session', {'session.id': session?.id}); - span.end(); - callback!(null, transaction, resp!); + const transaction = this.batchTransaction( + {session: session!}, + options + ); + this._releaseOnEnd(session!, transaction, span); + transaction.begin((err, resp) => { + if (err) { + setSpanError(span, err); + if (isSessionNotFoundError(err)) { + span.addEvent('No session available', { + 'session.id': session?.id, + }); + } + span.end(); + callback!(err, null, resp!); + return; + } + span.addEvent('Using Session', {'session.id': session?.id}); + span.end(); + callback!(null, transaction, resp!); + }); }); - }); - }); + } + ); } /** * Create a new session. @@ -1097,7 +1109,7 @@ class Database extends common.GrpcServiceObject { /CREATE TABLE `*([^\s`(]+)/ )![1]; const table = this.table(tableName!); - table._observabilityOptions = this._observabilityOptions; + table._observabilityOptions = this._traceConfig!.opts; callback!(null, table, operation!, resp!); }); } @@ -1886,8 +1898,7 @@ class Database extends common.GrpcServiceObject { delete (gaxOpts as GetSessionsOptions).pageToken; } - const traceConfig = {opts: this._observabilityOptions}; - return startTrace('Database.getSessions', traceConfig, span => { + return startTrace('Database.getSessions', this._traceConfig, span => { this.request< google.spanner.v1.ISession, google.spanner.v1.IListSessionsResponse @@ -1908,7 +1919,7 @@ class Database extends common.GrpcServiceObject { sessionInstances = sessions.map(metadata => { const session = self.session(metadata.name!); session.metadata = metadata; - session._observabilityOptions = this._observabilityOptions; + session._observabilityOptions = this._traceConfig!.opts; return session; }); } @@ -2069,8 +2080,7 @@ class Database extends common.GrpcServiceObject { ? (optionsOrCallback as TimestampBounds) : {}; - const traceConfig = {opts: this._observabilityOptions}; - return startTrace('Database.getSnapshot', traceConfig, span => { + return startTrace('Database.getSnapshot', this._traceConfig, span => { this.pool_.getSession((err, session) => { if (err) { setSpanError(span, err); @@ -2170,8 +2180,7 @@ class Database extends common.GrpcServiceObject { ? (optionsOrCallback as GetTransactionOptions) : {}; - const traceConfig = {opts: this._observabilityOptions}; - return startTrace('Database.getTransaction', traceConfig, span => { + return startTrace('Database.getTransaction', this._traceConfig, span => { this.pool_.getSession((err, session, transaction) => { if (options.requestOptions) { transaction!.requestOptions = Object.assign( @@ -2798,7 +2807,10 @@ class Database extends common.GrpcServiceObject { ? (optionsOrCallback as TimestampBounds) : {}; - const traceConfig = {sql: query, opts: this._observabilityOptions}; + const traceConfig = { + sql: query, + ...this._traceConfig, + }; return startTrace('Database.run', traceConfig, span => { this.runStream(query, options) .on('error', err => { @@ -3019,7 +3031,10 @@ class Database extends common.GrpcServiceObject { options?: TimestampBounds ): PartialResultStream { const proxyStream: Transform = through.obj(); - const traceConfig = {sql: query, opts: this._observabilityOptions}; + const traceConfig = { + sql: query, + ...this._traceConfig, + }; return startTrace('Database.runStream', traceConfig, span => { this.pool_.getSession((err, session) => { if (err) { @@ -3197,8 +3212,7 @@ class Database extends common.GrpcServiceObject { ? (optionsOrRunFn as RunTransactionOptions) : {}; - const traceConfig = {opts: this._observabilityOptions}; - startTrace('Database.runTransaction', traceConfig, span => { + startTrace('Database.runTransaction', this._traceConfig, span => { this.pool_.getSession((err, session?, transaction?) => { if (err) { setSpanError(span, err); @@ -3592,8 +3606,7 @@ class Database extends common.GrpcServiceObject { ? (optionsOrCallback as CallOptions) : {}; - const traceConfig = {opts: this._observabilityOptions}; - return startTrace('Database.writeAtLeastOnce', traceConfig, span => { + return startTrace('Database.writeAtLeastOnce', this._traceConfig, span => { this.pool_.getSession((err, session?, transaction?) => { if (err && isSessionNotFoundError(err as grpc.ServiceError)) { span.addEvent('No session available', { diff --git a/src/session-pool.ts b/src/session-pool.ts index 71be508da..b206bcf8e 100644 --- a/src/session-pool.ts +++ b/src/session-pool.ts @@ -756,7 +756,12 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { let nReturned = 0; const nRequested: number = amount; - const traceConfig = {opts: this._observabilityOptions}; + // TODO: Inlining this code for now and later on shall go + // extract _traceConfig to the constructor when we have plenty of time. + const traceConfig = { + opts: this._observabilityOptions, + dbName: this.database.formattedName_, + }; return startTrace('SessionPool.createSessions', traceConfig, async span => { span.addEvent(`Requesting ${amount} sessions`); diff --git a/src/table.ts b/src/table.ts index 74d0e0375..227f8d107 100644 --- a/src/table.ts +++ b/src/table.ts @@ -191,6 +191,11 @@ class Table { this.database.createTable(schema, gaxOptions, callback!); } + + protected getDBName(): string { + return this.database.formattedName_; + } + /** * Create a readable object stream to receive rows from the database using key * lookups and scans. @@ -1083,6 +1088,7 @@ class Table { const traceConfig: traceConfig = { opts: this._observabilityOptions, tableName: this.name, + dbName: this.getDBName(), }; startTrace('Table.' + method, traceConfig, span => { diff --git a/src/transaction.ts b/src/transaction.ts index 7e0dbb40b..e7993e74c 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -292,6 +292,7 @@ export class Snapshot extends EventEmitter { resourceHeader_: {[k: string]: string}; requestOptions?: Pick; _observabilityOptions?: ObservabilityOptions; + protected _dbName?: string; /** * The transaction ID. @@ -351,8 +352,9 @@ export class Snapshot extends EventEmitter { const readOnly = Snapshot.encodeTimestampBounds(options || {}); this._options = {readOnly}; + this._dbName = (this.session.parent as Database).formattedName_; this.resourceHeader_ = { - [CLOUD_RESOURCE_HEADER]: (this.session.parent as Database).formattedName_, + [CLOUD_RESOURCE_HEADER]: this._dbName, }; this._waitingRequests = []; this._inlineBeginStarted = false; @@ -439,7 +441,10 @@ export class Snapshot extends EventEmitter { addLeaderAwareRoutingHeader(headers); } - const traceConfig = {opts: this._observabilityOptions}; + const traceConfig = { + opts: this._observabilityOptions, + dbName: this._dbName!, + }; return startTrace('Snapshot.begin', traceConfig, span => { span.addEvent('Begin Transaction'); @@ -717,7 +722,11 @@ export class Snapshot extends EventEmitter { }); }; - const traceConfig = {tableName: table, opts: this._observabilityOptions}; + const traceConfig = { + tableName: table, + opts: this._observabilityOptions, + dbName: this._dbName!, + }; return startTrace('Snapshot.createReadStream', traceConfig, span => { const resultStream = partialResultStream( this._wrapWithIdWaiter(makeRequest), @@ -961,7 +970,11 @@ export class Snapshot extends EventEmitter { callback = cb as ReadCallback; } - const traceConfig = {tableName: table, opts: this._observabilityOptions}; + const traceConfig = { + tableName: table, + opts: this._observabilityOptions, + dbName: this._dbName!, + }; return startTrace('Snapshot.read', traceConfig, span => { this.createReadStream(table, request) .on('error', err => { @@ -1065,7 +1078,11 @@ export class Snapshot extends EventEmitter { let stats: google.spanner.v1.ResultSetStats; let metadata: google.spanner.v1.ResultSetMetadata; - const traceConfig = {sql: query, opts: this._observabilityOptions}; + const traceConfig = { + sql: query, + opts: this._observabilityOptions, + dbName: this._dbName!, + }; startTrace('Snapshot.run', traceConfig, span => { return this.runStream(query) .on('error', (err, rows, stats, metadata) => { @@ -1258,7 +1275,11 @@ export class Snapshot extends EventEmitter { addLeaderAwareRoutingHeader(headers); } - const traceConfig = {opts: this._observabilityOptions, ...query}; + const traceConfig = { + opts: this._observabilityOptions, + dbName: this._dbName!, + ...query, + }; return startTrace('Snapshot.runStream', traceConfig, span => { const makeRequest = (resumeToken?: ResumeToken): Readable => { if (!reqOpts || (this.id && !reqOpts.transaction.id)) { @@ -1627,7 +1648,11 @@ export class Dml extends Snapshot { query = {sql: query} as ExecuteSqlRequest; } - const traceConfig = {opts: this._observabilityOptions, ...query}; + const traceConfig = { + opts: this._observabilityOptions, + dbName: this._dbName!, + ...query, + }; return startTrace('Dml.runUpdate', traceConfig, span => { this.run( query, @@ -1904,7 +1929,10 @@ export class Transaction extends Dml { addLeaderAwareRoutingHeader(headers); } - const traceConfig = {opts: this._observabilityOptions}; + const traceConfig = { + opts: this._observabilityOptions, + dbName: this._dbName!, + }; return startTrace('Transaction.batchUpdate', traceConfig, span => { this.request( { @@ -2074,7 +2102,10 @@ export class Transaction extends Dml { const requestOptions = (options as CommitOptions).requestOptions; const reqOpts: CommitRequest = {mutations, session, requestOptions}; - const traceConfig = {opts: this._observabilityOptions}; + const traceConfig = { + opts: this._observabilityOptions, + dbName: this._dbName!, + }; return startTrace('Transaction.commit', traceConfig, span => { if (this.id) { reqOpts.transactionId = this.id as Uint8Array; @@ -2440,7 +2471,10 @@ export class Transaction extends Dml { const callback = typeof gaxOptionsOrCallback === 'function' ? gaxOptionsOrCallback : cb!; - const traceConfig = {opts: this._observabilityOptions}; + const traceConfig = { + opts: this._observabilityOptions, + dbName: this._dbName!, + }; return startTrace('Transaction.rollback', traceConfig, span => { if (!this.id) { const err = new Error( @@ -2933,7 +2967,11 @@ export class PartitionedDml extends Dml { query: string | ExecuteSqlRequest, callback?: RunUpdateCallback ): void | Promise { - const traceConfig = {sql: query, opts: this._observabilityOptions}; + const traceConfig = { + sql: query, + opts: this._observabilityOptions, + dbName: this._dbName!, + }; return startTrace('PartitionedDml.runUpdate', traceConfig, span => { super.runUpdate(query, (err, count) => { if (err) { From f01516ec6ba44730622cfb050c52cd93f30bba7a Mon Sep 17 00:00:00 2001 From: alkatrivedi <58396306+alkatrivedi@users.noreply.github.com> Date: Mon, 14 Oct 2024 12:22:26 +0000 Subject: [PATCH 4/4] fix: exact staleness timebound (#2143) fixes: #2129 --- samples/crud.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/samples/crud.js b/samples/crud.js index 35a2858c3..41af3ecab 100644 --- a/samples/crud.js +++ b/samples/crud.js @@ -309,8 +309,8 @@ async function readStaleData(instanceId, databaseId, projectId) { }; const options = { - // Guarantees that all writes committed more than 15 seconds ago are visible - exactStaleness: 15, + // Guarantees that all writes committed more than 15000 milliseconds ago are visible + exactStaleness: 15000, }; try {