Skip to content

Commit

Permalink
Centralize existing SQLAlchemy snippets in repository layer (#145)
Browse files Browse the repository at this point in the history
* refactor status_updater.py

* refactor populate_details

* refactor JobPackageReader

* refactor ExperimentBuilder

* refactor common_db_requests

* refactor process_active_graphs

* fix bugs and add tests
  • Loading branch information
LuiggiTenorioK authored Dec 12, 2024
1 parent 00a1d06 commit 00c1821
Show file tree
Hide file tree
Showing 15 changed files with 639 additions and 326 deletions.
91 changes: 34 additions & 57 deletions autosubmit_api/bgtasks/tasks/status_updater.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
from datetime import datetime
import os
import time
from typing import Dict, List

from sqlalchemy import select
from autosubmit_api.bgtasks.bgtask import BackgroundTaskTemplate
from autosubmit_api.database import tables
from autosubmit_api.database.common import (
create_autosubmit_db_engine,
create_as_times_db_engine,
create_main_db_conn,
)
from autosubmit_api.database.models import ExperimentModel
from autosubmit_api.experiment.common_requests import _is_exp_running
from autosubmit_api.history.database_managers.database_models import RunningStatus
from autosubmit_api.persistance.experiment import ExperimentPaths
from autosubmit_api.repositories.experiment import (
create_experiment_repository,
ExperimentModel,
)
from autosubmit_api.repositories.experiment_status import (
create_experiment_status_repository,
)
from autosubmit_api.repositories.join.experiment_join import (
create_experiment_join_repository,
)


class StatusUpdater(BackgroundTaskTemplate):
Expand All @@ -26,38 +27,30 @@ def _clear_missing_experiments(cls):
"""
Clears the experiments that are not in the experiments table
"""
with create_main_db_conn() as conn:
try:
del_stmnt = tables.experiment_status_table.delete().where(
tables.experiment_status_table.c.exp_id.not_in(
select(tables.experiment_table.c.id)
)
)
conn.execute(del_stmnt)
conn.commit()
except Exception as exc:
conn.rollback()
cls.logger.error(
f"[{cls.id}] Error while clearing missing experiments status: {exc}"
)
try:
experiment_join_repo = create_experiment_join_repository()
experiment_join_repo.drop_status_from_deleted_experiments()
except Exception as exc:
cls.logger.error(
f"[{cls.id}] Error while clearing missing experiments status: {exc}"
)

@classmethod
def _get_experiments(cls) -> List[ExperimentModel]:
"""
Get the experiments list
"""
with create_autosubmit_db_engine().connect() as conn:
query_result = conn.execute(tables.experiment_table.select()).all()
return [ExperimentModel.model_validate(row._mapping) for row in query_result]
experiment_repository = create_experiment_repository()
return experiment_repository.get_all()

@classmethod
def _get_current_status(cls) -> Dict[str, str]:
"""
Get the current status of the experiments
"""
with create_as_times_db_engine().connect() as conn:
query_result = conn.execute(tables.experiment_status_table.select()).all()
return {row.name: row.status for row in query_result}
status_repository = create_experiment_status_repository()
experiment_statuses = status_repository.get_all()
return {row.name: row.status for row in experiment_statuses}

@classmethod
def _check_exp_running(cls, expid: str) -> bool:
Expand Down Expand Up @@ -87,30 +80,17 @@ def _check_exp_running(cls, expid: str) -> bool:

@classmethod
def _update_experiment_status(cls, experiment: ExperimentModel, is_running: bool):
with create_as_times_db_engine().connect() as conn:
try:
del_stmnt = tables.experiment_status_table.delete().where(
tables.experiment_status_table.c.exp_id == experiment.id
)
ins_stmnt = tables.experiment_status_table.insert().values(
exp_id=experiment.id,
name=experiment.name,
status=(
RunningStatus.RUNNING
if is_running
else RunningStatus.NOT_RUNNING
),
seconds_diff=0,
modified=datetime.now().isoformat(sep="-", timespec="seconds"),
)
conn.execute(del_stmnt)
conn.execute(ins_stmnt)
conn.commit()
except Exception as exc:
conn.rollback()
cls.logger.error(
f"[{cls.id}] Error while doing database operations on experiment {experiment.name}: {exc}"
)
status_repository = create_experiment_status_repository()
try:
status_repository.upsert_status(
experiment.id,
experiment.name,
RunningStatus.RUNNING if is_running else RunningStatus.NOT_RUNNING,
)
except Exception as exc:
cls.logger.error(
f"[{cls.id}] Error while doing database operations on experiment {experiment.name}: {exc}"
)

@classmethod
def procedure(cls):
Expand All @@ -131,10 +111,7 @@ def procedure(cls):
new_status = (
RunningStatus.RUNNING if is_running else RunningStatus.NOT_RUNNING
)
if (
current_status.get(experiment.name, RunningStatus.NOT_RUNNING)
!= new_status
):
if current_status.get(experiment.name) != new_status:
cls.logger.info(
f"[{cls.id}] Updating status of {experiment.name} to {new_status}"
)
Expand Down
33 changes: 14 additions & 19 deletions autosubmit_api/builders/experiment_builder.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import datetime
from autosubmit_api.logger import logger
from autosubmit_api.builders import BaseBuilder
from autosubmit_api.database import tables
from autosubmit_api.database.common import (
create_autosubmit_db_engine,
)
from autosubmit_api.database.models import ExperimentModel
from autosubmit_api.persistance.pkl_reader import PklReader
from autosubmit_api.repositories.experiment import create_experiment_repository
from autosubmit_api.repositories.experiment_details import (
create_experiment_details_repository,
)


class ExperimentBuilder(BaseBuilder):
Expand All @@ -15,7 +16,8 @@ def produce_pkl_modified_time(self):
"""
try:
self._product.modified = datetime.datetime.fromtimestamp(
PklReader(self._product.name).get_modified_time()
PklReader(self._product.name).get_modified_time(),
tz=datetime.timezone.utc
).isoformat()
except Exception:
self._product.modified = None
Expand All @@ -30,12 +32,7 @@ def produce_base(self, expid):
"""
Produce basic information from the main experiment table
"""
with create_autosubmit_db_engine().connect() as conn:
result = conn.execute(
tables.experiment_table.select().where(
tables.experiment_table.c.name == expid
)
).one()
result = create_experiment_repository().get_by_expid(expid)

# Set new product
self._product = ExperimentModel(
Expand All @@ -50,20 +47,18 @@ def produce_details(self):
Produce data from the details table
"""
exp_id = self._product.id
with create_autosubmit_db_engine().connect() as conn:
result = conn.execute(
tables.details_table.select().where(
tables.details_table.c.exp_id == exp_id
)
).one_or_none()

# Set details props
if result:
try:
result = create_experiment_details_repository().get_by_exp_id(exp_id)

# Set details props
self._product.user = result.user
self._product.created = result.created
self._product.model = result.model
self._product.branch = result.branch
self._product.hpc = result.hpc
except Exception:
logger.error(f"Error getting details for exp_id {exp_id}")

@property
def product(self) -> ExperimentModel:
Expand Down
34 changes: 5 additions & 29 deletions autosubmit_api/experiment/common_db_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
import traceback
import sqlite3
from datetime import datetime
from typing import Tuple
from autosubmit_api.logger import logger
from autosubmit_api.config.basicConfig import APIBasicConfig
from autosubmit_api.database import tables
from autosubmit_api.database.common import create_as_times_db_engine
from autosubmit_api.repositories.experiment_status import create_experiment_status_repository

APIBasicConfig.read()
DB_FILES_STATUS = os.path.join(
Expand Down Expand Up @@ -76,25 +76,7 @@ def get_last_read_archive_status():

# SELECTS


def get_experiment_status():
"""
Gets table experiment_status as dictionary
conn is expected to reference as_times.db
"""
experiment_status = dict()
try:
with create_as_times_db_engine().connect() as conn:
cursor = conn.execute(tables.experiment_status_table.select())
for row in cursor:
experiment_status[row.name] = row.status
except Exception as exc:
logger.error(f"Exception while reading experiment_status: {exc}")
logger.error(traceback.format_exc())
return experiment_status


def get_specific_experiment_status(expid):
def get_specific_experiment_status(expid: str) -> Tuple[str, str]:
"""
Gets the current status from database.\n
:param expid: Experiment name
Expand All @@ -103,14 +85,8 @@ def get_specific_experiment_status(expid):
:rtype: 2-tuple (name, status)
"""
try:
with create_as_times_db_engine().connect() as conn:
row = conn.execute(
tables.experiment_status_table.select().where(
tables.experiment_status_table.c.name == expid
)
).one_or_none()
if row:
return (row.name, row.status)
row = create_experiment_status_repository().get_by_expid(expid)
return (row.name, row.status)
except Exception as exc:
logger.error(f"Exception while reading experiment_status for {expid}: {exc}")
logger.error(traceback.format_exc())
Expand Down
33 changes: 12 additions & 21 deletions autosubmit_api/persistance/job_package_reader.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
from typing import Dict, List
from sqlalchemy import select
from autosubmit_api.logger import logger
from autosubmit_api.database import tables
from autosubmit_api.database.common import AttachedDatabaseConnBuilder
from autosubmit_api.persistance.experiment import ExperimentPaths
from autosubmit_api.repositories.job_packages import create_job_packages_repository


class JobPackageReader:
Expand All @@ -17,23 +14,17 @@ def __init__(self, expid: str) -> None:
self._package_to_symbol: Dict[str, str] = {}

def read(self):
conn_builder = AttachedDatabaseConnBuilder()
conn_builder.attach_db(
ExperimentPaths(self.expid).job_packages_db, "job_packages"
)

with conn_builder.product as conn:
try:
statement = select(tables.JobPackageTable)
self._content = [x._mapping for x in conn.execute(statement).all()]
if len(self._content) == 0:
raise Warning(
"job_packages table empty, trying wrapper_job_packages"
)
except Exception as exc:
logger.warning(exc)
statement = select(tables.WrapperJobPackageTable)
self._content = [x._mapping for x in conn.execute(statement).all()]
try:
raw_content = create_job_packages_repository(self.expid).get_all()
self._content = [x.model_dump() for x in raw_content]
if len(self._content) == 0:
raise Warning("job_packages table empty, trying wrapper_job_packages")
except Exception as exc:
logger.warning(exc)
raw_content = create_job_packages_repository(
self.expid, wrapper=True
).get_all()
self._content = [x.model_dump() for x in raw_content]

self._build_job_to_package()
self._build_package_to_jobs()
Expand Down
79 changes: 79 additions & 0 deletions autosubmit_api/repositories/experiment_details.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from abc import ABC, abstractmethod
from typing import Dict, List, Any
from pydantic import BaseModel
from sqlalchemy import Engine, Table
from autosubmit_api.database import tables
from autosubmit_api.database.common import create_autosubmit_db_engine


class ExperimentDetailsModel(BaseModel):
    # Data container for one row of the experiment details table, as built
    # by ExperimentDetailsSQLRepository.get_by_exp_id.
    # Every field is annotated ``Any``, so no specific value type is
    # declared for any of the columns here.
    exp_id: Any  # value of the row's ``exp_id`` column (the lookup key)
    user: Any  # value of the row's ``user`` column
    created: Any  # value of the row's ``created`` column
    model: Any  # value of the row's ``model`` column
    branch: Any  # value of the row's ``branch`` column
    hpc: Any  # value of the row's ``hpc`` column


class ExperimentDetailsRepository(ABC):
    """
    Abstract interface for reading and writing the experiment details table.

    Concrete repositories must implement every method declared here.
    """

    @abstractmethod
    def insert_many(self, values: List[Dict[str, Any]]) -> int:
        """
        Add several rows to the details table in a single call.

        :param values: One dictionary per row to insert
        :return: Integer count reported by the implementation
        """

    @abstractmethod
    def delete_all(self) -> int:
        """
        Remove every row from the details table.

        :return: Integer count reported by the implementation
        """

    @abstractmethod
    def get_by_exp_id(self, exp_id: int) -> ExperimentDetailsModel:
        """
        Look up the details of a single experiment.

        :param exp_id: The numerical experiment id
        :return experiment: The experiment details
        :raises ValueError: If the experiment detail is not found
        """


class ExperimentDetailsSQLRepository(ExperimentDetailsRepository):
    """
    SQLAlchemy implementation of ``ExperimentDetailsRepository``.

    Each method opens its own connection from ``engine`` and operates on
    ``table``.
    """

    def __init__(self, engine: Engine, table: Table):
        """
        :param engine: Engine used to open one connection per operation
        :param table: Table object that stores the experiment details rows
        """
        self.engine = engine
        self.table = table

    def insert_many(self, values: List[Dict[str, Any]]) -> int:
        """
        Insert many rows into the details table.

        :param values: One dictionary per row to insert
        :return: Row count reported for the INSERT, or 0 if ``values`` is empty
        """
        # Nothing to insert: return early instead of opening a connection
        # and executing an INSERT with an empty parameter list.
        if not values:
            return 0
        with self.engine.connect() as conn:
            result = conn.execute(self.table.insert(), values)
            conn.commit()
            # Read rowcount while the connection is still open.
            return result.rowcount

    def delete_all(self) -> int:
        """
        Clear the details table.

        :return: Row count reported for the DELETE
        """
        with self.engine.connect() as conn:
            result = conn.execute(self.table.delete())
            conn.commit()
            return result.rowcount

    def get_by_exp_id(self, exp_id: int) -> ExperimentDetailsModel:
        """
        Get the experiment details by exp_id.

        :param exp_id: The numerical experiment id
        :return experiment: The first matching row, as a model
        :raises ValueError: If the experiment detail is not found
        """
        with self.engine.connect() as conn:
            statement = self.table.select().where(self.table.c.exp_id == exp_id)
            result = conn.execute(statement).first()
        # ``first()`` yields None when no row matched.
        if result is None:
            raise ValueError(f"Experiment detail with exp_id {exp_id} not found")
        return ExperimentDetailsModel(
            exp_id=result.exp_id,
            user=result.user,
            created=result.created,
            model=result.model,
            branch=result.branch,
            hpc=result.hpc,
        )


def create_experiment_details_repository() -> ExperimentDetailsRepository:
    """
    Build the default experiment details repository.

    :return: An ``ExperimentDetailsSQLRepository`` bound to the engine from
        ``create_autosubmit_db_engine`` and to ``tables.details_table``
    """
    return ExperimentDetailsSQLRepository(
        create_autosubmit_db_engine(),
        tables.details_table,
    )
Loading

0 comments on commit 00c1821

Please sign in to comment.