refactor ExperimentGraphDrawing
LuiggiTenorioK committed Nov 13, 2024
1 parent 2579e8b commit 6a97e6d
Showing 8 changed files with 260 additions and 177 deletions.
13 changes: 9 additions & 4 deletions autosubmit_api/database/common.py
@@ -55,14 +55,19 @@ def create_main_db_conn() -> Connection:
return builder.product


def create_sqlite_db_engine(db_path: str) -> Engine:
"""
Create an engine for a SQLite DDBB.
"""
return create_engine(f"sqlite:///{ os.path.abspath(db_path)}", poolclass=NullPool)


def create_autosubmit_db_engine() -> Engine:
"""
Create an engine for the autosubmit DDBB. Usually named autosubmit.db
"""
APIBasicConfig.read()
return create_engine(
f"sqlite:///{ os.path.abspath(APIBasicConfig.DB_PATH)}", poolclass=NullPool
)
return create_sqlite_db_engine(APIBasicConfig.DB_PATH)


def create_as_times_db_engine() -> Engine:
@@ -71,7 +76,7 @@ def create_as_times_db_engine():
"""
APIBasicConfig.read()
db_path = os.path.join(APIBasicConfig.DB_DIR, APIBasicConfig.AS_TIMES_DB)
return create_engine(f"sqlite:///{ os.path.abspath(db_path)}", poolclass=NullPool)
return create_sqlite_db_engine(db_path)


def execute_with_limit_offset(
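The duplicated engine-construction code in common.py is now funneled through the new create_sqlite_db_engine helper. A minimal usage sketch, assuming SQLAlchemy 2.x; the database path below is illustrative only and not part of the commit:

from sqlalchemy import text
from autosubmit_api.database.common import create_sqlite_db_engine

engine = create_sqlite_db_engine("/tmp/example.db")  # illustrative path
with engine.connect() as conn:
    # NullPool means each connect() opens a fresh SQLite connection
    conn.execute(text("CREATE TABLE IF NOT EXISTS demo (id INTEGER PRIMARY KEY)"))
    conn.commit()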
154 changes: 23 additions & 131 deletions autosubmit_api/database/db_jobdata.py
@@ -23,10 +23,11 @@
import traceback
import sqlite3
import collections
from typing import List, Optional, Tuple
import portalocker
from datetime import datetime, timedelta
from json import loads
from time import mktime
from autosubmit_api.logger import logger
from autosubmit_api.components.jobs.utils import generate_job_html_title
# from networkx import DiGraph
from autosubmit_api.config.basicConfig import APIBasicConfig
@@ -39,6 +40,7 @@
from bscearth.utils.date import Log

from autosubmit_api.persistance.experiment import ExperimentPaths
from autosubmit_api.repositories.graph_layout import create_exp_graph_layout_repository


# Version 15 includes out err MaxRSS AveRSS and rowstatus
@@ -425,41 +427,6 @@ def energy(self, energy):
self._energy = energy if energy else 0


class JobStepExtraData():
def __init__(self, key, dict_data):
self.key = key
if isinstance(dict_data, dict):
# dict_data["ncpus"] if dict_data and "ncpus" in dict_data.keys(
self.ncpus = dict_data.get("ncpus", 0) if dict_data else 0
# ) else 0
self.nnodes = dict_data.get(
"nnodes", 0) if dict_data else 0 # and "nnodes" in dict_data.keys(
# ) else 0
self.submit = int(mktime(datetime.strptime(dict_data["submit"], "%Y-%m-%dT%H:%M:%S").timetuple())) if dict_data and "submit" in list(dict_data.keys(
)) else 0
self.start = int(mktime(datetime.strptime(dict_data["start"], "%Y-%m-%dT%H:%M:%S").timetuple())) if dict_data and "start" in list(dict_data.keys(
)) else 0
self.finish = int(mktime(datetime.strptime(dict_data["finish"], "%Y-%m-%dT%H:%M:%S").timetuple())) if dict_data and "finish" in list(dict_data.keys(
)) and dict_data["finish"] != "Unknown" else 0
self.energy = parse_output_number(dict_data["energy"]) if dict_data and "energy" in list(dict_data.keys(
)) else 0
# if dict_data and "MaxRSS" in dict_data.keys(
self.maxRSS = dict_data.get("MaxRSS", 0)
# ) else 0
# if dict_data and "AveRSS" in dict_data.keys(
self.aveRSS = dict_data.get("AveRSS", 0)
# ) else 0
else:
self.ncpus = 0
self.nnodes = 0
self.submit = 0
self.start = 0
self.finish = 0
self.energy = 0
self.maxRSS = 0
self.aveRSS = 0


class MainDataBase():
def __init__(self, expid):
self.expid = expid
@@ -523,39 +490,17 @@ def create_index(self):
return None


class ExperimentGraphDrawing(MainDataBase):
def __init__(self, expid):
class ExperimentGraphDrawing:
def __init__(self, expid: str):
"""
Sets and validates graph drawing.
:param expid: Name of experiment
:type expid: str
:param allJobs: list of all jobs objects (usually from job_list)
:type allJobs: list()
"""
MainDataBase.__init__(self, expid)
APIBasicConfig.read()
self.expid = expid
exp_paths = ExperimentPaths(expid)
self.folder_path = APIBasicConfig.LOCAL_ROOT_DIR
self.database_path = exp_paths.graph_data_db
self.create_table_query = textwrap.dedent(
'''CREATE TABLE
IF NOT EXISTS experiment_graph_draw (
id INTEGER PRIMARY KEY,
job_name text NOT NULL,
x INTEGER NOT NULL,
y INTEGER NOT NULL
);''')

if not os.path.exists(self.database_path):
os.umask(0)
if not os.path.exists(os.path.dirname(self.database_path)):
os.makedirs(os.path.dirname(self.database_path))
os.open(self.database_path, os.O_WRONLY | os.O_CREAT, 0o777)
self.conn = self.create_connection(self.database_path)
self.create_table()
else:
self.conn = self.create_connection(self.database_path)
self.graph_data_repository = create_exp_graph_layout_repository(expid)
self.lock_name = "calculation_in_progress.lock"
self.current_position_dictionary = None
self.current_jobs_set = set()
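The constructor no longer creates the SQLite file, table, or connection itself; persistence is delegated to a repository obtained from create_exp_graph_layout_repository. That module is not part of the excerpt shown here, so the sketch below only assumes the interface implied by the calls made later in this class (get_all, insert_many, delete_all):

from autosubmit_api.repositories.graph_layout import create_exp_graph_layout_repository

repo = create_exp_graph_layout_repository("a000")  # "a000" is a placeholder expid
rows = repo.get_all()  # items exposing id, job_name, x, y (see _get_current_position)
repo.delete_all()      # used by _clear_graph_database
repo.insert_many([{"job_name": "JOB_1", "x": 10, "y": 20}])  # used by _insert_many_graph_coordinates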
@@ -607,7 +552,6 @@ def calculate_drawing(self, allJobs, independent=False, num_chunks=48, job_dicti
lock_path_file = os.path.join(self.folder_path, lock_name)
try:
with portalocker.Lock(lock_path_file, timeout=1) as fh:
self.conn = self.create_connection(self.database_path)
monitor = Monitor()
graph = monitor.create_tree_list(
self.expid, allJobs, None, dict(), False, job_dictionary)
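calculate_drawing keeps its portalocker guard but no longer reopens the database connection inside the lock. A rough sketch of that locking pattern, with an illustrative lock path and the actual work elided:

import os
import portalocker

lock_path_file = os.path.join("/tmp", "calculation_in_progress.lock")  # illustrative path
try:
    with portalocker.Lock(lock_path_file, timeout=1):
        pass  # compute the layout and persist coordinates here
except Exception:
    # lock already held by another worker (or locking failed); skip redrawing
    pass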
@@ -671,46 +615,35 @@ def set_current_position(self):
self.current_position_dictionary = {row[1]: (row[2], row[3]) for row in current_table}
self.current_jobs_set = set(self.current_position_dictionary.keys())

def _get_current_position(self):
def _get_current_position(self) -> List[Tuple[int, str, int, int]]:
"""
Get all registers from experiment_graph_draw.\n
:return: row content: id, job_name, x, y
:rtype: 4-tuple (int, str, int, int)
"""
try:
if self.conn:
# conn = create_connection(DB_FILE_AS_TIMES)
self.conn.text_factory = str
cur = self.conn.cursor()
cur.execute(
"SELECT id, job_name, x, y FROM experiment_graph_draw")
rows = cur.fetchall()
return rows
return None
result = self.graph_data_repository.get_all()
return [
(item.id, item.job_name, item.x, item.y)
for item in result
]
except Exception as exp:
print((traceback.format_exc()))
print((str(exp)))
return None

def _insert_many_graph_coordinates(self, values):
def _insert_many_graph_coordinates(
self, values: List[Tuple[str, int, int]]
) -> Optional[int]:
"""
Create many graph coordinates
:param conn:
:param details:
:return:
"""
try:
if self.conn:
# exp_id = self._get_id_db()
# conn = create_connection(DB_FILE_AS_TIMES)
# creation_date = datetime.today().strftime('%Y-%m-%d-%H:%M:%S')
sql = ''' INSERT INTO experiment_graph_draw(job_name, x, y) VALUES(?,?,?) '''
# print(row_content)
cur = self.conn.cursor()
cur.executemany(sql, values)
# print(cur)
self.conn.commit()
return cur.lastrowid
_vals = [
{"job_name": item[0], "x": item[1], "y": item[2]} for item in values
]
logger.debug(_vals)
return self.graph_data_repository.insert_many(_vals)
except Exception as exp:
print((traceback.format_exc()))
Log.warning(
Expand All @@ -722,19 +655,12 @@ def _clear_graph_database(self):
Clear all content from graph drawing database
"""
try:
if self.conn:
# conn = create_connection(DB_FILE_AS_TIMES)
# modified_date = datetime.today().strftime('%Y-%m-%d-%H:%M:%S')
sql = ''' DELETE FROM experiment_graph_draw '''
cur = self.conn.cursor()
cur.execute(sql, )
self.conn.commit()
return True
return False
self.graph_data_repository.delete_all()
except Exception as exp:
print((traceback.format_exc()))
print(("Error on Database clear: {}".format(str(exp))))
return False
return True
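After the refactor, the coordinate rows returned by the repository feed the same in-memory lookup that set_current_position built before. A small illustration of that data flow, with made-up coordinates:

rows = [(1, "JOB_1", 10, 20), (2, "JOB_2", 30, 40)]  # (id, job_name, x, y) tuples
current_position_dictionary = {row[1]: (row[2], row[3]) for row in rows}
current_jobs_set = set(current_position_dictionary.keys())
assert current_position_dictionary["JOB_2"] == (30, 40)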

class JobDataStructure(MainDataBase):

@@ -995,37 +921,3 @@ def _get_current_job_data(self, run_id, all_states=False):
print(("Error on select job data: {0}".format(
str(type(e).__name__))))
return None


def parse_output_number(string_number):
"""
Parses number in format 1.0K 1.0M 1.0G
:param string_number: String representation of number
:type string_number: str
:return: number in float format
:rtype: float
"""
number = 0.0
if (string_number):
if string_number == "NA":
return 0.0
last_letter = string_number.strip()[-1]
multiplier = 1.0
if last_letter == "G":
multiplier = 1000000000.0
number = string_number[:-1]
elif last_letter == "M":
multiplier = 1000000.0
number = string_number[:-1]
elif last_letter == "K":
multiplier = 1000.0
number = string_number[:-1]
else:
number = string_number
try:
number = float(number) * multiplier
except Exception:
number = 0.0
pass
return number
28 changes: 13 additions & 15 deletions autosubmit_api/database/tables.py
@@ -1,4 +1,4 @@
from sqlalchemy import MetaData, Integer, String, Text, Table
from sqlalchemy import Column, MetaData, Integer, String, Text, Table
from sqlalchemy.orm import DeclarativeBase, mapped_column, Mapped


@@ -52,18 +52,16 @@ class ExperimentStatusTable(BaseTable):
modified: Mapped[str] = mapped_column(Text, nullable=False)


class GraphDataTable(BaseTable):
"""
Stores the coordinates and it is used exclusively to speed up the process
of generating the graph layout
"""

__tablename__ = "experiment_graph_draw"

id: Mapped[int] = mapped_column(Integer, primary_key=True)
job_name: Mapped[str] = mapped_column(Text, nullable=False)
x: Mapped[int] = mapped_column(Integer, nullable=False)
y: Mapped[int] = mapped_column(Integer, nullable=False)
GraphDataTable = Table(
"experiment_graph_draw",
metadata_obj,
Column("id", Integer, primary_key=True),
Column("job_name", Text, nullable=False),
Column("x", Integer, nullable=False),
Column("y", Integer, nullable=False),
)
"""Stores the coordinates and it is used exclusively
to speed up the process of generating the graph layout"""


class JobPackageTable(BaseTable):
@@ -103,8 +101,8 @@ class WrapperJobPackageTable(BaseTable):
experiment_status_table: Table = ExperimentStatusTable.__table__

# Graph Data TABLES
graph_data_table: Table = GraphDataTable.__table__
graph_data_table: Table = GraphDataTable

# Job package TABLES
job_package_table: Table = JobPackageTable.__table__
wrapper_job_package_table: Table = WrapperJobPackageTable.__table__
wrapper_job_package_table: Table = WrapperJobPackageTable.__table__
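Because GraphDataTable is now a plain SQLAlchemy Core Table rather than a declarative model, callers select from it directly instead of going through __table__. A hedged query sketch, assuming SQLAlchemy 2.x; the database path is illustrative only:

from sqlalchemy import select
from autosubmit_api.database import tables
from autosubmit_api.database.common import create_sqlite_db_engine

engine = create_sqlite_db_engine("/path/to/graph_data.db")  # illustrative path
with engine.connect() as conn:
    for row in conn.execute(select(tables.graph_data_table)):
        print(row.id, row.job_name, row.x, row.y)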
Empty file.
73 changes: 73 additions & 0 deletions autosubmit_api/repositories/experiment.py
@@ -0,0 +1,73 @@
from abc import ABC, abstractmethod
from typing import List, Optional
from pydantic import BaseModel
from sqlalchemy import Engine, Table
from autosubmit_api.database import tables
from autosubmit_api.database.common import create_autosubmit_db_engine


class ExperimentModel(BaseModel):
id: int
name: str
description: Optional[str] = None
autosubmit_version: Optional[str] = None


class ExperimentRepository(ABC):
@abstractmethod
def get_all(self) -> List[ExperimentModel]:
"""
Get all the experiments
:return experiments: The list of experiments
"""
pass

@abstractmethod
def get_by_expid(self, expid: str) -> ExperimentModel:
"""
Get the experiment by expid
:param expid: The experiment id
:return experiment: The experiment
:raises ValueError: If the experiment is not found
"""
pass


class ExperimentSQLRepository(ExperimentRepository):
def __init__(self, engine: Engine, table: Table):
self.engine = engine
self.table = table

def get_all(self):
with self.engine.connect() as conn:
statement = self.table.select()
result = conn.execute(statement).all()
return [
ExperimentModel(
id=row.id,
name=row.name,
description=row.description,
autosubmit_version=row.autosubmit_version,
)
for row in result
]

def get_by_expid(self, expid: str):
with self.engine.connect() as conn:
statement = self.table.select().where(self.table.c.name == expid)
result = conn.execute(statement).first()
if result is None:
raise ValueError(f"Experiment with id {expid} not found")
return ExperimentModel(
id=result.id,
name=result.name,
description=result.description,
autosubmit_version=result.autosubmit_version,
)


def create_experiment_repository() -> ExperimentRepository:
engine = create_autosubmit_db_engine()
return ExperimentSQLRepository(engine, tables.experiment_table)
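The new repository gives callers a typed facade over the experiment table. A short usage sketch based on the factory shown above; "a000" is a placeholder expid:

from autosubmit_api.repositories.experiment import create_experiment_repository

repo = create_experiment_repository()
for exp in repo.get_all():
    print(exp.id, exp.name, exp.autosubmit_version)

try:
    one = repo.get_by_expid("a000")  # placeholder expid
except ValueError:
    one = None  # raised when the experiment is not found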
