-
Notifications
You must be signed in to change notification settings - Fork 0
/
multi_models.py
161 lines (129 loc) · 5.09 KB
/
multi_models.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
from itertools import chain, combinations
import json
from pickle import dump, load
import numpy as np
import pandas as pd
from scipy.stats import t
from sklearn.compose import make_column_transformer, make_column_selector
from sklearn.linear_model import LinearRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder
def make_cat_ohe(drop="first"):
    """Build a column transformer that one-hot encodes categorical columns.

    Columns selected by dtype ``"category"`` are sent through a
    ``OneHotEncoder`` created with the given ``drop`` argument; every
    other column is passed through unchanged.
    """
    encoder = OneHotEncoder(drop=drop)
    categorical_columns = make_column_selector(dtype_include="category")
    return make_column_transformer(
        (encoder, categorical_columns),
        remainder="passthrough",
    )
def calc_prediction_delta(y, y_pred, alpha=0.90, print_ratio_captured=False):
    """Return the half width of the prediction interval of the residuals.

    The residuals ``y - y_pred`` are treated as a sample with unknown mean
    and variance. The half width is
    ``t_{(1 + alpha) / 2, n - 1} * s * sqrt(1 + 1 / n)``, where ``s`` is the
    sample standard deviation of the residuals, so that a fraction ``alpha``
    of values is expected to fall inside the interval.

    Parameters
    ----------
    y, y_pred : array-like
        Observed and predicted values of equal length. They are converted
        to float arrays and compared positionally.
    alpha : float
        Expected fraction of values inside the interval.
    print_ratio_captured : bool
        If true, print the fraction of residuals that actually lie within
        ``dy`` of the mean residual (this should be close to ``alpha``)
        together with the mean residual.

    Returns
    -------
    The half width ``dy`` of the interval.

    Raises
    ------
    ValueError
        If fewer than two observations are given, because the sample
        variance is undefined in that case.
    """
    n = len(y)
    if n < 2:
        raise ValueError(
            "at least two observations are required to estimate the "
            "residual variance"
        )
    resid = np.asarray(y, dtype=float) - np.asarray(y_pred, dtype=float)
    mean_resid = np.mean(resid)
    # Unbiased sample variance of the residuals.
    sample_var = np.sum((resid - mean_resid) ** 2, axis=0) / (n - 1)
    # The (1 + 1/n) term belongs under the square root: it scales the
    # variance of a new observation, not its standard deviation.
    dy = t.ppf((1 + alpha) / 2, n - 1) * np.sqrt(sample_var * (1 + 1 / n))
    if print_ratio_captured:
        # The interval is centred on the mean residual, so measure the
        # distance of each residual from that centre.
        ratio_captured = np.mean(np.abs(resid - mean_resid) < dy)
        print(
            "Ratio of values inside prediction interval:"
            + " {:.2f}, mean residual: {:.2g}".format(ratio_captured, mean_resid)
        )
    return dy
def eval_price_with_pred_interval(X, linreg, dy):
    """Predict prices for ``X`` together with a lower and upper bound.

    ``linreg.predict(X)`` is taken as a base-10 logarithm; the prediction
    and the prediction shifted by ``-dy`` and ``+dy`` are raised to the
    power of ten and returned as the columns ``price``, ``lower`` and
    ``upper`` of a DataFrame.
    """
    log_prediction = linreg.predict(X)
    log_frame = pd.DataFrame(
        {
            "y": log_prediction,
            "y-dy": log_prediction - dy,
            "y+dy": log_prediction + dy,
        }
    )
    column_names = {"y": "price", "y-dy": "lower", "y+dy": "upper"}
    return np.power(10, log_frame).rename(columns=column_names)
def powerset(iterable, start=0):
    """Iterate over all subsets of ``iterable`` with at least ``start`` items.

    powerset([1,2,3]) --> () (1,) (2,) (3,) (1,2) (1,3) (2,3) (1,2,3)

    Subsets are produced as tuples, ordered by increasing size. With
    ``start=1`` the empty subset is left out.

    This function comes from the python documentation at
    https://docs.python.org/3/library/itertools.html
    """
    pool = list(iterable)
    subset_sizes = range(start, len(pool) + 1)
    return chain.from_iterable(
        map(lambda size: combinations(pool, size), subset_sizes)
    )
def train_set_of_models(features, bmw, dependent="log price"):
    """Fit one linear-regression pipeline for every non-empty feature subset.

    For each subset produced by ``powerset(features, start=1)`` a pipeline
    (categorical one-hot encoding followed by ``LinearRegression``) is
    fitted on the corresponding columns of ``bmw`` against the
    ``dependent`` column, and the 90% prediction half width is computed
    from the in-sample predictions.

    Note that the number of fitted models grows as ``2**len(features) - 1``.

    Returns a dict mapping each feature tuple to
    ``{"model": fitted_pipeline, "dy_90": half_width}``.
    """
    # The target column is the same for every subset, so look it up once.
    y = bmw[dependent]
    models = {}
    for feature_set in powerset(features, start=1):
        # Pipeline documents ``steps`` as a list of (name, estimator) tuples.
        linreg_local = Pipeline(
            [("one_hot", make_cat_ohe()), ("regressor", LinearRegression())]
        )
        X = bmw[list(feature_set)]
        linreg_local.fit(X, y)
        dy = calc_prediction_delta(y, linreg_local.predict(X), alpha=0.90)
        models[feature_set] = {"model": linreg_local, "dy_90": dy}
    return models
