Skip to content

Commit

Permalink
Merge pull request #324 from epoch8/feature/delete-datapipe-events-lo…
Browse files Browse the repository at this point in the history
…gging

Remove logging in datapipe_events
  • Loading branch information
elephantum authored Jun 25, 2024
2 parents 53cee1b + b325736 commit 91bafbe
Show file tree
Hide file tree
Showing 6 changed files with 14 additions and 112 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/pytest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ jobs:
- "3.10"

steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
cache: pip
Expand Down Expand Up @@ -107,10 +107,10 @@ jobs:
- 6333:6333

steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
cache: pip
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/test_examples.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ jobs:
- RayExecutor

steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
cache: pip
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# 0.13.11

* Remove logging to database (`datapipe_events` table) from `EventLogger`

# 0.13.10

* Fix compatibility with SQLalchemy < 2 (ColumnClause in typing)
Expand Down
4 changes: 1 addition & 3 deletions datapipe/datatable.py
Original file line number Diff line number Diff line change
Expand Up @@ -609,9 +609,7 @@ def __init__(
create_meta_table: bool = False,
) -> None:
self.meta_dbconn = meta_dbconn
self.event_logger = EventLogger(
self.meta_dbconn, create_table=create_meta_table
)
self.event_logger = EventLogger()
self.tables: Dict[str, DataTable] = {}

self.create_meta_table = create_meta_table
Expand Down
104 changes: 2 additions & 102 deletions datapipe/event_logger.py
Original file line number Diff line number Diff line change
@@ -1,60 +1,14 @@
import logging
from enum import Enum
from typing import TYPE_CHECKING, Any, Optional, Tuple
from typing import Optional

from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.sql import func
from sqlalchemy.sql.schema import Column, Table
from sqlalchemy.sql.sqltypes import JSON, DateTime, Integer, String
from traceback_with_variables import format_exc

from datapipe.run_config import RunConfig

logger = logging.getLogger("datapipe.event_logger")

if TYPE_CHECKING:
from datapipe.store.database import DBConn


class EventTypes(Enum):
STATE = "state"
ERROR = "error"


class StepEventTypes(Enum):
RUN_FULL_COMPLETE = "run_full_complete"


class EventLogger:
def __init__(self, dbconn: "DBConn", create_table: bool = False):
self.dbconn = dbconn

self.events_table = Table(
"datapipe_events",
dbconn.sqla_metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("event_ts", DateTime, server_default=func.now()),
Column("type", String(100)),
Column("event", JSON if dbconn.con.name == "sqlite" else JSONB),
)

self.step_events_table = Table(
"datapipe_step_events",
dbconn.sqla_metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("step", String(100)),
Column("event_ts", DateTime, server_default=func.now()),
Column("event", String(100)),
Column("event_payload", JSON if dbconn.con.name == "sqlite" else JSONB),
)

if create_table:
self.events_table.create(self.dbconn.con, checkfirst=True)
self.step_events_table.create(self.dbconn.con, checkfirst=True)

def __reduce__(self) -> Tuple[Any, ...]:
return self.__class__, (self.dbconn,)

def log_state(
self,
table_name,
Expand All @@ -66,34 +20,9 @@ def log_state(
):
logger.debug(
f'Table "{table_name}": added = {added_count}; updated = {updated_count}; '
f"deleted = {deleted_count}, processed_count = {deleted_count}"
f"deleted = {deleted_count}, processed_count = {processed_count}"
)

if run_config is not None:
meta = {
"labels": run_config.labels,
"filters": run_config.filters,
}
else:
meta = {}

ins = self.events_table.insert().values(
type=EventTypes.STATE.value,
event={
"meta": meta,
"data": {
"table_name": table_name,
"added_count": added_count,
"updated_count": updated_count,
"deleted_count": deleted_count,
"processed_count": processed_count,
},
},
)

with self.dbconn.con.begin() as con:
con.execute(ins)

def log_error(
self,
type,
Expand All @@ -106,29 +35,8 @@ def log_error(
logger.error(
f'Error in step {run_config.labels.get("step_name")}: {type} {message}\n{description}'
)
meta = {
"labels": run_config.labels,
"filters": run_config.filters,
}
else:
logger.error(f"Error: {type} {message}\n{description}")
meta = {}

ins = self.events_table.insert().values(
type=EventTypes.ERROR.value,
event={
"meta": meta,
"data": {
"type": type,
"message": message,
"description": description,
"params": params,
},
},
)

with self.dbconn.con.begin() as con:
con.execute(ins)

def log_exception(
self,
Expand All @@ -148,11 +56,3 @@ def log_step_full_complete(
step_name: str,
) -> None:
logger.debug(f"Step {step_name} is marked complete")

ins = self.step_events_table.insert().values(
step=step_name,
event=StepEventTypes.RUN_FULL_COMPLETE.value,
)

with self.dbconn.con.begin() as con:
con.execute(ins)
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "datapipe-core"
version = "0.13.10-post.1"
version = "0.13.11"
description = "`datapipe` is a realtime incremental ETL library for Python application"
readme = "README.md"
repository = "https://github.com/epoch8/datapipe"
Expand Down

0 comments on commit 91bafbe

Please sign in to comment.