Add an option to use INFORMATION_SCHEMA for partition info retrieval
Kayrnt committed Aug 19, 2023
1 parent eb58c7d commit 9614b1c
Showing 9 changed files with 518 additions and 181 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20230807-235539.yaml
@@ -0,0 +1,6 @@
+kind: Features
+body: Add an option to use INFORMATION_SCHEMA for partition info retrieval
+time: 2023-08-07T23:55:39.31409+02:00
+custom:
+  Author: Kayrnt
+  Issue: "867"
9 changes: 3 additions & 6 deletions dbt/adapters/bigquery/connections.py
@@ -599,14 +599,11 @@ def _bq_job_link(location, project_id, job_id) -> str:
        return f"https://console.cloud.google.com/bigquery?project={project_id}&j=bq:{location}:{job_id}&page=queryresults"

    def get_partitions_metadata(self, table):
-        def standard_to_legacy(table):
-            return table.project + ":" + table.dataset + "." + table.identifier
+        query_sql = f"SELECT * FROM `{table.project}.{table.dataset}.INFORMATION_SCHEMA.PARTITIONS` WHERE TABLE_NAME = '{table.identifier}'"

-        legacy_sql = "SELECT * FROM [" + standard_to_legacy(table) + "$__PARTITIONS_SUMMARY__]"
-
-        sql = self._add_query_comment(legacy_sql)
+        sql = self._add_query_comment(query_sql)
        # auto_begin is ignored on bigquery, and only included for consistency
-        _, iterator = self.raw_execute(sql, use_legacy_sql=True)
+        _, iterator = self.raw_execute(sql)
        return self.get_table_from_response(iterator)

    def copy_bq_table(self, source, destination, write_disposition):
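For reference, the rewritten method now issues a standard-SQL query of this shape (the project, dataset, and table names below are illustrative placeholders):

    SELECT * FROM `my-project.my_dataset.INFORMATION_SCHEMA.PARTITIONS`
    WHERE TABLE_NAME = 'my_table'

Because INFORMATION_SCHEMA.PARTITIONS is queryable with standard SQL, the use_legacy_sql=True flag that the legacy $__PARTITIONS_SUMMARY__ meta-table required is dropped from the raw_execute call.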
1 change: 1 addition & 0 deletions dbt/adapters/bigquery/impl.py
@@ -78,6 +78,7 @@ class PartitionConfig(dbtClassMixin):
    range: Optional[Dict[str, Any]] = None
    time_ingestion_partitioning: bool = False
    copy_partitions: bool = False
+    partition_information: str = "model"

    PARTITION_DATE = "_PARTITIONDATE"
    PARTITION_TIME = "_PARTITIONTIME"
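A sketch of how the new option might be set from a model, assuming the usual partition_by config dict (the model, column, and strategy values are made up; the default remains "model"):

    {{ config(
        materialized = 'incremental',
        incremental_strategy = 'insert_overwrite',
        partition_by = {
            'field': 'created_at',
            'data_type': 'timestamp',
            'granularity': 'day',
            'partition_information': 'information_schema'
        }
    ) }}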
7 changes: 7 additions & 0 deletions dbt/include/bigquery/macros/etc.sql
@@ -6,6 +6,13 @@
{% do adapter.grant_access_to(entity, entity_type, role, grant_target_dict) %}
{% endmacro %}

+{#
+    This macro returns the partition metadata for the provided table.
+    The expected input is a table object (e.g. via `source` or `ref`).
+    The output contains the partition information for the input table.
+    The retrieved columns are documented at https://cloud.google.com/bigquery/docs/managing-partitioned-tables.
+    It leverages the INFORMATION_SCHEMA.PARTITIONS table.
+#}
{%- macro get_partitions_metadata(table) -%}
    {%- if execute -%}
        {%- set res = adapter.get_partitions_metadata(table) -%}
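A minimal usage sketch for this macro (the source name is hypothetical, and it assumes the adapter call returns an agate table, as get_partitions_metadata in connections.py does):

    {% set partitions = get_partitions_metadata(source('my_source', 'my_table')) %}
    {% if execute %}
        {% do log(partitions.columns['partition_id'].values(), info=true) %}
    {% endif %}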
@@ -2,12 +2,68 @@

{#-- TODO: revisit partitioning with python models --#}
{%- if '_dbt_max_partition' in compiled_code and language == 'sql' -%}
+    {%- if partition_by.partition_information == "information_schema" -%}
+      {{ dbt_max_partition_from_information_schema_data_sql(relation, partition_by) }}
+    {%- else -%}
+      {{ dbt_max_partition_from_model_data_sql(relation, partition_by) }}
+    {%- endif -%}

-  declare _dbt_max_partition {{ partition_by.data_type_for_partition() }} default (
-    select max({{ partition_by.field }}) from {{ this }}
-    where {{ partition_by.field }} is not null
-  );
{%- endif -%}

{% endmacro %}

+{% macro dbt_max_partition_from_model_data_sql(relation, partition_by) %}
+  declare _dbt_max_partition {{ partition_by.data_type_for_partition() }} default (
+    select max({{ partition_by.field }}) from {{ relation }}
+    where {{ partition_by.field }} is not null
+  );
+{% endmacro %}

+{% macro max_partition_wrapper(field) %}
+  MAX({{ field }}) AS max_partition
+{% endmacro %}
+
+{% macro array_distinct_partition_wrapper(field) %}
+  as struct
+    -- IGNORE NULLS: this needs to be aligned to _dbt_max_partition, which ignores nulls
+    array_agg(distinct {{ field }} IGNORE NULLS)
+{% endmacro %}
+
+{% macro dbt_max_partition_from_information_schema_data_sql(relation, partition_by) %}
+  declare _dbt_max_partition {{ partition_by.data_type_for_partition() }} default (
+    {{ partition_from_information_schema_data_sql(relation, partition_by, max_partition_wrapper) }}
+  );
+{% endmacro %}
+
+{% macro partition_from_model_data_sql(relation, partition_by, field_function) %}
+  select {{ field_function(partition_by.render_wrapped()) }}
+  from {{ relation }}
+{% endmacro %}
+
+{% macro partition_from_information_schema_data_sql(relation, partition_by, field_function) %}
+
+  {%- set data_type = partition_by.data_type -%}
+  {%- set granularity = partition_by.granularity -%}
+
+  {# Format partition_id to match the declared variable type #}
+  {%- if data_type | lower in ('date', 'timestamp', 'datetime') -%}
+    {# Datetime with time-ingestion partitioning requires timestamp #}
+    {%- if partition_by.time_ingestion_partitioning and partition_by.data_type == 'datetime' -%}
+      {%- set data_type = 'timestamp' -%}
+    {%- endif -%}
+    {%- if granularity == "day" -%}
+      {%- set format = "%Y%m%d" -%}
+    {%- else -%}
+      {%- set format = "%Y%m%d%H" -%}
+    {%- endif -%}
+    {%- set field = "parse_" ~ data_type ~ "('" ~ format ~ "', partition_id)" -%}
+  {%- else -%}
+    {%- set field = "CAST(partition_id AS INT64)" -%}
+  {%- endif -%}
+
+  SELECT {{ field_function(field) }}
+  FROM `{{relation.project}}.{{relation.dataset}}.INFORMATION_SCHEMA.PARTITIONS`
+  WHERE TABLE_NAME = '{{relation.identifier}}'
+  AND NOT(STARTS_WITH(partition_id, "__"))
+
+{% endmacro %}
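To make the parsing above concrete: INFORMATION_SCHEMA.PARTITIONS stores partition_id as a string (for example 20230819 for a daily partition, 2023081912 for an hourly one, or the raw integer for integer-range partitioning), and the __-prefixed IDs such as __NULL__ and __UNPARTITIONED__ are filtered out. For a day-partitioned date column, the macro combined with max_partition_wrapper would render roughly as follows (project, dataset, and table names are illustrative):

    SELECT MAX(parse_date('%Y%m%d', partition_id)) AS max_partition
    FROM `my-project.my_dataset.INFORMATION_SCHEMA.PARTITIONS`
    WHERE TABLE_NAME = 'my_table'
    AND NOT(STARTS_WITH(partition_id, "__"))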
@@ -107,8 +107,7 @@
    {%- endcall %}
  {%- endif -%}
  {%- set partitions_sql -%}
-    select distinct {{ partition_by.render_wrapped() }}
-    from {{ tmp_relation }}
+    {{ bq_dynamic_copy_partitions_affected_partitions_sql(tmp_relation, partition_by) }}
  {%- endset -%}
  {%- set partitions = run_query(partitions_sql).columns[0].values() -%}
  {# We copy the partitions #}
@@ -117,6 +116,19 @@
  drop table if exists {{ tmp_relation }}
{% endmacro %}

+{% macro distinct_partition_wrapper(field) %}
+  distinct {{ field }} AS partition_ids
+{% endmacro %}
+
+{% macro bq_dynamic_copy_partitions_affected_partitions_sql(tmp_relation, partition_by) %}
+  {% if partition_by.partition_information == "information_schema" %}
+    {{ partition_from_information_schema_data_sql(tmp_relation, partition_by, distinct_partition_wrapper) }}
+  {% else %}
+    select distinct {{ partition_by.render_wrapped() }}
+    from {{ tmp_relation }}
+  {% endif %}
+{% endmacro %}
+
{% macro bq_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions) %}
  {%- if copy_partitions is true %}
    {{ bq_dynamic_copy_partitions_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions) }}
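Under partition_information = "information_schema", the partitions_sql assembled above would render along these lines for a day-partitioned date column (the __dbt_tmp suffix and all names are illustrative):

    SELECT distinct parse_date('%Y%m%d', partition_id) AS partition_ids
    FROM `my-project.my_dataset.INFORMATION_SCHEMA.PARTITIONS`
    WHERE TABLE_NAME = 'my_table__dbt_tmp'
    AND NOT(STARTS_WITH(partition_id, "__"))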
@@ -149,10 +161,12 @@

-- 2. define partitions to update
set (dbt_partitions_for_replacement) = (
-      select as struct
-        -- IGNORE NULLS: this needs to be aligned to _dbt_max_partition, which ignores nulls
-        array_agg(distinct {{ partition_field }} IGNORE NULLS)
-      from {{ tmp_relation }}
+      {%- if partition_by.partition_information == "information_schema" -%}
+      {{ partition_from_information_schema_data_sql(tmp_relation, partition_by, array_distinct_partition_wrapper) }}
+      {%- else -%}
+      {# TODO: fix the datetime case to render_wrapped with timestamp #}
+      {{ partition_from_model_data_sql(tmp_relation, partition_by, array_distinct_partition_wrapper) }}
+      {%- endif -%}
);

-- 3. run the merge statement
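For the information_schema path, step 2 would expand to roughly the following (names illustrative, assuming a day-partitioned date column):

    set (dbt_partitions_for_replacement) = (
        SELECT as struct
            -- IGNORE NULLS: this needs to be aligned to _dbt_max_partition, which ignores nulls
            array_agg(distinct parse_date('%Y%m%d', partition_id) IGNORE NULLS)
        FROM `my-project.my_dataset.INFORMATION_SCHEMA.PARTITIONS`
        WHERE TABLE_NAME = 'my_table__dbt_tmp'
        AND NOT(STARTS_WITH(partition_id, "__"))
    );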