import numpy as np
from numpy.linalg import det, inv
from scipy.special import gamma, erfinv
from math import pi
class AutoregressiveTree:
    """Bayesian autoregressive tree: a binary decision tree over the p lagged
    values of a series, with a Gaussian autoregressive model at each leaf.
    Splits and leaves are scored with a Normal-Wishart marginal likelihood
    (LeafScore), and leaves store MAP parameters (param)."""

    def __init__(self, p, u0=0, alpha_u=1):
        # Seven candidate offsets erfinv(-0.75), erfinv(-0.5), ..., erfinv(0.75);
        # get_split() places split thresholds at mean + sigma * offset.
        erf_temp = np.zeros(7)
        for i in range(1, 8):
            erf_temp[i - 1] = erfinv(i / 4 - 1)
        self._erf = erf_temp
        self._p = p                # autoregressive order (number of lag columns)
        self._alpha_W = p + 2      # prior degrees of freedom of the Wishart
        self._u0 = u0              # prior mean
        self._alpha_u = alpha_u    # equivalent sample size of the mean prior
    def sample_mean(self, data):
        # Arithmetic mean of the rows (works for a list of arrays or a 2-D array).
        return sum(data) / len(data)

    def scatter_matrix(self, data, uN_):
        # Scatter matrix S_N = sum_i (x_i - mean)(x_i - mean)^T of the centred rows.
        temp = data - uN_
        SN = 0
        for row in temp:
            SN += np.outer(row, row)
        return SN
    def WN_func(self, uN_, SN, W0, N):
        # Posterior scale matrix of the Normal-Wishart update:
        # W_N = W_0 + S_N + (alpha_u * N / (alpha_u + N)) (u_0 - mean)(u_0 - mean)^T
        temp = self._u0 - uN_
        temp = temp[:, np.newaxis]
        WN = W0 + SN + ((self._alpha_u * N) / (self._alpha_u + N)) * np.dot(temp, temp.T)
        return WN

    def WN_d_func(self, u0_, uN_d_, SN_d_, W0_, N_):
        # Same update as WN_func, but for the p-dimensional marginal over the lag
        # columns only (target column dropped), with its own prior mean u0_.
        temp = u0_ - uN_d_
        temp = temp[:, np.newaxis]
        WN_ = W0_ + SN_d_ + ((self._alpha_u * N_) / (self._alpha_u + N_)) * np.dot(temp, temp.T)
        return WN_

    def MAP_param(self, N, uN_, WN):
        # Posterior mean ut and the scaled scale matrix Wt_inv = W_N / (alpha_W + N - (p + 1)),
        # which is used downstream as a covariance estimate.
        ut = ((self._alpha_u * self._u0) + (N * uN_)) / (self._alpha_u + N)
        Wt_inv = (1 / (self._alpha_W + N - (self._p + 1))) * WN
        return ut, Wt_inv
    def param(self, data):
        # Fit the leaf model: condition the (p + 1)-dimensional MAP Gaussian on the
        # p lag columns, returning the conditional variance, the coefficient vector
        # b and the constant term m for the target (last) column.
        N = len(data)
        uN_ = self.sample_mean(data)
        SN = self.scatter_matrix(data, uN_)
        W0 = np.identity(SN.shape[0])
        WN = self.WN_func(uN_, SN, W0, N)
        ut, Wt_inv = self.MAP_param(N, uN_, WN)
        W = inv(Wt_inv)              # precision implied by the covariance estimate
        var = 1 / W[-1, -1]          # conditional variance of the target given the lags
        Wpp = inv(Wt_inv[:-1, :-1])
        # b = Sigma[target, lags] @ inv(Sigma[lags, lags]): regression coefficients.
        b = np.zeros([self._p, 1])
        for j in range(len(b)):
            for i in range(self._p):
                b[j] += Wt_inv[-1, i] * Wpp[i, j]
        # Constant term built from the posterior means: m = mu_target + b . mu_lags.
        m = ut[-1]
        for i in range(self._p):
            m += b[i] * ut[i]
        return var, b, m[0]
    def c_func(self, l, alpha):
        # c(l, alpha) = prod_{i=1..l} Gamma((alpha + 1 - i) / 2); only ratios of
        # these products appear in the marginal-likelihood terms below.
        c = 1
        for i in range(1, l + 1):
            c *= gamma((alpha + 1 - i) / 2)
        return c
    def pds_func(self, N, W0, WN):
        # Normal-Wishart marginal likelihood of the full (p + 1)-dimensional rows
        # at a node; the factors multiply, they are not summed.
        pds = (pi**(-((self._p + 1) * N) / 2)) * \
              ((self._alpha_u / (self._alpha_u + N))**((self._p + 1) / 2)) * \
              (self.c_func(self._p + 1, self._alpha_W + N) / self.c_func(self._p + 1, self._alpha_W)) * \
              (det(W0)**(self._alpha_W / 2)) * (det(WN)**(-(self._alpha_W + N) / 2))
        return pds
    def pd_s_func(self, u0_, N_, W0_, WN_):
        # Marginal likelihood of the p lag columns only: the dimension is p and the
        # prior degrees of freedom drop to alpha_W - 1 for the removed target column.
        pds = (pi**(-(self._p * N_) / 2)) * \
              ((self._alpha_u / (self._alpha_u + N_))**(self._p / 2)) * \
              (self.c_func(self._p, self._alpha_W - 1 + N_) / self.c_func(self._p, self._alpha_W - 1)) * \
              (det(W0_)**((self._alpha_W - 1) / 2)) * (det(WN_)**(-(self._alpha_W - 1 + N_) / 2))
        return pds
    def LeafScore(self, data):
        # Node score: marginal likelihood of the target column given the lag
        # columns, computed as p(full rows) / p(lag columns only).
        N = len(data)
        uN_ = self.sample_mean(data)
        SN = self.scatter_matrix(data, uN_)
        W0 = np.identity(SN.shape[0])
        WN = self.WN_func(uN_, SN, W0, N)
        ut, Wt_inv = self.MAP_param(N, uN_, WN)
        pds = self.pds_func(N, W0, WN)
        # Repeat the computation with the target (last) column removed.
        data_ = [x[:-1] for x in data]
        N_ = len(data_)
        uN_d_ = self.sample_mean(data_)
        SN_d_ = self.scatter_matrix(data_, uN_d_)
        u0_ = ut[:-1]
        W0_ = inv(inv(W0)[:-1, :-1])
        WN_d_ = self.WN_d_func(u0_, uN_d_, SN_d_, W0_, N_)
        pds_ = self.pd_s_func(u0_, N_, W0_, WN_d_)
        return pds / pds_
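
    # Partition `dataset` by a single threshold: rows with row[index] < value go
    # left, the rest go right.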
def test_split(self, index, value, dataset):
left, right = list(), list()
for row in dataset:
if row[index] < value:
left.append(row)
else:
right.append(row)
return left, right
    def get_split(self, dataset):
        # Try axis-aligned splits on each lag column at a handful of thresholds
        # placed around the column mean (mean + sigma * erfinv offset) and keep the
        # split whose children jointly out-score the unsplit node.
        b_index, b_value, b_groups = 999, 999, None   # sentinels: no split found yet
        b_score = self.LeafScore(dataset)
        avg = np.mean(dataset, axis=0)[:-1]
        sigma = np.std(dataset, axis=0)[:-1]
        for index in range(len(avg)):
            for i in range(len(self._erf)):
                value = avg[index] + sigma[index] * self._erf[i]
                groups = self.test_split(index, value, dataset)
                new_score = 1
                for group in groups:
                    if len(group) != 0:
                        new_score *= self.LeafScore(group)
                if new_score > b_score:
                    b_index, b_value, b_score, b_groups = index, value, new_score, groups
        return {'index': b_index, 'value': b_value, 'groups': b_groups}
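
    # A leaf stores the fitted parameters (var, b, m) returned by param().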
def to_terminal(self, group):
outcomes = self.param(group)
return outcomes
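
    # Recursively grow the tree: a child becomes a leaf when it is empty, the depth
    # limit is reached, it has at most min_size rows, or no split improves its
    # score; otherwise it is split again.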
def split(self, node, max_depth, min_size, depth):
left, right = node['groups']
del(node['groups'])
if not left or not right:
node['left'] = node['right'] = self.to_terminal(left + right)
return
if depth >= max_depth:
node['left'], node['right'] = self.to_terminal(left), self.to_terminal(right)
return
if len(left) <= min_size:
node['left'] = self.to_terminal(left)
else:
node['left'] = self.get_split(left)
if node['left']['groups'] is None:
node['left'] = self.to_terminal(left)
else:
self.split(node['left'], max_depth, min_size, depth+1)
if len(right) <= min_size:
node['right'] = self.to_terminal(right)
else:
node['right'] = self.get_split(right)
if node['right']['groups'] is None:
node['right'] = self.to_terminal(right)
else:
self.split(node['right'], max_depth, min_size, depth+1)
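
    # Build the tree from the training rows; if even the root cannot be split
    # profitably, store a single leaf under the 'root' key.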
def build_tree(self, train, max_depth, min_size):
root = self.get_split(train)
if root['groups'] is None:
root['root'] = self.to_terminal(train)
root['index'] = None
root['value'] = None
del(root['groups'])
else:
self.split(root, max_depth, min_size, 1)
return root
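
    # Pretty-print the tree, one split test per line, indented by depth.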
def print_tree(self, node, depth=0):
if isinstance(node, dict):
if node['value'] is None:
print(node)
return
print('%s[X%d < %.3f]' % ((depth*' ', (node['index']+1), node['value'])))
self.print_tree(node['left'], depth+1)
self.print_tree(node['right'], depth+1)
else:
print('%s[%s]' % ((depth*' ', node)))
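
    # Descend the tree with a row of lag values and return the leaf parameters.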
def predict(self, node, row):
if 'root' in node:
return node['root']
if row[node['index']] < node['value']:
if isinstance(node['left'], dict):
return self.predict(node['left'], row)
else:
return node['left']
else:
if isinstance(node['right'], dict):
return self.predict(node['right'], row)
else:
return node['right']
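

# A minimal usage sketch, not part of the original class. It assumes each training
# row is a length-(p + 1) array holding p lagged values followed by the current
# value, which is how param(), LeafScore() and get_split() index the columns above;
# the ordering of the lags within a row is an assumption here, the class itself
# only requires the current value to be last.
if __name__ == "__main__":
    rng = np.random.default_rng(0)

    # Synthetic AR(2) series: x_t = 0.6 x_{t-1} - 0.3 x_{t-2} + noise.
    p = 2
    n = 120
    series = np.zeros(n)
    for t in range(2, n):
        series[t] = 0.6 * series[t - 1] - 0.3 * series[t - 2] + rng.normal(scale=0.5)

    # Rows of [x_{t-p}, ..., x_{t-1}, x_t].
    train = [series[t - p:t + 1].copy() for t in range(p, n)]

    art = AutoregressiveTree(p)
    tree = art.build_tree(train, max_depth=2, min_size=30)
    art.print_tree(tree)

    # predict() returns the leaf parameters (var, b, m) for a row of lag values.
    var, b, m = art.predict(tree, train[-1])
    print("leaf variance:", var)
    print("leaf coefficients:", b.ravel())
    print("leaf constant term:", m)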