From 19bd9b810e05bc19420b749a87f24e5ce3b4ad0a Mon Sep 17 00:00:00 2001 From: Christophe Oudar Date: Mon, 7 Aug 2023 17:58:16 +0200 Subject: [PATCH] Add an option to use INFORMATION_SCHEMA for partition info retrieval --- .../unreleased/Features-20230807-235539.yaml | 6 +++ dbt/adapters/bigquery/connections.py | 10 ++-- dbt/adapters/bigquery/impl.py | 1 + dbt/include/bigquery/macros/etc.sql | 12 ++++- .../incremental_strategy/common.sql | 49 ++++++++++++++++++- 5 files changed, 70 insertions(+), 8 deletions(-) create mode 100644 .changes/unreleased/Features-20230807-235539.yaml diff --git a/.changes/unreleased/Features-20230807-235539.yaml b/.changes/unreleased/Features-20230807-235539.yaml new file mode 100644 index 000000000..0fbde028f --- /dev/null +++ b/.changes/unreleased/Features-20230807-235539.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Add an option to use INFORMATION_SCHEMA for partition info retrieval +time: 2023-08-07T23:55:39.31409+02:00 +custom: + Author: Kayrnt + Issue: "867" diff --git a/dbt/adapters/bigquery/connections.py b/dbt/adapters/bigquery/connections.py index b466fee3b..11548cda4 100644 --- a/dbt/adapters/bigquery/connections.py +++ b/dbt/adapters/bigquery/connections.py @@ -598,15 +598,17 @@ def dry_run(self, sql: str) -> BigQueryAdapterResponse: def _bq_job_link(location, project_id, job_id) -> str: return f"https://console.cloud.google.com/bigquery?project={project_id}&j=bq:{location}:{job_id}&page=queryresults" - def get_partitions_metadata(self, table): + def get_partitions_metadata(self, table, use_legacy_sql=False): def standard_to_legacy(table): return table.project + ":" + table.dataset + "." + table.identifier - legacy_sql = "SELECT * FROM [" + standard_to_legacy(table) + "$__PARTITIONS_SUMMARY__]" + if use_legacy_sql: + sql = "SELECT * FROM [" + standard_to_legacy(table) + "$__PARTITIONS_SUMMARY__]" + else: + sql = f"SELECT * FROM `{table.project}.{table.dataset}.INFORMATION_SCHEMA.PARTITIONS` WHERE TABLE_NAME = '{table.identifier}'" - sql = self._add_query_comment(legacy_sql) # auto_begin is ignored on bigquery, and only included for consistency - _, iterator = self.raw_execute(sql, use_legacy_sql=True) + _, iterator = self.raw_execute(self._add_query_comment(sql), use_legacy_sql=use_legacy_sql) return self.get_table_from_response(iterator) def copy_bq_table(self, source, destination, write_disposition): diff --git a/dbt/adapters/bigquery/impl.py b/dbt/adapters/bigquery/impl.py index f53cd4084..08648545e 100644 --- a/dbt/adapters/bigquery/impl.py +++ b/dbt/adapters/bigquery/impl.py @@ -78,6 +78,7 @@ class PartitionConfig(dbtClassMixin): range: Optional[Dict[str, Any]] = None time_ingestion_partitioning: bool = False copy_partitions: bool = False + partition_information: str = "model" PARTITION_DATE = "_PARTITIONDATE" PARTITION_TIME = "_PARTITIONTIME" diff --git a/dbt/include/bigquery/macros/etc.sql b/dbt/include/bigquery/macros/etc.sql index 59b61473e..92cd09169 100644 --- a/dbt/include/bigquery/macros/etc.sql +++ b/dbt/include/bigquery/macros/etc.sql @@ -6,9 +6,17 @@ {% do adapter.grant_access_to(entity, entity_type, role, grant_target_dict) %} {% endmacro %} -{%- macro get_partitions_metadata(table) -%} +{# + This macro returns the partition matadata for provided table. + The expected input is a table object (ie through a `source` or `ref`). + The output contains the result from partitions informations for your input table. + The details of the retrieved columns can be found on https://cloud.google.com/bigquery/docs/managing-partitioned-tables + if use_legacy_sql is set to True, the query will be executed using legacy sql and access the data from __PARTITIONS_SUMMARY__ meta-table + else it will leverage the INFORMATION_SCHEMA.PARTITIONS table. +#} +{%- macro get_partitions_metadata(table, use_legacy_sql = True) -%} {%- if execute -%} - {%- set res = adapter.get_partitions_metadata(table) -%} + {%- set res = adapter.get_partitions_metadata(table, use_legacy_sql) -%} {{- return(res) -}} {%- endif -%} {{- return(None) -}} diff --git a/dbt/include/bigquery/macros/materializations/incremental_strategy/common.sql b/dbt/include/bigquery/macros/materializations/incremental_strategy/common.sql index 9d71ba7c0..045edfc05 100644 --- a/dbt/include/bigquery/macros/materializations/incremental_strategy/common.sql +++ b/dbt/include/bigquery/macros/materializations/incremental_strategy/common.sql @@ -2,12 +2,57 @@ {#-- TODO: revisit partitioning with python models --#} {%- if '_dbt_max_partition' in compiled_code and language == 'sql' -%} + {%- if partition_by.partition_information == "information_schema" -%} + {{ dbt_max_partition_from_information_schema_data_sql(relation, partition_by) }} + {%- else -%} + {{ dbt_max_partition_from_table_data_sql(relation, partition_by) }} + {%- endif -%} - declare _dbt_max_partition {{ partition_by.data_type_for_partition() }} default ( + {%- endif -%} + +{% endmacro %} + +{% macro dbt_max_partition_from_table_data_sql(relation, partition_by) %} + + declare _dbt_max_partition {{ partition_by.data_type_for_partition() }} default ( select max({{ partition_by.field }}) from {{ this }} where {{ partition_by.field }} is not null - ); + ); + +{% endmacro %} +{% macro max_partition_wrapper(field) %} + {{ "MAX(" ~ field ~ ") AS max_partition" }} +{% endmacro %} + +{% macro dbt_max_partition_from_information_schema_data_sql(relation, partition_by) %} + declare _dbt_max_partition {{ partition_by.data_type_for_partition() }} default ( + {{ partition_from_information_schema_data_sql(relation, partition_by, max_partition_wrapper) }} + ); +{% endmacro %} + +{% macro partition_from_information_schema_data_sql(relation, partition_by, field_function) %} + + {%- if data_type is none -%} + {%- set data_type = partition_by.data_type -%} + {%- set granularity = partition_by.granularity -%} {%- endif -%} + {# Format partition_id to match the declared variable type #} + {%- if data_type | lower in ('date', 'timestamp', 'datetime') -%} + {%- if granularity == "day" -%} + {%- set format = "%Y%m%d" -%} + {%- else -%} + {%- set format = "%Y%m%d%H" -%} + {%- endif -%} + {%- set field = "parse_" ~data_type ~ "('" ~ format ~ "', partition_id)" -%} + {%- else -%} + {%- set field = "CAST(partition_id AS INT64)" -%} + {%- endif -%} + + SELECT {{ field_function(field) }} + FROM `{{relation.project}}.{{relation.dataset}}.INFORMATION_SCHEMA.PARTITIONS` + WHERE TABLE_NAME = '{{relation.identifier}}' + AND NOT(STARTS_WITH(partition_id, "__")) + {% endmacro %}