-
Notifications
You must be signed in to change notification settings - Fork 78
/
eval_methods.py
236 lines (206 loc) · 7.99 KB
/
eval_methods.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
import numpy as np
import more_itertools as mit
from spot import SPOT, dSPOT
def adjust_predicts(score, label, threshold, pred=None, calc_latency=False):
    """
    Calculate point-adjusted predict labels using given `score`, `threshold` (or given `pred`) and `label`.

    Once any point inside a contiguous ground-truth anomaly segment is predicted
    as anomalous, every point of that segment is marked as predicted.

    Args:
        score (np.ndarray): The anomaly score.
        label (np.ndarray or None): The ground-truth label. Values greater than 0.1
            are treated as anomalous. If None, no adjustment is done.
        threshold (float): The threshold of anomaly score.
            A point is labeled as "anomaly" if its score is higher than the threshold.
        pred (np.ndarray or None): if not None, adjust a copy of `pred` and ignore
            `score` and `threshold`. The caller's array is left untouched.
        calc_latency (bool): if True, also return the average detection latency.

    Returns:
        np.ndarray: the adjusted predict labels, when `calc_latency` is False.
        tuple: `(predict, latency)` when `calc_latency` is True, where latency is the
            number of back-filled points divided by the number of detected segments
            (plus 1e-4 to avoid a division by zero).
        tuple: `(predict, None)` whenever `label` is None, regardless of `calc_latency`
            (kept for backward compatibility with existing callers).

    Raises:
        ValueError: if the predictions and `label` differ in length.

    Method from OmniAnomaly (https://github.com/NetManAIOps/OmniAnomaly)
    """
    if label is None:
        # Without ground truth there is nothing to adjust; only threshold the score.
        predict = np.asarray(score) > threshold
        return predict, None

    if pred is None:
        if len(score) != len(label):
            raise ValueError("score and label must have the same length")
        predict = np.asarray(score) > threshold
    else:
        if len(pred) != len(label):
            raise ValueError("pred and label must have the same length")
        # Work on a copy so the caller's predictions are not modified in place.
        predict = np.array(pred)

    actual = np.asarray(label) > 0.1
    anomaly_state = False
    anomaly_count = 0
    latency = 0

    for i, is_anomaly in enumerate(actual):
        if is_anomaly and predict[i] and not anomaly_state:
            # First detection inside a true anomaly segment.
            anomaly_state = True
            anomaly_count += 1
            # Back-fill towards the start of the segment. The stop value is -1 so
            # that index 0 is included when the segment begins at the first point.
            for j in range(i, -1, -1):
                if not actual[j]:
                    break
                if not predict[j]:
                    predict[j] = True
                    latency += 1
        elif not is_anomaly:
            anomaly_state = False
        if anomaly_state:
            # Forward-fill the remainder of the already-detected segment.
            predict[i] = True

    if calc_latency:
        return predict, latency / (anomaly_count + 1e-4)
    return predict
def calc_point2point(predict, actual):
    """
    Compute point-wise F1, precision and recall together with the confusion counts.

    Args:
        predict (np.ndarray): the predict label
        actual (np.ndarray): the ground-truth label

    Returns:
        tuple: `(f1, precision, recall, TP, TN, FP, FN)`. A small constant (0.00001)
            is added to every denominator so that empty classes do not divide by zero.

    Method from OmniAnomaly (https://github.com/NetManAIOps/OmniAnomaly)
    """
    smoothing = 0.00001
    predicted_negative = 1 - predict
    actual_negative = 1 - actual

    true_pos = np.sum(predict * actual)
    true_neg = np.sum(predicted_negative * actual_negative)
    false_pos = np.sum(predict * actual_negative)
    false_neg = np.sum(predicted_negative * actual)

    precision = true_pos / (true_pos + false_pos + smoothing)
    recall = true_pos / (true_pos + false_neg + smoothing)
    f1 = 2 * precision * recall / (precision + recall + smoothing)
    return f1, precision, recall, true_pos, true_neg, false_pos, false_neg
