GMM.py
import numpy as np
from matplotlib import pyplot as plt
class GMM:
def __init__(self, k, independent_variance=True, max_step=2000):
self.k = k
self.max_step = max_step
self.epsilon = 1e-8
        # specify whether each feature has an independent variance, i.e. whether the covariance matrix is diagonal
self.independent_variance = independent_variance
def fit(self, X):
"""
X: training data of shape [n, feature_size]
"""
n, self.feature_size = X.shape
        # the parameters of each Gaussian component
        # mixing weights (priors), initialized uniformly
        self.prior = np.ones(self.k) / self.k
        if self.independent_variance:
            # initialize each component's std to the per-feature std of the data
            self.std = np.repeat(np.std(X, axis=0, keepdims=True), self.k, axis=0)
            # initialize the k means by sampling around the data mean
            self.mean = np.random.normal(X.mean(axis=0), self.std, [self.k, self.feature_size])
        else:
            # initialize each component's covariance to the covariance of the data
            self.cov = np.repeat(np.cov(X.T)[None, ...], self.k, axis=0)
            # initialize the k means by sampling around the data mean
            self.mean = np.random.multivariate_normal(X.mean(axis=0), self.cov[0], [self.k])
pre_likelihood = np.zeros([self.k, n])
for step in range(self.max_step):
##########################################
# Expectation step
##########################################
# posterior probability of each sample in each Gaussian model
posterior = self.predict(X)
##########################################
# Maximization step
##########################################
# center of each Gaussian model
self.mean = (posterior[:, :, None] * X[None, :, :]).sum(axis=1) / \
(posterior.sum(axis=1)[:, None] + self.epsilon)
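            # For reference, this is the standard M-step mean estimate
            # (gamma is just shorthand for the posterior array):
            #     mean[i] = sum_j gamma[i, j] * x_j / sum_j gamma[i, j]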
# distance from each sample to each center
dis = X[None, :, :] - self.mean[:, None, :]
if self.independent_variance:
# variance of each Gaussian model
var = (posterior[:, :, None] * dis ** 2).sum(axis=1) / \
(posterior.sum(axis=1)[:, None] + self.epsilon)
# standard deviation of each Gaussian model, in each dimension
# shape [k, feature_size]
                # std[i, j] is the standard deviation of the j-th feature in the i-th Gaussian model
self.std = np.sqrt(var)
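                # For reference (gamma denotes the posterior array):
                #     var[i, d] = sum_j gamma[i, j] * dis[i, j, d] ** 2 / sum_j gamma[i, j]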
else:
# covariance of each Gaussian model
# shape [k, feature_size, feature_size]
# cov[i] is the covariance matrix of i-th Gaussian model
self.cov = (dis.transpose(0, 2, 1) @ (posterior[:, :, None] * dis)) / \
(posterior.sum(axis=1)[:, None, None] + self.epsilon)
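                # For reference (gamma denotes the posterior array):
                #     cov[i] = sum_j gamma[i, j] * outer(dis[i, j], dis[i, j]) / sum_j gamma[i, j]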
self.prior = posterior.sum(axis=1)
self.prior /= (self.prior.sum() + self.epsilon)
            # stop once the per-sample likelihoods have effectively stopped changing
            if (self.likelihood - pre_likelihood).max() < self.epsilon:
                break
            pre_likelihood = self.likelihood
def predict(self, X):
"""return the probability of each x belonging to each gaussian distribution"""
# dis[i, j, k] is the distance from i-th center to j-th sample, in k-th dimension
dis = X[None, :, :] - self.mean[:, None, :]
# calculate log likelihood first, then likelihood
if self.independent_variance:
            # log_likelihood is of shape [k, n, feature_size]
log_likelihood = -dis ** 2 * .5 / (self.std[:, None, :] ** 2 + self.epsilon) \
- np.log(np.sqrt(2 * np.pi) + self.epsilon) - np.log(self.std[:, None, :] + self.epsilon)
            # reduce the log-likelihood to shape [k, n]
            # log_likelihood[i, j] is the log-likelihood of the j-th sample under the i-th component
log_likelihood = log_likelihood.sum(-1)
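            # For reference, each summed term is the 1-D normal log-density
            #     log N(x_d | mean_d, std_d) = -(x_d - mean_d)**2 / (2 * std_d**2)
            #                                  - log(sqrt(2 * pi)) - log(std_d)
            # and summing over dimensions corresponds to a diagonal covariance matrix.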
else:
            # log_likelihood is of shape [k, n]
            # log_likelihood[i, j] is the log-likelihood of the j-th sample under the i-th component
fixed_cov = self.cov + self.epsilon * np.eye(self.feature_size)
log_likelihood = -.5 * (dis @ np.linalg.inv(fixed_cov) * dis).sum(axis=-1) \
-.5 * np.linalg.slogdet(2 * np.pi * fixed_cov)[1][:, None] # slogdet returns [sign, logdet], we just need logdet
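            # For reference, this is the multivariate normal log-density
            #     log N(x | mean_i, cov_i) = -0.5 * (x - mean_i).T @ inv(cov_i) @ (x - mean_i)
            #                                - 0.5 * log(det(2 * pi * cov_i))
            # with cov_i regularized by epsilon * I for numerical stability.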
likelihood = np.exp(log_likelihood)
self.likelihood = likelihood
        # the posterior probability of each sample belonging to each component, of shape [k, n]
posterior = self.prior[:, None] * likelihood
posterior /= (posterior.sum(axis=0, keepdims=True) + self.epsilon)
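        # For reference, this is Bayes' rule with the mixing weights as priors:
        #     posterior[i, j] = prior[i] * likelihood[i, j] / sum_l prior[l] * likelihood[l, j]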
return posterior
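
# A minimal usage sketch (hard_labels is a hypothetical helper, not part of the
# original class): GMM.predict returns the posterior with shape [k, n], so the
# most probable component for each sample is its argmax along axis 0.
def hard_labels(gmm, X):
    """Return the index of the most probable Gaussian component for each sample."""
    # posterior has shape [k, n]; argmax over components gives one label per sample
    return gmm.predict(X).argmax(axis=0)
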
if __name__ == '__main__':
def demonstrate(desc, X):
gmm = GMM(3, independent_variance=False)
gmm.fit(X)
        pred = gmm.predict(X).T
        # with k=3 components, the posterior of each sample is used directly as its RGB color
        plt.scatter(X[:, 0], X[:, 1], color=pred)
plt.title(desc)
plt.show()
# ---------------------- Example 1---------------------------------------------
X = np.concatenate([
np.random.normal([0, 0], [.3, .3], [100, 2]),
np.random.normal([0, 1], [.3, .3], [100, 2]),
np.random.normal([1, 0], [.3, .3], [100, 2]),
])
demonstrate("Example 1", X)
# ---------------------- Example 2---------------------------------------------
demonstrate("Example 2: GMM does'nt promise the same result for the same data", X)
# ---------------------- Example 3---------------------------------------------
X = np.concatenate([
np.random.normal([0, 0], [.4, .4], [100, 2]),
np.random.normal([0, 1], [.4, .4], [100, 2]),
np.random.normal([1, 0], [.4, .4], [100, 2]),
])
demonstrate("Example 3", X)
# ---------------------- Example 4---------------------------------------------
X = np.concatenate([
np.random.normal([0, 0], [.4, .4], [100, 2]),
np.random.normal([0, 3], [.4, .4], [100, 2]),
np.random.normal([3, 0], [.4, .4], [100, 2]),
])
demonstrate("Example 4", X)
# ---------------------- Example 5---------------------------------------------
X = np.concatenate([
np.random.normal([0, 0], [.4, .4], [1, 2]),
np.random.normal([0, 3], [.4, .4], [1, 2]),
np.random.normal([3, 0], [.4, .4], [1, 2]),
])
demonstrate("Example 5", X)