Cross validation of Pipeline/estimators using MLDataset / xarray.Dataset #221

Closed
wants to merge 27 commits into from
Changes from 6 commits
55959a5
cross validation of MLDataset Pipeline
Oct 24, 2017
396f9aa
changes with CV sampling
Oct 26, 2017
33bac56
changes to cv_cache
Oct 26, 2017
b422e68
closer to working cross validation for MLDataset
Oct 26, 2017
d45d4e1
CV / xarray experimentation - work in progress
Oct 31, 2017
92054c9
MLDataset cross validation working for pipeline of 1 step that is uns…
Nov 1, 2017
35450c1
wrapped sklearn classes need to wrap score methods as fit, predict, o…
Nov 1, 2017
f86a079
update tests;fix cross validation with most data structures
Nov 3, 2017
5cf646f
a couple tests for Python 2.7
Nov 3, 2017
744109a
avoid dask-searchcv test in conda.recipe;add test_config.yml to MANIF…
Nov 3, 2017
1e7bec8
remove print statement
Nov 3, 2017
83437f5
ensure test_config.yaml included in pkg
Nov 3, 2017
de9efd0
remove elm.mldataset.cross_validation - modify environment.yml for el…
Nov 3, 2017
6267041
fix usage of is_arr utility to separate X, y tuple
Nov 3, 2017
66013e6
1850 passing tests
Nov 4, 2017
a91caf6
dask-searchcv in meta.yaml
Nov 4, 2017
e9b5d85
use elm/label/dev and elm for CI installs
Nov 4, 2017
f6ef7c8
change earthio version for fixing CI build
Nov 4, 2017
948efe5
ensure EARTHIO_CHANNEL_STR is set correctly in .travis.yml
Nov 6, 2017
edbe1f5
ensure ANACONDA_UPLOAD_USER is defined in .travis for pkg upload
Nov 6, 2017
6304e37
change order of channels to ensure dask-searchcv comes from elm
Nov 6, 2017
8a6d46f
subset the number of tests being run in CI
Nov 6, 2017
21a18d9
better diagnostics on upload failure in CI
Nov 6, 2017
8ad7b4c
remove earthio from CI
Nov 6, 2017
9a1734d
be sure to create env from elm's conda build output
Nov 6, 2017
dc47f65
remove diagnostic print from deploy section
Nov 6, 2017
00ea1be
refactor to simplify changes in dask-searchcv
Nov 8, 2017
2 changes: 2 additions & 0 deletions elm/mldataset/__init__.py
@@ -0,0 +1,2 @@
from elm.mldataset.util import is_mldataset
from elm.mldataset.cross_validation import * # uses __all__
114 changes: 114 additions & 0 deletions elm/mldataset/cross_validation.py
@@ -0,0 +1,114 @@
from dask_searchcv.methods import CVCache
from xarray_filters.pipeline import Step
from sklearn.model_selection import GroupKFold as _GroupKFold
from sklearn.model_selection import GroupShuffleSplit as _GroupShuffleSplit
from sklearn.model_selection import KFold as _KFold
from sklearn.model_selection import LeaveOneGroupOut as _LeaveOneGroupOut
from sklearn.model_selection import LeavePGroupsOut as _LeavePGroupsOut
from sklearn.model_selection import LeaveOneOut as _LeaveOneOut
from sklearn.model_selection import LeavePOut as _LeavePOut
from sklearn.model_selection import PredefinedSplit as _PredefinedSplit
from sklearn.model_selection import RepeatedKFold as _RepeatedKFold
from sklearn.model_selection import RepeatedStratifiedKFold as _RepeatedStratifiedKFold
from sklearn.model_selection import ShuffleSplit as _ShuffleSplit
from sklearn.model_selection import StratifiedKFold as _StratifiedKFold
from sklearn.model_selection import StratifiedShuffleSplit as _StratifiedShuffleSplit
from sklearn.model_selection import TimeSeriesSplit as _TimeSeriesSplit

CV_CLASSES = [
'GroupKFold',
'GroupShuffleSplit',
'KFold',
'LeaveOneGroupOut',
'LeavePGroupsOut',
'LeaveOneOut',
'LeavePOut',
    'PredefinedSplit',
'RepeatedKFold',
'RepeatedStratifiedKFold',
'ShuffleSplit',
'StratifiedKFold',
'StratifiedShuffleSplit',
'TimeSeriesSplit',
]

__all__ = CV_CLASSES + ['CVCacheSampleId', 'MLDatasetMixin', 'CV_CLASSES']

class CVCacheSampleId(CVCache):
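    """dask-searchcv CVCache whose splits index sample IDs rather than data:
    _post_splits passes a fold's IDs through the sampler's fit_transform to
    materialize X for that fold (y must be None; it comes from the sampler)."""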
def __init__(self, sampler, splits, pairwise=False, cache=True):
self.sampler = sampler
super(CVCacheSampleId, self).__init__(splits, pairwise=pairwise,
cache=cache)

def _post_splits(self, X, y=None, n=None, is_x=True, is_train=False):
if y is not None:
            raise ValueError('Expected y to be None (y is returned by the Sampler() instance or similar)')
return self.sampler.fit_transform(X)


class MLDatasetMixin:
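    """Mixin whose split() re-yields the parent splitter's index arrays as
    element-wise (a, b) pairs by zipping them."""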
def split(self, *args, **kw):
        for test, train in super(MLDatasetMixin, self).split(*args, **kw):
for a, b in zip(test, train):
yield a, b


class GroupKFold(_GroupKFold, MLDatasetMixin):
pass


class GroupShuffleSplit(_GroupShuffleSplit, MLDatasetMixin):
pass


class KFold(_KFold, MLDatasetMixin):
pass


class LeaveOneGroupOut(_LeaveOneGroupOut, MLDatasetMixin):
pass


class LeavePGroupsOut(_LeavePGroupsOut, MLDatasetMixin):
pass


class LeaveOneOut(_LeaveOneOut, MLDatasetMixin):
pass


class LeavePOut(_LeavePOut, MLDatasetMixin):
pass


class PredefinedSplit(_PredefinedSplit, MLDatasetMixin):
pass


class RepeatedKFold(_RepeatedKFold, MLDatasetMixin):
pass


class RepeatedStratifiedKFold(_RepeatedStratifiedKFold, MLDatasetMixin):
pass


class ShuffleSplit(_ShuffleSplit, MLDatasetMixin):
pass


class StratifiedKFold(_StratifiedKFold, MLDatasetMixin):
pass


class StratifiedShuffleSplit(_StratifiedShuffleSplit, MLDatasetMixin):
pass


class TimeSeriesSplit(_TimeSeriesSplit, MLDatasetMixin):
pass


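A minimal usage sketch of how these pieces fit together, assuming a trivial stand-in sampler (IdentitySampler is hypothetical, not part of elm): a wrapped CV class produces splits over sample IDs, and CVCacheSampleId materializes each fold's data by running the sampler's fit_transform on those IDs.

import numpy as np
from elm.mldataset.cross_validation import KFold, CVCacheSampleId

class IdentitySampler:
    # hypothetical sampler: maps sample IDs straight to an array
    def fit_transform(self, ids):
        return np.asarray(ids, dtype='float64')

dates = np.arange(10)                            # sample IDs stand in for real data
splits = list(KFold(n_splits=5).split(dates))    # pairs of integer index arrays
cache = CVCacheSampleId(IdentitySampler(), splits)
# cache._post_splits(dates[splits[0][0]]) would return the sampled array
# for the first fold's training IDs
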
26 changes: 26 additions & 0 deletions elm/mldataset/util.py
@@ -0,0 +1,26 @@
import numpy as np
import dask.array as da


def is_mldataset(arr, raise_err=False):
    try:
        from xarray_filters import MLDataset
        from xarray import Dataset
    except ImportError:
        # Much of the ML logic wrapping xarray would fail if only xarray
        # (and not xarray_filters) is installed; when xarray_filters is
        # installed, xarray.Dataset can be used as well.
        if not raise_err:
            return False
        raise ValueError('Cannot use cross validation for xarray Dataset without xarray_filters')
    return isinstance(arr, (MLDataset, Dataset))


def is_arr(arr, raise_err=False):
is_ml = is_mldataset(arr, raise_err=raise_err)
return is_ml or isinstance(arr, (np.ndarray, da.Array))
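
A quick usage sketch of these helpers, assuming numpy and dask are installed and the behavior written above:

import numpy as np
from elm.mldataset.util import is_arr, is_mldataset

assert is_arr(np.zeros((3, 3)))       # plain numpy arrays count as arrays
assert not is_mldataset([1, 2, 3])    # a list is neither MLDataset nor Dataset
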
17 changes: 12 additions & 5 deletions elm/model_selection/ea_searchcv.py
@@ -8,13 +8,16 @@
RandomizedSearchCV,
DaskBaseSearchCV,
_randomized_parameters)
from dask_searchcv.utils import is_pipeline
import numpy as np
from elm.model_selection.evolve import (fit_ea,
DEFAULT_CONTROL,
ind_to_new_params,
DEFAULT_EVO_PARAMS,)
from elm.mldataset.serialize_mixin import SerializeMixin
from elm.mldataset.wrap_sklearn import SklearnMixin
from elm.mldataset.cv_cache import CVCacheSampleId
from elm.mldataset.util import is_arr
from elm.model_selection.sorting import pareto_front
from elm.model_selection.base import base_selection
from elm.pipeline import Pipeline
@@ -132,7 +135,9 @@ class EaSearchCV(RandomizedSearchCV, SklearnMixin, SerializeMixin):
parameters=_ea_parameters,
example=_ea_example)

def __init__(self, estimator, param_distributions, n_iter=10,
def __init__(self, estimator, param_distributions,
n_iter=10,
sampler=None,
random_state=None,
ngen=3, score_weights=None,
sort_fitness=pareto_front,
@@ -143,9 +148,10 @@ def __init__(self, estimator, param_distributions, n_iter=10,
scoring=None,
iid=True, refit=True,
cv=None, error_score='raise', return_train_score=True,
scheduler=None, n_jobs=-1, cache_cv=True):
scheduler=None, n_jobs=-1, cache_cv=CVCacheSampleId):
filter_kw_and_run_init(RandomizedSearchCV.__init__, **locals())
self.ngen = ngen
self.sampler = sampler
self.select_with_test = select_with_test
self.model_selection = model_selection
self.model_selection_kwargs = model_selection_kwargs
@@ -264,10 +270,11 @@ def _as_dask_array(self, X, y=None, **kw):

def fit(self, X, y=None, groups=None, **fit_params):
self._open()
X, y = self._as_dask_array(X, y=y)
if not self.get_params().get('sampler'):
X, y = self._as_dask_array(X, y=y)
for self._gen in range(self.ngen):
print('Generation', self._gen)
RandomizedSearchCV.fit(self, X, y, groups, **fit_params)
RandomizedSearchCV.fit(self, X, y, groups=groups, **fit_params)
fitnesses = self._get_cv_scores()
self.cv_results_all_gen_ = _concat_cv_results(self.cv_results_all_gen_,
self.cv_results_,
@@ -289,7 +296,7 @@ def fit(self, X, y=None, groups=None, **fit_params):
return self

def _get_param_iterator(self):
if self._is_ea and not getattr(self, '_invalid_ind', None):
if self._gen != 0 and self._is_ea and not getattr(self, '_invalid_ind', None):
return iter(())
if not self._is_ea and self._gen == 0:
self.next_params_ = tuple(RandomizedSearchCV._get_param_iterator(self))
28 changes: 12 additions & 16 deletions elm/pipeline/steps.py
@@ -36,17 +36,6 @@ def get_module_classes(m):
return {attr: getattr(module, attr) for attr in attrs}


def patch_cls(cls):

class Wrapped(SklearnMixin, cls):
_cls = cls
__init__ = cls.__init__
_cls_name = cls.__name__
name = 'Elm{}'.format(cls.__name__)
globals()[name] = Wrapped
return globals()[name]


_all = []
_seen = set()
ALL_STEPS = {}
@@ -55,12 +44,20 @@ class Wrapped(SklearnMixin, cls):
for cls in get_module_classes(m).values():
if cls.__name__ in _seen:
continue
if m not in cls.__module__:
Contributor Author:
This is just checking that we are getting StandardScaler or similar from the sklearn module where it is actually defined, not some other one where it is imported for internal usage.

continue
_seen.add(cls.__name__)
w = patch_cls(cls)
if any(s in cls.__name__ for s in SKIP):
name = cls.__name__
if any(s in name for s in SKIP):
continue
this_module[cls.__name__] = w
ALL_STEPS[(m, cls.__name__)] = w
class Wrapped(SklearnMixin, cls):
_cls = cls
__init__ = cls.__init__
_cls_name = name

globals()[name] = Wrapped
this_module[cls.__name__] = globals()[name]
ALL_STEPS[(m, cls.__name__)] = globals()[name]
this_module = Namespace(**this_module)
if m == 'cluster.bicluster':
bicluster = this_module # special case (dotted name)
@@ -75,5 +72,4 @@ class Wrapped(SklearnMixin, cls):
del _all
del m
del this_module
del w
del _seen
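
As the inline comment above explains, the module check keeps only classes actually defined in the submodule being scanned, not re-exports. A rough illustration of that behavior (hypothetical, not part of the diff):

from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression

# StandardScaler is defined under sklearn.preprocessing, so the check passes
assert 'preprocessing' in StandardScaler.__module__
# LinearRegression is defined under sklearn.linear_model, so a scan of
# 'preprocessing' would skip it even if it were imported there
assert 'preprocessing' not in LinearRegression.__module__
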
104 changes: 104 additions & 0 deletions elm/tests/test_xarray_cross_validation.py
@@ -0,0 +1,104 @@
from __future__ import print_function, unicode_literals, division

from collections import OrderedDict
import datetime

from sklearn.metrics import r2_score, mean_squared_error, make_scorer
from sklearn.model_selection import StratifiedShuffleSplit
from xarray_filters import MLDataset
from xarray_filters.datasets import make_regression
from xarray_filters.pipeline import Generic, Step
import numpy as np
import pytest


from elm.mldataset import CV_CLASSES
from elm.model_selection import EaSearchCV
from elm.model_selection.sorting import pareto_front
from elm.pipeline import Pipeline
from elm.pipeline.predict_many import predict_many
from elm.pipeline.steps import linear_model, cluster
import elm.mldataset.cross_validation as cross_validation

START_DATE = datetime.datetime(2000, 1, 1, 0, 0, 0)
MAX_TIME_STEPS = 144
DATES = np.array([START_DATE - datetime.timedelta(hours=hr)
for hr in range(MAX_TIME_STEPS)])
DATE_GROUPS = np.linspace(0, 5, DATES.size).astype(np.int32)


# TODO - also test regressors
param_distributions = {
'estimator__fit_intercept': [True, False],
}

param_distributions = {
    'estimator__n_clusters': [4, 5, 6, 7, 8, 10, 12],
'estimator__init': ['k-means++', 'random'],
'estimator__copy_x': [False],
    'estimator__algorithm': ["auto", "full"],
}

model_selection = {
'select_method': 'selNSGA2',
'crossover_method': 'cxTwoPoint',
'mutate_method': 'mutUniformInt',
'init_pop': 'random',
'indpb': 0.5,
'mutpb': 0.9,
'cxpb': 0.3,
'eta': 20,
'ngen': 2,
'mu': 16,
'k': 8, # TODO ensure that k is not ignored - make elm issue if it is
'early_stop': None
}

def example_function(date):
dset = make_regression()
dset.attrs['example_function_argument'] = date
# TODO - this is not really testing
# MLDataset as X because of .features.values below
return dset.to_features(keep_attrs=True).features.values


class Sampler(Step):
def transform(self, X, y=None, **kw):
return example_function(X)


class GetY(Step):
layer = 'y'
def transform(self, X, y=None, **kw):
layer = self.get_params()['layer']
y = getattr(X, layer).values.ravel()
X = MLDataset(OrderedDict([(k, v) for k, v in X.data_vars.items()
if k != layer])).to_features()
return X.features.values, y

pipe = Pipeline([ # TODO see note above about supervised models
('get_y', GetY()),
('estimator', linear_model.LinearRegression(n_jobs=-1)),
])

pipe = Pipeline([
#('get_y', GetY()), # TODO this wasn't working but should
('estimator', cluster.KMeans(n_jobs=1)),
])

@pytest.mark.parametrize('cls', CV_CLASSES)
def test_each_cv(cls):
cv = getattr(cross_validation, cls)()
ea = EaSearchCV(pipe,
param_distributions=param_distributions,
sampler=Sampler(),
ngen=2,
model_selection=model_selection,
cv=cv,
refit=False) # TODO refit = True

print(ea.get_params())
ea.fit(DATES, groups=DATE_GROUPS)
results = getattr(ea, 'cv_results_', None)
    assert isinstance(results, dict) and 'gen' in results
    assert all(getattr(v, 'size', v) for v in results.values())