import pandas as pd
from dotenv import load_dotenv
from sqlalchemy import create_engine
+ from sqlalchemy.exc import PendingRollbackError, SQLAlchemyError
+ from sqlalchemy.orm import sessionmaker


class DatabaseManager:
@@ -25,9 +27,15 @@ def __init__(self, dotenv_path=".env"):
            f"mysql+pymysql://{user}:{password}@{host}:{port}/{self.database}"
        )
        self.engine = create_engine(database_url)
+         self.Session = sessionmaker(bind=self.engine)

    def execute(self, query):
-         return pd.read_sql(query, self.engine)
+         try:
+             with self.engine.connect() as connection:
+                 return pd.read_sql(query, connection)
+         except SQLAlchemyError as e:
+             logging.error(f"Error executing query: {e}")
+             raise

    def get_minds_cohort(self, query):
        df = self.execute(query)
@@ -36,9 +44,12 @@ def get_minds_cohort(self, query):

    def get_gdc_cohort(self, gdc_cohort):
        cohort = pd.read_csv(gdc_cohort, sep="\t", dtype=str)
-         df = self.execute(
-             f"SELECT case_id, case_submitter_id FROM {self.database}.clinical WHERE case_id IN {tuple(cohort['id'])}"
-         )
+         query = f"""
+             SELECT case_id, case_submitter_id
+             FROM {self.database}.clinical
+             WHERE case_id IN ({','.join([f"'{i}'" for i in cohort['id']])})
+         """
+         df = self.execute(query)
        cohort = df.groupby("case_id")["case_submitter_id"].unique()
        return cohort

@@ -55,29 +66,41 @@ def get_columns(self, table):
        return columns["Field"]

    def update(self, temp_folder):
-         # make sure the temp folder exists
        if not os.path.exists(temp_folder):
            os.makedirs(temp_folder)
-         # upload all the files to the database as a table
+
        logging.info("Uploading new data to the database")
-         for file in os.listdir(temp_folder):
-             table_name = file.split(".")[0]
-             df = pd.read_csv(f"{temp_folder}/{file}", sep="\t", dtype=str)
-             df.replace("'--", np.nan, inplace=True)
-             # if table already exists, append the new data
-             if table_name in self.get_tables().tolist():
-                 logging.info(f"Updating {table_name}")
-                 df.to_sql(
-                     name=table_name,
-                     con=self.engine,
-                     if_exists="append",
-                     index=False,
-                     chunksize=1000,
-                 )
-             else:
-                 logging.info(f"Creating {table_name}")
-                 df.to_sql(
-                     name=table_name, con=self.engine, if_exists="replace", index=False
-                 )
-         logging.info("Finished uploading to the database")
-         shutil.rmtree(temp_folder)
+
+         session = self.Session()
+         try:
+             for file in os.listdir(temp_folder):
+                 table_name = file.split(".")[0]
+                 df = pd.read_csv(f"{temp_folder}/{file}", sep="\t", dtype=str)
+                 df.replace("'--", np.nan, inplace=True)
+
+                 if table_name in self.get_tables().tolist():
+                     logging.info(f"Updating {table_name}")
+                     df.to_sql(
+                         name=table_name,
+                         con=self.engine,
+                         if_exists="append",
+                         index=False,
+                         chunksize=1000,
+                     )
+                 else:
+                     logging.info(f"Creating {table_name}")
+                     df.to_sql(
+                         name=table_name,
+                         con=self.engine,
+                         if_exists="replace",
+                         index=False,
+                     )
+             session.commit()
+             logging.info("Finished uploading to the database")
+         except (SQLAlchemyError, PendingRollbackError) as e:
+             session.rollback()
+             logging.error(f"Error during update: {e}")
+             raise
+         finally:
+             session.close()
+             shutil.rmtree(temp_folder)
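For orientation, a minimal caller-side sketch of how the revised error handling could be exercised. DatabaseManager, execute(), and update() come from the diff above; the import path, the example query, and the temp folder name are assumptions for illustration only.

import logging

from sqlalchemy.exc import SQLAlchemyError

from database import DatabaseManager  # assumed import path

db = DatabaseManager(dotenv_path=".env")

try:
    # execute() now opens a short-lived connection, logs failures, and
    # re-raises, so the caller decides how to recover.
    df = db.execute("SELECT case_id FROM clinical LIMIT 10")  # example query
except SQLAlchemyError as exc:
    logging.error(f"Query failed: {exc}")

# update() commits only after all files are loaded; on error it rolls back,
# and the finally block closes the session and removes the temp folder.
db.update("tmp")  # assumed temp folder name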