-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathdatabase_link.py
69 lines (62 loc) · 2.33 KB
/
database_link.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import psycopg2
import os
import logging
import traceback
from variables import *
from csv_writers import CSVWriters
from psycopg2.errors import CharacterNotInRepertoire
class DatabaseLink:
    """
    Class to link to the database and perform operations on it.

    A psycopg2 connection and cursor are opened on construction using the
    connection settings imported from ``variables`` (database_name,
    database_user, database_password, database_host, database_port).

    The object can be used as a context manager: leaving the ``with`` block
    normally commits pending work, leaving it with an exception rolls it back,
    and in both cases the cursor and connection are closed.
    """

    def __init__(self):
        self._connect()

    def _connect(self):
        """Open a fresh connection and cursor from the configured settings."""
        self.conn = psycopg2.connect(database=database_name, user=database_user,
                                     password=database_password, host=database_host,
                                     port=database_port)
        self.cursor = self.conn.cursor()

    def __enter__(self):
        # The constructor has already connected. Only reconnect when an earlier
        # ``with`` block closed the connection; reconnecting unconditionally
        # would orphan (leak) the connection opened by the constructor.
        if self.conn.closed:
            self._connect()
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        """
        Commit on success or roll back on error, then close cursor and connection.

        Returns None, so any exception raised inside the ``with`` block propagates.
        """
        try:
            if exc_type is None:
                self.conn.commit()
            elif not self.conn.closed:
                # Do not persist half-finished work from a failed block. Skip the
                # rollback on an already-dead connection so the original exception
                # is not masked by a second one.
                self.conn.rollback()
        finally:
            # Always release resources, even if commit/rollback raised.
            self.cursor.close()
            self.conn.close()

    def _execute_sql_file(self, path):
        """Execute the entire contents of the SQL script at ``path`` on the cursor."""
        with open(path, 'r', encoding='utf-8') as f:
            self.cursor.execute(f.read())

    def create_tables(self):
        """
        Create tables in the database.

        Runs ``sql/create_tables.sql`` (relative to the current working
        directory) and commits.
        :return: None
        """
        self._execute_sql_file('sql/create_tables.sql')
        self.conn.commit()

    def create_indices(self):
        """
        Create indices in the database.

        Runs ``sql/create_indices.sql`` (relative to the current working
        directory) and commits.
        :return: None
        """
        logging.info('Creating indices')
        self._execute_sql_file('sql/create_indices.sql')
        self.conn.commit()
        logging.info('Finished creating indices')

    def insert_csvs_into_db(self, date):
        """
        Insert CSV files into the database.

        For every table name in ``CSVWriters.file_names`` a server-side
        ``COPY <table> FROM '<data_path>/<table>-<date>.csv'`` is issued. Each
        table is copied inside its own savepoint, so a failing table is rolled
        back on its own without discarding tables already copied. Failures are
        logged and skipped; everything that succeeded is committed at the end.

        :param date: the date of the files to be inserted, corresponds to file name
        :return: None
        """
        for table in CSVWriters.file_names:
            self._copy_table(table, date)
        self.conn.commit()
        logging.info(f'Finished copying {date} into database')

    def _copy_table(self, table, date):
        """
        COPY one table's CSV for ``date``, isolating any failure to this table.

        If the server rejects the file with CharacterNotInRepertoire, NUL bytes
        are stripped from the file and the COPY is retried once. Any remaining
        error is logged (with traceback) and only this table's work is undone.
        """
        csv_path = f'{data_path}/{table}-{date}.csv'
        # Double embedded single quotes so the path stays a valid SQL string literal.
        sql_path = csv_path.replace("'", "''")
        query = f"COPY {table} FROM '{sql_path}' WITH (FORMAT csv)"

        # A savepoint lets us undo just this table; a plain rollback() would also
        # discard every table copied earlier in the same transaction.
        self.cursor.execute('SAVEPOINT copy_table')
        try:
            try:
                self.cursor.execute(query)
            except CharacterNotInRepertoire:
                # Clear the aborted state before touching the file and retrying.
                self.cursor.execute('ROLLBACK TO SAVEPOINT copy_table')
                logging.warning(f'Illegal character in table {table} for {date}, '
                                f'removing null bytes and retrying')
                self._strip_null_bytes(csv_path)
                logging.info(f'Removed null bytes from {table}')
                self.cursor.execute(query)
        except Exception:
            # The savepoint survives ROLLBACK TO, so rolling back again is safe
            # even if the retry path already did so once.
            self.cursor.execute('ROLLBACK TO SAVEPOINT copy_table')
            logging.exception(f'Error copying table {table} for {date} into database')
        else:
            self.cursor.execute('RELEASE SAVEPOINT copy_table')

    @staticmethod
    def _strip_null_bytes(csv_path):
        """Remove NUL bytes from ``csv_path`` in place using the configured sed binary."""
        import subprocess

        # An argument list (no shell) keeps paths containing spaces or shell
        # metacharacters intact; check=True raises if sed fails instead of
        # silently retrying the COPY against an unchanged file.
        subprocess.run([sed_name, '-i', 's/\\x00//g', csv_path], check=True)