Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Centralize existing SQLAlchemy snippets in repository layer #145

Merged
merged 7 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 34 additions & 57 deletions autosubmit_api/bgtasks/tasks/status_updater.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
from datetime import datetime
import os
import time
from typing import Dict, List

from sqlalchemy import select
from autosubmit_api.bgtasks.bgtask import BackgroundTaskTemplate
from autosubmit_api.database import tables
from autosubmit_api.database.common import (
create_autosubmit_db_engine,
create_as_times_db_engine,
create_main_db_conn,
)
from autosubmit_api.database.models import ExperimentModel
from autosubmit_api.experiment.common_requests import _is_exp_running
from autosubmit_api.history.database_managers.database_models import RunningStatus
from autosubmit_api.persistance.experiment import ExperimentPaths
from autosubmit_api.repositories.experiment import (
create_experiment_repository,
ExperimentModel,
)
from autosubmit_api.repositories.experiment_status import (
create_experiment_status_repository,
)
from autosubmit_api.repositories.join.experiment_join import (
create_experiment_join_repository,
)


class StatusUpdater(BackgroundTaskTemplate):
Expand All @@ -26,38 +27,30 @@ def _clear_missing_experiments(cls):
"""
Clears the experiments that are not in the experiments table
"""
with create_main_db_conn() as conn:
try:
del_stmnt = tables.experiment_status_table.delete().where(
tables.experiment_status_table.c.exp_id.not_in(
select(tables.experiment_table.c.id)
)
)
conn.execute(del_stmnt)
conn.commit()
except Exception as exc:
conn.rollback()
cls.logger.error(
f"[{cls.id}] Error while clearing missing experiments status: {exc}"
)
try:
experiment_join_repo = create_experiment_join_repository()
experiment_join_repo.drop_status_from_deleted_experiments()
except Exception as exc:
cls.logger.error(
f"[{cls.id}] Error while clearing missing experiments status: {exc}"
)

@classmethod
def _get_experiments(cls) -> List[ExperimentModel]:
"""
Get the experiments list
"""
with create_autosubmit_db_engine().connect() as conn:
query_result = conn.execute(tables.experiment_table.select()).all()
return [ExperimentModel.model_validate(row._mapping) for row in query_result]
experiment_repository = create_experiment_repository()
return experiment_repository.get_all()

@classmethod
def _get_current_status(cls) -> Dict[str, str]:
"""
Get the current status of the experiments
"""
with create_as_times_db_engine().connect() as conn:
query_result = conn.execute(tables.experiment_status_table.select()).all()
return {row.name: row.status for row in query_result}
status_repository = create_experiment_status_repository()
experiment_statuses = status_repository.get_all()
return {row.name: row.status for row in experiment_statuses}

@classmethod
def _check_exp_running(cls, expid: str) -> bool:
Expand Down Expand Up @@ -87,30 +80,17 @@ def _check_exp_running(cls, expid: str) -> bool:

@classmethod
def _update_experiment_status(cls, experiment: ExperimentModel, is_running: bool):
with create_as_times_db_engine().connect() as conn:
try:
del_stmnt = tables.experiment_status_table.delete().where(
tables.experiment_status_table.c.exp_id == experiment.id
)
ins_stmnt = tables.experiment_status_table.insert().values(
exp_id=experiment.id,
name=experiment.name,
status=(
RunningStatus.RUNNING
if is_running
else RunningStatus.NOT_RUNNING
),
seconds_diff=0,
modified=datetime.now().isoformat(sep="-", timespec="seconds"),
)
conn.execute(del_stmnt)
conn.execute(ins_stmnt)
conn.commit()
except Exception as exc:
conn.rollback()
cls.logger.error(
f"[{cls.id}] Error while doing database operations on experiment {experiment.name}: {exc}"
)
status_repository = create_experiment_status_repository()
try:
status_repository.upsert_status(
experiment.id,
experiment.name,
RunningStatus.RUNNING if is_running else RunningStatus.NOT_RUNNING,
)
except Exception as exc:
cls.logger.error(
f"[{cls.id}] Error while doing database operations on experiment {experiment.name}: {exc}"
)

