Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use Storage Read API for faster data fetching #1368

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 90 additions & 3 deletions src/bigquery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import {
import {GoogleErrorBody} from '@google-cloud/common/build/src/util';
import bigquery from './types';
import {logger, setLogFunction} from './logger';
import {MergeSchemaTransform, StorageReadClient} from './storage';

// Third-Party Re-exports
export {common};
Expand Down Expand Up @@ -247,6 +248,11 @@ export interface BigQueryOptions extends GoogleAuthOptions {
* Defaults to `googleapis.com`.
*/
universeDomain?: string;

/**
* Storage Reader
*/
storageReadClient?: StorageReadClient;
}

export interface IntegerTypeCastOptions {
Expand Down Expand Up @@ -339,6 +345,7 @@ export class BigQuery extends Service {
location?: string;
private _universeDomain: string;
private _enableQueryPreview: boolean;
_storageClient?: StorageReadClient;

createQueryStream(options?: Query | string): ResourceStream<RowMetadata> {
// placeholder body, overwritten in constructor
Expand Down Expand Up @@ -404,6 +411,10 @@ export class BigQuery extends Service {
}
}

if (options.storageReadClient) {
this._storageClient = options.storageReadClient;
}

this._universeDomain = universeDomain;
this.location = options.location;
/**
Expand Down Expand Up @@ -2154,7 +2165,7 @@ export class BigQuery extends Service {
const queryReq = this.buildQueryRequest_(query, options);
this.trace_('[query] queryReq', queryReq);
if (!queryReq) {
this.createQueryJob(query, (err, job, resp) => {
this.createQueryJob(query, async (err, job, resp) => {
if (err) {
(callback as SimpleQueryRowsCallback)(err, null, resp);
return;
Expand All @@ -2171,7 +2182,7 @@ export class BigQuery extends Service {
return;
}

this.runJobsQuery(queryReq, (err, job, res) => {
this.runJobsQuery(queryReq, async (err, job, res) => {
this.trace_('[runJobsQuery callback]: ', query, err, job, res);
if (err) {
(callback as SimpleQueryRowsCallback)(err, null, res);
Expand Down Expand Up @@ -2205,6 +2216,69 @@ export class BigQuery extends Service {
});
}

/**
 * Fetches the results of a completed query job directly from its destination
 * table using the BigQuery Storage Read API, which is faster than paging
 * through `jobs.getQueryResults` for large result sets.
 *
 * @param job - completed query job whose destination table holds the results.
 * @param opts - query options; `_asStream` selects streaming delivery, and
 *   `wrapIntegers` / `parseJSON` control schema merging in batch mode.
 * @param callback - invoked with merged rows; in streaming mode it is called
 *   once per row (with a `_streaming` marker response) and once more with an
 *   empty batch and a `null` response to signal end-of-stream.
 * @param schema - optional table schema; when omitted it is fetched from the
 *   destination table's metadata.
 * @returns a promise that rejects if no storage client is configured or the
 *   job is not a query job.
 */
async acceleratedFetchDataFromJob_(
  job: Job,
  opts: QueryOptions,
  callback: QueryRowsCallback,
  schema?: bigquery.ITableSchema
): Promise<void> {
  if (!this._storageClient) {
    // Reject with an Error (not a bare string) so callers get a stack trace.
    return Promise.reject(new Error('storage client not available'));
  }
  const [metadata] = (await job.getMetadata()) as bigquery.IJob[];
  this.trace_('[job metadata]', metadata.configuration?.query);
  const qconfig = metadata.configuration?.query;
  if (!qconfig) {
    return Promise.reject(new Error('job is not a query type'));
  }
  const dstTableRef = qconfig.destinationTable!;
  const table = this.dataset(dstTableRef.datasetId!, {
    projectId: dstTableRef.projectId,
  }).table(dstTableRef.tableId!);
  let tableSchema = schema;
  if (!tableSchema) {
    // BASIC view skips storage statistics we don't need here.
    const [md] = (await table.getMetadata({
      view: 'BASIC',
    })) as bigquery.ITable[];
    tableSchema = md.schema;
  }

  const tableReader = await this._storageClient.createTableReader({
    table: dstTableRef,
  });
  this.trace_('fetching with Storage API');
  if (opts._asStream) {
    const stream = await tableReader.getRowStream();
    const rowStream = stream.pipe(
      new MergeSchemaTransform(tableSchema!)
    ) as ResourceStream<any>;
    rowStream.on('data', data => {
      callback(null, [data], {
        _streaming: true,
      } as any);
    });
    rowStream.on('end', () => {
      // Empty batch + null response signals end-of-stream to the caller.
      callback(null, [], null);
    });
    // Forward stream errors to the callback; previously there was no
    // 'error' listener, so a failure emitted an unhandled 'error' event
    // and crashed the process.
    rowStream.on('error', err => {
      callback(err as Error, null, null);
    });
    return;
  }
  const [rawRows] = await tableReader.getRows();
  this.trace_('finished fetching with Storage API');
  const rows = BigQuery.mergeSchemaWithRows_(tableSchema!, rawRows, {
    wrapIntegers: opts.wrapIntegers || false,
    parseJSON: opts.parseJSON || false,
  });
  callback(null, rows, null);
}

/**
* Check if the given Query can run using the `jobs.query` endpoint.
* Returns a bigquery.IQueryRequest that can be used to call `jobs.query`.
Expand Down Expand Up @@ -2314,8 +2388,10 @@ export class BigQuery extends Service {
job = this.job(jobRef.jobId!, {
location: jobRef.location,
});
job.metadata = res;
} else if (res.queryId) {
job = this.job(res.queryId); // stateless query
job.metadata = res;
}
callback!(null, job, res);
}
Expand All @@ -2329,8 +2405,18 @@ export class BigQuery extends Service {
* @private
*/
queryAsStream_(query: Query, callback?: SimpleQueryRowsCallback) {
this.trace_('queryAsStream_');
if ((query as QueryResultsOptions)._streaming) {
return;
}
if (query.job) {
query.job.getQueryResults(query, callback as QueryRowsCallback);
query.job.getQueryResults(
{
...query,
_asStream: true,
},
callback as QueryRowsCallback
);
return;
}

Expand All @@ -2343,6 +2429,7 @@ export class BigQuery extends Service {
wrapIntegers,
parseJSON,
autoPaginate: false,
_asStream: true,
};

delete query.location;
Expand Down
66 changes: 64 additions & 2 deletions src/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ export type QueryResultsOptions = {
*/
_cachedRows?: any[];
_cachedResponse?: bigquery.IQueryResponse;
_asStream?: boolean;
_streaming?: boolean;
};

/**
Expand Down Expand Up @@ -453,6 +455,49 @@ class Job extends Operation {
);
}

/**
 * Waits for this job to reach the `DONE` state.
 *
 * Query jobs are awaited via `getQueryResultsStream({maxResults: 0})`, which
 * blocks server-side until completion; other job types are polled via
 * `getMetadata()` with capped exponential backoff.
 *
 * @returns a promise that resolves once the job is done, or rejects if the
 *   results stream emits an error.
 */
async wait(): Promise<void> {
  let metadata: JobMetadata | undefined = this.metadata;
  if (!metadata) {
    [metadata] = await this.getMetadata();
  }
  const jobState = metadata?.status?.state;
  this.trace_('wait status', jobState);
  if (jobState === 'DONE') {
    return;
  }
  const isQuery = !!metadata?.configuration?.query;
  if (isQuery) {
    // maxResults: 0 means we only wait for completion without
    // transferring any row data.
    const stream = this.getQueryResultsStream({
      maxResults: 0,
    });
    return new Promise((resolve, reject) => {
      stream.on('data', data => {
        this.trace_('data arrived on wait', data);
      });
      stream.on('error', err => {
        this.trace_('error on wait', err);
        reject(err);
      });
      stream.on('end', () => {
        this.trace_('end on wait');
        resolve();
      });
    });
  }
  // Poll non-query jobs until DONE, backing off exponentially (capped at 5s).
  let delayMs = 100;
  for (;;) {
    const [newMetadata] = await this.getMetadata();
    // Bug fix: the loop previously checked the stale `metadata` captured
    // before the loop instead of `newMetadata`, so it never observed the
    // DONE state and polled forever.
    if (newMetadata?.status?.state === 'DONE') {
      return;
    }
    await new Promise(resolve => {
      setTimeout(resolve, delayMs);
    });
    delayMs = Math.min(delayMs * 2, 5000);
  }
}

/**
* Get the results of a job.
*
Expand Down Expand Up @@ -565,6 +610,12 @@ class Job extends Operation {
const timeoutOverride =
typeof qs.timeoutMs === 'number' ? qs.timeoutMs : false;

if (this.metadata.jobComplete && this.bigQuery._storageClient) {
this.trace_('job complete and storage available');
this.bigQuery.acceleratedFetchDataFromJob_(this, options, callback!);
return;
}

if (options._cachedRows) {
let nextQuery: QueryResultsOptions | null = null;
if (options.pageToken) {
Expand Down Expand Up @@ -636,7 +687,16 @@ class Job extends Operation {
options: QueryResultsOptions,
callback: QueryRowsCallback
): void {
options = extend({autoPaginate: false}, options);
if (options._streaming) {
return;
}
options = extend(
{
autoPaginate: false,
_asStream: true,
},
options
);
this.getQueryResults(options, callback);
}

Expand Down Expand Up @@ -683,7 +743,9 @@ paginator.extend(Job, ['getQueryResults']);
* All async methods (except for streams) will return a Promise in the event
* that a callback is omitted.
*/
promisifyAll(Job);
promisifyAll(Job, {
exclude: ['wait'],
});

/**
* Reference to the {@link Job} class.
Expand Down
52 changes: 52 additions & 0 deletions src/storage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import {ResourceStream} from '@google-cloud/paginator';
import {Transform} from 'stream';
import {BigQuery, QueryRowsResponse, RowMetadata} from '.';
import bigquery from './types';

/**
 * Minimal interface for a BigQuery Storage Read API client
 * that can read data from tables.
 *
 * Implementations are injected via `BigQueryOptions.storageReadClient`,
 * keeping this package free of a hard dependency on the Storage Read
 * API client library.
 */
export interface StorageReadClient {
  /**
   * Creates a reader bound to the given table reference.
   *
   * @param req - holds the reference to the table to read from.
   */
  createTableReader(req: {
    table: bigquery.ITableReference;
  }): Promise<TableReader>;
}

/**
 * Interface for fetching data from a BigQuery table using
 * the BigQuery Storage Read API.
 */
export interface TableReader {
  /** Reads all rows of the table in a single batch. */
  getRows(): Promise<QueryRowsResponse>;
  /** Streams the table's rows incrementally. */
  getRowStream(): Promise<ResourceStream<RowMetadata>>;
}

/**
 * Object-mode Transform stream that converts raw Storage Read API rows
 * into schema-merged row objects, one row at a time, by delegating to
 * `BigQuery.mergeSchemaWithRows_`.
 */
export class MergeSchemaTransform extends Transform {
  /**
   * @param schema - table schema used to merge each incoming raw row.
   */
  constructor(schema: bigquery.ITableSchema) {
    super({
      objectMode: true,
      transform(row, _encoding, done) {
        const merged = BigQuery.mergeSchemaWithRows_(schema, [row], {
          wrapIntegers: false,
          parseJSON: false,
        });
        if (merged.length > 0) {
          done(null, merged[0]);
        } else {
          done(new Error('failed to convert row'), null);
        }
      },
    });
  }
}