diff --git a/.changes/unreleased/Fixes-20240921-213533.yaml b/.changes/unreleased/Fixes-20240921-213533.yaml new file mode 100644 index 000000000..6e478a00f --- /dev/null +++ b/.changes/unreleased/Fixes-20240921-213533.yaml @@ -0,0 +1,6 @@ +kind: Fixes +body: Fix copy_partitions when used with static partitions +time: 2024-09-21T21:35:33.180816+02:00 +custom: + Author: Kayrnt + Issue: "1349" diff --git a/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql b/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql index 3ba67931e..074ca6336 100644 --- a/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql +++ b/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql @@ -47,37 +47,63 @@ {% endif %} {% endmacro %} +{% macro bq_static_select_insert_overwrite_sql(tmp_relation, sql, partition_by, tmp_relation_exists) %} + {%- set source_sql -%} + ( + {% if partition_by.time_ingestion_partitioning and tmp_relation_exists -%} + select + {{ partition_by.insertable_time_partitioning_field() }}, + * from {{ tmp_relation }} + {% elif tmp_relation_exists -%} + select + * from {{ tmp_relation }} + {%- elif partition_by.time_ingestion_partitioning -%} + {{ wrap_with_time_ingestion_partitioning_sql(partition_by, sql, True) }} + {%- else -%} + {{sql}} + {%- endif %} + ) + {%- endset -%} + {{ return(source_sql) }} +{% endmacro %} + +{% macro bq_static_copy_partitions_insert_overwrite_sql( + tmp_relation, target_relation, sql, partition_by, partitions, tmp_relation_exists + ) %} + {%- if tmp_relation_exists is false -%} + {%- set source_sql = bq_static_select_insert_overwrite_sql(tmp_relation, sql, partition_by, tmp_relation_exists) %} + {# We run temp table creation in a separated script to move to partitions copy if it doesn't already exist #} + {%- call statement('create_tmp_relation_for_copy', language='sql') -%} + {{ bq_create_table_as(partition_by, True, tmp_relation, source_sql, 'sql') + }} + {%- endcall %} + {%- endif -%} + {%- set partitions_sql -%} + select + {%- for partition in partitions %} + CAST({{ partition }} AS TIMESTAMP){%- if not loop.last -%},{%- endif -%} + {%- endfor %} + from {{ tmp_relation }} + {%- endset -%} + {%- set partitions = run_query(partitions_sql).columns[0].values() -%} + {# We copy the partitions #} + {%- do bq_copy_partitions(tmp_relation, target_relation, partitions, partition_by) -%} + -- Clean up the temp table + drop table if exists {{ tmp_relation }} +{% endmacro %} + {% macro bq_static_insert_overwrite_sql( tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions ) %} - + {%- if copy_partitions %} + {{ bq_static_copy_partitions_insert_overwrite_sql(tmp_relation, target_relation, sql, partition_by, partitions, tmp_relation_exists) }} + {% else -%} {% set predicate -%} {{ partition_by.render_wrapped(alias='DBT_INTERNAL_DEST') }} in ( {{ partitions | join (', ') }} ) {%- endset %} - - {%- set source_sql -%} - ( - {% if partition_by.time_ingestion_partitioning and tmp_relation_exists -%} - select - {{ partition_by.insertable_time_partitioning_field() }}, - * from {{ tmp_relation }} - {% elif tmp_relation_exists -%} - select - * from {{ tmp_relation }} - {%- elif partition_by.time_ingestion_partitioning -%} - {{ wrap_with_time_ingestion_partitioning_sql(partition_by, sql, True) }} - {%- else -%} - {{sql}} - {%- endif %} - - ) - {%- endset -%} - - {% if copy_partitions %} - {% do bq_copy_partitions(tmp_relation, target_relation, partitions, partition_by) %} - {% else %} + {%- set source_sql = bq_static_select_insert_overwrite_sql(tmp_relation, sql, partition_by, tmp_relation_exists) %} {#-- In case we're putting the model SQL _directly_ into the MERGE statement, we need to prepend the MERGE statement with the user-configured sql_header, @@ -92,8 +118,7 @@ -- 2. clean up the temp table drop table if exists {{ tmp_relation }}; {%- endif -%} - - {% endif %} + {%- endif -%} {% endmacro %} {% macro bq_dynamic_copy_partitions_insert_overwrite_sql( @@ -118,7 +143,7 @@ {% endmacro %} {% macro bq_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions) %} - {%- if copy_partitions is true %} + {%- if copy_partitions %} {{ bq_dynamic_copy_partitions_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions) }} {% else -%} {% set predicate -%} diff --git a/tests/functional/adapter/incremental/incremental_strategy_fixtures.py b/tests/functional/adapter/incremental/incremental_strategy_fixtures.py index 02efbb6c2..21f4fab5e 100644 --- a/tests/functional/adapter/incremental/incremental_strategy_fixtures.py +++ b/tests/functional/adapter/incremental/incremental_strategy_fixtures.py @@ -336,6 +336,52 @@ }} +with data as ( + + {% if not is_incremental() %} + + select 1 as id, cast('2020-01-01' as date) as date_day union all + select 2 as id, cast('2020-01-01' as date) as date_day union all + select 3 as id, cast('2020-01-01' as date) as date_day union all + select 4 as id, cast('2020-01-01' as date) as date_day + + {% else %} + + -- we want to overwrite the 4 records in the 2020-01-01 partition + -- with the 2 records below, but add two more in the 2020-01-02 partition + select 10 as id, cast('2020-01-01' as date) as date_day union all + select 20 as id, cast('2020-01-01' as date) as date_day union all + select 30 as id, cast('2020-01-02' as date) as date_day union all + select 40 as id, cast('2020-01-02' as date) as date_day + + {% endif %} + +) + +select * from data + +{% if is_incremental() %} +where date_day in ({{ config.get("partitions") | join(",") }}) +{% endif %} +-- Test comment to prevent recurrence of https://github.com/dbt-labs/dbt-bigquery/issues/896 +""".lstrip() + +overwrite_copy_partitions_with_partitions_sql = """ +{{ + config( + materialized="incremental", + incremental_strategy='insert_overwrite', + cluster_by="id", + partitions=["CAST('2020-01-02' AS DATE)","'2020-01-02'"], + partition_by={ + "field": "date_day", + "data_type": "date", + "copy_partitions": true + } + ) +}} + + with data as ( {% if not is_incremental() %} diff --git a/tests/functional/adapter/incremental/test_incremental_strategies.py b/tests/functional/adapter/incremental/test_incremental_strategies.py index 1a339d601..def446986 100644 --- a/tests/functional/adapter/incremental/test_incremental_strategies.py +++ b/tests/functional/adapter/incremental/test_incremental_strategies.py @@ -22,6 +22,7 @@ overwrite_day_sql, overwrite_day_with_copy_partitions_sql, overwrite_partitions_sql, + overwrite_copy_partitions_with_partitions_sql, overwrite_range_sql, overwrite_time_sql, overwrite_day_with_time_ingestion_sql, @@ -45,6 +46,7 @@ def models(self): "incremental_overwrite_day.sql": overwrite_day_sql, "incremental_overwrite_day_with_copy_partitions.sql": overwrite_day_with_copy_partitions_sql, "incremental_overwrite_partitions.sql": overwrite_partitions_sql, + "incremental_overwrite_copy_partitions_with_partitions.sql": overwrite_copy_partitions_with_partitions_sql, "incremental_overwrite_range.sql": overwrite_range_sql, "incremental_overwrite_time.sql": overwrite_time_sql, "incremental_overwrite_day_with_time_partition.sql": overwrite_day_with_time_ingestion_sql, @@ -78,6 +80,10 @@ def test__bigquery_assert_incremental_configurations_apply_the_right_strategy(se ("incremental_overwrite_time", "incremental_overwrite_time_expected"), ("incremental_overwrite_date", "incremental_overwrite_date_expected"), ("incremental_overwrite_partitions", "incremental_overwrite_date_expected"), + ( + "incremental_overwrite_copy_partitions_with_partitions", + "incremental_overwrite_date_expected", + ), ("incremental_overwrite_day", "incremental_overwrite_day_expected"), ("incremental_overwrite_range", "incremental_overwrite_range_expected"), (