-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathoutlierDetection.py
187 lines (137 loc) · 5.72 KB
/
outlierDetection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
import pandas as pd
import numpy as np
from outliers import smirnov_grubbs as grubbs
# =============================================================================
# Univariate
# =============================================================================
# =============================================================================
# #Grubbs test
# =============================================================================
def grubbsTest(data,testType='2side',alpha=.05,removeOutliers=False):
    '''
    Run a Grubbs outlier test on one-dimensional data and print the result.

    Parameters
    ----------
    data : 1d array or pd.Series
        Input data to test.
    testType : string
        The default is '2side', can work with 'min' or 'max'.
    alpha : float
        The significance level to use for the test, default is .05.
    removeOutliers : bool
        When True the data returned by the Grubbs routine (outliers
        dropped) is returned, otherwise the input is handed back untouched.

    Returns
    -------
    removeOutliers = False, input data returns
    removeOutliers = True, manipulated data returns
    None is returned (after printing a hint) for an unknown testType
    '''
    # Validate before touching the grubbs module so a bad testType never
    # triggers any computation.
    if testType == '2side':
        removedData = grubbs.test(data, alpha)
    elif testType == 'min':
        removedData = grubbs.min_test(data, alpha)
    elif testType == 'max':
        removedData = grubbs.max_test(data, alpha)
    else:
        # Single-line literal: a backslash continuation inside the string
        # would splice the next line's leading text into the message.
        print('wrong testType passed, you can only use 2side, min or max')
        return None

    if len(removedData) == len(data):
        print('No outliers found, consider changing alpha value which is default .05')
    else:
        # setdiff1d reports the distinct values that the test dropped.
        print("Detected outliers(grubbsTest-{}) : ".format(testType),
              np.setdiff1d(data, removedData))

    if removeOutliers:
        return removedData
    # Documented contract: without removal the caller gets the input back.
    return data
#%%
# Usage example: a small sample in which 50 sits well apart from the rest.
sampleData1 = np.array([20, 21, 26, 24, 29, 22, 21, 50, 28, 27])
# Try every supported variant of the test on the raw array.
grubbsTest(sampleData1, testType='2side')
grubbsTest(sampleData1, testType='min')
grubbsTest(sampleData1, testType='max')
# Same values wrapped in a pandas Series, this time requesting removal.
sampleData2 = pd.Series(sampleData1)
grubbsTest(sampleData2, testType='2side', removeOutliers=True)
#%%
# =============================================================================
# STD Based
# =============================================================================
# Independent copy of the sample values; used as input for the
# stdBasedOutlier example call further down in this section.
data = sampleData1.copy()
def stdBasedOutlier(data,sigma=3,outType='2side',removeOutlier=False):
    '''
    Flag values lying more than `sigma` standard deviations from the mean.

    Parameters
    ----------
    data : 1d array-like
        Values to inspect.
    sigma : float
        Number of standard deviations used as the cut-off, default is 3.
    outType : string
        '2side' checks both tails, 'min' only values below the lower bound,
        'max' only values above the upper bound.
    removeOutlier : bool
        When True, return the data with the reported outliers removed.

    Returns
    -------
    removeOutlier = False, input data returns unchanged
    removeOutlier = True, np.ndarray without the reported outliers returns
    None is returned (after printing a hint) for an unknown outType
    '''
    data_mean, data_std = np.mean(data), np.std(data)
    thresh = data_std * sigma
    lower, upper = (data_mean - thresh), (data_mean + thresh)

    values = np.asarray(data)
    below, above = values < lower, values > upper
    if outType == '2side':
        mask = below | above
    elif outType == 'min':
        mask = below
    elif outType == 'max':
        mask = above
    else:
        print('wrong testType passed, you can only use 2side, min or max')
        return None

    outliers = values[mask].tolist()
    if not outliers:
        print('No outlier found, consider change sigma value which is default 3')
    else:
        print('Identified outliers count: %d' % len(outliers))
        print(outliers)

    if removeOutlier:
        # Drop exactly the values reported above, so a one-sided check
        # never trims the opposite tail.
        return values[~mask]
    return data
