[WIP]: Datapipe ml opts: added dynamic index filtering #308

Open
wants to merge 51 commits into
base: master
809e812
Merge branch 'fix-join-of-aux-table' into fix-asc-desc-merge-fix-join…
bobokvsky Sep 26, 2023
912c19b
Merge branch 'add-kwargs-to-dbconn' into fix-asc-desc-merge-fix-join-…
bobokvsky Sep 26, 2023
17a6213
Merge branch 'add-kwargs-to-dbconn' into fix-asc-desc-merge-fix-join-…
bobokvsky Sep 26, 2023
effbc5a
add executor_config to DatatableBatchTransform
bobokvsky Oct 6, 2023
17b2d4d
Merge branch 'fix-join-of-aux-table' into datapipe-ml-opts
bobokvsky Oct 19, 2023
295eb89
refactor filters as list of LabelDict
bobokvsky Oct 23, 2023
43ad666
added Filters to str as some table support
bobokvsky Oct 23, 2023
4064147
added filters to cli
bobokvsky Oct 23, 2023
7daaae0
fix filter in transform tables
bobokvsky Oct 23, 2023
fdde324
fix or_ -> and_
bobokvsky Oct 23, 2023
abf3725
fix or_ -> and_
bobokvsky Oct 23, 2023
0df0f22
*
bobokvsky Nov 13, 2023
e7f9578
Merge branch 'v0.13' into datapipe-ml-opts
bobokvsky Jan 22, 2024
e4db5ea
fix delete_stale
bobokvsky Feb 1, 2024
9997d04
fix suffix problem
bobokvsky Apr 1, 2024
55849d0
fix bug when reading multiply suffixes
bobokvsky Jul 15, 2024
208ddf8
fix2
bobokvsky Jul 15, 2024
f46cc66
Merge branch 'master' into datapipe-ml-opts
bobokvsky Aug 12, 2024
b579061
*
bobokvsky Aug 12, 2024
3dc9de4
WIPg
bobokvsky Aug 14, 2024
526b1ab
fix typing
bobokvsky Aug 14, 2024
50c8f9f
mypy fixs + add IndexDF support
bobokvsky Aug 16, 2024
0008bf9
add tests, part 1
bobokvsky Aug 16, 2024
181cf40
fix tests
bobokvsky Aug 19, 2024
a775700
fix mypyg
bobokvsky Aug 19, 2024
fab83ca
sql filters change
bobokvsky Aug 19, 2024
a458dce
fix tests
bobokvsky Aug 19, 2024
9ec0675
fix test
bobokvsky Aug 19, 2024
220e344
fix tests
bobokvsky Aug 19, 2024
c4b97fb
fix filedir
bobokvsky Aug 19, 2024
18d3f93
revert changes
bobokvsky Aug 19, 2024
ecc68fd
refactoring filters
bobokvsky Aug 19, 2024
0838d0b
rename function
bobokvsky Aug 19, 2024
d2045ad
fix tests
bobokvsky Aug 19, 2024
7310152
rm print
bobokvsky Aug 19, 2024
267ca06
fix tests
bobokvsky Aug 19, 2024
a61aedb
add tests examples
bobokvsky Aug 19, 2024
38ba3a8
fix tests
bobokvsky Aug 19, 2024
255756c
Merge remote-tracking branch 'origin/master' into datapipe-ml-opts
elephantum Aug 20, 2024
d1f6a99
add test_complex_transform_with_filters2
bobokvsky Aug 20, 2024
586afc0
fix tests
bobokvsky Aug 20, 2024
33b482d
fix tests, added new ValueError
bobokvsky Aug 20, 2024
bf69bf8
add new tests
bobokvsky Aug 20, 2024
5cb433e
*
elephantum Aug 22, 2024
4c98525
Merge branch 'datapipe-ml-opts' of github.com:epoch8/datapipe into da…
elephantum Aug 22, 2024
195c808
Merge branch 'master' into datapipe-ml-opts-merge-v0.14.1-alpha.1
bobokvsky Sep 5, 2024
17cffbb
fix tests
bobokvsky Sep 5, 2024
d95b2af
fix mypy
bobokvsky Sep 5, 2024
97dd2d2
fix tests
bobokvsky Sep 6, 2024
b1de4b8
*
bobokvsky Sep 6, 2024
8055860
removed Keys from filters must be in transform_keys error
bobokvsky Sep 6, 2024
2 changes: 2 additions & 0 deletions .github/workflows/test_examples.yaml
@@ -25,6 +25,8 @@ jobs:
- many_to_zero
- model_inference
- one_to_many_pipeline
- batch_transform_with_filters/simple-example
- batch_transform_with_filters/filters-as-function
executor:
- SingleThreadExecutor
- RayExecutor
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -13,6 +13,8 @@ Major changes:

See "Migration from v0.13 to v0.14" for more details

* `BatchTransform` has a new argument `filters`. It restricts processing to only those transform indexes that are listed in `filters`. See `docs/concepts.md` for more details.

# 0.13.14

* Fix [#334](https://github.com/epoch8/datapipe/issues/334)
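
To make the changelog entry concrete: judging from the rest of this diff, `filters` is a list of `LabelDict` entries, and only rows whose key columns match one of those entries are processed. The sketch below reproduces that selection rule in plain pandas. It is an illustration only: `select_matching_rows` and the `user_id`/`item_id` columns are hypothetical names, not part of datapipe, and it assumes every filter dict uses the same set of keys.

```python
from typing import Any, Dict, List

import pandas as pd

LabelDict = Dict[str, Any]


def select_matching_rows(idx: pd.DataFrame, filters: List[LabelDict]) -> pd.DataFrame:
    """Keep only rows of `idx` whose key columns equal one of the filter dicts.

    Hypothetical helper: assumes all dicts in `filters` share the same keys.
    """
    if not filters:
        # No filters given: nothing is restricted.
        return idx
    filters_df = pd.DataFrame(filters).drop_duplicates()
    # Only filter on keys that the index actually has.
    keys = [c for c in filters_df.columns if c in idx.columns]
    if not keys:
        return idx
    # An inner join keeps exactly the rows whose key tuple appears in the filters.
    return idx.merge(filters_df[keys], on=keys, how="inner")


idx = pd.DataFrame({"user_id": [1, 1, 2, 3], "item_id": [10, 11, 10, 12]})
selected = select_matching_rows(idx, [{"user_id": 1}, {"user_id": 3}])
print(selected)
```

Here the row with `user_id == 2` is dropped, while both rows for `user_id == 1` and the single row for `user_id == 3` remain.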
6 changes: 6 additions & 0 deletions Makefile
@@ -0,0 +1,6 @@
black:
autoflake -r --in-place --remove-all-unused-imports steps/ *.py brandlink_utils/
black --verbose --config black.toml steps/ alembic *.py brandlink_utils/

mypy:
mypy -p datapipe --ignore-missing-imports --follow-imports=silent --namespace-packages
2 changes: 2 additions & 0 deletions black.toml
@@ -0,0 +1,2 @@
[tool.black]
line-length = 120
5 changes: 4 additions & 1 deletion datapipe/cli.py
@@ -5,6 +5,7 @@
from typing import Dict, List, Optional, cast

import click
from datapipe.run_config import RunConfig
import pandas as pd
import rich
from opentelemetry import trace
@@ -318,6 +319,7 @@ def step(
steps = filter_steps_by_labels_and_name(app, labels=labels_list, name_prefix=name)

ctx.obj["steps"] = steps
ctx.obj["labels"] = labels_list


def to_human_repr(step: ComputeStep, extra_args: Optional[Dict] = None) -> str:
@@ -379,6 +381,7 @@ def step_list(ctx: click.Context, status: bool) -> None: # noqa
def step_run(ctx: click.Context, loop: bool, loop_delay: int) -> None:
app: DatapipeApp = ctx.obj["pipeline"]
steps_to_run: List[ComputeStep] = ctx.obj["steps"]
run_config = RunConfig(labels={k: v for k, v in ctx.obj["labels"]})

executor: Executor = ctx.obj["executor"]

@@ -387,7 +390,7 @@ def step_run(ctx: click.Context, loop: bool, loop_delay: int) -> None:

while True:
if len(steps_to_run) > 0:
run_steps(app.ds, steps_to_run, executor=executor)
run_steps(app.ds, steps_to_run, run_config=run_config, executor=executor)

if not loop:
break
5 changes: 5 additions & 0 deletions datapipe/datatable.py
@@ -7,6 +7,11 @@
from datapipe.event_logger import EventLogger
from datapipe.meta.sql_meta import MetaTable
from datapipe.run_config import RunConfig
from datapipe.sql_util import (
sql_apply_idx_filter_to_table,
sql_apply_runconfig_filters,
)
from datapipe.store.database import DBConn, MetaKey
from datapipe.store.database import DBConn
from datapipe.store.table_store import TableStore
from datapipe.types import DataDF, IndexDF, MetadataDF, data_to_index, index_difference
21 changes: 17 additions & 4 deletions datapipe/meta/sql_meta.py
@@ -20,7 +20,7 @@
import sqlalchemy as sa

from datapipe.run_config import RunConfig
from datapipe.sql_util import sql_apply_idx_filter_to_table, sql_apply_runconfig_filter
from datapipe.sql_util import sql_apply_idx_filter_to_table, sql_apply_runconfig_filters
from datapipe.store.database import DBConn, MetaKey
from datapipe.types import (
DataDF,
@@ -389,7 +389,7 @@ def get_stale_idx(
)
)

sql = sql_apply_runconfig_filter(
sql = sql_apply_runconfig_filters(
sql, self.sql_table, self.primary_keys, run_config
)

@@ -447,7 +447,7 @@ def get_agg_cte(
sql = sql.group_by(*key_cols)

sql = sql_apply_filters_idx_to_subquery(sql, keys, filters_idx)
sql = sql_apply_runconfig_filter(sql, tbl, self.primary_keys, run_config)
sql = sql_apply_runconfig_filters_to_subquery(sql, self.primary_keys, run_config)

return (keys, sql.cte(name=f"{tbl.name}__update"))

@@ -649,7 +649,7 @@ def mark_all_rows_unprocessed(
.where(self.sql_table.c.is_success == True)
)

sql = sql_apply_runconfig_filter(
sql = sql_apply_runconfig_filters(
update_sql, self.sql_table, self.primary_keys, run_config
)

@@ -680,6 +680,18 @@ def sql_apply_filters_idx_to_subquery(
return sql


def sql_apply_runconfig_filters_to_subquery(
sql: Any,
keys: List[str],
run_config: Optional[RunConfig] = None,
) -> Any:
if run_config is not None:
filters_idx = pd.DataFrame(run_config.filters)
sql = sql_apply_filters_idx_to_subquery(sql, keys, filters_idx)

return sql


@dataclass
class ComputeInputCTE:
cte: Any
@@ -795,6 +807,7 @@ def build_changed_idx_sql(
)

out = sql_apply_filters_idx_to_subquery(out, transform_keys, filters_idx)
out = sql_apply_runconfig_filters_to_subquery(out, transform_keys, run_config)

out = out.cte(name="transform")

20 changes: 17 additions & 3 deletions datapipe/run_config.py
@@ -1,7 +1,8 @@
from dataclasses import dataclass, field
from typing import Any, Dict, Optional
from typing import Any, List, Dict, Optional

LabelDict = Dict[str, Any]
import pandas as pd
from datapipe.types import LabelDict


@dataclass
@@ -10,7 +11,7 @@ class RunConfig:
# if not empty, only the rows that strictly match the filter
# are processed during the run
# (provided the table has an identifier with a matching name).
filters: LabelDict = field(default_factory=dict)
filters: List[LabelDict] = field(default_factory=list)
labels: LabelDict = field(default_factory=dict)

@classmethod
@@ -22,3 +23,16 @@ def add_labels(cls, rc: Optional["RunConfig"], labels: LabelDict) -> "RunConfig"
)
else:
return RunConfig(labels=labels)

@classmethod
def add_filters(cls, rc: Optional["RunConfig"], filters: List[LabelDict]) -> "RunConfig":
if rc is not None:
return RunConfig(
filters=list(
pd.concat([pd.DataFrame(rc.filters), pd.DataFrame(filters)], ignore_index=True)
.drop_duplicates()
.apply(lambda row : row.dropna().to_dict(), axis=1)
),
)
else:
return RunConfig(filters=filters)
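
The merge step inside `add_filters` is easy to misread, so here is a minimal standalone sketch of the same pandas pipeline: concatenate both filter lists as data frames, drop duplicate rows, then turn each row back into a dict without its NaN entries. `merge_filters` is a hypothetical name used only for this illustration; it is not a datapipe function and does not touch `RunConfig`.

```python
from typing import Any, Dict, List

import pandas as pd

LabelDict = Dict[str, Any]


def merge_filters(existing: List[LabelDict], new: List[LabelDict]) -> List[LabelDict]:
    """Union two filter lists, dropping exact duplicates (hypothetical helper)."""
    combined = pd.concat(
        [pd.DataFrame(existing), pd.DataFrame(new)],
        ignore_index=True,
    ).drop_duplicates()
    # Keys missing from a given filter show up as NaN after concat;
    # dropna() removes them so each dict keeps only its own keys.
    return [row.dropna().to_dict() for _, row in combined.iterrows()]


merged = merge_filters([{"a": 1}], [{"a": 1}, {"a": 2}])
mixed = merge_filters([{"a": 1}], [{"b": 2}])
print(merged)
print(mixed)
```

Two details are worth keeping in mind when reading the diff. When the filter dicts use different key sets, the missing cells become NaN, so integer values come back as floats after the round trip. Also, the `rc is not None` branch in the diff builds `RunConfig` from `filters` only, so any labels on `rc` fall back to the default; the PR does not say whether that is intended.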
30 changes: 19 additions & 11 deletions datapipe/sql_util.py
@@ -1,10 +1,12 @@
from typing import Any, Dict, List, Optional
from collections import defaultdict
from typing import Any, Dict, List, Optional, cast

from sqlalchemy import Column, Integer, String, Table, tuple_
import pandas as pd
from sqlalchemy import Column, Integer, String, Table, column, tuple_
from sqlalchemy.sql.expression import and_, or_

from datapipe.run_config import RunConfig
from datapipe.types import IndexDF

from datapipe.types import IndexDF, LabelDict

def sql_apply_idx_filter_to_table(
sql: Any,
@@ -22,22 +24,28 @@ def sql_apply_idx_filter_to_table(
keys = tuple_(*[table.c[key] for key in primary_keys]) # type: ignore

sql = sql.where(
keys.in_([tuple([r[key] for key in primary_keys]) for r in idx.to_dict(orient="records")]) # type: ignore
keys.in_(
[
tuple([r[key] for key in primary_keys]) # type: ignore
for r in idx.to_dict(orient="records")
]
)
)

return sql


def sql_apply_runconfig_filter(
def sql_apply_runconfig_filters(
sql: Any,
table: Table,
primary_keys: List[str],
keys: List[str],
run_config: Optional[RunConfig] = None,
) -> Any:
if run_config is not None:
for k, v in run_config.filters.items():
if k in primary_keys:
sql = sql.where(table.c[k] == v)
filters_idx = pd.DataFrame(run_config.filters)
primary_keys = [key for key in keys if key in table.c and key in filters_idx.columns]
if len(filters_idx) > 0 and len(primary_keys) > 0:
sql = sql_apply_idx_filter_to_table(sql, table, primary_keys, cast(IndexDF, filters_idx))

return sql

@@ -49,4 +57,4 @@ def sql_apply_runconfig_filter(


def sql_schema_to_dtype(schema: List[Column]) -> Dict[str, Any]:
return {i.name: SCHEMA_TO_DTYPE_LOOKUP[i.type.__class__] for i in schema} # type: ignore
return {i.name: SCHEMA_TO_DTYPE_LOOKUP[i.type.__class__] for i in schema}
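
The new `sql_apply_runconfig_filters` delegates to `sql_apply_idx_filter_to_table`, which filters with a composite `tuple_(...).in_(...)` condition instead of chaining one `where` per key. The sketch below shows that SQLAlchemy construct on its own, against an in-memory SQLite database. The `items` table, its columns, and the sample rows are assumptions made up for the example; it assumes SQLAlchemy 1.4 or newer.

```python
import sqlalchemy as sa

metadata = sa.MetaData()
# Hypothetical table with a two-column primary key.
items = sa.Table(
    "items",
    metadata,
    sa.Column("user_id", sa.Integer, primary_key=True),
    sa.Column("item_id", sa.Integer, primary_key=True),
)

# Filters in the list-of-dicts shape used by RunConfig.filters in this PR.
filters = [{"user_id": 1, "item_id": 10}, {"user_id": 2, "item_id": 20}]
keys = ["user_id", "item_id"]

# (user_id, item_id) IN ((1, 10), (2, 20))
stmt = sa.select(items).where(
    sa.tuple_(*[items.c[k] for k in keys]).in_(
        [tuple(f[k] for k in keys) for f in filters]
    )
)

engine = sa.create_engine("sqlite://")
metadata.create_all(engine)
with engine.connect() as conn:
    conn.execute(
        items.insert(),
        [
            {"user_id": 1, "item_id": 10},
            {"user_id": 1, "item_id": 11},
            {"user_id": 2, "item_id": 20},
        ],
    )
    rows = sorted(tuple(r) for r in conn.execute(stmt))

print(rows)
```

Because the whole key tuple is compared at once, the row `(1, 11)` is excluded even though `user_id == 1` appears in one of the filters. Composite `IN` is not supported by every database backend, so this is worth checking for the stores datapipe targets.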