From 6a97e6d5b8df3071670f5dc163be48fbe8edc5a4 Mon Sep 17 00:00:00 2001 From: ltenorio Date: Wed, 6 Nov 2024 16:29:39 +0100 Subject: [PATCH] refactor ExperimentGraphDrawing --- autosubmit_api/database/common.py | 13 +- autosubmit_api/database/db_jobdata.py | 154 +++----------------- autosubmit_api/database/tables.py | 28 ++-- autosubmit_api/repositories/__init__.py | 0 autosubmit_api/repositories/experiment.py | 73 ++++++++++ autosubmit_api/repositories/graph_layout.py | 72 +++++++++ tests/test_graph.py | 50 +++---- tests/test_repositories.py | 47 ++++++ 8 files changed, 260 insertions(+), 177 deletions(-) create mode 100644 autosubmit_api/repositories/__init__.py create mode 100644 autosubmit_api/repositories/experiment.py create mode 100644 autosubmit_api/repositories/graph_layout.py create mode 100644 tests/test_repositories.py diff --git a/autosubmit_api/database/common.py b/autosubmit_api/database/common.py index defe7056..8b99d96b 100644 --- a/autosubmit_api/database/common.py +++ b/autosubmit_api/database/common.py @@ -55,14 +55,19 @@ def create_main_db_conn() -> Connection: return builder.product +def create_sqlite_db_engine(db_path: str) -> Engine: + """ + Create an engine for a SQLite DDBB. + """ + return create_engine(f"sqlite:///{ os.path.abspath(db_path)}", poolclass=NullPool) + + def create_autosubmit_db_engine() -> Engine: """ Create an engine for the autosubmit DDBB. Usually named autosubmit.db """ APIBasicConfig.read() - return create_engine( - f"sqlite:///{ os.path.abspath(APIBasicConfig.DB_PATH)}", poolclass=NullPool - ) + return create_sqlite_db_engine(APIBasicConfig.DB_PATH) def create_as_times_db_engine() -> Engine: @@ -71,7 +76,7 @@ def create_as_times_db_engine() -> Engine: """ APIBasicConfig.read() db_path = os.path.join(APIBasicConfig.DB_DIR, APIBasicConfig.AS_TIMES_DB) - return create_engine(f"sqlite:///{ os.path.abspath(db_path)}", poolclass=NullPool) + return create_sqlite_db_engine(db_path) def execute_with_limit_offset( diff --git a/autosubmit_api/database/db_jobdata.py b/autosubmit_api/database/db_jobdata.py index 321bdf59..f67ca807 100644 --- a/autosubmit_api/database/db_jobdata.py +++ b/autosubmit_api/database/db_jobdata.py @@ -23,10 +23,11 @@ import traceback import sqlite3 import collections +from typing import List, Optional, Tuple import portalocker from datetime import datetime, timedelta from json import loads -from time import mktime +from autosubmit_api.logger import logger from autosubmit_api.components.jobs.utils import generate_job_html_title # from networkx import DiGraph from autosubmit_api.config.basicConfig import APIBasicConfig @@ -39,6 +40,7 @@ from bscearth.utils.date import Log from autosubmit_api.persistance.experiment import ExperimentPaths +from autosubmit_api.repositories.graph_layout import create_exp_graph_layout_repository # Version 15 includes out err MaxRSS AveRSS and rowstatus @@ -425,41 +427,6 @@ def energy(self, energy): self._energy = energy if energy else 0 -class JobStepExtraData(): - def __init__(self, key, dict_data): - self.key = key - if isinstance(dict_data, dict): - # dict_data["ncpus"] if dict_data and "ncpus" in dict_data.keys( - self.ncpus = dict_data.get("ncpus", 0) if dict_data else 0 - # ) else 0 - self.nnodes = dict_data.get( - "nnodes", 0) if dict_data else 0 # and "nnodes" in dict_data.keys( - # ) else 0 - self.submit = int(mktime(datetime.strptime(dict_data["submit"], "%Y-%m-%dT%H:%M:%S").timetuple())) if dict_data and "submit" in list(dict_data.keys( - )) else 0 - self.start = int(mktime(datetime.strptime(dict_data["start"], "%Y-%m-%dT%H:%M:%S").timetuple())) if dict_data and "start" in list(dict_data.keys( - )) else 0 - self.finish = int(mktime(datetime.strptime(dict_data["finish"], "%Y-%m-%dT%H:%M:%S").timetuple())) if dict_data and "finish" in list(dict_data.keys( - )) and dict_data["finish"] != "Unknown" else 0 - self.energy = parse_output_number(dict_data["energy"]) if dict_data and "energy" in list(dict_data.keys( - )) else 0 - # if dict_data and "MaxRSS" in dict_data.keys( - self.maxRSS = dict_data.get("MaxRSS", 0) - # ) else 0 - # if dict_data and "AveRSS" in dict_data.keys( - self.aveRSS = dict_data.get("AveRSS", 0) - # ) else 0 - else: - self.ncpus = 0 - self.nnodes = 0 - self.submit = 0 - self.start = 0 - self.finish = 0 - self.energy = 0 - self.maxRSS = 0 - self.aveRSS = 0 - - class MainDataBase(): def __init__(self, expid): self.expid = expid @@ -523,39 +490,17 @@ def create_index(self): return None -class ExperimentGraphDrawing(MainDataBase): - def __init__(self, expid): +class ExperimentGraphDrawing: + def __init__(self, expid: str): """ Sets and validates graph drawing. + :param expid: Name of experiment - :type expid: str - :param allJobs: list of all jobs objects (usually from job_list) - :type allJobs: list() """ - MainDataBase.__init__(self, expid) APIBasicConfig.read() self.expid = expid - exp_paths = ExperimentPaths(expid) self.folder_path = APIBasicConfig.LOCAL_ROOT_DIR - self.database_path = exp_paths.graph_data_db - self.create_table_query = textwrap.dedent( - '''CREATE TABLE - IF NOT EXISTS experiment_graph_draw ( - id INTEGER PRIMARY KEY, - job_name text NOT NULL, - x INTEGER NOT NULL, - y INTEGER NOT NULL - );''') - - if not os.path.exists(self.database_path): - os.umask(0) - if not os.path.exists(os.path.dirname(self.database_path)): - os.makedirs(os.path.dirname(self.database_path)) - os.open(self.database_path, os.O_WRONLY | os.O_CREAT, 0o777) - self.conn = self.create_connection(self.database_path) - self.create_table() - else: - self.conn = self.create_connection(self.database_path) + self.graph_data_repository = create_exp_graph_layout_repository(expid) self.lock_name = "calculation_in_progress.lock" self.current_position_dictionary = None self.current_jobs_set = set() @@ -607,7 +552,6 @@ def calculate_drawing(self, allJobs, independent=False, num_chunks=48, job_dicti lock_path_file = os.path.join(self.folder_path, lock_name) try: with portalocker.Lock(lock_path_file, timeout=1) as fh: - self.conn = self.create_connection(self.database_path) monitor = Monitor() graph = monitor.create_tree_list( self.expid, allJobs, None, dict(), False, job_dictionary) @@ -671,46 +615,35 @@ def set_current_position(self): self.current_position_dictionary = {row[1]: (row[2], row[3]) for row in current_table} self.current_jobs_set = set(self.current_position_dictionary.keys()) - def _get_current_position(self): + def _get_current_position(self) -> List[Tuple[int, str, int, int]]: """ Get all registers from experiment_graph_draw.\n :return: row content: id, job_name, x, y :rtype: 4-tuple (int, str, int, int) """ try: - if self.conn: - # conn = create_connection(DB_FILE_AS_TIMES) - self.conn.text_factory = str - cur = self.conn.cursor() - cur.execute( - "SELECT id, job_name, x, y FROM experiment_graph_draw") - rows = cur.fetchall() - return rows - return None + result = self.graph_data_repository.get_all() + return [ + (item.id, item.job_name, item.x, item.y) + for item in result + ] except Exception as exp: print((traceback.format_exc())) print((str(exp))) return None - def _insert_many_graph_coordinates(self, values): + def _insert_many_graph_coordinates( + self, values: List[Tuple[str, int, int]] + ) -> Optional[int]: """ Create many graph coordinates - :param conn: - :param details: - :return: """ try: - if self.conn: - # exp_id = self._get_id_db() - # conn = create_connection(DB_FILE_AS_TIMES) - # creation_date = datetime.today().strftime('%Y-%m-%d-%H:%M:%S') - sql = ''' INSERT INTO experiment_graph_draw(job_name, x, y) VALUES(?,?,?) ''' - # print(row_content) - cur = self.conn.cursor() - cur.executemany(sql, values) - # print(cur) - self.conn.commit() - return cur.lastrowid + _vals = [ + {"job_name": item[0], "x": item[1], "y": item[2]} for item in values + ] + logger.debug(_vals) + return self.graph_data_repository.insert_many(_vals) except Exception as exp: print((traceback.format_exc())) Log.warning( @@ -722,19 +655,12 @@ def _clear_graph_database(self): Clear all content from graph drawing database """ try: - if self.conn: - # conn = create_connection(DB_FILE_AS_TIMES) - # modified_date = datetime.today().strftime('%Y-%m-%d-%H:%M:%S') - sql = ''' DELETE FROM experiment_graph_draw ''' - cur = self.conn.cursor() - cur.execute(sql, ) - self.conn.commit() - return True - return False + self.graph_data_repository.delete_all() except Exception as exp: print((traceback.format_exc())) print(("Error on Database clear: {}".format(str(exp)))) return False + return True class JobDataStructure(MainDataBase): @@ -995,37 +921,3 @@ def _get_current_job_data(self, run_id, all_states=False): print(("Error on select job data: {0}".format( str(type(e).__name__)))) return None - - -def parse_output_number(string_number): - """ - Parses number in format 1.0K 1.0M 1.0G - - :param string_number: String representation of number - :type string_number: str - :return: number in float format - :rtype: float - """ - number = 0.0 - if (string_number): - if string_number == "NA": - return 0.0 - last_letter = string_number.strip()[-1] - multiplier = 1.0 - if last_letter == "G": - multiplier = 1000000000.0 - number = string_number[:-1] - elif last_letter == "M": - multiplier = 1000000.0 - number = string_number[:-1] - elif last_letter == "K": - multiplier = 1000.0 - number = string_number[:-1] - else: - number = string_number - try: - number = float(number) * multiplier - except Exception: - number = 0.0 - pass - return number diff --git a/autosubmit_api/database/tables.py b/autosubmit_api/database/tables.py index a1fd39dc..7f8816c3 100644 --- a/autosubmit_api/database/tables.py +++ b/autosubmit_api/database/tables.py @@ -1,4 +1,4 @@ -from sqlalchemy import MetaData, Integer, String, Text, Table +from sqlalchemy import Column, MetaData, Integer, String, Text, Table from sqlalchemy.orm import DeclarativeBase, mapped_column, Mapped @@ -52,18 +52,16 @@ class ExperimentStatusTable(BaseTable): modified: Mapped[str] = mapped_column(Text, nullable=False) -class GraphDataTable(BaseTable): - """ - Stores the coordinates and it is used exclusively to speed up the process - of generating the graph layout - """ - - __tablename__ = "experiment_graph_draw" - - id: Mapped[int] = mapped_column(Integer, primary_key=True) - job_name: Mapped[str] = mapped_column(Text, nullable=False) - x: Mapped[int] = mapped_column(Integer, nullable=False) - y: Mapped[int] = mapped_column(Integer, nullable=False) +GraphDataTable = Table( + "experiment_graph_draw", + metadata_obj, + Column("id", Integer, primary_key=True), + Column("job_name", Text, nullable=False), + Column("x", Integer, nullable=False), + Column("y", Integer, nullable=False), +) +"""Stores the coordinates and it is used exclusively +to speed up the process of generating the graph layout""" class JobPackageTable(BaseTable): @@ -103,8 +101,8 @@ class WrapperJobPackageTable(BaseTable): experiment_status_table: Table = ExperimentStatusTable.__table__ # Graph Data TABLES -graph_data_table: Table = GraphDataTable.__table__ +graph_data_table: Table = GraphDataTable # Job package TABLES job_package_table: Table = JobPackageTable.__table__ -wrapper_job_package_table: Table = WrapperJobPackageTable.__table__ \ No newline at end of file +wrapper_job_package_table: Table = WrapperJobPackageTable.__table__ diff --git a/autosubmit_api/repositories/__init__.py b/autosubmit_api/repositories/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/autosubmit_api/repositories/experiment.py b/autosubmit_api/repositories/experiment.py new file mode 100644 index 00000000..f2053c0c --- /dev/null +++ b/autosubmit_api/repositories/experiment.py @@ -0,0 +1,73 @@ +from abc import ABC, abstractmethod +from typing import List, Optional +from pydantic import BaseModel +from sqlalchemy import Engine, Table +from autosubmit_api.database import tables +from autosubmit_api.database.common import create_autosubmit_db_engine + + +class ExperimentModel(BaseModel): + id: int + name: str + description: Optional[str] = None + autosubmit_version: Optional[str] = None + + +class ExperimentRepository(ABC): + @abstractmethod + def get_all(self) -> List[ExperimentModel]: + """ + Get all the experiments + + :return experiments: The list of experiments + """ + pass + + @abstractmethod + def get_by_expid(self, expid: str) -> ExperimentModel: + """ + Get the experiment by expid + + :param expid: The experiment id + :return experiment: The experiment + :raises ValueError: If the experiment is not found + """ + pass + + +class ExperimentSQLRepository(ExperimentRepository): + def __init__(self, engine: Engine, table: Table): + self.engine = engine + self.table = table + + def get_all(self): + with self.engine.connect() as conn: + statement = self.table.select() + result = conn.execute(statement).all() + return [ + ExperimentModel( + id=row.id, + name=row.name, + description=row.description, + autosubmit_version=row.autosubmit_version, + ) + for row in result + ] + + def get_by_expid(self, expid: str): + with self.engine.connect() as conn: + statement = self.table.select().where(self.table.c.name == expid) + result = conn.execute(statement).first() + if result is None: + raise ValueError(f"Experiment with id {expid} not found") + return ExperimentModel( + id=result.id, + name=result.name, + description=result.description, + autosubmit_version=result.autosubmit_version, + ) + + +def create_experiment_repository() -> ExperimentRepository: + engine = create_autosubmit_db_engine() + return ExperimentSQLRepository(engine, tables.experiment_table) diff --git a/autosubmit_api/repositories/graph_layout.py b/autosubmit_api/repositories/graph_layout.py new file mode 100644 index 00000000..3e267265 --- /dev/null +++ b/autosubmit_api/repositories/graph_layout.py @@ -0,0 +1,72 @@ +from abc import ABC, abstractmethod +from typing import Any, Dict, List, Union +from pydantic import BaseModel +from sqlalchemy import Engine, Table +from sqlalchemy.schema import CreateTable +from autosubmit_api.database import tables +from autosubmit_api.database.common import create_sqlite_db_engine +from autosubmit_api.persistance.experiment import ExperimentPaths + + +class ExpGraphLayoutModel(BaseModel): + id: Union[int, Any] + job_name: Union[str, Any] + x: Union[float, Any] + y: Union[float, Any] + + +class ExpGraphLayoutRepository(ABC): + @abstractmethod + def get_all(self) -> List[ExpGraphLayoutModel]: + """ + Get all the graph layout data. + """ + + def delete_all(self) -> int: + """ + Delete all the graph layout data. + """ + + def insert_many(self, values: List[Dict[str, Any]]) -> int: + """ + Insert many graph layout data. + """ + + +class ExpGraphLayoutSQLRepository(ExpGraphLayoutRepository): + def __init__(self, expid: str, engine: Engine, table: Table): + self.expid = expid + self.engine = engine + self.table = table + + with self.engine.connect() as conn: + conn.execute(CreateTable(self.table, if_not_exists=True)) + conn.commit() + + def get_all(self) -> List[ExpGraphLayoutModel]: + with self.engine.connect() as conn: + statement = self.table.select() + result = conn.execute(statement).all() + return [ + ExpGraphLayoutModel(id=row.id, job_name=row.job_name, x=row.x, y=row.y) + for row in result + ] + + def delete_all(self) -> int: + with self.engine.connect() as conn: + statement = self.table.delete() + result = conn.execute(statement) + conn.commit() + return result.rowcount + + def insert_many(self, values) -> int: + with self.engine.connect() as conn: + statement = self.table.insert() + result = conn.execute(statement, values) + conn.commit() + return result.rowcount + + +def create_exp_graph_layout_repository(expid: str) -> ExpGraphLayoutRepository: + engine = create_sqlite_db_engine(ExperimentPaths(expid).graph_data_db) + return ExpGraphLayoutSQLRepository(expid, engine, tables.GraphDataTable) diff --git a/tests/test_graph.py b/tests/test_graph.py index 0dc5beda..71f6a025 100644 --- a/tests/test_graph.py +++ b/tests/test_graph.py @@ -1,6 +1,3 @@ -import os - -from sqlalchemy import create_engine from autosubmit_api.builders.configuration_facade_builder import ( AutosubmitConfigurationFacadeBuilder, ConfigurationFacadeDirector, @@ -9,14 +6,12 @@ JobListLoaderBuilder, JobListLoaderDirector, ) -from autosubmit_api.database import tables from autosubmit_api.database.db_jobdata import ExperimentGraphDrawing from autosubmit_api.monitor.monitor import Monitor -from autosubmit_api.persistance.experiment import ExperimentPaths +from autosubmit_api.repositories.graph_layout import create_exp_graph_layout_repository class TestPopulateDB: - def test_monitor_dot(self, fixture_mock_basic_config): expid = "a003" job_list_loader = JobListLoaderDirector( @@ -44,32 +39,33 @@ def test_process_graph(self, fixture_mock_basic_config): JobListLoaderBuilder(expid) ).build_loaded_joblist_loader() + assert len(job_list_loader.jobs) == 8 + autosubmit_configuration_facade = ConfigurationFacadeDirector( AutosubmitConfigurationFacadeBuilder(expid) ).build_autosubmit_configuration_facade() - exp_paths = ExperimentPaths(expid) - with create_engine( - f"sqlite:///{ os.path.abspath(exp_paths.graph_data_db)}" - ).connect() as conn: - conn.execute(tables.graph_data_table.delete()) - conn.commit() + # Create repository handler + graph_draw_db = create_exp_graph_layout_repository(expid) - experimentGraphDrawing.calculate_drawing( - allJobs=job_list_loader.jobs, - independent=False, - num_chunks=autosubmit_configuration_facade.chunk_size, - job_dictionary=job_list_loader.job_dictionary, - ) + # Delete content of table + graph_draw_db.delete_all() - assert ( - experimentGraphDrawing.coordinates - and len(experimentGraphDrawing.coordinates) == 8 - ) + experimentGraphDrawing.calculate_drawing( + allJobs=job_list_loader.jobs, + independent=False, + num_chunks=autosubmit_configuration_facade.chunk_size, + job_dictionary=job_list_loader.job_dictionary, + ) + + assert ( + isinstance(experimentGraphDrawing.coordinates, list) + and len(experimentGraphDrawing.coordinates) == 8 + ) - rows = conn.execute(tables.graph_data_table.select()).all() + rows = graph_draw_db.get_all() - assert len(rows) == 8 - for job in rows: - job_name: str = job.job_name - assert job_name.startswith(expid) + assert len(rows) == 8 + for job in rows: + job_name: str = job.job_name + assert job_name.startswith(expid) diff --git a/tests/test_repositories.py b/tests/test_repositories.py new file mode 100644 index 00000000..b9c06586 --- /dev/null +++ b/tests/test_repositories.py @@ -0,0 +1,47 @@ +from autosubmit_api.repositories.experiment import create_experiment_repository +from autosubmit_api.repositories.graph_layout import create_exp_graph_layout_repository + + +class TestExperimentRepository: + def test_operations(self, fixture_mock_basic_config): + experiment_db = create_experiment_repository() + + EXPIDS = ["a003", "a007", "a3tb", "a6zj"] + + # Check get_all + rows = experiment_db.get_all() + assert len(rows) >= 4 + for expid in EXPIDS: + assert expid in [row.name for row in rows] + + # Check get_by_expid + for expid in EXPIDS: + row = experiment_db.get_by_expid(expid) + assert row.name == expid + + +class TestExpGraphLayoutRepository: + def test_operations(self, fixture_mock_basic_config): + expid = "g001" + graph_draw_db = create_exp_graph_layout_repository(expid) + + # Table exists and is empty + assert graph_draw_db.get_all() == [] + + # Insert data + data = [ + {"id": 1, "job_name": "job1", "x": 1, "y": 2}, + {"id": 2, "job_name": "job2", "x": 2, "y": 3}, + ] + assert graph_draw_db.insert_many(data) == len(data) + + # Get data + graph_data = [x.model_dump() for x in graph_draw_db.get_all()] + assert graph_data == data + + # Delete data + assert graph_draw_db.delete_all() == len(data) + + # Table is empty + graph_data = [x.model_dump() for x in graph_draw_db.get_all()] + assert graph_data == []