Skip to content

Commit

Permalink
refactor status_updater.py
Browse files Browse the repository at this point in the history
  • Loading branch information
LuiggiTenorioK committed Nov 20, 2024
1 parent b53e85f commit 21ad183
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 57 deletions.
91 changes: 34 additions & 57 deletions autosubmit_api/bgtasks/tasks/status_updater.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
from datetime import datetime
import os
import time
from typing import Dict, List

from sqlalchemy import select
from autosubmit_api.bgtasks.bgtask import BackgroundTaskTemplate
from autosubmit_api.database import tables
from autosubmit_api.database.common import (
create_autosubmit_db_engine,
create_as_times_db_engine,
create_main_db_conn,
)
from autosubmit_api.database.models import ExperimentModel
from autosubmit_api.experiment.common_requests import _is_exp_running
from autosubmit_api.history.database_managers.database_models import RunningStatus
from autosubmit_api.persistance.experiment import ExperimentPaths
from autosubmit_api.repositories.experiment import (
create_experiment_repository,
ExperimentModel,
)
from autosubmit_api.repositories.experiment_status import (
create_experiment_status_repository,
)
from autosubmit_api.repositories.join.experiment_join import (
create_experiment_join_repository,
)


class StatusUpdater(BackgroundTaskTemplate):
Expand All @@ -26,38 +27,30 @@ def _clear_missing_experiments(cls):
"""
Clears the experiments that are not in the experiments table
"""
with create_main_db_conn() as conn:
try:
del_stmnt = tables.experiment_status_table.delete().where(
tables.experiment_status_table.c.exp_id.not_in(
select(tables.experiment_table.c.id)
)
)
conn.execute(del_stmnt)
conn.commit()
except Exception as exc:
conn.rollback()
cls.logger.error(
f"[{cls.id}] Error while clearing missing experiments status: {exc}"
)
try:
experiment_join_repo = create_experiment_join_repository()
experiment_join_repo.drop_status_from_deleted_experiments()
except Exception as exc:
cls.logger.error(
f"[{cls.id}] Error while clearing missing experiments status: {exc}"
)

@classmethod
def _get_experiments(cls) -> List[ExperimentModel]:
"""
Get the experiments list
"""
with create_autosubmit_db_engine().connect() as conn:
query_result = conn.execute(tables.experiment_table.select()).all()
return [ExperimentModel.model_validate(row._mapping) for row in query_result]
experiment_repository = create_experiment_repository()
return experiment_repository.get_all()

@classmethod
def _get_current_status(cls) -> Dict[str, str]:
"""
Get the current status of the experiments
"""
with create_as_times_db_engine().connect() as conn:
query_result = conn.execute(tables.experiment_status_table.select()).all()
return {row.name: row.status for row in query_result}
status_repository = create_experiment_status_repository()
experiment_statuses = status_repository.get_all()
return {row.name: row.status for row in experiment_statuses}

@classmethod
def _check_exp_running(cls, expid: str) -> bool:
Expand Down Expand Up @@ -87,30 +80,17 @@ def _check_exp_running(cls, expid: str) -> bool:

@classmethod
def _update_experiment_status(cls, experiment: ExperimentModel, is_running: bool):
with create_as_times_db_engine().connect() as conn:
try:
del_stmnt = tables.experiment_status_table.delete().where(
tables.experiment_status_table.c.exp_id == experiment.id
)
ins_stmnt = tables.experiment_status_table.insert().values(
exp_id=experiment.id,
name=experiment.name,
status=(
RunningStatus.RUNNING
if is_running
else RunningStatus.NOT_RUNNING
),
seconds_diff=0,
modified=datetime.now().isoformat(sep="-", timespec="seconds"),
)
conn.execute(del_stmnt)
conn.execute(ins_stmnt)
conn.commit()
except Exception as exc:
conn.rollback()
cls.logger.error(
f"[{cls.id}] Error while doing database operations on experiment {experiment.name}: {exc}"
)
status_repository = create_experiment_status_repository()
try:
status_repository.upsert_status(
experiment.id,
experiment.name,
RunningStatus.RUNNING if is_running else RunningStatus.NOT_RUNNING,
)
except Exception as exc:
cls.logger.error(
f"[{cls.id}] Error while doing database operations on experiment {experiment.name}: {exc}"
)

