26
26
from typing import List , Optional
27
27
from collections import namedtuple
28
28
29
+ from autosubmit_api .repositories .experiment_run import create_experiment_run_repository
30
+ from autosubmit_api .repositories .job_data import create_experiment_job_data_repository
31
+
29
32
class ExperimentHistoryDbManager (DatabaseManager ):
30
33
""" Manages actions directly on the database.
31
34
"""
@@ -37,6 +40,9 @@ def __init__(self, expid: str, basic_config: APIBasicConfig):
37
40
if self .my_database_exists ():
38
41
self .set_db_version_models ()
39
42
43
+ self .runs_repo = create_experiment_run_repository (expid )
44
+ self .jobs_repo = create_experiment_job_data_repository (expid )
45
+
40
46
def set_db_version_models (self ):
41
47
self .db_version = self ._get_pragma_version ()
42
48
self .experiment_run_row_model = Models .get_experiment_row_model (self .db_version )
@@ -56,43 +62,40 @@ def get_experiment_run_dc_with_max_id(self):
56
62
57
63
def _get_experiment_run_with_max_id (self ):
58
64
""" Get Models.ExperimentRunRow for the maximum id run. """
59
- statement = self .get_built_select_statement ("experiment_run" , "run_id > 0 ORDER BY run_id DESC LIMIT 0, 1" )
60
- max_experiment_run = self .get_from_statement (self .historicaldb_file_path , statement )
61
- if len (max_experiment_run ) == 0 :
62
- raise Exception ("No Experiment Runs registered." )
63
- return self .experiment_run_row_model (* max_experiment_run [0 ])
65
+ max_experiment_run = self .runs_repo .get_last_run ()
66
+ return self .experiment_run_row_model (** (max_experiment_run .model_dump ()))
64
67
65
68
def get_experiment_run_by_id (self , run_id : int ) -> Optional [ExperimentRun ]:
66
69
if run_id :
67
70
return ExperimentRun .from_model (self ._get_experiment_run_by_id (run_id ))
68
71
return None
69
72
70
73
def _get_experiment_run_by_id (self , run_id : int ) -> namedtuple :
71
- statement = self .get_built_select_statement ("experiment_run" , "run_id=?" )
72
- arguments = (run_id ,)
73
- experiment_run = self .get_from_statement_with_arguments (self .historicaldb_file_path , statement , arguments )
74
- if len (experiment_run ) == 0 :
75
- raise Exception ("Experiment run {0} for experiment {1} does not exists." .format (run_id , self .expid ))
76
- return self .experiment_run_row_model (* experiment_run [0 ])
74
+ experiment_run = self .runs_repo .get_run_by_id (run_id )
75
+ return self .experiment_run_row_model (** (experiment_run .model_dump ()))
77
76
78
77
def get_experiment_runs_dcs (self ) -> List [ExperimentRun ]:
79
78
experiment_run_rows = self ._get_experiment_runs ()
80
79
return [ExperimentRun .from_model (row ) for row in experiment_run_rows ]
81
80
82
81
def _get_experiment_runs (self ) -> List [namedtuple ]:
83
- statement = self .get_built_select_statement ("experiment_run" )
84
- experiment_runs = self .get_from_statement (self .historicaldb_file_path , statement )
85
- return [self .experiment_run_row_model (* row ) for row in experiment_runs ]
82
+ experiment_runs = self .runs_repo .get_all ()
83
+ return [
84
+ self .experiment_run_row_model (** (run .model_dump ()))
85
+ for run in experiment_runs
86
+ ]
86
87
87
88
def get_job_data_dcs_all (self ) -> List [JobData ]:
88
89
""" Gets all content from job_data ordered by id (from table). """
89
90
return [JobData .from_model (row ) for row in self ._get_job_data_all ()]
90
91
91
92
def _get_job_data_all (self ):
92
93
""" Gets all content from job_data as list of Models.JobDataRow from database. """
93
- statement = self .get_built_select_statement ("job_data" , "id > 0 ORDER BY id" )
94
- job_data_rows = self .get_from_statement (self .historicaldb_file_path , statement )
95
- return [self .job_data_row_model (* row ) for row in job_data_rows ]
94
+ job_data_rows = self .jobs_repo .get_all ()
95
+ return [
96
+ self .job_data_row_model (** (job_data .model_dump ()))
97
+ for job_data in job_data_rows
98
+ ]
96
99
97
100
def get_job_data_dc_COMPLETED_by_wrapper_run_id (self , package_code : int , run_id : int ) -> List [JobData ]:
98
101
if not run_id or package_code <= Models .RowType .NORMAL :
@@ -103,21 +106,23 @@ def get_job_data_dc_COMPLETED_by_wrapper_run_id(self, package_code: int, run_id:
103
106
return [JobData .from_model (row ) for row in job_data_rows ]
104
107
105
108
def _get_job_data_dc_COMPLETED_by_wrapper_run_id (self , package_code : int , run_id : int ) -> List [namedtuple ]:
106
- statement = self .get_built_select_statement ("job_data" , "run_id=? and rowtype=? and status=? ORDER BY id" )
107
- arguments = (run_id , package_code , "COMPLETED" )
108
- job_data_rows = self .get_from_statement_with_arguments (self .historicaldb_file_path , statement , arguments )
109
- return [self .job_data_row_model (* row ) for row in job_data_rows ]
109
+ job_data_rows = self .jobs_repo .get_job_data_COMPLETED_by_rowtype_run_id (package_code , run_id )
110
+ return [
111
+ self .job_data_row_model (** (job_data .model_dump ()))
112
+ for job_data in job_data_rows
113
+ ]
110
114
111
115
def get_job_data_dcs_COMPLETED_by_section (self , section : str ) -> List [JobData ]:
112
116
# arguments = {"status": "COMPLETED", "section": section}
113
117
job_data_rows = self ._get_job_data_COMPLETD_by_section (section )
114
118
return [JobData .from_model (row ) for row in job_data_rows ]
115
119
116
120
def _get_job_data_COMPLETD_by_section (self , section ):
117
- statement = self .get_built_select_statement ("job_data" , "status=? and (section=? or member=?) ORDER BY id" )
118
- arguments = ("COMPLETED" , section , section )
119
- job_data_rows = self .get_from_statement_with_arguments (self .historicaldb_file_path , statement , arguments )
120
- return [self .job_data_row_model (* row ) for row in job_data_rows ]
121
+ job_data_rows = self .jobs_repo .get_job_data_COMPLETD_by_section (section )
122
+ return [
123
+ self .job_data_row_model (** (job_data .model_dump ()))
124
+ for job_data in job_data_rows
125
+ ]
121
126
122
127
def get_all_last_job_data_dcs (self ):
123
128
""" Gets JobData data classes in job_data for last=1. """
@@ -126,20 +131,23 @@ def get_all_last_job_data_dcs(self):
126
131
127
132
def _get_all_last_job_data_rows (self ):
128
133
""" Get List of Models.JobDataRow for last=1. """
129
- statement = self .get_built_select_statement ("job_data" , "last=1 and rowtype >= 2" )
130
- job_data_rows = self .get_from_statement (self .historicaldb_file_path , statement )
131
- return [self .job_data_row_model (* row ) for row in job_data_rows ]
134
+ job_data_rows = self .jobs_repo .get_last_job_data ()
135
+ return [
136
+ self .job_data_row_model (** (job_data .model_dump ()))
137
+ for job_data in job_data_rows
138
+ ]
132
139
133
140
def get_job_data_dcs_by_name (self , job_name : str ) -> List [JobData ]:
134
141
job_data_rows = self ._get_job_data_by_name (job_name )
135
142
return [JobData .from_model (row ) for row in job_data_rows ]
136
143
137
144
def _get_job_data_by_name (self , job_name : str ) -> List [namedtuple ]:
138
145
""" Get List of Models.JobDataRow for job_name """
139
- statement = self .get_built_select_statement ("job_data" , "job_name=? ORDER BY counter DESC" )
140
- arguments = (job_name ,)
141
- job_data_rows = self .get_from_statement_with_arguments (self .historicaldb_file_path , statement , arguments )
142
- return [self .job_data_row_model (* row ) for row in job_data_rows ]
146
+ job_data_rows = self .jobs_repo .get_jobs_by_name (job_name )
147
+ return [
148
+ self .job_data_row_model (** (job_data .model_dump ()))
149
+ for job_data in job_data_rows
150
+ ]
143
151
144
152
def _get_pragma_version (self ) -> int :
145
153
""" Gets current pragma version as int. """
0 commit comments