db_utils.py
"""Utilities for extracting customer activity data from an AWS RDS
PostgreSQL database and running an EDA/cleaning pipeline over it."""
from typing import List

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import psycopg2  # noqa -- fails fast if the PostgreSQL driver SQLAlchemy needs is missing
import scipy.stats as stats
import seaborn as sns
import yaml
from sqlalchemy import create_engine


class DataTransform:
    def __init__(self, df: pd.DataFrame):
        self.df = df

    def convert_to_categorical(self, columns: List[str]):
        for col in columns:
            self.df[col] = self.df[col].astype('category')

    def apply_transformations(self):
        categorical_columns = ['month', 'operating_systems', 'browser',
                               'region', 'traffic_type', 'visitor_type']
        self.convert_to_categorical(categorical_columns)


class DataFrameInfo:
    """Read-only summaries of a DataFrame: dtypes, statistics and nulls."""

    def __init__(self, df: pd.DataFrame):
        self.df = df

    def describe_columns(self) -> pd.Series:
        return self.df.dtypes

    def extract_statistical_values(self) -> pd.DataFrame:
        return self.df.describe(include='all').loc[['mean', 'std', '50%']]

    def count_distinct_values(self) -> pd.Series:
        return self.df.select_dtypes(include=['category']).nunique()

    def get_shape(self) -> tuple:
        return self.df.shape

    def count_null_values(self) -> pd.DataFrame:
        null_count = self.df.isnull().sum()
        null_percentage = (null_count / len(self.df)) * 100
        return pd.DataFrame({'Count': null_count, 'Percentage (%)': null_percentage})


class Plotter:
    def plot_null_removal(self, before: pd.Series, after: pd.Series):
        df = pd.DataFrame({'Before': before, 'After': after})
        df.plot(kind='bar', figsize=(10, 5))
        plt.title('Null Value Removal')
        plt.xlabel('Columns')
        plt.ylabel('Null Value Count')
        plt.show()

    def visualize_data_distribution(self, df: pd.DataFrame, columns: List[str]):
        for col in columns:
            sns.kdeplot(df[col], fill=True)
            plt.title(f'Distribution for {col}')
            plt.show()


class DataFrameTransform(DataFrameInfo):
    """Mutating EDA steps: imputation, skew correction, outlier removal
    and correlation-based column pruning."""

    def drop_columns(self, columns: List[str]):
        self.df.drop(columns=columns, inplace=True)

    def impute_columns(self):
        """Impute nulls: mode for categoricals, median for heavily skewed
        numeric columns, mean otherwise."""
        for col in self.df.columns:
            if self.df[col].isnull().sum() == 0:
                continue
            if self.df[col].dtype == 'object' or self.df[col].dtype.name == 'category':
                self.df[col] = self.df[col].fillna(self.df[col].mode()[0])
            elif self.df[col].dtype in ['int64', 'float64']:
                if self.df[col].skew() > 1:
                    self.df[col] = self.df[col].fillna(self.df[col].median())
                else:
                    self.df[col] = self.df[col].fillna(self.df[col].mean())

    def identify_skewed_columns(self, threshold: float = 0.5) -> List[str]:
        numeric_cols = self.df.select_dtypes(include=['int64', 'float64'])
        return numeric_cols.columns[numeric_cols.skew().abs() > threshold].tolist()

    def transform_skewed_columns(self, columns: List[str]):
        """Apply Box-Cox to strictly positive columns, Yeo-Johnson otherwise."""
        for col in columns:
            if self.df[col].min() > 0:
                try:
                    self.df[col], _ = stats.boxcox(self.df[col])
                except Exception:
                    print(f"Could not transform {col} using boxcox.")
            else:
                try:
                    self.df[col], _ = stats.yeojohnson(self.df[col])
                except Exception:
                    print(f"Could not transform {col} using yeojohnson.")

    def handle_outliers(self, columns: List[str], method="IQR"):
        """Drop rows outside 1.5 * IQR per column. Reassigns self.df, so
        callers should read the filtered frame back from this attribute."""
        for col in columns:
            if method == "IQR":
                Q1 = self.df[col].quantile(0.25)
                Q3 = self.df[col].quantile(0.75)
                IQR = Q3 - Q1
                lower_bound = Q1 - 1.5 * IQR
                upper_bound = Q3 + 1.5 * IQR
                self.df = self.df[(self.df[col] >= lower_bound) & (self.df[col] <= upper_bound)]

    def compute_correlation_matrix(self) -> pd.DataFrame:
        return self.df.corr(numeric_only=True)

    def visualize_correlation_matrix(self, matrix: pd.DataFrame):
        plt.figure(figsize=(15, 10))
        sns.heatmap(matrix, annot=True, cmap='coolwarm', vmin=-1, vmax=1)
        plt.title("Correlation Matrix")
        plt.show()

    def identify_highly_correlated_columns(self, threshold: float = 0.9) -> List[str]:
        """Flag one column of each pair whose absolute correlation exceeds
        the threshold, using the upper triangle to avoid double counting."""
        corr_matrix = self.df.corr(numeric_only=True).abs()
        upper_triangle = corr_matrix.where(np.triu(np.ones(corr_matrix.shape), k=1).astype(bool))
        return [column for column in upper_triangle.columns if any(upper_triangle[column] > threshold)]

    def remove_columns(self, columns: List[str]):
        self.df.drop(columns=columns, inplace=True)
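
# Illustrative check of the correlation pruning (hypothetical data, not part
# of the pipeline): with b = 2 * a the pair correlates at exactly 1.0, so
# 'b', the upper-triangle member of the pair, is flagged for removal:
#
#   demo = DataFrameTransform(pd.DataFrame({'a': [1, 2, 3, 4], 'b': [2, 4, 6, 8]}))
#   demo.identify_highly_correlated_columns()  # -> ['b']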


class RDSDatabaseConnector:
    """Connects to an RDS PostgreSQL instance and pulls tables into pandas."""

    def __init__(self, credentials: dict):
        self.host = credentials['RDS_HOST']
        self.port = credentials['RDS_PORT']
        self.dbname = credentials['RDS_DATABASE']
        self.user = credentials['RDS_USER']
        self.password = credentials['RDS_PASSWORD']
        self.engine = None

    def create_engine(self):
        url = f"postgresql://{self.user}:{self.password}@{self.host}:{self.port}/{self.dbname}"
        self.engine = create_engine(url)

    def extract_data(self, table_name: str) -> pd.DataFrame:
        query = f"SELECT * FROM {table_name};"
        return pd.read_sql(query, self.engine)


def save_data_to_csv(data_frame: pd.DataFrame, filename: str):
    data_frame.to_csv(filename, index=False)


def load_db_credentials(filepath: str = 'credentials.yaml') -> dict:
    with open(filepath, 'r') as file:
        return yaml.safe_load(file)
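
# load_db_credentials expects a YAML file with the keys consumed by
# RDSDatabaseConnector. Placeholder values shown; substitute your own
# RDS endpoint details:
#
#   RDS_HOST: your-instance.xxxxxxxx.eu-west-1.rds.amazonaws.com
#   RDS_PORT: 5432
#   RDS_DATABASE: postgres
#   RDS_USER: your_username
#   RDS_PASSWORD: your_password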


if __name__ == "__main__":
    credentials = load_db_credentials()
    connector = RDSDatabaseConnector(credentials)
    connector.create_engine()
    data_frame = connector.extract_data('customer_activity')

    # Cast the categorical columns before analysis.
    transformer = DataTransform(data_frame)
    transformer.apply_transformations()

    # Impute nulls and report the before/after counts.
    eda_transform = DataFrameTransform(data_frame)
    null_values_before = eda_transform.count_null_values()['Count']
    print("Null Values Before:", null_values_before)
    eda_transform.impute_columns()
    null_values_after = eda_transform.count_null_values()['Count']
    print("Null Values After:", null_values_after)

    plotter = Plotter()
    plotter.plot_null_removal(null_values_before, null_values_after)

    skewed_cols = eda_transform.identify_skewed_columns()
    numeric_cols = data_frame.select_dtypes(include=['int64', 'float64']).columns.tolist()
    plotter.visualize_data_distribution(data_frame, numeric_cols)

    # handle_outliers rebuilds eda_transform.df, so rebind data_frame to the
    # filtered frame before plotting or saving it.
    eda_transform.handle_outliers(numeric_cols)
    data_frame = eda_transform.df
    plotter.visualize_data_distribution(data_frame, numeric_cols)

    # Overlay each skewed column's distribution before and after correction.
    plt.figure()
    for col in skewed_cols:
        sns.kdeplot(data_frame[col], label=f'{col} (before)', fill=True)
    eda_transform.transform_skewed_columns(skewed_cols)
    for col in skewed_cols:
        sns.kdeplot(data_frame[col], label=f'{col} (after)', fill=True)
    plt.legend()
    plt.show()

    correlation_matrix = eda_transform.compute_correlation_matrix()
    eda_transform.visualize_correlation_matrix(correlation_matrix)
    columns_to_remove = eda_transform.identify_highly_correlated_columns()
    eda_transform.remove_columns(columns_to_remove)

    save_data_to_csv(data_frame, 'customer_activity_transformed_corrected_skew.csv')