Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into v0.13
Browse files Browse the repository at this point in the history
  • Loading branch information
elephantum committed Jun 30, 2024
2 parents b003d3d + e992183 commit d1dd298
Show file tree
Hide file tree
Showing 13 changed files with 179 additions and 159 deletions.
24 changes: 12 additions & 12 deletions .github/workflows/pytest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ jobs:
- "3.10"

steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
cache: pip
Expand Down Expand Up @@ -50,12 +50,12 @@ jobs:
strategy:
matrix:
include:
# - python-version: "3.8"
# test-db-env: "postgres"
# pip-extra: "sqlalchemy <2"
- python-version: "3.8"
test-db-env: "postgres"
pip-extra: "sqlalchemy <2"
- python-version: "3.8"
test-db-env: "postgres"
pip-extra: "sqlalchemy >2"
pip-extra: "'sqlalchemy>2'"
# - python-version: "3.8"
# test-db-env: "sqlite"
# - python-version: "3.9"
Expand All @@ -68,13 +68,13 @@ jobs:
# test-db-env: "sqlite"
- python-version: "3.11"
test-db-env: "postgres"
pip-extra: "sqlalchemy <2"
pip-extra: '"sqlalchemy<2" "pandas<2.2"'
- python-version: "3.11"
test-db-env: "postgres"
pip-extra: "sqlalchemy >2"
pip-extra: '"sqlalchemy>2"'
- python-version: "3.11"
test-db-env: "sqlite"
pip-extra: "sqlalchemy >2"
pip-extra: '"sqlalchemy>2"'

services:
# Label used to access the service container
Expand Down Expand Up @@ -107,17 +107,17 @@ jobs:
- 6333:6333

steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
cache: pip

- name: Install dependencies
run: |
pip install "${{ matrix.pip-extra }}" ".[sqlite,excel,milvus,gcsfs,s3fs,redis,qdrant,gcp]" "pytest<8" "pytest_cases"
pip install ${{ matrix.pip-extra }} ".[sqlite,excel,milvus,gcsfs,s3fs,redis,qdrant,gcp]" "pytest<8" "pytest_cases"
- name: Test with pytest
run: |
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/test_examples.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ jobs:
- RayExecutor

steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
cache: pip
Expand Down
16 changes: 16 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,23 @@
# 0.13.13

* Add `ComputeStep.get_status` method
* Remove restriction for Pandas < 2.2

# 0.13.12

* Add processing of an empty response in `QdrantStore`
* Add optional `index_schema` to `QdrantStore`
* Add redis cluster mode support in `RedisStore`

# 0.13.11

* Remove logging to database (`datapipe_events` table) from `EventLogger`

# 0.13.10

* Fix compatibility with SQLAlchemy < 2 (ColumnClause in typing)
* Fix compatibility with Ray and SQLAlchemy > 2 (serialization of Table)
* (post.1) Fix dependencies for MacOS; deprecate Python 3.8

# 0.13.9

Expand Down
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
# Datapipe

