From a3a29ac5fa5063b24286e3bbc9a86a8c6ba0ccff Mon Sep 17 00:00:00 2001 From: Jorrit Sandbrink Date: Mon, 16 Sep 2024 17:37:02 +0400 Subject: [PATCH 01/18] remove unused imports --- tests/load/pipeline/test_scd2.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/load/pipeline/test_scd2.py b/tests/load/pipeline/test_scd2.py index c75ff4d3e6..6a2a7c5466 100644 --- a/tests/load/pipeline/test_scd2.py +++ b/tests/load/pipeline/test_scd2.py @@ -9,13 +9,11 @@ from dlt.common.typing import TAnyDateTime from dlt.common.pendulum import pendulum from dlt.common.pipeline import LoadInfo -from dlt.common.schema.exceptions import ColumnNameConflictException from dlt.common.schema.typing import DEFAULT_VALIDITY_COLUMN_NAMES from dlt.common.normalizers.json.relational import DataItemNormalizer from dlt.common.normalizers.naming.snake_case import NamingConvention as SnakeCaseNamingConvention from dlt.common.time import ensure_pendulum_datetime, reduce_pendulum_datetime_precision from dlt.extract.resource import DltResource -from dlt.pipeline.exceptions import PipelineStepFailed from tests.cases import arrow_table_all_data_types from tests.load.utils import ( From aa6298339f2cf70cafefd69b042b98bb41da25f0 Mon Sep 17 00:00:00 2001 From: Jorrit Sandbrink Date: Mon, 16 Sep 2024 17:38:16 +0400 Subject: [PATCH 02/18] add scd2 retire_if_absent option --- dlt/destinations/sql_jobs.py | 19 +++++-- dlt/extract/hints.py | 41 +++++++++----- tests/load/pipeline/test_scd2.py | 96 ++++++++++++++++++++++++++++++++ 3 files changed, 138 insertions(+), 18 deletions(-) diff --git a/dlt/destinations/sql_jobs.py b/dlt/destinations/sql_jobs.py index 13677e01b3..8376c9a8d6 100644 --- a/dlt/destinations/sql_jobs.py +++ b/dlt/destinations/sql_jobs.py @@ -755,17 +755,26 @@ def gen_scd2_sql( active_record_timestamp = get_active_record_timestamp(root_table) if active_record_timestamp is None: active_record_literal = "NULL" - is_active_clause = f"{to} IS NULL" + is_active = f"{to} IS NULL" else: # it's a datetime active_record_literal = format_datetime_literal( active_record_timestamp, caps.timestamp_precision ) - is_active_clause = f"{to} = {active_record_literal}" + is_active = f"{to} = {active_record_literal}" - # retire updated and deleted records + retire_if_absent = root_table.get("x-retire-if-absent", True) + if retire_if_absent: + dummy_clause = caps.escape_literal(True) + else: + nk = escape_column_id(get_first_column_name_with_prop(root_table, "x-natural-key")) + nk_present = f"{nk} IN (SELECT {nk} FROM {staging_root_table_name})" + + # retire records + # always retire updated records, retire deleted records only if `retire_if_absent` sql.append(f""" {cls.gen_update_table_prefix(root_table_name)} {to} = {boundary_literal} - WHERE {is_active_clause} + WHERE {is_active} + AND {dummy_clause if retire_if_absent else nk_present} AND {hash_} NOT IN (SELECT {hash_} FROM {staging_root_table_name}); """) @@ -776,7 +785,7 @@ def gen_scd2_sql( INSERT INTO {root_table_name} ({col_str}, {from_}, {to}) SELECT {col_str}, {boundary_literal} AS {from_}, {active_record_literal} AS {to} FROM {staging_root_table_name} AS s - WHERE {hash_} NOT IN (SELECT {hash_} FROM {root_table_name} WHERE {is_active_clause}); + WHERE {hash_} NOT IN (SELECT {hash_} FROM {root_table_name} WHERE {is_active}); """) # insert list elements for new active records in nested tables diff --git a/dlt/extract/hints.py b/dlt/extract/hints.py index 037ebbddf9..7a30812e74 100644 --- a/dlt/extract/hints.py +++ b/dlt/extract/hints.py @@ -452,10 +452,18 @@ def 
_merge_merge_disposition_dict(dict_: Dict[str, Any]) -> None: md_dict: TMergeDispositionDict = dict_.pop("write_disposition") if merge_strategy := md_dict.get("strategy"): dict_["x-merge-strategy"] = merge_strategy - if "boundary_timestamp" in md_dict: - dict_["x-boundary-timestamp"] = md_dict["boundary_timestamp"] - # add columns for `scd2` merge strategy + if merge_strategy == "scd2": + if "boundary_timestamp" in md_dict: + dict_["x-boundary-timestamp"] = md_dict["boundary_timestamp"] + if "retire_if_absent" in md_dict: + dict_["x-retire-if-absent"] = md_dict["retire_if_absent"] # type: ignore[typeddict-item] + if "natural_key" in md_dict: + nk = md_dict["natural_key"] # type: ignore[typeddict-item] + if nk in dict_["columns"]: + dict_["columns"][nk]["x-natural-key"] = True + else: + dict_["columns"][nk] = {"name": nk, "x-natural-key": True} if md_dict.get("validity_column_names") is None: from_, to = DEFAULT_VALIDITY_COLUMN_NAMES else: @@ -523,13 +531,20 @@ def validate_write_disposition_hint(wd: TTableHintTemplate[TWriteDispositionConf f"""Allowed values: {', '.join(['"' + s + '"' for s in MERGE_STRATEGIES])}.""" ) - for ts in ("active_record_timestamp", "boundary_timestamp"): - if ts == "active_record_timestamp" and wd.get("active_record_timestamp") is None: - continue # None is allowed for active_record_timestamp - if ts in wd: - try: - ensure_pendulum_datetime(wd[ts]) # type: ignore[literal-required] - except Exception: - raise ValueError( - f'could not parse `{ts}` value "{wd[ts]}"' # type: ignore[literal-required] - ) + if wd.get("strategy") == "scd2": + for ts in ("active_record_timestamp", "boundary_timestamp"): + if ( + ts == "active_record_timestamp" + and wd.get("active_record_timestamp") is None + ): + continue # None is allowed for active_record_timestamp + if ts in wd: + try: + ensure_pendulum_datetime(wd[ts]) # type: ignore[literal-required] + except Exception: + raise ValueError( + f'could not parse `{ts}` value "{wd[ts]}"' # type: ignore[literal-required] + ) + + if "retire_if_absent" in wd and not wd["retire_if_absent"] and "natural_key" not in wd: # type: ignore[typeddict-item] + raise ValueError("`natural_key` is required when `retire_if_absent=False`") diff --git a/tests/load/pipeline/test_scd2.py b/tests/load/pipeline/test_scd2.py index 6a2a7c5466..3ac30e3664 100644 --- a/tests/load/pipeline/test_scd2.py +++ b/tests/load/pipeline/test_scd2.py @@ -715,6 +715,102 @@ def r(data): ) +@pytest.mark.parametrize( + "destination_config", + destinations_configs(default_sql_configs=True, subset=["duckdb"]), + ids=lambda x: x.name, +) +def test_retire_if_absent( + destination_config: DestinationTestConfiguration, +) -> None: + p = destination_config.setup_pipeline("abstract", dev_mode=True) + + @dlt.resource( # type: ignore[call-overload] + table_name="dim_test", + write_disposition={ + "disposition": "merge", + "strategy": "scd2", + "retire_if_absent": False, + "natural_key": "nk", + }, + ) + def r(data): + yield data + + # load 1 — initial load + dim_snap = [ + {"nk": 1, "foo": "foo"}, + {"nk": 2, "foo": "foo"}, + ] + info = p.run(r(dim_snap), **destination_config.run_kwargs) + assert_load_info(info) + assert load_table_counts(p, "dim_test")["dim_test"] == 2 + _, to = DEFAULT_VALIDITY_COLUMN_NAMES + # both records should be active (i.e. 
not retired) + assert [row[to] for row in get_table(p, "dim_test")] == [None, None] + + # load 2 — natural key 2 is absent, natural key 1 is unchanged + dim_snap = [ + {"nk": 1, "foo": "foo"}, + ] + info = p.run(r(dim_snap), **destination_config.run_kwargs) + assert_load_info(info) + assert load_table_counts(p, "dim_test")["dim_test"] == 2 + # both records should still be active + assert [row[to] for row in get_table(p, "dim_test")] == [None, None] + + # load 3 — natural key 2 is absent, natural key 1 has changed + dim_snap = [ + {"nk": 1, "foo": "bar"}, + ] + info = p.run(r(dim_snap), **destination_config.run_kwargs) + assert_load_info(info) + assert load_table_counts(p, "dim_test")["dim_test"] == 3 + boundary_ts = get_load_package_created_at(p, info) + # natural key 1 should now have two records (one retired, one active) + actual = [{k: v for k, v in row.items() if k in ("nk", to)} for row in get_table(p, "dim_test")] + expected = [{"nk": 1, to: boundary_ts}, {"nk": 1, to: None}, {"nk": 2, to: None}] + assert_records_as_set(actual, expected) # type: ignore[arg-type] + + # now test various configs + + with pytest.raises(ValueError): + # should raise because `natural_key` is required when `retire_if_absent=False` + r.apply_hints( + write_disposition={ + "disposition": "merge", + "strategy": "scd2", + "retire_if_absent": False, + } + ) + + # `retire_if_absent=True` does not require `natural_key` + r.apply_hints( + write_disposition={ + "disposition": "merge", + "strategy": "scd2", + "retire_if_absent": True, + } + ) + assert r.compute_table_schema()["x-retire-if-absent"] + + # user-provided hints for `natural_key` column should be respected + r.apply_hints( + columns={"nk": {"x-foo": "foo"}}, + write_disposition={ + "disposition": "merge", + "strategy": "scd2", + "retire_if_absent": False, + "natural_key": "nk", + }, + ) + assert r.compute_table_schema()["columns"]["nk"] == { + "x-foo": "foo", + "name": "nk", + "x-natural-key": True, + } + + @pytest.mark.parametrize( "destination_config", destinations_configs(default_sql_configs=True, subset=["duckdb"]), From bcbd0c76a69791ae48ab42daa81264a5646e04fc Mon Sep 17 00:00:00 2001 From: Jorrit Sandbrink Date: Mon, 16 Sep 2024 19:23:23 +0400 Subject: [PATCH 03/18] rewrite scd2 retire logic --- dlt/destinations/sql_jobs.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/dlt/destinations/sql_jobs.py b/dlt/destinations/sql_jobs.py index 8376c9a8d6..6e71458866 100644 --- a/dlt/destinations/sql_jobs.py +++ b/dlt/destinations/sql_jobs.py @@ -762,21 +762,20 @@ def gen_scd2_sql( ) is_active = f"{to} = {active_record_literal}" - retire_if_absent = root_table.get("x-retire-if-absent", True) - if retire_if_absent: - dummy_clause = caps.escape_literal(True) - else: - nk = escape_column_id(get_first_column_name_with_prop(root_table, "x-natural-key")) - nk_present = f"{nk} IN (SELECT {nk} FROM {staging_root_table_name})" - # retire records # always retire updated records, retire deleted records only if `retire_if_absent` - sql.append(f""" + retire_sql = f""" {cls.gen_update_table_prefix(root_table_name)} {to} = {boundary_literal} WHERE {is_active} - AND {dummy_clause if retire_if_absent else nk_present} AND {hash_} NOT IN (SELECT {hash_} FROM {staging_root_table_name}); - """) + """ + retire_if_absent = root_table.get("x-retire-if-absent", True) + if not retire_if_absent: + retire_sql = retire_sql.rstrip()[:-1] # remove semicolon + nk = escape_column_id(get_first_column_name_with_prop(root_table, "x-natural-key")) + 
nk_present = f"{nk} IN (SELECT {nk} FROM {staging_root_table_name})" + retire_sql += f" AND {nk_present};" + sql.append(retire_sql) # insert new active records in root table columns = map(escape_column_id, list(root_table["columns"].keys())) From 70d036d3c159f2569fabd3d24e2fffc201b8a82f Mon Sep 17 00:00:00 2001 From: Jorrit Sandbrink Date: Tue, 17 Sep 2024 15:31:45 +0400 Subject: [PATCH 04/18] include new keys in typing --- dlt/common/schema/typing.py | 2 ++ dlt/extract/hints.py | 10 +++++++--- tests/load/pipeline/test_scd2.py | 6 +++--- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/dlt/common/schema/typing.py b/dlt/common/schema/typing.py index 8707202f59..855f26706a 100644 --- a/dlt/common/schema/typing.py +++ b/dlt/common/schema/typing.py @@ -237,6 +237,8 @@ class TMergeDispositionDict(TWriteDispositionDict, total=False): active_record_timestamp: Optional[TAnyDateTime] boundary_timestamp: Optional[TAnyDateTime] row_version_column_name: Optional[str] + retire_if_absent: Optional[bool] + natural_key: Optional[str] TWriteDispositionConfig = Union[TWriteDisposition, TWriteDispositionDict, TMergeDispositionDict] diff --git a/dlt/extract/hints.py b/dlt/extract/hints.py index 7a30812e74..22a7574796 100644 --- a/dlt/extract/hints.py +++ b/dlt/extract/hints.py @@ -457,9 +457,9 @@ def _merge_merge_disposition_dict(dict_: Dict[str, Any]) -> None: if "boundary_timestamp" in md_dict: dict_["x-boundary-timestamp"] = md_dict["boundary_timestamp"] if "retire_if_absent" in md_dict: - dict_["x-retire-if-absent"] = md_dict["retire_if_absent"] # type: ignore[typeddict-item] + dict_["x-retire-if-absent"] = md_dict["retire_if_absent"] if "natural_key" in md_dict: - nk = md_dict["natural_key"] # type: ignore[typeddict-item] + nk = md_dict["natural_key"] if nk in dict_["columns"]: dict_["columns"][nk]["x-natural-key"] = True else: @@ -546,5 +546,9 @@ def validate_write_disposition_hint(wd: TTableHintTemplate[TWriteDispositionConf f'could not parse `{ts}` value "{wd[ts]}"' # type: ignore[literal-required] ) - if "retire_if_absent" in wd and not wd["retire_if_absent"] and "natural_key" not in wd: # type: ignore[typeddict-item] + if ( + "retire_if_absent" in wd + and not wd["retire_if_absent"] + and "natural_key" not in wd + ): raise ValueError("`natural_key` is required when `retire_if_absent=False`") diff --git a/tests/load/pipeline/test_scd2.py b/tests/load/pipeline/test_scd2.py index 3ac30e3664..8bbf30a47f 100644 --- a/tests/load/pipeline/test_scd2.py +++ b/tests/load/pipeline/test_scd2.py @@ -725,7 +725,7 @@ def test_retire_if_absent( ) -> None: p = destination_config.setup_pipeline("abstract", dev_mode=True) - @dlt.resource( # type: ignore[call-overload] + @dlt.resource( table_name="dim_test", write_disposition={ "disposition": "merge", @@ -792,11 +792,11 @@ def r(data): "retire_if_absent": True, } ) - assert r.compute_table_schema()["x-retire-if-absent"] + assert r.compute_table_schema()["x-retire-if-absent"] # type: ignore[typeddict-item] # user-provided hints for `natural_key` column should be respected r.apply_hints( - columns={"nk": {"x-foo": "foo"}}, + columns={"nk": {"x-foo": "foo"}}, # type: ignore[typeddict-unknown-key] write_disposition={ "disposition": "merge", "strategy": "scd2", From 5cfc5c8df3a38c14a52dd75534526d320fa342cb Mon Sep 17 00:00:00 2001 From: Jorrit Sandbrink Date: Tue, 17 Sep 2024 15:55:03 +0400 Subject: [PATCH 05/18] finetune scd2 typing --- dlt/common/schema/typing.py | 9 +++++++-- dlt/extract/hints.py | 3 +++ 2 files changed, 10 insertions(+), 2 deletions(-) 
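Note (annotation in the patch-notes area, not part of the commit): this patch splits the `scd2`-only options out of `TMergeDispositionDict` into a dedicated `TScd2StrategyDict`. A minimal, self-contained sketch of the resulting typed shape, for illustration only; `TAnyDateTime` and `TLoaderMergeStrategy` are simplified to `str` here, and the instance at the bottom is hypothetical:

```py
from typing import List, Optional, TypedDict


class TWriteDispositionDict(TypedDict):
    disposition: str  # e.g. "merge"


class TMergeDispositionDict(TWriteDispositionDict):
    strategy: Optional[str]  # e.g. "scd2"; a required key that may be None


class TScd2StrategyDict(TMergeDispositionDict, total=False):
    # options that only make sense for the `scd2` strategy
    validity_column_names: Optional[List[str]]
    active_record_timestamp: Optional[str]
    boundary_timestamp: Optional[str]
    row_version_column_name: Optional[str]
    retire_if_absent: Optional[bool]
    natural_key: Optional[str]


# an scd2 configuration can now be type-checked against the dedicated dict
wd: TScd2StrategyDict = {
    "disposition": "merge",
    "strategy": "scd2",
    "retire_if_absent": False,
    "natural_key": "nk",
}
```
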
diff --git a/dlt/common/schema/typing.py b/dlt/common/schema/typing.py index 855f26706a..e735190c92 100644 --- a/dlt/common/schema/typing.py +++ b/dlt/common/schema/typing.py @@ -231,8 +231,11 @@ class TWriteDispositionDict(TypedDict): disposition: TWriteDisposition -class TMergeDispositionDict(TWriteDispositionDict, total=False): +class TMergeDispositionDict(TWriteDispositionDict): strategy: Optional[TLoaderMergeStrategy] + + +class TScd2StrategyDict(TMergeDispositionDict, total=False): validity_column_names: Optional[List[str]] active_record_timestamp: Optional[TAnyDateTime] boundary_timestamp: Optional[TAnyDateTime] @@ -241,7 +244,9 @@ class TMergeDispositionDict(TWriteDispositionDict, total=False): natural_key: Optional[str] -TWriteDispositionConfig = Union[TWriteDisposition, TWriteDispositionDict, TMergeDispositionDict] +TWriteDispositionConfig = Union[ + TWriteDisposition, TWriteDispositionDict, TMergeDispositionDict, TScd2StrategyDict +] class _TTableSchemaBase(TTableProcessingHints, total=False): diff --git a/dlt/extract/hints.py b/dlt/extract/hints.py index 22a7574796..f875c31634 100644 --- a/dlt/extract/hints.py +++ b/dlt/extract/hints.py @@ -12,6 +12,7 @@ TTableSchemaColumns, TWriteDispositionConfig, TMergeDispositionDict, + TScd2StrategyDict, TAnySchemaColumns, TTableFormat, TSchemaContract, @@ -454,6 +455,7 @@ def _merge_merge_disposition_dict(dict_: Dict[str, Any]) -> None: dict_["x-merge-strategy"] = merge_strategy if merge_strategy == "scd2": + md_dict = cast(TScd2StrategyDict, md_dict) if "boundary_timestamp" in md_dict: dict_["x-boundary-timestamp"] = md_dict["boundary_timestamp"] if "retire_if_absent" in md_dict: @@ -532,6 +534,7 @@ def validate_write_disposition_hint(wd: TTableHintTemplate[TWriteDispositionConf ) if wd.get("strategy") == "scd2": + wd = cast(TScd2StrategyDict, wd) for ts in ("active_record_timestamp", "boundary_timestamp"): if ( ts == "active_record_timestamp" From 54dfe148f0cceea00ca4c259b6444490ce4ff1f9 Mon Sep 17 00:00:00 2001 From: Jorrit Sandbrink Date: Tue, 17 Sep 2024 16:15:42 +0400 Subject: [PATCH 06/18] update typeddict validation test --- tests/common/test_validation.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/common/test_validation.py b/tests/common/test_validation.py index 0ecbbea89d..3f8ccfc20f 100644 --- a/tests/common/test_validation.py +++ b/tests/common/test_validation.py @@ -334,8 +334,8 @@ def test_typeddict_friendly_exceptions() -> None: wrong_dict["write_disposition"] = {"strategy": "scd2"} validate_dict(EndpointResource, wrong_dict, ".") print(e.value) - # Union of 3 types and callable - assert len(e.value.nested_exceptions) == 4 + # Union of 4 types and callable + assert len(e.value.nested_exceptions) == 5 # this has wrong disposition string with pytest.raises(DictValidationException) as e: @@ -343,8 +343,8 @@ def test_typeddict_friendly_exceptions() -> None: wrong_dict["write_disposition"] = "unknown" # type: ignore[assignment] validate_dict(EndpointResource, wrong_dict, ".") print(e.value) - # Union of 3 types and callable - assert len(e.value.nested_exceptions) == 4 + # Union of 4 types and callable + assert len(e.value.nested_exceptions) == 5 # this has wrong nested type with pytest.raises(DictValidationException) as e: From c28f8ba4d3984aa5711085854c687a90f2f4dd8a Mon Sep 17 00:00:00 2001 From: Jorrit Sandbrink Date: Wed, 18 Sep 2024 23:20:07 +0400 Subject: [PATCH 07/18] rename to retire_absent_rows --- dlt/common/schema/typing.py | 2 +- dlt/destinations/sql_jobs.py | 6 +++--- 
dlt/extract/hints.py | 10 +++++----- tests/load/pipeline/test_scd2.py | 16 ++++++++-------- 4 files changed, 17 insertions(+), 17 deletions(-) diff --git a/dlt/common/schema/typing.py b/dlt/common/schema/typing.py index e735190c92..622f34bbc9 100644 --- a/dlt/common/schema/typing.py +++ b/dlt/common/schema/typing.py @@ -240,7 +240,7 @@ class TScd2StrategyDict(TMergeDispositionDict, total=False): active_record_timestamp: Optional[TAnyDateTime] boundary_timestamp: Optional[TAnyDateTime] row_version_column_name: Optional[str] - retire_if_absent: Optional[bool] + retire_absent_rows: Optional[bool] natural_key: Optional[str] diff --git a/dlt/destinations/sql_jobs.py b/dlt/destinations/sql_jobs.py index 6e71458866..34fa928a2a 100644 --- a/dlt/destinations/sql_jobs.py +++ b/dlt/destinations/sql_jobs.py @@ -763,14 +763,14 @@ def gen_scd2_sql( is_active = f"{to} = {active_record_literal}" # retire records - # always retire updated records, retire deleted records only if `retire_if_absent` + # always retire updated records, retire deleted records only if `retire_absent_rows` retire_sql = f""" {cls.gen_update_table_prefix(root_table_name)} {to} = {boundary_literal} WHERE {is_active} AND {hash_} NOT IN (SELECT {hash_} FROM {staging_root_table_name}); """ - retire_if_absent = root_table.get("x-retire-if-absent", True) - if not retire_if_absent: + retire_absent_rows = root_table.get("x-retire-absent-rows", True) + if not retire_absent_rows: retire_sql = retire_sql.rstrip()[:-1] # remove semicolon nk = escape_column_id(get_first_column_name_with_prop(root_table, "x-natural-key")) nk_present = f"{nk} IN (SELECT {nk} FROM {staging_root_table_name})" diff --git a/dlt/extract/hints.py b/dlt/extract/hints.py index f875c31634..0434858b6f 100644 --- a/dlt/extract/hints.py +++ b/dlt/extract/hints.py @@ -458,8 +458,8 @@ def _merge_merge_disposition_dict(dict_: Dict[str, Any]) -> None: md_dict = cast(TScd2StrategyDict, md_dict) if "boundary_timestamp" in md_dict: dict_["x-boundary-timestamp"] = md_dict["boundary_timestamp"] - if "retire_if_absent" in md_dict: - dict_["x-retire-if-absent"] = md_dict["retire_if_absent"] + if "retire_absent_rows" in md_dict: + dict_["x-retire-absent-rows"] = md_dict["retire_absent_rows"] if "natural_key" in md_dict: nk = md_dict["natural_key"] if nk in dict_["columns"]: @@ -550,8 +550,8 @@ def validate_write_disposition_hint(wd: TTableHintTemplate[TWriteDispositionConf ) if ( - "retire_if_absent" in wd - and not wd["retire_if_absent"] + "retire_absent_rows" in wd + and not wd["retire_absent_rows"] and "natural_key" not in wd ): - raise ValueError("`natural_key` is required when `retire_if_absent=False`") + raise ValueError("`natural_key` is required when `retire_absent_rows=False`") diff --git a/tests/load/pipeline/test_scd2.py b/tests/load/pipeline/test_scd2.py index 8bbf30a47f..bde012bdf2 100644 --- a/tests/load/pipeline/test_scd2.py +++ b/tests/load/pipeline/test_scd2.py @@ -720,7 +720,7 @@ def r(data): destinations_configs(default_sql_configs=True, subset=["duckdb"]), ids=lambda x: x.name, ) -def test_retire_if_absent( +def test_retire_absent_rows( destination_config: DestinationTestConfiguration, ) -> None: p = destination_config.setup_pipeline("abstract", dev_mode=True) @@ -730,7 +730,7 @@ def test_retire_if_absent( write_disposition={ "disposition": "merge", "strategy": "scd2", - "retire_if_absent": False, + "retire_absent_rows": False, "natural_key": "nk", }, ) @@ -775,24 +775,24 @@ def r(data): # now test various configs with pytest.raises(ValueError): - # should raise 
because `natural_key` is required when `retire_if_absent=False` + # should raise because `natural_key` is required when `retire_absent_rows=False` r.apply_hints( write_disposition={ "disposition": "merge", "strategy": "scd2", - "retire_if_absent": False, + "retire_absent_rows": False, } ) - # `retire_if_absent=True` does not require `natural_key` + # `retire_absent_rows=True` does not require `natural_key` r.apply_hints( write_disposition={ "disposition": "merge", "strategy": "scd2", - "retire_if_absent": True, + "retire_absent_rows": True, } ) - assert r.compute_table_schema()["x-retire-if-absent"] # type: ignore[typeddict-item] + assert r.compute_table_schema()["x-retire-absent-rows"] # type: ignore[typeddict-item] # user-provided hints for `natural_key` column should be respected r.apply_hints( @@ -800,7 +800,7 @@ def r(data): write_disposition={ "disposition": "merge", "strategy": "scd2", - "retire_if_absent": False, + "retire_absent_rows": False, "natural_key": "nk", }, ) From 7d1aad9280f62972c1bb67c9383fb2ac774d288b Mon Sep 17 00:00:00 2001 From: Jorrit Sandbrink Date: Wed, 18 Sep 2024 23:34:48 +0400 Subject: [PATCH 08/18] add reinsert test case --- tests/load/pipeline/test_scd2.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/tests/load/pipeline/test_scd2.py b/tests/load/pipeline/test_scd2.py index bde012bdf2..8d6de0ca22 100644 --- a/tests/load/pipeline/test_scd2.py +++ b/tests/load/pipeline/test_scd2.py @@ -766,10 +766,24 @@ def r(data): info = p.run(r(dim_snap), **destination_config.run_kwargs) assert_load_info(info) assert load_table_counts(p, "dim_test")["dim_test"] == 3 - boundary_ts = get_load_package_created_at(p, info) + ts3 = get_load_package_created_at(p, info) # natural key 1 should now have two records (one retired, one active) actual = [{k: v for k, v in row.items() if k in ("nk", to)} for row in get_table(p, "dim_test")] - expected = [{"nk": 1, to: boundary_ts}, {"nk": 1, to: None}, {"nk": 2, to: None}] + expected = [{"nk": 1, to: ts3}, {"nk": 1, to: None}, {"nk": 2, to: None}] + assert_records_as_set(actual, expected) # type: ignore[arg-type] + + # load 4 — natural key 2 is absent, natural key 1 has changed back to + # initial version + dim_snap = [ + {"nk": 1, "foo": "foo"}, + ] + info = p.run(r(dim_snap), **destination_config.run_kwargs) + assert_load_info(info) + assert load_table_counts(p, "dim_test")["dim_test"] == 4 + ts4 = get_load_package_created_at(p, info) + # natural key 1 should now have three records (two retired, one active) + actual = [{k: v for k, v in row.items() if k in ("nk", to)} for row in get_table(p, "dim_test")] + expected = [{"nk": 1, to: ts3}, {"nk": 1, to: ts4}, {"nk": 1, to: None}, {"nk": 2, to: None}] assert_records_as_set(actual, expected) # type: ignore[arg-type] # now test various configs From a0aa99c0b539dc8a3a101d42e5b034c6ab76f11e Mon Sep 17 00:00:00 2001 From: Jorrit Sandbrink Date: Thu, 19 Sep 2024 11:36:54 +0400 Subject: [PATCH 09/18] replace natural_key with merge_key --- .../impl/clickhouse/clickhouse.py | 2 +- dlt/destinations/sql_jobs.py | 11 ++- dlt/extract/hints.py | 9 ++- tests/load/pipeline/test_scd2.py | 75 +++++++++++++------ 4 files changed, 66 insertions(+), 31 deletions(-) diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py index b6f23ee221..04273919c6 100644 --- a/dlt/destinations/impl/clickhouse/clickhouse.py +++ b/dlt/destinations/impl/clickhouse/clickhouse.py @@ -195,7 +195,7 @@ def gen_key_table_clauses( @classmethod 
def gen_update_table_prefix(cls, table_name: str) -> str: - return f"ALTER TABLE {table_name} UPDATE" + return f"ALTER TABLE {table_name} UPDATE AS d" @classmethod def requires_temp_table_for_delete(cls) -> bool: diff --git a/dlt/destinations/sql_jobs.py b/dlt/destinations/sql_jobs.py index 34fa928a2a..7f4622e406 100644 --- a/dlt/destinations/sql_jobs.py +++ b/dlt/destinations/sql_jobs.py @@ -367,7 +367,7 @@ def _to_temp_table(cls, select_sql: str, temp_table_name: str) -> str: @classmethod def gen_update_table_prefix(cls, table_name: str) -> str: - return f"UPDATE {table_name} SET" + return f"UPDATE {table_name} AS d SET" @classmethod def requires_temp_table_for_delete(cls) -> bool: @@ -772,8 +772,13 @@ def gen_scd2_sql( retire_absent_rows = root_table.get("x-retire-absent-rows", True) if not retire_absent_rows: retire_sql = retire_sql.rstrip()[:-1] # remove semicolon - nk = escape_column_id(get_first_column_name_with_prop(root_table, "x-natural-key")) - nk_present = f"{nk} IN (SELECT {nk} FROM {staging_root_table_name})" + # merge keys act as natural key + merge_keys = cls._escape_list( + get_columns_names_with_prop(root_table, "merge_key"), + escape_column_id, + ) + keys_equal = cls._gen_key_table_clauses([], merge_keys)[0].format(d="d", s="s") + nk_present = f"EXISTS (SELECT 1 FROM {staging_root_table_name} AS s WHERE {keys_equal})" retire_sql += f" AND {nk_present};" sql.append(retire_sql) diff --git a/dlt/extract/hints.py b/dlt/extract/hints.py index 0434858b6f..dfa01eabcf 100644 --- a/dlt/extract/hints.py +++ b/dlt/extract/hints.py @@ -353,7 +353,7 @@ def _set_hints( self, hints_template: TResourceHints, create_table_variant: bool = False ) -> None: DltResourceHints.validate_dynamic_hints(hints_template) - DltResourceHints.validate_write_disposition_hint(hints_template.get("write_disposition")) + DltResourceHints.validate_write_disposition_hint(hints_template) if create_table_variant: table_name: str = hints_template["name"] # type: ignore[assignment] # incremental cannot be specified in variant @@ -524,7 +524,8 @@ def validate_dynamic_hints(template: TResourceHints) -> None: ) @staticmethod - def validate_write_disposition_hint(wd: TTableHintTemplate[TWriteDispositionConfig]) -> None: + def validate_write_disposition_hint(template: TResourceHints) -> None: + wd = template.get("write_disposition") if isinstance(wd, dict) and wd["disposition"] == "merge": wd = cast(TMergeDispositionDict, wd) if "strategy" in wd and wd["strategy"] not in MERGE_STRATEGIES: @@ -552,6 +553,6 @@ def validate_write_disposition_hint(wd: TTableHintTemplate[TWriteDispositionConf if ( "retire_absent_rows" in wd and not wd["retire_absent_rows"] - and "natural_key" not in wd + and template.get("merge_key") is None ): - raise ValueError("`natural_key` is required when `retire_absent_rows=False`") + raise ValueError("`merge_key` is required when `retire_absent_rows=False`") diff --git a/tests/load/pipeline/test_scd2.py b/tests/load/pipeline/test_scd2.py index 8d6de0ca22..8948c21653 100644 --- a/tests/load/pipeline/test_scd2.py +++ b/tests/load/pipeline/test_scd2.py @@ -715,9 +715,10 @@ def r(data): ) +@pytest.mark.essential @pytest.mark.parametrize( "destination_config", - destinations_configs(default_sql_configs=True, subset=["duckdb"]), + destinations_configs(default_sql_configs=True, supports_merge=True), ids=lambda x: x.name, ) def test_retire_absent_rows( @@ -726,15 +727,14 @@ def test_retire_absent_rows( p = destination_config.setup_pipeline("abstract", dev_mode=True) @dlt.resource( - table_name="dim_test", 
+ merge_key="nk", write_disposition={ "disposition": "merge", "strategy": "scd2", "retire_absent_rows": False, - "natural_key": "nk", }, ) - def r(data): + def dim_test(data): yield data # load 1 — initial load @@ -742,7 +742,7 @@ def r(data): {"nk": 1, "foo": "foo"}, {"nk": 2, "foo": "foo"}, ] - info = p.run(r(dim_snap), **destination_config.run_kwargs) + info = p.run(dim_test(dim_snap), **destination_config.run_kwargs) assert_load_info(info) assert load_table_counts(p, "dim_test")["dim_test"] == 2 _, to = DEFAULT_VALIDITY_COLUMN_NAMES @@ -753,7 +753,7 @@ def r(data): dim_snap = [ {"nk": 1, "foo": "foo"}, ] - info = p.run(r(dim_snap), **destination_config.run_kwargs) + info = p.run(dim_test(dim_snap), **destination_config.run_kwargs) assert_load_info(info) assert load_table_counts(p, "dim_test")["dim_test"] == 2 # both records should still be active @@ -763,7 +763,7 @@ def r(data): dim_snap = [ {"nk": 1, "foo": "bar"}, ] - info = p.run(r(dim_snap), **destination_config.run_kwargs) + info = p.run(dim_test(dim_snap), **destination_config.run_kwargs) assert_load_info(info) assert load_table_counts(p, "dim_test")["dim_test"] == 3 ts3 = get_load_package_created_at(p, info) @@ -777,7 +777,7 @@ def r(data): dim_snap = [ {"nk": 1, "foo": "foo"}, ] - info = p.run(r(dim_snap), **destination_config.run_kwargs) + info = p.run(dim_test(dim_snap), **destination_config.run_kwargs) assert_load_info(info) assert load_table_counts(p, "dim_test")["dim_test"] == 4 ts4 = get_load_package_created_at(p, info) @@ -789,40 +789,69 @@ def r(data): # now test various configs with pytest.raises(ValueError): - # should raise because `natural_key` is required when `retire_absent_rows=False` - r.apply_hints( + # should raise because `merge_key` is required when `retire_absent_rows=False` + dim_test.apply_hints( + merge_key="", write_disposition={ "disposition": "merge", "strategy": "scd2", "retire_absent_rows": False, - } + }, ) - # `retire_absent_rows=True` does not require `natural_key` - r.apply_hints( + # `retire_absent_rows=True` does not require `merge_key` + dim_test.apply_hints( write_disposition={ "disposition": "merge", "strategy": "scd2", "retire_absent_rows": True, } ) - assert r.compute_table_schema()["x-retire-absent-rows"] # type: ignore[typeddict-item] + assert dim_test.compute_table_schema()["x-retire-absent-rows"] # type: ignore[typeddict-item] - # user-provided hints for `natural_key` column should be respected - r.apply_hints( - columns={"nk": {"x-foo": "foo"}}, # type: ignore[typeddict-unknown-key] + # test compound `merge_key` + + @dlt.resource( + merge_key=["first_name", "last_name"], write_disposition={ "disposition": "merge", "strategy": "scd2", "retire_absent_rows": False, - "natural_key": "nk", }, ) - assert r.compute_table_schema()["columns"]["nk"] == { - "x-foo": "foo", - "name": "nk", - "x-natural-key": True, - } + def dim_test_compound(data): + yield data + + # load 1 — initial load + dim_snap = [ + {"first_name": "John", "last_name": "Doe", "age": 20}, + {"first_name": "John", "last_name": "Dodo", "age": 20}, + ] + info = p.run(dim_test_compound(dim_snap), **destination_config.run_kwargs) + assert_load_info(info) + assert load_table_counts(p, "dim_test_compound")["dim_test_compound"] == 2 + # both records should be active (i.e. 
not retired) + assert [row[to] for row in get_table(p, "dim_test_compound")] == [None, None] + + # load 2 — natural key "John" + "Dodo" is absent, natural key "John" + "Doe" has changed + dim_snap = [ + {"first_name": "John", "last_name": "Doe", "age": 30}, + ] + info = p.run(dim_test_compound(dim_snap), **destination_config.run_kwargs) + assert_load_info(info) + assert load_table_counts(p, "dim_test_compound")["dim_test_compound"] == 3 + ts3 = get_load_package_created_at(p, info) + # natural key "John" + "Doe" should now have two records (one retired, one active) + actual = [ + {k: v for k, v in row.items() if k in ("first_name", "last_name", to)} + for row in get_table(p, "dim_test_compound") + ] + expected = [ + {"first_name": "John", "last_name": "Doe", to: ts3}, + {"first_name": "John", "last_name": "Doe", to: None}, + {"first_name": "John", "last_name": "Dodo", to: None}, + ] + assert_records_as_set(actual, expected) # type: ignore[arg-type] @pytest.mark.parametrize( From 65838e498100f47c260d866b0b3f1f97bd3cb9bb Mon Sep 17 00:00:00 2001 From: Jorrit Sandbrink Date: Thu, 19 Sep 2024 17:17:04 +0400 Subject: [PATCH 10/18] rewrite natural key presence check --- dlt/destinations/impl/athena/athena.py | 6 +++ .../impl/clickhouse/clickhouse.py | 2 +- dlt/destinations/sql_jobs.py | 13 +++++-- tests/load/pipeline/test_scd2.py | 37 ++++++++++++++----- 4 files changed, 45 insertions(+), 13 deletions(-) diff --git a/dlt/destinations/impl/athena/athena.py b/dlt/destinations/impl/athena/athena.py index 04078dd510..72611a9568 100644 --- a/dlt/destinations/impl/athena/athena.py +++ b/dlt/destinations/impl/athena/athena.py @@ -149,6 +149,12 @@ def gen_delete_temp_table_sql( sql.insert(0, f"""DROP TABLE IF EXISTS {temp_table_name.replace('"', '`')};""") return sql, temp_table_name + @classmethod + def gen_concat_sql(cls, columns: Sequence[str]) -> str: + # Athena requires explicit casting + columns = [f"CAST({c} AS VARCHAR)" for c in columns] + return f"CONCAT({', '.join(columns)})" + @classmethod def requires_temp_table_for_delete(cls) -> bool: return True diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py index 04273919c6..b6f23ee221 100644 --- a/dlt/destinations/impl/clickhouse/clickhouse.py +++ b/dlt/destinations/impl/clickhouse/clickhouse.py @@ -195,7 +195,7 @@ def gen_key_table_clauses( @classmethod def gen_update_table_prefix(cls, table_name: str) -> str: - return f"ALTER TABLE {table_name} UPDATE AS d" + return f"ALTER TABLE {table_name} UPDATE" @classmethod def requires_temp_table_for_delete(cls) -> bool: diff --git a/dlt/destinations/sql_jobs.py b/dlt/destinations/sql_jobs.py index 7f4622e406..d40726b18e 100644 --- a/dlt/destinations/sql_jobs.py +++ b/dlt/destinations/sql_jobs.py @@ -339,6 +339,10 @@ def gen_delete_from_sql( ); """ + @classmethod + def gen_concat_sql(cls, columns: Sequence[str]) -> str: + return f"CONCAT({', '.join(columns)})" + @classmethod def _shorten_table_name(cls, ident: str, sql_client: SqlClientBase[Any]) -> str: """Trims identifier to max length supported by sql_client. 
Used for dynamically constructed table names""" @@ -367,7 +371,7 @@ def _to_temp_table(cls, select_sql: str, temp_table_name: str) -> str: @classmethod def gen_update_table_prefix(cls, table_name: str) -> str: - return f"UPDATE {table_name} AS d SET" + return f"UPDATE {table_name} SET" @classmethod def requires_temp_table_for_delete(cls) -> bool: @@ -777,8 +781,11 @@ def gen_scd2_sql( get_columns_names_with_prop(root_table, "merge_key"), escape_column_id, ) - keys_equal = cls._gen_key_table_clauses([], merge_keys)[0].format(d="d", s="s") - nk_present = f"EXISTS (SELECT 1 FROM {staging_root_table_name} AS s WHERE {keys_equal})" + if len(merge_keys) == 1: + nk = merge_keys[0] + else: + nk = cls.gen_concat_sql(merge_keys) # compound key + nk_present = f"{nk} IN (SELECT {nk} FROM {staging_root_table_name})" retire_sql += f" AND {nk_present};" sql.append(retire_sql) diff --git a/tests/load/pipeline/test_scd2.py b/tests/load/pipeline/test_scd2.py index 8948c21653..37f3267f9a 100644 --- a/tests/load/pipeline/test_scd2.py +++ b/tests/load/pipeline/test_scd2.py @@ -9,6 +9,7 @@ from dlt.common.typing import TAnyDateTime from dlt.common.pendulum import pendulum from dlt.common.pipeline import LoadInfo +from dlt.common.data_types.typing import TDataType from dlt.common.schema.typing import DEFAULT_VALIDITY_COLUMN_NAMES from dlt.common.normalizers.json.relational import DataItemNormalizer from dlt.common.normalizers.naming.snake_case import NamingConvention as SnakeCaseNamingConvention @@ -809,7 +810,19 @@ def dim_test(data): ) assert dim_test.compute_table_schema()["x-retire-absent-rows"] # type: ignore[typeddict-item] - # test compound `merge_key` + +@pytest.mark.essential +@pytest.mark.parametrize( + "destination_config", + destinations_configs(default_sql_configs=True, supports_merge=True), + ids=lambda x: x.name, +) +@pytest.mark.parametrize("key_type", ("text", "bigint")) +def test_retire_absent_rows_compound_key( + destination_config: DestinationTestConfiguration, + key_type: TDataType, +) -> None: + p = destination_config.setup_pipeline("abstract", dev_mode=True) @dlt.resource( merge_key=["first_name", "last_name"], @@ -822,34 +835,40 @@ def dim_test(data): def dim_test_compound(data): yield data + # vary `first_name` type to test mixed compound `merge_key` + if key_type == "text": + first_name = "John" + elif key_type == "bigint": + first_name = 1 # type: ignore[assignment] # load 1 — initial load dim_snap = [ - {"first_name": "John", "last_name": "Doe", "age": 20}, - {"first_name": "John", "last_name": "Dodo", "age": 20}, + {"first_name": first_name, "last_name": "Doe", "age": 20}, + {"first_name": first_name, "last_name": "Dodo", "age": 20}, ] info = p.run(dim_test_compound(dim_snap), **destination_config.run_kwargs) assert_load_info(info) assert load_table_counts(p, "dim_test_compound")["dim_test_compound"] == 2 # both records should be active (i.e. 
not retired) + _, to = DEFAULT_VALIDITY_COLUMN_NAMES assert [row[to] for row in get_table(p, "dim_test_compound")] == [None, None] - # load 2 — natural key "John" + "Dodo" is absent, natural key "John" + "Doe" has changed + # load 2 — "Dodo" is absent, "Doe" has changed dim_snap = [ - {"first_name": "John", "last_name": "Doe", "age": 30}, + {"first_name": first_name, "last_name": "Doe", "age": 30}, ] info = p.run(dim_test_compound(dim_snap), **destination_config.run_kwargs) assert_load_info(info) assert load_table_counts(p, "dim_test_compound")["dim_test_compound"] == 3 ts3 = get_load_package_created_at(p, info) - # natural key "John" + "Doe" should now have two records (one retired, one active) + # "Doe" should now have two records (one retired, one active) actual = [ {k: v for k, v in row.items() if k in ("first_name", "last_name", to)} for row in get_table(p, "dim_test_compound") ] expected = [ - {"first_name": "John", "last_name": "Doe", to: ts3}, - {"first_name": "John", "last_name": "Doe", to: None}, - {"first_name": "John", "last_name": "Dodo", to: None}, + {"first_name": first_name, "last_name": "Doe", to: ts3}, + {"first_name": first_name, "last_name": "Doe", to: None}, + {"first_name": first_name, "last_name": "Dodo", to: None}, ] assert_records_as_set(actual, expected) # type: ignore[arg-type] From 16e52cdb965fed6768ba2307be79b324048dc425 Mon Sep 17 00:00:00 2001 From: Jorrit Sandbrink Date: Thu, 19 Sep 2024 19:54:10 +0400 Subject: [PATCH 11/18] simplify scd2 test and remove redundancy --- tests/load/pipeline/test_scd2.py | 101 ++++++++++++------------------- 1 file changed, 40 insertions(+), 61 deletions(-) diff --git a/tests/load/pipeline/test_scd2.py b/tests/load/pipeline/test_scd2.py index 37f3267f9a..8700379f84 100644 --- a/tests/load/pipeline/test_scd2.py +++ b/tests/load/pipeline/test_scd2.py @@ -73,40 +73,21 @@ def get_table( @pytest.mark.essential @pytest.mark.parametrize( - "destination_config,simple,validity_column_names,active_record_timestamp", - # test basic cases for alle SQL destinations supporting merge - [ - (dconf, True, None, None) - for dconf in destinations_configs(default_sql_configs=True, supports_merge=True) - ] - + [ - (dconf, True, None, pendulum.DateTime(2099, 12, 31, 22, 2, 59)) # arbitrary timestamp - for dconf in destinations_configs(default_sql_configs=True, supports_merge=True) - ] - + [ # test nested columns and validity column name configuration only for postgres and duckdb - (dconf, False, ["from", "to"], None) - for dconf in destinations_configs(default_sql_configs=True, subset=["postgres", "duckdb"]) - ] - + [ - (dconf, False, ["ValidFrom", "ValidTo"], None) - for dconf in destinations_configs(default_sql_configs=True, subset=["postgres", "duckdb"]) - ], - ids=lambda x: ( - x.name - if isinstance(x, DestinationTestConfiguration) - else (x[0] + "-" + x[1] if isinstance(x, list) else x) - ), + "destination_config", + destinations_configs(default_sql_configs=True, supports_merge=True), + ids=lambda x: x.name, +) +@pytest.mark.parametrize( + "validity_column_names", + [None, ["from", "to"], ["ValidFrom", "ValidTo"]], + ids=lambda x: x[0] + "-" + x[1] if isinstance(x, list) else x, ) def test_core_functionality( destination_config: DestinationTestConfiguration, - simple: bool, validity_column_names: List[str], - active_record_timestamp: Optional[pendulum.DateTime], ) -> None: - # somehow destination_config comes through as ParameterSet instead of - # DestinationTestConfiguration - destination_config = destination_config.values[0] # type: 
ignore[attr-defined] - + if validity_column_names is not None and destination_config.destination_type != "postgres": + pytest.skip("test `validity_column_names` configuration only for `postgres`") p = destination_config.setup_pipeline("abstract", dev_mode=True) @dlt.resource( @@ -115,7 +96,6 @@ def test_core_functionality( "disposition": "merge", "strategy": "scd2", "validity_column_names": validity_column_names, - "active_record_timestamp": active_record_timestamp, }, ) def r(data): @@ -130,8 +110,8 @@ def r(data): # load 1 — initial load dim_snap = [ - {"nk": 1, "c1": "foo", "c2": "foo" if simple else {"nc1": "foo"}}, - {"nk": 2, "c1": "bar", "c2": "bar" if simple else {"nc1": "bar"}}, + {"nk": 1, "c1": "foo", "c2": {"nc1": "foo"}}, + {"nk": 2, "c1": "bar", "c2": {"nc1": "bar"}}, ] info = p.run(r(dim_snap), **destination_config.run_kwargs) assert_load_info(info) @@ -147,93 +127,92 @@ def r(data): # assert load results ts_1 = get_load_package_created_at(p, info) assert_load_info(info) - cname = "c2" if simple else "c2__nc1" - assert get_table(p, "dim_test", cname) == [ + assert get_table(p, "dim_test", "c2__nc1") == [ { from_: ts_1, - to: active_record_timestamp, + to: None, "nk": 2, "c1": "bar", - cname: "bar", + "c2__nc1": "bar", }, { from_: ts_1, - to: active_record_timestamp, + to: None, "nk": 1, "c1": "foo", - cname: "foo", + "c2__nc1": "foo", }, ] # load 2 — update a record dim_snap = [ - {"nk": 1, "c1": "foo", "c2": "foo_updated" if simple else {"nc1": "foo_updated"}}, - {"nk": 2, "c1": "bar", "c2": "bar" if simple else {"nc1": "bar"}}, + {"nk": 1, "c1": "foo", "c2": {"nc1": "foo_updated"}}, + {"nk": 2, "c1": "bar", "c2": {"nc1": "bar"}}, ] info = p.run(r(dim_snap), **destination_config.run_kwargs) ts_2 = get_load_package_created_at(p, info) assert_load_info(info) - assert get_table(p, "dim_test", cname) == [ + assert get_table(p, "dim_test", "c2__nc1") == [ { from_: ts_1, - to: active_record_timestamp, + to: None, "nk": 2, "c1": "bar", - cname: "bar", + "c2__nc1": "bar", }, - {from_: ts_1, to: ts_2, "nk": 1, "c1": "foo", cname: "foo"}, + {from_: ts_1, to: ts_2, "nk": 1, "c1": "foo", "c2__nc1": "foo"}, { from_: ts_2, - to: active_record_timestamp, + to: None, "nk": 1, "c1": "foo", - cname: "foo_updated", + "c2__nc1": "foo_updated", }, ] # load 3 — delete a record dim_snap = [ - {"nk": 1, "c1": "foo", "c2": "foo_updated" if simple else {"nc1": "foo_updated"}}, + {"nk": 1, "c1": "foo", "c2": {"nc1": "foo_updated"}}, ] info = p.run(r(dim_snap), **destination_config.run_kwargs) ts_3 = get_load_package_created_at(p, info) assert_load_info(info) - assert get_table(p, "dim_test", cname) == [ - {from_: ts_1, to: ts_3, "nk": 2, "c1": "bar", cname: "bar"}, - {from_: ts_1, to: ts_2, "nk": 1, "c1": "foo", cname: "foo"}, + assert get_table(p, "dim_test", "c2__nc1") == [ + {from_: ts_1, to: ts_3, "nk": 2, "c1": "bar", "c2__nc1": "bar"}, + {from_: ts_1, to: ts_2, "nk": 1, "c1": "foo", "c2__nc1": "foo"}, { from_: ts_2, - to: active_record_timestamp, + to: None, "nk": 1, "c1": "foo", - cname: "foo_updated", + "c2__nc1": "foo_updated", }, ] # load 4 — insert a record dim_snap = [ - {"nk": 1, "c1": "foo", "c2": "foo_updated" if simple else {"nc1": "foo_updated"}}, - {"nk": 3, "c1": "baz", "c2": "baz" if simple else {"nc1": "baz"}}, + {"nk": 1, "c1": "foo", "c2": {"nc1": "foo_updated"}}, + {"nk": 3, "c1": "baz", "c2": {"nc1": "baz"}}, ] info = p.run(r(dim_snap), **destination_config.run_kwargs) ts_4 = get_load_package_created_at(p, info) assert_load_info(info) - assert get_table(p, "dim_test", cname) 
== [ - {from_: ts_1, to: ts_3, "nk": 2, "c1": "bar", cname: "bar"}, + assert get_table(p, "dim_test", "c2__nc1") == [ + {from_: ts_1, to: ts_3, "nk": 2, "c1": "bar", "c2__nc1": "bar"}, { from_: ts_4, - to: active_record_timestamp, + to: None, "nk": 3, "c1": "baz", - cname: "baz", + "c2__nc1": "baz", }, - {from_: ts_1, to: ts_2, "nk": 1, "c1": "foo", cname: "foo"}, + {from_: ts_1, to: ts_2, "nk": 1, "c1": "foo", "c2__nc1": "foo"}, { from_: ts_2, - to: active_record_timestamp, + to: None, "nk": 1, "c1": "foo", - cname: "foo_updated", + "c2__nc1": "foo_updated", }, ] From c826afc78e3a9197e300fae3f315956180121cfb Mon Sep 17 00:00:00 2001 From: Jorrit Sandbrink Date: Thu, 19 Sep 2024 20:07:51 +0400 Subject: [PATCH 12/18] set constants once --- tests/load/pipeline/test_scd2.py | 106 ++++++++++++++----------------- 1 file changed, 49 insertions(+), 57 deletions(-) diff --git a/tests/load/pipeline/test_scd2.py b/tests/load/pipeline/test_scd2.py index 8700379f84..c9fd4c628c 100644 --- a/tests/load/pipeline/test_scd2.py +++ b/tests/load/pipeline/test_scd2.py @@ -31,6 +31,7 @@ from tests.utils import TPythonTableFormat get_row_hash = DataItemNormalizer.get_row_hash +FROM, TO = DEFAULT_VALIDITY_COLUMN_NAMES def get_load_package_created_at(pipeline: dlt.Pipeline, load_info: LoadInfo) -> datetime: @@ -233,9 +234,6 @@ def test_child_table(destination_config: DestinationTestConfiguration, simple: b def r(data): yield data - # get validity column names - from_, to = DEFAULT_VALIDITY_COLUMN_NAMES - # load 1 — initial load dim_snap: List[Dict[str, Any]] = [ l1_1 := {"nk": 1, "c1": "foo", "c2": [1] if simple else [{"cc1": 1}]}, @@ -245,8 +243,8 @@ def r(data): ts_1 = get_load_package_created_at(p, info) assert_load_info(info) assert get_table(p, "dim_test", "c1") == [ - {from_: ts_1, to: None, "nk": 2, "c1": "bar"}, - {from_: ts_1, to: None, "nk": 1, "c1": "foo"}, + {FROM: ts_1, TO: None, "nk": 2, "c1": "bar"}, + {FROM: ts_1, TO: None, "nk": 1, "c1": "foo"}, ] cname = "value" if simple else "cc1" assert get_table(p, "dim_test__c2", cname) == [ @@ -264,9 +262,9 @@ def r(data): ts_2 = get_load_package_created_at(p, info) assert_load_info(info) assert get_table(p, "dim_test", "c1") == [ - {from_: ts_1, to: None, "nk": 2, "c1": "bar"}, - {from_: ts_1, to: ts_2, "nk": 1, "c1": "foo"}, # updated - {from_: ts_2, to: None, "nk": 1, "c1": "foo_updated"}, # new + {FROM: ts_1, TO: None, "nk": 2, "c1": "bar"}, + {FROM: ts_1, TO: ts_2, "nk": 1, "c1": "foo"}, # updated + {FROM: ts_2, TO: None, "nk": 1, "c1": "foo_updated"}, # new ] assert_records_as_set( get_table(p, "dim_test__c2"), @@ -293,10 +291,10 @@ def r(data): assert_records_as_set( get_table(p, "dim_test"), [ - {from_: ts_1, to: None, "nk": 2, "c1": "bar"}, - {from_: ts_1, to: ts_2, "nk": 1, "c1": "foo"}, - {from_: ts_2, to: ts_3, "nk": 1, "c1": "foo_updated"}, # updated - {from_: ts_3, to: None, "nk": 1, "c1": "foo_updated"}, # new + {FROM: ts_1, TO: None, "nk": 2, "c1": "bar"}, + {FROM: ts_1, TO: ts_2, "nk": 1, "c1": "foo"}, + {FROM: ts_2, TO: ts_3, "nk": 1, "c1": "foo_updated"}, # updated + {FROM: ts_3, TO: None, "nk": 1, "c1": "foo_updated"}, # new ], ) exp_3 = [ @@ -319,10 +317,10 @@ def r(data): assert_records_as_set( get_table(p, "dim_test"), [ - {from_: ts_1, to: ts_4, "nk": 2, "c1": "bar"}, # updated - {from_: ts_1, to: ts_2, "nk": 1, "c1": "foo"}, - {from_: ts_2, to: ts_3, "nk": 1, "c1": "foo_updated"}, - {from_: ts_3, to: None, "nk": 1, "c1": "foo_updated"}, + {FROM: ts_1, TO: ts_4, "nk": 2, "c1": "bar"}, # updated + {FROM: ts_1, TO: ts_2, "nk": 1, 
"c1": "foo"}, + {FROM: ts_2, TO: ts_3, "nk": 1, "c1": "foo_updated"}, + {FROM: ts_3, TO: None, "nk": 1, "c1": "foo_updated"}, ], ) assert_records_as_set( @@ -340,11 +338,11 @@ def r(data): assert_records_as_set( get_table(p, "dim_test"), [ - {from_: ts_1, to: ts_4, "nk": 2, "c1": "bar"}, - {from_: ts_5, to: None, "nk": 3, "c1": "baz"}, # new - {from_: ts_1, to: ts_2, "nk": 1, "c1": "foo"}, - {from_: ts_2, to: ts_3, "nk": 1, "c1": "foo_updated"}, - {from_: ts_3, to: None, "nk": 1, "c1": "foo_updated"}, + {FROM: ts_1, TO: ts_4, "nk": 2, "c1": "bar"}, + {FROM: ts_5, TO: None, "nk": 3, "c1": "baz"}, # new + {FROM: ts_1, TO: ts_2, "nk": 1, "c1": "foo"}, + {FROM: ts_2, TO: ts_3, "nk": 1, "c1": "foo_updated"}, + {FROM: ts_3, TO: None, "nk": 1, "c1": "foo_updated"}, ], ) assert_records_as_set( @@ -497,13 +495,12 @@ def r(data): ts_3 = get_load_package_created_at(p, info) # assert parent records - from_, to = DEFAULT_VALIDITY_COLUMN_NAMES r1_no_child = {k: v for k, v in r1.items() if k != "child"} r2_no_child = {k: v for k, v in r2.items() if k != "child"} expected = [ - {**{from_: ts_1, to: ts_2}, **r1_no_child}, - {**{from_: ts_3, to: None}, **r1_no_child}, - {**{from_: ts_1, to: None}, **r2_no_child}, + {**{FROM: ts_1, TO: ts_2}, **r1_no_child}, + {**{FROM: ts_3, TO: None}, **r1_no_child}, + {**{FROM: ts_1, TO: None}, **r2_no_child}, ] assert_records_as_set(get_table(p, "dim_test"), expected) @@ -631,10 +628,9 @@ def r(data): info = p.run(r(dim_snap), **destination_config.run_kwargs) assert_load_info(info) assert load_table_counts(p, "dim_test")["dim_test"] == 2 - from_, to = DEFAULT_VALIDITY_COLUMN_NAMES expected = [ - {**{from_: strip_timezone(ts1), to: None}, **l1_1}, - {**{from_: strip_timezone(ts1), to: None}, **l1_2}, + {**{FROM: strip_timezone(ts1), TO: None}, **l1_1}, + {**{FROM: strip_timezone(ts1), TO: None}, **l1_2}, ] assert get_table(p, "dim_test", "nk") == expected @@ -655,10 +651,10 @@ def r(data): assert_load_info(info) assert load_table_counts(p, "dim_test")["dim_test"] == 4 expected = [ - {**{from_: strip_timezone(ts1), to: strip_timezone(ts2)}, **l1_1}, # retired - {**{from_: strip_timezone(ts1), to: strip_timezone(ts2)}, **l1_2}, # retired - {**{from_: strip_timezone(ts2), to: None}, **l2_1}, # new - {**{from_: strip_timezone(ts2), to: None}, **l2_3}, # new + {**{FROM: strip_timezone(ts1), TO: strip_timezone(ts2)}, **l1_1}, # retired + {**{FROM: strip_timezone(ts1), TO: strip_timezone(ts2)}, **l1_2}, # retired + {**{FROM: strip_timezone(ts2), TO: None}, **l2_1}, # new + {**{FROM: strip_timezone(ts2), TO: None}, **l2_3}, # new ] assert_records_as_set(get_table(p, "dim_test"), expected) @@ -677,10 +673,10 @@ def r(data): assert_load_info(info) assert load_table_counts(p, "dim_test")["dim_test"] == 4 expected = [ - {**{from_: strip_timezone(ts1), to: strip_timezone(ts2)}, **l1_1}, # unchanged - {**{from_: strip_timezone(ts1), to: strip_timezone(ts2)}, **l1_2}, # unchanged - {**{from_: strip_timezone(ts2), to: None}, **l2_1}, # unchanged - {**{from_: strip_timezone(ts2), to: strip_timezone(ts3)}, **l2_3}, # retired + {**{FROM: strip_timezone(ts1), TO: strip_timezone(ts2)}, **l1_1}, # unchanged + {**{FROM: strip_timezone(ts1), TO: strip_timezone(ts2)}, **l1_2}, # unchanged + {**{FROM: strip_timezone(ts2), TO: None}, **l2_1}, # unchanged + {**{FROM: strip_timezone(ts2), TO: strip_timezone(ts3)}, **l2_3}, # retired ] assert_records_as_set(get_table(p, "dim_test"), expected) @@ -725,9 +721,8 @@ def dim_test(data): info = p.run(dim_test(dim_snap), **destination_config.run_kwargs) 
assert_load_info(info) assert load_table_counts(p, "dim_test")["dim_test"] == 2 - _, to = DEFAULT_VALIDITY_COLUMN_NAMES # both records should be active (i.e. not retired) - assert [row[to] for row in get_table(p, "dim_test")] == [None, None] + assert [row[TO] for row in get_table(p, "dim_test")] == [None, None] # load 2 — natural key 2 is absent, natural key 1 is unchanged dim_snap = [ @@ -737,7 +732,7 @@ def dim_test(data): assert_load_info(info) assert load_table_counts(p, "dim_test")["dim_test"] == 2 # both records should still be active - assert [row[to] for row in get_table(p, "dim_test")] == [None, None] + assert [row[TO] for row in get_table(p, "dim_test")] == [None, None] # load 3 — natural key 2 is absent, natural key 1 has changed dim_snap = [ @@ -748,8 +743,8 @@ def dim_test(data): assert load_table_counts(p, "dim_test")["dim_test"] == 3 ts3 = get_load_package_created_at(p, info) # natural key 1 should now have two records (one retired, one active) - actual = [{k: v for k, v in row.items() if k in ("nk", to)} for row in get_table(p, "dim_test")] - expected = [{"nk": 1, to: ts3}, {"nk": 1, to: None}, {"nk": 2, to: None}] + actual = [{k: v for k, v in row.items() if k in ("nk", TO)} for row in get_table(p, "dim_test")] + expected = [{"nk": 1, TO: ts3}, {"nk": 1, TO: None}, {"nk": 2, TO: None}] assert_records_as_set(actual, expected) # type: ignore[arg-type] # load 4 — natural key 2 is absent, natural key 1 has changed back to @@ -762,8 +757,8 @@ def dim_test(data): assert load_table_counts(p, "dim_test")["dim_test"] == 4 ts4 = get_load_package_created_at(p, info) # natural key 1 should now have three records (two retired, one active) - actual = [{k: v for k, v in row.items() if k in ("nk", to)} for row in get_table(p, "dim_test")] - expected = [{"nk": 1, to: ts3}, {"nk": 1, to: ts4}, {"nk": 1, to: None}, {"nk": 2, to: None}] + actual = [{k: v for k, v in row.items() if k in ("nk", TO)} for row in get_table(p, "dim_test")] + expected = [{"nk": 1, TO: ts3}, {"nk": 1, TO: ts4}, {"nk": 1, TO: None}, {"nk": 2, TO: None}] assert_records_as_set(actual, expected) # type: ignore[arg-type] # now test various configs @@ -828,8 +823,7 @@ def dim_test_compound(data): assert_load_info(info) assert load_table_counts(p, "dim_test_compound")["dim_test_compound"] == 2 # both records should be active (i.e. 
not retired) - _, to = DEFAULT_VALIDITY_COLUMN_NAMES - assert [row[to] for row in get_table(p, "dim_test_compound")] == [None, None] + assert [row[TO] for row in get_table(p, "dim_test_compound")] == [None, None] # load 2 — "Dodo" is absent, "Doe" has changed dim_snap = [ @@ -841,13 +835,13 @@ def dim_test_compound(data): ts3 = get_load_package_created_at(p, info) # "Doe" should now have two records (one retired, one active) actual = [ - {k: v for k, v in row.items() if k in ("first_name", "last_name", to)} + {k: v for k, v in row.items() if k in ("first_name", "last_name", TO)} for row in get_table(p, "dim_test_compound") ] expected = [ - {"first_name": first_name, "last_name": "Doe", to: ts3}, - {"first_name": first_name, "last_name": "Doe", to: None}, - {"first_name": first_name, "last_name": "Dodo", to: None}, + {"first_name": first_name, "last_name": "Doe", TO: ts3}, + {"first_name": first_name, "last_name": "Doe", TO: None}, + {"first_name": first_name, "last_name": "Dodo", TO: None}, ] assert_records_as_set(actual, expected) # type: ignore[arg-type] @@ -885,9 +879,8 @@ def _make_scd2_r(table_: Any) -> DltResource: # make sure we have scd2 columns in schema table_schema = p.default_schema.get_table("tabular") assert table_schema["x-merge-strategy"] == "scd2" # type: ignore[typeddict-item] - from_, to = DEFAULT_VALIDITY_COLUMN_NAMES - assert table_schema["columns"][from_]["x-valid-from"] # type: ignore[typeddict-item] - assert table_schema["columns"][to]["x-valid-to"] # type: ignore[typeddict-item] + assert table_schema["columns"][FROM]["x-valid-from"] # type: ignore[typeddict-item] + assert table_schema["columns"][TO]["x-valid-to"] # type: ignore[typeddict-item] assert table_schema["columns"]["row_hash"]["x-row-version"] # type: ignore[typeddict-item] # 100 items in destination assert load_table_counts(p, "tabular")["tabular"] == 100 @@ -951,13 +944,12 @@ def r(data): ts_2 = get_load_package_created_at(p, info) # assert load results - from_, to = DEFAULT_VALIDITY_COLUMN_NAMES assert get_table(p, "dim_test", "c1") == [ - {from_: ts_1, to: ts_2, "nk": 2, "c1": "bar", "row_hash": "mocked_hash_2"}, - {from_: ts_1, to: ts_2, "nk": 1, "c1": "foo", "row_hash": "mocked_hash_1"}, + {FROM: ts_1, TO: ts_2, "nk": 2, "c1": "bar", "row_hash": "mocked_hash_2"}, + {FROM: ts_1, TO: ts_2, "nk": 1, "c1": "foo", "row_hash": "mocked_hash_1"}, { - from_: ts_2, - to: None, + FROM: ts_2, + TO: None, "nk": 1, "c1": "foo_upd", "row_hash": "mocked_hash_1_upd", From ae7cd007e1a410420c870b01fdb344e4df46afa3 Mon Sep 17 00:00:00 2001 From: Jorrit Sandbrink Date: Fri, 20 Sep 2024 00:40:46 +0400 Subject: [PATCH 13/18] document incremental scd2 --- .../docs/general-usage/incremental-loading.md | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/docs/website/docs/general-usage/incremental-loading.md b/docs/website/docs/general-usage/incremental-loading.md index 819ac2fb0c..dd165c2a41 100644 --- a/docs/website/docs/general-usage/incremental-loading.md +++ b/docs/website/docs/general-usage/incremental-loading.md @@ -250,7 +250,7 @@ that correct data is propagated on initial `replace` load so the future `merge` executed. You can achieve the same in the decorator `@dlt.source(root_key=True)`. ### `scd2` strategy -`dlt` can create [Slowly Changing Dimension Type 2](https://en.wikipedia.org/wiki/Slowly_changing_dimension#Type_2:_add_new_row) (SCD2) destination tables for dimension tables that change in the source. 
The resource is expected to provide a full extract of the source table each run. A row hash is stored in `_dlt_id` and used as surrogate key to identify source records that have been inserted, updated, or deleted. A `NULL` value is used by default to indicate an active record, but it's possible to use a configurable high timestamp (e.g. 9999-12-31 00:00:00.000000) instead. +`dlt` can create [Slowly Changing Dimension Type 2](https://en.wikipedia.org/wiki/Slowly_changing_dimension#Type_2:_add_new_row) (SCD2) destination tables for dimension tables that change in the source. By default, the resource is expected to provide a full extract of the source table each run, though [incremental extracts](#example-incremental-scd2) are also possible. A row hash is stored in `_dlt_id` and used as surrogate key to identify source records that have been inserted, updated, or deleted. A `NULL` value is used by default to indicate an active record, but it's possible to use a configurable high timestamp (e.g. 9999-12-31 00:00:00.000000) instead. :::note The `unique` hint for `_dlt_id` in the root table is set to `false` when using `scd2`. This differs from [default behavior](./destination-tables.md#child-and-parent-tables). The reason is that the surrogate key stored in `_dlt_id` contains duplicates after an _insert-delete-reinsert_ pattern: @@ -327,6 +327,23 @@ pipeline.run(dim_customer()) # third run — 2024-04-10 06:45:22.847403 | 2024-04-09 18:27:53.734235 | **2024-04-10 06:45:22.847403** | 2 | bar | 2 | | 2024-04-09 22:13:07.943703 | NULL | 1 | foo_updated | 1 | +#### Example: incremental `scd2` +`retire_absent_rows` can be set to `False` to work with incremental extracts instead of full extracts: +```py +@dlt.resource( + merge_key="my_natural_key", + write_disposition={ + "disposition": "merge", + "strategy": "scd2", + "retire_absent_rows": False, + } +) +def dim_customer(): + ... +... +``` +Using this setting, records are not retired in the destination if their corresponding natural keys are not present in the source extract. This allows for incremental extracts that only contain updated records. You need to specify the natural key as `merge_key` when `retire_absent_rows` is `False`. Compound natural keys are allowed and can be specified by providing a list of column names as `merge_key`. + #### Example: configure validity column names `_dlt_valid_from` and `_dlt_valid_to` are used by default as validity column names. 
Other names can be configured as follows: ```py From 8ecf7c62688bd27c702f94e5959799c82c90ecc3 Mon Sep 17 00:00:00 2001 From: Jorrit Sandbrink Date: Tue, 24 Sep 2024 15:54:22 +0400 Subject: [PATCH 14/18] remove natural_key remnants --- dlt/common/schema/typing.py | 1 - dlt/extract/hints.py | 6 ------ 2 files changed, 7 deletions(-) diff --git a/dlt/common/schema/typing.py b/dlt/common/schema/typing.py index a70c354e17..dab502a272 100644 --- a/dlt/common/schema/typing.py +++ b/dlt/common/schema/typing.py @@ -241,7 +241,6 @@ class TScd2StrategyDict(TMergeDispositionDict, total=False): boundary_timestamp: Optional[TAnyDateTime] row_version_column_name: Optional[str] retire_absent_rows: Optional[bool] - natural_key: Optional[str] TWriteDispositionConfig = Union[ diff --git a/dlt/extract/hints.py b/dlt/extract/hints.py index dfa01eabcf..29f176b93a 100644 --- a/dlt/extract/hints.py +++ b/dlt/extract/hints.py @@ -460,12 +460,6 @@ def _merge_merge_disposition_dict(dict_: Dict[str, Any]) -> None: dict_["x-boundary-timestamp"] = md_dict["boundary_timestamp"] if "retire_absent_rows" in md_dict: dict_["x-retire-absent-rows"] = md_dict["retire_absent_rows"] - if "natural_key" in md_dict: - nk = md_dict["natural_key"] - if nk in dict_["columns"]: - dict_["columns"][nk]["x-natural-key"] = True - else: - dict_["columns"][nk] = {"name": nk, "x-natural-key": True} if md_dict.get("validity_column_names") is None: from_, to = DEFAULT_VALIDITY_COLUMN_NAMES else: From 1035ffa312fc68b1470738b176828cc73e9d6ef1 Mon Sep 17 00:00:00 2001 From: Jorrit Sandbrink Date: Fri, 27 Sep 2024 19:37:23 +0400 Subject: [PATCH 15/18] remove `retire_absent_rows` flag --- dlt/common/schema/typing.py | 1 - dlt/destinations/sql_jobs.py | 28 +++++++++++------------ dlt/extract/hints.py | 9 -------- tests/load/pipeline/test_scd2.py | 39 ++++---------------------------- 4 files changed, 18 insertions(+), 59 deletions(-) diff --git a/dlt/common/schema/typing.py b/dlt/common/schema/typing.py index dab502a272..dbea3026b1 100644 --- a/dlt/common/schema/typing.py +++ b/dlt/common/schema/typing.py @@ -240,7 +240,6 @@ class TScd2StrategyDict(TMergeDispositionDict, total=False): active_record_timestamp: Optional[TAnyDateTime] boundary_timestamp: Optional[TAnyDateTime] row_version_column_name: Optional[str] - retire_absent_rows: Optional[bool] TWriteDispositionConfig = Union[ diff --git a/dlt/destinations/sql_jobs.py b/dlt/destinations/sql_jobs.py index f5c8df8648..ae27213a7c 100644 --- a/dlt/destinations/sql_jobs.py +++ b/dlt/destinations/sql_jobs.py @@ -766,27 +766,27 @@ def gen_scd2_sql( ) is_active = f"{to} = {active_record_literal}" - # retire records - # always retire updated records, retire deleted records only if `retire_absent_rows` + # retire records: + # - no `merge_key`: retire all absent records + # - yes `merge_key`: retire those absent records whose `merge_key` + # is present in staging data retire_sql = f""" {cls.gen_update_table_prefix(root_table_name)} {to} = {boundary_literal} WHERE {is_active} AND {hash_} NOT IN (SELECT {hash_} FROM {staging_root_table_name}); """ - retire_absent_rows = root_table.get("x-retire-absent-rows", True) - if not retire_absent_rows: - retire_sql = retire_sql.rstrip()[:-1] # remove semicolon - # merge keys act as natural key - merge_keys = cls._escape_list( - get_columns_names_with_prop(root_table, "merge_key"), - escape_column_id, - ) + merge_keys = cls._escape_list( + get_columns_names_with_prop(root_table, "merge_key"), + escape_column_id, + ) + if len(merge_keys) > 0: if len(merge_keys) == 1: - nk 
= merge_keys[0] + key = merge_keys[0] else: - nk = cls.gen_concat_sql(merge_keys) # compound key - nk_present = f"{nk} IN (SELECT {nk} FROM {staging_root_table_name})" - retire_sql += f" AND {nk_present};" + key = cls.gen_concat_sql(merge_keys) # compound key + key_present = f"{key} IN (SELECT {key} FROM {staging_root_table_name})" + retire_sql = retire_sql.rstrip()[:-1] # remove semicolon + retire_sql += f" AND {key_present};" sql.append(retire_sql) # insert new active records in root table diff --git a/dlt/extract/hints.py b/dlt/extract/hints.py index 29f176b93a..2774e17353 100644 --- a/dlt/extract/hints.py +++ b/dlt/extract/hints.py @@ -458,8 +458,6 @@ def _merge_merge_disposition_dict(dict_: Dict[str, Any]) -> None: md_dict = cast(TScd2StrategyDict, md_dict) if "boundary_timestamp" in md_dict: dict_["x-boundary-timestamp"] = md_dict["boundary_timestamp"] - if "retire_absent_rows" in md_dict: - dict_["x-retire-absent-rows"] = md_dict["retire_absent_rows"] if md_dict.get("validity_column_names") is None: from_, to = DEFAULT_VALIDITY_COLUMN_NAMES else: @@ -543,10 +541,3 @@ def validate_write_disposition_hint(template: TResourceHints) -> None: raise ValueError( f'could not parse `{ts}` value "{wd[ts]}"' # type: ignore[literal-required] ) - - if ( - "retire_absent_rows" in wd - and not wd["retire_absent_rows"] - and template.get("merge_key") is None - ): - raise ValueError("`merge_key` is required when `retire_absent_rows=False`") diff --git a/tests/load/pipeline/test_scd2.py b/tests/load/pipeline/test_scd2.py index c9fd4c628c..f4c2365b1b 100644 --- a/tests/load/pipeline/test_scd2.py +++ b/tests/load/pipeline/test_scd2.py @@ -697,18 +697,14 @@ def r(data): destinations_configs(default_sql_configs=True, supports_merge=True), ids=lambda x: x.name, ) -def test_retire_absent_rows( +def test_merge_key_natural_key( destination_config: DestinationTestConfiguration, ) -> None: p = destination_config.setup_pipeline("abstract", dev_mode=True) @dlt.resource( merge_key="nk", - write_disposition={ - "disposition": "merge", - "strategy": "scd2", - "retire_absent_rows": False, - }, + write_disposition={"disposition": "merge", "strategy": "scd2"}, ) def dim_test(data): yield data @@ -761,29 +757,6 @@ def dim_test(data): expected = [{"nk": 1, TO: ts3}, {"nk": 1, TO: ts4}, {"nk": 1, TO: None}, {"nk": 2, TO: None}] assert_records_as_set(actual, expected) # type: ignore[arg-type] - # now test various configs - - with pytest.raises(ValueError): - # should raise because `merge_key` is required when `retire_absent_rows=False` - dim_test.apply_hints( - merge_key="", - write_disposition={ - "disposition": "merge", - "strategy": "scd2", - "retire_absent_rows": False, - }, - ) - - # `retire_absent_rows=True` does not require `merge_key` - dim_test.apply_hints( - write_disposition={ - "disposition": "merge", - "strategy": "scd2", - "retire_absent_rows": True, - } - ) - assert dim_test.compute_table_schema()["x-retire-absent-rows"] # type: ignore[typeddict-item] - @pytest.mark.essential @pytest.mark.parametrize( @@ -792,7 +765,7 @@ def dim_test(data): ids=lambda x: x.name, ) @pytest.mark.parametrize("key_type", ("text", "bigint")) -def test_retire_absent_rows_compound_key( +def test_merge_key_compound_natural_key( destination_config: DestinationTestConfiguration, key_type: TDataType, ) -> None: @@ -800,11 +773,7 @@ def test_retire_absent_rows_compound_key( @dlt.resource( merge_key=["first_name", "last_name"], - write_disposition={ - "disposition": "merge", - "strategy": "scd2", - "retire_absent_rows": False, - }, + 
write_disposition={"disposition": "merge", "strategy": "scd2"}, ) def dim_test_compound(data): yield data From c5be436503a92a749dcb50699174c4978478f5ec Mon Sep 17 00:00:00 2001 From: Jorrit Sandbrink Date: Fri, 27 Sep 2024 19:37:47 +0400 Subject: [PATCH 16/18] add scd2 merge key partition test --- tests/load/pipeline/test_scd2.py | 66 ++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/tests/load/pipeline/test_scd2.py b/tests/load/pipeline/test_scd2.py index f4c2365b1b..0b9f133d6e 100644 --- a/tests/load/pipeline/test_scd2.py +++ b/tests/load/pipeline/test_scd2.py @@ -815,6 +815,72 @@ def dim_test_compound(data): assert_records_as_set(actual, expected) # type: ignore[arg-type] +@pytest.mark.essential +@pytest.mark.parametrize( + "destination_config", + destinations_configs(default_sql_configs=True, supports_merge=True), + ids=lambda x: x.name, +) +def test_merge_key_partition( + destination_config: DestinationTestConfiguration, +) -> None: + p = destination_config.setup_pipeline("abstract", dev_mode=True) + + @dlt.resource( + merge_key="date", + write_disposition={"disposition": "merge", "strategy": "scd2"}, + ) + def dim_test(data): + yield data + + # load 1 — "2024-01-01" partition + dim_snap = [ + {"date": "2024-01-01", "name": "a"}, + {"date": "2024-01-01", "name": "b"}, + ] + info = p.run(dim_test(dim_snap), **destination_config.run_kwargs) + assert_load_info(info) + assert load_table_counts(p, "dim_test")["dim_test"] == 2 + # both records should be active (i.e. not retired) + assert [row[TO] for row in get_table(p, "dim_test")] == [None, None] + + # load 2 — "2024-01-02" partition + dim_snap = [ + {"date": "2024-01-02", "name": "c"}, + {"date": "2024-01-02", "name": "d"}, + ] + info = p.run(dim_test(dim_snap), **destination_config.run_kwargs) + assert_load_info(info) + assert load_table_counts(p, "dim_test")["dim_test"] == 4 + # two "2024-01-01" should be untouched, two "2024-01-02" records should + # be added + assert [row[TO] for row in get_table(p, "dim_test")] == [None, None, None, None] + + # load 2 — reload "2024-01-01" partition + dim_snap = [ + {"date": "2024-01-01", "name": "a"}, # unchanged + {"date": "2024-01-01", "name": "bb"}, # new + ] + info = p.run(dim_test(dim_snap), **destination_config.run_kwargs) + assert_load_info(info) + # "b" should be retired, "bb" should be added, "2024-01-02" partition + # should be untouched + assert load_table_counts(p, "dim_test")["dim_test"] == 5 + ts2 = get_load_package_created_at(p, info) + actual = [ + {k: v for k, v in row.items() if k in ("date", "name", TO)} + for row in get_table(p, "dim_test") + ] + expected = [ + {"date": "2024-01-01", "name": "a", TO: None}, + {"date": "2024-01-01", "name": "b", TO: ts2}, + {"date": "2024-01-01", "name": "bb", TO: None}, + {"date": "2024-01-02", "name": "c", TO: None}, + {"date": "2024-01-02", "name": "d", TO: None}, + ] + assert_records_as_set(actual, expected) # type: ignore[arg-type] + + @pytest.mark.parametrize( "destination_config", destinations_configs(default_sql_configs=True, subset=["duckdb"]), From 21ce4f8fa44b33e3cbc4dbfc03d25bd6dd378e98 Mon Sep 17 00:00:00 2001 From: Jorrit Sandbrink Date: Sat, 28 Sep 2024 13:41:58 +0400 Subject: [PATCH 17/18] fix typos --- tests/load/pipeline/test_scd2.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/load/pipeline/test_scd2.py b/tests/load/pipeline/test_scd2.py index 0b9f133d6e..3e08b792ed 100644 --- a/tests/load/pipeline/test_scd2.py +++ b/tests/load/pipeline/test_scd2.py @@ -852,11 +852,11 
@@ def dim_test(data): info = p.run(dim_test(dim_snap), **destination_config.run_kwargs) assert_load_info(info) assert load_table_counts(p, "dim_test")["dim_test"] == 4 - # two "2024-01-01" should be untouched, two "2024-01-02" records should + # two "2024-01-01" records should be untouched, two "2024-01-02" records should # be added assert [row[TO] for row in get_table(p, "dim_test")] == [None, None, None, None] - # load 2 — reload "2024-01-01" partition + # load 3 — reload "2024-01-01" partition dim_snap = [ {"date": "2024-01-01", "name": "a"}, # unchanged {"date": "2024-01-01", "name": "bb"}, # new From 84c101bd7049967afca60e6fad703d09b61f62bb Mon Sep 17 00:00:00 2001 From: Jorrit Sandbrink Date: Sat, 28 Sep 2024 13:42:48 +0400 Subject: [PATCH 18/18] update incremental scd2 docs --- .../docs/general-usage/incremental-loading.md | 126 ++++++++++++++++-- 1 file changed, 117 insertions(+), 9 deletions(-) diff --git a/docs/website/docs/general-usage/incremental-loading.md b/docs/website/docs/general-usage/incremental-loading.md index eac58e075c..c8f92cf154 100644 --- a/docs/website/docs/general-usage/incremental-loading.md +++ b/docs/website/docs/general-usage/incremental-loading.md @@ -301,21 +301,129 @@ pipeline.run(dim_customer()) # third run — 2024-04-10 06:45:22.847403 | 2024-04-09 22:13:07.943703 | NULL | 1 | foo_updated | 1 | #### Example: incremental `scd2` -`retire_absent_rows` can be set to `False` to work with incremental extracts instead of full extracts: +A `merge_key` can be provided to work with incremental extracts instead of full extracts. The `merge_key` lets you define which absent rows are considered "deleted". Compound natural keys are allowed and can be specified by providing a list of column names as `merge_key`. + +*Case 1: do not retire absent records* + +You can set the natural key as `merge_key` to prevent retirement of absent rows. In this case you don't consider any absent row deleted. Records are not retired in the destination if their corresponding natural keys are not present in the source extract. This allows for incremental extracts that only contain updated records. + ```py @dlt.resource( - merge_key="my_natural_key", - write_disposition={ - "disposition": "merge", - "strategy": "scd2", - "retire_absent_rows": False, - } + merge_key="customer_key", + write_disposition={"disposition": "merge", "strategy": "scd2"} ) def dim_customer(): - ... + # initial load + yield [ + {"customer_key": 1, "c1": "foo", "c2": 1}, + {"customer_key": 2, "c1": "bar", "c2": 2} + ] + +pipeline.run(dim_customer()) # first run — 2024-04-09 18:27:53.734235 +... +``` +*`dim_customer` destination table after first run:* + +| `_dlt_valid_from` | `_dlt_valid_to` | `customer_key` | `c1` | `c2` | +| -- | -- | -- | -- | -- | +| 2024-04-09 18:27:53.734235 | NULL | 1 | foo | 1 | +| 2024-04-09 18:27:53.734235 | NULL | 2 | bar | 2 | + +```py +... 
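+# note: this second extract is incremental; it carries only the updated
+# record for customer_key 1. customer_key 2 is absent from the extract but
+# stays active in the destination, because the natural key is set as merge_key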
+def dim_customer(): + # second load — record for customer_key 1 got updated, customer_key 2 absent + yield [ + {"customer_key": 1, "c1": "foo_updated", "c2": 1}, +] + +pipeline.run(dim_customer()) # second run — 2024-04-09 22:13:07.943703 +``` + +*`dim_customer` destination table after second run—customer key 2 was not retired:* + +| `_dlt_valid_from` | `_dlt_valid_to` | `customer_key` | `c1` | `c2` | +| -- | -- | -- | -- | -- | +| 2024-04-09 18:27:53.734235 | **2024-04-09 22:13:07.943703** | 1 | foo | 1 | +| 2024-04-09 18:27:53.734235 | NULL | 2 | bar | 2 | +| **2024-04-09 22:13:07.943703** | **NULL** | **1** | **foo_updated** | **1** | + +*Case 2: only retire records for given partitions* + +:::note +Technically this is not SCD2 because the key used to merge records is not a natural key. +::: + +You can set a "partition" column as `merge_key` to retire absent rows for given partitions. In this case you only consider absent rows deleted if their partition value is present in the extract. Physical partitioning of the table is not required—the word "partition" is used conceptually here. + +```py +@dlt.resource( + merge_key="date", + write_disposition={"disposition": "merge", "strategy": "scd2"} +) +def some_data(): + # load 1 — "2024-01-01" partition + yield [ + {"date": "2024-01-01", "name": "a"}, + {"date": "2024-01-01", "name": "b"}, + ] + +pipeline.run(some_data()) # first run — 2024-01-02 03:03:35.854305 +... +``` + +*`some_data` destination table after first run:* + +| `_dlt_valid_from` | `_dlt_valid_to` | `date` | `name` | +| -- | -- | -- | -- | +| 2024-01-02 03:03:35.854305 | NULL | 2024-01-01 | a | +| 2024-01-02 03:03:35.854305 | NULL | 2024-01-01 | b | + +```py +... +def some_data(): + # load 2 — "2024-01-02" partition + yield [ + {"date": "2024-01-02", "name": "c"}, + {"date": "2024-01-02", "name": "d"}, + ] + +pipeline.run(some_data()) # second run — 2024-01-03 03:01:11.943703 ... ``` -Using this setting, records are not retired in the destination if their corresponding natural keys are not present in the source extract. This allows for incremental extracts that only contain updated records. You need to specify the natural key as `merge_key` when `retire_absent_rows` is `False`. Compound natural keys are allowed and can be specified by providing a list of column names as `merge_key`. + +*`some_data` destination table after second run—added 2024-01-02 records, did not touch 2024-01-01 records:* + +| `_dlt_valid_from` | `_dlt_valid_to` | `date` | `name` | +| -- | -- | -- | -- | +| 2024-01-02 03:03:35.854305 | NULL | 2024-01-01 | a | +| 2024-01-02 03:03:35.854305 | NULL | 2024-01-01 | b | +| **2024-01-03 03:01:11.943703** | **NULL** | **2024-01-02** | **c** | +| **2024-01-03 03:01:11.943703** | **NULL** | **2024-01-02** | **d** | + +```py +... +def some_data(): + # load 3 — reload "2024-01-01" partition + yield [ + {"date": "2024-01-01", "name": "a"}, # unchanged + {"date": "2024-01-01", "name": "bb"}, # new + ] + +pipeline.run(some_data()) # third run — 2024-01-03 10:30:05.750356 +... 
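+
+# a minimal sketch of checking the result, assuming the default validity
+# columns: active rows are those whose validity window is still open, i.e.
+#   SELECT * FROM some_data WHERE _dlt_valid_to IS NULL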
+```
+
+*`some_data` destination table after third run—retired "b", added "bb", did not touch the 2024-01-02 partition:*
+
+| `_dlt_valid_from` | `_dlt_valid_to` | `date` | `name` |
+| -- | -- | -- | -- |
+| 2024-01-02 03:03:35.854305 | NULL | 2024-01-01 | a |
+| 2024-01-02 03:03:35.854305 | **2024-01-03 10:30:05.750356** | 2024-01-01 | b |
+| 2024-01-03 03:01:11.943703 | NULL | 2024-01-02 | c |
+| 2024-01-03 03:01:11.943703 | NULL | 2024-01-02 | d |
+| **2024-01-03 10:30:05.750356** | **NULL** | **2024-01-01** | **bb** |
+

 #### Example: configure validity column names
 `_dlt_valid_from` and `_dlt_valid_to` are used by default as validity column names. Other names can be configured as follows: