diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py
index 5e0675fd13..8008c1189a 100644
--- a/bigframes/core/__init__.py
+++ b/bigframes/core/__init__.py
@@ -1198,8 +1198,8 @@ def cached(self, cluster_cols: typing.Sequence[str]) -> ArrayValue:
         destination = self._session._ibis_to_session_table(
             ibis_expr, cluster_cols=cluster_cols, api_name="cache"
         )
-        table_expression = self._session.ibis_client.sql(
-            f"SELECT * FROM `_SESSION`.`{destination.table_id}`"
+        table_expression = self._session.ibis_client.table(
+            f"{destination.project}.{destination.dataset_id}.{destination.table_id}"
         )
         new_columns = [table_expression[column] for column in self.column_names]
         new_hidden_columns = [
diff --git a/bigframes/core/io.py b/bigframes/core/io.py
index 3c2e5a25f5..d47efbdddc 100644
--- a/bigframes/core/io.py
+++ b/bigframes/core/io.py
@@ -16,7 +16,8 @@
 
 import datetime
 import textwrap
-from typing import Dict, Union
+import types
+from typing import Dict, Iterable, Union
 
 import google.cloud.bigquery as bigquery
 
@@ -89,6 +90,48 @@ def create_snapshot_sql(
     )
 
 
+# BigQuery REST API returns types in Legacy SQL format
+# https://cloud.google.com/bigquery/docs/data-types but we use Standard SQL
+# names
+# https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
+BQ_STANDARD_TYPES = types.MappingProxyType(
+    {
+        "BOOLEAN": "BOOL",
+        "INTEGER": "INT64",
+        "FLOAT": "FLOAT64",
+    }
+)
+
+
+def bq_field_to_type_sql(field: bigquery.SchemaField):
+    if field.mode == "REPEATED":
+        nested_type = bq_field_to_type_sql(
+            bigquery.SchemaField(
+                field.name, field.field_type, mode="NULLABLE", fields=field.fields
+            )
+        )
+        return f"ARRAY<{nested_type}>"
+
+    if field.field_type == "RECORD":
+        nested_fields_sql = ", ".join(
+            bq_field_to_sql(child_field) for child_field in field.fields
+        )
+        return f"STRUCT<{nested_fields_sql}>"
+
+    type_ = field.field_type
+    return BQ_STANDARD_TYPES.get(type_, type_)
+
+
+def bq_field_to_sql(field: bigquery.SchemaField):
+    name = field.name
+    type_ = bq_field_to_type_sql(field)
+    return f"`{name}` {type_}"
+
+
+def bq_schema_to_sql(schema: Iterable[bigquery.SchemaField]):
+    return ", ".join(bq_field_to_sql(field) for field in schema)
+
+
 def format_option(key: str, value: Union[bool, str]) -> str:
     if isinstance(value, bool):
         return f"{key}=true" if value else f"{key}=false"
diff --git a/bigframes/session.py b/bigframes/session.py
index 7b827c7dcf..ac48c977cb 100644
--- a/bigframes/session.py
+++ b/bigframes/session.py
@@ -449,13 +449,6 @@ def _query_to_destination(
         index_cols: List[str],
         api_name: str,
     ) -> Tuple[Optional[bigquery.TableReference], Optional[bigquery.QueryJob]]:
-        # If there are no index columns, then there's no reason to cache to a
-        # (clustered) session table, as we'll just have to query it again to
-        # create a default index & ordering.
-        if not index_cols:
-            _, query_job = self._start_query(query)
-            return query_job.destination, query_job
-
         # If a dry_run indicates this is not a query type job, then don't
         # bother trying to do a CREATE TEMP TABLE ... AS SELECT ... statement.
         dry_run_config = bigquery.QueryJobConfig()
@@ -465,15 +458,24 @@
             _, query_job = self._start_query(query)
             return query_job.destination, query_job
 
-        # Make sure we cluster by the index column(s) so that subsequent
-        # operations are as speedy as they can be.
+        # Create a table to workaround BigQuery 10 GB query results limit. See:
+        # internal issue 303057336.
+        # Since we have a `statement_type == 'SELECT'`, schema should be populated.
+        schema = typing.cast(Iterable[bigquery.SchemaField], dry_run_job.schema)
+        temp_table = self._create_session_table_empty(api_name, schema, index_cols)
+
+        job_config = bigquery.QueryJobConfig()
+        job_config.destination = temp_table
+
         try:
-            ibis_expr = self.ibis_client.sql(query)
-            return self._ibis_to_session_table(ibis_expr, index_cols, api_name), None
+            # Write to temp table to workaround BigQuery 10 GB query results
+            # limit. See: internal issue 303057336.
+            _, query_job = self._start_query(query, job_config=job_config)
+            return query_job.destination, query_job
         except google.api_core.exceptions.BadRequest:
-            # Some SELECT statements still aren't compatible with CREATE TEMP
-            # TABLE ... AS SELECT ... statements. For example, if the query has
-            # a top-level ORDER BY, this conflicts with our ability to cluster
+            # Some SELECT statements still aren't compatible with cluster
+            # tables as the destination. For example, if the query has a
+            # top-level ORDER BY, this conflicts with our ability to cluster
             # the table by the index column(s).
             _, query_job = self._start_query(query)
             return query_job.destination, query_job
@@ -1231,6 +1233,54 @@ def _create_session_table(self) -> bigquery.TableReference:
         )
         return dataset.table(table_name)
 
+    def _create_session_table_empty(
+        self,
+        api_name: str,
+        schema: Iterable[bigquery.SchemaField],
+        cluster_cols: List[str],
+    ) -> bigquery.TableReference:
+        # Can't set a table in _SESSION as destination via query job API, so we
+        # run DDL, instead.
+        table = self._create_session_table()
+        schema_sql = bigframes_io.bq_schema_to_sql(schema)
+
+        clusterable_cols = [
+            col.name
+            for col in schema
+            if col.name in cluster_cols and _can_cluster_bq(col)
+        ][:_MAX_CLUSTER_COLUMNS]
+
+        if clusterable_cols:
+            cluster_cols_sql = ", ".join(
+                f"`{cluster_col}`" for cluster_col in clusterable_cols
+            )
+            cluster_sql = f"CLUSTER BY {cluster_cols_sql}"
+        else:
+            cluster_sql = ""
+
+        ddl_text = f"""
+        CREATE TEMP TABLE
+        `_SESSION`.`{table.table_id}`
+        ({schema_sql})
+        {cluster_sql}
+        """
+
+        job_config = bigquery.QueryJobConfig()
+
+        # Include a label so that Dataplex Lineage can identify temporary
+        # tables that BigQuery DataFrames creates. Googlers: See internal issue
+        # 296779699. We're labeling the job instead of the table because
+        # otherwise we get `BadRequest: 400 OPTIONS on temporary tables are not
+        # supported`.
+        job_config.labels = {"source": "bigquery-dataframes-temp"}
+        job_config.labels["bigframes-api"] = api_name
+
+        _, query_job = self._start_query(ddl_text, job_config=job_config)
+
+        # Use fully-qualified name instead of `_SESSION` name so that the
+        # created table can be used as the destination table.
+        return query_job.destination
+
     def _create_sequential_ordering(
         self,
         table: ibis_types.Table,
@@ -1249,7 +1299,9 @@ def _create_sequential_ordering(
             cluster_cols=list(index_cols) + [default_ordering_name],
             api_name=api_name,
         )
-        table = self.ibis_client.sql(f"SELECT * FROM `{table_ref.table_id}`")
+        table = self.ibis_client.table(
+            f"{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}"
+        )
         ordering_reference = core.OrderingColumnReference(default_ordering_name)
         ordering = core.ExpressionOrdering(
             ordering_value_columns=[ordering_reference],
@@ -1264,55 +1316,13 @@ def _ibis_to_session_table(
         cluster_cols: Iterable[str],
         api_name: str,
     ) -> bigquery.TableReference:
-        clusterable_cols = [
-            col for col in cluster_cols if _can_cluster(table[col].type())
-        ][:_MAX_CLUSTER_COLUMNS]
-        return self._query_to_session_table(
+        destination, _ = self._query_to_destination(
             self.ibis_client.compile(table),
-            cluster_cols=clusterable_cols,
+            index_cols=list(cluster_cols),
             api_name=api_name,
         )
-
-    def _query_to_session_table(
-        self,
-        query_text: str,
-        cluster_cols: Iterable[str],
-        api_name: str,
-    ) -> bigquery.TableReference:
-        if len(list(cluster_cols)) > _MAX_CLUSTER_COLUMNS:
-            raise ValueError(
-                f"Too many cluster columns: {list(cluster_cols)}, max {_MAX_CLUSTER_COLUMNS} allowed."
-            )
-        # Can't set a table in _SESSION as destination via query job API, so we
-        # run DDL, instead.
-        table = self._create_session_table()
-        cluster_cols_sql = ", ".join(f"`{cluster_col}`" for cluster_col in cluster_cols)
-
-        # TODO(swast): This might not support multi-statement SQL queries (scripts).
-        ddl_text = f"""
-        CREATE TEMP TABLE `_SESSION`.`{table.table_id}`
-        CLUSTER BY {cluster_cols_sql}
-        AS {query_text}
-        """
-
-        job_config = bigquery.QueryJobConfig()
-
-        # Include a label so that Dataplex Lineage can identify temporary
-        # tables that BigQuery DataFrames creates. Googlers: See internal issue
-        # 296779699. We're labeling the job instead of the table because
-        # otherwise we get `BadRequest: 400 OPTIONS on temporary tables are not
-        # supported`.
-        job_config.labels = {"source": "bigquery-dataframes-temp"}
-        job_config.labels["bigframes-api"] = api_name
-
-        try:
-            self._start_query(
-                ddl_text, job_config=job_config
-            )  # Wait for the job to complete
-        except google.api_core.exceptions.Conflict:
-            # Allow query retry to succeed.
-            pass
-        return table
+        # There should always be a destination table for this query type.
+        return typing.cast(bigquery.TableReference, destination)
 
     def remote_function(
         self,
@@ -1494,14 +1504,21 @@ def connect(context: Optional[bigquery_options.BigQueryOptions] = None) -> Sessi
     return Session(context)
 
 
-def _can_cluster(ibis_type: ibis_dtypes.DataType):
+def _can_cluster_bq(field: bigquery.SchemaField):
     # https://cloud.google.com/bigquery/docs/clustered-tables
     # Notably, float is excluded
-    return (
-        ibis_type.is_integer()
-        or ibis_type.is_string()
-        or ibis_type.is_decimal()
-        or ibis_type.is_date()
-        or ibis_type.is_timestamp()
-        or ibis_type.is_boolean()
+    type_ = field.field_type
+    return type_ in (
+        "INTEGER",
+        "INT64",
+        "STRING",
+        "NUMERIC",
+        "DECIMAL",
+        "BIGNUMERIC",
+        "BIGDECIMAL",
+        "DATE",
+        "DATETIME",
+        "TIMESTAMP",
+        "BOOL",
+        "BOOLEAN",
     )
diff --git a/tests/system/small/ml/test_core.py b/tests/system/small/ml/test_core.py
index ace943956f..f911dd7eeb 100644
--- a/tests/system/small/ml/test_core.py
+++ b/tests/system/small/ml/test_core.py
@@ -23,6 +23,7 @@
 
 import bigframes
 from bigframes.ml import core
+import tests.system.utils
 
 
 def test_model_eval(
@@ -224,7 +225,7 @@ def test_pca_model_principal_component_info(penguins_bqml_pca_model: core.BqmlMo
             "cumulative_explained_variance_ratio": [0.469357, 0.651283, 0.812383],
         },
     )
-    pd.testing.assert_frame_equal(
+    tests.system.utils.assert_pandas_df_equal_ignore_ordering(
         result,
         expected,
         check_exact=False,
diff --git a/tests/system/small/ml/test_decomposition.py b/tests/system/small/ml/test_decomposition.py
index c71bbbe3b0..e31681f4a0 100644
--- a/tests/system/small/ml/test_decomposition.py
+++ b/tests/system/small/ml/test_decomposition.py
@@ -15,6 +15,7 @@
 import pandas as pd
 
 from bigframes.ml import decomposition
+import tests.system.utils
 
 
 def test_pca_predict(penguins_pca_model, new_penguins_df):
@@ -129,7 +130,7 @@ def test_pca_explained_variance_(penguins_pca_model: decomposition.PCA):
             "explained_variance": [3.278657, 1.270829, 1.125354],
         },
     )
-    pd.testing.assert_frame_equal(
+    tests.system.utils.assert_pandas_df_equal_ignore_ordering(
         result,
         expected,
         check_exact=False,
@@ -148,7 +149,7 @@ def test_pca_explained_variance_ratio_(penguins_pca_model: decomposition.PCA):
             "explained_variance_ratio": [0.469357, 0.181926, 0.1611],
         },
    )
-    pd.testing.assert_frame_equal(
+    tests.system.utils.assert_pandas_df_equal_ignore_ordering(
         result,
         expected,
         check_exact=False,
diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py
index d2560174e4..53ddfa3c49 100644
--- a/tests/system/small/test_session.py
+++ b/tests/system/small/test_session.py
@@ -57,6 +57,7 @@ def test_read_gbq_tokyo(
         ),
         pytest.param(
             """SELECT
+                t.int64_col + 1 as my_ints,
                 t.float64_col * 2 AS my_floats,
                 CONCAT(t.string_col, "_2") AS my_strings,
                 t.int64_col > 0 AS my_bools,
diff --git a/tests/unit/core/test_io.py b/tests/unit/core/test_io.py
index c5074f80c2..afb38a5f75 100644
--- a/tests/unit/core/test_io.py
+++ b/tests/unit/core/test_io.py
@@ -13,8 +13,10 @@
 # limitations under the License.
 
 import datetime
+from typing import Iterable
 
 import google.cloud.bigquery as bigquery
+import pytest
 
 import bigframes.core.io
 
@@ -47,3 +49,56 @@ def test_create_snapshot_sql_doesnt_timetravel_session_datasets():
 
     # Don't need the project ID for _SESSION tables.
assert "my-test-project" not in sql + + +@pytest.mark.parametrize( + ("schema", "expected"), + ( + ( + [bigquery.SchemaField("My Column", "INTEGER")], + "`My Column` INT64", + ), + ( + [ + bigquery.SchemaField("My Column", "INTEGER"), + bigquery.SchemaField("Float Column", "FLOAT"), + bigquery.SchemaField("Bool Column", "BOOLEAN"), + ], + "`My Column` INT64, `Float Column` FLOAT64, `Bool Column` BOOL", + ), + ( + [ + bigquery.SchemaField("My Column", "INTEGER", mode="REPEATED"), + bigquery.SchemaField("Float Column", "FLOAT", mode="REPEATED"), + bigquery.SchemaField("Bool Column", "BOOLEAN", mode="REPEATED"), + ], + "`My Column` ARRAY, `Float Column` ARRAY, `Bool Column` ARRAY", + ), + ( + [ + bigquery.SchemaField( + "My Column", + "RECORD", + mode="REPEATED", + fields=( + bigquery.SchemaField("Float Column", "FLOAT", mode="REPEATED"), + bigquery.SchemaField("Bool Column", "BOOLEAN", mode="REPEATED"), + bigquery.SchemaField( + "Nested Column", + "RECORD", + fields=(bigquery.SchemaField("Int Column", "INTEGER"),), + ), + ), + ), + ], + ( + "`My Column` ARRAY," + + " `Bool Column` ARRAY," + + " `Nested Column` STRUCT<`Int Column` INT64>>>" + ), + ), + ), +) +def test_bq_schema_to_sql(schema: Iterable[bigquery.SchemaField], expected: str): + pass