`datapipe` is a real-time, incremental ETL library for Python with record-level dependency tracking.
[Datapipe](https://datapipe.dev/) is a real-time, incremental ETL library for Python with record-level dependency tracking.

The library is designed for describing data processing pipelines and is capable
of tracking dependencies for each record in the pipeline. This ensures that
tasks within the pipeline receive only the data that has been modified, thereby
improving the overall efficiency of data handling.

https://datapipe.dev/

# Development

At the moment these branches are active:
Expand Down
23 changes: 10 additions & 13 deletions datapipe/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,19 +348,16 @@ def list(ctx: click.Context, status: bool) -> None: # noqa
extra_args = {}

if status:
if len(step.input_dts) > 0:
try:
if isinstance(step, BaseBatchTransformStep):
changed_idx_count = step.get_changed_idx_count(ds=app.ds)

if changed_idx_count > 0:
extra_args[
"changed_idx_count"
] = f"[red]{changed_idx_count}[/red]"

except NotImplementedError:
# Currently we do not support empty join_keys
extra_args["changed_idx_count"] = "[red]N/A[/red]"
try:
step_status = step.get_status(ds=app.ds)
extra_args["total_idx_count"] = str(step_status.total_idx_count)
extra_args["changed_idx_count"] = (
f"[red]{step_status.changed_idx_count}[/red]"
)
except NotImplementedError:
# Currently we do not support empty join_keys
extra_args["total_idx_count"] = "[red]N/A[/red]"
extra_args["changed_idx_count"] = "[red]N/A[/red]"

rprint(to_human_repr(step, extra_args=extra_args))
rprint("")
Expand Down
10 changes: 10 additions & 0 deletions datapipe/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ def get_datatable(self, ds: DataStore, name: str) -> DataTable:
return ds.get_or_create_table(name=name, table_store=self.catalog[name].store)


@dataclass
class StepStatus:
name: str
total_idx_count: int
changed_idx_count: int


class ComputeStep:
"""
Шаг вычислений в графе вычислений.
Expand Down Expand Up @@ -91,6 +98,9 @@ def name(self) -> str:
def labels(self) -> Labels:
return self._labels if self._labels else []

def get_status(self, ds: DataStore) -> StepStatus:
raise NotImplementedError

# TODO: move to lints
def validate(self) -> None:
inp_p_keys_arr = [set(inp.primary_keys) for inp in self.input_dts if inp]
Expand Down
4 changes: 1 addition & 3 deletions datapipe/datatable.py
Original file line number Diff line number Diff line change
Expand Up @@ -609,9 +609,7 @@ def __init__(
create_meta_table: bool = False,
) -> None:
self.meta_dbconn = meta_dbconn
self.event_logger = EventLogger(
self.meta_dbconn, create_table=create_meta_table
)
self.event_logger = EventLogger()
self.tables: Dict[str, DataTable] = {}

self.create_meta_table = create_meta_table
Expand Down
104 changes: 2 additions & 102 deletions datapipe/event_logger.py
Original file line number Diff line number Diff line change
@@ -1,60 +1,14 @@
import logging
from enum import Enum
from typing import TYPE_CHECKING, Any, Optional, Tuple
from typing import Optional

from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.sql import func
from sqlalchemy.sql.schema import Column, Table
from sqlalchemy.sql.sqltypes import JSON, DateTime, Integer, String
from traceback_with_variables import format_exc

from datapipe.run_config import RunConfig

logger = logging.getLogger("datapipe.event_logger")

if TYPE_CHECKING:
from datapipe.store.database import DBConn


class EventTypes(Enum):
STATE = "state"
ERROR = "error"


class StepEventTypes(Enum):
RUN_FULL_COMPLETE = "run_full_complete"


class EventLogger:
def __init__(self, dbconn: "DBConn", create_table: bool = False):
self.dbconn = dbconn

self.events_table = Table(
"datapipe_events",
dbconn.sqla_metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("event_ts", DateTime, server_default=func.now()),
Column("type", String(100)),
Column("event", JSON if dbconn.con.name == "sqlite" else JSONB),
)

self.step_events_table = Table(
"datapipe_step_events",
dbconn.sqla_metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("step", String(100)),
Column("event_ts", DateTime, server_default=func.now()),
Column("event", String(100)),
Column("event_payload", JSON if dbconn.con.name == "sqlite" else JSONB),
)

if create_table:
self.events_table.create(self.dbconn.con, checkfirst=True)
self.step_events_table.create(self.dbconn.con, checkfirst=True)

def __reduce__(self) -> Tuple[Any, ...]:
return self.__class__, (self.dbconn,)

def log_state(
self,
table_name,
Expand All @@ -66,34 +20,9 @@ def log_state(
):
logger.debug(
f'Table "{table_name}": added = {added_count}; updated = {updated_count}; '
f"deleted = {deleted_count}, processed_count = {deleted_count}"
f"deleted = {deleted_count}, processed_count = {processed_count}"
)

if run_config is not None:
meta = {
"labels": run_config.labels,
"filters": run_config.filters,
}
else:
meta = {}

ins = self.events_table.insert().values(
type=EventTypes.STATE.value,
event={
"meta": meta,
"data": {
"table_name": table_name,
"added_count": added_count,
"updated_count": updated_count,
"deleted_count": deleted_count,
"processed_count": processed_count,
},
},
)

with self.dbconn.con.begin() as con:
con.execute(ins)

def log_error(
self,
type,
Expand All @@ -106,29 +35,8 @@ def log_error(
logger.error(
f'Error in step {run_config.labels.get("step_name")}: {type} {message}\n{description}'
)
meta = {
"labels": run_config.labels,
"filters": run_config.filters,
}
else:
logger.error(f"Error: {type} {message}\n{description}")
meta = {}

ins = self.events_table.insert().values(
type=EventTypes.ERROR.value,
event={
"meta": meta,
"data": {
"type": type,
"message": message,
"description": description,
"params": params,
},
},
)

with self.dbconn.con.begin() as con:
con.execute(ins)

def log_exception(
self,
Expand All @@ -148,11 +56,3 @@ def log_step_full_complete(
step_name: str,
) -> None:
logger.debug(f"Step {step_name} is marked complete")

ins = self.step_events_table.insert().values(
step=step_name,
event=StepEventTypes.RUN_FULL_COMPLETE.value,
)

with self.dbconn.con.begin() as con:
con.execute(ins)
12 changes: 9 additions & 3 deletions datapipe/step/batch_transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
from sqlalchemy.sql.expression import select
from tqdm_loggable.auto import tqdm

from datapipe.compute import Catalog, ComputeStep, PipelineStep
from datapipe.compute import Catalog, ComputeStep, PipelineStep, StepStatus
from datapipe.datatable import DataStore, DataTable, MetaTable
from datapipe.executor import Executor, ExecutorConfig, SingleThreadExecutor
from datapipe.run_config import LabelDict, RunConfig
Expand Down Expand Up @@ -77,8 +77,7 @@ def __call__(
input_dts: List[DataTable],
run_config: Optional[RunConfig] = None,
kwargs: Optional[Dict[str, Any]] = None,
) -> TransformResult:
...
) -> TransformResult: ...


BatchTransformFunc = Callable[..., TransformResult]
Expand Down Expand Up @@ -517,6 +516,13 @@ def _apply_filters_to_run_config(
run_config.filters = filters
return run_config

def get_status(self, ds: DataStore) -> StepStatus:
return StepStatus(
name=self.name,
total_idx_count=self.meta_table.get_metadata_size(),
changed_idx_count=self.get_changed_idx_count(ds),
)

def get_changed_idx_count(
self,
ds: DataStore,
Expand Down
Loading

0 comments on commit d1dd298

Please sign in to comment.