# incremental scd2 with merge_key (#1818)
Changes from 14 commits
```diff
@@ -12,6 +12,7 @@
     TTableSchemaColumns,
     TWriteDispositionConfig,
     TMergeDispositionDict,
+    TScd2StrategyDict,
     TAnySchemaColumns,
     TTableFormat,
     TSchemaContract,
```
```diff
@@ -352,7 +353,7 @@ def _set_hints(
         self, hints_template: TResourceHints, create_table_variant: bool = False
     ) -> None:
         DltResourceHints.validate_dynamic_hints(hints_template)
-        DltResourceHints.validate_write_disposition_hint(hints_template.get("write_disposition"))
+        DltResourceHints.validate_write_disposition_hint(hints_template)
         if create_table_variant:
             table_name: str = hints_template["name"]  # type: ignore[assignment]
             # incremental cannot be specified in variant
```
```diff
@@ -452,10 +453,19 @@ def _merge_merge_disposition_dict(dict_: Dict[str, Any]) -> None:
         md_dict: TMergeDispositionDict = dict_.pop("write_disposition")
         if merge_strategy := md_dict.get("strategy"):
             dict_["x-merge-strategy"] = merge_strategy
-        if "boundary_timestamp" in md_dict:
-            dict_["x-boundary-timestamp"] = md_dict["boundary_timestamp"]
         # add columns for `scd2` merge strategy
         if merge_strategy == "scd2":
+            md_dict = cast(TScd2StrategyDict, md_dict)
+            if "boundary_timestamp" in md_dict:
+                dict_["x-boundary-timestamp"] = md_dict["boundary_timestamp"]
+            if "retire_absent_rows" in md_dict:
+                dict_["x-retire-absent-rows"] = md_dict["retire_absent_rows"]
+            if "natural_key" in md_dict:
+                nk = md_dict["natural_key"]
+                if nk in dict_["columns"]:
+                    dict_["columns"][nk]["x-natural-key"] = True
+                else:
+                    dict_["columns"][nk] = {"name": nk, "x-natural-key": True}
             if md_dict.get("validity_column_names") is None:
                 from_, to = DEFAULT_VALIDITY_COLUMN_NAMES
             else:
```

Review comments on the `natural_key` handling:

> also no longer needed

> also removed
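The hunk above folds `scd2`-specific options from the `write_disposition` dict into `x-` hints on the table schema dict. A minimal standalone sketch of that mapping (a hypothetical simplified helper operating on plain dicts, not dlt's internal typed API):

```python
from typing import Any, Dict


def merge_scd2_hints(dict_: Dict[str, Any]) -> None:
    """Sketch: pop write_disposition and translate scd2 options into x- hints."""
    md_dict = dict_.pop("write_disposition")
    if merge_strategy := md_dict.get("strategy"):
        dict_["x-merge-strategy"] = merge_strategy
    if merge_strategy == "scd2":
        if "boundary_timestamp" in md_dict:
            dict_["x-boundary-timestamp"] = md_dict["boundary_timestamp"]
        if "retire_absent_rows" in md_dict:
            dict_["x-retire-absent-rows"] = md_dict["retire_absent_rows"]
        if "natural_key" in md_dict:
            nk = md_dict["natural_key"]
            # reuse an existing column entry or create a stub one
            if nk in dict_["columns"]:
                dict_["columns"][nk]["x-natural-key"] = True
            else:
                dict_["columns"][nk] = {"name": nk, "x-natural-key": True}


# hypothetical table schema dict with one known column
table = {
    "columns": {"customer_id": {"name": "customer_id"}},
    "write_disposition": {
        "strategy": "scd2",
        "retire_absent_rows": False,
        "natural_key": "customer_id",
    },
}
merge_scd2_hints(table)
```

After the call, the `write_disposition` key is gone and its options survive as `x-merge-strategy`, `x-retire-absent-rows`, and an `x-natural-key` flag on the column.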
```diff
@@ -514,7 +524,8 @@ def validate_dynamic_hints(template: TResourceHints) -> None:
             )

     @staticmethod
-    def validate_write_disposition_hint(wd: TTableHintTemplate[TWriteDispositionConfig]) -> None:
+    def validate_write_disposition_hint(template: TResourceHints) -> None:
+        wd = template.get("write_disposition")
         if isinstance(wd, dict) and wd["disposition"] == "merge":
             wd = cast(TMergeDispositionDict, wd)
             if "strategy" in wd and wd["strategy"] not in MERGE_STRATEGIES:
```
```diff
@@ -523,13 +534,25 @@ def validate_write_disposition_hint(
                     f"""Allowed values: {', '.join(['"' + s + '"' for s in MERGE_STRATEGIES])}."""
                 )

-            for ts in ("active_record_timestamp", "boundary_timestamp"):
-                if ts == "active_record_timestamp" and wd.get("active_record_timestamp") is None:
-                    continue  # None is allowed for active_record_timestamp
-                if ts in wd:
-                    try:
-                        ensure_pendulum_datetime(wd[ts])  # type: ignore[literal-required]
-                    except Exception:
-                        raise ValueError(
-                            f'could not parse `{ts}` value "{wd[ts]}"'  # type: ignore[literal-required]
-                        )
+            if wd.get("strategy") == "scd2":
+                wd = cast(TScd2StrategyDict, wd)
+                for ts in ("active_record_timestamp", "boundary_timestamp"):
+                    if (
+                        ts == "active_record_timestamp"
+                        and wd.get("active_record_timestamp") is None
+                    ):
+                        continue  # None is allowed for active_record_timestamp
+                    if ts in wd:
+                        try:
+                            ensure_pendulum_datetime(wd[ts])  # type: ignore[literal-required]
+                        except Exception:
+                            raise ValueError(
+                                f'could not parse `{ts}` value "{wd[ts]}"'  # type: ignore[literal-required]
+                            )
+
+                if (
+                    "retire_absent_rows" in wd
+                    and not wd["retire_absent_rows"]
+                    and template.get("merge_key") is None
+                ):
+                    raise ValueError("`merge_key` is required when `retire_absent_rows=False`")
```
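The new guard means disabling `retire_absent_rows` without a `merge_key` fails validation. A self-contained sketch of just that check (simplified from the hunk above; plain dicts stand in for dlt's typed hint templates):

```python
from typing import Any, Dict, Optional


def check_retire_absent_rows(template: Dict[str, Any]) -> None:
    """Sketch of the merge_key guard: retiring absent rows can only be
    disabled when a merge_key identifies the natural key."""
    wd = template.get("write_disposition")
    if isinstance(wd, dict) and wd.get("strategy") == "scd2":
        if (
            "retire_absent_rows" in wd
            and not wd["retire_absent_rows"]
            and template.get("merge_key") is None
        ):
            raise ValueError("`merge_key` is required when `retire_absent_rows=False`")


# passes: merge_key is provided alongside retire_absent_rows=False
check_retire_absent_rows(
    {
        "merge_key": "customer_id",
        "write_disposition": {
            "disposition": "merge",
            "strategy": "scd2",
            "retire_absent_rows": False,
        },
    }
)

# fails: retire_absent_rows=False without a merge_key
err: Optional[str] = None
try:
    check_retire_absent_rows(
        {
            "write_disposition": {
                "disposition": "merge",
                "strategy": "scd2",
                "retire_absent_rows": False,
            }
        }
    )
except ValueError as e:
    err = str(e)
```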
```diff
@@ -223,7 +223,7 @@ info = pipeline.run(fb_ads.with_resources("ads"), write_disposition="merge")
 In the example above, we enforce the root key propagation with `fb_ads.root_key = True`. This ensures that the correct data is propagated on the initial `replace` load so the future `merge` load can be executed. You can achieve the same in the decorator `@dlt.source(root_key=True)`.

 ### `scd2` strategy
-`dlt` can create [Slowly Changing Dimension Type 2](https://en.wikipedia.org/wiki/Slowly_changing_dimension#Type_2:_add_new_row) (SCD2) destination tables for dimension tables that change in the source. The resource is expected to provide a full extract of the source table each run. A row hash is stored in `_dlt_id` and used as a surrogate key to identify source records that have been inserted, updated, or deleted. A `NULL` value is used by default to indicate an active record, but it's possible to use a configurable high timestamp (e.g., 9999-12-31 00:00:00.000000) instead.
+`dlt` can create [Slowly Changing Dimension Type 2](https://en.wikipedia.org/wiki/Slowly_changing_dimension#Type_2:_add_new_row) (SCD2) destination tables for dimension tables that change in the source. By default, the resource is expected to provide a full extract of the source table each run, but [incremental extracts](#example-incremental-scd2) are also possible. A row hash is stored in `_dlt_id` and used as a surrogate key to identify source records that have been inserted, updated, or deleted. A `NULL` value is used by default to indicate an active record, but it's possible to use a configurable high timestamp (e.g., 9999-12-31 00:00:00.000000) instead.

 :::note
 The `unique` hint for `_dlt_id` in the root table is set to `false` when using `scd2`. This differs from [default behavior](./destination-tables.md#child-and-parent-tables). The reason is that the surrogate key stored in `_dlt_id` contains duplicates after an _insert-delete-reinsert_ pattern:
```
```diff
@@ -300,6 +300,23 @@ pipeline.run(dim_customer())  # third run — 2024-04-10 06:45:22.847403
 | 2024-04-09 18:27:53.734235 | **2024-04-10 06:45:22.847403** | 2 | bar | 2 |
 | 2024-04-09 22:13:07.943703 | NULL | 1 | foo_updated | 1 |
```
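Related to the active-record marker mentioned above: the `scd2` config accepts an `active_record_timestamp` option (validated in the Python hunks earlier) to use a high timestamp instead of `NULL`. A hedged sketch of such a config, with stdlib `datetime.fromisoformat` standing in for dlt's internal `ensure_pendulum_datetime` check; the exact timestamp value is illustrative:

```python
from datetime import datetime

# sketch: a merge write_disposition that flags active records with a
# high timestamp instead of NULL (value shown is an assumption)
write_disposition = {
    "disposition": "merge",
    "strategy": "scd2",
    "active_record_timestamp": "9999-12-31T00:00:00",
}

# the hint must parse as a datetime; dlt validates this with
# ensure_pendulum_datetime, here stdlib fromisoformat stands in
high_ts = datetime.fromisoformat(write_disposition["active_record_timestamp"])
```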
````diff
+#### Example: incremental `scd2`
+`retire_absent_rows` can be set to `False` to work with incremental extracts instead of full extracts:
+```py
+@dlt.resource(
+    merge_key="my_natural_key",
+    write_disposition={
+        "disposition": "merge",
+        "strategy": "scd2",
+        "retire_absent_rows": False,
+    }
+)
+def dim_customer():
+    ...
+```
+Using this setting, records are not retired in the destination if their corresponding natural keys are not present in the source extract. This allows for incremental extracts that only contain updated records. You need to specify the natural key as `merge_key` when `retire_absent_rows` is `False`. Compound natural keys are allowed and can be specified by providing a list of column names as `merge_key`.
````
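To make the retire-absent-rows semantics concrete, here is a toy in-memory sketch (a hypothetical helper, not dlt's implementation) showing that with `retire_absent_rows=False` a natural key absent from the extract keeps its active row, while present keys are still versioned:

```python
from typing import List


def scd2_merge(
    table: List[dict],
    extract: List[dict],
    natural_key: str,
    ts: str,
    retire_absent_rows: bool = True,
) -> None:
    """Toy scd2 merge: each row carries valid_from/valid_to, and
    valid_to=None marks the active version of a natural key."""
    active = {r[natural_key]: r for r in table if r["valid_to"] is None}
    seen = set()
    for rec in extract:
        nk = rec[natural_key]
        seen.add(nk)
        cur = active.get(nk)
        if cur is not None and cur["payload"] == rec["payload"]:
            continue  # unchanged record, nothing to do
        if cur is not None:
            cur["valid_to"] = ts  # retire the superseded version
        table.append({natural_key: nk, "payload": rec["payload"],
                      "valid_from": ts, "valid_to": None})
    if retire_absent_rows:  # only meaningful for full extracts
        for nk, cur in active.items():
            if nk not in seen:
                cur["valid_to"] = ts


table = [
    {"id": 1, "payload": "foo", "valid_from": "t0", "valid_to": None},
    {"id": 2, "payload": "bar", "valid_from": "t0", "valid_to": None},
]
# incremental extract: only id 1 changed; id 2 is simply absent
scd2_merge(table, [{"id": 1, "payload": "foo2"}], "id", "t1",
           retire_absent_rows=False)
```

After the run, id 1 has a retired `foo` row and a new active `foo2` row, while id 2's row stays active even though it was absent from the extract; with `retire_absent_rows=True` it would have been closed at `t1`.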
Review discussion on this section:

> @rudolfix we are using only the `merge_key` as opposed to a combination of primary key and merge key here, let us know if this is ok.

> @jorritsandbrink @sh-rp I think there's some kind of conceptual confusion. Example 1: I load data day by day. I set the merge key to […] My take: […] WDYT? IMO this is really powerful functionality if done this way.

> I think you're right :) I'll adapt per your suggestions.
````diff
+
+#### Example: configure validity column names
+`_dlt_valid_from` and `_dlt_valid_to` are used by default as validity column names. Other names can be configured as follows:
+```py
````
Review comments:

> this is no longer needed, is it?

> Whoops. Removed now.