Commit

refactor JobDataStructure
LuiggiTenorioK committed Nov 19, 2024
1 parent 3375cb0 commit b53e85f
Showing 7 changed files with 509 additions and 291 deletions.
3 changes: 1 addition & 2 deletions autosubmit_api/autosubmit_legacy/job/job_list.py
@@ -113,8 +113,7 @@ def get_tree_structured_from_previous_run(expid, BasicConfig, run_id, chunk_unit
         chunk_size = experiment_run.chunk_size
     else:
         raise Exception("Autosubmit couldn't find the experiment header information necessary to complete this request.")
-    job_list = job_data_structure.get_current_job_data(
-        run_id, all_states=True)
+    job_list = job_data_structure.get_current_job_data(run_id)
     if not job_list:
         return [], [], {}
     else:
393 changes: 105 additions & 288 deletions autosubmit_api/database/db_jobdata.py

Large diffs are not rendered by default.

68 changes: 67 additions & 1 deletion autosubmit_api/database/tables.py
@@ -1,4 +1,13 @@
-from sqlalchemy import Column, MetaData, Integer, String, Text, Table
+from sqlalchemy import (
+    Column,
+    Float,
+    MetaData,
+    Integer,
+    String,
+    Text,
+    Table,
+    UniqueConstraint,
+)
 from sqlalchemy.orm import DeclarativeBase, mapped_column, Mapped


@@ -106,3 +115,60 @@ class WrapperJobPackageTable(BaseTable):
 # Job package TABLES
 job_package_table: Table = JobPackageTable.__table__
 wrapper_job_package_table: Table = WrapperJobPackageTable.__table__
+
+ExperimentRunTable = Table(
+    "experiment_run",
+    metadata_obj,
+    Column("run_id", Integer, primary_key=True),
+    Column("created", Text, nullable=False),
+    Column("modified", Text, nullable=True),
+    Column("start", Integer, nullable=False),
+    Column("finish", Integer),
+    Column("chunk_unit", Text, nullable=False),
+    Column("chunk_size", Integer, nullable=False),
+    Column("completed", Integer, nullable=False),
+    Column("total", Integer, nullable=False),
+    Column("failed", Integer, nullable=False),
+    Column("queuing", Integer, nullable=False),
+    Column("running", Integer, nullable=False),
+    Column("submitted", Integer, nullable=False),
+    Column("suspended", Integer, nullable=False, default=0),
+    Column("metadata", Text),
+)
+
+JobDataTable = Table(
+    "job_data",
+    metadata_obj,
+    Column("id", Integer, nullable=False, primary_key=True),
+    Column("counter", Integer, nullable=False),
+    Column("job_name", Text, nullable=False, index=True),
+    Column("created", Text, nullable=False),
+    Column("modified", Text, nullable=False),
+    Column("submit", Integer, nullable=False),
+    Column("start", Integer, nullable=False),
+    Column("finish", Integer, nullable=False),
+    Column("status", Text, nullable=False),
+    Column("rowtype", Integer, nullable=False),
+    Column("ncpus", Integer, nullable=False),
+    Column("wallclock", Text, nullable=False),
+    Column("qos", Text, nullable=False),
+    Column("energy", Integer, nullable=False),
+    Column("date", Text, nullable=False),
+    Column("section", Text, nullable=False),
+    Column("member", Text, nullable=False),
+    Column("chunk", Integer, nullable=False),
+    Column("last", Integer, nullable=False),
+    Column("platform", Text, nullable=False),
+    Column("job_id", Integer, nullable=False),
+    Column("extra_data", Text, nullable=False),
+    Column("nnodes", Integer, nullable=False, default=0),
+    Column("run_id", Integer),
+    Column("MaxRSS", Float, nullable=False, default=0.0),
+    Column("AveRSS", Float, nullable=False, default=0.0),
+    Column("out", Text, nullable=False),
+    Column("err", Text, nullable=False),
+    Column("rowstatus", Integer, nullable=False, default=0),
+    Column("children", Text, nullable=True),
+    Column("platform_output", Text, nullable=True),
+    UniqueConstraint("counter", "job_name", name="unique_counter_and_job_name"),
+)
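
As a quick illustration (not part of this commit), the new Core table objects can be queried directly with SQLAlchemy. The in-memory engine is a stand-in for the per-experiment job_data SQLite file, and metadata_obj is assumed to be the module-level MetaData that the definitions above attach to:

from sqlalchemy import create_engine, select
from autosubmit_api.database.tables import JobDataTable, metadata_obj

# Illustrative in-memory database; the API points the engine at each
# experiment's job_data SQLite file instead.
engine = create_engine("sqlite:///:memory:")
metadata_obj.create_all(engine)

with engine.connect() as conn:
    # Latest row per job, mirroring the last == 1 / rowtype >= 2 filters
    # used by the repositories introduced below.
    stmt = select(JobDataTable).where(JobDataTable.c.last == 1, JobDataTable.c.rowtype >= 2)
    rows = conn.execute(stmt).all()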
92 changes: 92 additions & 0 deletions autosubmit_api/repositories/experiment_run.py
@@ -0,0 +1,92 @@
from abc import ABC, abstractmethod
from typing import Any, List
from pydantic import BaseModel
from sqlalchemy import Engine, Table
from sqlalchemy.schema import CreateTable
from autosubmit_api.database import tables
from autosubmit_api.database.common import create_sqlite_db_engine
from autosubmit_api.persistance.experiment import ExperimentPaths


class ExperimentRunModel(BaseModel):
    run_id: Any
    created: Any
    modified: Any
    start: Any
    finish: Any
    chunk_unit: Any
    chunk_size: Any
    completed: Any
    total: Any
    failed: Any
    queuing: Any
    running: Any
    submitted: Any
    suspended: Any
    metadata: Any


class ExperimentRunRepository(ABC):
    @abstractmethod
    def get_all(self) -> List[ExperimentRunModel]:
        """
        Gets all runs of the experiment
        """

    @abstractmethod
    def get_last_run(self) -> ExperimentRunModel:
        """
        Gets last run of the experiment. Raises ValueError if no runs found.
        """

    @abstractmethod
    def get_run_by_id(self, run_id: int) -> ExperimentRunModel:
        """
        Gets run by id. Raises ValueError if run not found.
        """


class ExperimentRunSQLRepository(ExperimentRunRepository):
    def __init__(self, expid: str, engine: Engine, table: Table):
        self.engine = engine
        self.table = table
        self.expid = expid

        with self.engine.connect() as conn:
            conn.execute(CreateTable(self.table, if_not_exists=True))
            conn.commit()

    def get_all(self):
        with self.engine.connect() as conn:
            statement = self.table.select()
            result = conn.execute(statement).all()

        return [
            ExperimentRunModel.model_validate(row, from_attributes=True)
            for row in result
        ]

    def get_last_run(self):
        with self.engine.connect() as conn:
            statement = (
                self.table.select().order_by(self.table.c.run_id.desc())
            )
            result = conn.execute(statement).first()
            if result is None:
                raise ValueError(f"No runs found for experiment {self.expid}")
            return ExperimentRunModel.model_validate(result, from_attributes=True)

    def get_run_by_id(self, run_id: int):
        with self.engine.connect() as conn:
            statement = self.table.select().where(self.table.c.run_id == run_id)
            result = conn.execute(statement).first()
            if result is None:
                raise ValueError(
                    f"Run with id {run_id} not found for experiment {self.expid}"
                )
            return ExperimentRunModel.model_validate(result, from_attributes=True)


def create_experiment_run_repository(expid: str):
    engine = create_sqlite_db_engine(ExperimentPaths(expid).job_data_db)
    return ExperimentRunSQLRepository(expid, engine, tables.ExperimentRunTable)
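
For illustration only (not part of the diff), a minimal sketch of how this factory might be used; "a000" is a placeholder experiment id:

from autosubmit_api.repositories.experiment_run import create_experiment_run_repository

runs_repo = create_experiment_run_repository("a000")  # "a000" is a placeholder expid
try:
    last_run = runs_repo.get_last_run()
    print(last_run.run_id, last_run.chunk_unit, last_run.chunk_size)
except ValueError:
    # get_last_run and get_run_by_id raise ValueError when nothing matches
    print("No runs recorded for this experiment yet")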
194 changes: 194 additions & 0 deletions autosubmit_api/repositories/job_data.py
@@ -0,0 +1,194 @@
from abc import ABC, abstractmethod
from typing import Any, List
from pydantic import BaseModel
from sqlalchemy import Engine, Table, or_, Index
from sqlalchemy.schema import CreateTable
from autosubmit_api.database import tables
from autosubmit_api.database.common import create_sqlite_db_engine
from autosubmit_api.persistance.experiment import ExperimentPaths


class ExperimentJobDataModel(BaseModel):
    id: Any
    counter: Any
    job_name: Any
    created: Any
    modified: Any
    submit: Any
    start: Any
    finish: Any
    status: Any
    rowtype: Any
    ncpus: Any
    wallclock: Any
    qos: Any
    energy: Any
    date: Any
    section: Any
    member: Any
    chunk: Any
    last: Any
    platform: Any
    job_id: Any
    extra_data: Any
    nnodes: Any
    run_id: Any
    MaxRSS: Any
    AveRSS: Any
    out: Any
    err: Any
    rowstatus: Any
    children: Any


class ExperimentJobDataRepository(ABC):
    @abstractmethod
    def get_last_job_data_by_run_id(self, run_id: int) -> List[ExperimentJobDataModel]:
        """
        Gets the last job data of a specific run id
        """

    @abstractmethod
    def get_last_job_data(self) -> List[ExperimentJobDataModel]:
        """
        Gets last job data
        """

    @abstractmethod
    def get_jobs_by_name(self, job_name: str) -> List[ExperimentJobDataModel]:
        """
        Gets historical job data by job_name
        """

    @abstractmethod
    def get_all(self) -> List[ExperimentJobDataModel]:
        """
        Gets all job data
        """

    @abstractmethod
    def get_job_data_COMPLETED_by_rowtype_run_id(
        self, rowtype: int, run_id: int
    ) -> List[ExperimentJobDataModel]:
        """
        Gets COMPLETED job data by rowtype and run id
        """

    @abstractmethod
    def get_job_data_COMPLETED_by_section(
        self, section: str
    ) -> List[ExperimentJobDataModel]:
        """
        Gets COMPLETED job data by section or member
        """


class ExperimentJobDataSQLRepository(ExperimentJobDataRepository):
    def __init__(self, expid: str, engine: Engine, table: Table):
        self.engine = engine
        self.table = table
        self.expid = expid

        with self.engine.connect() as conn:
            conn.execute(CreateTable(self.table, if_not_exists=True))
            Index("ID_JOB_NAME", self.table.c.job_name).create(conn, checkfirst=True)
            conn.commit()

    def get_last_job_data_by_run_id(self, run_id: int):
        with self.engine.connect() as conn:
            statement = (
                self.table.select()
                .where(
                    (self.table.c.run_id == run_id),
                    (self.table.c.rowtype >= 2),
                )
                .order_by(self.table.c.id.desc())
            )
            result = conn.execute(statement).all()

        return [
            ExperimentJobDataModel.model_validate(row, from_attributes=True)
            for row in result
        ]

    def get_last_job_data(self):
        with self.engine.connect() as conn:
            statement = self.table.select().where(
                (self.table.c.last == 1),
                (self.table.c.rowtype >= 2),
            )
            result = conn.execute(statement).all()

        return [
            ExperimentJobDataModel.model_validate(row, from_attributes=True)
            for row in result
        ]

    def get_jobs_by_name(self, job_name: str):
        with self.engine.connect() as conn:
            statement = (
                self.table.select()
                .where(self.table.c.job_name == job_name)
                .order_by(self.table.c.counter.desc())
            )
            result = conn.execute(statement).all()

        return [
            ExperimentJobDataModel.model_validate(row, from_attributes=True)
            for row in result
        ]

    def get_all(self):
        with self.engine.connect() as conn:
            statement = (
                self.table.select().where(self.table.c.id > 0).order_by(self.table.c.id)
            )
            result = conn.execute(statement).all()

        return [
            ExperimentJobDataModel.model_validate(row, from_attributes=True)
            for row in result
        ]

    def get_job_data_COMPLETED_by_rowtype_run_id(self, rowtype: int, run_id: int):
        with self.engine.connect() as conn:
            statement = (
                self.table.select()
                .where(
                    (self.table.c.rowtype == rowtype),
                    (self.table.c.run_id == run_id),
                    (self.table.c.status == "COMPLETED"),
                )
                .order_by(self.table.c.id)
            )
            result = conn.execute(statement).all()

        return [
            ExperimentJobDataModel.model_validate(row, from_attributes=True)
            for row in result
        ]

    def get_job_data_COMPLETED_by_section(self, section: str):
        with self.engine.connect() as conn:
            statement = (
                self.table.select()
                .where(
                    (self.table.c.status == "COMPLETED"),
                    or_(
                        (self.table.c.section == section),
                        (self.table.c.member == section),
                    ),
                )
                .order_by(self.table.c.id)
            )
            result = conn.execute(statement).all()

        return [
            ExperimentJobDataModel.model_validate(row, from_attributes=True)
            for row in result
        ]


def create_experiment_job_data_repository(expid: str):
    engine = create_sqlite_db_engine(ExperimentPaths(expid).job_data_db)
    return ExperimentJobDataSQLRepository(expid, engine, tables.JobDataTable)
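
Similarly, a hypothetical usage sketch for the job data repository (the experiment id and job name below are made up):

from autosubmit_api.repositories.job_data import create_experiment_job_data_repository

job_data_repo = create_experiment_job_data_repository("a000")  # placeholder expid

# Latest row of every job (rows flagged last == 1 with rowtype >= 2)
current = job_data_repo.get_last_job_data()

# Full history of one job, newest counter first ("a000_19900101_fc0_1_SIM" is illustrative)
history = job_data_repo.get_jobs_by_name("a000_19900101_fc0_1_SIM")
for row in history:
    print(row.job_name, row.counter, row.status, row.energy)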
