Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow use of multiple column unique keys in snapshots #326

Merged
merged 17 commits into from
Oct 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240422-081302.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Changelog fragment: snapshots accept a list of columns for unique_key.
kind: Features
body: Allows unique_key for snapshots to take a list
time: 2024-04-22T08:13:02.937534-04:00
custom:
  # Author and Issue are nested under `custom`; at column 0 they would be
  # parsed as top-level keys and `custom` would be null.
  Author: agpapa
  Issue: "181"
11 changes: 11 additions & 0 deletions dbt/adapters/base/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
Dict,
FrozenSet,
Iterator,
List,
Optional,
Set,
Tuple,
Expand Down Expand Up @@ -341,6 +342,16 @@ def create(
)
return cls.from_dict(kwargs)

@classmethod
def scd_args(cls: Type[Self], primary_key: Union[str, List[str]], updated_at) -> List[str]:
    """Flatten the snapshot key column(s) and ``updated_at`` into one list.

    Args:
        primary_key: A single key expression, or a list of key expressions
            for a multi-column unique key.
        updated_at: The updated-at expression; always appended last.

    Returns:
        A new list holding every key expression followed by ``updated_at``.
        The caller's ``primary_key`` list is never mutated.
    """
    # Normalise the single-key case to a one-element list so both shapes
    # are handled by the same unpacking below.
    key_columns = primary_key if isinstance(primary_key, list) else [primary_key]
    return [*key_columns, updated_at]

@property
def can_be_renamed(self) -> bool:
return self.type in self.renameable_relations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,7 @@

