-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathdenoise.py
205 lines (193 loc) · 7.64 KB
/
denoise.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
#!/usr/bin/env python3
# −*− coding:utf-8 −*−
import numpy as np
from scipy.linalg import svd
class Denoiser(object):
'''
A class for smoothing a noisy, real-valued data sequence by means of SVD of a partial circulant matrix.
-----
Attributes:
mode: str
Code running mode: "layman" or "expert".
In the "layman" mode, the code autonomously tries to find the optimal denoised sequence.
In the "expert" mode, a user has full control over it.
s: 1D array of floats
Singular values ordered decreasingly.
U: 2D array of floats
A set of left singular vectors as the columns.
r: int
Rank of the approximating matrix of the constructed partial circulant matrix from the sequence.
'''
def __init__(self, mode="layman"):
'''
Class initialization.
-----
Arguments:
mode: str
Denoising mode. To be selected from ["layman", "expert"]. Default is "layman".
While "layman" grants the code autonomy, "expert" allows a user to experiment.
-----
Raises:
ValueError
If mode is neither "layman" nor "expert".
'''
self._method = {"layman": self._denoise_for_layman, "expert": self._denoise_for_expert}
if mode not in self._method:
raise ValueError("unknown mode '{:s}'!".format(mode))
self.mode = mode
def _embed(self, x, m):
'''
Embed a 1D array into a 2D partial circulant matrix by cyclic left-shift.
-----
Arguments:
x: 1D array of floats
Input array.
m: int
Number of rows of the constructed matrix.
-----
Returns:
X: 2D array of floats
Constructed partial circulant matrix.
'''
x_ext = np.hstack((x, x[:m-1]))
shape = (m, x.size)
strides = (x_ext.strides[0], x_ext.strides[0])
X = np.lib.stride_tricks.as_strided(x_ext, shape, strides)
return X
def _reduce(self, A):
'''
Reduce a 2D matrix to a 1D array by cyclic anti-diagonal average.
-----
Arguments:
A: 2D array of floats
Input matrix.
-----
Returns:
a: 1D array of floats
Output array.
'''
m = A.shape[0]
A_ext = np.hstack((A[:,-m+1:], A))
strides = (A_ext.strides[0]-A_ext.strides[1], A_ext.strides[1])
a = np.mean(np.lib.stride_tricks.as_strided(A_ext[:,m-1:], A.shape, strides), axis=0)
return a
def _denoise_for_expert(self, sequence, layer, gap, rank):
'''
Smooth a noisy sequence by means of low-rank approximation of its corresponding partial circulant matrix.
-----
Arguments:
sequence: 1D array of floats
Data sequence to be denoised.
layer: int
Number of leading rows selected from the matrix.
gap: float
Gap between the data levels on the left and right ends of the sequence.
A positive value means the right level is higher.
rank: int
Rank of the approximating matrix.
-----
Returns:
denoised: 1D array of floats
Smoothed sequence after denoise.
-----
Raises:
AssertionError
If condition 1 <= rank <= layer <= sequence.size cannot be fulfilled.
'''
assert 1 <= rank <= layer <= sequence.size
self.r = rank
# linear trend to be deducted
trend = np.linspace(0, gap, sequence.size)
X = self._embed(sequence-trend, layer)
# singular value decomposition
self.U, self.s, Vh = svd(X, full_matrices=False, overwrite_a=True, check_finite=False)
# low-rank approximation
A = self.U[:,:self.r] @ np.diag(self.s[:self.r]) @ Vh[:self.r]
denoised = self._reduce(A) + trend
return denoised
def _cross_validate(self, x, m):
'''
Check if the gap of boundary levels of the detrended sequence is within the estimated noise strength.
-----
Arguments:
x: 1D array of floats
Input array.
m: int
Number of rows of the constructed matrix.
-----
Returns:
valid: bool
Result of cross validation. True means the detrending procedure is valid.
'''
X = self._embed(x, m)
self.U, self.s, self._Vh = svd(X, full_matrices=False, overwrite_a=True, check_finite=False)
# Search for noise components using the normalized mean total variation of the left singular vectors as an indicator.
# The procedure runs in batch of every 10 singular vectors.
self.r = 0
while True:
U_sub = self.U[:,self.r:self.r+10]
NMTV = np.mean(np.abs(np.diff(U_sub,axis=0)), axis=0) / (np.amax(U_sub,axis=0) - np.amin(U_sub,axis=0))
try:
# the threshold of 10% can in most cases discriminate noise components
self.r += np.argwhere(NMTV > .1)[0,0]
break
except IndexError:
self.r += 10
# estimate the noise strength, while r marks the first noise component
noise_stdev = np.sqrt(np.sum(self.s[self.r:]**2) / X.size)
# estimate the gap of boundary levels after detrend
gap = np.abs(x[-self._k:].mean()-x[:self._k].mean())
valid = gap < noise_stdev
return valid
def _denoise_for_layman(self, sequence, layer):
'''
Similar to the "expert" method, except that denoising parameters are optimized autonomously.
-----
Arguments:
sequence: 1D array of floats
Data sequence to be denoised.
layer: int
Number of leading rows selected from the corresponding circulant matrix.
-----
Returns:
denoised: 1D array of floats
Smoothed sequence after denoise.
-----
Raises:
AssertionError
If condition 1 <= layer <= sequence.size cannot be fulfilled.
'''
assert 1 <= layer <= sequence.size
# The code takes the mean of a few neighboring data to estimate the boundary levels of the sequence.
# By default, this number is 11.
self._k = 11
# Initially, the code assumes no linear inclination.
trend = np.zeros_like(sequence)
# Iterate over the averaging length.
# In the worst case, iteration must terminate when it is 1.
while not self._cross_validate(sequence-trend, layer):
self._k -= 2
trend = np.linspace(0, sequence[-self._k:].mean()-sequence[:self._k].mean(), sequence.size)
# low-rank approximation by using only signal components
A = self.U[:,:self.r] @ np.diag(self.s[:self.r]) @ self._Vh[:self.r]
denoised = self._reduce(A) + trend
return denoised
def denoise(self, *args, **kwargs):
'''
User interface method.
It will reference to different denoising methods ad hoc under the fixed name.
'''
return self._method[self.mode](*args, **kwargs)
if __name__ == "__main__":
x = np.linspace(-10, 10, 1000)
signal = np.sinc(x)
noise = np.random.normal(scale=.1, size=1000)
sequence = signal + noise
denoiser = Denoiser()
denoised = denoiser.denoise(sequence, 200)
import matplotlib.pyplot as plt
fig, ax = plt.subplots()
ax.plot(x, sequence)
ax.plot(x, signal)
ax.plot(x, denoised)
plt.show()