def find_longest_element(keys):
    """Return the element of ``keys`` with the greatest length.

    If several elements share the greatest length, the first one
    encountered is returned. ``keys`` may be any iterable of sized
    objects (a list, a dict keys view, ...).

    Raises ``ValueError`` if ``keys`` is empty.
    """
    return max(keys, key=len)
def scalar_to_list(x):
    """Wrap ``x`` in a one-element list if numpy considers it a scalar.

    Anything that ``np.isscalar`` does not report as a scalar is returned
    unchanged.
    """
    return [x] if np.isscalar(x) else x
def eval_model(models, **kwargs):
    """Predict a price with its 90% interval from the given feature values.

    ``models`` maps feature tuples to ``{"model": ..., "dy_90": ...}``
    entries. The longest key is taken as the full feature list; the model
    whose key consists of exactly the features passed as keyword arguments
    (in the order of the full feature list) is used for the prediction.

    Each keyword value may be a scalar or a sequence of values. The model
    output is interpreted as a base-10 logarithm of the price.

    Returns a DataFrame with the columns ``price``, ``90% lower bound``
    and ``90% upper bound``.

    Raises
    ------
    ValueError
        If a keyword argument is not one of the known features.
    KeyError
        If ``models`` has no entry for the selected feature subset.
    """
    features = find_longest_element(list(models.keys()))
    for key in kwargs:
        if key not in features:
            raise ValueError(f"{key} not found in {features}")
    chosen_features = tuple(feature for feature in features if feature in kwargs)
    model_holder = models[chosen_features]
    model = model_holder["model"]
    dy = model_holder["dy_90"]
    # NOTE(review): columns built from plain values do not get the
    # "category" dtype here — confirm the fitted pipeline handles that.
    X = pd.DataFrame(
        {feature: scalar_to_list(kwargs[feature]) for feature in chosen_features}
    )
    # Predict once and reuse the result for all three columns.
    log_price = model.predict(X)
    price_w_interval = pd.DataFrame(
        {
            "price": np.power(10, log_price),
            "90% lower bound": np.power(10, log_price - dy),
            "90% upper bound": np.power(10, log_price + dy),
        }
    )
    return price_w_interval
def _single_value_as_float(value):
    """Convert a scalar or a one-element container to a Python float."""
    values = np.asarray(value, dtype=float)
    if values.size != 1:
        raise TypeError(
            f"expected exactly one value, got {values.size}"
        )
    return float(values.reshape(-1)[0])


def pretty_print_prediction(prediction):
    """Format a single prediction as a human-readable string.

    ``prediction`` must provide the keys ``"price"``, ``"90% lower bound"``
    and ``"90% upper bound"``, each holding one value (a scalar or a
    one-element Series/array, e.g. a one-row DataFrame).

    The single element is extracted explicitly rather than calling
    ``float()`` on a Series, which pandas has deprecated.

    Raises ``TypeError`` if an entry holds more than one value.
    """
    price = _single_value_as_float(prediction["price"])
    low_bound = _single_value_as_float(prediction["90% lower bound"])
    upper_bound = _single_value_as_float(prediction["90% upper bound"])
    return f"Price: ${price:,.0f}. \n90% of prices between ${low_bound:,.0f} and ${upper_bound:,.0f}"
def extract_feature_ranges(df):
    """Summarise the numeric and categorical columns of ``df``.

    Numeric columns map to ``{"type": "numeric", "range": (min, max)}``
    with both limits as floats; columns of dtype ``category`` map to
    ``{"type": "category", "values": [...]}`` listing their categories.
    Columns of any other dtype are left out. Numeric columns come first
    in the returned dict, followed by the categorical ones.
    """
    numeric_columns = df.select_dtypes(include=np.number).columns
    categorical_columns = df.select_dtypes(include="category").columns

    features_ranges = {
        name: {
            "type": "numeric",
            "range": (float(df[name].min()), float(df[name].max())),
        }
        for name in numeric_columns
    }
    features_ranges.update(
        {
            name: {"type": "category", "values": list(df[name].cat.categories)}
            for name in categorical_columns
        }
    )
    return features_ranges
def dump_feature_ranges_to_json_file(df, filename="feature_ranges.json"):
    """Write the feature summary of ``df`` to ``filename`` as JSON.

    The summary is the result of ``extract_feature_ranges(df)``. The
    file name is returned.
    """
    summary = extract_feature_ranges(df)
    with open(filename, "w") as json_file:
        json.dump(summary, fp=json_file)
    return filename
def dump_models(models, model_dump_file="bmw_linreg_model.pckl"):
    """Pickle ``models`` into ``model_dump_file`` and return the file name."""
    with open(model_dump_file, "wb") as pickle_file:
        dump(models, pickle_file)
    return model_dump_file
def load_models(model_dump_file="bmw_linreg_model.pckl"):
    """Read and return the pickled models stored in ``model_dump_file``.

    Only load files from a trusted source: unpickling can execute
    arbitrary code.
    """
    with open(model_dump_file, "rb") as pickle_file:
        return load(pickle_file)