def stdBasedOutlier_df(df,col_name, sigma=3, removeOutlier=False):
    '''
    Flag or drop DataFrame rows whose `col_name` value is a z-score outlier.

    Parameters
    ----------
    df : pd.DataFrame
        Frame holding the column to inspect.
    col_name : string
        Name of the column the z-scores are computed on.
    sigma : float
        Absolute z-score above which a row counts as an outlier, default 3.
    removeOutlier : bool
        When True return a filtered frame, otherwise add an 'is_outlier'
        column to `df` (in place) and return it.

    Returns
    -------
    removeOutlier = False, df with a boolean 'is_outlier' column returns
    removeOutlier = True, df without the outlier rows returns
    '''
    column = df[col_name]
    # Compute the mean and standard deviation of the given column
    col_mean = np.mean(column)
    col_std = np.std(column)
    # z-score of every value, computed in one vectorised step
    z_scores = (column - col_mean) / col_std
    # Positional boolean mask: True where |z| exceeds sigma
    mask = (np.abs(z_scores) > sigma).to_numpy()
    if removeOutlier:
        # Remove the outliers from the data
        return df[~mask]
    # Add a new column to the data indicating whether each value is an outlier
    df['is_outlier'] = mask
    # Hand back the frame that was processed, not an unrelated global.
    return df
#%%
# Example call: sigma=2, lower-tail ('min') detection with removal requested;
# whatever stdBasedOutlier returns is kept in aa.
aa = stdBasedOutlier(data,sigma=2,outType='min',removeOutlier=True)
#%%
# =============================================================================
# IQR Based
# =============================================================================
# Synthetic sample for the IQR section: 10000 draws from np.random.randn,
# scaled by 5 and shifted by 50. No seed is set in the visible code, so the
# values presumably differ between runs.
data = 5 * np.random.randn(10000) + 50
def IQRBasedOutlier(data,iqrThresh=1.5,outType='2side',removeOutlier=False):
    '''
    Flag values lying outside the inter-quartile-range (IQR) fences.

    Parameters
    ----------
    data : 1d array-like
        Values to inspect.
    iqrThresh : float
        Multiple of the IQR added beyond the quartiles, default is 1.5.
    outType : string
        '2side' checks both fences, 'min' only values below the lower fence,
        'max' only values above the upper fence.
    removeOutlier : bool
        When True, return the data with the reported outliers removed.

    Returns
    -------
    removeOutlier = False, input data returns unchanged
    removeOutlier = True, np.ndarray without the reported outliers returns
    None is returned (after printing a hint) for an unknown outType
    '''
    q25, q75 = np.percentile(data, 25), np.percentile(data, 75)
    iqr = q75 - q25
    print('Percentiles: 25th=%.3f, 75th=%.3f, IQR=%.3f' % (q25, q75, iqr))
    cut_off = iqr * iqrThresh
    lower, upper = q25 - cut_off, q75 + cut_off

    values = np.asarray(data)
    below, above = values < lower, values > upper
    if outType == '2side':
        mask = below | above
    elif outType == 'min':
        mask = below
    elif outType == 'max':
        mask = above
    else:
        print('wrong testType passed, you can only use 2side, min or max')
        return None

    outliers = values[mask].tolist()
    if not outliers:
        # The tunable here is iqrThresh, not sigma.
        print('No outlier found, consider change iqrThresh value which is default 1.5')
    else:
        print('Identified outliers count: %d' % len(outliers))
        print(outliers)

    if removeOutlier:
        # Drop exactly the values reported above, so a one-sided check
        # never trims the opposite tail.
        return values[~mask]
    return data
## detect outliers based on a column_name in dataframe
def IQRBasedOutlier_df(df,col_name,iqrThresh=1.5,removeOutlier=False):
    '''
    Mark rows of `df` whose `col_name` value falls outside the IQR fences.

    An 'is_outlier' boolean column is written onto `df` itself. With
    removeOutlier=True only the rows where that flag is False are returned,
    otherwise the flagged frame is returned.
    '''
    column = df[col_name]
    # quartiles come back in the order they were requested
    first_q, third_q = column.quantile([0.25, 0.75])
    # fences sit iqrThresh inter-quartile ranges beyond each quartile
    margin = (third_q - first_q) * iqrThresh
    too_low = column < (first_q - margin)
    too_high = column > (third_q + margin)
    df['is_outlier'] = too_low | too_high
    if not removeOutlier:
        return df
    return df[df['is_outlier'].eq(False)]