diff --git a/src/bigquery.ts b/src/bigquery.ts index bc85b18d..afc11f26 100644 --- a/src/bigquery.ts +++ b/src/bigquery.ts @@ -46,6 +46,7 @@ import { import {GoogleErrorBody} from '@google-cloud/common/build/src/util'; import bigquery from './types'; import {logger, setLogFunction} from './logger'; +import {MergeSchemaTransform, StorageReadClient} from './storage'; // Third-Party Re-exports export {common}; @@ -247,6 +248,11 @@ export interface BigQueryOptions extends GoogleAuthOptions { * Defaults to `googleapis.com`. */ universeDomain?: string; + + /** + * Storage Reader + */ + storageReadClient?: StorageReadClient; } export interface IntegerTypeCastOptions { @@ -339,6 +345,7 @@ export class BigQuery extends Service { location?: string; private _universeDomain: string; private _enableQueryPreview: boolean; + _storageClient?: StorageReadClient; createQueryStream(options?: Query | string): ResourceStream { // placeholder body, overwritten in constructor @@ -404,6 +411,10 @@ export class BigQuery extends Service { } } + if (options.storageReadClient) { + this._storageClient = options.storageReadClient; + } + this._universeDomain = universeDomain; this.location = options.location; /** @@ -2154,7 +2165,7 @@ export class BigQuery extends Service { const queryReq = this.buildQueryRequest_(query, options); this.trace_('[query] queryReq', queryReq); if (!queryReq) { - this.createQueryJob(query, (err, job, resp) => { + this.createQueryJob(query, async (err, job, resp) => { if (err) { (callback as SimpleQueryRowsCallback)(err, null, resp); return; @@ -2171,7 +2182,7 @@ export class BigQuery extends Service { return; } - this.runJobsQuery(queryReq, (err, job, res) => { + this.runJobsQuery(queryReq, async (err, job, res) => { this.trace_('[runJobsQuery callback]: ', query, err, job, res); if (err) { (callback as SimpleQueryRowsCallback)(err, null, res); @@ -2205,6 +2216,69 @@ export class BigQuery extends Service { }); } + async acceleratedFetchDataFromJob_( + job: Job, + opts: QueryOptions, + callback: QueryRowsCallback, + schema?: bigquery.ITableSchema + ): Promise { + if (!this._storageClient) { + return Promise.reject('storage client not available'); + } + console.time('fetchMetadata'); + const [metadata] = (await job.getMetadata()) as bigquery.IJob[]; + this.trace_('[job metadata]', metadata.configuration?.query); + const qconfig = metadata.configuration?.query; + if (!qconfig) { + return Promise.reject('job is not a query type'); + } + const dstTableRef = qconfig.destinationTable!; + const table = this.dataset(dstTableRef.datasetId!, { + projectId: dstTableRef.projectId, + }).table(dstTableRef.tableId!); + let tableSchema = schema; + if (!tableSchema) { + const [md] = (await table.getMetadata({ + view: 'BASIC', + })) as bigquery.ITable[]; + tableSchema = md.schema; + } + console.timeEnd('fetchMetadata'); + + const tableReader = await this._storageClient.createTableReader({ + table: dstTableRef, + }); + console.time('fetchStorageAPI'); + this.trace_('fetching with Storage API'); + if (opts._asStream) { + const stream = await tableReader.getRowStream(); + const rowStream = stream.pipe( + new MergeSchemaTransform(tableSchema!) + ) as ResourceStream; + rowStream.on('data', data => { + callback(null, [data], { + _streaming: true, + } as any); + }); + rowStream.on('end', () => { + callback(null, [], null); + }); + console.timeEnd('fetchStorageAPI'); + return; + } + const [rawRows] = await tableReader.getRows(); + console.timeEnd('fetchStorageAPI'); + this.trace_('finished fetching with Storage API'); + console.time('mergeSchemaStorage'); + const rows = BigQuery.mergeSchemaWithRows_(tableSchema!, rawRows, { + wrapIntegers: opts.wrapIntegers || false, + parseJSON: opts.parseJSON || false, + }); + + console.timeEnd('mergeSchemaStorage'); + callback(null, rows, null); + } + /** * Check if the given Query can run using the `jobs.query` endpoint. * Returns a bigquery.IQueryRequest that can be used to call `jobs.query`. @@ -2314,8 +2388,10 @@ export class BigQuery extends Service { job = this.job(jobRef.jobId!, { location: jobRef.location, }); + job.metadata = res; } else if (res.queryId) { job = this.job(res.queryId); // stateless query + job.metadata = res; } callback!(null, job, res); } @@ -2329,8 +2405,18 @@ export class BigQuery extends Service { * @private */ queryAsStream_(query: Query, callback?: SimpleQueryRowsCallback) { + this.trace_('queryAsStream_'); + if ((query as QueryResultsOptions)._streaming) { + return; + } if (query.job) { - query.job.getQueryResults(query, callback as QueryRowsCallback); + query.job.getQueryResults( + { + ...query, + _asStream: true, + }, + callback as QueryRowsCallback + ); return; } @@ -2343,6 +2429,7 @@ export class BigQuery extends Service { wrapIntegers, parseJSON, autoPaginate: false, + _asStream: true, }; delete query.location; diff --git a/src/job.ts b/src/job.ts index 78e6a6d2..4247f6d1 100644 --- a/src/job.ts +++ b/src/job.ts @@ -57,6 +57,8 @@ export type QueryResultsOptions = { */ _cachedRows?: any[]; _cachedResponse?: bigquery.IQueryResponse; + _asStream?: boolean; + _streaming?: boolean; }; /** @@ -453,6 +455,49 @@ class Job extends Operation { ); } + async wait(): Promise { + let metadata: JobMetadata | undefined = this.metadata; + if (!metadata) { + [metadata] = await this.getMetadata(); + } + const jobState = metadata?.status?.state; + this.trace_('wait status', jobState); + if (jobState === 'DONE') { + return Promise.resolve(); + } + const isQuery = !!metadata?.configuration?.query; + if (isQuery) { + const stream = this.getQueryResultsStream({ + maxResults: 0, + }); + return new Promise((resolve, reject) => { + stream.on('data', data => { + this.trace_('data arrived on wait', data); + }); + stream.on('error', err => { + this.trace_('error on wait', err); + reject(err); + }); + stream.on('end', () => { + this.trace_('end on wait'); + resolve(); + }); + }); + } + while (true) { + let newMetadata: JobMetadata | undefined; + [newMetadata] = await this.getMetadata(); + const newJobState = metadata?.status?.state; + if (newJobState === 'DONE') { + return Promise.resolve(); + } + // TODO: exponential backoff + await new Promise(resolve => { + setTimeout(resolve, 100); + }); + } + } + /** * Get the results of a job. * @@ -565,6 +610,12 @@ class Job extends Operation { const timeoutOverride = typeof qs.timeoutMs === 'number' ? qs.timeoutMs : false; + if (this.metadata.jobComplete && this.bigQuery._storageClient) { + this.trace_('job complete and storage available'); + this.bigQuery.acceleratedFetchDataFromJob_(this, options, callback!); + return; + } + if (options._cachedRows) { let nextQuery: QueryResultsOptions | null = null; if (options.pageToken) { @@ -636,7 +687,16 @@ class Job extends Operation { options: QueryResultsOptions, callback: QueryRowsCallback ): void { - options = extend({autoPaginate: false}, options); + if (options._streaming) { + return; + } + options = extend( + { + autoPaginate: false, + _asStream: true, + }, + options + ); this.getQueryResults(options, callback); } @@ -683,7 +743,9 @@ paginator.extend(Job, ['getQueryResults']); * All async methods (except for streams) will return a Promise in the event * that a callback is omitted. */ -promisifyAll(Job); +promisifyAll(Job, { + exclude: ['wait'], +}); /** * Reference to the {@link Job} class. diff --git a/src/storage.ts b/src/storage.ts new file mode 100644 index 00000000..fd631cc2 --- /dev/null +++ b/src/storage.ts @@ -0,0 +1,52 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {ResourceStream} from '@google-cloud/paginator'; +import {Transform} from 'stream'; +import {BigQuery, QueryRowsResponse, RowMetadata} from '.'; +import bigquery from './types'; + +// Minimal interface for a BigQuery Storage Read API client +// that can read data from tables. +export interface StorageReadClient { + createTableReader(req: { + table: bigquery.ITableReference; + }): Promise; +} + +// Interface for fetching data from a BigQuery table using +// the BigQuery Storage Read API. +export interface TableReader { + getRows(): Promise; + getRowStream(): Promise>; +} + +export class MergeSchemaTransform extends Transform { + constructor(schema: bigquery.ITableSchema) { + super({ + objectMode: true, + transform(row, _, callback) { + const rows = BigQuery.mergeSchemaWithRows_(schema, [row], { + wrapIntegers: false, + parseJSON: false, + }); + if (rows.length === 0) { + callback(new Error('failed to convert row'), null); + return; + } + callback(null, rows[0]); + }, + }); + } +}