From 4598bc3ebf7f0d72b1143afb9931a2dd8e208506 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Fri, 31 May 2024 11:16:12 -0400 Subject: [PATCH] feat: use Storage Read API for faster data fetching --- src/bigquery.ts | 80 +++++++++++++++++++++++++++++++++++++++++++++++-- src/storage.ts | 52 ++++++++++++++++++++++++++++++++ 2 files changed, 130 insertions(+), 2 deletions(-) create mode 100644 src/storage.ts diff --git a/src/bigquery.ts b/src/bigquery.ts index 9a4b8fff..040208ff 100644 --- a/src/bigquery.ts +++ b/src/bigquery.ts @@ -46,6 +46,7 @@ import { import {GoogleErrorBody} from '@google-cloud/common/build/src/util'; import bigquery from './types'; import {logger, setLogFunction} from './logger'; +import {StorageReadClient} from './storage'; // Third-Party Re-exports export {common}; @@ -243,6 +244,11 @@ export interface BigQueryOptions extends GoogleAuthOptions { * Defaults to `googleapis.com`. */ universeDomain?: string; + + /** + * Storage Reader + */ + storageReadClient?: StorageReadClient; } export interface IntegerTypeCastOptions { @@ -335,6 +341,7 @@ export class BigQuery extends Service { location?: string; private _universeDomain: string; private _enableQueryPreview: boolean; + private _storageClient?: StorageReadClient; createQueryStream(options?: Query | string): ResourceStream { // placeholder body, overwritten in constructor @@ -400,6 +407,10 @@ export class BigQuery extends Service { } } + if (options.storageReadClient) { + this._storageClient = options.storageReadClient; + } + this._universeDomain = universeDomain; this.location = options.location; /** @@ -2150,7 +2161,7 @@ export class BigQuery extends Service { const queryReq = this.buildQueryRequest_(query, options); this.trace_('[query] queryReq', queryReq); if (!queryReq) { - this.createQueryJob(query, (err, job, resp) => { + this.createQueryJob(query, async (err, job, resp) => { if (err) { (callback as SimpleQueryRowsCallback)(err, null, resp); return; @@ -2159,6 +2170,15 @@ export 
class BigQuery extends Service { (callback as SimpleQueryRowsCallback)(null, [], resp); return; } + if (job && this._storageClient) { + try { + const rows = await this.acceleratedFetchDataFromJob_(job, options); + (callback as QueryRowsCallback)(null, rows, null); + return; + } catch (err) { + this.trace_('failed to fetch using Storage Reader', err); + } + } // The Job is important for the `queryAsStream_` method, so a new query // isn't created each time results are polled for. options = extend({job}, queryOpts, options); @@ -2167,7 +2187,7 @@ export class BigQuery extends Service { return; } - this.runJobsQuery(queryReq, (err, job, res) => { + this.runJobsQuery(queryReq, async (err, job, res) => { this.trace_('[runJobsQuery callback]: ', query, err, job, res); if (err) { (callback as SimpleQueryRowsCallback)(err, null, res); @@ -2176,6 +2196,17 @@ export class BigQuery extends Service { options = extend({job}, queryOpts, options); if (res && res.jobComplete) { + if (job && res.pageToken && this._storageClient) { + try { + const rows = await this.acceleratedFetchDataFromJob_(job, options, res.schema); + (callback as QueryRowsCallback)(null, rows, null); + return; + } catch (err) { + console.log('failed to fetch using storage reader', err); + this.trace_('failed to fetch using Storage Reader', err); + } + } + let rows: any = []; if (res.schema && res.rows) { rows = BigQuery.mergeSchemaWithRows_(res.schema, res.rows, { @@ -2200,6 +2231,51 @@ export class BigQuery extends Service { }); } + private async acceleratedFetchDataFromJob_( + job: Job, + opts: QueryOptions, + schema?: bigquery.ITableSchema, + ): Promise { + if (!this._storageClient) { + return Promise.reject('storage client not available'); + } + console.time('fetchMetadata'); + const [metadata] = (await job.getMetadata()) as bigquery.IJob[]; + this.trace_('[job metadata]', metadata.configuration?.query); + const qconfig = metadata.configuration?.query; + if (!qconfig) { + return Promise.reject('job is not a 
query type'); + } + const dstTableRef = qconfig.destinationTable!; + const table = this.dataset(dstTableRef.datasetId!, { + projectId: dstTableRef.projectId, + }).table(dstTableRef.tableId!); + let tableSchema = schema; + if (!tableSchema) { + const [md] = (await table.getMetadata({ + view: 'BASIC', + })) as bigquery.ITable[]; + tableSchema = md.schema; + } + console.timeEnd('fetchMetadata'); + + const tableReader = await this._storageClient.createTableReader({ + table: dstTableRef, + }); + console.time('fetchStorageAPI'); + this.trace_('fetching with Storage API'); + const [rawRows] = await tableReader.getRows(); + this.trace_('finished fetching with Storage API'); + console.timeEnd('fetchStorageAPI'); + console.time('mergeSchemaStorage'); + const rows = BigQuery.mergeSchemaWithRows_(tableSchema!, rawRows, { + wrapIntegers: opts.wrapIntegers || false, + parseJSON: opts.parseJSON || false, + }); + console.timeEnd('mergeSchemaStorage'); + return rows; + } + /** * Check if the given Query can run using the `jobs.query` endpoint. * Returns a bigquery.IQueryRequest that can be used to call `jobs.query`. diff --git a/src/storage.ts b/src/storage.ts new file mode 100644 index 00000000..294dd46b --- /dev/null +++ b/src/storage.ts @@ -0,0 +1,52 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+import {ResourceStream} from '@google-cloud/paginator';
+import {Transform} from 'stream';
+import {BigQuery, QueryRowsResponse, RowMetadata} from '.';
+import bigquery from './types';
+
+// Minimal Storage Read API client contract: opens a TableReader for a table.
+export interface StorageReadClient {
+  createTableReader(req: {
+    table: bigquery.ITableReference;
+  }): Promise<TableReader>;
+}
+
+// Fetches the rows of a BigQuery table using the Storage Read API.
+export interface TableReader {
+  getRows(): Promise<QueryRowsResponse>;
+  getRowsAsStream(): Promise<ResourceStream<RowMetadata>>;
+}
+
+// Object-mode Transform that passes each incoming row through
+// BigQuery.mergeSchemaWithRows_ and reports an error if no row comes back.
+export class MergeSchemaTransform extends Transform {
+  constructor(schema: bigquery.ITableSchema) {
+    super({
+      objectMode: true,
+      transform(row, _, callback) {
+        const rows = BigQuery.mergeSchemaWithRows_(schema, [row], {
+          wrapIntegers: false,
+          parseJSON: false,
+        });
+        if (rows.length === 0) {
+          callback(new Error('failed to convert row'), null);
+          return;
+        }
+        callback(null, rows[0]);
+      },
+    });
+  }
+}