Commit 7c21644

Support all types of data_type using time ingestion partitioning (#496)
* Support all types of data_type using time ingestion partitioning

* rework bq_create_table_as & fix partitions

* touchups after verifying no bug

* change case of test field because the parse routine now sanitizes the config val

---------

Co-authored-by: Mila Page <[email protected]>
3 people authored Apr 26, 2023
1 parent 1972d72 commit 7c21644
Showing 11 changed files with 179 additions and 94 deletions.
8 changes: 8 additions & 0 deletions .changes/unreleased/Fixes-20230202-010912.yaml
@@ -0,0 +1,8 @@
+kind: Fixes
+body: Support all types of data_type using time ingestion partitioning as previously
+  `date` was failing
+time: 2023-02-02T01:09:12.013631+01:00
+custom:
+  Author: Kayrnt
+  Issue: "486"
+  PR: "496"
74 changes: 55 additions & 19 deletions dbt/adapters/bigquery/impl.py
@@ -77,22 +77,46 @@ class PartitionConfig(dbtClassMixin):
     time_ingestion_partitioning: bool = False
     copy_partitions: bool = False
 
+    PARTITION_DATE = "_PARTITIONDATE"
+    PARTITION_TIME = "_PARTITIONTIME"
+
     def data_type_for_partition(self):
-        """Return the data type of partitions for replacement."""
-        return self.data_type if not self.time_ingestion_partitioning else "timestamp"
+        """Return the data type of partitions for replacement.
+        When time_ingestion_partitioning is enabled, the supported data types are date & timestamp.
+        """
+        if not self.time_ingestion_partitioning:
+            return self.data_type
+
+        return "date" if self.data_type == "date" else "timestamp"
 
     def reject_partition_field_column(self, columns: List[Any]) -> List[str]:
         return [c for c in columns if not c.name.upper() == self.field.upper()]
 
     def data_type_should_be_truncated(self):
         """Return true if the data type should be truncated instead of cast to the data type."""
         return not (
-            self.data_type.lower() == "int64"
-            or (self.data_type.lower() == "date" and self.granularity.lower() == "day")
+            self.data_type == "int64" or (self.data_type == "date" and self.granularity == "day")
         )
 
+    def time_partitioning_field(self) -> str:
+        """Return the time partitioning field name based on the data type.
+        The default is _PARTITIONTIME, but for date it is _PARTITIONDATE;
+        otherwise statements fail with a type mismatch."""
+        if self.data_type == "date":
+            return self.PARTITION_DATE
+        else:
+            return self.PARTITION_TIME
+
+    def insertable_time_partitioning_field(self) -> str:
+        """Return the insertable time partitioning field name based on the data type.
+        Practically, only _PARTITIONTIME works so far.
+        The function is meant to keep the call sites consistent as it might evolve."""
+        return self.PARTITION_TIME
+
     def render(self, alias: Optional[str] = None):
-        column: str = self.field if not self.time_ingestion_partitioning else "_PARTITIONTIME"
+        column: str = (
+            self.field if not self.time_ingestion_partitioning else self.time_partitioning_field()
+        )
         if alias:
             column = f"{alias}.{column}"

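As a quick illustration of the pseudo-column selection introduced above (a minimal sketch, assuming dbt-bigquery at this commit; field names are invented):

from dbt.adapters.bigquery.impl import PartitionConfig

date_part = PartitionConfig(field="created_date", data_type="date", time_ingestion_partitioning=True)
ts_part = PartitionConfig(field="created_at", data_type="timestamp", time_ingestion_partitioning=True)

# Reads and filters use the type-matched pseudo column...
assert date_part.time_partitioning_field() == "_PARTITIONDATE"
assert ts_part.time_partitioning_field() == "_PARTITIONTIME"
# ...while inserts always target _PARTITIONTIME, whatever the data type.
assert date_part.insertable_time_partitioning_field() == "_PARTITIONTIME"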
@@ -107,6 +131,9 @@ def render_wrapped(self, alias: Optional[str] = None):
         if (
             self.data_type in ("date", "timestamp", "datetime")
             and not self.data_type_should_be_truncated()
+            and not (
+                self.time_ingestion_partitioning and self.data_type == "date"
+            )  # _PARTITIONDATE is already a date
         ):
             return f"{self.data_type}({self.render(alias)})"
         else:
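Concretely, render_wrapped now leaves the date pseudo column bare instead of wrapping it in a cast (a sketch under the same assumptions as the previous example):

from dbt.adapters.bigquery.impl import PartitionConfig

tip_date = PartitionConfig(field="created_date", data_type="date", time_ingestion_partitioning=True)
tip_hour = PartitionConfig(field="created_at", data_type="timestamp", granularity="hour", time_ingestion_partitioning=True)

print(tip_date.render_wrapped())  # _PARTITIONDATE, no date(...) cast around it
print(tip_hour.render_wrapped())  # timestamp_trunc(_PARTITIONTIME, hour)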
@@ -118,7 +145,12 @@ def parse(cls, raw_partition_by) -> Optional["PartitionConfig"]:
             return None
         try:
             cls.validate(raw_partition_by)
-            return cls.from_dict(raw_partition_by)
+            return cls.from_dict(
+                {
+                    key: (value.lower() if isinstance(value, str) else value)
+                    for key, value in raw_partition_by.items()
+                }
+            )
         except ValidationError as exc:
             raise dbt.exceptions.DbtValidationError("Could not parse partition config") from exc
         except TypeError:
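This is the sanitization the commit message mentions: every string value in the partition_by config is lowercased on parse, which is why the test field changed case. A minimal sketch, assuming dbt-bigquery at this commit:

from dbt.adapters.bigquery.impl import PartitionConfig

parsed = PartitionConfig.parse({"field": "CREATED_AT", "data_type": "TIMESTAMP", "granularity": "HOUR"})
print(parsed.field, parsed.data_type, parsed.granularity)  # created_at timestamp hour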
@@ -273,9 +305,16 @@ def get_columns_in_relation(self, relation: BigQueryRelation) -> List[BigQueryColumn]:
             return []
 
     @available.parse(lambda *a, **k: [])
-    def add_time_ingestion_partition_column(self, columns) -> List[BigQueryColumn]:
-        "Add time ingestion partition column to columns list"
-        columns.append(self.Column("_PARTITIONTIME", "TIMESTAMP", None, "NULLABLE"))
+    def add_time_ingestion_partition_column(self, partition_by, columns) -> List[BigQueryColumn]:
+        """Add time ingestion partition column to columns list"""
+        columns.append(
+            self.Column(
+                partition_by.insertable_time_partitioning_field(),
+                partition_by.data_type,
+                None,
+                "NULLABLE",
+            )
+        )
         return columns
 
     def expand_column_types(self, goal: BigQueryRelation, current: BigQueryRelation) -> None:  # type: ignore[override]
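The appended pseudo column now mirrors the configured data type instead of a hard-coded TIMESTAMP. A sketch of the equivalent list manipulation, assuming BigQueryColumn takes (column, dtype, fields, mode) positionally as in the call above:

from dbt.adapters.bigquery.column import BigQueryColumn
from dbt.adapters.bigquery.impl import PartitionConfig

partition_by = PartitionConfig(field="created_date", data_type="date", time_ingestion_partitioning=True)
columns = [BigQueryColumn("id", "INT64", None, "NULLABLE")]
# Equivalent of adapter.add_time_ingestion_partition_column(partition_by, columns):
columns.append(
    BigQueryColumn(
        partition_by.insertable_time_partitioning_field(),
        partition_by.data_type,
        None,
        "NULLABLE",
    )
)
print([c.name for c in columns])  # ['id', '_PARTITIONTIME']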
@@ -564,18 +603,15 @@ def _partitions_match(table, conf_partition: Optional[PartitionConfig]) -> bool:
         if not is_partitioned and not conf_partition:
             return True
         elif conf_partition and table.time_partitioning is not None:
-            partitioning_field = table.time_partitioning.field or "_PARTITIONTIME"
-            table_field = partitioning_field.lower()
-            table_granularity = table.partitioning_type.lower()
-            conf_table_field = (
-                conf_partition.field
-                if not conf_partition.time_ingestion_partitioning
-                else "_PARTITIONTIME"
+            table_field = (
+                table.time_partitioning.field.lower() if table.time_partitioning.field else None
             )
+            table_granularity = table.partitioning_type
+            conf_table_field = conf_partition.field
             return (
-                table_field == conf_table_field.lower()
-                and table_granularity == conf_partition.granularity.lower()
-            )
+                table_field == conf_table_field
+                or (conf_partition.time_ingestion_partitioning and table_field is not None)
+            ) and table_granularity == conf_partition.granularity
         elif conf_partition and table.range_partitioning is not None:
            dest_part = table.range_partitioning
            conf_part = conf_partition.range or {}
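Since parse() now lowercases the config, the .lower() calls on the config side could be dropped here. A sketch of the comparison using SimpleNamespace stand-ins for the google-cloud-bigquery Table (values are illustrative; real tables come from the client):

from types import SimpleNamespace

from dbt.adapters.bigquery.impl import BigQueryAdapter, PartitionConfig

# A column-partitioned table, stubbed the way the client would describe it.
table = SimpleNamespace(
    time_partitioning=SimpleNamespace(field="created_at"),
    partitioning_type="hour",
    range_partitioning=None,
)
conf = PartitionConfig.parse({"field": "CREATED_AT", "data_type": "timestamp", "granularity": "HOUR"})
assert BigQueryAdapter._partitions_match(table, conf)  # field and granularity both match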
16 changes: 16 additions & 0 deletions dbt/include/bigquery/macros/adapters.sql
@@ -2,6 +2,8 @@
 {% macro partition_by(partition_config) -%}
   {%- if partition_config is none -%}
     {% do return('') %}
+  {%- elif partition_config.time_ingestion_partitioning -%}
+    partition by {{ partition_config.render_wrapped() }}
   {%- elif partition_config.data_type | lower in ('date','timestamp','datetime') -%}
     partition by {{ partition_config.render() }}
   {%- elif partition_config.data_type | lower in ('int64') -%}
@@ -48,6 +50,11 @@
   {%- set sql_header = config.get('sql_header', none) -%}
 
   {%- set partition_config = adapter.parse_partition_by(raw_partition_by) -%}
+  {%- if partition_config.time_ingestion_partitioning -%}
+    {%- set columns = get_columns_with_types_in_query_sql(sql) -%}
+    {%- set table_dest_columns_csv = columns_without_partition_fields_csv(partition_config, columns) -%}
+    {%- set columns = '(' ~ table_dest_columns_csv ~ ')' -%}
+  {%- endif -%}
 
   {{ sql_header if sql_header is not none }}

@@ -57,14 +64,23 @@
       {{ get_assert_columns_equivalent(compiled_code) }}
       {{ get_table_columns_and_constraints() }}
       {%- set compiled_code = get_select_subquery(compiled_code) %}
+    {% else %}
+      {#-- cannot do contracts at the same time as time ingestion partitioning -#}
+      {{ columns }}
     {% endif %}
     {{ partition_by(partition_config) }}
     {{ cluster_by(raw_cluster_by) }}
 
     {{ bigquery_table_options(config, model, temporary) }}
 
+    {#-- PARTITION BY cannot be used with the AS query_statement clause.
+         https://cloud.google.com/bigquery/docs/reference/standard-sql/data-definition-language#partition_expression
+    -#}
+    {%- if not partition_config.time_ingestion_partitioning %}
     as (
       {{ compiled_code }}
     );
+    {%- endif %}
   {%- elif language == 'python' -%}
     {#--
       N.B. Python models _can_ write to temp views HOWEVER they use a different session
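Taken together, roughly the two statements dbt now issues when creating an ingestion-time-partitioned table: the DDL carries the explicit column list and the partition clause, and the data arrives in a separate insert. A hand-written sketch, not the macros' verbatim output; project, dataset, and column names are invented:

# Sketch of the create-then-insert flow for a model partitioned by a date
# column `created_date` with time_ingestion_partitioning enabled.
create_sql = """
create or replace table `my_project`.`my_dataset`.`my_model` (id int64, payload string)
partition by _PARTITIONDATE
"""

insert_sql = """
insert into `my_project`.`my_dataset`.`my_model` (_PARTITIONTIME, id, payload)
select timestamp(created_date) as _PARTITIONTIME, * EXCEPT(created_date) from (
  select id, payload, created_date from `my_project`.`my_dataset`.`my_source`
)
"""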
31 changes: 16 additions & 15 deletions dbt/include/bigquery/macros/materializations/incremental.sql
@@ -16,24 +16,25 @@
 {% macro source_sql_with_partition(partition_by, source_sql) %}
 
   {%- if partition_by.time_ingestion_partitioning %}
-    {{ return(wrap_with_time_ingestion_partitioning_sql(build_partition_time_exp(partition_by.field), source_sql, False)) }}
+    {{ return(wrap_with_time_ingestion_partitioning_sql(partition_by, source_sql, False)) }}
   {% else %}
     {{ return(source_sql) }}
   {%- endif -%}
 
 {% endmacro %}
 
-{% macro bq_create_table_as(is_time_ingestion_partitioning, temporary, relation, compiled_code, language='sql') %}
-  {% if is_time_ingestion_partitioning and language == 'python' %}
+{% macro bq_create_table_as(partition_by, temporary, relation, compiled_code, language='sql') %}
+  {%- set _dbt_max_partition = declare_dbt_max_partition(this, partition_by, compiled_code, language) -%}
+  {% if partition_by.time_ingestion_partitioning and language == 'python' %}
    {% do exceptions.raise_compiler_error(
      "Python models do not support ingestion time partitioning"
    ) %}
-  {% endif %}
-  {% if is_time_ingestion_partitioning and language == 'sql' %}
+  {% elif partition_by.time_ingestion_partitioning and language == 'sql' %}
    {#-- Create the table before inserting data as ingestion time partitioned tables can't be created with the transformed data --#}
-    {% do run_query(create_ingestion_time_partitioned_table_as_sql(temporary, relation, compiled_code)) %}
-    {{ return(bq_insert_into_ingestion_time_partitioned_table_sql(relation, compiled_code)) }}
+    {% do run_query(create_table_as(temporary, relation, compiled_code)) %}
+    {{ return(_dbt_max_partition + bq_insert_into_ingestion_time_partitioned_table_sql(relation, compiled_code)) }}
  {% else %}
-    {{ return(create_table_as(temporary, relation, compiled_code, language)) }}
+    {{ return(_dbt_max_partition + create_table_as(temporary, relation, compiled_code, language)) }}
  {% endif %}
{% endmacro %}

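The rework folds declare_dbt_max_partition into bq_create_table_as itself, so the declare always travels in the same script as the statement it scopes over, and call sites no longer concatenate it by hand. The returned script looks roughly like this (a sketch; the exact declare body is generated by declare_dbt_max_partition, and all names are invented):

# Sketch of the script shape bq_create_table_as now returns for an incremental run.
script = """
declare _dbt_max_partition date default (
  -- max existing partition value, taken from the target table
  select max(created_date) from `my_project`.`my_dataset`.`my_model`
);
create or replace table `my_project`.`my_dataset`.`my_model__dbt_tmp`
partition by created_date
as (select id, payload, created_date from `my_project`.`my_dataset`.`my_source`)
"""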
@@ -93,14 +94,14 @@
 
   {% elif existing_relation is none %}
     {%- call statement('main', language=language) -%}
-      {{ bq_create_table_as(partition_by.time_ingestion_partitioning, False, target_relation, compiled_code, language) }}
+      {{ bq_create_table_as(partition_by, False, target_relation, compiled_code, language) }}
     {%- endcall -%}
 
   {% elif existing_relation.is_view %}
     {#-- There's no way to atomically replace a view with a table on BQ --#}
     {{ adapter.drop_relation(existing_relation) }}
     {%- call statement('main', language=language) -%}
-      {{ bq_create_table_as(partition_by.time_ingestion_partitioning, False, target_relation, compiled_code, language) }}
+      {{ bq_create_table_as(partition_by, False, target_relation, compiled_code, language) }}
     {%- endcall -%}
 
   {% elif full_refresh_mode %}
@@ -110,7 +111,7 @@
       {{ adapter.drop_relation(existing_relation) }}
     {% endif %}
     {%- call statement('main', language=language) -%}
-      {{ bq_create_table_as(partition_by.time_ingestion_partitioning, False, target_relation, compiled_code, language) }}
+      {{ bq_create_table_as(partition_by, False, target_relation, compiled_code, language) }}
     {%- endcall -%}
 
   {% else %}
@@ -127,9 +128,7 @@
     {#-- Check first, since otherwise we may not build a temp table --#}
     {#-- Python always needs to create a temp table --#}
     {%- call statement('create_tmp_relation', language=language) -%}
-      {{ declare_dbt_max_partition(this, partition_by, compiled_code, language) +
-         bq_create_table_as(partition_by.time_ingestion_partitioning, True, tmp_relation, compiled_code, language)
-      }}
+      {{ bq_create_table_as(partition_by, True, tmp_relation, compiled_code, language) }}
     {%- endcall -%}
     {% set tmp_relation_exists = true %}
     {#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#}
@@ -139,9 +138,11 @@
     {% if not dest_columns %}
       {% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
     {% endif %}
+    {#-- Add the time ingestion pseudo column to the destination columns: it is not part of the 'schema' but is still needed for the actual data insertion --#}
     {% if partition_by.time_ingestion_partitioning %}
-      {% set dest_columns = adapter.add_time_ingestion_partition_column(dest_columns) %}
+      {% set dest_columns = adapter.add_time_ingestion_partition_column(partition_by, dest_columns) %}
     {% endif %}
+
     {% set build_sql = bq_generate_incremental_build_sql(
         strategy, tmp_relation, target_relation, compiled_code, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, partition_by.copy_partitions, incremental_predicates
     ) %}
dbt/include/bigquery/macros/materializations/incremental_strategy/common.sql
@@ -1,12 +1,3 @@
-{% macro build_partition_time_exp(partition_by) %}
-  {% if partition_by.data_type == 'timestamp' %}
-    {% set partition_value = partition_by.field %}
-  {% else %}
-    {% set partition_value = 'timestamp(' + partition_by.field + ')' %}
-  {% endif %}
-  {{ return({'value': partition_value, 'field': partition_by.field}) }}
-{% endmacro %}
-
 {% macro declare_dbt_max_partition(relation, partition_by, compiled_code, language='sql') %}
 
   {#-- TODO: revisit partitioning with python models --#}
dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql
@@ -58,7 +58,7 @@
     {%- set source_sql -%}
     (
       {%- if partition_by.time_ingestion_partitioning -%}
-      {{ wrap_with_time_ingestion_partitioning_sql(build_partition_time_exp(partition_by), sql, True) }}
+      {{ wrap_with_time_ingestion_partitioning_sql(partition_by, sql, True) }}
       {%- else -%}
       {{sql}}
       {%- endif -%}
@@ -85,8 +85,7 @@
   ) %}
   {# We run temp table creation in a separated script to move to partitions copy #}
   {%- call statement('create_tmp_relation_for_copy', language='sql') -%}
-    {{ declare_dbt_max_partition(this, partition_by, sql, 'sql') +
-       bq_create_table_as(partition_by.time_ingestion_partitioning, True, tmp_relation, sql, 'sql')
+    {{ bq_create_table_as(partition_by, True, tmp_relation, sql, 'sql')
     }}
   {%- endcall %}
   {%- set partitions_sql -%}
@@ -112,7 +111,7 @@
       (
         select
         {% if partition_by.time_ingestion_partitioning -%}
-        _PARTITIONTIME,
+        {{ partition_by.insertable_time_partitioning_field() }},
         {%- endif -%}
         * from {{ tmp_relation }}
       )
@@ -123,19 +122,18 @@
 
   {# have we already created the temp table to check for schema changes? #}
   {% if not tmp_relation_exists %}
-    {{ declare_dbt_max_partition(this, partition_by, sql) }}
-
     -- 1. create a temp table with model data
-    {{ bq_create_table_as(partition_by.time_ingestion_partitioning, True, tmp_relation, sql, 'sql') }}
+    {{ bq_create_table_as(partition_by, True, tmp_relation, sql, 'sql') }}
   {% else %}
     -- 1. temp table already exists, we used it to check for schema changes
   {% endif %}
+  {%- set partition_field = partition_by.time_partitioning_field() if partition_by.time_ingestion_partitioning else partition_by.render_wrapped() -%}
 
   -- 2. define partitions to update
   set (dbt_partitions_for_replacement) = (
     select as struct
       -- IGNORE NULLS: this needs to be aligned to _dbt_max_partition, which ignores null
-      array_agg(distinct {{ partition_by.render_wrapped() }} IGNORE NULLS)
+      array_agg(distinct {{ partition_field }} IGNORE NULLS)
     from {{ tmp_relation }}
   );

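The new partition_field variable above is the piece to note: with time-ingestion partitioning, the partitions to replace are collected from the raw pseudo column, while column-partitioned tables keep the wrapped rendering. A minimal sketch with invented fields, assuming dbt-bigquery at this commit:

from dbt.adapters.bigquery.impl import PartitionConfig

tip = PartitionConfig(field="created_date", data_type="date", time_ingestion_partitioning=True)
col = PartitionConfig(field="created_at", data_type="timestamp", granularity="hour")

print(tip.time_partitioning_field())  # _PARTITIONDATE, aggregated as-is in step 2
print(col.render_wrapped())           # timestamp_trunc(created_at, hour)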
dbt/include/bigquery/macros/materializations/incremental_strategy/merge.sql
@@ -6,14 +6,14 @@
       (
       select
       {% if partition_by.time_ingestion_partitioning -%}
-      _PARTITIONTIME,
+      {{ partition_by.insertable_time_partitioning_field() }},
       {%- endif -%}
       * from {{ tmp_relation }}
       )
     {%- else -%} {#-- wrap sql in parens to make it a subquery --#}
       (
       {%- if partition_by.time_ingestion_partitioning -%}
-        {{ wrap_with_time_ingestion_partitioning_sql(build_partition_time_exp(partition_by), sql, True) }}
+        {{ wrap_with_time_ingestion_partitioning_sql(partition_by, sql, True) }}
       {%- else -%}
       {{sql}}
       {%- endif %}
dbt/include/bigquery/macros/materializations/incremental_strategy/time_ingestion_tables.sql
@@ -1,35 +1,11 @@
-{% macro wrap_with_time_ingestion_partitioning_sql(partition_time_exp, sql, is_nested) %}
+{% macro wrap_with_time_ingestion_partitioning_sql(partition_by, sql, is_nested) %}
 
-  select {{ partition_time_exp['value'] }} as _partitiontime, * EXCEPT({{ partition_time_exp['field'] }}) from (
+  select TIMESTAMP({{ partition_by.field }}) as {{ partition_by.insertable_time_partitioning_field() }}, * EXCEPT({{ partition_by.field }}) from (
     {{ sql }}
   ){%- if not is_nested -%};{%- endif -%}
 
 {% endmacro %}
 
-{% macro create_ingestion_time_partitioned_table_as_sql(temporary, relation, sql) -%}
-  {%- set raw_partition_by = config.get('partition_by', none) -%}
-  {%- set raw_cluster_by = config.get('cluster_by', none) -%}
-  {%- set sql_header = config.get('sql_header', none) -%}
-
-  {%- set partition_config = adapter.parse_partition_by(raw_partition_by) -%}
-
-  {%- set columns = get_columns_with_types_in_query_sql(sql) -%}
-  {%- set table_dest_columns_csv = columns_without_partition_fields_csv(partition_config, columns) -%}
-
-  {{ sql_header if sql_header is not none }}
-
-  {% set ingestion_time_partition_config_raw = fromjson(tojson(raw_partition_by)) %}
-  {% do ingestion_time_partition_config_raw.update({'field':'_PARTITIONTIME'}) %}
-
-  {%- set ingestion_time_partition_config = adapter.parse_partition_by(ingestion_time_partition_config_raw) -%}
-
-  create or replace table {{ relation }} ({{table_dest_columns_csv}})
-  {{ partition_by(ingestion_time_partition_config) }}
-  {{ cluster_by(raw_cluster_by) }}
-  {{ bigquery_table_options(config, model, temporary) }}
-
-{%- endmacro -%}
-
 {% macro get_quoted_with_types_csv(columns) %}
   {% set quoted = [] %}
   {% for col in columns -%}
@@ -48,12 +24,13 @@
 {%- endmacro -%}
 
 {% macro bq_insert_into_ingestion_time_partitioned_table_sql(target_relation, sql) -%}
-  {%- set partition_by = config.get('partition_by', none) -%}
+  {%- set raw_partition_by = config.get('partition_by', none) -%}
+  {%- set partition_by = adapter.parse_partition_by(raw_partition_by) -%}
   {% set dest_columns = adapter.get_columns_in_relation(target_relation) %}
   {%- set dest_columns_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
 
-  insert into {{ target_relation }} (_partitiontime, {{ dest_columns_csv }})
-  {{ wrap_with_time_ingestion_partitioning_sql(build_partition_time_exp(partition_by), sql, False) }}
+  insert into {{ target_relation }} ({{ partition_by.insertable_time_partitioning_field() }}, {{ dest_columns_csv }})
+  {{ wrap_with_time_ingestion_partitioning_sql(partition_by, sql, False) }}
 
 {%- endmacro -%}

(3 more changed files not shown)