snapshotted_data as (

select *,
{{ strategy.unique_key }} as dbt_unique_key

select *, {{ unique_key_fields(strategy.unique_key) }}
from {{ target_relation }}
where
{% if config.get('dbt_valid_to_current') %}
Expand All @@ -65,9 +63,7 @@

insertions_source_data as (

select
*,
{{ strategy.unique_key }} as dbt_unique_key,
select *, {{ unique_key_fields(strategy.unique_key) }},
{{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
{{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
{{ get_dbt_valid_to_current(strategy, columns) }},
Expand All @@ -78,9 +74,7 @@

updates_source_data as (

select
*,
{{ strategy.unique_key }} as dbt_unique_key,
select *, {{ unique_key_fields(strategy.unique_key) }},
{{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
{{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
{{ strategy.updated_at }} as {{ columns.dbt_valid_to }}
Expand All @@ -92,9 +86,7 @@

deletes_source_data as (

select
*,
{{ strategy.unique_key }} as dbt_unique_key
select *, {{ unique_key_fields(strategy.unique_key) }}
from snapshot_query
),
{% endif %}
Expand All @@ -106,13 +98,11 @@
source_data.*

from insertions_source_data as source_data
left outer join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
where snapshotted_data.dbt_unique_key is null
or (
snapshotted_data.dbt_unique_key is not null
and (
{{ strategy.row_changed }}
)
left outer join snapshotted_data
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
where {{ unique_key_is_null(strategy.unique_key, "snapshotted_data") }}
or ({{ unique_key_is_not_null(strategy.unique_key, "snapshotted_data") }} and {{ strategy.row_changed }})

)

),
Expand All @@ -125,7 +115,8 @@
snapshotted_data.{{ columns.dbt_scd_id }}

from updates_source_data as source_data
join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
join snapshotted_data
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
where (
{{ strategy.row_changed }}
)
Expand All @@ -145,8 +136,9 @@
snapshotted_data.{{ columns.dbt_scd_id }}

from snapshotted_data
left join deletes_source_data as source_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
where source_data.dbt_unique_key is null
left join deletes_source_data as source_data
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
where {{ unique_key_is_null(strategy.unique_key, "source_data") }}
)
{%- endif %}

Expand Down Expand Up @@ -217,8 +209,51 @@
{% endif %}
{% endmacro %}


{% macro get_dbt_valid_to_current(strategy, columns) %}
    {#
        Emit the select-list expression for the snapshot's valid-to column,
        aliased as columns.dbt_valid_to.

        nullif(x, x) can never return a non-NULL value, so the coalesce always
        falls through to the configured `dbt_valid_to_current` value, or to a
        literal null when that config is unset or falsy.
        NOTE(review): presumably the nullif wrapper exists so the NULL carries
        the type of the updated_at expression; confirm.
    #}
    {% set valid_to_default = config.get('dbt_valid_to_current') or "null" %}
    coalesce(
        nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}),
        {{ valid_to_default }}
    ) as {{ columns.dbt_valid_to }}
{% endmacro %}


{% macro unique_key_fields(unique_key) %}
    {#
        Emit select-list entries that alias the snapshot key expression(s).

        A single key becomes `<expr> as dbt_unique_key`.
        A list of keys becomes `<expr> as dbt_unique_key_<n>` for each entry,
        numbered from 1 in list order and separated by commas.
    #}
    {% if not (unique_key | is_list) %}
        {{ unique_key }} as dbt_unique_key
    {% else %}
        {% for key_expr in unique_key %}
            {{ key_expr }} as dbt_unique_key_{{ loop.index }}{% if not loop.last %},{% endif %}
        {% endfor %}
    {% endif %}
{% endmacro %}


{% macro unique_key_join_on(unique_key, identifier, from_identifier) %}
    {#
        Emit the join predicate matching the dbt_unique_key column(s) of two
        relations.

        unique_key:      a single key expression or a list of key expressions;
                         only its shape (list or not) and length are used here.
        identifier:      alias of the first relation in the comparison.
        from_identifier: alias of the second relation in the comparison.

        A list of keys produces one equality per dbt_unique_key_<n> column,
        joined with `and`; a single key compares dbt_unique_key.

        The branch is driven by the `unique_key` argument. `strategy` is not a
        parameter of this macro, so it must not be referenced here.
    #}
    {% if unique_key | is_list %}
        {% for key in unique_key %}
            {{ identifier }}.dbt_unique_key_{{ loop.index }} = {{ from_identifier }}.dbt_unique_key_{{ loop.index }}
            {%- if not loop.last %} and {%- endif %}
        {% endfor %}
    {% else %}
        {{ identifier }}.dbt_unique_key = {{ from_identifier }}.dbt_unique_key
    {% endif %}
{% endmacro %}


{% macro unique_key_is_null(unique_key, identifier) %}
    {#
        Emit a predicate that is true when the relation aliased `identifier`
        has no key value on the current row.

        unique_key: a single key expression or a list; only its shape is used.
        identifier: alias of the relation whose key column is tested.

        For a list key only dbt_unique_key_1 is tested.
        NOTE(review): this assumes the first key column is never NULL on a
        matched row; confirm against the snapshot's key columns.
    #}
    {% if unique_key | is_list %}
        {{ identifier }}.dbt_unique_key_1 is null
    {% else %}
        {{ identifier }}.dbt_unique_key is null
    {% endif %}
{% endmacro %}


{% macro unique_key_is_not_null(unique_key, identifier) %}
    {#
        Emit a predicate that is true when the relation aliased `identifier`
        has a key value on the current row.

        unique_key: a single key expression or a list; only its shape is used.
        identifier: alias of the relation whose key column is tested.

        For a list key only dbt_unique_key_1 is tested.
        NOTE(review): this assumes the first key column is never NULL on a
        matched row; confirm against the snapshot's key columns.
    #}
    {% if unique_key | is_list %}
        {{ identifier }}.dbt_unique_key_1 is not null
    {% else %}
        {{ identifier }}.dbt_unique_key is not null
    {% endif %}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,22 @@
{% do adapter.expand_target_column_types(from_relation=staging_table,
to_relation=target_relation) %}

{# Helper columns that must not be treated as source columns of the snapshot. #}
{% set remove_columns = ['dbt_change_type', 'DBT_CHANGE_TYPE', 'dbt_unique_key', 'DBT_UNIQUE_KEY'] %}
{% if unique_key | is_list %}
    {# A list key yields one numbered helper column per entry; exclude each in both casings. #}
    {# `do` runs the append for its side effect only; an expression tag would print its None result. #}
    {% for key in strategy.unique_key %}
        {% do remove_columns.append('dbt_unique_key_' + loop.index|string) %}
        {% do remove_columns.append('DBT_UNIQUE_KEY_' + loop.index|string) %}
    {% endfor %}
{% endif %}

{% set missing_columns = adapter.get_missing_columns(staging_table, target_relation)
| rejectattr('name', 'equalto', 'dbt_change_type')
| rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
| rejectattr('name', 'equalto', 'dbt_unique_key')
| rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
| rejectattr('name', 'in', remove_columns)
| list %}

{% do create_columns(target_relation, missing_columns) %}

{% set source_columns = adapter.get_columns_in_relation(staging_table)
| rejectattr('name', 'equalto', 'dbt_change_type')
| rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
| rejectattr('name', 'equalto', 'dbt_unique_key')
| rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
| rejectattr('name', 'in', remove_columns)
| list %}

{% set quoted_source_columns = [] %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@
({{ snapshotted_rel }}.{{ columns.dbt_valid_from }} < {{ current_rel }}.{{ updated_at }})
{%- endset %}

{% set scd_id_expr = snapshot_hash_arguments([primary_key, updated_at]) %}
{% set scd_args = api.Relation.scd_args(primary_key, updated_at) %}
{% set scd_id_expr = snapshot_hash_arguments(scd_args) %}

{% do return({
"unique_key": primary_key,
Expand Down Expand Up @@ -166,7 +167,8 @@
)
{%- endset %}

{% set scd_id_expr = snapshot_hash_arguments([primary_key, updated_at]) %}
{% set scd_args = api.Relation.scd_args(primary_key, updated_at) %}
{% set scd_id_expr = snapshot_hash_arguments(scd_args) %}

{% do return({
"unique_key": primary_key,
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ classifiers = [
"Programming Language :: Python :: 3.12",
]
dependencies = [
"dbt-common>=1.10,<2.0",
"dbt-common>=1.11,<2.0",
"pytz>=2015.7",
# installed via dbt-common but used directly
"agate>=1.0,<2.0",
Expand Down
Loading