diff --git a/.github/workflows/pytest.yaml b/.github/workflows/pytest.yaml
index 8e7f8a45..e263e947 100644
--- a/.github/workflows/pytest.yaml
+++ b/.github/workflows/pytest.yaml
@@ -19,10 +19,10 @@ jobs:
           - "3.10"
 
     steps:
-      - uses: actions/checkout@v3
+      - uses: actions/checkout@v4
 
       - name: Set up Python ${{ matrix.python-version }}
-        uses: actions/setup-python@v4
+        uses: actions/setup-python@v5
         with:
          python-version: ${{ matrix.python-version }}
          cache: pip
@@ -107,10 +107,10 @@ jobs:
           - 6333:6333
 
    steps:
-      - uses: actions/checkout@v3
+      - uses: actions/checkout@v4
 
       - name: Set up Python ${{ matrix.python-version }}
-        uses: actions/setup-python@v4
+        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}
          cache: pip
diff --git a/.github/workflows/test_examples.yaml b/.github/workflows/test_examples.yaml
index c16799c0..96176804 100644
--- a/.github/workflows/test_examples.yaml
+++ b/.github/workflows/test_examples.yaml
@@ -28,10 +28,10 @@ jobs:
           - RayExecutor
 
    steps:
-      - uses: actions/checkout@v3
+      - uses: actions/checkout@v4
 
       - name: Set up Python ${{ matrix.python-version }}
-        uses: actions/setup-python@v4
+        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}
          cache: pip
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 92782717..c053d1ad 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,7 @@
+# 0.13.11
+
+* Remove logging to database (`datapipe_events` table) from `EventLogger`
+
 # 0.13.10
 
 * Fix compatibility with SQLalchemy < 2 (ColumnClause in typing)
diff --git a/datapipe/datatable.py b/datapipe/datatable.py
index 6c42e9bd..6f34215a 100644
--- a/datapipe/datatable.py
+++ b/datapipe/datatable.py
@@ -609,9 +609,7 @@ def __init__(
         create_meta_table: bool = False,
     ) -> None:
         self.meta_dbconn = meta_dbconn
-        self.event_logger = EventLogger(
-            self.meta_dbconn, create_table=create_meta_table
-        )
+        self.event_logger = EventLogger()
         self.tables: Dict[str, DataTable] = {}
         self.create_meta_table = create_meta_table
 
diff --git a/datapipe/event_logger.py b/datapipe/event_logger.py
index a678a591..5cc763c9 100644
--- a/datapipe/event_logger.py
+++ b/datapipe/event_logger.py
@@ -1,60 +1,14 @@
 import logging
-from enum import Enum
-from typing import TYPE_CHECKING, Any, Optional, Tuple
+from typing import Optional
 
-from sqlalchemy.dialects.postgresql import JSONB
-from sqlalchemy.sql import func
-from sqlalchemy.sql.schema import Column, Table
-from sqlalchemy.sql.sqltypes import JSON, DateTime, Integer, String
 from traceback_with_variables import format_exc
 
 from datapipe.run_config import RunConfig
 
 logger = logging.getLogger("datapipe.event_logger")
 
-if TYPE_CHECKING:
-    from datapipe.store.database import DBConn
-
-
-class EventTypes(Enum):
-    STATE = "state"
-    ERROR = "error"
-
-
-class StepEventTypes(Enum):
-    RUN_FULL_COMPLETE = "run_full_complete"
-
 
 class EventLogger:
-    def __init__(self, dbconn: "DBConn", create_table: bool = False):
-        self.dbconn = dbconn
-
-        self.events_table = Table(
-            "datapipe_events",
-            dbconn.sqla_metadata,
-            Column("id", Integer, primary_key=True, autoincrement=True),
-            Column("event_ts", DateTime, server_default=func.now()),
-            Column("type", String(100)),
-            Column("event", JSON if dbconn.con.name == "sqlite" else JSONB),
-        )
-
-        self.step_events_table = Table(
-            "datapipe_step_events",
-            dbconn.sqla_metadata,
-            Column("id", Integer, primary_key=True, autoincrement=True),
-            Column("step", String(100)),
-            Column("event_ts", DateTime, server_default=func.now()),
-            Column("event", String(100)),
-            Column("event_payload", JSON if dbconn.con.name == "sqlite" else JSONB),
-        )
-
-        if create_table:
-            self.events_table.create(self.dbconn.con, checkfirst=True)
-            self.step_events_table.create(self.dbconn.con, checkfirst=True)
-
-    def __reduce__(self) -> Tuple[Any, ...]:
-        return self.__class__, (self.dbconn,)
-
     def log_state(
         self,
         table_name,
@@ -66,34 +20,9 @@
     ):
         logger.debug(
             f'Table "{table_name}": added = {added_count}; updated = {updated_count}; '
-            f"deleted = {deleted_count}, processed_count = {deleted_count}"
+            f"deleted = {deleted_count}, processed_count = {processed_count}"
         )
 
-        if run_config is not None:
-            meta = {
-                "labels": run_config.labels,
-                "filters": run_config.filters,
-            }
-        else:
-            meta = {}
-
-        ins = self.events_table.insert().values(
-            type=EventTypes.STATE.value,
-            event={
-                "meta": meta,
-                "data": {
-                    "table_name": table_name,
-                    "added_count": added_count,
-                    "updated_count": updated_count,
-                    "deleted_count": deleted_count,
-                    "processed_count": processed_count,
-                },
-            },
-        )
-
-        with self.dbconn.con.begin() as con:
-            con.execute(ins)
-
     def log_error(
         self,
         type,
@@ -106,29 +35,8 @@
             logger.error(
                 f'Error in step {run_config.labels.get("step_name")}: {type} {message}\n{description}'
             )
-            meta = {
-                "labels": run_config.labels,
-                "filters": run_config.filters,
-            }
         else:
             logger.error(f"Error: {type} {message}\n{description}")
-            meta = {}
-
-        ins = self.events_table.insert().values(
-            type=EventTypes.ERROR.value,
-            event={
-                "meta": meta,
-                "data": {
-                    "type": type,
-                    "message": message,
-                    "description": description,
-                    "params": params,
-                },
-            },
-        )
-
-        with self.dbconn.con.begin() as con:
-            con.execute(ins)
 
     def log_exception(
         self,
@@ -148,11 +56,3 @@ def log_step_full_complete(
         step_name: str,
     ) -> None:
         logger.debug(f"Step {step_name} is marked complete")
-
-        ins = self.step_events_table.insert().values(
-            step=step_name,
-            event=StepEventTypes.RUN_FULL_COMPLETE.value,
-        )
-
-        with self.dbconn.con.begin() as con:
-            con.execute(ins)
diff --git a/pyproject.toml b/pyproject.toml
index 0e41566d..29acf267 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "datapipe-core"
-version = "0.13.10-post.1"
+version = "0.13.11"
 description = "`datapipe` is a realtime incremental ETL library for Python application"
 readme = "README.md"
 repository = "https://github.com/epoch8/datapipe"
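
Note: after this change `EventLogger` no longer takes a `DBConn` and writes nothing to SQL; events only go to the standard Python `logging` subsystem on the `datapipe.event_logger` logger. A minimal usage sketch, assuming only the signatures visible in the diff above (the table and step names are hypothetical placeholders):

```python
import logging

from datapipe.event_logger import EventLogger

# Events now surface only as log records, so configuring logging is
# all that is needed to see them.
logging.basicConfig(level=logging.DEBUG)

event_logger = EventLogger()  # no dbconn / create_table arguments anymore

# Table counters end up in a DEBUG record instead of a datapipe_events row.
event_logger.log_state(
    "my_table",  # hypothetical table name
    added_count=10,
    updated_count=2,
    deleted_count=0,
    processed_count=12,
)

# Step completion is likewise only logged, not persisted.
event_logger.log_step_full_complete("my_step")
```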