Skip to content

Commit

Permalink
feat: (observability): trace Database.runPartitionedUpdate (#2176)
Browse files Browse the repository at this point in the history
This change traces Database.runPartitionedUpdate along with the appropriate tests for it with and without errors.

Updates #2079
  • Loading branch information
odeke-em authored Oct 28, 2024
1 parent cbc86fa commit 701e226
Show file tree
Hide file tree
Showing 3 changed files with 287 additions and 7 deletions.
221 changes: 220 additions & 1 deletion observability-test/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ const {
InMemorySpanExporter,
} = require('@opentelemetry/sdk-trace-node');
// eslint-disable-next-line n/no-extraneous-require
const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base');
const {
ReadableSpan,
SimpleSpanProcessor,
} = require('@opentelemetry/sdk-trace-base');
import * as db from '../src/database';
import {Instance, MutationGroup, Spanner} from '../src';
import * as pfy from '@google-cloud/promisify';
Expand Down Expand Up @@ -1954,4 +1957,220 @@ describe('Database', () => {
fakeStream2.push(null);
});
});

describe('runPartitionedUpdate', () => {
const QUERY = {
sql: 'INSERT INTO `MyTable` (Key, Thing) VALUES(@key, @thing)',
params: {
key: 'k999',
thing: 'abc',
},
};

let fakePool: FakeSessionPool;
let fakeSession: FakeSession;
let fakePartitionedDml = new FakeTransaction(
{} as google.spanner.v1.TransactionOptions.PartitionedDml
);

let getSessionStub;
let beginStub;
let runUpdateStub;

beforeEach(() => {
fakePool = database.pool_;
fakeSession = new FakeSession();
fakePartitionedDml = new FakeTransaction(
{} as google.spanner.v1.TransactionOptions.PartitionedDml
);

getSessionStub = (
sandbox.stub(fakePool, 'getSession') as sinon.SinonStub
).callsFake(callback => {
callback(null, fakeSession);
});

sandbox.stub(fakeSession, 'partitionedDml').returns(fakePartitionedDml);

beginStub = (
sandbox.stub(fakePartitionedDml, 'begin') as sinon.SinonStub
).callsFake(callback => callback(null));

runUpdateStub = (
sandbox.stub(fakePartitionedDml, 'runUpdate') as sinon.SinonStub
).callsFake((_, callback) => callback(null));
});

interface traceExportResults {
spanNames: string[];
spans: (typeof ReadableSpan)[];
eventNames: string[];
}

async function getTraceExportResults(): Promise<traceExportResults> {
await provider.forceFlush();
await traceExporter.forceFlush();
const spans = traceExporter.getFinishedSpans();
withAllSpansHaveDBName(spans);

const actualSpanNames: string[] = [];
const actualEventNames: string[] = [];
spans.forEach(span => {
actualSpanNames.push(span.name);
span.events.forEach(event => {
actualEventNames.push(event.name);
});
});

return Promise.resolve({
spanNames: actualSpanNames,
spans: spans,
eventNames: actualEventNames,
});
}

it('with pool errors', done => {
const fakeError = new Error('err');
const fakeCallback = sandbox.spy();

getSessionStub.callsFake(callback => callback(fakeError));
database.runPartitionedUpdate(QUERY, async (err, rowCount) => {
assert.strictEqual(err, fakeError);
assert.strictEqual(rowCount, 0);

const exportResults = await getTraceExportResults();
const actualSpanNames = exportResults.spanNames;
const spans = exportResults.spans;
const actualEventNames = exportResults.eventNames;

const expectedSpanNames = [
'CloudSpanner.Database.runPartitionedUpdate',
];
assert.deepStrictEqual(
actualSpanNames,
expectedSpanNames,
`span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}`
);

// Ensure that the first span actually produced an error that was recorded.
const parentSpan = spans[0];
assert.deepStrictEqual(
SpanStatusCode.ERROR,
parentSpan.status.code,
'Expected an ERROR span status'
);
assert.deepStrictEqual(
fakeError.message,
parentSpan.status.message.toString(),
'Mismatched span status message'
);

const expectedEventNames = [];
assert.deepStrictEqual(
actualEventNames,
expectedEventNames,
`Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}`
);

done();
});
});

it('with begin errors', done => {
const fakeError = new Error('err');

beginStub.callsFake(callback => callback(fakeError));

const releaseStub = (
sandbox.stub(fakePool, 'release') as sinon.SinonStub
).withArgs(fakeSession);

database.runPartitionedUpdate(QUERY, async (err, rowCount) => {
assert.strictEqual(err, fakeError);
assert.strictEqual(rowCount, 0);
assert.strictEqual(releaseStub.callCount, 1);

const exportResults = await getTraceExportResults();
const actualSpanNames = exportResults.spanNames;
const spans = exportResults.spans;
const actualEventNames = exportResults.eventNames;

const expectedSpanNames = [
'CloudSpanner.Database.runPartitionedUpdate',
];
assert.deepStrictEqual(
actualSpanNames,
expectedSpanNames,
`span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}`
);

// Ensure that the first span actually produced an error that was recorded.
const parentSpan = spans[0];
assert.deepStrictEqual(
SpanStatusCode.ERROR,
parentSpan.status.code,
'Expected an ERROR span status'
);
assert.deepStrictEqual(
fakeError.message,
parentSpan.status.message.toString(),
'Mismatched span status message'
);

const expectedEventNames = [];
assert.deepStrictEqual(
actualEventNames,
expectedEventNames,
`Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}`
);
done();
});
});

it('session released on transaction end', done => {
const releaseStub = (
sandbox.stub(fakePool, 'release') as sinon.SinonStub
).withArgs(fakeSession);

database.runPartitionedUpdate(QUERY, async (err, rowCount) => {
const exportResults = await getTraceExportResults();
const actualSpanNames = exportResults.spanNames;
const spans = exportResults.spans;
const actualEventNames = exportResults.eventNames;

const expectedSpanNames = [
'CloudSpanner.Database.runPartitionedUpdate',
];
assert.deepStrictEqual(
actualSpanNames,
expectedSpanNames,
`span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}`
);

// Ensure that the first span actually produced an error that was recorded.
const parentSpan = spans[0];
assert.deepStrictEqual(
SpanStatusCode.UNSET,
parentSpan.status.code,
'Unexpected span status'
);
assert.deepStrictEqual(
undefined,
parentSpan.status.message,
'Mismatched span status message'
);

const expectedEventNames = [];
assert.deepStrictEqual(
actualEventNames,
expectedEventNames,
`Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}`
);
done();
});

fakePartitionedDml.emit('end');
assert.strictEqual(releaseStub.callCount, 1);
});
});
});
47 changes: 47 additions & 0 deletions observability-test/spanner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,53 @@ describe('EndToEnd', async () => {
done();
});
});

