""" Comparison feature-selection algorithms (filter, wrapper, and embedded methods). """
import numpy as np
import pandas as pd
import pymrmr
import pyswarms as ps
from mlxtend.feature_selection import SequentialFeatureSelector as SFS
from scipy.stats import pearsonr
from sklearn.feature_selection import RFE, SelectFromModel, SelectKBest, f_classif, mutual_info_classif

from bayesian_algorithms import discretize
from utils import convert_vector, get_estimator


def sfs(data, target, n_features=None, estimator="linear_regression", metric=None):
    """ Run Sequential Feature Selection (a wrapper method).

    https://rasbt.github.io/mlxtend/user_guide/feature_selection/SequentialFeatureSelector/

    Keyword arguments:
    data -- feature matrix
    target -- regression or classification targets
    n_features -- number of features to be selected; if None, the best cross-validation result is selected
    estimator -- estimator used to determine the score
    metric -- metric used to calculate the score

    Return: result vector
    """
    if n_features is None:
        n_features = "best"
    sfs_selection = SFS(get_estimator(estimator),
                        k_features=n_features,
                        forward=True,
                        verbose=0,
                        cv=2,  # enable cross-validation
                        scoring=metric)
    sfs_selection.fit(data, target)
    # convert the tuple of selected feature indices to a 0/1 vector
    result_vector = [0 for _ in range(len(data.columns))]
    for index in sfs_selection.k_feature_idx_:
        result_vector[index] = 1
    return result_vector
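

# Example usage (a minimal sketch; `X` and `y` are hypothetical pandas objects,
# not defined in this module):
#
#   mask = sfs(X, y, n_features=5, estimator="linear_regression", metric="accuracy")
#   selected_columns = [col for col, keep in zip(X.columns, mask) if keep]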


def rfe(data, target, n_features=10, estimator="linear_regression"):
    """ Run Recursive Feature Elimination (a wrapper method).

    https://scikit-learn.org/stable/modules/generated/sklearn.feature_selection.RFE.html#sklearn.feature_selection.RFE

    Keyword arguments:
    data -- feature matrix
    target -- regression or classification targets
    n_features -- number of features to be selected (defaults to 10)
    estimator -- estimator used to determine the score

    Return: result vector
    """
    rfe_selection = RFE(estimator=get_estimator(estimator),
                        n_features_to_select=n_features,
                        verbose=0)
    rfe_selection.fit(data, target)
    # convert the boolean support mask to a 0/1 vector
    result_vector = convert_vector(rfe_selection.support_)
    return result_vector


def sfm_svc(data, target, n_features=None, estimator=None):
    return __sfm(data, target, n_features, "svc_linear")


def sfm_logistic_regression(data, target, n_features=None, estimator=None):
    return __sfm(data, target, n_features, "logistic_regression")


def sfm_random_forest(data, target, n_features=None, estimator=None):
    return __sfm(data, target, n_features, "random_forest")


def __sfm(data, target, n_features=None, estimator="linear_regression"):
    """ Run Select From Model (an embedded method).

    https://scikit-learn.org/stable/modules/generated/sklearn.feature_selection.SelectFromModel.html#sklearn.feature_selection.SelectFromModel.get_support

    Keyword arguments:
    data -- feature matrix
    target -- regression or classification targets
    n_features -- number of features to select
    estimator -- estimator used to determine the score

    Return: result vector
    """
    # threshold=-np.inf disables the importance threshold, so exactly
    # max_features features are selected
    sfm_selection = SelectFromModel(estimator=get_estimator(estimator),
                                    max_features=n_features,
                                    threshold=-np.inf).fit(data, target)
    # convert the boolean support mask to a 0/1 vector
    result_vector = convert_vector(sfm_selection.get_support())
    return result_vector


def n_best_anova_f(data, target, n_features, estimator=None):
    """ Run SelectKBest feature selection with the ANOVA F-test as score function.

    Keyword arguments:
    data -- feature matrix
    target -- regression or classification targets
    n_features -- number of features to select
    estimator -- ignored (only for compatibility)

    Return: result vector
    """
    skb_selection = SelectKBest(score_func=f_classif, k=n_features).fit(data, target)
    result_vector = convert_vector(skb_selection.get_support())
    return result_vector


def n_best_mutual(data, target, n_features, estimator=None):
    """ Calculate the mutual information score for each feature, then select the n highest-scoring features.

    Keyword arguments:
    data -- feature matrix
    target -- regression or classification targets
    n_features -- number of features to select
    estimator -- ignored (only for compatibility)

    Return: result vector
    """
    mutual_selection = SelectKBest(score_func=mutual_info_classif, k=n_features).fit(data, target)
    # select the n features with the highest mutual information score
    result_vector = convert_vector(mutual_selection.get_support())
    return result_vector


def n_best_pearsonr(data, target, n_features, estimator=None):
    """ Calculate the Pearson correlation for each feature, then select the n highest-scoring features.

    Keyword arguments:
    data -- feature matrix
    target -- regression or classification targets
    n_features -- number of features to select
    estimator -- ignored (only for compatibility)

    Return: result vector
    """
    # pearsonr returns (correlation, p-value); the score is the absolute
    # correlation coefficient (index 0), not the p-value
    pearson_selection = [abs(pearsonr(data.loc[:, feature], target.astype("float"))[0]) for feature in data.columns]
    result_vector = discretize(pearson_selection, "n_highest", n_features)
    return result_vector


def pymrmr_fs_miq(data, target, n_features, estimator=None):
    """ Wrapper for mRMR with the MIQ scheme.

    Keyword arguments:
    data -- feature matrix
    target -- regression or classification targets
    n_features -- number of features to select
    estimator -- ignored (only for compatibility)

    Return: result vector
    """
    return __pymrmr_fs(data, target, n_features, "MIQ", None)


def pymrmr_fs_mid(data, target, n_features, estimator=None):
    """ Wrapper for mRMR with the MID scheme.

    Keyword arguments:
    data -- feature matrix
    target -- regression or classification targets
    n_features -- number of features to select
    estimator -- ignored (only for compatibility)

    Return: result vector
    """
    return __pymrmr_fs(data, target, n_features, "MID", None)


def __pymrmr_fs(data, target, n_features, scheme="MIQ", estimator=None):
    """ Minimum redundancy maximum relevance (mRMR) feature selection.

    Keyword arguments:
    data -- feature matrix
    target -- regression or classification targets
    n_features -- number of features to select
    scheme -- mRMR scheme (MID or MIQ)
    estimator -- ignored (only for compatibility)

    Return: result vector
    """
    # pymrmr expects the target as the first column and integer-coded values,
    # so discretize all columns with pd.factorize
    target_data = pd.concat([target, data], axis=1)
    target_data = target_data.apply(lambda x: pd.factorize(x)[0])
    result = pymrmr.mRMR(target_data, scheme, n_features)
    # convert the selected feature names to a 0/1 vector
    result_vector = [1 if column in result else 0 for column in data.columns]
    return result_vector
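

# Note on the discretization above: pd.factorize encodes each column's distinct
# values as consecutive integers, which is the integer coding pymrmr operates on.
# For example:
#
#   pd.factorize(pd.Series(["low", "high", "low"]))[0]  # -> array([0, 1, 0])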


# NOT USED: limiting the number of selected features via a penalty term does
# not work well, so the results are not really comparable to the other approaches.
def binary_swarm(data, target, n_features=None, estimator=None):
    """ Binary Particle Swarm Optimization.

    Source: https://pyswarms.readthedocs.io/en/development/examples/feature_subset_selection.html

    Keyword arguments:
    data -- feature matrix
    target -- regression or classification targets
    n_features -- number of features to select
    estimator -- estimator used to determine the score

    Return: result vector
    """
    estimator = get_estimator(estimator)
    total_features = len(data.columns)

    # define the objective function for a single particle
    def f_per_particle(mask, alpha):
        # get the subset of features encoded by the binary mask
        if np.count_nonzero(mask) == 0:
            X_subset = data.values
        else:
            X_subset = data.values[:, mask == 1]
        # perform classification and store the performance in P
        estimator.fit(X_subset, target)
        P = (estimator.predict(X_subset) == target).mean()
        # calculate a penalty to steer towards the desired number of selected features
        if n_features is not None:
            feature_overflow_penalty = abs(sum(mask) - n_features) / total_features
        else:
            feature_overflow_penalty = 0
        # compute the objective function
        j = (alpha * (1.0 - P)
             + (1.0 - alpha) * (1 - (X_subset.shape[1] / total_features))) + feature_overflow_penalty
        return j

    def f(x, alpha=0.88):
        n_particles = x.shape[0]
        j = [f_per_particle(x[i], alpha) for i in range(n_particles)]
        return np.array(j)

    # create a Binary PSO instance
    options = {'c1': 0.5, 'c2': 0.5, 'w': 0.9, 'k': 30, 'p': 2}
    dimensions = total_features  # dimensions should be the number of features
    optimizer = ps.discrete.BinaryPSO(n_particles=30, dimensions=dimensions, options=options)
    # perform the optimization
    _, pos = optimizer.optimize(f, iters=1000, verbose=0)
    result_vector = convert_vector(pos)
    return result_vector
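

if __name__ == "__main__":
    # Minimal smoke test (a sketch, not part of the original comparison setup):
    # run two of the filter methods on a synthetic scikit-learn dataset.
    from sklearn.datasets import make_classification

    X_demo, y_demo = make_classification(n_samples=100, n_features=20,
                                         n_informative=5, random_state=0)
    X_demo = pd.DataFrame(X_demo, columns=[f"f{i}" for i in range(20)])
    y_demo = pd.Series(y_demo, name="target")
    print("ANOVA F-test selection:", n_best_anova_f(X_demo, y_demo, n_features=5))
    print("Mutual information selection:", n_best_mutual(X_demo, y_demo, n_features=5))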