forked from DataTalksClub/mlops-zoomcamp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
evidently_metrics_calculation.py
104 lines (85 loc) · 3.36 KB
/
evidently_metrics_calculation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import datetime
import time
import random
import logging
import uuid
import pytz
import pandas as pd
import io
import psycopg
import joblib
from prefect import task, flow
from evidently.report import Report
from evidently import ColumnMapping
from evidently.metrics import ColumnDriftMetric, DatasetDriftMetric, DatasetMissingValuesMetric
# Root logger: timestamp, level and message on every line.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s]: %(message)s",
    level=logging.INFO,
)

# Minimum number of seconds between two consecutive metric inserts.
SEND_TIMEOUT = 10

rand = random.Random()

# DDL run by prep_db: recreates the metrics table from scratch on every run.
create_table_statement = (
    "\n"
    "drop table if exists dummy_metrics;\n"
    "create table dummy_metrics(\n"
    "timestamp timestamp,\n"
    "prediction_drift float,\n"
    "num_drifted_columns integer,\n"
    "share_missing_values float\n"
    ")\n"
)
# Baseline data the drift metrics compare each day against.
reference_data = pd.read_parquet('data/reference.parquet')

# Pre-trained model; joblib deserialisation is pickle-based, so only load
# trusted files here.
with open('models/lin_reg.bin', 'rb') as f_in:
    model = joblib.load(f_in)

# Data replayed one day at a time, starting from ``begin`` (midnight, Feb 1 2022).
raw_data = pd.read_parquet('data/green_tripdata_2022-02.parquet')
begin = datetime.datetime(2022, 2, 1)

# Feature columns fed to the model and described to Evidently.
num_features = ['passenger_count', 'trip_distance', 'fare_amount', 'total_amount']
cat_features = ['PULocationID', 'DOLocationID']

column_mapping = ColumnMapping(
    target=None,
    prediction='prediction',
    numerical_features=num_features,
    categorical_features=cat_features,
)

# The metric order is significant: results are read back by position
# (0 = prediction drift, 1 = dataset drift, 2 = missing values).
report = Report(
    metrics=[
        ColumnDriftMetric(column_name='prediction'),
        DatasetDriftMetric(),
        DatasetMissingValuesMetric(),
    ]
)
@task
def prep_db():
    """Make sure the ``test`` database exists, then (re)create ``dummy_metrics`` in it.

    The first connection goes to the server without selecting a database. It
    uses autocommit because PostgreSQL rejects CREATE DATABASE inside a
    transaction block. The second connection targets ``test`` and executes
    ``create_table_statement``.
    """
    server_dsn = "host=localhost port=5432 user=postgres password=example"
    with psycopg.connect(server_dsn, autocommit=True) as admin_conn:
        lookup = admin_conn.execute("SELECT 1 FROM pg_database WHERE datname='test'")
        if lookup.fetchone() is None:
            admin_conn.execute("create database test;")

    # The non-autocommit connection commits the DDL when the ``with`` block exits cleanly.
    with psycopg.connect("host=localhost port=5432 dbname=test user=postgres password=example") as db_conn:
        db_conn.execute(create_table_statement)
@task
def calculate_metrics_postgresql(curr, i):
    """Compute one day's Evidently metrics and insert them as a row of ``dummy_metrics``.

    Args:
        curr: Open psycopg cursor used to execute the INSERT.
        i: Day offset from ``begin``. The rows analysed are those whose
            ``lpep_pickup_datetime`` lies in ``[begin + i days, begin + i + 1 days)``.

    Days without any trips are skipped with a warning, since the model
    cannot predict on an empty frame.
    """
    day_start = begin + datetime.timedelta(days=i)
    day_end = day_start + datetime.timedelta(days=1)

    pickup = raw_data.lpep_pickup_datetime
    # .copy() so the 'prediction' column added below goes into an independent
    # frame, not a possible view of raw_data (avoids SettingWithCopyWarning).
    current_data = raw_data[(pickup >= day_start) & (pickup < day_end)].copy()

    if current_data.empty:
        logging.warning("no data for %s, skipping metrics", day_start)
        return

    # NaNs are zero-filled only in the model input; current_data keeps its
    # missing values so the DatasetMissingValuesMetric in ``report`` can see them.
    current_data['prediction'] = model.predict(
        current_data[num_features + cat_features].fillna(0)
    )

    report.run(
        reference_data=reference_data,
        current_data=current_data,
        column_mapping=column_mapping,
    )
    metrics = report.as_dict()['metrics']

    # Positions follow the order the metrics were passed to ``report``.
    prediction_drift = metrics[0]['result']['drift_score']
    num_drifted_columns = metrics[1]['result']['number_of_drifted_columns']
    share_missing_values = metrics[2]['result']['current']['share_of_missing_values']

    curr.execute(
        "insert into dummy_metrics(timestamp, prediction_drift, num_drifted_columns, share_missing_values) values (%s, %s, %s, %s)",
        (day_start, prediction_drift, num_drifted_columns, share_missing_values),
    )
@flow
def batch_monitoring_backfill(num_days: int = 27):
    """Backfill daily metrics into Postgres, pacing inserts like a live feed.

    Args:
        num_days: Number of consecutive day offsets (0 .. num_days - 1) from
            ``begin`` to process. Defaults to 27, the original hard-coded value.

    After each day the flow sleeps so that consecutive inserts are at least
    ``SEND_TIMEOUT`` seconds apart.
    """
    prep_db()
    send_interval = datetime.timedelta(seconds=SEND_TIMEOUT)
    # Start one interval in the past so the first day is sent without waiting.
    last_send = datetime.datetime.now() - send_interval
    with psycopg.connect("host=localhost port=5432 dbname=test user=postgres password=example", autocommit=True) as conn:
        # NOTE(review): with begin = 2022-02-01 the default 27 days ends on
        # Feb 27, so Feb 28 is never backfilled; pass num_days=28 if that is
        # unintended -- confirm.
        for i in range(num_days):
            with conn.cursor() as curr:
                calculate_metrics_postgresql(curr, i)

            new_send = datetime.datetime.now()
            seconds_elapsed = (new_send - last_send).total_seconds()
            if seconds_elapsed < SEND_TIMEOUT:
                time.sleep(SEND_TIMEOUT - seconds_elapsed)
            # Advance the schedule in whole SEND_TIMEOUT steps until it passes
            # new_send. The step must match the sleep threshold above (the
            # original hard-coded 10 s here).
            while last_send < new_send:
                last_send += send_interval
            logging.info("data sent")
# Script entry point: run the backfill flow directly (outside a Prefect deployment).
if __name__ == "__main__":
    batch_monitoring_backfill()