diff --git a/tests/load/pipeline/test_pipelines.py b/tests/load/pipeline/test_pipelines.py
index 0e1928204a..daf1361c4c 100644
--- a/tests/load/pipeline/test_pipelines.py
+++ b/tests/load/pipeline/test_pipelines.py
@@ -1001,7 +1001,9 @@ def events_timezone_unset():
 
 @pytest.mark.parametrize(
     "destination_config",
-    destinations_configs(default_staging_configs=True, default_sql_configs=True),
+    destinations_configs(
+        default_staging_configs=True, default_sql_configs=True, subset=["duckdb", "postgres"]
+    ),
     ids=lambda x: x.name,
 )
 def test_pipeline_resource_incremental_int_lag(
@@ -1057,19 +1059,21 @@ def r3(_=dlt.sources.incremental("id", lag=1)):
 
 @pytest.mark.parametrize(
     "destination_config",
-    destinations_configs(default_staging_configs=True, default_sql_configs=True),
+    destinations_configs(
+        default_staging_configs=True, default_sql_configs=True, subset=["duckdb", "postgres"]
+    ),
     ids=lambda x: x.name,
 )
+@pytest.mark.parametrize("lag", [3602, 3601, 3600, 3599, -1])
 def test_pipeline_resource_incremental_datetime_lag(
-    destination_config: DestinationTestConfiguration,
+    destination_config: DestinationTestConfiguration, lag: float
 ) -> None:
     """
     Test incremental lag behavior for datetime data while using `id` as the primary key.
 
     - `r1` loads entries with `id` and `created_at` timestamps.
-    - `r2` adds new entries.
-    - `r3`, with a lag of 3600 seconds (1 hour), updates entries but only considers records
-      where `created_at` is within the 1-hour lag window.
+    - `r2`, with different lag values, updates entries but only considers records
+      where `created_at` is within the lag window.
 
     We validate that the final dataset reflects the correct data updates, taking the lag into account.
     """
@@ -1082,37 +1086,37 @@ def test_pipeline_resource_incremental_datetime_lag(
     @dlt.resource(name=name, primary_key="id")
     def r1(_=dlt.sources.incremental("created_at")):
         yield from [
-            {"id": 1, "created_at": "2023-03-03T00:00:00Z", "event": "event1"},
-            {"id": 2, "created_at": "2023-03-03T00:15:00Z", "event": "event2"},
-            {"id": 3, "created_at": "2023-03-03T00:30:00Z", "event": "event3"},
-            {"id": 4, "created_at": "2023-03-03T00:45:00Z", "event": "event4"},
+            {"id": 1, "created_at": "2023-03-03T01:00:00Z", "event": "1"},
+            {"id": 2, "created_at": "2023-03-03T01:00:01Z", "event": "2"},
+            {
+                "id": 3,
+                "created_at": "2023-03-03T02:00:01Z",
+                "event": "3",
+            },  # the lag will be applied here
         ]
 
-    @dlt.resource(name=name, primary_key="id")
-    def r2(_=dlt.sources.incremental("created_at")):
-        yield from [
-            {"id": 5, "created_at": "2023-03-03T01:00:00Z", "event": "event5"},
-            {"id": 6, "created_at": "2023-03-03T01:15:00Z", "event": "event6"},
-        ]
-
-    # r3 with a lag of 3600 seconds (1 hour) will affect datetime updates
     updated_events = [
-        {"id": 3, "created_at": "2023-03-03T06:00:00Z", "event": "changed"},
-        {"id": 5, "created_at": "2023-03-03T06:00:00Z", "event": "changed"},
+        {"id": 1, "created_at": "2023-03-03T01:00:00Z", "event": "updated"},
+        {"id": 2, "created_at": "2023-03-03T01:00:01Z", "event": "updated"},
     ]
 
     @dlt.resource(name=name, primary_key="id", write_disposition="merge")
-    def r3(_=dlt.sources.incremental("created_at", lag=3600)):  # 1-hour lag in seconds
-        yield from [
-            {"id": 7, "created_at": "2023-03-03T06:00:00Z", "event": "event7"}
-        ] + updated_events
+    def r2(_=dlt.sources.incremental("created_at", lag=lag)):
+        yield from [{"id": 4, "created_at": "2023-03-03T03:00:00Z", "event": "4"}] + updated_events
 
     pipeline.run(r1)
     pipeline.run(r2)
-    pipeline.run(r3)
 
     # Validate final dataset
+    results = {
+        3602: ["updated", "updated", "3", "4"],
+        3601: ["updated", "updated", "3", "4"],
+        3600: ["1", "updated", "3", "4"],
+        3599: ["1", "2", "3", "4"],
+        -1: ["1", "2", "3", "4"],
+    }
+
    with pipeline.sql_client() as sql_client:
         assert [
             row[0] for row in sql_client.execute_sql(f"SELECT event FROM {name} ORDER BY id")
-        ] == ["event1", "event2", "changed", "event4", "changed", "event6", "event7"]
+        ] == results[int(lag)]
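For reference, the expected `results` per lag value follow from simple cursor arithmetic: after `r1`, the saved `created_at` cursor is `2023-03-03T02:00:01Z` (id 3), and the lag lowers the start boundary for `r2`'s rows to roughly `last_value - lag`. Below is a minimal standalone sketch of that arithmetic, not dlt internals; the closed `>=` boundary comparison and treating a negative lag as "no lag" are assumptions made here to reproduce the expected rows.

```python
from datetime import datetime, timedelta

# Cursor saved after r1: the max created_at seen (id 3).
last_value = datetime.fromisoformat("2023-03-03T02:00:01+00:00")

# created_at of the two rows that r2 tries to re-load as "updated".
updates = {
    1: datetime.fromisoformat("2023-03-03T01:00:00+00:00"),
    2: datetime.fromisoformat("2023-03-03T01:00:01+00:00"),
}

for lag in (3602, 3601, 3600, 3599, -1):
    # Assumption: lag lowers the start boundary; negative lag is treated as no lag.
    effective_start = last_value - timedelta(seconds=max(lag, 0))
    reloaded = [row_id for row_id, ts in updates.items() if ts >= effective_start]
    print(lag, reloaded)

# 3602 -> [1, 2]  both rows merge as "updated"
# 3601 -> [1, 2]
# 3600 -> [2]     id 1 falls just outside the window
# 3599 -> []      no updates pass the window
#   -1 -> []
```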