From 4598bc3ebf7f0d72b1143afb9931a2dd8e208506 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Fri, 31 May 2024 11:16:12 -0400 Subject: [PATCH 1/4] feat: use Storage Read API for faster data fetching --- src/bigquery.ts | 80 +++++++++++++++++++++++++++++++++++++++++++++++-- src/storage.ts | 52 ++++++++++++++++++++++++++++++++ 2 files changed, 130 insertions(+), 2 deletions(-) create mode 100644 src/storage.ts diff --git a/src/bigquery.ts b/src/bigquery.ts index 9a4b8fff..040208ff 100644 --- a/src/bigquery.ts +++ b/src/bigquery.ts @@ -46,6 +46,7 @@ import { import {GoogleErrorBody} from '@google-cloud/common/build/src/util'; import bigquery from './types'; import {logger, setLogFunction} from './logger'; +import {StorageReadClient} from './storage'; // Third-Party Re-exports export {common}; @@ -243,6 +244,11 @@ export interface BigQueryOptions extends GoogleAuthOptions { * Defaults to `googleapis.com`. */ universeDomain?: string; + + /** + * Storage Reader + */ + storageReadClient?: StorageReadClient; } export interface IntegerTypeCastOptions { @@ -335,6 +341,7 @@ export class BigQuery extends Service { location?: string; private _universeDomain: string; private _enableQueryPreview: boolean; + private _storageClient?: StorageReadClient; createQueryStream(options?: Query | string): ResourceStream { // placeholder body, overwritten in constructor @@ -400,6 +407,10 @@ export class BigQuery extends Service { } } + if (options.storageReadClient) { + this._storageClient = options.storageReadClient; + } + this._universeDomain = universeDomain; this.location = options.location; /** @@ -2150,7 +2161,7 @@ export class BigQuery extends Service { const queryReq = this.buildQueryRequest_(query, options); this.trace_('[query] queryReq', queryReq); if (!queryReq) { - this.createQueryJob(query, (err, job, resp) => { + this.createQueryJob(query, async (err, job, resp) => { if (err) { (callback as SimpleQueryRowsCallback)(err, null, resp); return; @@ -2159,6 +2170,15 @@ export 
class BigQuery extends Service { (callback as SimpleQueryRowsCallback)(null, [], resp); return; } + if (job && this._storageClient) { + try { + const rows = await this.acceleratedFetchDataFromJob_(job, options); + (callback as QueryRowsCallback)(null, rows, null); + return; + } catch (err) { + this.trace_('failed to fetch using Storage Reader', err); + } + } // The Job is important for the `queryAsStream_` method, so a new query // isn't created each time results are polled for. options = extend({job}, queryOpts, options); @@ -2167,7 +2187,7 @@ export class BigQuery extends Service { return; } - this.runJobsQuery(queryReq, (err, job, res) => { + this.runJobsQuery(queryReq, async (err, job, res) => { this.trace_('[runJobsQuery callback]: ', query, err, job, res); if (err) { (callback as SimpleQueryRowsCallback)(err, null, res); @@ -2176,6 +2196,17 @@ export class BigQuery extends Service { options = extend({job}, queryOpts, options); if (res && res.jobComplete) { + if (job && res.pageToken && this._storageClient) { + try { + const rows = await this.acceleratedFetchDataFromJob_(job, options, res.schema); + (callback as QueryRowsCallback)(null, rows, null); + return; + } catch (err) { + console.log('failed to fetch using storage reader', err); + this.trace_('failed to fetch using Storage Reader', err); + } + } + let rows: any = []; if (res.schema && res.rows) { rows = BigQuery.mergeSchemaWithRows_(res.schema, res.rows, { @@ -2200,6 +2231,51 @@ export class BigQuery extends Service { }); } + private async acceleratedFetchDataFromJob_( + job: Job, + opts: QueryOptions, + schema?: bigquery.ITableSchema, + ): Promise { + if (!this._storageClient) { + return Promise.reject('storage client not available'); + } + console.time('fetchMetadata'); + const [metadata] = (await job.getMetadata()) as bigquery.IJob[]; + this.trace_('[job metadata]', metadata.configuration?.query); + const qconfig = metadata.configuration?.query; + if (!qconfig) { + return Promise.reject('job is not a 
query type'); + } + const dstTableRef = qconfig.destinationTable!; + const table = this.dataset(dstTableRef.datasetId!, { + projectId: dstTableRef.projectId, + }).table(dstTableRef.tableId!); + let tableSchema = schema; + if (!tableSchema) { + const [md] = (await table.getMetadata({ + view: 'BASIC', + })) as bigquery.ITable[]; + tableSchema = md.schema; + } + console.timeEnd('fetchMetadata'); + + const tableReader = await this._storageClient.createTableReader({ + table: dstTableRef, + }); + console.time('fetchStorageAPI'); + this.trace_('fetching with Storage API'); + const [rawRows] = await tableReader.getRows(); + this.trace_('finished fetching with Storage API'); + console.timeEnd('fetchStorageAPI'); + console.time('mergeSchemaStorage'); + const rows = BigQuery.mergeSchemaWithRows_(tableSchema!, rawRows, { + wrapIntegers: opts.wrapIntegers || false, + parseJSON: opts.parseJSON || false, + }); + console.timeEnd('mergeSchemaStorage'); + return rows; + } + /** * Check if the given Query can run using the `jobs.query` endpoint. * Returns a bigquery.IQueryRequest that can be used to call `jobs.query`. diff --git a/src/storage.ts b/src/storage.ts new file mode 100644 index 00000000..294dd46b --- /dev/null +++ b/src/storage.ts @@ -0,0 +1,52 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +import {ResourceStream} from '@google-cloud/paginator'; +import {Transform} from 'stream'; +import {BigQuery, QueryRowsResponse, RowMetadata} from '.'; +import bigquery from './types'; + +// Minimal interface for a BigQuery Storage Read API client +// that can read data from tables. +export interface StorageReadClient { + createTableReader(req: { + table: bigquery.ITableReference; + }): Promise; +} + +// Interface for fetching data from a BigQuery table using +// the BigQuery Storage Read API. +export interface TableReader { + getRows(): Promise; + getRowsAsStream(): Promise>; +} + +export class MergeSchemaTransform extends Transform { + constructor(schema: bigquery.ITableSchema) { + super({ + objectMode: true, + transform(row, _, callback) { + const rows = BigQuery.mergeSchemaWithRows_(schema, [row], { + wrapIntegers: false, + parseJSON: false, + }); + if (rows.length == 0) { + callback(new Error('failed to convert row'), null); + return; + } + callback(null, rows[0]); + }, + }); + } +} From 652e80245d1622ec19711ea939f5414fb9333a69 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Fri, 31 May 2024 11:33:52 -0400 Subject: [PATCH 2/4] fix: lint issues --- src/bigquery.ts | 10 +++++++--- src/storage.ts | 4 ++-- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/bigquery.ts b/src/bigquery.ts index 040208ff..907d387c 100644 --- a/src/bigquery.ts +++ b/src/bigquery.ts @@ -2198,7 +2198,11 @@ export class BigQuery extends Service { if (res && res.jobComplete) { if (job && res.pageToken && this._storageClient) { try { - const rows = await this.acceleratedFetchDataFromJob_(job, options, res.schema); + const rows = await this.acceleratedFetchDataFromJob_( + job, + options, + res.schema + ); (callback as QueryRowsCallback)(null, rows, null); return; } catch (err) { @@ -2234,12 +2238,12 @@ export class BigQuery extends Service { private async acceleratedFetchDataFromJob_( job: Job, opts: QueryOptions, - schema?: bigquery.ITableSchema, + schema?: 
bigquery.ITableSchema ): Promise { if (!this._storageClient) { return Promise.reject('storage client not available'); } - console.time('fetchMetadata'); + console.time('fetchMetadata'); const [metadata] = (await job.getMetadata()) as bigquery.IJob[]; this.trace_('[job metadata]', metadata.configuration?.query); const qconfig = metadata.configuration?.query; diff --git a/src/storage.ts b/src/storage.ts index 294dd46b..b0b51d93 100644 --- a/src/storage.ts +++ b/src/storage.ts @@ -17,7 +17,7 @@ import {Transform} from 'stream'; import {BigQuery, QueryRowsResponse, RowMetadata} from '.'; import bigquery from './types'; -// Minimal interface for a BigQuery Storage Read API client +// Minimal interface for a BigQuery Storage Read API client // that can read data from tables. export interface StorageReadClient { createTableReader(req: { @@ -25,7 +25,7 @@ export interface StorageReadClient { }): Promise; } -// Interface for fetching data from a BigQuery table using +// Interface for fetching data from a BigQuery table using // the BigQuery Storage Read API. 
export interface TableReader { getRows(): Promise; From de0e2ac496770a533f3ff4eef1c5bdb4a690e9fd Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Fri, 31 May 2024 11:37:21 -0400 Subject: [PATCH 3/4] fix: prefer === --- src/storage.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/storage.ts b/src/storage.ts index b0b51d93..7c2d9c86 100644 --- a/src/storage.ts +++ b/src/storage.ts @@ -41,7 +41,7 @@ export class MergeSchemaTransform extends Transform { wrapIntegers: false, parseJSON: false, }); - if (rows.length == 0) { + if (rows.length === 0) { callback(new Error('failed to convert row'), null); return; } From d38082193b7e69066821c9540acffdcfb45bac0b Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Wed, 31 Jul 2024 11:43:57 -0400 Subject: [PATCH 4/4] feat: use storage api in stream mode --- src/bigquery.ts | 69 +++++++++++++++++++++++++++---------------------- src/job.ts | 66 ++++++++++++++++++++++++++++++++++++++++++++-- src/storage.ts | 2 +- 3 files changed, 103 insertions(+), 34 deletions(-) diff --git a/src/bigquery.ts b/src/bigquery.ts index c737f7fb..afc11f26 100644 --- a/src/bigquery.ts +++ b/src/bigquery.ts @@ -46,7 +46,7 @@ import { import {GoogleErrorBody} from '@google-cloud/common/build/src/util'; import bigquery from './types'; import {logger, setLogFunction} from './logger'; -import {StorageReadClient} from './storage'; +import {MergeSchemaTransform, StorageReadClient} from './storage'; // Third-Party Re-exports export {common}; @@ -345,7 +345,7 @@ export class BigQuery extends Service { location?: string; private _universeDomain: string; private _enableQueryPreview: boolean; - private _storageClient?: StorageReadClient; + _storageClient?: StorageReadClient; createQueryStream(options?: Query | string): ResourceStream { // placeholder body, overwritten in constructor @@ -2174,15 +2174,6 @@ export class BigQuery extends Service { (callback as SimpleQueryRowsCallback)(null, [], resp); return; } - if (job && 
this._storageClient) { - try { - const rows = await this.acceleratedFetchDataFromJob_(job, options); - (callback as QueryRowsCallback)(null, rows, null); - return; - } catch (err) { - this.trace_('failed to fetch using Storage Reader', err); - } - } // The Job is important for the `queryAsStream_` method, so a new query // isn't created each time results are polled for. options = extend({job}, queryOpts, options); @@ -2200,21 +2191,6 @@ export class BigQuery extends Service { options = extend({job}, queryOpts, options); if (res && res.jobComplete) { - if (job && res.pageToken && this._storageClient) { - try { - const rows = await this.acceleratedFetchDataFromJob_( - job, - options, - res.schema - ); - (callback as QueryRowsCallback)(null, rows, null); - return; - } catch (err) { - console.log('failed to fetch using storage reader', err); - this.trace_('failed to fetch using Storage Reader', err); - } - } - let rows: any = []; if (res.schema && res.rows) { rows = BigQuery.mergeSchemaWithRows_(res.schema, res.rows, { @@ -2240,11 +2216,12 @@ export class BigQuery extends Service { }); } - private async acceleratedFetchDataFromJob_( + async acceleratedFetchDataFromJob_( job: Job, opts: QueryOptions, + callback: QueryRowsCallback, schema?: bigquery.ITableSchema - ): Promise { + ): Promise { if (!this._storageClient) { return Promise.reject('storage client not available'); } @@ -2273,16 +2250,33 @@ export class BigQuery extends Service { }); console.time('fetchStorageAPI'); this.trace_('fetching with Storage API'); + if (opts._asStream) { + const stream = await tableReader.getRowStream(); + const rowStream = stream.pipe( + new MergeSchemaTransform(tableSchema!) 
+ ) as ResourceStream; + rowStream.on('data', data => { + callback(null, [data], { + _streaming: true, + } as any); + }); + rowStream.on('end', () => { + callback(null, [], null); + }); + console.timeEnd('fetchStorageAPI'); + return; + } const [rawRows] = await tableReader.getRows(); - this.trace_('finished fetching with Storage API'); console.timeEnd('fetchStorageAPI'); + this.trace_('finished fetching with Storage API'); console.time('mergeSchemaStorage'); const rows = BigQuery.mergeSchemaWithRows_(tableSchema!, rawRows, { wrapIntegers: opts.wrapIntegers || false, parseJSON: opts.parseJSON || false, }); + console.timeEnd('mergeSchemaStorage'); - return rows; + callback(null, rows, null); } /** @@ -2394,8 +2388,10 @@ export class BigQuery extends Service { job = this.job(jobRef.jobId!, { location: jobRef.location, }); + job.metadata = res; } else if (res.queryId) { job = this.job(res.queryId); // stateless query + job.metadata = res; } callback!(null, job, res); } @@ -2409,8 +2405,18 @@ export class BigQuery extends Service { * @private */ queryAsStream_(query: Query, callback?: SimpleQueryRowsCallback) { + this.trace_('queryAsStream_'); + if ((query as QueryResultsOptions)._streaming) { + return; + } if (query.job) { - query.job.getQueryResults(query, callback as QueryRowsCallback); + query.job.getQueryResults( + { + ...query, + _asStream: true, + }, + callback as QueryRowsCallback + ); return; } @@ -2423,6 +2429,7 @@ export class BigQuery extends Service { wrapIntegers, parseJSON, autoPaginate: false, + _asStream: true, }; delete query.location; diff --git a/src/job.ts b/src/job.ts index 78e6a6d2..4247f6d1 100644 --- a/src/job.ts +++ b/src/job.ts @@ -57,6 +57,8 @@ export type QueryResultsOptions = { */ _cachedRows?: any[]; _cachedResponse?: bigquery.IQueryResponse; + _asStream?: boolean; + _streaming?: boolean; }; /** @@ -453,6 +455,49 @@ class Job extends Operation { ); } + async wait(): Promise { + let metadata: JobMetadata | undefined = this.metadata; + if 
(!metadata) { + [metadata] = await this.getMetadata(); + } + const jobState = metadata?.status?.state; + this.trace_('wait status', jobState); + if (jobState === 'DONE') { + return Promise.resolve(); + } + const isQuery = !!metadata?.configuration?.query; + if (isQuery) { + const stream = this.getQueryResultsStream({ + maxResults: 0, + }); + return new Promise((resolve, reject) => { + stream.on('data', data => { + this.trace_('data arrived on wait', data); + }); + stream.on('error', err => { + this.trace_('error on wait', err); + reject(err); + }); + stream.on('end', () => { + this.trace_('end on wait'); + resolve(); + }); + }); + } + while (true) { + let newMetadata: JobMetadata | undefined; + [newMetadata] = await this.getMetadata(); + const newJobState = newMetadata?.status?.state; + if (newJobState === 'DONE') { + return Promise.resolve(); + } + // TODO: exponential backoff + await new Promise(resolve => { + setTimeout(resolve, 100); + }); + } + } + /** * Get the results of a job. * @@ -565,6 +610,12 @@ class Job extends Operation { const timeoutOverride = typeof qs.timeoutMs === 'number' ? qs.timeoutMs : false; + if (this.metadata.jobComplete && this.bigQuery._storageClient) { + this.trace_('job complete and storage available'); + this.bigQuery.acceleratedFetchDataFromJob_(this, options, callback!); + return; + } + if (options._cachedRows) { let nextQuery: QueryResultsOptions | null = null; if (options.pageToken) { @@ -636,7 +687,16 @@ class Job extends Operation { options: QueryResultsOptions, callback: QueryRowsCallback ): void { - options = extend({autoPaginate: false}, options); + if (options._streaming) { + return; + } + options = extend( + { + autoPaginate: false, + _asStream: true, + }, + options + ); this.getQueryResults(options, callback); } @@ -683,7 +743,9 @@ paginator.extend(Job, ['getQueryResults']); * All async methods (except for streams) will return a Promise in the event * that a callback is omitted.
*/ -promisifyAll(Job); +promisifyAll(Job, { + exclude: ['wait'], +}); /** * Reference to the {@link Job} class. diff --git a/src/storage.ts b/src/storage.ts index 7c2d9c86..fd631cc2 100644 --- a/src/storage.ts +++ b/src/storage.ts @@ -29,7 +29,7 @@ export interface StorageReadClient { // the BigQuery Storage Read API. export interface TableReader { getRows(): Promise; - getRowsAsStream(): Promise>; + getRowStream(): Promise>; } export class MergeSchemaTransform extends Transform {