Commit 19bd9b8
Add an option to use INFORMATION_SCHEMA for partition info retrieval
Kayrnt committed Aug 8, 2023
1 parent b06d230 commit 19bd9b8
Showing 5 changed files with 70 additions and 8 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20230807-235539.yaml
@@ -0,0 +1,6 @@
kind: Features
body: Add an option to use INFORMATION_SCHEMA for partition info retrieval
time: 2023-08-07T23:55:39.31409+02:00
custom:
Author: Kayrnt
Issue: "867"
10 changes: 6 additions & 4 deletions dbt/adapters/bigquery/connections.py
@@ -598,15 +598,17 @@ def dry_run(self, sql: str) -> BigQueryAdapterResponse:
def _bq_job_link(location, project_id, job_id) -> str:
return f"https://console.cloud.google.com/bigquery?project={project_id}&j=bq:{location}:{job_id}&page=queryresults"

def get_partitions_metadata(self, table):
def get_partitions_metadata(self, table, use_legacy_sql=False):
def standard_to_legacy(table):
return table.project + ":" + table.dataset + "." + table.identifier

legacy_sql = "SELECT * FROM [" + standard_to_legacy(table) + "$__PARTITIONS_SUMMARY__]"
if use_legacy_sql:
sql = "SELECT * FROM [" + standard_to_legacy(table) + "$__PARTITIONS_SUMMARY__]"
else:
sql = f"SELECT * FROM `{table.project}.{table.dataset}.INFORMATION_SCHEMA.PARTITIONS` WHERE TABLE_NAME = '{table.identifier}'"

sql = self._add_query_comment(legacy_sql)
# auto_begin is ignored on bigquery, and only included for consistency
_, iterator = self.raw_execute(sql, use_legacy_sql=True)
_, iterator = self.raw_execute(self._add_query_comment(sql), use_legacy_sql=use_legacy_sql)
return self.get_table_from_response(iterator)

def copy_bq_table(self, source, destination, write_disposition):
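
For a hypothetical table my-project:analytics.events, a sketch of the two queries the updated method can issue (project, dataset, and table names are illustrative only):

-- use_legacy_sql=True: legacy SQL against the partitions meta-table
SELECT * FROM [my-project:analytics.events$__PARTITIONS_SUMMARY__]

-- use_legacy_sql=False (the adapter method's default): standard SQL against INFORMATION_SCHEMA
SELECT *
FROM `my-project.analytics.INFORMATION_SCHEMA.PARTITIONS`
WHERE TABLE_NAME = 'events'
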
1 change: 1 addition & 0 deletions dbt/adapters/bigquery/impl.py
@@ -78,6 +78,7 @@ class PartitionConfig(dbtClassMixin):
range: Optional[Dict[str, Any]] = None
time_ingestion_partitioning: bool = False
copy_partitions: bool = False
partition_information: str = "model"

PARTITION_DATE = "_PARTITIONDATE"
PARTITION_TIME = "_PARTITIONTIME"
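
Since partition_information is a field on PartitionConfig, it would presumably be set through a model's partition_by config; a minimal sketch with hypothetical model and column names:

{{
  config(
    materialized = 'incremental',
    incremental_strategy = 'insert_overwrite',
    partition_by = {
      'field': 'created_at',
      'data_type': 'date',
      'granularity': 'day',
      'partition_information': 'information_schema'
    }
  )
}}
select * from {{ ref('stg_events') }}
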
12 changes: 10 additions & 2 deletions dbt/include/bigquery/macros/etc.sql
@@ -6,9 +6,17 @@
{% do adapter.grant_access_to(entity, entity_type, role, grant_target_dict) %}
{% endmacro %}

{%- macro get_partitions_metadata(table) -%}
{#
This macro returns the partition metadata for the provided table.
The expected input is a table object (i.e. through a `source` or `ref`).
The output contains the partition information for the input table.
The details of the retrieved columns can be found at https://cloud.google.com/bigquery/docs/managing-partitioned-tables
If use_legacy_sql is set to True, the query is executed using legacy SQL and reads from the __PARTITIONS_SUMMARY__ meta-table;
otherwise it leverages the INFORMATION_SCHEMA.PARTITIONS view.
#}
{%- macro get_partitions_metadata(table, use_legacy_sql = True) -%}
{%- if execute -%}
{%- set res = adapter.get_partitions_metadata(table) -%}
{%- set res = adapter.get_partitions_metadata(table, use_legacy_sql) -%}
{{- return(res) -}}
{%- endif -%}
{{- return(None) -}}
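
A minimal usage sketch of the updated macro (the source and table names are hypothetical); passing use_legacy_sql=False opts into the INFORMATION_SCHEMA path, while omitting it keeps the legacy __PARTITIONS_SUMMARY__ behaviour:

{%- set partitions = get_partitions_metadata(source('analytics', 'events'), use_legacy_sql=False) -%}
{%- if partitions is not none -%}
  {%- for row in partitions.rows -%}
    {{ log(row, info=True) }}
  {%- endfor -%}
{%- endif -%}
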
@@ -2,12 +2,57 @@

{#-- TODO: revisit partitioning with python models --#}
{%- if '_dbt_max_partition' in compiled_code and language == 'sql' -%}
{%- if partition_by.partition_information == "information_schema" -%}
{{ dbt_max_partition_from_information_schema_data_sql(relation, partition_by) }}
{%- else -%}
{{ dbt_max_partition_from_table_data_sql(relation, partition_by) }}
{%- endif -%}

declare _dbt_max_partition {{ partition_by.data_type_for_partition() }} default (
{%- endif -%}

{% endmacro %}

{% macro dbt_max_partition_from_table_data_sql(relation, partition_by) %}

declare _dbt_max_partition {{ partition_by.data_type_for_partition() }} default (
select max({{ partition_by.field }}) from {{ this }}
where {{ partition_by.field }} is not null
);
);

{% endmacro %}

{% macro max_partition_wrapper(field) %}
{{ "MAX(" ~ field ~ ") AS max_partition" }}
{% endmacro %}

{% macro dbt_max_partition_from_information_schema_data_sql(relation, partition_by) %}
declare _dbt_max_partition {{ partition_by.data_type_for_partition() }} default (
{{ partition_from_information_schema_data_sql(relation, partition_by, max_partition_wrapper) }}
);
{% endmacro %}

{% macro partition_from_information_schema_data_sql(relation, partition_by, field_function) %}

{%- set data_type = partition_by.data_type -%}
{%- set granularity = partition_by.granularity -%}

{# Format partition_id to match the declared variable type #}
{%- if data_type | lower in ('date', 'timestamp', 'datetime') -%}
{%- if granularity == "day" -%}
{%- set format = "%Y%m%d" -%}
{%- else -%}
{%- set format = "%Y%m%d%H" -%}
{%- endif -%}
{%- set field = "parse_" ~data_type ~ "('" ~ format ~ "', partition_id)" -%}
{%- else -%}
{%- set field = "CAST(partition_id AS INT64)" -%}
{%- endif -%}

SELECT {{ field_function(field) }}
FROM `{{relation.project}}.{{relation.dataset}}.INFORMATION_SCHEMA.PARTITIONS`
WHERE TABLE_NAME = '{{relation.identifier}}'
AND NOT(STARTS_WITH(partition_id, "__"))

{% endmacro %}
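
For a daily DATE partition on a hypothetical events table, and assuming data_type_for_partition() resolves to date, dbt_max_partition_from_information_schema_data_sql would render roughly to:

declare _dbt_max_partition date default (
  SELECT MAX(parse_date('%Y%m%d', partition_id)) AS max_partition
  FROM `my-project.analytics.INFORMATION_SCHEMA.PARTITIONS`
  WHERE TABLE_NAME = 'events'
  AND NOT(STARTS_WITH(partition_id, "__"))
);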
