Skip to content

Commit

Permalink
test: incremental lag datetime
Browse files Browse the repository at this point in the history
  • Loading branch information
donotpush committed Oct 16, 2024
1 parent 17dc69a commit a06d552
Showing 1 changed file with 30 additions and 26 deletions.
56 changes: 30 additions & 26 deletions tests/load/pipeline/test_pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -1001,7 +1001,9 @@ def events_timezone_unset():

@pytest.mark.parametrize(
"destination_config",
destinations_configs(default_staging_configs=True, default_sql_configs=True),
destinations_configs(
default_staging_configs=True, default_sql_configs=True, subset=["duckdb", "postgres"]
),
ids=lambda x: x.name,
)
def test_pipeline_resource_incremental_int_lag(
Expand Down Expand Up @@ -1057,19 +1059,21 @@ def r3(_=dlt.sources.incremental("id", lag=1)):

@pytest.mark.parametrize(
"destination_config",
destinations_configs(default_staging_configs=True, default_sql_configs=True),
destinations_configs(
default_staging_configs=True, default_sql_configs=True, subset=["duckdb", "postgres"]
),
ids=lambda x: x.name,
)
@pytest.mark.parametrize("lag", [3602, 3601, 3600, 3599, -1])
def test_pipeline_resource_incremental_datetime_lag(
destination_config: DestinationTestConfiguration,
destination_config: DestinationTestConfiguration, lag: float
) -> None:
"""
Test incremental lag behavior for datetime data while using `id` as the primary key.
- `r1` loads entries with `id` and `created_at` timestamps.
- `r2` adds new entries.
- `r3`, with a lag of 3600 seconds (1 hour), updates entries but only considers records
where `created_at` is within the 1-hour lag window.
- `r2`, with different lag values, updates entries but only considers records
where `created_at` is within the lag window.
We validate that the final dataset reflects the correct data updates, taking the lag into account.
"""
Expand All @@ -1082,37 +1086,37 @@ def test_pipeline_resource_incremental_datetime_lag(
@dlt.resource(name=name, primary_key="id")
def r1(_=dlt.sources.incremental("created_at")):
yield from [
{"id": 1, "created_at": "2023-03-03T00:00:00Z", "event": "event1"},
{"id": 2, "created_at": "2023-03-03T00:15:00Z", "event": "event2"},
{"id": 3, "created_at": "2023-03-03T00:30:00Z", "event": "event3"},
{"id": 4, "created_at": "2023-03-03T00:45:00Z", "event": "event4"},
{"id": 1, "created_at": "2023-03-03T01:00:00Z", "event": "1"},
{"id": 2, "created_at": "2023-03-03T01:00:01Z", "event": "2"},
{
"id": 3,
"created_at": "2023-03-03T02:00:01Z",
"event": "3",
}, # the lag will be applied here
]

@dlt.resource(name=name, primary_key="id")
def r2(_=dlt.sources.incremental("created_at")):
yield from [
{"id": 5, "created_at": "2023-03-03T01:00:00Z", "event": "event5"},
{"id": 6, "created_at": "2023-03-03T01:15:00Z", "event": "event6"},
]

# r3 with a lag of 3600 seconds (1 hour) will affect datetime updates
updated_events = [
{"id": 3, "created_at": "2023-03-03T06:00:00Z", "event": "changed"},
{"id": 5, "created_at": "2023-03-03T06:00:00Z", "event": "changed"},
{"id": 1, "created_at": "2023-03-03T01:00:00Z", "event": "updated"},
{"id": 2, "created_at": "2023-03-03T01:00:01Z", "event": "updated"},
]

@dlt.resource(name=name, primary_key="id", write_disposition="merge")
def r3(_=dlt.sources.incremental("created_at", lag=3600)): # 1-hour lag in seconds
yield from [
{"id": 7, "created_at": "2023-03-03T06:00:00Z", "event": "event7"}
] + updated_events
def r2(_=dlt.sources.incremental("created_at", lag=lag)):
yield from [{"id": 4, "created_at": "2023-03-03T03:00:00Z", "event": "4"}] + updated_events

pipeline.run(r1)
pipeline.run(r2)
pipeline.run(r3)

# Validate final dataset
results = {
3602: ["updated", "updated", "3", "4"],
3601: ["updated", "updated", "3", "4"],
3600: ["1", "updated", "3", "4"],
3599: ["1", "2", "3", "4"],
-1: ["1", "2", "3", "4"],
}

with pipeline.sql_client() as sql_client:
assert [
row[0] for row in sql_client.execute_sql(f"SELECT event FROM {name} ORDER BY id")
] == ["event1", "event2", "changed", "event4", "changed", "event6", "event7"]
] == results[int(lag)]

0 comments on commit a06d552

Please sign in to comment.