-
Notifications
You must be signed in to change notification settings - Fork 1
/
Database.py
129 lines (110 loc) · 4.81 KB
/
Database.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import sqlite3
from sqlite3 import DatabaseError
from loguru import logger
from datetime import datetime
class Database:
    """Thin wrapper around a single SQLite connection.

    Table names, column lists and table structures are interpolated directly
    into the SQL text (SQLite cannot bind identifiers as parameters), so
    callers must only pass trusted identifiers. Row values are always bound
    as query parameters.

    Instances can be used as context managers; the connection is closed on
    exit.
    """

    def __init__(self, db_file):
        """Create a database connection to a SQLite database.

        Args:
            db_file: Path of the database file. ``.db`` is appended when the
                name does not already end with that extension.

        Raises:
            DatabaseCantBeInitialized: If the connection cannot be opened.
        """
        # Ensure db_file ends with .db extension
        if not db_file.endswith('.db'):
            db_file += '.db'
        logger.info(f"Initializing the database with the given name {db_file}")
        self.conn = None
        try:
            self.conn = sqlite3.connect(db_file)
        except DatabaseError as e:
            logger.error(f"Couldn't initialize the database with the given name {db_file}")
            logger.error(e)
            raise DatabaseCantBeInitialized(e) from e
        # sqlite3.version was deprecated in Python 3.12 and removed in 3.14;
        # sqlite_version reports the SQLite library actually in use.
        logger.info(f"SQLite library version {sqlite3.sqlite_version}")
        logger.info(f"Database initialized with the given name {db_file}")

    def __enter__(self):
        """Return the instance itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection; exceptions from the block are not suppressed."""
        self.close()

    def close(self):
        """Close the underlying connection. Safe to call more than once."""
        if self.conn:
            self.conn.close()
            self.conn = None

    def create_table(self, table_name, table_structure):
        """Create a table with the given name and structure if it is missing.

        The name is used verbatim: make sure to properly format
        ``table_name`` (see ``format_table_name``) before calling this.

        Args:
            table_name: Name of the table to create.
            table_structure: Column definitions placed between the
                parentheses of the ``CREATE TABLE`` statement.

        Raises:
            DatabaseCantBeInitialized: If SQLite rejects the statement.
        """
        logger.info(f"Creating table {table_name} with structure {table_structure}")
        if not self.conn:
            return
        try:
            self.conn.execute(
                f"CREATE TABLE IF NOT EXISTS {table_name} ({table_structure})"
            )
            logger.info(f"Table {table_name} created successfully")
        except DatabaseError as e:
            logger.error(e)
            raise DatabaseCantBeInitialized(e) from e

    def store_data(self, table_name, columns, data):
        """Insert one row, ignoring it when it violates a uniqueness constraint.

        Args:
            table_name: Target table; normalised with ``format_table_name``.
            columns: Comma-separated column list, inserted verbatim.
            data: Sequence of values, one per column, bound as parameters.

        Raises:
            DatabaseCantBeInitialized: If SQLite rejects the statement.
        """
        if not self.conn:
            return
        try:
            logger.info(f"Storing data in the table {table_name} and columns {columns} and data {data}")
            placeholders = ', '.join('?' * len(data))
            table_name = self.format_table_name(table_name)
            query = f"INSERT OR IGNORE INTO {table_name}({columns}) VALUES({placeholders})"
            logger.info(f"Query: {query}")
            # The connection context manager commits on success and rolls
            # back if the statement raises.
            with self.conn:
                self.conn.execute(query, data)
        except DatabaseError as e:
            logger.error(e)
            raise DatabaseCantBeInitialized(e) from e

    def fetch_data(self, table_name, sort_by: str = None):
        """Fetch every row of a table, optionally sorted in descending order.

        Args:
            table_name: Table to read; used verbatim.
            sort_by: Column to sort by (descending). No ordering is applied
                when it is ``None``.

        Returns:
            A list of row tuples, or ``None`` when there is no connection.

        Raises:
            DatabaseCantBeInitialized: If SQLite rejects the query.
        """
        if not self.conn:
            return None
        order_clause = f" ORDER BY {sort_by} DESC" if sort_by is not None else ""
        try:
            cursor = self.conn.execute(f"SELECT * FROM {table_name}{order_clause}")
            return cursor.fetchall()
        except DatabaseError as e:
            logger.error(e)
            raise DatabaseCantBeInitialized(e) from e

    def fetch_all_tables(self):
        """Fetch the rows of every user table, newest ``date`` first.

        SQLite's internal ``sqlite_*`` tables (for example ``sqlite_sequence``,
        which appears once any table uses AUTOINCREMENT) are skipped: they are
        not user data and have no ``date`` column to sort by.

        Returns:
            A dict mapping table name to its list of row tuples, or ``None``
            when there is no connection.

        Raises:
            DatabaseCantBeInitialized: If a query fails, e.g. because a user
                table has no ``date`` column.
        """
        if not self.conn:
            return None
        try:
            cursor = self.conn.cursor()
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
            table_names = [
                row[0] for row in cursor.fetchall()
                if not row[0].startswith("sqlite_")
            ]
            all_data = {}
            for table_name in table_names:
                # Names come straight from the schema, so quote them to keep
                # names with spaces or keywords valid in the query.
                quoted = self._quote_identifier(table_name)
                cursor.execute(f"SELECT * FROM {quoted} ORDER BY date DESC")
                all_data[table_name] = cursor.fetchall()
            return all_data
        except DatabaseError as e:
            logger.error(e)
            raise DatabaseCantBeInitialized(e) from e

    @staticmethod
    def _quote_identifier(name: str) -> str:
        """Return ``name`` as a double-quoted SQL identifier."""
        return '"' + name.replace('"', '""') + '"'

    @staticmethod
    def format_table_name(table_name: str) -> str:
        """Strip dashes from the name and make sure it starts with ``date``."""
        table_name = table_name.replace("-", "")
        if not table_name.startswith("date"):
            return "date" + table_name
        return table_name

    def fetch_and_sort_table_dates(self):
        """Return the dates encoded in ``date<YYYYMMDD>`` table names, ascending.

        Tables whose name does not start with ``date`` are ignored. A suffix
        that cannot be parsed as ``%Y%m%d`` is logged and skipped.

        Returns:
            A sorted list of ``YYYYMMDD`` strings; empty when there is no
            connection.
        """
        if not self.conn:
            return []
        # Fetch all table names
        cursor = self.conn.cursor()
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
        # Keep only "date"-prefixed tables and drop the prefix
        suffixes = [
            row[0][4:] for row in cursor.fetchall()
            if row[0].startswith("date")
        ]
        # Convert strings to datetime objects so invalid dates are rejected
        parsed_dates = []
        for suffix in suffixes:
            try:
                parsed_dates.append(datetime.strptime(suffix, '%Y%m%d'))
            except ValueError:
                logger.error(f"Couldn't convert date {suffix} to datetime object")
        # Sort, then convert datetime objects back to strings
        sorted_dates = [
            parsed.strftime('%Y%m%d') for parsed in sorted(parsed_dates)
        ]
        logger.info(f"Sorted dates: {sorted_dates}")
        return sorted_dates
class DatabaseCantBeInitialized(Exception):
    """Raised by ``Database`` when connecting or running a statement fails.

    The original ``sqlite3.DatabaseError`` is passed as the sole argument.
    """