fix: avoid 403 response too large to return error with read_gbq and large query results (googleapis#77)

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/python-bigquery-dataframes/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [ ] Ensure the tests and linter pass
- [ ] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)

Towards internal issue 303057336 🦕
tswast authored Oct 3, 2023
1 parent 158c00c commit 8f3b5b2
Showing 7 changed files with 193 additions and 75 deletions.
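As background for the diffs below: the failure this commit targets shows up when `read_gbq` runs a query whose results exceed BigQuery's 10 GB limit on query responses. A minimal reproduction sketch, assuming a sufficiently large source table (project, dataset, and table names are hypothetical) and the pre-fix behavior:

```python
# Sketch of the pre-fix failure mode; names are hypothetical. Before this
# change, large results read through the anonymous query results table could
# fail with a "403 response too large to return" error instead of being
# staged in a session-owned destination table.
import bigframes.pandas as bpd

df = bpd.read_gbq("SELECT * FROM `my-project.my_dataset.very_large_table`")
print(df.shape)
```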
4 changes: 2 additions & 2 deletions bigframes/core/__init__.py
@@ -1198,8 +1198,8 @@ def cached(self, cluster_cols: typing.Sequence[str]) -> ArrayValue:
destination = self._session._ibis_to_session_table(
ibis_expr, cluster_cols=cluster_cols, api_name="cache"
)
- table_expression = self._session.ibis_client.sql(
- f"SELECT * FROM `_SESSION`.`{destination.table_id}`"
+ table_expression = self._session.ibis_client.table(
+ f"{destination.project}.{destination.dataset_id}.{destination.table_id}"
)
new_columns = [table_expression[column] for column in self.column_names]
new_hidden_columns = [
45 changes: 44 additions & 1 deletion bigframes/core/io.py
@@ -16,7 +16,8 @@

import datetime
import textwrap
- from typing import Dict, Union
+ import types
+ from typing import Dict, Iterable, Union

import google.cloud.bigquery as bigquery

@@ -89,6 +90,48 @@ def create_snapshot_sql(
)


# BigQuery REST API returns types in Legacy SQL format
# https://cloud.google.com/bigquery/docs/data-types but we use Standard SQL
# names
# https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
BQ_STANDARD_TYPES = types.MappingProxyType(
{
"BOOLEAN": "BOOL",
"INTEGER": "INT64",
"FLOAT": "FLOAT64",
}
)


def bq_field_to_type_sql(field: bigquery.SchemaField):
if field.mode == "REPEATED":
nested_type = bq_field_to_type_sql(
bigquery.SchemaField(
field.name, field.field_type, mode="NULLABLE", fields=field.fields
)
)
return f"ARRAY<{nested_type}>"

if field.field_type == "RECORD":
nested_fields_sql = ", ".join(
bq_field_to_sql(child_field) for child_field in field.fields
)
return f"STRUCT<{nested_fields_sql}>"

type_ = field.field_type
return BQ_STANDARD_TYPES.get(type_, type_)


def bq_field_to_sql(field: bigquery.SchemaField):
name = field.name
type_ = bq_field_to_type_sql(field)
return f"`{name}` {type_}"


def bq_schema_to_sql(schema: Iterable[bigquery.SchemaField]):
return ", ".join(bq_field_to_sql(field) for field in schema)


def format_option(key: str, value: Union[bool, str]) -> str:
if isinstance(value, bool):
return f"{key}=true" if value else f"{key}=false"
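The helpers added above convert a BigQuery table schema, which the REST API reports with Legacy SQL type names, into a Standard SQL column list that can be embedded in DDL. A small usage sketch with illustrative field names:

```python
import google.cloud.bigquery as bigquery

import bigframes.core.io as bigframes_io  # alias used in bigframes/session.py

# Illustrative schema; the REST API reports Legacy SQL type names such as
# INTEGER and FLOAT.
schema = [
    bigquery.SchemaField("id", "INTEGER"),
    bigquery.SchemaField("score", "FLOAT"),
    bigquery.SchemaField("tags", "STRING", mode="REPEATED"),
]

# Legacy names are mapped to Standard SQL (INTEGER -> INT64, FLOAT -> FLOAT64),
# and REPEATED fields become ARRAY<...>.
print(bigframes_io.bq_schema_to_sql(schema))
# `id` INT64, `score` FLOAT64, `tags` ARRAY<STRING>
```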
155 changes: 86 additions & 69 deletions bigframes/session.py
@@ -449,13 +449,6 @@ def _query_to_destination(
index_cols: List[str],
api_name: str,
) -> Tuple[Optional[bigquery.TableReference], Optional[bigquery.QueryJob]]:
- # If there are no index columns, then there's no reason to cache to a
- # (clustered) session table, as we'll just have to query it again to
- # create a default index & ordering.
- if not index_cols:
- _, query_job = self._start_query(query)
- return query_job.destination, query_job
-
# If a dry_run indicates this is not a query type job, then don't
# bother trying to do a CREATE TEMP TABLE ... AS SELECT ... statement.
dry_run_config = bigquery.QueryJobConfig()
@@ -465,15 +458,24 @@
_, query_job = self._start_query(query)
return query_job.destination, query_job

- # Make sure we cluster by the index column(s) so that subsequent
- # operations are as speedy as they can be.
+ # Create a table to workaround BigQuery 10 GB query results limit. See:
+ # internal issue 303057336.
+ # Since we have a `statement_type == 'SELECT'`, schema should be populated.
+ schema = typing.cast(Iterable[bigquery.SchemaField], dry_run_job.schema)
+ temp_table = self._create_session_table_empty(api_name, schema, index_cols)
+
+ job_config = bigquery.QueryJobConfig()
+ job_config.destination = temp_table
+
try:
- ibis_expr = self.ibis_client.sql(query)
- return self._ibis_to_session_table(ibis_expr, index_cols, api_name), None
+ # Write to temp table to workaround BigQuery 10 GB query results
+ # limit. See: internal issue 303057336.
+ _, query_job = self._start_query(query, job_config=job_config)
+ return query_job.destination, query_job
except google.api_core.exceptions.BadRequest:
- # Some SELECT statements still aren't compatible with CREATE TEMP
- # TABLE ... AS SELECT ... statements. For example, if the query has
- # a top-level ORDER BY, this conflicts with our ability to cluster
+ # Some SELECT statements still aren't compatible with cluster
+ # tables as the destination. For example, if the query has a
+ # top-level ORDER BY, this conflicts with our ability to cluster
# the table by the index column(s).
_, query_job = self._start_query(query)
return query_job.destination, query_job
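The rewritten logic above follows a general BigQuery pattern: when results may exceed the 10 GB cap on query responses, write them to an explicit destination table, and fall back to the anonymous results table if the destination write is rejected. A standalone sketch of that pattern with the google-cloud-bigquery client (project, dataset, and table names are hypothetical):

```python
import google.api_core.exceptions
import google.cloud.bigquery as bigquery

client = bigquery.Client()
sql = "SELECT * FROM `my-project.my_dataset.very_large_table`"  # hypothetical

# Stage the results in an explicit destination table so they are not subject
# to the 10 GB cap on query responses.
destination = bigquery.TableReference.from_string("my-project.my_dataset.staging_results")
job_config = bigquery.QueryJobConfig(destination=destination)

try:
    query_job = client.query(sql, job_config=job_config)
    query_job.result()  # Wait for the rows to land in the destination table.
except google.api_core.exceptions.BadRequest:
    # Some SELECT statements (for example, a top-level ORDER BY combined with
    # a clustered destination) reject an explicit destination; fall back to
    # the anonymous query results table.
    query_job = client.query(sql)
    query_job.result()

print(query_job.destination)
```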
@@ -1231,6 +1233,54 @@ def _create_session_table(self) -> bigquery.TableReference:
)
return dataset.table(table_name)

def _create_session_table_empty(
self,
api_name: str,
schema: Iterable[bigquery.SchemaField],
cluster_cols: List[str],
) -> bigquery.TableReference:
# Can't set a table in _SESSION as destination via query job API, so we
# run DDL, instead.
table = self._create_session_table()
schema_sql = bigframes_io.bq_schema_to_sql(schema)

clusterable_cols = [
col.name
for col in schema
if col.name in cluster_cols and _can_cluster_bq(col)
][:_MAX_CLUSTER_COLUMNS]

if clusterable_cols:
cluster_cols_sql = ", ".join(
f"`{cluster_col}`" for cluster_col in clusterable_cols
)
cluster_sql = f"CLUSTER BY {cluster_cols_sql}"
else:
cluster_sql = ""

ddl_text = f"""
CREATE TEMP TABLE
`_SESSION`.`{table.table_id}`
({schema_sql})
{cluster_sql}
"""

job_config = bigquery.QueryJobConfig()

# Include a label so that Dataplex Lineage can identify temporary
# tables that BigQuery DataFrames creates. Googlers: See internal issue
# 296779699. We're labeling the job instead of the table because
# otherwise we get `BadRequest: 400 OPTIONS on temporary tables are not
# supported`.
job_config.labels = {"source": "bigquery-dataframes-temp"}
job_config.labels["bigframes-api"] = api_name

_, query_job = self._start_query(ddl_text, job_config=job_config)

# Use fully-qualified name instead of `_SESSION` name so that the
# created table can be used as the destination table.
return query_job.destination

def _create_sequential_ordering(
self,
table: ibis_types.Table,
@@ -1249,7 +1299,9 @@ def _create_sequential_ordering(
cluster_cols=list(index_cols) + [default_ordering_name],
api_name=api_name,
)
- table = self.ibis_client.sql(f"SELECT * FROM `{table_ref.table_id}`")
+ table = self.ibis_client.table(
+ f"{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}"
+ )
ordering_reference = core.OrderingColumnReference(default_ordering_name)
ordering = core.ExpressionOrdering(
ordering_value_columns=[ordering_reference],
@@ -1264,55 +1316,13 @@ def _ibis_to_session_table(
cluster_cols: Iterable[str],
api_name: str,
) -> bigquery.TableReference:
- clusterable_cols = [
- col for col in cluster_cols if _can_cluster(table[col].type())
- ][:_MAX_CLUSTER_COLUMNS]
- return self._query_to_session_table(
+ desination, _ = self._query_to_destination(
self.ibis_client.compile(table),
- cluster_cols=clusterable_cols,
+ index_cols=list(cluster_cols),
api_name=api_name,
)
-
- def _query_to_session_table(
- self,
- query_text: str,
- cluster_cols: Iterable[str],
- api_name: str,
- ) -> bigquery.TableReference:
- if len(list(cluster_cols)) > _MAX_CLUSTER_COLUMNS:
- raise ValueError(
- f"Too many cluster columns: {list(cluster_cols)}, max {_MAX_CLUSTER_COLUMNS} allowed."
- )
- # Can't set a table in _SESSION as destination via query job API, so we
- # run DDL, instead.
- table = self._create_session_table()
- cluster_cols_sql = ", ".join(f"`{cluster_col}`" for cluster_col in cluster_cols)
-
- # TODO(swast): This might not support multi-statement SQL queries (scripts).
- ddl_text = f"""
- CREATE TEMP TABLE `_SESSION`.`{table.table_id}`
- CLUSTER BY {cluster_cols_sql}
- AS {query_text}
- """
-
- job_config = bigquery.QueryJobConfig()
-
- # Include a label so that Dataplex Lineage can identify temporary
- # tables that BigQuery DataFrames creates. Googlers: See internal issue
- # 296779699. We're labeling the job instead of the table because
- # otherwise we get `BadRequest: 400 OPTIONS on temporary tables are not
- # supported`.
- job_config.labels = {"source": "bigquery-dataframes-temp"}
- job_config.labels["bigframes-api"] = api_name
-
- try:
- self._start_query(
- ddl_text, job_config=job_config
- ) # Wait for the job to complete
- except google.api_core.exceptions.Conflict:
- # Allow query retry to succeed.
- pass
- return table
+ # There should always be a destination table for this query type.
+ return typing.cast(bigquery.TableReference, desination)

def remote_function(
self,
@@ -1494,14 +1504,21 @@ def connect(context: Optional[bigquery_options.BigQueryOptions] = None) -> Sessi
return Session(context)


- def _can_cluster(ibis_type: ibis_dtypes.DataType):
+ def _can_cluster_bq(field: bigquery.SchemaField):
# https://cloud.google.com/bigquery/docs/clustered-tables
# Notably, float is excluded
- return (
- ibis_type.is_integer()
- or ibis_type.is_string()
- or ibis_type.is_decimal()
- or ibis_type.is_date()
- or ibis_type.is_timestamp()
- or ibis_type.is_boolean()
+ type_ = field.field_type
+ return type_ in (
+ "INTEGER",
+ "INT64",
+ "STRING",
+ "NUMERIC",
+ "DECIMAL",
+ "BIGNUMERIC",
+ "BIGDECIMAL",
+ "DATE",
+ "DATETIME",
+ "TIMESTAMP",
+ "BOOL",
+ "BOOLEAN",
)
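Putting the pieces together: `_create_session_table_empty` renders the schema with `bq_schema_to_sql`, filters cluster columns through `_can_cluster_bq`, and issues `CREATE TEMP TABLE` DDL. A sketch of the statement that would be generated for an illustrative schema (the session table name is a stand-in for the randomly generated one):

```python
import google.cloud.bigquery as bigquery

import bigframes.core.io as bigframes_io

# Illustrative schema and index columns.
schema = [
    bigquery.SchemaField("rowindex", "INTEGER"),
    bigquery.SchemaField("float64_col", "FLOAT"),
    bigquery.SchemaField("string_col", "STRING"),
]
# FLOAT64 columns are excluded by _can_cluster_bq, so only these remain.
cluster_cols = ["rowindex", "string_col"]

schema_sql = bigframes_io.bq_schema_to_sql(schema)
cluster_sql = "CLUSTER BY " + ", ".join(f"`{col}`" for col in cluster_cols)

# `bigframes_temp_123` is a placeholder for the name _create_session_table() picks.
ddl_text = f"""
CREATE TEMP TABLE
`_SESSION`.`bigframes_temp_123`
({schema_sql})
{cluster_sql}
"""
print(ddl_text)
# CREATE TEMP TABLE
# `_SESSION`.`bigframes_temp_123`
# (`rowindex` INT64, `float64_col` FLOAT64, `string_col` STRING)
# CLUSTER BY `rowindex`, `string_col`
```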
3 changes: 2 additions & 1 deletion tests/system/small/ml/test_core.py
@@ -23,6 +23,7 @@

import bigframes
from bigframes.ml import core
import tests.system.utils


def test_model_eval(
@@ -224,7 +225,7 @@ def test_pca_model_principal_component_info(penguins_bqml_pca_model: core.BqmlMo
"cumulative_explained_variance_ratio": [0.469357, 0.651283, 0.812383],
},
)
- pd.testing.assert_frame_equal(
+ tests.system.utils.assert_pandas_df_equal_ignore_ordering(
result,
expected,
check_exact=False,
5 changes: 3 additions & 2 deletions tests/system/small/ml/test_decomposition.py
@@ -15,6 +15,7 @@
import pandas as pd

from bigframes.ml import decomposition
import tests.system.utils


def test_pca_predict(penguins_pca_model, new_penguins_df):
@@ -129,7 +130,7 @@ def test_pca_explained_variance_(penguins_pca_model: decomposition.PCA):
"explained_variance": [3.278657, 1.270829, 1.125354],
},
)
- pd.testing.assert_frame_equal(
+ tests.system.utils.assert_pandas_df_equal_ignore_ordering(
result,
expected,
check_exact=False,
@@ -148,7 +149,7 @@ def test_pca_explained_variance_ratio_(penguins_pca_model: decomposition.PCA):
"explained_variance_ratio": [0.469357, 0.181926, 0.1611],
},
)
- pd.testing.assert_frame_equal(
+ tests.system.utils.assert_pandas_df_equal_ignore_ordering(
result,
expected,
check_exact=False,
1 change: 1 addition & 0 deletions tests/system/small/test_session.py
@@ -57,6 +57,7 @@ def test_read_gbq_tokyo(
),
pytest.param(
"""SELECT
t.int64_col + 1 as my_ints,
t.float64_col * 2 AS my_floats,
CONCAT(t.string_col, "_2") AS my_strings,
t.int64_col > 0 AS my_bools,
55 changes: 55 additions & 0 deletions tests/unit/core/test_io.py
@@ -13,8 +13,10 @@
# limitations under the License.

import datetime
from typing import Iterable

import google.cloud.bigquery as bigquery
import pytest

import bigframes.core.io

@@ -47,3 +49,56 @@ def test_create_snapshot_sql_doesnt_timetravel_session_datasets():

# Don't need the project ID for _SESSION tables.
assert "my-test-project" not in sql


@pytest.mark.parametrize(
("schema", "expected"),
(
(
[bigquery.SchemaField("My Column", "INTEGER")],
"`My Column` INT64",
),
(
[
bigquery.SchemaField("My Column", "INTEGER"),
bigquery.SchemaField("Float Column", "FLOAT"),
bigquery.SchemaField("Bool Column", "BOOLEAN"),
],
"`My Column` INT64, `Float Column` FLOAT64, `Bool Column` BOOL",
),
(
[
bigquery.SchemaField("My Column", "INTEGER", mode="REPEATED"),
bigquery.SchemaField("Float Column", "FLOAT", mode="REPEATED"),
bigquery.SchemaField("Bool Column", "BOOLEAN", mode="REPEATED"),
],
"`My Column` ARRAY<INT64>, `Float Column` ARRAY<FLOAT64>, `Bool Column` ARRAY<BOOL>",
),
(
[
bigquery.SchemaField(
"My Column",
"RECORD",
mode="REPEATED",
fields=(
bigquery.SchemaField("Float Column", "FLOAT", mode="REPEATED"),
bigquery.SchemaField("Bool Column", "BOOLEAN", mode="REPEATED"),
bigquery.SchemaField(
"Nested Column",
"RECORD",
fields=(bigquery.SchemaField("Int Column", "INTEGER"),),
),
),
),
],
(
"`My Column` ARRAY<STRUCT<"
+ "`Float Column` ARRAY<FLOAT64>,"
+ " `Bool Column` ARRAY<BOOL>,"
+ " `Nested Column` STRUCT<`Int Column` INT64>>>"
),
),
),
)
def test_bq_schema_to_sql(schema: Iterable[bigquery.SchemaField], expected: str):
sql = bigframes.core.io.bq_schema_to_sql(schema)
assert sql == expected
