From b53e85fb6eaf7da0d304d305b9570a62c28800ff Mon Sep 17 00:00:00 2001 From: ltenorio Date: Thu, 14 Nov 2024 14:43:25 +0100 Subject: [PATCH] refactor JobDataStructure --- .../autosubmit_legacy/job/job_list.py | 3 +- autosubmit_api/database/db_jobdata.py | 393 +++++------------- autosubmit_api/database/tables.py | 68 ++- autosubmit_api/repositories/experiment_run.py | 92 ++++ autosubmit_api/repositories/job_data.py | 194 +++++++++ tests/test_jobdata.py | 36 ++ tests/test_repositories.py | 14 + 7 files changed, 509 insertions(+), 291 deletions(-) create mode 100644 autosubmit_api/repositories/experiment_run.py create mode 100644 autosubmit_api/repositories/job_data.py create mode 100644 tests/test_jobdata.py diff --git a/autosubmit_api/autosubmit_legacy/job/job_list.py b/autosubmit_api/autosubmit_legacy/job/job_list.py index 828f2f78..c66e0ba6 100644 --- a/autosubmit_api/autosubmit_legacy/job/job_list.py +++ b/autosubmit_api/autosubmit_legacy/job/job_list.py @@ -113,8 +113,7 @@ def get_tree_structured_from_previous_run(expid, BasicConfig, run_id, chunk_unit chunk_size = experiment_run.chunk_size else: raise Exception("Autosubmit couldn't fin the experiment header information necessary to complete this request.") - job_list = job_data_structure.get_current_job_data( - run_id, all_states=True) + job_list = job_data_structure.get_current_job_data(run_id) if not job_list: return [], [], {} else: diff --git a/autosubmit_api/database/db_jobdata.py b/autosubmit_api/database/db_jobdata.py index f67ca807..08ff8bf8 100644 --- a/autosubmit_api/database/db_jobdata.py +++ b/autosubmit_api/database/db_jobdata.py @@ -19,9 +19,7 @@ import os import time -import textwrap import traceback -import sqlite3 import collections from typing import List, Optional, Tuple import portalocker @@ -29,18 +27,25 @@ from json import loads from autosubmit_api.logger import logger from autosubmit_api.components.jobs.utils import generate_job_html_title + # from networkx import DiGraph from autosubmit_api.config.basicConfig import APIBasicConfig from autosubmit_api.monitor.monitor import Monitor from autosubmit_api.performance.utils import calculate_ASYPD_perjob from autosubmit_api.components.jobs.job_factory import SimJob -from autosubmit_api.common.utils import get_jobs_with_no_outliers, Status, datechunk_to_year +from autosubmit_api.common.utils import ( + get_jobs_with_no_outliers, + Status, + datechunk_to_year, +) + # from autosubmitAPIwu.job.job_list # import autosubmitAPIwu.experiment.common_db_requests as DbRequests from bscearth.utils.date import Log -from autosubmit_api.persistance.experiment import ExperimentPaths +from autosubmit_api.repositories.experiment_run import create_experiment_run_repository from autosubmit_api.repositories.graph_layout import create_exp_graph_layout_repository +from autosubmit_api.repositories.job_data import create_experiment_job_data_repository # Version 15 includes out err MaxRSS AveRSS and rowstatus @@ -427,69 +432,6 @@ def energy(self, energy): self._energy = energy if energy else 0 -class MainDataBase(): - def __init__(self, expid): - self.expid = expid - self.conn = None - self.conn_ec = None - self.create_table_query = None - self.db_version = None - - def create_connection(self, db_file): - """ - Create a database connection to the SQLite database specified by db_file. - :param db_file: database file name - :return: Connection object or None - """ - try: - conn = sqlite3.connect(db_file) - return conn - except Exception: - return None - - def create_table(self): - """ create a table from the create_table_sql statement - :param conn: Connection object - :param create_table_sql: a CREATE TABLE statement - :return: - """ - try: - if self.conn: - c = self.conn.cursor() - c.execute(self.create_table_query) - self.conn.commit() - else: - raise IOError("Not a valid connection") - except IOError as exp: - Log.warning(exp) - return None - except sqlite3.Error as e: - if _debug is True: - Log.info(traceback.format_exc()) - Log.warning("Error on create table : " + str(type(e).__name__)) - return None - - def create_index(self): - """ Creates index from statement defined in child class - """ - try: - if self.conn: - c = self.conn.cursor() - c.execute(self.create_index_query) - self.conn.commit() - else: - raise IOError("Not a valid connection") - except IOError as exp: - Log.warning(exp) - return None - except sqlite3.Error as e: - if _debug is True: - Log.info(traceback.format_exc()) - Log.debug(str(type(e).__name__)) - Log.warning("Error on create index . create_index") - return None - - class ExperimentGraphDrawing: def __init__(self, expid: str): """ @@ -662,262 +604,137 @@ def _clear_graph_database(self): return False return True -class JobDataStructure(MainDataBase): - +class JobDataStructure: def __init__(self, expid: str, basic_config: APIBasicConfig): """Initializes the object based on the unique identifier of the experiment. Args: expid (str): Experiment identifier """ - MainDataBase.__init__(self, expid) - # BasicConfig.read() - # self.expid = expid - self.folder_path = basic_config.JOBDATA_DIR - exp_paths = ExperimentPaths(expid) - self.database_path = exp_paths.job_data_db - # self.conn = None - self.db_version = None - # self.jobdata_list = JobDataList(self.expid) - self.create_index_query = textwrap.dedent(''' - CREATE INDEX IF NOT EXISTS ID_JOB_NAME ON job_data(job_name); - ''') - if not os.path.exists(self.database_path): - self.conn = None - else: - self.conn = self.create_connection(self.database_path) - self.db_version = self._select_pragma_version() - # self.query_job_historic = None - # Historic only working on DB 12 now - self.query_job_historic = "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data, nnodes, run_id FROM job_data WHERE job_name=? ORDER BY counter DESC" - - if self.db_version < DB_VERSION_SCHEMA_CHANGES: - try: - self.create_index() - except Exception as exp: - print(exp) - pass + self.expid = expid + self.experiment_run_data_repository = create_experiment_run_repository(expid) + self.experiment_job_data_repository = create_experiment_job_data_repository( + expid + ) def __str__(self): - return '{} {}'.format("Data structure. Version:", self.db_version) + return f"Run and job data of experiment {self.expid}" - def get_max_id_experiment_run(self): + def get_max_id_experiment_run(self) -> Optional[ExperimentRun]: """ Get last (max) experiment run object. :return: ExperimentRun data :rtype: ExperimentRun object """ try: - # expe = list() - if not os.path.exists(self.database_path): - raise Exception("Job data folder not found {0} or the database version is outdated.".format(str(self.database_path))) - if self.db_version < DB_VERSION_SCHEMA_CHANGES: - print(("Job database version {0} outdated.".format(str(self.db_version)))) - if os.path.exists(self.database_path) and self.db_version >= DB_VERSION_SCHEMA_CHANGES: - modified_time = int(os.stat(self.database_path).st_mtime) - current_experiment_run = self._get_max_id_experiment_run() - if current_experiment_run: - exprun_item = ExperimentRunItem_14( - *current_experiment_run) if self.db_version >= DB_EXPERIMENT_HEADER_SCHEMA_CHANGES else ExperimentRunItem(*current_experiment_run) - return ExperimentRun(exprun_item.run_id, exprun_item.created, exprun_item.start, exprun_item.finish, exprun_item.chunk_unit, exprun_item.chunk_size, exprun_item.completed, exprun_item.total, exprun_item.failed, exprun_item.queuing, exprun_item.running, exprun_item.submitted, exprun_item.suspended if self.db_version >= DB_EXPERIMENT_HEADER_SCHEMA_CHANGES else 0, exprun_item.metadata if self.db_version >= DB_EXPERIMENT_HEADER_SCHEMA_CHANGES else "", modified_time) - else: - return None - else: - raise Exception("Job data folder not found {0} or the database version is outdated.".format( - str(self.database_path))) - except Exception as exp: - print((str(exp))) + current_experiment_run = self.experiment_run_data_repository.get_last_run() + return ExperimentRun( + current_experiment_run.run_id, + current_experiment_run.created, + current_experiment_run.start, + current_experiment_run.finish, + current_experiment_run.chunk_unit, + current_experiment_run.chunk_size, + current_experiment_run.completed, + current_experiment_run.total, + current_experiment_run.failed, + current_experiment_run.queuing, + current_experiment_run.running, + current_experiment_run.submitted, + current_experiment_run.suspended, + current_experiment_run.metadata, + current_experiment_run.modified, + ) + except Exception as exc: + print((str(exc))) print((traceback.format_exc())) return None - def get_experiment_run_by_id(self, run_id): + def get_experiment_run_by_id(self, run_id: int) -> Optional[ExperimentRun]: """ Get experiment run stored in database by run_id """ try: - # expe = list() - if os.path.exists(self.folder_path) and self.db_version >= DB_VERSION_SCHEMA_CHANGES: - result = None - current_experiment_run = self._get_experiment_run_by_id(run_id) - if current_experiment_run: - # for run in current_experiment_run: - exprun_item = ExperimentRunItem_14( - *current_experiment_run) if self.db_version >= DB_EXPERIMENT_HEADER_SCHEMA_CHANGES else ExperimentRunItem(*current_experiment_run) - result = ExperimentRun(exprun_item.run_id, exprun_item.created, exprun_item.start, exprun_item.finish, exprun_item.chunk_unit, exprun_item.chunk_size, exprun_item.completed, exprun_item.total, exprun_item.failed, exprun_item.queuing, - exprun_item.running, exprun_item.submitted, exprun_item.suspended if self.db_version >= DB_EXPERIMENT_HEADER_SCHEMA_CHANGES else 0, exprun_item.metadata if self.db_version >= DB_EXPERIMENT_HEADER_SCHEMA_CHANGES else "") - return result - else: - return None - else: - raise Exception("Job data folder not found {0} or the database version is outdated.".format( - str(self.database_path))) - except Exception as exp: + current_experiment_run = self.experiment_run_data_repository.get_run_by_id( + run_id + ) + return ExperimentRun( + current_experiment_run.run_id, + current_experiment_run.created, + current_experiment_run.start, + current_experiment_run.finish, + current_experiment_run.chunk_unit, + current_experiment_run.chunk_size, + current_experiment_run.completed, + current_experiment_run.total, + current_experiment_run.failed, + current_experiment_run.queuing, + current_experiment_run.running, + current_experiment_run.submitted, + current_experiment_run.suspended, + current_experiment_run.metadata, + current_experiment_run.modified, + ) + except Exception as exc: if _debug is True: Log.info(traceback.format_exc()) Log.debug(traceback.format_exc()) Log.warning( - "Autosubmit couldn't retrieve experiment run. get_experiment_run_by_id. Exception {0}".format(str(exp))) + "Autosubmit couldn't retrieve experiment run. get_experiment_run_by_id. Exception {0}".format( + str(exc) + ) + ) return None - def get_current_job_data(self, run_id, all_states=False): + def get_current_job_data(self, run_id: int) -> Optional[List[JobData]]: """ Gets the job historical data for a run_id. :param run_id: Run identifier :type run_id: int - :param all_states: False if only last=1 should be included, otherwise all rows - :return: List of jobdata rows - :rtype: list() of JobData objects """ try: + current_job_data = ( + self.experiment_job_data_repository.get_last_job_data_by_run_id(run_id) + ) + current_collection = [] - if self.db_version < DB_VERSION_SCHEMA_CHANGES: - raise Exception("This function requieres a newer DB version.") - if os.path.exists(self.folder_path): - current_job_data = self._get_current_job_data( - run_id, all_states) - if current_job_data: - for job_data in current_job_data: - if self.db_version >= CURRENT_DB_VERSION: - jobitem = JobItem_15(*job_data) - current_collection.append(JobData(jobitem.id, jobitem.counter, jobitem.job_name, jobitem.created, jobitem.modified, jobitem.submit, jobitem.start, jobitem.finish, jobitem.status, jobitem.rowtype, jobitem.ncpus, - jobitem.wallclock, jobitem.qos, jobitem.energy, jobitem.date, jobitem.section, jobitem.member, jobitem.chunk, jobitem.last, jobitem.platform, jobitem.job_id, jobitem.extra_data, jobitem.nnodes, jobitem.run_id, jobitem.MaxRSS, jobitem.AveRSS, jobitem.out, jobitem.err, jobitem.rowstatus)) - else: - jobitem = JobItem_12(*job_data) - current_collection.append(JobData(jobitem.id, jobitem.counter, jobitem.job_name, jobitem.created, jobitem.modified, jobitem.submit, jobitem.start, jobitem.finish, jobitem.status, jobitem.rowtype, jobitem.ncpus, - jobitem.wallclock, jobitem.qos, jobitem.energy, jobitem.date, jobitem.section, jobitem.member, jobitem.chunk, jobitem.last, jobitem.platform, jobitem.job_id, jobitem.extra_data, jobitem.nnodes, jobitem.run_id)) - return current_collection - return None + for job_data in current_job_data: + current_collection.append( + JobData( + _id=job_data.id, + counter=job_data.counter, + job_name=job_data.job_name, + created=job_data.created, + modified=job_data.modified, + submit=job_data.submit, + start=job_data.start, + finish=job_data.finish, + status=job_data.status, + rowtype=job_data.rowtype, + ncpus=job_data.ncpus, + wallclock=job_data.wallclock, + qos=job_data.qos, + energy=job_data.energy, + date=job_data.date, + section=job_data.section, + member=job_data.member, + chunk=job_data.chunk, + last=job_data.last, + platform=job_data.platform, + job_id=job_data.job_id, + extra_data=job_data.extra_data, + nnodes=job_data.nnodes, + run_id=job_data.run_id, + MaxRSS=job_data.MaxRSS, + AveRSS=job_data.AveRSS, + out=job_data.out, + err=job_data.err, + rowstatus=job_data.rowstatus, + ) + ) + + return current_collection except Exception: print((traceback.format_exc())) - print(( - "Error on returning current job data. run_id {0}".format(run_id))) - return None - - def _get_experiment_run_by_id(self, run_id): - """ - :param run_id: Run Identifier - :type run_id: int - :return: First row that matches the run_id - :rtype: Row as Tuple - """ - try: - if self.conn: - self.conn.text_factory = str - cur = self.conn.cursor() - if self.db_version >= DB_EXPERIMENT_HEADER_SCHEMA_CHANGES: - cur.execute( - "SELECT run_id,created,start,finish,chunk_unit,chunk_size,completed,total,failed,queuing,running,submitted,suspended, metadata FROM experiment_run WHERE run_id=? and total > 0 ORDER BY run_id DESC", (run_id,)) - else: - cur.execute( - "SELECT run_id,created,start,finish,chunk_unit,chunk_size,completed,total,failed,queuing,running,submitted FROM experiment_run WHERE run_id=? and total > 0 ORDER BY run_id DESC", (run_id,)) - rows = cur.fetchall() - if len(rows) > 0: - return rows[0] - else: - return None - else: - raise Exception("Not a valid connection.") - except sqlite3.Error: - if _debug is True: - print((traceback.format_exc())) - print(("Error while retrieving run {0} information. {1}".format( - run_id, "_get_experiment_run_by_id"))) - return None - - def _select_pragma_version(self): - """ Retrieves user_version from database - """ - try: - if self.conn: - self.conn.text_factory = str - cur = self.conn.cursor() - cur.execute("pragma user_version;") - rows = cur.fetchall() - # print("Result {0}".format(str(rows))) - if len(rows) > 0: - # print(rows) - # print("Row " + str(rows[0])) - result, = rows[0] - # print(result) - return int(result) if result >= 0 else None - else: - # Starting value - return None - except sqlite3.Error as e: - if _debug is True: - Log.info(traceback.format_exc()) - Log.debug(traceback.format_exc()) - Log.warning("Error while retrieving version: " + - str(type(e).__name__)) - return None - - def _get_max_id_experiment_run(self): - """Return the max id from experiment_run - - :return: max run_id, None - :rtype: int, None - """ - try: - if self.conn: - self.conn.text_factory = str - cur = self.conn.cursor() - if self.db_version >= DB_EXPERIMENT_HEADER_SCHEMA_CHANGES: - cur.execute( - "SELECT run_id,created,start,finish,chunk_unit,chunk_size,completed,total,failed,queuing,running,submitted,suspended, metadata from experiment_run ORDER BY run_id DESC LIMIT 0, 1") - else: - cur.execute( - "SELECT run_id,created,start,finish,chunk_unit,chunk_size,completed,total,failed,queuing,running,submitted from experiment_run ORDER BY run_id DESC LIMIT 0, 1") - rows = cur.fetchall() - if len(rows) > 0: - return rows[0] - else: - return None - return None - except sqlite3.Error as e: - if _debug is True: - Log.info(traceback.format_exc()) - Log.debug(traceback.format_exc()) - Log.warning("Error on select max run_id : " + - str(type(e).__name__)) - return None - - def _get_current_job_data(self, run_id, all_states=False): - """ - Get JobData by run_id. - :param run_id: Run Identifier - :type run_id: int - :param all_states: False if only last=1, True all - :type all_states: bool - """ - try: - if self.conn: - # print("Run {0} states {1} db {2}".format( - # run_id, all_states, self.db_version)) - self.conn.text_factory = str - cur = self.conn.cursor() - request_string = "" - if all_states is False: - if self.db_version >= CURRENT_DB_VERSION: - request_string = "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data, nnodes, run_id, MaxRSS, AveRSS, out, err, rowstatus from job_data WHERE run_id=? and last=1 and finish > 0 and rowtype >= 2 ORDER BY id" - else: - request_string = "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data, nnodes, run_id from job_data WHERE run_id=? and last=1 and finish > 0 and rowtype >= 2 ORDER BY id" - - else: - if self.db_version >= CURRENT_DB_VERSION: - request_string = "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data, nnodes, run_id, MaxRSS, AveRSS, out, err, rowstatus from job_data WHERE run_id=? and rowtype >= 2 ORDER BY id" - else: - request_string = "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data, nnodes, run_id from job_data WHERE run_id=? and rowtype >= 2 ORDER BY id" - - cur.execute(request_string, (run_id,)) - rows = cur.fetchall() - # print(rows) - if len(rows) > 0: - return rows - else: - return None - except sqlite3.Error as e: - if _debug is True: - print((traceback.format_exc())) - print(("Error on select job data: {0}".format( - str(type(e).__name__)))) + print(("Error on returning current job data. run_id {0}".format(run_id))) return None diff --git a/autosubmit_api/database/tables.py b/autosubmit_api/database/tables.py index 7f8816c3..510a99eb 100644 --- a/autosubmit_api/database/tables.py +++ b/autosubmit_api/database/tables.py @@ -1,4 +1,13 @@ -from sqlalchemy import Column, MetaData, Integer, String, Text, Table +from sqlalchemy import ( + Column, + Float, + MetaData, + Integer, + String, + Text, + Table, + UniqueConstraint, +) from sqlalchemy.orm import DeclarativeBase, mapped_column, Mapped @@ -106,3 +115,60 @@ class WrapperJobPackageTable(BaseTable): # Job package TABLES job_package_table: Table = JobPackageTable.__table__ wrapper_job_package_table: Table = WrapperJobPackageTable.__table__ + +ExperimentRunTable = Table( + "experiment_run", + metadata_obj, + Column("run_id", Integer, primary_key=True), + Column("created", Text, nullable=False), + Column("modified", Text, nullable=True), + Column("start", Integer, nullable=False), + Column("finish", Integer), + Column("chunk_unit", Text, nullable=False), + Column("chunk_size", Integer, nullable=False), + Column("completed", Integer, nullable=False), + Column("total", Integer, nullable=False), + Column("failed", Integer, nullable=False), + Column("queuing", Integer, nullable=False), + Column("running", Integer, nullable=False), + Column("submitted", Integer, nullable=False), + Column("suspended", Integer, nullable=False, default=0), + Column("metadata", Text), +) + +JobDataTable = Table( + "job_data", + metadata_obj, + Column("id", Integer, nullable=False, primary_key=True), + Column("counter", Integer, nullable=False), + Column("job_name", Text, nullable=False, index=True), + Column("created", Text, nullable=False), + Column("modified", Text, nullable=False), + Column("submit", Integer, nullable=False), + Column("start", Integer, nullable=False), + Column("finish", Integer, nullable=False), + Column("status", Text, nullable=False), + Column("rowtype", Integer, nullable=False), + Column("ncpus", Integer, nullable=False), + Column("wallclock", Text, nullable=False), + Column("qos", Text, nullable=False), + Column("energy", Integer, nullable=False), + Column("date", Text, nullable=False), + Column("section", Text, nullable=False), + Column("member", Text, nullable=False), + Column("chunk", Integer, nullable=False), + Column("last", Integer, nullable=False), + Column("platform", Text, nullable=False), + Column("job_id", Integer, nullable=False), + Column("extra_data", Text, nullable=False), + Column("nnodes", Integer, nullable=False, default=0), + Column("run_id", Integer), + Column("MaxRSS", Float, nullable=False, default=0.0), + Column("AveRSS", Float, nullable=False, default=0.0), + Column("out", Text, nullable=False), + Column("err", Text, nullable=False), + Column("rowstatus", Integer, nullable=False, default=0), + Column("children", Text, nullable=True), + Column("platform_output", Text, nullable=True), + UniqueConstraint("counter", "job_name", name="unique_counter_and_job_name"), +) diff --git a/autosubmit_api/repositories/experiment_run.py b/autosubmit_api/repositories/experiment_run.py new file mode 100644 index 00000000..f52cc82c --- /dev/null +++ b/autosubmit_api/repositories/experiment_run.py @@ -0,0 +1,92 @@ +from abc import ABC, abstractmethod +from typing import Any, List +from pydantic import BaseModel +from sqlalchemy import Engine, Table +from sqlalchemy.schema import CreateTable +from autosubmit_api.database import tables +from autosubmit_api.database.common import create_sqlite_db_engine +from autosubmit_api.persistance.experiment import ExperimentPaths + + +class ExperimentRunModel(BaseModel): + run_id: Any + created: Any + modified: Any + start: Any + finish: Any + chunk_unit: Any + chunk_size: Any + completed: Any + total: Any + failed: Any + queuing: Any + running: Any + submitted: Any + suspended: Any + metadata: Any + + +class ExperimentRunRepository(ABC): + @abstractmethod + def get_all(self) -> List[ExperimentRunModel]: + """ + Gets all runs of the experiment + """ + + @abstractmethod + def get_last_run(self) -> ExperimentRunModel: + """ + Gets last run of the experiment. Raises ValueError if no runs found. + """ + + @abstractmethod + def get_run_by_id(self, run_id: int) -> ExperimentRunModel: + """ + Gets run by id. Raises ValueError if run not found. + """ + + +class ExperimentRunSQLRepository(ExperimentRunRepository): + def __init__(self, expid: str, engine: Engine, table: Table): + self.engine = engine + self.table = table + self.expid = expid + + with self.engine.connect() as conn: + conn.execute(CreateTable(self.table, if_not_exists=True)) + conn.commit() + + def get_all(self): + with self.engine.connect() as conn: + statement = self.table.select() + result = conn.execute(statement).all() + + return [ + ExperimentRunModel.model_validate(row, from_attributes=True) + for row in result + ] + + def get_last_run(self): + with self.engine.connect() as conn: + statement = ( + self.table.select().order_by(self.table.c.run_id.desc()) + ) + result = conn.execute(statement).first() + if result is None: + raise ValueError(f"No runs found for experiment {self.expid}") + return ExperimentRunModel.model_validate(result, from_attributes=True) + + def get_run_by_id(self, run_id: int): + with self.engine.connect() as conn: + statement = self.table.select().where(self.table.c.run_id == run_id) + result = conn.execute(statement).first() + if result is None: + raise ValueError( + f"Run with id {run_id} not found for experiment {self.expid}" + ) + return ExperimentRunModel.model_validate(result, from_attributes=True) + + +def create_experiment_run_repository(expid: str): + engine = create_sqlite_db_engine(ExperimentPaths(expid).job_data_db) + return ExperimentRunSQLRepository(expid, engine, tables.ExperimentRunTable) diff --git a/autosubmit_api/repositories/job_data.py b/autosubmit_api/repositories/job_data.py new file mode 100644 index 00000000..72cac369 --- /dev/null +++ b/autosubmit_api/repositories/job_data.py @@ -0,0 +1,194 @@ +from abc import ABC, abstractmethod +from typing import Any, List +from pydantic import BaseModel +from sqlalchemy import Engine, Table, or_, Index +from sqlalchemy.schema import CreateTable +from autosubmit_api.database import tables +from autosubmit_api.database.common import create_sqlite_db_engine +from autosubmit_api.persistance.experiment import ExperimentPaths + + +class ExperimentJobDataModel(BaseModel): + id: Any + counter: Any + job_name: Any + created: Any + modified: Any + submit: Any + start: Any + finish: Any + status: Any + rowtype: Any + ncpus: Any + wallclock: Any + qos: Any + energy: Any + date: Any + section: Any + member: Any + chunk: Any + last: Any + platform: Any + job_id: Any + extra_data: Any + nnodes: Any + run_id: Any + MaxRSS: Any + AveRSS: Any + out: Any + err: Any + rowstatus: Any + children: Any + + +class ExperimentJobDataRepository(ABC): + @abstractmethod + def get_last_job_data_by_run_id(self, run_id: int) -> List[ExperimentJobDataModel]: + """ + Gets last job data of an specific run id + """ + + @abstractmethod + def get_last_job_data(self) -> List[ExperimentJobDataModel]: + """ + Gets last job data + """ + + @abstractmethod + def get_jobs_by_name(self, job_name: str) -> List[ExperimentJobDataModel]: + """ + Gets historical job data by job_name + """ + + @abstractmethod + def get_all(self) -> List[ExperimentJobDataModel]: + """ + Gets all job data + """ + + @abstractmethod + def get_job_data_COMPLETED_by_rowtype_run_id( + self, rowtype: int, run_id: int + ) -> List[ExperimentJobDataModel]: + """ + Gets job data by rowtype and run id + """ + + @abstractmethod + def get_job_data_COMPLETD_by_section( + self, section: str + ) -> List[ExperimentJobDataModel]: + """ + Gets job data by section + """ + + +class ExperimentJobDataSQLRepository(ExperimentJobDataRepository): + def __init__(self, expid: str, engine: Engine, table: Table): + self.engine = engine + self.table = table + self.expid = expid + + with self.engine.connect() as conn: + conn.execute(CreateTable(self.table, if_not_exists=True)) + Index("ID_JOB_NAME", self.table.c.job_name).create(conn, checkfirst=True) + conn.commit() + + def get_last_job_data_by_run_id(self, run_id: int): + with self.engine.connect() as conn: + statement = ( + self.table.select() + .where( + (self.table.c.run_id == run_id), + (self.table.c.rowtype >= 2), + ) + .order_by(self.table.c.id.desc()) + ) + result = conn.execute(statement).all() + + return [ + ExperimentJobDataModel.model_validate(row, from_attributes=True) + for row in result + ] + + def get_last_job_data(self): + with self.engine.connect() as conn: + statement = self.table.select().where( + (self.table.c.last == 1), + (self.table.c.rowtype >= 2), + ) + result = conn.execute(statement).all() + + return [ + ExperimentJobDataModel.model_validate(row, from_attributes=True) + for row in result + ] + + def get_jobs_by_name(self, job_name: str): + with self.engine.connect() as conn: + statement = ( + self.table.select() + .where(self.table.c.job_name == job_name) + .order_by(self.table.c.counter.desc()) + ) + result = conn.execute(statement).all() + + return [ + ExperimentJobDataModel.model_validate(row, from_attributes=True) + for row in result + ] + + def get_all(self): + with self.engine.connect() as conn: + statement = ( + self.table.select().where(self.table.c.id > 0).order_by(self.table.c.id) + ) + result = conn.execute(statement).all() + + return [ + ExperimentJobDataModel.model_validate(row, from_attributes=True) + for row in result + ] + + def get_job_data_COMPLETED_by_rowtype_run_id(self, rowtype: int, run_id: int): + with self.engine.connect() as conn: + statement = ( + self.table.select() + .where( + (self.table.c.rowtype == rowtype), + (self.table.c.run_id == run_id), + (self.table.c.status == "COMPLETED"), + ) + .order_by(self.table.c.id) + ) + result = conn.execute(statement).all() + + return [ + ExperimentJobDataModel.model_validate(row, from_attributes=True) + for row in result + ] + + def get_job_data_COMPLETD_by_section(self, section: str): + with self.engine.connect() as conn: + statement = ( + self.table.select() + .where( + (self.table.c.status == "COMPLETED"), + or_( + (self.table.c.section == section), + (self.table.c.member == section), + ), + ) + .order_by(self.table.c.id) + ) + result = conn.execute(statement).all() + + return [ + ExperimentJobDataModel.model_validate(row, from_attributes=True) + for row in result + ] + + +def create_experiment_job_data_repository(expid: str): + engine = create_sqlite_db_engine(ExperimentPaths(expid).job_data_db) + return ExperimentJobDataSQLRepository(expid, engine, tables.JobDataTable) diff --git a/tests/test_jobdata.py b/tests/test_jobdata.py new file mode 100644 index 00000000..a74d13e5 --- /dev/null +++ b/tests/test_jobdata.py @@ -0,0 +1,36 @@ +from autosubmit_api.database.db_jobdata import JobDataStructure, ExperimentRun + + +class TestJobDataStructure: + def test_valid_operations(self, fixture_mock_basic_config): + expid = "a003" + job_data_db = JobDataStructure(expid, None) + + last_exp_run = job_data_db.get_max_id_experiment_run() + + assert isinstance(last_exp_run, ExperimentRun) + assert last_exp_run.run_id == 3 + assert last_exp_run.total == 8 + + exp_run = job_data_db.get_experiment_run_by_id(2) + assert isinstance(exp_run, ExperimentRun) + assert exp_run.run_id == 2 + assert exp_run.total == 8 + + # Run greater that the last one + exp_run = job_data_db.get_experiment_run_by_id(4) + assert exp_run is None + + job_data = job_data_db.get_current_job_data(3) + assert isinstance(job_data, list) + assert len(job_data) == 8 + + def test_invalid_operations(self, fixture_mock_basic_config): + expid = "404" + job_data_db = JobDataStructure(expid, None) + + last_exp_run = job_data_db.get_max_id_experiment_run() + assert last_exp_run is None + + exp_run = job_data_db.get_experiment_run_by_id(2) + assert exp_run is None diff --git a/tests/test_repositories.py b/tests/test_repositories.py index b9c06586..cea59f4c 100644 --- a/tests/test_repositories.py +++ b/tests/test_repositories.py @@ -1,4 +1,6 @@ +from sqlalchemy import inspect from autosubmit_api.repositories.experiment import create_experiment_repository +from autosubmit_api.repositories.job_data import create_experiment_job_data_repository from autosubmit_api.repositories.graph_layout import create_exp_graph_layout_repository @@ -45,3 +47,15 @@ def test_operations(self, fixture_mock_basic_config): # Table is empty graph_data = [x.model_dump() for x in graph_draw_db.get_all()] assert graph_data == [] + + +class TestExperimentJobDataRepository: + def test_sql_init(self, fixture_mock_basic_config): + exp_run_repository = create_experiment_job_data_repository("any") + + # Check if index exists and is correct + inspector = inspect(exp_run_repository.engine) + indexes = inspector.get_indexes(exp_run_repository.table.name) + assert len(indexes) == 1 + assert indexes[0]["name"] == "ID_JOB_NAME" + assert indexes[0]["column_names"] == ["job_name"]