-
Notifications
You must be signed in to change notification settings - Fork 3
/
makeModel.py
140 lines (89 loc) · 4.42 KB
/
makeModel.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import numpy as np
from sklearn import linear_model
from datetime import datetime
class MakeCCDCModel(object):
    """Harmonic time-series model for one band, fitted with Lasso regression.

    The regressors are a linear trend in the (rescaled) date plus one, two
    or three cosine/sine pairs with periods ``T``, ``T/2`` and ``T/3``.  The
    number of pairs depends on ``init_obs``: one pair by default, two when
    ``init_obs >= 18`` and three when ``init_obs >= 24``.

    Attributes set by the constructor:
        T: Base period of the harmonics (365.25).
        pi_val_simple / pi_val_advanced / pi_val_full: Angular frequencies
            ``2*pi/T``, ``4*pi/T`` and ``6*pi/T``.
        datetimes: The dates passed in; they are converted with
            ``datetime.fromordinal``, so they are treated as date ordinals.
        band: Stored as given; not used by any method of this class.
        doy: Day of year of every entry of ``datetimes``.
        init_obs: Controls the number of harmonics and the number of
            residuals used by :meth:`getAdjustedRMSE`.

    Attributes set by :meth:`fitModel` (``None`` until then):
        lasso_model, alpha, predicted, coefficients, residuals, RMSE,
        start_val, end_val.
    """

    def __init__(self, datetimes, init_obs, band):
        """Store the inputs and pre-compute the day of year of each date.

        Args:
            datetimes: Iterable of NumPy scalars holding date ordinals (each
                element must support ``.astype(int)``).  Arithmetic in the
                other methods assumes this is a NumPy array.
            init_obs: Number of observations; selects the model complexity.
            band: Identifier of the band this model belongs to.
        """
        self.T = 365.25
        self.pi_val_simple = (2 * np.pi) / self.T
        self.pi_val_advanced = (4 * np.pi) / self.T
        self.pi_val_full = (6 * np.pi) / self.T

        self.datetimes = datetimes
        self.band = band
        self.doy = np.array([
            datetime.fromordinal(x.astype(int)).timetuple().tm_yday
            for x in self.datetimes
        ])
        self.init_obs = init_obs

        # Populated by fitModel().
        self.lasso_model = None
        self.residuals = None
        self.RMSE = None
        self.coefficients = None
        self.predicted = None
        self.start_val = None
        self.end_val = None
        self.alpha = None

    def _design_matrix(self, rescaled):
        """Build the ``(n_dates, n_features)`` regressor matrix.

        Column order is ``[t, cos1, sin1, (cos2, sin2), (cos3, sin3)]``,
        where the bracketed pairs are present only for ``init_obs >= 18``
        and ``init_obs >= 24`` respectively.

        Args:
            rescaled: Scalar or array of dates already shifted so that the
                earliest training date is 0.

        Returns:
            2-D array with one row per date.
        """
        # ravel() turns a scalar into a length-1 vector so a single date
        # and a batch of dates go through the same code path.
        t = np.ravel(rescaled)
        columns = [
            t,
            np.cos(self.pi_val_simple * t),
            np.sin(self.pi_val_simple * t),
        ]
        if self.init_obs >= 18:
            columns.append(np.cos(self.pi_val_advanced * t))
            columns.append(np.sin(self.pi_val_advanced * t))
        if self.init_obs >= 24:
            columns.append(np.cos(self.pi_val_full * t))
            columns.append(np.sin(self.pi_val_full * t))
        return np.column_stack(columns)

    def fitModel(self, band_data, cv, alpha):
        """Find the coefficients by fitting a Lasso model to the data.

        Args:
            band_data: Observed values, one per entry of ``self.datetimes``.
            cv: If truthy, choose the regularisation strength with
                ``LassoCV``; otherwise use ``alpha`` as given.
            alpha: Regularisation strength used when ``cv`` is falsy.

        Side effects:
            Sets ``start_val``, ``end_val``, ``lasso_model``, ``alpha``,
            ``predicted``, ``coefficients``, ``residuals`` and ``RMSE``.
        """
        self.start_val = band_data[0]
        self.end_val = band_data[-1]

        # Rescale dates so that the series starts from 0.
        x = self._design_matrix(self.datetimes - self.getMinDate())

        if cv:
            # Cross validation picks the alpha parameter.
            self.lasso_model = linear_model.LassoCV(fit_intercept=True).fit(x, band_data)
            self.alpha = self.lasso_model.alpha_
        else:
            self.lasso_model = linear_model.Lasso(fit_intercept=True, alpha=alpha).fit(x, band_data)
            self.alpha = alpha

        self.predicted = self.lasso_model.predict(x)
        self.coefficients = self.lasso_model.coef_
        self.residuals = band_data - self.predicted

        # Overall RMSE of the model on its training data.
        self.RMSE = np.sqrt(np.mean(self.residuals ** 2))

    def getPrediction(self, date_to_predict):
        """Return the predicted value(s) for the given date(s).

        Args:
            date_to_predict: A single date or a 1-D array of dates, in the
                same units as ``self.datetimes``.

        Returns:
            Whatever ``self.lasso_model.predict`` returns for the
            ``(n_dates, n_features)`` regressor matrix.
        """
        # Rescale with the same origin that was used for fitting.  The
        # feature count follows init_obs, so batches of dates work for the
        # 3- and 5-feature models as well as the 7-feature one.
        x = self._design_matrix(date_to_predict - self.getMinDate())
        return self.lasso_model.predict(x)

    def getAdjustedRMSE(self, curr_date):
        """Get the RMSE of the residuals closest in day of year to a date.

        The ``init_obs`` training observations whose day of year is nearest
        to that of ``curr_date`` are selected, and the root mean square of
        their residuals is returned.

        Args:
            curr_date: NumPy scalar date ordinal (must support
                ``.astype(int)``).

        Returns:
            The adjusted RMSE as a float.
        """
        curr_doy = datetime.fromordinal(curr_date.astype(int)).timetuple().tm_yday

        # NOTE(review): this is a plain absolute difference, so day 365 and
        # day 1 count as 364 days apart - confirm whether a wrap-around
        # (circular) distance is wanted here.
        differenced = np.abs(self.doy - curr_doy)

        # Stable sort keeps the original order among equal distances.
        closest = np.argsort(differenced, kind="stable")[:self.init_obs]
        closest_residuals = self.residuals[closest]

        return np.sqrt(np.mean(closest_residuals ** 2))

    def getRMSE(self, curr_date):
        """Return the day-of-year adjusted RMSE for ``curr_date``."""
        return self.getAdjustedRMSE(curr_date)

    def getMinDate(self):
        """Return the earliest date in ``self.datetimes``."""
        return np.min(self.datetimes)

    def getMaxDate(self):
        """Return the latest date in ``self.datetimes``."""
        return np.max(self.datetimes)

    def getNumCoeffs(self):
        """Return the number of fitted coefficients (requires a fitted model)."""
        return len(self.coefficients)