def pot_eval(init_score, score, label, q=1e-3, level=0.99, dynamic=False):
    """
    Run POT method on given score.

    A `SPOT` object is fitted on `init_score` and `score`, initialized with `level`
    and run; the mean of the thresholds it returns is used as the single anomaly
    threshold for `adjust_predicts`.

    :param init_score (np.ndarray): The data to get init threshold.
                    For `OmniAnomaly`, it should be the anomaly score of train set.
    :param score (np.ndarray): The data to run POT method.
                    For `OmniAnomaly`, it should be the anomaly score of test set.
    :param label (np.ndarray): boolean list of true anomalies in score, or None
    :param q (float): Detection level (risk)
    :param level (float): Probability associated with the initial threshold t
    :param dynamic (bool): forwarded to `SPOT.run`
    :return dict: pot result dict. Holds the metrics, the threshold and the latency
                    when `label` is given, and only the threshold otherwise.

    Method from OmniAnomaly (https://github.com/NetManAIOps/OmniAnomaly)
    """
    print(f"Running POT with q={q}, level={level}..")
    spot = SPOT(q)  # SPOT object
    spot.fit(init_score, score)
    spot.initialize(level=level, min_extrema=False)  # Calibration step
    run_result = spot.run(dynamic=dynamic, with_alarm=False)

    # Report how many alarms and thresholds the run produced.
    for key in ("alarms", "thresholds"):
        print(len(run_result[key]))

    pot_th = np.mean(run_result["thresholds"])
    pred, p_latency = adjust_predicts(score, label, pot_th, calc_latency=True)

    if label is None:
        # No ground truth available: only the threshold can be reported.
        return {
            "threshold": pot_th,
        }

    metric_names = ("f1", "precision", "recall", "TP", "TN", "FP", "FN")
    report = dict(zip(metric_names, calc_point2point(pred, label)))
    report["threshold"] = pot_th
    report["latency"] = p_latency
    return report
def bf_search(score, label, start, end=None, step_num=1, display_freq=1, verbose=True):
    """
    Find the best-f1 score by searching best `threshold` in (`start`, `end`].

    The candidate thresholds are `start + k * (end - start) / step_num` for
    `k = 1 .. step_num`; `start` itself is therefore only evaluated when the
    search range is empty. If `end` or `step_num` is None, a single evaluation
    at `threshold == start` is performed.

    Args:
        score (np.ndarray): The anomaly score.
        label (np.ndarray): The ground-truth label.
        start (float): Lower bound of the search range.
        end (float or None): Upper bound of the search range.
        step_num (int or None): Number of thresholds to evaluate (at least 1).
        display_freq (int): Print progress every `display_freq` steps when verbose.
        verbose (bool): Whether to print the search range and progress.

    Returns:
        dict: metrics, threshold and latency of the threshold with the highest f1.

    Raises:
        ValueError: if `step_num` is smaller than 1 (no threshold would be evaluated).

    Method from OmniAnomaly (https://github.com/NetManAIOps/OmniAnomaly)
    """
    if step_num is None or end is None:
        end = start
        step_num = 1
    if step_num < 1:
        # Without at least one step there is no result to report; failing here
        # replaces an obscure IndexError on the placeholder tuple further down.
        raise ValueError("step_num must be at least 1")

    print("Finding best f1-score by searching for threshold..")
    search_step, search_range, search_lower_bound = step_num, end - start, start
    if verbose:
        print("search range: ", search_lower_bound, search_lower_bound + search_range)

    increment = search_range / float(search_step)
    threshold = search_lower_bound
    m = (-1.0, -1.0, -1.0)  # placeholder; any real f1 (>= 0) replaces it
    m_t = 0.0
    m_l = 0
    for i in range(search_step):
        threshold += increment
        target, latency = calc_seq(score, label, threshold)
        if target[0] > m[0]:
            m_t = threshold
            m = target
            m_l = latency
        if verbose and i % display_freq == 0:
            print("cur thr: ", threshold, target, m, m_t)

    return {
        "f1": m[0],
        "precision": m[1],
        "recall": m[2],
        "TP": m[3],
        "TN": m[4],
        "FP": m[5],
        "FN": m[6],
        "threshold": m_t,
        "latency": m_l,
    }