@classmethod
def procedure(cls):
Expand All @@ -131,10 +111,7 @@ def procedure(cls):
new_status = (
RunningStatus.RUNNING if is_running else RunningStatus.NOT_RUNNING
)
if (
current_status.get(experiment.name, RunningStatus.NOT_RUNNING)
!= new_status
):
if current_status.get(experiment.name) != new_status:
cls.logger.info(
f"[{cls.id}] Updating status of {experiment.name} to {new_status}"
)
Expand Down
33 changes: 14 additions & 19 deletions autosubmit_api/builders/experiment_builder.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import datetime
from autosubmit_api.logger import logger
from autosubmit_api.builders import BaseBuilder
from autosubmit_api.database import tables
from autosubmit_api.database.common import (
create_autosubmit_db_engine,
)
from autosubmit_api.database.models import ExperimentModel
from autosubmit_api.persistance.pkl_reader import PklReader
from autosubmit_api.repositories.experiment import create_experiment_repository
from autosubmit_api.repositories.experiment_details import (
create_experiment_details_repository,
)


class ExperimentBuilder(BaseBuilder):
Expand All @@ -15,7 +16,8 @@ def produce_pkl_modified_time(self):
"""
try:
self._product.modified = datetime.datetime.fromtimestamp(
PklReader(self._product.name).get_modified_time()
PklReader(self._product.name).get_modified_time(),
tz=datetime.timezone.utc
).isoformat()
except Exception:
self._product.modified = None
Expand All @@ -30,12 +32,7 @@ def produce_base(self, expid):
"""
Produce basic information from the main experiment table
"""
with create_autosubmit_db_engine().connect() as conn:
result = conn.execute(
tables.experiment_table.select().where(
tables.experiment_table.c.name == expid
)
).one()
result = create_experiment_repository().get_by_expid(expid)

# Set new product
self._product = ExperimentModel(
Expand All @@ -50,20 +47,18 @@ def produce_details(self):
Produce data from the details table
"""
exp_id = self._product.id
with create_autosubmit_db_engine().connect() as conn:
result = conn.execute(
tables.details_table.select().where(
tables.details_table.c.exp_id == exp_id
)
).one_or_none()

# Set details props
if result:
try:
result = create_experiment_details_repository().get_by_exp_id(exp_id)

# Set details props
self._product.user = result.user
self._product.created = result.created
self._product.model = result.model
self._product.branch = result.branch
self._product.hpc = result.hpc
except Exception:
logger.error(f"Error getting details for exp_id {exp_id}")

@property
def product(self) -> ExperimentModel:
Expand Down
34 changes: 5 additions & 29 deletions autosubmit_api/experiment/common_db_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
import traceback
import sqlite3
from datetime import datetime
from typing import Tuple
from autosubmit_api.logger import logger
from autosubmit_api.config.basicConfig import APIBasicConfig
from autosubmit_api.database import tables
from autosubmit_api.database.common import create_as_times_db_engine
from autosubmit_api.repositories.experiment_status import create_experiment_status_repository