@classmethod
def procedure(cls):
Expand All @@ -131,10 +111,7 @@ def procedure(cls):
new_status = (
RunningStatus.RUNNING if is_running else RunningStatus.NOT_RUNNING
)
if (
current_status.get(experiment.name, RunningStatus.NOT_RUNNING)
!= new_status
):
if current_status.get(experiment.name) != new_status:
cls.logger.info(
f"[{cls.id}] Updating status of {experiment.name} to {new_status}"
)
Expand Down
75 changes: 75 additions & 0 deletions autosubmit_api/repositories/experiment_status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, List
from pydantic import BaseModel
from sqlalchemy import Engine, Table, delete, insert
from sqlalchemy.schema import CreateTable
from autosubmit_api.database import tables
from autosubmit_api.database.common import create_as_times_db_engine


class ExperimentStatusModel(BaseModel):
exp_id: int
name: str
status: str
seconds_diff: Any
modified: Any


class ExperimentStatusRepository(ABC):
@abstractmethod
def get_all(self) -> List[ExperimentStatusModel]:
"""
Get all experiments status
"""

@abstractmethod
def upsert_status(self, exp_id: int, expid: str, status: str) -> int:
"""
Delete and insert experiment status by expid
"""


class ExperimentStatusSQLRepository(ExperimentStatusRepository):
def __init__(self, engine: Engine, table: Table):
self.engine = engine
self.table = table

with self.engine.connect() as conn:
conn.execute(CreateTable(self.table, if_not_exists=True))
conn.commit()

def get_all(self):
with self.engine.connect() as conn:
statement = self.table.select()
result = conn.execute(statement).all()
return [
ExperimentStatusModel.model_validate(row, from_attributes=True)
for row in result
]

def upsert_status(self, exp_id: int, expid: str, status: str):
with self.engine.connect() as conn:
with conn.begin():
try:
del_stmnt = delete(self.table).where(self.table.c.id == exp_id)
ins_stmnt = insert(self.table).values(
exp_id=exp_id,
name=expid,
status=status,
seconds_diff=0,
modified=datetime.now().isoformat(sep="-", timespec="seconds"),
)
conn.execute(del_stmnt)
result = conn.execute(ins_stmnt)
conn.commit()
except Exception as exc:
conn.rollback()
raise exc

return result.rowcount


def create_experiment_status_repository() -> ExperimentStatusRepository:
engine = create_as_times_db_engine()
return ExperimentStatusSQLRepository(engine, tables.experiment_status_table)
Empty file.
33 changes: 33 additions & 0 deletions autosubmit_api/repositories/join/experiment_join.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from abc import ABC, abstractmethod
from sqlalchemy import select
from autosubmit_api.database import tables
from autosubmit_api.database.common import create_main_db_conn


class ExperimentJoinRepository(ABC):
@abstractmethod
def drop_status_from_deleted_experiments(self) -> int:
"""
Drop status records from experiments that are not in the experiments table
"""


class ExperimentJoinSQLRepository(ExperimentJoinRepository):
def _get_connection(self):
return create_main_db_conn()

def drop_status_from_deleted_experiments(self) -> int:
with self._get_connection() as conn:
del_stmnt = tables.experiment_status_table.delete().where(
tables.experiment_status_table.c.exp_id.not_in(
select(tables.experiment_table.c.id)
)
)
result = conn.execute(del_stmnt)
conn.commit()

return result.rowcount


def create_experiment_join_repository() -> ExperimentJoinRepository:
return ExperimentJoinSQLRepository()

0 comments on commit 21ad183

Please sign in to comment.