Skip to content

Commit

Permalink
feat: use Storage Read API for faster data fetching
Browse files Browse the repository at this point in the history
  • Loading branch information
alvarowolfx committed May 31, 2024
1 parent 247fc15 commit 4598bc3
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 2 deletions.
80 changes: 78 additions & 2 deletions src/bigquery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import {
import {GoogleErrorBody} from '@google-cloud/common/build/src/util';
import bigquery from './types';
import {logger, setLogFunction} from './logger';
import {StorageReadClient} from './storage';

// Third-Party Re-exports
export {common};
Expand Down Expand Up @@ -243,6 +244,11 @@ export interface BigQueryOptions extends GoogleAuthOptions {
* Defaults to `googleapis.com`.
*/
universeDomain?: string;

/**
* Storage Reader
*/
storageReadClient?: StorageReadClient;
}

export interface IntegerTypeCastOptions {
Expand Down Expand Up @@ -335,6 +341,7 @@ export class BigQuery extends Service {
location?: string;
private _universeDomain: string;
private _enableQueryPreview: boolean;
private _storageClient?: StorageReadClient;

createQueryStream(options?: Query | string): ResourceStream<RowMetadata> {
// placeholder body, overwritten in constructor
Expand Down Expand Up @@ -400,6 +407,10 @@ export class BigQuery extends Service {
}
}

if (options.storageReadClient) {
this._storageClient = options.storageReadClient;
}

this._universeDomain = universeDomain;
this.location = options.location;
/**
Expand Down Expand Up @@ -2150,7 +2161,7 @@ export class BigQuery extends Service {
const queryReq = this.buildQueryRequest_(query, options);
this.trace_('[query] queryReq', queryReq);
if (!queryReq) {
this.createQueryJob(query, (err, job, resp) => {
this.createQueryJob(query, async (err, job, resp) => {
if (err) {
(callback as SimpleQueryRowsCallback)(err, null, resp);
return;
Expand All @@ -2159,6 +2170,15 @@ export class BigQuery extends Service {
(callback as SimpleQueryRowsCallback)(null, [], resp);
return;
}
if (job && this._storageClient) {
try {
const rows = await this.acceleratedFetchDataFromJob_(job, options);
(callback as QueryRowsCallback)(null, rows, null);
return;
} catch (err) {
this.trace_('failed to fetch using Storage Reader', err);
}
}
// The Job is important for the `queryAsStream_` method, so a new query
// isn't created each time results are polled for.
options = extend({job}, queryOpts, options);
Expand All @@ -2167,7 +2187,7 @@ export class BigQuery extends Service {
return;
}

this.runJobsQuery(queryReq, (err, job, res) => {
this.runJobsQuery(queryReq, async (err, job, res) => {
this.trace_('[runJobsQuery callback]: ', query, err, job, res);
if (err) {
(callback as SimpleQueryRowsCallback)(err, null, res);
Expand All @@ -2176,6 +2196,17 @@ export class BigQuery extends Service {

options = extend({job}, queryOpts, options);
if (res && res.jobComplete) {
if (job && res.pageToken && this._storageClient) {
try {
const rows = await this.acceleratedFetchDataFromJob_(job, options, res.schema);
(callback as QueryRowsCallback)(null, rows, null);
return;
} catch (err) {
console.log('failed to fetch using storage reader', err);

Check failure on line 2205 in src/bigquery.ts

View workflow job for this annotation

GitHub Actions / lint

Replace `job,·options,·res.schema` with `⏎··············job,⏎··············options,⏎··············res.schema⏎············`
this.trace_('failed to fetch using Storage Reader', err);
}
}

let rows: any = [];
if (res.schema && res.rows) {
rows = BigQuery.mergeSchemaWithRows_(res.schema, res.rows, {
Expand All @@ -2200,6 +2231,51 @@ export class BigQuery extends Service {
});
}

/**
 * Fetch the results of a query job through the configured Storage Read
 * client instead of the regular paginated results path.
 *
 * The job metadata is loaded to find the destination table of the query,
 * that table is read with the storage client, and the raw rows are merged
 * with the table schema.
 *
 * @param job Query job whose results should be read.
 * @param opts Query options; only `wrapIntegers` and `parseJSON` are read.
 * @param schema Schema of the results when already known. When omitted it
 *     is loaded from the destination table metadata.
 * @returns The rows read from the destination table, merged with the schema.
 * @throws {Error} When no storage client is configured, the job is not a
 *     query job, or the destination table or its schema cannot be resolved.
 *     Callers in this file catch the failure and fall back to the regular
 *     results path.
 * @private
 */
private async acceleratedFetchDataFromJob_(
  job: Job,
  opts: QueryOptions,
  schema?: bigquery.ITableSchema
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
): Promise<any[]> {
  // Capture the client once so the narrowing survives the awaits below.
  const storageClient = this._storageClient;
  if (!storageClient) {
    throw new Error('storage client not available');
  }

  const [metadata] = (await job.getMetadata()) as bigquery.IJob[];
  const qconfig = metadata?.configuration?.query;
  this.trace_('[job metadata]', qconfig);
  if (!qconfig) {
    throw new Error('job is not a query type');
  }

  // Validate the table reference explicitly instead of asserting non-null,
  // so a missing field surfaces as a clear error rather than a TypeError.
  const dstTableRef = qconfig.destinationTable;
  if (!dstTableRef || !dstTableRef.datasetId || !dstTableRef.tableId) {
    throw new Error('query job has no destination table');
  }

  let tableSchema = schema;
  if (!tableSchema) {
    // Only hit the tables endpoint when the caller did not supply a schema.
    const table = this.dataset(dstTableRef.datasetId, {
      projectId: dstTableRef.projectId,
    }).table(dstTableRef.tableId);
    const [md] = (await table.getMetadata({
      view: 'BASIC',
    })) as bigquery.ITable[];
    tableSchema = md.schema;
  }
  if (!tableSchema) {
    throw new Error('destination table schema not available');
  }

  const tableReader = await storageClient.createTableReader({
    table: dstTableRef,
  });
  this.trace_('fetching with Storage API');
  const [rawRows] = await tableReader.getRows();
  this.trace_('finished fetching with Storage API');

  return BigQuery.mergeSchemaWithRows_(tableSchema, rawRows, {
    wrapIntegers: opts.wrapIntegers ?? false,
    parseJSON: opts.parseJSON ?? false,
  });
}

/**
* Check if the given Query can run using the `jobs.query` endpoint.
* Returns a bigquery.IQueryRequest that can be used to call `jobs.query`.
Expand Down
52 changes: 52 additions & 0 deletions src/storage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import {ResourceStream} from '@google-cloud/paginator';
import {Transform} from 'stream';
import {BigQuery, QueryRowsResponse, RowMetadata} from '.';
import bigquery from './types';

// Minimal interface for a BigQuery Storage Read API client

Check failure on line 20 in src/storage.ts

View workflow job for this annotation

GitHub Actions / lint

Delete `·`

Check failure on line 20 in src/storage.ts

View workflow job for this annotation

GitHub Actions / lint

Trailing spaces not allowed
// that can read data from tables.
/**
 * Contract for the object accepted as `BigQueryOptions.storageReadClient`.
 */
export interface StorageReadClient {
/**
 * Create a reader for a single table.
 *
 * @param req Request holding the reference of the table to read.
 * @returns Promise resolving to a `TableReader` for that table.
 */
createTableReader(req: {
table: bigquery.ITableReference;
}): Promise<TableReader>;
}

// Interface for fetching data from a BigQuery table using

Check failure on line 28 in src/storage.ts

View workflow job for this annotation

GitHub Actions / lint

Delete `·`

Check failure on line 28 in src/storage.ts

View workflow job for this annotation

GitHub Actions / lint

Trailing spaces not allowed
// the BigQuery Storage Read API.
/**
 * Reader returned by `StorageReadClient.createTableReader`.
 */
export interface TableReader {
/**
 * Read the table's rows in one call.
 *
 * @returns Promise resolving to a `QueryRowsResponse`; `BigQuery` uses its
 *     first element as the raw rows to merge with the table schema.
 */
getRows(): Promise<QueryRowsResponse>;
/**
 * Read the table's rows as a stream.
 *
 * @returns Promise resolving to a `ResourceStream` of `RowMetadata`.
 *     NOTE(review): presumably yields the same rows as `getRows` — confirm
 *     against an implementation.
 */
getRowsAsStream(): Promise<ResourceStream<RowMetadata>>;
}

/**
 * Object-mode `Transform` stream that merges each incoming raw row with a
 * table schema via `BigQuery.mergeSchemaWithRows_` and emits the result.
 *
 * Rows are converted one at a time with `wrapIntegers` and `parseJSON`
 * disabled. A row that cannot be converted is reported as a stream error.
 */
export class MergeSchemaTransform extends Transform {
  /**
   * @param schema Table schema applied to every row written to the stream.
   */
  constructor(schema: bigquery.ITableSchema) {
    super({
      objectMode: true,
      transform(row, _, callback) {
        let merged: unknown[];
        try {
          merged = BigQuery.mergeSchemaWithRows_(schema, [row], {
            wrapIntegers: false,
            parseJSON: false,
          });
        } catch (err: unknown) {
          // Report conversion failures through the stream instead of letting
          // a synchronous throw escape from the transform function.
          callback(err instanceof Error ? err : new Error(String(err)));
          return;
        }
        if (merged.length === 0) {
          callback(new Error('failed to convert row'), null);
          return;
        }
        callback(null, merged[0]);
      },
    });
  }
}

0 comments on commit 4598bc3

Please sign in to comment.