APIBasicConfig.read()
DB_FILES_STATUS = os.path.join(
Expand Down Expand Up @@ -76,25 +76,7 @@ def get_last_read_archive_status():

# SELECTS


def get_experiment_status():
"""
Gets table experiment_status as dictionary
conn is expected to reference as_times.db
"""
experiment_status = dict()
try:
with create_as_times_db_engine().connect() as conn:
cursor = conn.execute(tables.experiment_status_table.select())
for row in cursor:
experiment_status[row.name] = row.status
except Exception as exc:
logger.error(f"Exception while reading experiment_status: {exc}")
logger.error(traceback.format_exc())
return experiment_status


def get_specific_experiment_status(expid):
def get_specific_experiment_status(expid: str) -> Tuple[str, str]:
"""
Gets the current status from database.\n
:param expid: Experiment name
Expand All @@ -103,14 +85,8 @@ def get_specific_experiment_status(expid):
:rtype: 2-tuple (name, status)
"""
try:
with create_as_times_db_engine().connect() as conn:
row = conn.execute(
tables.experiment_status_table.select().where(
tables.experiment_status_table.c.name == expid
)
).one_or_none()
if row:
return (row.name, row.status)
row = create_experiment_status_repository().get_by_expid(expid)
return (row.name, row.status)
except Exception as exc:
logger.error(f"Exception while reading experiment_status for {expid}: {exc}")
logger.error(traceback.format_exc())
Expand Down
33 changes: 12 additions & 21 deletions autosubmit_api/persistance/job_package_reader.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
from typing import Dict, List
from sqlalchemy import select
from autosubmit_api.logger import logger
from autosubmit_api.database import tables
from autosubmit_api.database.common import AttachedDatabaseConnBuilder
from autosubmit_api.persistance.experiment import ExperimentPaths
from autosubmit_api.repositories.job_packages import create_job_packages_repository


class JobPackageReader:
Expand All @@ -17,23 +14,17 @@ def __init__(self, expid: str) -> None:
self._package_to_symbol: Dict[str, str] = {}

def read(self):
conn_builder = AttachedDatabaseConnBuilder()
conn_builder.attach_db(
ExperimentPaths(self.expid).job_packages_db, "job_packages"
)

with conn_builder.product as conn:
try:
statement = select(tables.JobPackageTable)
self._content = [x._mapping for x in conn.execute(statement).all()]
if len(self._content) == 0:
raise Warning(
"job_packages table empty, trying wrapper_job_packages"
)
except Exception as exc:
logger.warning(exc)
statement = select(tables.WrapperJobPackageTable)
self._content = [x._mapping for x in conn.execute(statement).all()]
try:
raw_content = create_job_packages_repository(self.expid).get_all()
self._content = [x.model_dump() for x in raw_content]
if len(self._content) == 0:
raise Warning("job_packages table empty, trying wrapper_job_packages")
except Exception as exc:
logger.warning(exc)
raw_content = create_job_packages_repository(
self.expid, wrapper=True
).get_all()
self._content = [x.model_dump() for x in raw_content]

self._build_job_to_package()
self._build_package_to_jobs()
Expand Down
79 changes: 79 additions & 0 deletions autosubmit_api/repositories/experiment_details.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from abc import ABC, abstractmethod
from typing import Dict, List, Any
from pydantic import BaseModel
from sqlalchemy import Engine, Table
from autosubmit_api.database import tables
from autosubmit_api.database.common import create_autosubmit_db_engine


class ExperimentDetailsModel(BaseModel):
exp_id: Any
user: Any
created: Any
model: Any
branch: Any
hpc: Any


class ExperimentDetailsRepository(ABC):
@abstractmethod
def insert_many(self, values: List[Dict[str, Any]]) -> int:
"""
Insert many rows into the details table.
"""

@abstractmethod
def delete_all(self) -> int:
"""
Clear the details table.
"""

@abstractmethod
def get_by_exp_id(self, exp_id: int) -> ExperimentDetailsModel:
"""
Get the experiment details by exp_id

:param exp_id: The numerical experiment id
:return experiment: The experiment details
:raises ValueError: If the experiment detail is not found
"""


class ExperimentDetailsSQLRepository(ExperimentDetailsRepository):
def __init__(self, engine: Engine, table: Table):
self.engine = engine
self.table = table

def insert_many(self, values: List[Dict[str, Any]]) -> int:
with self.engine.connect() as conn:
statement = self.table.insert()
result = conn.execute(statement, values)
conn.commit()
return result.rowcount

def delete_all(self) -> int:
with self.engine.connect() as conn:
statement = self.table.delete()
result = conn.execute(statement)
conn.commit()
return result.rowcount

def get_by_exp_id(self, exp_id):
with self.engine.connect() as conn:
statement = self.table.select().where(self.table.c.exp_id == exp_id)
result = conn.execute(statement).first()
if not result:
raise ValueError(f"Experiment detail with exp_id {exp_id} not found")
return ExperimentDetailsModel(
exp_id=result.exp_id,
user=result.user,
created=result.created,
model=result.model,
branch=result.branch,
hpc=result.hpc,
)


def create_experiment_details_repository() -> ExperimentDetailsRepository:
engine = create_autosubmit_db_engine()
return ExperimentDetailsSQLRepository(engine, tables.details_table)
Loading
Loading