# tobit.py (forked from jamesdj/tobit)
import math
import warnings
import numpy as np
import pandas as pd
from scipy.optimize import minimize
import scipy.stats
from scipy.special import log_ndtr
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_absolute_error


def split_left_right_censored(x, y, cens):
    """Partition (x, y) into left-censored, uncensored, and right-censored groups.

    cens codes: -1 = left-censored, 0 = uncensored, 1 = right-censored.
    Returns (xs, ys), each ordered [left, mid, right]; absent groups are None.
    """
    counts = cens.value_counts()
    if -1 not in counts and 1 not in counts:
        warnings.warn("No censored observations; use regression methods for uncensored data")
    xs = []
    ys = []
    for value in [-1, 0, 1]:
        if value in counts:
            split = cens == value
            y_split = np.squeeze(y[split].values)
            x_split = x[split].values
        else:
            y_split, x_split = None, None
        xs.append(x_split)
        ys.append(y_split)
    return xs, ys
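

# A minimal usage sketch for the splitter above (illustrative only; the tiny
# arrays and the helper name _demo_split are assumptions, not part of the
# original module): the -1/0/1 codes in `cens` partition the rows into
# [left-censored, uncensored, right-censored] groups.
def _demo_split():
    x = pd.DataFrame({'x1': [0.1, 0.5, 0.9, 1.3]})
    y = pd.Series([2.0, 3.0, 4.0, 5.0])
    cens = pd.Series([-1, 0, 0, 1])  # first row left-censored, last right-censored
    xs, ys = split_left_right_censored(x, y, cens)
    # xs and ys are ordered [left, mid, right]; a missing group would be None
    print([None if a is None else a.shape for a in xs])  # [(1, 1), (2, 1), (1, 1)]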


def tobit_neg_log_likelihood(xs, ys, params):
    """Negative log-likelihood of the Tobit model.

    params = [beta, sigma]; xs and ys as returned by split_left_right_censored.
    Censored observations contribute log Phi of the standardized residual
    (signed toward the censoring direction); uncensored observations
    contribute the usual normal log-density.
    """
    x_left, x_mid, x_right = xs
    y_left, y_mid, y_right = ys
    b = params[:-1]
    # s = math.exp(params[-1])  # censReg optimizes in terms of log(sigma) instead
    s = params[-1]

    to_cat = []
    cens = False
    if y_left is not None:
        cens = True
        left = y_left - np.dot(x_left, b)
        to_cat.append(left)
    if y_right is not None:
        cens = True
        right = np.dot(x_right, b) - y_right
        to_cat.append(right)
    if cens:
        concat_stats = np.concatenate(to_cat, axis=0) / s
        log_cum_norm = scipy.stats.norm.logcdf(concat_stats)  # log_ndtr(concat_stats)
        cens_sum = log_cum_norm.sum()
    else:
        cens_sum = 0

    if y_mid is not None:
        mid_stats = (y_mid - np.dot(x_mid, b)) / s
        # guard against log(0) if sigma underflows
        mid = scipy.stats.norm.logpdf(mid_stats) - math.log(max(np.finfo('float').resolution, s))
        mid_sum = mid.sum()
    else:
        mid_sum = 0
    loglik = cens_sum + mid_sum
    return -loglik


def tobit_neg_log_likelihood_der(xs, ys, params):
    """Analytic gradient of tobit_neg_log_likelihood with respect to [beta, sigma]."""
    x_left, x_mid, x_right = xs
    y_left, y_mid, y_right = ys
    b = params[:-1]
    # s = math.exp(params[-1])  # in censReg, not using chain rule as below; they optimize in terms of log(s)
    s = params[-1]

    beta_jac = np.zeros(len(b))
    sigma_jac = 0
    if y_left is not None:
        left_stats = (y_left - np.dot(x_left, b)) / s
        l_pdf = scipy.stats.norm.logpdf(left_stats)
        l_cdf = log_ndtr(left_stats)
        left_frac = np.exp(l_pdf - l_cdf)  # inverse Mills ratio phi/Phi, computed in log space
        beta_left = np.dot(left_frac, x_left / s)
        beta_jac -= beta_left
        left_sigma = np.dot(left_frac, left_stats)
        sigma_jac -= left_sigma

    if y_right is not None:
        right_stats = (np.dot(x_right, b) - y_right) / s
        r_pdf = scipy.stats.norm.logpdf(right_stats)
        r_cdf = log_ndtr(right_stats)
        right_frac = np.exp(r_pdf - r_cdf)
        beta_right = np.dot(right_frac, x_right / s)
        beta_jac += beta_right
        right_sigma = np.dot(right_frac, right_stats)
        sigma_jac -= right_sigma

    if y_mid is not None:
        mid_stats = (y_mid - np.dot(x_mid, b)) / s
        beta_mid = np.dot(mid_stats, x_mid / s)
        beta_jac += beta_mid
        mid_sigma = (np.square(mid_stats) - 1).sum()
        sigma_jac += mid_sigma

    combo_jac = np.append(beta_jac, sigma_jac / s)  # chain rule, since the expression above is dloglik/dlog(sigma)
    return -combo_jac
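

# Hedged sanity check for the analytic gradient above (not part of the
# original module; the helper name _check_gradient and the synthetic sizes
# are assumptions): compare tobit_neg_log_likelihood_der against finite
# differences via scipy.optimize.check_grad.
def _check_gradient(n=50, k=3, seed=0):
    from scipy.optimize import check_grad
    rng = np.random.default_rng(seed)
    x = pd.DataFrame(rng.normal(size=(n, k)))
    beta_true = rng.normal(size=k)
    y = pd.Series(x.values @ beta_true + rng.normal(size=n))
    # mark the tails as censored so all three branches are exercised
    cens = pd.Series(np.where(y < y.quantile(0.2), -1,
                              np.where(y > y.quantile(0.8), 1, 0)))
    xs, ys = split_left_right_censored(x, y, cens)
    params = np.append(rng.normal(size=k), 1.0)  # [beta, sigma]
    # returns the 2-norm of (analytic - numeric); should be small relative
    # to the gradient norm
    return check_grad(lambda p: tobit_neg_log_likelihood(xs, ys, p),
                      lambda p: tobit_neg_log_likelihood_der(xs, ys, p),
                      params)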


class TobitModel:
    """Tobit (censored) regression fit by maximum likelihood, scikit-learn style."""

    def __init__(self, fit_intercept=True):
        self.fit_intercept = fit_intercept
        self.ols_coef_ = None
        self.ols_intercept = None
        self.coef_ = None
        self.intercept_ = None
        self.sigma_ = None

    def fit(self, x, y, cens, verbose=False):
        """
        Fit a maximum-likelihood Tobit regression.

        :param x: Pandas DataFrame (n_samples, n_features): Data
        :param y: Pandas Series (n_samples,): Target
        :param cens: Pandas Series (n_samples,): -1 for left-censored samples, 0 for uncensored, 1 for right-censored
        :param verbose: boolean, show info from minimization
        :return: self
        """
        x_copy = x.copy()
        if self.fit_intercept:
            x_copy.insert(0, 'intercept', 1.0)
        else:
            x_copy = x_copy - x_copy.mean()  # center features; DataFrame has no .scale method
        # an OLS fit provides starting values for beta and sigma
        init_reg = LinearRegression(fit_intercept=False).fit(x_copy, y)
        b0 = init_reg.coef_
        y_pred = init_reg.predict(x_copy)
        resid = y - y_pred
        resid_var = np.var(resid)
        s0 = np.sqrt(resid_var)
        params0 = np.append(b0, s0)
        xs, ys = split_left_right_censored(x_copy, y, cens)
        result = minimize(lambda params: tobit_neg_log_likelihood(xs, ys, params), params0, method='BFGS',
                          jac=lambda params: tobit_neg_log_likelihood_der(xs, ys, params),
                          options={'disp': verbose})
        if verbose:
            print(result)
        if self.fit_intercept:
            self.ols_intercept = b0[0]
            self.ols_coef_ = b0[1:]
            self.intercept_ = result.x[0]
            self.coef_ = result.x[1:-1]
        else:
            self.ols_intercept = 0
            self.ols_coef_ = b0
            self.intercept_ = 0
            self.coef_ = result.x[:-1]
        self.sigma_ = result.x[-1]
        return self

    def predict(self, x):
        return self.intercept_ + np.dot(x, self.coef_)

    def score(self, x, y, scoring_function=mean_absolute_error):
        y_pred = self.predict(x)  # include the intercept, consistent with predict()
        return scoring_function(y, y_pred)
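

# End-to-end sketch (illustrative, not part of the original API): simulate
# left-censored data, fit the model, and compare the Tobit estimates to the
# OLS starting values. All data-generating constants here are arbitrary
# assumptions.
if __name__ == '__main__':
    rng = np.random.default_rng(42)
    n = 500
    x = pd.DataFrame({'x1': rng.normal(size=n), 'x2': rng.normal(size=n)})
    y_latent = 1.0 + 2.0 * x['x1'] - 1.0 * x['x2'] + rng.normal(scale=0.5, size=n)
    lower = y_latent.quantile(0.25)
    y = y_latent.clip(lower=lower)  # observed response, left-censored at `lower`
    cens = pd.Series(np.where(y_latent < lower, -1, 0))
    model = TobitModel().fit(x, y, cens)
    print('tobit coef:', model.coef_, 'intercept:', model.intercept_, 'sigma:', model.sigma_)
    print('ols coef:  ', model.ols_coef_, 'intercept:', model.ols_intercept)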