From c62584c861b36516410f41dd8ba426b17f27497a Mon Sep 17 00:00:00 2001 From: surbhigarg92 Date: Thu, 11 Jan 2024 15:17:16 +0530 Subject: [PATCH 1/4] chore: integration test fix --- system-test/spanner.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/system-test/spanner.ts b/system-test/spanner.ts index d2bde030f..5575d5c5f 100644 --- a/system-test/spanner.ts +++ b/system-test/spanner.ts @@ -8731,10 +8731,10 @@ describe('Spanner', () => { }, err => { assert.strictEqual(err?.details, expectedErrorMessage); + transaction!.end(); + done(); } ); - transaction!.end(); - done(); }); }); }); From 6a8a8a06fee6e4ad36b02fc840b6d474f309d512 Mon Sep 17 00:00:00 2001 From: surbhigarg92 Date: Tue, 12 Nov 2024 15:13:18 +0530 Subject: [PATCH 2/4] Spans for abort transactions --- observability-test/spanner.ts | 105 ++++++++++++++++++++++++++++++++++ src/partial-result-stream.ts | 4 +- src/transaction-runner.ts | 5 +- 3 files changed, 112 insertions(+), 2 deletions(-) diff --git a/observability-test/spanner.ts b/observability-test/spanner.ts index c60549776..ab3f4e26e 100644 --- a/observability-test/spanner.ts +++ b/observability-test/spanner.ts @@ -142,6 +142,11 @@ describe('EndToEnd', async () => { tracerProvider: tracerProvider, enableExtendedTracing: false, }); + let dbCounter = 1; + + function newTestDatabase(): Database { + return instance.database(`database-${dbCounter++}`,); + } const server = setupResult.server; const spannerMock = setupResult.spannerMock; @@ -305,6 +310,7 @@ describe('EndToEnd', async () => { }); it('runTransactionAsync', async () => { + await database.runTransactionAsync(async transaction => { await transaction!.run('SELECT 1'); }); @@ -327,6 +333,105 @@ describe('EndToEnd', async () => { ); }); + it.only('runTransaction with abort', done => { + let attempts = 0; + let rowCount = 0; + const database = newTestDatabase(); + database.runTransaction(async (err, transaction) => { + assert.ifError(err); + if (!attempts) { + spannerMock.abortTransaction(transaction!); + } + attempts++; + transaction!.run(selectSql, (err, rows) => { + assert.ifError(err); + rows.forEach(() => rowCount++); + transaction! + .commit() + .catch(done) + .then(async () => { + const expectedSpanNames = [ + 'CloudSpanner.Database.batchCreateSessions', + 'CloudSpanner.SessionPool.createSessions', + 'CloudSpanner.Snapshot.runStream', + 'CloudSpanner.Snapshot.run', + 'CloudSpanner.Snapshot.runStream', + 'CloudSpanner.Snapshot.run', + 'CloudSpanner.Transaction.commit', + 'CloudSpanner.Snapshot.begin', + 'CloudSpanner.Database.runTransaction', + ]; + const expectedEventNames = [ + ...waitingSessionsEvents, + 'Retrying Transaction', + 'Starting stream', + 'exception', + 'Stream broken. Not safe to retry', + 'Begin Transaction', + 'Transaction Creation Done', + 'Starting stream', + 'Starting Commit', + 'Commit Done', + ]; + await verifySpansAndEvents(traceExporter, expectedSpanNames, expectedEventNames) + database + .close() + .catch(done) + .then(() => done()); + }); + }); + }); + }); + + it('runTransactionAsync with abort', async () => { + let attempts = 0; + const database = newTestDatabase(); + await database.runTransactionAsync((transaction): Promise => { + if (!attempts) { + spannerMock.abortTransaction(transaction); + } + attempts++; + return transaction.run(selectSql).then(([rows]) => { + let count = 0; + rows.forEach(() => count++); + return transaction.commit().then(() => count); + }); + }); + assert.strictEqual(attempts, 2); + const expectedSpanNames = [ + 'CloudSpanner.Database.batchCreateSessions', + 'CloudSpanner.SessionPool.createSessions', + 'CloudSpanner.Snapshot.runStream', + 'CloudSpanner.Snapshot.run', + 'CloudSpanner.Snapshot.begin', + 'CloudSpanner.Snapshot.runStream', + 'CloudSpanner.Snapshot.run', + 'CloudSpanner.Transaction.commit', + 'CloudSpanner.Database.runTransactionAsync', + ]; + const expectedEventNames = [ + 'Requesting 25 sessions', + 'Creating 25 sessions', + 'Requested for 25 sessions returned 25', + 'Starting stream', + 'exception', + 'Stream broken. Not safe to retry', + 'Begin Transaction', + 'Transaction Creation Done', + 'Starting stream', + 'Starting Commit', + 'Commit Done', + ...waitingSessionsEvents, + 'Retrying transaction', + ]; + await verifySpansAndEvents( + traceExporter, + expectedSpanNames, + expectedEventNames + ); + await database.close(); + }); + it('writeAtLeastOnce', done => { const blankMutations = new MutationSet(); database.writeAtLeastOnce(blankMutations, async (err, response) => { diff --git a/src/partial-result-stream.ts b/src/partial-result-stream.ts index 69439f534..e5c577e87 100644 --- a/src/partial-result-stream.ts +++ b/src/partial-result-stream.ts @@ -24,7 +24,7 @@ import {Readable, Transform} from 'stream'; import * as streamEvents from 'stream-events'; import {grpc, CallOptions} from 'google-gax'; import {DeadlineError, isRetryableInternalError} from './transaction-runner'; - +import {getActiveOrNoopSpan, setSpanErrorAndException} from './instrument'; import {codec, JSONOptions, Json, Field, Value} from './codec'; import {google} from '../protos/protos'; import * as stream from 'stream'; @@ -494,6 +494,7 @@ export function partialResultStream( let lastRequestStream: Readable; const startTime = Date.now(); const timeout = options?.gaxOptions?.timeout ?? Infinity; + const span = getActiveOrNoopSpan(); // mergeStream allows multiple streams to be connected into one. This is good; // if we need to retry a request and pipe more data to the user's stream. @@ -568,6 +569,7 @@ export function partialResultStream( // checkpoint stream has queued. After that, we will destroy the // user's stream with the same error. setImmediate(() => batchAndSplitOnTokenStream.destroy(err)); + setSpanErrorAndException(span, err as Error); return; } diff --git a/src/transaction-runner.ts b/src/transaction-runner.ts index 61d979e8c..32786f20e 100644 --- a/src/transaction-runner.ts +++ b/src/transaction-runner.ts @@ -23,6 +23,7 @@ import {Session} from './session'; import {Transaction} from './transaction'; import {NormalCallback} from './common'; import {isSessionNotFoundError} from './session-pool'; +import {getActiveOrNoopSpan} from './instrument'; import {Database} from './database'; import {google} from '../protos/protos'; import IRequestOptions = google.spanner.v1.IRequestOptions; @@ -238,6 +239,7 @@ export abstract class Runner { this.session.lastError = e as grpc.ServiceError; lastError = e as grpc.ServiceError; } + const span = getActiveOrNoopSpan(); // Note that if the error is a 'Session not found' error, it will be // thrown here. We do this to bubble this error up to the caller who is @@ -250,7 +252,7 @@ export abstract class Runner { } this.attempts += 1; - + span.addEvent('Retrying transaction'); const delay = this.getNextDelay(lastError); await new Promise(resolve => setTimeout(resolve, delay)); } @@ -321,6 +323,7 @@ export class TransactionRunner extends Runner { } stream.unpipe(proxyStream); + // proxyStream.emit('error', err); reject(err); }) .pipe(proxyStream); From 430a33c0d3539de06f2030e1422b4bde6a35cdd0 Mon Sep 17 00:00:00 2001 From: surbhigarg92 Date: Tue, 19 Nov 2024 15:24:28 +0530 Subject: [PATCH 3/4] remove snapshot.run span --- observability-test/spanner.ts | 54 ++++------------ observability-test/transaction.ts | 100 ------------------------------ src/partial-result-stream.ts | 2 +- src/transaction-runner.ts | 1 - src/transaction.ts | 42 +++++-------- 5 files changed, 29 insertions(+), 170 deletions(-) diff --git a/observability-test/spanner.ts b/observability-test/spanner.ts index ab3f4e26e..ee3a6bbc6 100644 --- a/observability-test/spanner.ts +++ b/observability-test/spanner.ts @@ -145,7 +145,7 @@ describe('EndToEnd', async () => { let dbCounter = 1; function newTestDatabase(): Database { - return instance.database(`database-${dbCounter++}`,); + return instance.database(`database-${dbCounter++}`); } const server = setupResult.server; @@ -200,7 +200,6 @@ describe('EndToEnd', async () => { 'CloudSpanner.Snapshot.begin', 'CloudSpanner.Database.getSnapshot', 'CloudSpanner.Snapshot.runStream', - 'CloudSpanner.Snapshot.run', ]; const expectedEventNames = [ 'Begin Transaction', @@ -288,16 +287,15 @@ describe('EndToEnd', async () => { await transaction!.end(); const expectedSpanNames = [ 'CloudSpanner.Snapshot.runStream', - 'CloudSpanner.Snapshot.run', 'CloudSpanner.Transaction.commit', 'CloudSpanner.Database.runTransaction', ]; const expectedEventNames = [ 'Starting stream', - 'Transaction Creation Done', 'Starting Commit', 'Commit Done', ...cacheSessionEvents, + 'Transaction Creation Done', ]; await verifySpansAndEvents( @@ -310,21 +308,19 @@ describe('EndToEnd', async () => { }); it('runTransactionAsync', async () => { - await database.runTransactionAsync(async transaction => { await transaction!.run('SELECT 1'); }); const expectedSpanNames = [ 'CloudSpanner.Snapshot.runStream', - 'CloudSpanner.Snapshot.run', 'CloudSpanner.Database.runTransactionAsync', ]; const expectedEventNames = [ 'Starting stream', - 'Transaction Creation Done', ...cacheSessionEvents, 'Using Session', + 'Transaction Creation Done', ]; await verifySpansAndEvents( traceExporter, @@ -333,7 +329,7 @@ describe('EndToEnd', async () => { ); }); - it.only('runTransaction with abort', done => { + it.skip('runTransaction with abort', done => { let attempts = 0; let rowCount = 0; const database = newTestDatabase(); @@ -354,9 +350,7 @@ describe('EndToEnd', async () => { 'CloudSpanner.Database.batchCreateSessions', 'CloudSpanner.SessionPool.createSessions', 'CloudSpanner.Snapshot.runStream', - 'CloudSpanner.Snapshot.run', 'CloudSpanner.Snapshot.runStream', - 'CloudSpanner.Snapshot.run', 'CloudSpanner.Transaction.commit', 'CloudSpanner.Snapshot.begin', 'CloudSpanner.Database.runTransaction', @@ -373,17 +367,21 @@ describe('EndToEnd', async () => { 'Starting Commit', 'Commit Done', ]; - await verifySpansAndEvents(traceExporter, expectedSpanNames, expectedEventNames) + await verifySpansAndEvents( + traceExporter, + expectedSpanNames, + expectedEventNames + ); database .close() .catch(done) .then(() => done()); }); }); - }); + }); }); - it('runTransactionAsync with abort', async () => { + it.skip('runTransactionAsync with abort', async () => { let attempts = 0; const database = newTestDatabase(); await database.runTransactionAsync((transaction): Promise => { @@ -402,10 +400,8 @@ describe('EndToEnd', async () => { 'CloudSpanner.Database.batchCreateSessions', 'CloudSpanner.SessionPool.createSessions', 'CloudSpanner.Snapshot.runStream', - 'CloudSpanner.Snapshot.run', 'CloudSpanner.Snapshot.begin', 'CloudSpanner.Snapshot.runStream', - 'CloudSpanner.Snapshot.run', 'CloudSpanner.Transaction.commit', 'CloudSpanner.Database.runTransactionAsync', ]; @@ -476,7 +472,6 @@ describe('EndToEnd', async () => { const expectedSpanNames = [ 'CloudSpanner.Snapshot.begin', 'CloudSpanner.Snapshot.runStream', - 'CloudSpanner.Snapshot.run', 'CloudSpanner.Dml.runUpdate', 'CloudSpanner.PartitionedDml.runUpdate', 'CloudSpanner.Database.runPartitionedUpdate', @@ -635,7 +630,6 @@ describe('ObservabilityOptions injection and propagation', async () => { const expectedSpanNames = [ 'CloudSpanner.Database.getTransaction', 'CloudSpanner.Snapshot.runStream', - 'CloudSpanner.Snapshot.run', ]; assert.deepStrictEqual( actualSpanNames, @@ -690,7 +684,6 @@ describe('ObservabilityOptions injection and propagation', async () => { const expectedSpanNames = [ 'CloudSpanner.Snapshot.begin', 'CloudSpanner.Snapshot.runStream', - 'CloudSpanner.Snapshot.run', 'CloudSpanner.Dml.runUpdate', ]; assert.deepStrictEqual( @@ -799,7 +792,6 @@ describe('ObservabilityOptions injection and propagation', async () => { const expectedSpanNames = [ 'CloudSpanner.Snapshot.begin', 'CloudSpanner.Snapshot.runStream', - 'CloudSpanner.Snapshot.run', 'CloudSpanner.Dml.runUpdate', 'CloudSpanner.Transaction.rollback', ]; @@ -1352,7 +1344,6 @@ SELECT 1p 'CloudSpanner.Database.batchCreateSessions', 'CloudSpanner.SessionPool.createSessions', 'CloudSpanner.Snapshot.runStream', - 'CloudSpanner.Snapshot.run', 'CloudSpanner.Snapshot.begin', 'CloudSpanner.Snapshot.begin', 'CloudSpanner.Transaction.commit', @@ -1364,19 +1355,6 @@ SELECT 1p expectedSpanNames, `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` ); - const spanSnapshotRun = spans[3]; - assert.strictEqual(spanSnapshotRun.name, 'CloudSpanner.Snapshot.run'); - const wantSpanErr = '6 ALREADY_EXISTS: ' + messageBadInsertAlreadyExistent; - assert.deepStrictEqual( - spanSnapshotRun.status.code, - SpanStatusCode.ERROR, - 'Unexpected status code' - ); - assert.deepStrictEqual( - spanSnapshotRun.status.message, - wantSpanErr, - 'Unexpexcted error message' - ); const databaseBatchCreateSessionsSpan = spans[0]; assert.strictEqual( @@ -1405,8 +1383,7 @@ SELECT 1p // We need to ensure a strict relationship between the spans. // |-Database.runTransactionAsync |-------------------------------------| - // |-Snapshot.run |------------------------| - // |-Snapshot.runStream |---------------------| + // |-Snapshot.runStream |---------------------| // |-Transaction.commit |--------| // |-Snapshot.begin |------| // |-Snapshot.commit |-----| @@ -1427,12 +1404,6 @@ SELECT 1p 'Expected that Database.runTransaction is the parent to Transaction.commmit' ); - assert.deepStrictEqual( - spanSnapshotRun.parentSpanId, - spanDatabaseRunTransactionAsync.spanContext().spanId, - 'Expected that Database.runTransaction is the parent to Snapshot.run' - ); - // Assert that despite all being exported, SessionPool.createSessions // is not in the same trace as runStream, createSessions is invoked at // Spanner Client instantiation, thus before database.run is invoked. @@ -1953,7 +1924,6 @@ describe('Traces for ExecuteStream broken stream retries', () => { 'CloudSpanner.Database.batchCreateSessions', 'CloudSpanner.SessionPool.createSessions', 'CloudSpanner.Snapshot.runStream', - 'CloudSpanner.Snapshot.run', 'CloudSpanner.Dml.runUpdate', 'CloudSpanner.Snapshot.begin', 'CloudSpanner.Transaction.commit', diff --git a/observability-test/transaction.ts b/observability-test/transaction.ts index 20e604d94..2cfe50d31 100644 --- a/observability-test/transaction.ts +++ b/observability-test/transaction.ts @@ -332,106 +332,6 @@ describe('Transaction', () => { }); }); - describe('run', () => { - const QUERY = 'SELET * FROM `MyTable`'; - - let fakeStream; - - beforeEach(() => { - fakeStream = new EventEmitter(); - sandbox.stub(snapshot, 'runStream').returns(fakeStream); - }); - - it('without error', done => { - const fakeRows = [{a: 'b'}, {c: 'd'}, {e: 'f'}]; - - snapshot.run(QUERY, (err, rows) => { - assert.ifError(err); - assert.deepStrictEqual(rows, fakeRows); - - const exportResults = extractExportedSpans(); - const actualSpanNames = exportResults.spanNames; - const actualEventNames = exportResults.spanEventNames; - - const expectedSpanNames = ['CloudSpanner.Snapshot.run']; - assert.deepStrictEqual( - actualSpanNames, - expectedSpanNames, - `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` - ); - - const expectedEventNames = []; - assert.deepStrictEqual( - actualEventNames, - expectedEventNames, - `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` - ); - - // Ensure that the final span that got retries did not error. - const spans = exportResults.spans; - const firstSpan = spans[0]; - assert.strictEqual( - SpanStatusCode.UNSET, - firstSpan.status.code, - 'Unexpected an span status code' - ); - assert.strictEqual( - undefined, - firstSpan.status.message, - 'Unexpected span status message' - ); - done(); - }); - - fakeRows.forEach(row => fakeStream.emit('data', row)); - fakeStream.emit('end'); - }); - - it('with errors', done => { - const fakeError = new Error('run.error'); - - snapshot.run(QUERY, err => { - assert.strictEqual(err, fakeError); - - const exportResults = extractExportedSpans(); - const actualSpanNames = exportResults.spanNames; - const actualEventNames = exportResults.spanEventNames; - - const expectedSpanNames = ['CloudSpanner.Snapshot.run']; - assert.deepStrictEqual( - actualSpanNames, - expectedSpanNames, - `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` - ); - - const expectedEventNames = []; - assert.deepStrictEqual( - actualEventNames, - expectedEventNames, - `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` - ); - - // Ensure that the final span that got retries did not error. - const spans = exportResults.spans; - const firstSpan = spans[0]; - assert.strictEqual( - SpanStatusCode.ERROR, - firstSpan.status.code, - 'Unexpected an span status code' - ); - assert.strictEqual( - 'run.error', - firstSpan.status.message, - 'Unexpected span status message' - ); - - done(); - }); - - fakeStream.emit('error', fakeError); - }); - }); - describe('runStream', () => { const QUERY = { sql: 'SELECT * FROM `MyTable`', diff --git a/src/partial-result-stream.ts b/src/partial-result-stream.ts index e5c577e87..8e42addcb 100644 --- a/src/partial-result-stream.ts +++ b/src/partial-result-stream.ts @@ -569,7 +569,7 @@ export function partialResultStream( // checkpoint stream has queued. After that, we will destroy the // user's stream with the same error. setImmediate(() => batchAndSplitOnTokenStream.destroy(err)); - setSpanErrorAndException(span, err as Error); + // setSpanErrorAndException(span, err as Error); return; } diff --git a/src/transaction-runner.ts b/src/transaction-runner.ts index 32786f20e..783145906 100644 --- a/src/transaction-runner.ts +++ b/src/transaction-runner.ts @@ -323,7 +323,6 @@ export class TransactionRunner extends Runner { } stream.unpipe(proxyStream); - // proxyStream.emit('error', err); reject(err); }) .pipe(proxyStream); diff --git a/src/transaction.ts b/src/transaction.ts index fa1f10814..387376103 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -1093,33 +1093,23 @@ export class Snapshot extends EventEmitter { let stats: google.spanner.v1.ResultSetStats; let metadata: google.spanner.v1.ResultSetMetadata; - const traceConfig = { - sql: query, - opts: this._observabilityOptions, - dbName: this._dbName!, - }; - startTrace('Snapshot.run', traceConfig, span => { - return this.runStream(query) - .on('error', (err, rows, stats, metadata) => { - setSpanError(span, err); - span.end(); - callback!(err, rows, stats, metadata); - }) - .on('response', response => { - if (response.metadata) { - metadata = response.metadata; - if (metadata.transaction && !this.id) { - this._update(metadata.transaction); - } + this.runStream(query) + .on('error', (err, rows, stats, metadata) => { + callback!(err, rows, stats, metadata); + }) + .on('response', response => { + if (response.metadata) { + metadata = response.metadata; + if (metadata.transaction && !this.id) { + this._update(metadata.transaction); } - }) - .on('data', row => rows.push(row)) - .on('stats', _stats => (stats = _stats)) - .on('end', () => { - span.end(); - callback!(null, rows, stats, metadata); - }); - }); + } + }) + .on('data', row => rows.push(row)) + .on('stats', _stats => (stats = _stats)) + .on('end', () => { + callback!(null, rows, stats, metadata); + }); } /** From f5d21a3f205a53c41aa7a108a7a1bc0d72084408 Mon Sep 17 00:00:00 2001 From: surbhigarg92 Date: Thu, 21 Nov 2024 21:21:24 +0530 Subject: [PATCH 4/4] Added span in partialresultStream --- observability-test/database.ts | 4 ++-- observability-test/helper.ts | 16 ++++++++++++++-- observability-test/spanner.ts | 34 +++++++++------------------------- src/database.ts | 3 ++- src/instrument.ts | 12 ++++++++++++ src/partial-result-stream.ts | 17 ++++++++--------- src/transaction-runner.ts | 5 ++++- src/transaction.ts | 5 +++++ test/transaction.ts | 4 ++-- 9 files changed, 58 insertions(+), 42 deletions(-) diff --git a/observability-test/database.ts b/observability-test/database.ts index 39ebe9afc..7df1ac6fc 100644 --- a/observability-test/database.ts +++ b/observability-test/database.ts @@ -682,7 +682,7 @@ describe('Database', () => { 'Expected that secondRetrySpan is the child to parentSpan' ); - const expectedEventNames = ['No session available']; + const expectedEventNames = ['No session available', 'Using Session']; assert.deepStrictEqual( actualEventNames, expectedEventNames, @@ -1558,7 +1558,7 @@ describe('Database', () => { ); // We don't expect events. - const expectedEventNames = []; + const expectedEventNames = ['Using Session']; assert.deepStrictEqual( actualEventNames, expectedEventNames, diff --git a/observability-test/helper.ts b/observability-test/helper.ts index 591171666..58a1d13d4 100644 --- a/observability-test/helper.ts +++ b/observability-test/helper.ts @@ -36,6 +36,7 @@ export const cacheSessionEvents = [ 'Acquiring session', 'Cache hit: has usable session', 'Acquired session', + 'Using Session', ]; /** @@ -82,14 +83,25 @@ export async function verifySpansAndEvents( actualEventNames.push(event.name); }); }); + + assert.strictEqual( + actualSpanNames.length, + expectedSpans.length, + `Span count mismatch: Expected ${expectedSpans.length} spans, but received ${actualSpanNames.length} spans` + ); assert.deepStrictEqual( actualSpanNames, expectedSpans, `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpans}` ); + assert.strictEqual( + actualEventNames.length, + expectedEvents.length, + `Event count mismatch: Expected ${expectedEvents.length} events, but received ${actualEventNames.length} events` + ); assert.deepStrictEqual( - actualEventNames, - expectedEvents, + actualEventNames.sort(), + expectedEvents.sort(), `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEvents}` ); } diff --git a/observability-test/spanner.ts b/observability-test/spanner.ts index ee3a6bbc6..aea657e84 100644 --- a/observability-test/spanner.ts +++ b/observability-test/spanner.ts @@ -225,7 +225,7 @@ describe('EndToEnd', async () => { transaction!.commit(); const expectedSpanNames = ['CloudSpanner.Database.getTransaction']; - const expectedEventNames = [...cacheSessionEvents, 'Using Session']; + const expectedEventNames = [...cacheSessionEvents]; await verifySpansAndEvents( traceExporter, expectedSpanNames, @@ -245,11 +245,7 @@ describe('EndToEnd', async () => { 'CloudSpanner.Snapshot.runStream', 'CloudSpanner.Database.runStream', ]; - const expectedEventNames = [ - 'Starting stream', - ...cacheSessionEvents, - 'Using Session', - ]; + const expectedEventNames = ['Starting stream', ...cacheSessionEvents]; await verifySpansAndEvents( traceExporter, expectedSpanNames, @@ -267,11 +263,7 @@ describe('EndToEnd', async () => { 'CloudSpanner.Database.runStream', 'CloudSpanner.Database.run', ]; - const expectedEventNames = [ - 'Starting stream', - ...cacheSessionEvents, - 'Using Session', - ]; + const expectedEventNames = ['Starting stream', ...cacheSessionEvents]; await verifySpansAndEvents( traceExporter, expectedSpanNames, @@ -319,7 +311,6 @@ describe('EndToEnd', async () => { const expectedEventNames = [ 'Starting stream', ...cacheSessionEvents, - 'Using Session', 'Transaction Creation Done', ]; await verifySpansAndEvents( @@ -350,22 +341,21 @@ describe('EndToEnd', async () => { 'CloudSpanner.Database.batchCreateSessions', 'CloudSpanner.SessionPool.createSessions', 'CloudSpanner.Snapshot.runStream', + 'CloudSpanner.Snapshot.begin', 'CloudSpanner.Snapshot.runStream', 'CloudSpanner.Transaction.commit', - 'CloudSpanner.Snapshot.begin', 'CloudSpanner.Database.runTransaction', ]; const expectedEventNames = [ - ...waitingSessionsEvents, - 'Retrying Transaction', + ...batchCreateSessionsEvents, 'Starting stream', - 'exception', - 'Stream broken. Not safe to retry', 'Begin Transaction', 'Transaction Creation Done', 'Starting stream', 'Starting Commit', 'Commit Done', + ...waitingSessionsEvents, + 'Retrying transaction', ]; await verifySpansAndEvents( traceExporter, @@ -381,7 +371,7 @@ describe('EndToEnd', async () => { }); }); - it.skip('runTransactionAsync with abort', async () => { + it('runTransactionAsync with abort', async () => { let attempts = 0; const database = newTestDatabase(); await database.runTransactionAsync((transaction): Promise => { @@ -406,11 +396,8 @@ describe('EndToEnd', async () => { 'CloudSpanner.Database.runTransactionAsync', ]; const expectedEventNames = [ - 'Requesting 25 sessions', - 'Creating 25 sessions', - 'Requested for 25 sessions returned 25', + ...batchCreateSessionsEvents, 'Starting stream', - 'exception', 'Stream broken. Not safe to retry', 'Begin Transaction', 'Transaction Creation Done', @@ -441,7 +428,6 @@ describe('EndToEnd', async () => { 'Starting Commit', 'Commit Done', ...cacheSessionEvents, - 'Using Session', ]; await verifySpansAndEvents( traceExporter, @@ -639,7 +625,6 @@ describe('ObservabilityOptions injection and propagation', async () => { const expectedEventNames = [ ...cacheSessionEvents, - 'Using Session', 'Starting stream', 'Transaction Creation Done', ]; @@ -747,7 +732,6 @@ describe('ObservabilityOptions injection and propagation', async () => { const expectedEventNames = [ ...cacheSessionEvents, - 'Using Session', 'Starting stream', ]; assert.deepStrictEqual( diff --git a/src/database.ts b/src/database.ts index a81c5cfd5..6f0468b93 100644 --- a/src/database.ts +++ b/src/database.ts @@ -2110,7 +2110,6 @@ class Database extends common.GrpcServiceObject { span.end(); this.getSnapshot(options, callback!); } else { - span.addEvent('Using Session', {'session.id': session?.id}); this.pool_.release(session!); span.end(); callback!(err); @@ -2118,6 +2117,7 @@ class Database extends common.GrpcServiceObject { return; } + span.addEvent('Using Session', {'session.id': session?.id}); this._releaseOnEnd(session!, snapshot, span); span.end(); callback!(err, snapshot); @@ -3244,6 +3244,7 @@ class Database extends common.GrpcServiceObject { return; } + span.addEvent('Using Session', {'session.id': session?.id}); transaction!._observabilityOptions = this._observabilityOptions; if (options.optimisticLock) { transaction!.useOptimisticLock(); diff --git a/src/instrument.ts b/src/instrument.ts index 8ad123bf6..994de20d7 100644 --- a/src/instrument.ts +++ b/src/instrument.ts @@ -200,6 +200,18 @@ export function setSpanError(span: Span, err: Error | String): boolean { return true; } +/** + * Sets the span status with err and end, if non-null onto the span with + * status.code=ERROR and the message of err.toString() + * + * @returns {boolean} to signify if the status was set. + */ +export function setSpanErrorAndEnd(span: Span, err: Error | String): boolean { + const status = setSpanError(span, err); + span.end(); + return status; +} + /** * Sets err, if non-null onto the span with * status.code=ERROR and the message of err.toString() diff --git a/src/partial-result-stream.ts b/src/partial-result-stream.ts index 8e42addcb..340d171e6 100644 --- a/src/partial-result-stream.ts +++ b/src/partial-result-stream.ts @@ -24,7 +24,7 @@ import {Readable, Transform} from 'stream'; import * as streamEvents from 'stream-events'; import {grpc, CallOptions} from 'google-gax'; import {DeadlineError, isRetryableInternalError} from './transaction-runner'; -import {getActiveOrNoopSpan, setSpanErrorAndException} from './instrument'; +import {Span} from './instrument'; import {codec, JSONOptions, Json, Field, Value} from './codec'; import {google} from '../protos/protos'; import * as stream from 'stream'; @@ -97,6 +97,7 @@ export interface RowOptions { */ columnsMetadata?: object; gaxOptions?: CallOptions; + span?: Span; } /** @@ -183,16 +184,16 @@ interface ResultEvents { export class PartialResultStream extends Transform implements ResultEvents { private _destroyed: boolean; private _fields!: google.spanner.v1.StructType.Field[]; - private _options: RowOptions; private _pendingValue?: p.IValue; private _pendingValueForResume?: p.IValue; private _values: p.IValue[]; private _numPushFailed = 0; + options: RowOptions; constructor(options = {}) { super({objectMode: true}); this._destroyed = false; - this._options = Object.assign({maxResumeRetries: 20}, options); + this.options = Object.assign({maxResumeRetries: 20}, options); this._values = []; } /** @@ -271,7 +272,7 @@ export class PartialResultStream extends Transform implements ResultEvents { // Downstream returned false indicating that it is still not ready for // more data. this._numPushFailed++; - if (this._numPushFailed === this._options.maxResumeRetries) { + if (this._numPushFailed === this.options.maxResumeRetries) { this.destroy( new Error( `Stream is still not ready to receive data after ${this._numPushFailed} attempts to resume.` @@ -359,8 +360,8 @@ export class PartialResultStream extends Transform implements ResultEvents { const row: Row = this._createRow(values); - if (this._options.json) { - return this.push(row.toJSON(this._options.jsonOptions)); + if (this.options.json) { + return this.push(row.toJSON(this.options.jsonOptions)); } return this.push(row); @@ -376,7 +377,7 @@ export class PartialResultStream extends Transform implements ResultEvents { private _createRow(values: Value[]): Row { const fields = values.map((value, index) => { const {name, type} = this._fields[index]; - const columnMetadata = this._options.columnsMetadata?.[name]; + const columnMetadata = this.options.columnsMetadata?.[name]; return { name, value: codec.decode( @@ -494,7 +495,6 @@ export function partialResultStream( let lastRequestStream: Readable; const startTime = Date.now(); const timeout = options?.gaxOptions?.timeout ?? Infinity; - const span = getActiveOrNoopSpan(); // mergeStream allows multiple streams to be connected into one. This is good; // if we need to retry a request and pipe more data to the user's stream. @@ -569,7 +569,6 @@ export function partialResultStream( // checkpoint stream has queued. After that, we will destroy the // user's stream with the same error. setImmediate(() => batchAndSplitOnTokenStream.destroy(err)); - // setSpanErrorAndException(span, err as Error); return; } diff --git a/src/transaction-runner.ts b/src/transaction-runner.ts index 783145906..9f1d01328 100644 --- a/src/transaction-runner.ts +++ b/src/transaction-runner.ts @@ -23,7 +23,7 @@ import {Session} from './session'; import {Transaction} from './transaction'; import {NormalCallback} from './common'; import {isSessionNotFoundError} from './session-pool'; -import {getActiveOrNoopSpan} from './instrument'; +import {getActiveOrNoopSpan, setSpanErrorAndEnd} from './instrument'; import {Database} from './database'; import {google} from '../protos/protos'; import IRequestOptions = google.spanner.v1.IRequestOptions; @@ -314,9 +314,12 @@ export class TransactionRunner extends Runner { transaction.requestStream = (config: object) => { const proxyStream = through.obj(); const stream = requestStream(config); + const resultStream = transaction.resultStream; stream .on('error', (err: grpc.ServiceError) => { + resultStream?.options.span && + setSpanErrorAndEnd(resultStream?.options.span, err); if (!this.shouldRetry(err)) { proxyStream.destroy(err); return; diff --git a/src/transaction.ts b/src/transaction.ts index 387376103..3f5e115c2 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -287,6 +287,7 @@ export class Snapshot extends EventEmitter { readTimestampProto?: spannerClient.protobuf.ITimestamp; request: (config: {}, callback: Function) => void; requestStream: (config: {}) => Readable; + resultStream?: PartialResultStream; session: Session; queryOptions?: IQueryOptions; resourceHeader_: {[k: string]: string}; @@ -751,6 +752,7 @@ export class Snapshot extends EventEmitter { maxResumeRetries, columnsMetadata, gaxOptions, + span, } ) ?.on('response', response => { @@ -789,6 +791,7 @@ export class Snapshot extends EventEmitter { }); } + this.resultStream = resultStream; return resultStream; }); } @@ -1332,6 +1335,7 @@ export class Snapshot extends EventEmitter { maxResumeRetries, columnsMetadata, gaxOptions, + span, } ) .on('response', response => { @@ -1371,6 +1375,7 @@ export class Snapshot extends EventEmitter { }); } + this.resultStream = resultStream; return resultStream; }); } diff --git a/test/transaction.ts b/test/transaction.ts index 28be1543f..7f91b3733 100644 --- a/test/transaction.ts +++ b/test/transaction.ts @@ -410,7 +410,7 @@ describe('Transaction', () => { assert.strictEqual(reqOpts.jsonOptions, undefined); assert.strictEqual(reqOpts.maxResumeRetries, undefined); - const options = PARTIAL_RESULT_STREAM.lastCall.args[1]; + const {span, ...options} = PARTIAL_RESULT_STREAM.lastCall.args[1]; assert.deepStrictEqual(options, fakeOptions); }); @@ -791,7 +791,7 @@ describe('Transaction', () => { assert.strictEqual(reqOpts.jsonOptions, undefined); assert.strictEqual(reqOpts.maxResumeRetries, undefined); - const options = PARTIAL_RESULT_STREAM.lastCall.args[1]; + const {span, ...options} = PARTIAL_RESULT_STREAM.lastCall.args[1]; assert.deepStrictEqual(options, expectedOptions); });