it('runPartitionedUpdate', async () => {
const [rowCount] = await database.runPartitionedUpdate({
sql: updateSql,
});

await tracerProvider.forceFlush();
await traceExporter.forceFlush();
const spans = traceExporter.getFinishedSpans();

const actualEventNames: string[] = [];
const actualSpanNames: string[] = [];
spans.forEach(span => {
actualSpanNames.push(span.name);
span.events.forEach(event => {
actualEventNames.push(event.name);
});
});

const expectedSpanNames = [
'CloudSpanner.Snapshot.begin',
'CloudSpanner.Snapshot.runStream',
'CloudSpanner.Snapshot.run',
'CloudSpanner.Dml.runUpdate',
'CloudSpanner.PartitionedDml.runUpdate',
'CloudSpanner.Database.runPartitionedUpdate',
];
assert.deepStrictEqual(
actualSpanNames,
expectedSpanNames,
`span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}`
);

const expectedEventNames = [
'Begin Transaction',
'Transaction Creation Done',
'Starting stream',
'Acquiring session',
'Cache hit: has usable session',
'Acquired session',
];
assert.deepStrictEqual(
actualEventNames,
expectedEventNames,
`Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}`
);
});
});
});

Expand Down
26 changes: 20 additions & 6 deletions src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2858,13 +2858,27 @@ class Database extends common.GrpcServiceObject {
query: string | RunPartitionedUpdateOptions,
callback?: RunUpdateCallback
): void | Promise<[number]> {
this.pool_.getSession((err, session) => {
if (err) {
callback!(err as ServiceError, 0);
return;
}
const traceConfig = {
sql: query,
...this._traceConfig,
};
return startTrace('Database.runPartitionedUpdate', traceConfig, span => {
this.pool_.getSession((err, session) => {
if (err) {
setSpanError(span, err);
span.end();
callback!(err as ServiceError, 0);
return;
}

this._runPartitionedUpdate(session!, query, callback);
this._runPartitionedUpdate(session!, query, (err, count) => {
if (err) {
setSpanError(span, err);
}
span.end();
callback!(err, count);
});
});
});
}

Expand Down

0 comments on commit 701e226

Please sign in to comment.