Merge pull request #4 from fidelity/parallel
Benchmark Parallelization
irmakbky authored Jun 16, 2021
2 parents 98179a9 + 6a39bad commit a1dec9c
Showing 5 changed files with 253 additions and 31 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.txt
@@ -2,6 +2,12 @@
CHANGELOG
=========

-------------------------------------------------------------------------------
June, 16, 2021 1.1.0
-------------------------------------------------------------------------------

- Parallelize benchmark function.

-------------------------------------------------------------------------------
March, 23, 2021 1.0.1
-------------------------------------------------------------------------------
2 changes: 1 addition & 1 deletion feature/_version.py
@@ -2,4 +2,4 @@
# Copyright FMR LLC <[email protected]>
# SPDX-License-Identifier: GNU GPLv3

__version__ = "1.0.1"
__version__ = "1.1.0"
115 changes: 85 additions & 30 deletions feature/selector.py
@@ -5,18 +5,20 @@

"""
:Author: FMR LLC
:Version: 1.0.0 of August 10, 2020
:Version: 1.1.0 of June 16, 2021
This module defines the public interface of the **Selective Library** for feature selection.
"""

import multiprocessing as mp
from time import time
from typing import Dict, Union, NamedTuple, NoReturn, Tuple, Optional

import numpy as np
import pandas as pd
import seaborn as sns
from catboost import CatBoostClassifier, CatBoostRegressor
from joblib import Parallel, delayed
from lightgbm import LGBMClassifier, LGBMRegressor
from sklearn.ensemble import AdaBoostClassifier, AdaBoostRegressor
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
@@ -439,7 +441,7 @@ def _validate_args(seed, selection_method) -> NoReturn:
SelectionMethod.TreeBased,
SelectionMethod.Statistical,
SelectionMethod.Variance)),
TypeError("Unknown selection type: " + str(selection_method)))
TypeError("Unknown selection type: " + str(selection_method) + " " + str(type(selection_method))))

# Selection method value
selection_method._validate()
@@ -480,6 +482,7 @@ def benchmark(selectors: Dict[str, Union[SelectionMethod.Correlation,
output_filename: Optional[str] = None,
drop_zero_variance_features: Optional[bool] = True,
verbose: bool = False,
n_jobs: int = 1,
seed: int = Constants.default_seed) \
-> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
"""
@@ -507,6 +510,10 @@ def benchmark(selectors: Dict[str, Union[SelectionMethod.Correlation,
Whether to drop features with zero variance before running feature selector methods or not.
verbose: bool, optional (default=False)
Whether to print progress messages or not.
n_jobs: int, optional (default=1)
Number of concurrent processes/threads to use in parallelized routines.
If set to -1, all CPUs are used.
If set to -2, all CPUs but one are used, and so on.
seed: int, optional (default=Constants.default_seed)
The random seed to initialize the random number generator.
@@ -525,7 +532,8 @@ def benchmark(selectors: Dict[str, Union[SelectionMethod.Correlation,
labels=labels,
output_filename=output_filename,
drop_zero_variance_features=drop_zero_variance_features,
verbose=verbose)
verbose=verbose,
n_jobs=n_jobs)
else:

# Create K-Fold object
@@ -555,7 +563,8 @@ def benchmark(selectors: Dict[str, Union[SelectionMethod.Correlation,
labels=train_labels,
output_filename=output_filename,
drop_zero_variance_features=drop_zero_variance_features,
verbose=False)
verbose=False,
n_jobs=n_jobs)

# Concatenate data frames
score_df = pd.concat((score_df, score_cv_df))
@@ -577,7 +586,8 @@ def _bench(selectors: Dict[str, Union[SelectionMethod.Correlation,
labels: Optional[pd.Series] = None,
output_filename: Optional[str] = None,
drop_zero_variance_features: Optional[bool] = True,
verbose: bool = False) \
verbose: bool = False,
n_jobs: int = 1) \
-> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
"""
Benchmark with a given set of feature selectors.
@@ -591,7 +601,7 @@ def _bench(selectors: Dict[str, Union[SelectionMethod.Correlation,
check_true(selectors is not None, ValueError("Benchmark selectors cannot be none."))
check_true(data is not None, ValueError("Benchmark data cannot be none."))

# Output files
# Output file
if output_filename is not None:
output_file = open(output_filename, "a")
else:
@@ -605,39 +615,84 @@ def _bench(selectors: Dict[str, Union[SelectionMethod.Correlation,
method_to_runtime = {}
score_df = pd.DataFrame(index=data.columns)
selected_df = pd.DataFrame(index=data.columns)
for method_name, method in selectors.items():
selector = Selective(method)
t0 = time()
if verbose:
print("\n>>> Running", method_name)
scores = None
selected = []
try:
subset = selector.fit_transform(data, labels)
scores = selector.get_absolute_scores()
selected = [1 if c in subset.columns else 0 for c in data.columns]
method_to_runtime[method_name] = round((time() - t0) / 60, 2)
except Exception as exp:
print("Exception", exp)
scores = np.repeat(0, len(data.columns))
selected = np.repeat(0, len(data.columns))
method_to_runtime[method_name] = str(round((time() - t0) / 60, 2)) + " (exception)"
finally:
score_df[method_name] = scores
selected_df[method_name] = selected

# Find the effective number of jobs
size = len(selectors.items())
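# Negative n_jobs counts back from the CPU count (e.g., with 8 CPUs, -1 -> 8 workers, -2 -> 7),
# and the result is capped at the number of selectors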
if n_jobs < 0:
n_jobs = max(mp.cpu_count() + 1 + n_jobs, 1)
n_jobs = min(n_jobs, size)

# Parallel benchmarks for each method
output_list = Parallel(n_jobs=n_jobs, require="sharedmem")(
delayed(_parallel_bench)(
data, labels, method_name, method, verbose)
for method_name, method in selectors.items())

# Collect the output from each method
for output in output_list:
for method_name, results_dict in output.items():
score_df[method_name] = results_dict["scores"]
selected_df[method_name] = results_dict["selected"]
method_to_runtime[method_name] = results_dict["runtime"]

if output_filename is not None:
output_file.write(method_name + " " + str(method_to_runtime[method_name]) + "\n")
output_file.write(str(selected) + "\n")
output_file.write(str(scores) + "\n")
if verbose:
print(f"<<< Done! Time taken: {(time() - t0) / 60:.2f} minutes")
output_file.write(str(results_dict["selected"]) + "\n")
output_file.write(str(results_dict["scores"]) + "\n")

# Format
runtime_df = pd.Series(method_to_runtime).to_frame("runtime").rename_axis("method").reset_index()

return score_df, selected_df, runtime_df


def _parallel_bench(data: pd.DataFrame,
labels: Optional[pd.Series],
method_name: str,
method: Union[SelectionMethod.Correlation,
SelectionMethod.Linear,
SelectionMethod.TreeBased,
SelectionMethod.Statistical,
SelectionMethod.Variance],
verbose: bool) \
-> Dict[str, Dict[str, Union[pd.DataFrame, list, float]]]:
"""
Benchmark a single feature selection method on the given data.

Returns
-------
Dictionary mapping the method name to a dictionary with its scores, selected features, and runtime.
"""

selector = Selective(method)
t0 = time()
if verbose:
run_str = "\n>>> Running " + method_name
print(run_str, flush=True)

try:
subset = selector.fit_transform(data, labels)
scores = selector.get_absolute_scores()
selected = [1 if c in subset.columns else 0 for c in data.columns]
runtime = round((time() - t0) / 60, 2)
except Exception as exp:
print("Exception", exp)
scores = np.repeat(0, len(data.columns))
selected = np.repeat(0, len(data.columns))
runtime = str(round((time() - t0) / 60, 2)) + " (exception)"
finally:
if verbose:
done_str = f"<<< Done! {method_name} Time taken: {(time() - t0) / 60:.2f} minutes"
print(done_str, flush=True)

results_dict = {"scores": scores, "selected": selected, "runtime": runtime}

return {method_name: results_dict}


def calculate_statistics(scores: pd.DataFrame,
selected: pd.DataFrame,
columns: Optional[list] = None,
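
A minimal usage sketch of the parallelized benchmark, assuming the API shown in this diff; the dataset loading and selector configurations are borrowed from tests/test_parallel.py below and are illustrative only:

from sklearn.datasets import load_iris

from feature.selector import SelectionMethod, benchmark
from feature.utils import get_data_label

# Illustrative subset of the selectors exercised in tests/test_parallel.py
selectors = {
    "corr_pearson": SelectionMethod.Correlation(0.5, method="pearson"),
    "univ_anova": SelectionMethod.Statistical(3, method="anova"),
    "random_forest": SelectionMethod.TreeBased(3),
}

data, label = get_data_label(load_iris())

# n_jobs=-1 resolves to all available CPUs, capped at the number of selectors
score_df, selected_df, runtime_df = benchmark(selectors, data, label, verbose=True, n_jobs=-1)
print(runtime_df)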
1 change: 1 addition & 0 deletions requirements.txt
@@ -1,4 +1,5 @@
catboost
joblib
lightgbm
minepy
numpy
160 changes: 160 additions & 0 deletions tests/test_parallel.py
@@ -0,0 +1,160 @@
# -*- coding: utf-8 -*-
# Copyright FMR LLC <[email protected]>
# SPDX-License-Identifier: GNU GPLv3

from catboost import CatBoostClassifier, CatBoostRegressor
from lightgbm import LGBMClassifier, LGBMRegressor
from sklearn.datasets import load_boston, load_iris
from sklearn.ensemble import AdaBoostClassifier, AdaBoostRegressor
from sklearn.ensemble import ExtraTreesClassifier, ExtraTreesRegressor
from sklearn.ensemble import GradientBoostingClassifier, GradientBoostingRegressor
from xgboost import XGBClassifier, XGBRegressor

from feature.utils import get_data_label
from feature.selector import SelectionMethod, benchmark
from tests.test_base import BaseTest


class TestParallel(BaseTest):

num_features = 3
corr_threshold = 0.5
alpha = 1000
tree_params = {"random_state": 123, "n_estimators": 100}

selectors = {
"corr_pearson": SelectionMethod.Correlation(corr_threshold, method="pearson"),
"corr_kendall": SelectionMethod.Correlation(corr_threshold, method="kendall"),
"corr_spearman": SelectionMethod.Correlation(corr_threshold, method="spearman"),
"univ_anova": SelectionMethod.Statistical(num_features, method="anova"),
"univ_chi_square": SelectionMethod.Statistical(num_features, method="chi_square"),
"univ_mutual_info": SelectionMethod.Statistical(num_features, method="mutual_info"),
"linear": SelectionMethod.Linear(num_features, regularization="none"),
"lasso": SelectionMethod.Linear(num_features, regularization="lasso", alpha=alpha),
"ridge": SelectionMethod.Linear(num_features, regularization="ridge", alpha=alpha),
"random_forest": SelectionMethod.TreeBased(num_features),
"xgboost_clf": SelectionMethod.TreeBased(num_features, estimator=XGBClassifier(**tree_params)),
"xgboost_reg": SelectionMethod.TreeBased(num_features, estimator=XGBRegressor(**tree_params)),
"extra_clf": SelectionMethod.TreeBased(num_features, estimator=ExtraTreesClassifier(**tree_params)),
"extra_reg": SelectionMethod.TreeBased(num_features, estimator=ExtraTreesRegressor(**tree_params)),
"lgbm_clf": SelectionMethod.TreeBased(num_features, estimator=LGBMClassifier(**tree_params)),
"lgbm_reg": SelectionMethod.TreeBased(num_features, estimator=LGBMRegressor(**tree_params)),
"gradient_clf": SelectionMethod.TreeBased(num_features, estimator=GradientBoostingClassifier(**tree_params)),
"gradient_reg": SelectionMethod.TreeBased(num_features, estimator=GradientBoostingRegressor(**tree_params)),
"adaboost_clf": SelectionMethod.TreeBased(num_features, estimator=AdaBoostClassifier(**tree_params)),
"adaboost_reg": SelectionMethod.TreeBased(num_features, estimator=AdaBoostRegressor(**tree_params)),
"catboost_clf": SelectionMethod.TreeBased(num_features, estimator=CatBoostClassifier(**tree_params, silent=True)),
"catboost_reg": SelectionMethod.TreeBased(num_features, estimator=CatBoostRegressor(**tree_params, silent=True))
}

def test_benchmark_regression(self):
data, label = get_data_label(load_boston())
data = data.drop(columns=["CHAS", "NOX", "RM", "DIS", "RAD", "TAX", "PTRATIO", "INDUS"])

# Benchmark
score_df_sequential, selected_df_sequential, runtime_df_sequential = benchmark(self.selectors, data, label)
score_df_p1, selected_df_p1, runtime_df_p1 = benchmark(self.selectors, data, label, verbose=True, n_jobs=1)
score_df_p2, selected_df_p2, runtime_df_p2 = benchmark(self.selectors, data, label, verbose=True, n_jobs=2)

# Scores
self.assertListAlmostEqual([0.069011, 0.054086, 0.061452, 0.006510, 0.954662],
score_df_sequential["linear"].to_list())
self.assertListAlmostEqual([0.056827, 0.051008, 0.053192, 0.007176, 0.923121],
score_df_sequential["lasso"].to_list())

self.assertListAlmostEqual(score_df_sequential["linear"].to_list(), score_df_p1["linear"].to_list())
self.assertListAlmostEqual(score_df_sequential["linear"].to_list(), score_df_p2["linear"].to_list())
self.assertListAlmostEqual(score_df_sequential["lasso"].to_list(), score_df_p1["lasso"].to_list())
self.assertListAlmostEqual(score_df_sequential["lasso"].to_list(), score_df_p2["lasso"].to_list())

# Selected
self.assertListEqual([1, 0, 1, 0, 1], selected_df_sequential["linear"].to_list())
self.assertListEqual([1, 0, 1, 0, 1], selected_df_sequential["lasso"].to_list())

self.assertListEqual(selected_df_sequential["linear"].to_list(), selected_df_p1["linear"].to_list())
self.assertListEqual(selected_df_sequential["linear"].to_list(), selected_df_p2["linear"].to_list())
self.assertListEqual(selected_df_sequential["lasso"].to_list(), selected_df_p1["lasso"].to_list())
self.assertListEqual(selected_df_sequential["lasso"].to_list(), selected_df_p2["lasso"].to_list())

def test_benchmark_classification(self):
data, label = get_data_label(load_iris())

# Benchmark
score_df_sequential, selected_df_sequential, runtime_df_sequential = benchmark(self.selectors, data, label)
score_df_p1, selected_df_p1, runtime_df_p1 = benchmark(self.selectors, data, label, n_jobs=1)
score_df_p2, selected_df_p2, runtime_df_p2 = benchmark(self.selectors, data, label, n_jobs=2)

# Scores
self.assertListAlmostEqual([0.289930, 0.560744, 0.262251, 0.042721],
score_df_sequential["linear"].to_list())
self.assertListAlmostEqual([0.764816, 0.593482, 0.365352, 1.015095],
score_df_sequential["lasso"].to_list())

self.assertListAlmostEqual(score_df_sequential["linear"].to_list(), score_df_p1["linear"].to_list())
self.assertListAlmostEqual(score_df_sequential["linear"].to_list(), score_df_p2["linear"].to_list())
self.assertListAlmostEqual(score_df_sequential["lasso"].to_list(), score_df_p1["lasso"].to_list())
self.assertListAlmostEqual(score_df_sequential["lasso"].to_list(), score_df_p2["lasso"].to_list())

# Selected
self.assertListEqual([1, 1, 1, 0], selected_df_sequential["linear"].to_list())
self.assertListEqual([1, 1, 0, 1], selected_df_sequential["lasso"].to_list())

self.assertListEqual(selected_df_sequential["linear"].to_list(), selected_df_p1["linear"].to_list())
self.assertListEqual(selected_df_sequential["linear"].to_list(), selected_df_p2["linear"].to_list())
self.assertListEqual(selected_df_sequential["lasso"].to_list(), selected_df_p1["lasso"].to_list())
self.assertListEqual(selected_df_sequential["lasso"].to_list(), selected_df_p2["lasso"].to_list())

def test_benchmark_regression_cv(self):
data, label = get_data_label(load_boston())
data = data.drop(columns=["CHAS", "NOX", "RM", "DIS", "RAD", "TAX", "PTRATIO", "INDUS"])

# Benchmark
score_df_sequential, selected_df_sequential, runtime_df_sequential = benchmark(self.selectors, data, label,
cv=5, output_filename=None)
score_df_p1, selected_df_p1, runtime_df_p1 = benchmark(self.selectors, data, label, cv=5,
output_filename=None, n_jobs=1)
score_df_p2, selected_df_p2, runtime_df_p2 = benchmark(self.selectors, data, label, cv=5,
output_filename=None, n_jobs=2)

# Aggregate scores from different cv-folds
score_df_sequential = score_df_sequential.groupby(score_df_sequential.index).mean()
score_df_p1 = score_df_p1.groupby(score_df_p1.index).mean()
score_df_p2 = score_df_p2.groupby(score_df_p2.index).mean()

# Scores
self.assertListAlmostEqual([0.061577, 0.006446, 0.066933, 0.957603, 0.053797],
score_df_sequential["linear"].to_list())
self.assertListAlmostEqual([0.053294, 0.007117, 0.054563, 0.926039, 0.050716],
score_df_sequential["lasso"].to_list())

self.assertListAlmostEqual(score_df_sequential["linear"].to_list(), score_df_p1["linear"].to_list())
self.assertListAlmostEqual(score_df_sequential["linear"].to_list(), score_df_p2["linear"].to_list())
self.assertListAlmostEqual(score_df_sequential["lasso"].to_list(), score_df_p1["lasso"].to_list())
self.assertListAlmostEqual(score_df_sequential["lasso"].to_list(), score_df_p2["lasso"].to_list())

def test_benchmark_classification_cv(self):
data, label = get_data_label(load_iris())

# Benchmark
score_df_sequential, selected_df_sequential, runtime_df_sequential = benchmark(self.selectors, data, label,
cv=5, output_filename=None)
score_df_p1, selected_df_p1, runtime_df_p1 = benchmark(self.selectors, data, label, cv=5,
output_filename=None, n_jobs=1)
score_df_p2, selected_df_p2, runtime_df_p2 = benchmark(self.selectors, data, label, cv=5,
output_filename=None, n_jobs=2)

# Aggregate scores from different cv-folds
score_df_sequential = score_df_sequential.groupby(score_df_sequential.index).mean()
score_df_p1 = score_df_p1.groupby(score_df_p1.index).mean()
score_df_p2 = score_df_p2.groupby(score_df_p2.index).mean()

# Scores
self.assertListAlmostEqual([0.223276, 0.035431, 0.262547, 0.506591],
score_df_sequential["linear"].to_list())
self.assertListAlmostEqual([0.280393, 0.948935, 0.662777, 0.476188],
score_df_sequential["lasso"].to_list())

self.assertListAlmostEqual(score_df_sequential["linear"].to_list(), score_df_p1["linear"].to_list())
self.assertListAlmostEqual(score_df_sequential["linear"].to_list(), score_df_p2["linear"].to_list())
self.assertListAlmostEqual(score_df_sequential["lasso"].to_list(), score_df_p1["lasso"].to_list())
self.assertListAlmostEqual(score_df_sequential["lasso"].to_list(), score_df_p2["lasso"].to_list())
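
A minimal sketch of aggregating the cross-validated, parallel benchmark output, assuming the same selectors, data, and label objects as in the tests above; the groupby-mean step mirrors the aggregation used in those tests:

# Cross-validated, parallel benchmark: the returned frames stack one block of rows per fold,
# indexed by feature name
score_df, selected_df, runtime_df = benchmark(selectors, data, label, cv=5, n_jobs=2)

# Average scores and selection frequency across folds
mean_scores = score_df.groupby(score_df.index).mean()
selection_frequency = selected_df.groupby(selected_df.index).mean()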
