Commit 6602a40

Merge pull request #331 from epoch8/fix-batch-generate-delete-stale

fix: passed delete_stale to do_batch_generate

elephantum authored Aug 11, 2024 · 2 parents 1ae3712 + e2f8aee

Showing 4 changed files with 127 additions and 2 deletions.
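In short: `BatchGenerate.build_compute` built the call to `do_batch_generate` without forwarding its own `delete_stale` flag, so the function always ran with its default of `True`, and a user-supplied `delete_stale=False` was silently ignored. A minimal sketch of the failure mode, using simplified stand-ins rather than the real datapipe classes (those live in `datapipe/step/batch_generate.py`):

```python
# Minimal sketch of the bug and the fix -- simplified stand-ins,
# not the real datapipe implementation.

def do_batch_generate(ds, output_dts, run_config=None, delete_stale=True, kwargs=None):
    """When delete_stale is True, rows not yielded in this run are removed."""
    print(f"delete_stale={delete_stale}")

class BatchGenerate:
    def __init__(self, delete_stale=True):
        self.delete_stale = delete_stale

    def build_compute_before_fix(self, ds, output_dts):
        # Bug: self.delete_stale was never forwarded, so the callee's
        # default (True) always won and delete_stale=False was a no-op.
        do_batch_generate(ds=ds, output_dts=output_dts)

    def build_compute_after_fix(self, ds, output_dts):
        # Fix: forward the step's own flag (the one-line change in this PR).
        do_batch_generate(ds=ds, output_dts=output_dts, delete_stale=self.delete_stale)

step = BatchGenerate(delete_stale=False)
step.build_compute_before_fix(None, [])  # prints delete_stale=True  (wrong)
step.build_compute_after_fix(None, [])   # prints delete_stale=False (fixed)
```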
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,7 @@
+# 0.13.14
+
+* Fix [#334](https://github.com/epoch8/datapipe/issues/334)
+
 # 0.13.13
 
 * Add `ComputeStep.get_status` method
3 changes: 2 additions & 1 deletion datapipe/step/batch_generate.py
@@ -28,8 +28,8 @@ def do_batch_generate(
     ds: DataStore,
     output_dts: List[DataTable],
     run_config: Optional[RunConfig] = None,
-    kwargs: Optional[Dict] = None,
     delete_stale: bool = True,
+    kwargs: Optional[Dict] = None,
 ) -> None:
     """
     Create a new table from the results of running `proc_func`.
@@ -104,6 +104,7 @@ def build_compute(self, ds: DataStore, catalog: Catalog) -> List[ComputeStep]:
                 ds=ds,
                 output_dts=output_dts,
                 run_config=run_config,
+                delete_stale=self.delete_stale,
                 kwargs=kwargs,
             ),
         ),
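With the flag actually forwarded, opting out of stale-row deletion now takes effect. A usage sketch (imports omitted; the generator and table name are illustrative, mirroring the new tests below):

```python
# Sketch only: generate_rows and "my_table" are made-up names;
# the real pattern is in the tests added by this PR.
pipeline = Pipeline(
    [
        BatchGenerate(
            func=generate_rows,   # a generator yielding pandas DataFrames
            outputs=["my_table"],
            delete_stale=False,   # now honored: rows missing from this run are kept
        ),
    ]
)
steps = build_compute(ds, catalog, pipeline)
run_steps(ds, steps)
```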
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "datapipe-core"
-version = "0.13.13"
+version = "0.13.14"
 description = "`datapipe` is a realtime incremental ETL library for Python application"
 readme = "README.md"
 repository = "https://github.com/epoch8/datapipe"
120 changes: 120 additions & 0 deletions tests/test_chunked_processing_pipeline.py
@@ -527,3 +527,123 @@ def transform(df, idx, ds, run_config, transform_count):
     changelist = ChangeList.create("inp", change_idx)
     run_steps_changelist(ds, steps, changelist, RunConfig())
     assert transform_count["value"] == 2
+
+
+def test_stale_records_deletion_with_batch_generate(dbconn):
+    ds = DataStore(dbconn, create_meta_table=True)
+    catalog = Catalog(
+        {
+            "inp_del": Table(
+                store=TableStoreDB(
+                    dbconn=dbconn,
+                    name="inp_data_del",
+                    data_sql_schema=TEST_SCHEMA,
+                    create_table=True,
+                )
+            ),
+        }
+    )
+
+    bg_count = {"value": 0}
+    bg_chunk_size = 5
+
+    def add_inp_table(ds: DataStore, bg_count):
+        assert isinstance(ds, DataStore)
+        bg_count["value"] += 1
+        yield pd.DataFrame(
+            {
+                "id": range(
+                    bg_count["value"] * bg_chunk_size,
+                    (bg_count["value"] + 1) * bg_chunk_size,
+                ),
+                "a": range(
+                    bg_count["value"] * bg_chunk_size,
+                    (bg_count["value"] + 1) * bg_chunk_size,
+                ),
+            }
+        )
+
+    pipeline = Pipeline(
+        [
+            BatchGenerate(
+                func=add_inp_table,
+                outputs=["inp_del"],
+                delete_stale=True,  # Default behavior, deletes records that are not yielded by func
+                kwargs=dict(bg_count=bg_count),  # to avoid double counting
+            ),
+        ]
+    )
+
+    steps = build_compute(ds, catalog, pipeline)
+
+    # First run
+    run_steps(ds, steps)
+
+    # Second run
+    run_steps(ds, steps)
+
+    # Check table shapes
+    df_del = catalog.get_datatable(ds, "inp_del").get_data()
+
+    assert df_del.shape[0] == bg_chunk_size
+    # additionally, check that delete_stale=True deletes previous chunks and keeps the last one
+    assert df_del.iloc[0]["id"] == bg_chunk_size * bg_count["value"]
+
+
+def test_stale_records_keep_with_batch_generate(dbconn):
+    ds = DataStore(dbconn, create_meta_table=True)
+    catalog = Catalog(
+        {
+            "inp_keep": Table(
+                store=TableStoreDB(
+                    dbconn=dbconn,
+                    name="inp_data_keep",
+                    data_sql_schema=TEST_SCHEMA,
+                    create_table=True,
+                )
+            ),
+        }
+    )
+
+    bg_count = {"value": 0}
+    bg_chunk_size = 5
+
+    def add_inp_table(ds: DataStore, bg_count):
+        assert isinstance(ds, DataStore)
+        bg_count["value"] += 1
+        yield pd.DataFrame(
+            {
+                "id": range(
+                    bg_count["value"] * bg_chunk_size,
+                    (bg_count["value"] + 1) * bg_chunk_size,
+                ),
+                "a": range(
+                    bg_count["value"] * bg_chunk_size,
+                    (bg_count["value"] + 1) * bg_chunk_size,
+                ),
+            }
+        )
+
+    pipeline = Pipeline(
+        [
+            BatchGenerate(
+                func=add_inp_table,
+                outputs=["inp_keep"],
+                delete_stale=False,  # keeps records that are not yielded by func
+                kwargs=dict(bg_count=bg_count),
+            ),
+        ]
+    )
+
+    steps = build_compute(ds, catalog, pipeline)
+
+    # First run
+    run_steps(ds, steps)
+
+    # Second run
+    run_steps(ds, steps)
+
+    # Check table shapes
+    df_keep = catalog.get_datatable(ds, "inp_keep").get_data()
+
+    assert df_keep.shape[0] == bg_count["value"] * bg_chunk_size
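Read together, the two tests pin down the flag's semantics: each run yields one fresh chunk of `bg_chunk_size = 5` rows, so after two runs the `delete_stale=True` table holds only the last chunk (5 rows with ids 10-14, hence the check of `iloc[0]["id"]` against `bg_chunk_size * bg_count["value"] == 10`), while the `delete_stale=False` table accumulates both chunks (`bg_count["value"] * bg_chunk_size == 10` rows, ids 5-14).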
