From f109a87d51c31b82902068a6aff379f54c1173b5 Mon Sep 17 00:00:00 2001 From: Dave Date: Sun, 15 Dec 2024 22:04:49 +0100 Subject: [PATCH] add test for testing incremental with limit --- tests/extract/test_incremental.py | 51 ++++++++++++++++++++++++++++--- 1 file changed, 46 insertions(+), 5 deletions(-) diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index 0a0c1f8c2d..205df41a81 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -3854,6 +3854,7 @@ def some_data(): for col in table_schema["columns"].values(): assert "incremental" not in col + @pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) @pytest.mark.parametrize("last_value_func", [min, max]) def test_start_range_open(item_type: TestDataItemFormat, last_value_func: Any) -> None: @@ -3961,17 +3962,57 @@ def some_data( # Includes values 5-10 inclusive assert items == expected_items -def test_incremental_and_limit(): - @dlt.resource(incremental=dlt.sources.incremental(cursor_path="id", initial_value=0, last_value_func=min)) - def resource(): - for i in range(100): +@pytest.mark.parametrize("offset_by_last_value", [True, False]) +def test_incremental_and_limit(offset_by_last_value: bool): + resource_called = 0 + + # here we check incremental and limit when incremental once when last value cannot be used + # to offset the source, and once when it can. + + @dlt.resource( + table_name="items", + ) + def resource( + incremental=dlt.sources.incremental(cursor_path="id", initial_value=-1, row_order="asc") + ): + range_iterator = ( + range(incremental.start_value + 1, 1000) if offset_by_last_value else range(1000) + ) + for i in range_iterator: + nonlocal resource_called + resource_called += 1 yield { "id": i, "value": str(i), } + resource.add_limit(10) + p = dlt.pipeline(pipeline_name="incremtal_limit", destination="duckdb", dev_mode=True) - p.run(resource().add_limit(10)) + p.run(resource()) + + # check we have the right number of items + assert len(p.dataset().items.df()) == 10 + assert resource_called == 10 + # check that we have items 0-9 + assert p.dataset().items.df().id.tolist() == list(range(10)) + + # run the next ten + p.run(resource()) + + # check we have the right number of items + assert len(p.dataset().items.df()) == 20 + assert resource_called == 20 if offset_by_last_value else 30 + # check that we have items 0-19 + assert p.dataset().items.df().id.tolist() == list(range(20)) + + # run the next batch + p.run(resource()) + # check we have the right number of items + assert len(p.dataset().items.df()) == 30 + assert resource_called == 30 if offset_by_last_value else 60 + # check that we have items 0-29 + assert p.dataset().items.df().id.tolist() == list(range(30))