Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/hashed hubs #246

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
4 changes: 4 additions & 0 deletions macros/internal/metadata_processing/get_escape_characters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,7 @@
{%- macro postgres__get_escape_characters() %}
{%- do return (('"', '"')) -%}
{%- endmacro %}

{%- macro duckdb__get_escape_characters() %}
{%- do return (('"', '"')) -%}
{%- endmacro %}
16 changes: 16 additions & 0 deletions macros/materialisations/period_mat_helpers/get_period_of_load.sql
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,19 @@

{% do return(period_of_load) %}
{%- endmacro -%}


{%- macro duckdb__get_period_of_load(period, offset, start_timestamp) -%}
{# Postgres uses different DateTime arithmetic #}
{% set period_of_load_sql -%}
SELECT DATE_TRUNC('{{ period }}',
TO_TIMESTAMP('{{ start_timestamp }}', 'YYYY-MM-DD HH24:MI:SS') + interval '{{ offset }} {{ period }}'
) AS period_of_load
{%- endset %}

{% set period_of_load_dict = dbtvault.get_query_results_as_dict(period_of_load_sql) %}

{% set period_of_load = period_of_load_dict['PERIOD_OF_LOAD'][0] | string %}

{% do return(period_of_load) %}
{%- endmacro -%}
9 changes: 9 additions & 0 deletions macros/staging/null_columns.sql
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,13 @@
{{ col_name }} AS {{ col_name ~ "_ORIGINAL" }},
COALESCE({{ col_name }}, '{{ default_value }}') AS {{ col_name }}

{%- endmacro -%}

{%- macro duckdb__null_column_sql(col_name, default_value) -%}

{%- set col_name_esc = dbtvault.escape_column_names(col_name) -%}
{%- set col_name_orig_esc = dbtvault.escape_column_names(col_name ~ "_ORIGINAL") -%}
{{ col_name_esc }} AS {{ col_name_orig_esc }},
COALESCE({{ col_name_esc }}, '{{ default_value }}') AS {{ col_name_esc }}

{%- endmacro -%}
8 changes: 7 additions & 1 deletion macros/supporting/casting/cast_date.sql
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,10 @@

{%- if alias %} AS {{ alias }} {%- endif %}

{%- endmacro -%}
{%- endmacro -%}

{%- macro duckdb__cast_date(column_str, as_string=false, datetime=false, alias=none) -%}

{{ dbtvault.snowflake__cast_date(column_str=column_str, as_string=as_string, datetime=datetime, alias=alias)}}

{%- endmacro -%}
10 changes: 9 additions & 1 deletion macros/supporting/casting/cast_datetime.sql
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,12 @@

{%- if alias %} AS {{ alias }} {%- endif %}

{%- endmacro -%}
{%- endmacro -%}

{%- macro duckdb__cast_datetime(column_str, as_string=false, alias=none, date_type=none) -%}

to_char(timestamp {{ column_str }}, 'YYYY-MM-DD HH24:MI:SS.MS')::timestamp

{%- if alias %} AS {{ alias }} {%- endif %}

{%- endmacro -%}
88 changes: 88 additions & 0 deletions macros/supporting/hash.sql
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,92 @@

{{ automate_dv.default__hash(columns=columns, alias=alias, is_hashdiff=is_hashdiff, columns_to_escape=columns_to_escape) }}

{%- endmacro -%}

{%- macro duckdb__hash(columns, alias, is_hashdiff, columns_to_escape=none) -%}

{%- set hash = var('hash', 'MD5') -%}
{%- set concat_string = var('concat_string', '||') -%}
{%- set null_placeholder_string = var('null_placeholder_string', '^^') -%}

{#- Select hashing algorithm -#}
{%- if hash == 'MD5' -%}
{%- set hash_alg = 'MD5' -%}
{%- elif hash == 'SHA' -%}
{%- set hash_alg = 'SHA256' -%}
{%- else -%}
{%- set hash_alg = 'MD5' -%}
{%- endif -%}

{#- Select hashing expression (left and right sides) -#}
{#- * MD5 is simple function call to md5(val) -#}
{#- * SHA256 needs input cast to BYTEA and then its BYTEA result encoded as hex text output -#}
{#- e.g. ENCODE(SHA256(CAST(val AS BYTEA)), 'hex') -#}
{#- Ref: https://www.postgresql.org/docs/11/functions-binarystring.html -#}
{%- if hash_alg == 'MD5' -%}
{%- set hash_expr_left = 'MD5(' -%}
{%- set hash_expr_right = ')' -%}
{%- elif hash_alg == 'SHA256' -%}
{%- set hash_expr_left = 'ENCODE(SHA256(CAST(' -%}
{%- set hash_expr_right = " AS BYTEA)), 'hex')" -%}
{%- endif -%}

{%- set standardise = "NULLIF(UPPER(TRIM(CAST([EXPRESSION] AS VARCHAR))), '')" -%}

{#- Alpha sort columns before hashing if a hashdiff -#}
{%- if is_hashdiff and automate_dv.is_list(columns) -%}
{%- set columns = columns|sort -%}
{%- endif -%}

{#- If single column to hash -#}
{%- if columns is string -%}
{%- set column_str = automate_dv.as_constant(columns) -%}
{%- if automate_dv.is_expression(column_str) -%}
{%- set escaped_column_str = column_str -%}
{%- else -%}
{%- set escaped_column_str = automate_dv.escape_column_names(column_str) -%}
{%- endif -%}

{{- "CAST(UPPER({}{}{}) AS BYTEA) AS {}".format(hash_expr_left, standardise | replace('[EXPRESSION]', escaped_column_str), hash_expr_right, automate_dv.escape_column_names(alias)) | indent(4) -}}

{#- Else a list of columns to hash -#}
{%- else -%}
{%- set all_null = [] -%}

{%- if is_hashdiff -%}
{{- "CAST(UPPER({}CONCAT_WS('{}',".format(hash_expr_left, concat_string) | indent(4) -}}
{%- else -%}
{{- "CAST(UPPER({}NULLIF(CONCAT_WS('{}',".format(hash_expr_left, concat_string) | indent(4) -}}
{%- endif -%}

{%- for column in columns -%}

{%- do all_null.append(null_placeholder_string) -%}

{%- set column_str = automate_dv.as_constant(column) -%}
{%- if automate_dv.is_expression(column_str) -%}
{%- set escaped_column_str = column_str -%}
{%- else -%}
{%- set escaped_column_str = automate_dv.escape_column_names(column_str) -%}
{%- endif -%}

{{- "\nCOALESCE({}, '{}')".format(standardise | replace('[EXPRESSION]', escaped_column_str), null_placeholder_string) | indent(4) -}}
{{- "," if not loop.last -}}

{%- if loop.last -%}

{% if is_hashdiff %}
{{- "\n){}) AS BYTEA) AS {}".format(hash_expr_right, automate_dv.escape_column_names(alias)) -}}
{%- else -%}
{{- "\n), '{}'){}) AS BYTEA) AS {}".format(all_null | join(""), hash_expr_right, automate_dv.escape_column_names(alias)) -}}
{%- endif -%}
{%- else -%}

{%- do all_null.append(concat_string) -%}

{%- endif -%}
{%- endfor -%}

{%- endif -%}

{%- endmacro -%}
89 changes: 89 additions & 0 deletions macros/tables/duckdb/hub.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
{%- macro duckdb__hub(src_pk, src_nk, src_extra_columns, src_ldts, src_source, source_model) -%}

{%- set source_cols = automate_dv.expand_column_list(columns=[src_pk, src_nk, src_extra_columns, src_ldts, src_source]) -%}

{%- if model.config.materialized == 'vault_insert_by_rank' %}
{%- set source_cols_with_rank = source_cols + automate_dv.escape_column_names([config.get('rank_column')]) -%}
{%- endif -%}

{{ automate_dv.prepend_generated_by() }}

{{ 'WITH ' -}}

{%- if not (source_model is iterable and source_model is not string) -%}
{%- set source_model = [source_model] -%}
{%- endif -%}

{%- set ns = namespace(last_cte= "") -%}

{%- for src in source_model -%}

{%- set source_number = loop.index | string -%}

row_rank_{{ source_number }} AS (
{#- PostgreSQL has DISTINCT ON which should be more performant than the
strategy used by Snowflake ROW_NUMBER() OVER( PARTITION BY ...
-#}
{%- if model.config.materialized == 'vault_insert_by_rank' %}
SELECT DISTINCT ON ({{ automate_dv.prefix([src_pk], 'rr') }}) {{ automate_dv.prefix(source_cols_with_rank, 'rr') }}
{%- else %}
SELECT DISTINCT ON ({{ automate_dv.prefix([src_pk], 'rr') }}) {{ automate_dv.prefix(source_cols, 'rr') }}
{%- endif %}
FROM {{ ref(src) }} AS rr
WHERE {{ automate_dv.multikey(src_pk, prefix='rr', condition='IS NOT NULL') }}
ORDER BY {{ automate_dv.prefix([src_pk], 'rr') }}, {{ automate_dv.prefix([src_ldts], 'rr') }}
{%- set ns.last_cte = "row_rank_{}".format(source_number) %}
),{{ "\n" if not loop.last }}
{% endfor -%}
{% if source_model | length > 1 %}
stage_union AS (
{%- for src in source_model %}
SELECT * FROM row_rank_{{ loop.index | string }}
{%- if not loop.last %}
UNION ALL
{%- endif %}
{%- endfor %}
{%- set ns.last_cte = "stage_union" %}
),
{%- endif -%}
{%- if model.config.materialized == 'vault_insert_by_period' %}
stage_mat_filter AS (
SELECT *
FROM {{ ns.last_cte }}
WHERE __PERIOD_FILTER__
{%- set ns.last_cte = "stage_mat_filter" %}
),
{%- elif model.config.materialized == 'vault_insert_by_rank' %}
stage_mat_filter AS (
SELECT *
FROM {{ ns.last_cte }}
WHERE __RANK_FILTER__
{%- set ns.last_cte = "stage_mat_filter" %}
),
{%- endif -%}
{%- if source_model | length > 1 %}

row_rank_union AS (
{#- PostgreSQL has DISTINCT ON which should be more performant than the
strategy used by Snowflake ROW_NUMBER() OVER( PARTITION BY ...
-#}
SELECT DISTINCT ON ({{ automate_dv.prefix([src_pk], 'ru') }}) ru.*
FROM {{ ns.last_cte }} AS ru
WHERE {{ automate_dv.multikey(src_pk, prefix='ru', condition='IS NOT NULL') }}
ORDER BY {{ automate_dv.prefix([src_pk], 'ru') }}, {{ automate_dv.prefix([src_ldts], 'ru') }}, {{ automate_dv.prefix([src_source], 'ru') }} ASC
{%- set ns.last_cte = "row_rank_union" %}
),
{% endif %}
records_to_insert AS (
SELECT {{ automate_dv.prefix(source_cols, 'a', alias_target='target') }}
FROM {{ ns.last_cte }} AS a
{%- if automate_dv.is_any_incremental() %}
LEFT JOIN {{ this }} AS d
ON {{ automate_dv.multikey(src_pk, prefix=['a','d'], condition='=') }}
WHERE {{ automate_dv.multikey(src_pk, prefix='d', condition='IS NULL') }}
{%- endif %}
)

SELECT * FROM records_to_insert

{%- endmacro -%}
100 changes: 100 additions & 0 deletions macros/tables/duckdb/link.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
{%- macro duckdb__link(src_pk, src_fk, src_extra_columns, src_ldts, src_source, source_model) -%}

{%- set source_cols = automate_dv.expand_column_list(columns=[src_pk, src_fk, src_extra_columns, src_ldts, src_source]) -%}
{%- set fk_cols = automate_dv.expand_column_list([src_fk]) -%}

{%- if model.config.materialized == 'vault_insert_by_rank' %}
{%- set source_cols_with_rank = source_cols + automate_dv.escape_column_names([config.get('rank_column')]) -%}
{%- endif -%}

{{ automate_dv.prepend_generated_by() }}

{{ 'WITH ' -}}

{%- if not (source_model is iterable and source_model is not string) -%}
{%- set source_model = [source_model] -%}
{%- endif -%}

{%- set ns = namespace(last_cte= "") -%}

{%- for src in source_model -%}

{%- set source_number = loop.index | string -%}

row_rank_{{ source_number }} AS (
SELECT * FROM (
{%- if model.config.materialized == 'vault_insert_by_rank' %}
SELECT {{ automate_dv.prefix(source_cols_with_rank, 'rr') }},
{%- else %}
SELECT {{ automate_dv.prefix(source_cols, 'rr') }},
{%- endif %}
ROW_NUMBER() OVER(
PARTITION BY {{ automate_dv.prefix([src_pk], 'rr') }}
ORDER BY {{ automate_dv.prefix([src_ldts], 'rr') }}
) AS row_number
FROM {{ ref(src) }} AS rr
{%- if source_model | length == 1 %}
WHERE {{ automate_dv.multikey(src_pk, prefix='rr', condition='IS NOT NULL') }}
AND {{ automate_dv.multikey(fk_cols, prefix='rr', condition='IS NOT NULL') }}
{%- endif %}
) as l
WHERE row_number = 1
{%- set ns.last_cte = "row_rank_{}".format(source_number) %}
),{{ "\n" if not loop.last }}
{% endfor -%}
{% if source_model | length > 1 %}
stage_union AS (
{%- for src in source_model %}
SELECT * FROM row_rank_{{ loop.index | string }}
{%- if not loop.last %}
UNION ALL
{%- endif %}
{%- endfor %}
{%- set ns.last_cte = "stage_union" %}
),
{%- endif -%}
{%- if model.config.materialized == 'vault_insert_by_period' %}
stage_mat_filter AS (
SELECT *
FROM {{ ns.last_cte }}
WHERE __PERIOD_FILTER__
{%- set ns.last_cte = "stage_mat_filter" %}
),
{%- elif model.config.materialized == 'vault_insert_by_rank' %}
stage_mat_filter AS (
SELECT *
FROM {{ ns.last_cte }}
WHERE __RANK_FILTER__
{%- set ns.last_cte = "stage_mat_filter" %}
),
{% endif %}
{%- if source_model | length > 1 %}

row_rank_union AS (
SELECT * FROM (
SELECT ru.*,
ROW_NUMBER() OVER(
PARTITION BY {{ automate_dv.prefix([src_pk], 'ru') }}
ORDER BY {{ automate_dv.prefix([src_ldts], 'ru') }}, {{ automate_dv.prefix([src_source], 'ru') }} ASC
) AS row_rank_number
FROM {{ ns.last_cte }} AS ru
WHERE {{ automate_dv.multikey(src_pk, prefix='ru', condition='IS NOT NULL') }}
AND {{ automate_dv.multikey(fk_cols, prefix='ru', condition='IS NOT NULL') }}
) AS a
WHERE row_rank_number = 1
{%- set ns.last_cte = "row_rank_union" %}
),
{% endif %}
records_to_insert AS (
SELECT {{ automate_dv.prefix(source_cols, 'a', alias_target='target') }}
FROM {{ ns.last_cte }} AS a
{%- if automate_dv.is_any_incremental() %}
LEFT JOIN {{ this }} AS d
ON {{ automate_dv.multikey(src_pk, prefix=['a','d'], condition='=') }}
WHERE {{ automate_dv.multikey(src_pk, prefix='d', condition='IS NULL') }}
{%- endif %}
)

SELECT * FROM records_to_insert

{%- endmacro -%}
Loading