def calc_seq(score, label, threshold):
    """
    Score one threshold: adjust the predictions, then compute point-wise metrics.

    Returns:
        tuple: `(metrics, latency)` where `metrics` is the result of
            `calc_point2point` on the adjusted predictions and `latency` is the
            latency reported by `adjust_predicts`.
    """
    adjusted, delay = adjust_predicts(score, label, threshold, calc_latency=True)
    metrics = calc_point2point(adjusted, label)
    return metrics, delay
def epsilon_eval(train_scores, test_scores, test_labels, reg_level=1):
    """
    Evaluate test scores with a threshold chosen by `find_epsilon` on the train scores.

    Args:
        train_scores (np.ndarray): scores passed to `find_epsilon` to pick the threshold.
        test_scores (np.ndarray): scores that are thresholded and adjusted.
        test_labels (np.ndarray or None): ground-truth labels for `test_scores`.
        reg_level (int): forwarded to `find_epsilon` and echoed in the result.

    Returns:
        dict: metrics, threshold, latency and `reg_level` when `test_labels` is given;
            only the threshold and `reg_level` otherwise.
    """
    best_epsilon = find_epsilon(train_scores, reg_level)
    pred, p_latency = adjust_predicts(test_scores, test_labels, best_epsilon, calc_latency=True)

    if test_labels is None:
        # No ground truth available: nothing to score against.
        return {"threshold": best_epsilon, "reg_level": reg_level}

    metric_names = ("f1", "precision", "recall", "TP", "TN", "FP", "FN")
    report = dict(zip(metric_names, calc_point2point(pred, test_labels)))
    report["threshold"] = best_epsilon
    report["latency"] = p_latency
    report["reg_level"] = reg_level
    return report
def find_epsilon(errors, reg_level=1):
    """
    Threshold method proposed by Hundman et. al. (https://arxiv.org/abs/1802.04431)
    Code from TelemAnom (https://github.com/khundman/telemanom)

    Candidate thresholds `mean + z * std` are tried for z = 2.5, 3.0, ..., 11.5.
    Each candidate is scored by the relative drop in mean and standard deviation
    obtained when the values at or above the candidate are removed, divided by
    `n_flagged ** reg_level`, where `n_flagged` counts the points at or above the
    candidate plus their 49 neighbours on each side. Candidates that flag half of
    the points or more are rejected. On ties the larger z wins.

    Args:
        errors (array-like): one-dimensional sequence of error / anomaly scores.
        reg_level (int): exponent of the regularization on the number of flagged
            points (0: none, 1: linear, 2: quadratic).

    Returns:
        float: the selected threshold, or `max(errors)` if no candidate qualifies.

    Raises:
        ValueError: if `errors` is empty.
    """
    e_s = np.asarray(errors)
    if e_s.size == 0:
        raise ValueError("errors must not be empty")

    n_points = len(e_s)
    mean_e_s = np.mean(e_s)
    sd_e_s = np.std(e_s)
    # Offsets -49..49: a point above the threshold also flags 49 neighbours per side.
    window = np.arange(-49, 50)

    best_epsilon = None
    max_score = -10000000
    for z in np.arange(2.5, 12, 0.5):
        epsilon = mean_e_s + sd_e_s * z
        pruned_e_s = e_s[e_s < epsilon]
        hits = np.argwhere(e_s >= epsilon).reshape(-1)

        # Nothing flagged: no score to compute. Nothing left after pruning: every
        # point is flagged, which the "less than half" rule would reject anyway.
        if hits.size == 0 or pruned_e_s.size == 0:
            continue

        # Expand every hit by the window in one broadcast, then keep valid indices.
        i_anom = np.unique(hits[:, None] + window)
        i_anom = i_anom[(i_anom >= 0) & (i_anom < n_points)]
        n_anom = len(i_anom)
        if n_anom == 0:
            continue

        mean_perc_decrease = (mean_e_s - np.mean(pruned_e_s)) / mean_e_s
        sd_perc_decrease = (sd_e_s - np.std(pruned_e_s)) / sd_e_s
        # n ** 0 == 1, n ** 1 == n, n ** 2 == n * n: matches the three original levels.
        score = (mean_perc_decrease + sd_perc_decrease) / n_anom ** reg_level

        if score >= max_score and n_anom < (n_points * 0.5):
            max_score = score
            best_epsilon = epsilon

    if best_epsilon is None:
        best_epsilon = np.max(e_s)
    return best_epsilon