From 47834f418ab709ec8b0707882163c74e6061b7df Mon Sep 17 00:00:00 2001 From: Peter Steinberg Date: Thu, 26 Oct 2017 08:38:31 -0700 Subject: [PATCH 01/16] cross validation for xarray_filters.MLDataset - Elm PR 221 --- dask_searchcv/methods.py | 51 ++++++++++++++++++--- dask_searchcv/model_selection.py | 76 +++++++++++++++++++++----------- dask_searchcv/utils.py | 22 +++++++++ 3 files changed, 118 insertions(+), 31 deletions(-) diff --git a/dask_searchcv/methods.py b/dask_searchcv/methods.py index 057ee6b..308950d 100644 --- a/dask_searchcv/methods.py +++ b/dask_searchcv/methods.py @@ -15,10 +15,13 @@ from sklearn.exceptions import FitFailedWarning from sklearn.pipeline import Pipeline, FeatureUnion from sklearn.utils import safe_indexing -from sklearn.utils.validation import check_consistent_length, _is_arraylike +from sklearn.utils.validation import (check_consistent_length as _check_consistent_length, + _is_arraylike) from .utils import copy_estimator + + # Copied from scikit-learn/sklearn/utils/fixes.py, can be removed once we drop # support for scikit-learn < 0.18.1 or numpy < 1.12.0. if LooseVersion(np.__version__) < '1.12.0': @@ -64,6 +67,17 @@ def warn_fit_failure(error_score, e): # ----------------------- # + +def check_consistent_length(*arrays): + try: + from elm.mldataset import is_mldataset + except: + is_mldataset = lambda x: False + if any(is_mldataset(arr) for arr in arrays): + return True # TODO ? consequence? + return _check_consistent_length(*arrays) + + class CVCache(object): def __init__(self, splits, pairwise=False, cache=True): self.splits = splits @@ -101,7 +115,11 @@ def _extract(self, X, y, n, is_x=True, is_train=True): return self.cache[n, is_x, is_train] inds = self.splits[n][0] if is_train else self.splits[n][1] - result = safe_indexing(X if is_x else y, inds) + post_splits = getattr(self, '_post_splits', None) + if post_splits: + result = post_splits(inds) + else: + result = safe_indexing(X if is_x else y, inds) if self.cache is not None: self.cache[n, is_x, is_train] = result @@ -117,16 +135,33 @@ def _extract_pairwise(self, X, y, n, is_train=True): if X.shape[0] != X.shape[1]: raise ValueError("X should be a square kernel matrix") train, test = self.splits[n] + post_splits = getattr(self, '_post_splits', None) result = X[np.ix_(train if is_train else test, train)] - + if post_splits: + result = post_splits(result) if self.cache is not None: self.cache[n, True, is_train] = result return result -def cv_split(cv, X, y, groups, is_pairwise, cache): - check_consistent_length(X, y, groups) - return CVCache(list(cv.split(X, y, groups)), is_pairwise, cache) +def cv_split(cv, X, y, groups, is_pairwise, cache, sampler): + print('cv, X, y, groups, is_pairwise, cache, sampler', cv, X, y, groups, is_pairwise, cache, sampler) + kw = dict(pairwise=is_pairwise, cache=cache) + if sampler is None: + _check_consistent_length(X, y, groups) + if cache and not hasattr(cache, 'extract'): + cls = CVCache + kw.pop('sampler') + else: + cls = cache + kw['cache'] = True + splits = list(cv.split(X, y, groups)) + print('cls', cls, splits, kw) + if sampler: + args = (sampler, splits,) + else: + args = (splits,) + return cls(*args, **kw) def cv_n_samples(cvs): @@ -200,11 +235,13 @@ def set_params(est, fields=None, params=None, copy=True): # TODO: rewrite set_params to avoid lock for classes that use the standard # set_params/get_params methods with SET_PARAMS_LOCK: + #print('params sp', fields, est, params) return est.set_params(**params) def fit(est, X, y, error_score='raise', fields=None, 
params=None, fit_params=None): + #print('estxxxx', est, X, y, fields, params, fit_params) if X is FIT_FAILURE: est, fit_time = FIT_FAILURE, 0.0 else: @@ -226,6 +263,7 @@ def fit(est, X, y, error_score='raise', fields=None, params=None, def fit_transform(est, X, y, error_score='raise', fields=None, params=None, fit_params=None): + #print('estftxxx', est, fields, params, fit_params) if X is FIT_FAILURE: est, fit_time, Xt = FIT_FAILURE, 0.0, FIT_FAILURE else: @@ -302,6 +340,7 @@ def _store(results, key_name, array, n_splits, n_candidates, def create_cv_results(scores, candidate_params, n_splits, error_score, weights): + print('scores, candidate_params, n_splits, error_score, weights', scores, candidate_params, n_splits, error_score, weights) if len(scores[0]) == 4: fit_times, test_scores, score_times, train_scores = zip(*scores) else: diff --git a/dask_searchcv/model_selection.py b/dask_searchcv/model_selection.py index 5ca0cdf..366dde5 100644 --- a/dask_searchcv/model_selection.py +++ b/dask_searchcv/model_selection.py @@ -38,7 +38,7 @@ cv_n_samples, cv_extract, cv_extract_params, decompress_params, score, feature_union, feature_union_concat, MISSING) -from .utils import to_indexable, to_keys, unzip +from .utils import to_indexable, to_keys, unzip, is_pipeline,_get_est_type try: from cytoolz import get, pluck @@ -62,18 +62,16 @@ def __call__(self, est): def build_graph(estimator, cv, scorer, candidate_params, X, y=None, - groups=None, fit_params=None, iid=True, refit=True, - error_score='raise', return_train_score=True, cache_cv=True): - + groups=None, sampler=None, fit_params=None, iid=True, refit=True, + error_score='raise', return_train_score=True, + cache_cv=True): + dsk = {} + # "pairwise" estimators require a different graph for CV splitting X, y, groups = to_indexable(X, y, groups) cv = check_cv(cv, y, is_classifier(estimator)) - # "pairwise" estimators require a different graph for CV splitting is_pairwise = getattr(estimator, '_pairwise', False) - - dsk = {} X_name, y_name, groups_name = to_keys(dsk, X, y, groups) n_splits = compute_n_splits(cv, X, y, groups) - if fit_params: # A mapping of {name: (name, graph-key)} param_values = to_indexable(*fit_params.values(), allow_scalars=True) @@ -88,8 +86,9 @@ def build_graph(estimator, cv, scorer, candidate_params, X, y=None, error_score == 'raise', return_train_score) cv_name = 'cv-split-' + main_token + dsk[cv_name] = (cv_split, cv, X_name, y_name, groups_name, - is_pairwise, cache_cv) + is_pairwise, cache_cv, sampler) if iid: weights = 'cv-n-samples-' + main_token @@ -162,13 +161,18 @@ def _group_fit_params(steps, fit_params): def do_fit_and_score(dsk, main_token, est, cv, fields, tokens, params, X, y, fit_params, n_splits, error_score, scorer, return_train_score): - if not isinstance(est, Pipeline): + print('do_fit_and_score', dsk, main_token, est, cv, fields, tokens, params, + X, y, fit_params, n_splits, error_score, scorer, + return_train_score) + if not is_pipeline(est): # Fitting and scoring can all be done as a single task + print('not pipeline') n_and_fit_params = _get_fit_params(cv, fit_params, n_splits) - est_type = type(est).__name__.lower() + est_type = _get_est_type(est) est_name = '%s-%s' % (est_type, main_token) score_name = '%s-fit-score-%s' % (est_type, main_token) + print('score_name', score_name) dsk[est_name] = est seen = {} @@ -190,6 +194,7 @@ def do_fit_and_score(dsk, main_token, est, cv, fields, tokens, params, m += 1 scores = [k + (n,) for n in range(n_splits) for k in out] else: + print('pipeline', cv, X, y) 
X_train = (cv_extract, cv, X, y, True, True) X_test = (cv_extract, cv, X, y, True, False) y_train = (cv_extract, cv, X, y, False, True) @@ -198,15 +203,17 @@ def do_fit_and_score(dsk, main_token, est, cv, fields, tokens, params, # Fit the estimator on the training data X_trains = [X_train] * len(params) y_trains = [y_train] * len(params) + print('xtyt', X_trains, y_trains, est, cv, error_score) fit_ests = do_fit(dsk, TokenIterator(main_token), est, cv, fields, tokens, params, X_trains, y_trains, fit_params, n_splits, error_score) - + print('fit_ests', fit_ests) score_name = 'score-' + main_token scores = [] scores_append = scores.append for n in range(n_splits): + print('n', n) if return_train_score: xtrain = X_train + (n,) ytrain = y_train + (n,) @@ -215,7 +222,7 @@ def do_fit_and_score(dsk, main_token, est, cv, fields, tokens, params, xtest = X_test + (n,) ytest = y_test + (n,) - + print('xtyt2222', xtest, ytest) for (name, m) in fit_ests: dsk[(score_name, m, n)] = (score, (name, m, n), xtest, ytest, xtrain, ytrain, scorer) @@ -225,7 +232,8 @@ def do_fit_and_score(dsk, main_token, est, cv, fields, tokens, params, def do_fit(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, fit_params, n_splits, error_score): - if isinstance(est, Pipeline) and params is not None: + if is_pipeline(est) and params is not None: + #print('pppp', params) return _do_pipeline(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, fit_params, n_splits, error_score, False) else: @@ -236,7 +244,7 @@ def do_fit(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, fields = None token = next_token(est) - est_type = type(est).__name__.lower() + est_type = _get_est_type(est) est_name = '%s-%s' % (est_type, token) fit_name = '%s-fit-%s' % (est_type, token) dsk[est_name] = est @@ -263,7 +271,7 @@ def do_fit(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, def do_fit_transform(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, fit_params, n_splits, error_score): - if isinstance(est, Pipeline) and params is not None: + if is_pipeline(est) and params is not None: return _do_pipeline(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, fit_params, n_splits, error_score, True) elif isinstance(est, FeatureUnion) and params is not None: @@ -292,9 +300,11 @@ def do_fit_transform(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, for X, y, t, p in zip(Xs, ys, tokens, params): if (X, y, t) in seen: + print('seen', seen) out_append(seen[X, y, t]) else: for n, fit_params in n_and_fit_params: + print('nfppp', n, fit_params) dsk[(fit_Xt_name, m, n)] = (fit_transform, est_name, X + (n,), y + (n,), error_score, fields, p, @@ -343,7 +353,12 @@ def _do_fit_step(dsk, next_token, step, cv, fields, tokens, params, Xs, ys, is_transform): sub_fields, sub_inds = map(list, unzip(step_fields_lk[step_name], 2)) sub_fit_params = fit_params_lk[step_name] - + print('sssss', sub_fields, sub_inds, sub_fit_params, step_name in field_to_index, step_name, field_to_index) + nnn = '''next_token, step, cv, fields, tokens, params, Xs, ys,fit_params, n_splits, error_score, step_fields_lk,fit_params_lk, field_to_index, step_name, none_passthrough,is_transform'''.split(',') + print(dict(zip(nnn, (next_token, step, cv, fields, tokens, params, Xs, ys, + fit_params, n_splits, error_score, step_fields_lk, + fit_params_lk, field_to_index, step_name, none_passthrough, + is_transform)))) if step_name in field_to_index: # The estimator may change each call new_fits = {} @@ -351,12 +366,14 @@ def _do_fit_step(dsk, next_token, 
step, cv, fields, tokens, params, Xs, ys, est_index = field_to_index[step_name] for ids in _group_ids_by_index(est_index, tokens): + print('ids', ids) # Get the estimator for this subgroup sub_est = params[ids[0]][est_index] if sub_est is MISSING: sub_est = step # If an estimator is `None`, there's nothing to do + print('nffff', new_fits) if sub_est is None: nones = dict.fromkeys(ids, None) new_fits.update(nones) @@ -377,6 +394,7 @@ def _do_fit_step(dsk, next_token, step, cv, fields, tokens, params, Xs, ys, sub_tokens = sub_params = None if is_transform: + print('do_fit_transform') sub_fits, sub_Xs = do_fit_transform(dsk, next_token, sub_est, cv, sub_fields, sub_tokens, sub_params, @@ -386,6 +404,7 @@ def _do_fit_step(dsk, next_token, step, cv, fields, tokens, params, Xs, ys, new_Xs.update(zip(ids, sub_Xs)) new_fits.update(zip(ids, sub_fits)) else: + print('do_fit') sub_fits = do_fit(dsk, next_token, sub_est, cv, sub_fields, sub_tokens, sub_params, sub_Xs, sub_ys, sub_fit_params, @@ -425,22 +444,24 @@ def _do_pipeline(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, fit_params, n_splits, error_score, is_transform): if 'steps' in fields: raise NotImplementedError("Setting Pipeline.steps in a gridsearch") - + print('fiessss', fields, est, cv, params, fit_params, est.steps) field_to_index, step_fields_lk = _group_subparams(est.steps, fields) fit_params_lk = _group_fit_params(est.steps, fit_params) - + print('lllllk', fit_params_lk) # A list of (step, is_transform) instrs = [(s, True) for s in est.steps[:-1]] instrs.append((est.steps[-1], is_transform)) - + print('insss', instrs) fit_steps = [] for (step_name, step), transform in instrs: + # print('stepn', step_name, step, transform) fits, Xs = _do_fit_step(dsk, next_token, step, cv, fields, tokens, params, Xs, ys, fit_params, n_splits, error_score, step_fields_lk, fit_params_lk, field_to_index, step_name, True, transform) fit_steps.append(fits) - + print('ftt', fits) + print('lennnn', len(fit_steps), fit_steps) # Rebuild the pipelines step_names = [n for n, _ in est.steps] out_ests = [] @@ -449,7 +470,9 @@ def _do_pipeline(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, m = 0 seen = {} for steps in zip(*fit_steps): + print('stepssss', steps, seen) if steps in seen: + # print('continue') out_ests_append(seen[steps]) else: for n in range(n_splits): @@ -775,10 +798,11 @@ def fit(self, X, y=None, groups=None, **fit_params): error_score == 'raise'): raise ValueError("error_score must be the string 'raise' or a" " numeric value.") - dsk, keys, n_splits = build_graph(estimator, self.cv, self.scorer_, list(self._get_param_iterator()), - X, y, groups, fit_params, + X=X, y=y, groups=groups, + sampler=self.sampler, + fit_params=fit_params, iid=self.iid, refit=self.refit, error_score=error_score, @@ -893,11 +917,12 @@ def visualize(self, filename='mydask', format=None, **kwargs): distributed schedulers. If ``n_jobs == -1`` [default] all cpus are used. For ``n_jobs < -1``, ``(n_cpus + 1 + n_jobs)`` are used. -cache_cv : bool, default=True +cache_cv : bool or CVCache-like class, default=True Whether to extract each train/test subset at most once in each worker process, or every time that subset is needed. Caching the splits can speedup computation at the cost of increased memory usage per worker - process. + process. If cache_cv is a class, then it is used in place of CVCache + (and extraction is assumed to be at most once). If True, worst case memory usage is ``(n_splits + 1) * (X.nbytes + y.nbytes)`` per worker. 
If False, worst case memory usage is @@ -1055,6 +1080,7 @@ def __init__(self, estimator, param_grid, scoring=None, iid=True, self.param_grid = param_grid def _get_param_iterator(self): + print('get') """Return ParameterGrid instance for the given param_grid""" return model_selection.ParameterGrid(self.param_grid) diff --git a/dask_searchcv/utils.py b/dask_searchcv/utils.py index 674c888..ae8b157 100644 --- a/dask_searchcv/utils.py +++ b/dask_searchcv/utils.py @@ -1,3 +1,4 @@ +from __future__ import absolute_import, division, print_function, unicode_literals import copy import dask.array as da @@ -5,6 +6,27 @@ from dask.delayed import delayed, Delayed from sklearn.utils.validation import indexable, _is_arraylike +from sklearn.pipeline import Pipeline as sk_Pipeline + +def is_pipeline(estimator): + print('is_pipeline', estimator) + if isinstance(estimator, sk_Pipeline): + ret = True + try: + from elm.pipeline import Pipeline as elm_Pipeline + ret = isinstance(estimator, elm_Pipeline) + except: + ret = False + print('is_pipeline', estimator, ret) + return ret + +def _get_est_type(est): + if hasattr(est, '_cls') and hasattr(est._cls, '__name__'): + est_type = est._cls.__name__.lower() + else: + est_type = type(est).__name__.lower() + print('_get_est_type', est_type) + return est_type def _indexable(x): From 6c61b0490f84ae4471b133ea666901c6d8154107 Mon Sep 17 00:00:00 2001 From: Peter Steinberg Date: Thu, 26 Oct 2017 11:17:02 -0700 Subject: [PATCH 02/16] diagnostic printing --- dask_searchcv/methods.py | 18 +++++++++++++++--- dask_searchcv/model_selection.py | 13 +++++++++---- dask_searchcv/utils.py | 2 -- 3 files changed, 24 insertions(+), 9 deletions(-) diff --git a/dask_searchcv/methods.py b/dask_searchcv/methods.py index 308950d..2f2cbcd 100644 --- a/dask_searchcv/methods.py +++ b/dask_searchcv/methods.py @@ -83,15 +83,19 @@ def __init__(self, splits, pairwise=False, cache=True): self.splits = splits self.pairwise = pairwise self.cache = {} if cache else None + print('CVCache', vars(self)) def __reduce__(self): + print('reduce') return (CVCache, (self.splits, self.pairwise, self.cache is not None)) def num_test_samples(self): + print('num_test_samples') return np.array([i.sum() if i.dtype == bool else len(i) for i in pluck(1, self.splits)]) def extract(self, X, y, n, is_x=True, is_train=True): + print('extract') if is_x: if self.pairwise: return self._extract_pairwise(X, y, n, is_train=is_train) @@ -101,6 +105,7 @@ def extract(self, X, y, n, is_x=True, is_train=True): return self._extract(X, y, n, is_x=False, is_train=is_train) def extract_param(self, key, x, n): + print('extract_param', key, x, n) if self.cache is not None and (n, key) in self.cache: return self.cache[n, key] @@ -111,21 +116,24 @@ def extract_param(self, key, x, n): return out def _extract(self, X, y, n, is_x=True, is_train=True): + print('_extract', X, y, n, is_x, is_train) if self.cache is not None and (n, is_x, is_train) in self.cache: return self.cache[n, is_x, is_train] inds = self.splits[n][0] if is_train else self.splits[n][1] post_splits = getattr(self, '_post_splits', None) + print('post', post_splits) if post_splits: result = post_splits(inds) else: result = safe_indexing(X if is_x else y, inds) - + print('result') if self.cache is not None: self.cache[n, is_x, is_train] = result return result def _extract_pairwise(self, X, y, n, is_train=True): + print('pairwise', X, y, n, is_train) if self.cache is not None and (n, True, is_train) in self.cache: return self.cache[n, True, is_train] @@ -136,6 +144,7 @@ def 
_extract_pairwise(self, X, y, n, is_train=True): raise ValueError("X should be a square kernel matrix") train, test = self.splits[n] post_splits = getattr(self, '_post_splits', None) + print('post_splits', post_splits) result = X[np.ix_(train if is_train else test, train)] if post_splits: result = post_splits(result) @@ -156,7 +165,7 @@ def cv_split(cv, X, y, groups, is_pairwise, cache, sampler): cls = cache kw['cache'] = True splits = list(cv.split(X, y, groups)) - print('cls', cls, splits, kw) + print('cls', cls, splits, kw, sampler) if sampler: args = (sampler, splits,) else: @@ -263,7 +272,7 @@ def fit(est, X, y, error_score='raise', fields=None, params=None, def fit_transform(est, X, y, error_score='raise', fields=None, params=None, fit_params=None): - #print('estftxxx', est, fields, params, fit_params) + print('estftxxx', est, fields, params, fit_params) if X is FIT_FAILURE: est, fit_time, Xt = FIT_FAILURE, 0.0, FIT_FAILURE else: @@ -307,6 +316,9 @@ def score(est_and_time, X_test, y_test, X_train, y_train, scorer): def fit_and_score(est, cv, X, y, n, scorer, error_score='raise', fields=None, params=None, fit_params=None, return_train_score=True): + print('fit_and_score', est, cv, X, y, n, scorer, + error_score, fields, params, + fit_params, return_train_score) X_train = cv.extract(X, y, n, True, True) y_train = cv.extract(X, y, n, False, True) X_test = cv.extract(X, y, n, True, False) diff --git a/dask_searchcv/model_selection.py b/dask_searchcv/model_selection.py index 366dde5..166c1ed 100644 --- a/dask_searchcv/model_selection.py +++ b/dask_searchcv/model_selection.py @@ -72,6 +72,7 @@ def build_graph(estimator, cv, scorer, candidate_params, X, y=None, is_pairwise = getattr(estimator, '_pairwise', False) X_name, y_name, groups_name = to_keys(dsk, X, y, groups) n_splits = compute_n_splits(cv, X, y, groups) + print('X_name, y_name, n_splits, is_pairwise, X, y, groups', X_name, y_name, n_splits, is_pairwise, X, y, groups) if fit_params: # A mapping of {name: (name, graph-key)} param_values = to_indexable(*fit_params.values(), allow_scalars=True) @@ -79,7 +80,7 @@ def build_graph(estimator, cv, scorer, candidate_params, X, y=None, zip(fit_params, to_keys(dsk, *param_values))} else: fit_params = {} - + print('fit_params', fit_params) fields, tokens, params = normalize_params(candidate_params) main_token = tokenize(normalize_estimator(estimator), fields, params, X_name, y_name, groups_name, fit_params, cv, @@ -118,7 +119,7 @@ def build_graph(estimator, cv, scorer, candidate_params, X, y=None, dsk[best_estimator] = (fit_best, clone(estimator), best_params, X_name, y_name, fit_params) keys.append(best_estimator) - + print('dsk,keys, n_splits', dsk,keys, n_splits) return dsk, keys, n_splits @@ -232,8 +233,10 @@ def do_fit_and_score(dsk, main_token, est, cv, fields, tokens, params, def do_fit(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, fit_params, n_splits, error_score): + print('do_fit', dsk, next_token, est, cv, fields, tokens, params, Xs, ys, + fit_params, n_splits, error_score) if is_pipeline(est) and params is not None: - #print('pppp', params) + print('pppp', params) return _do_pipeline(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, fit_params, n_splits, error_score, False) else: @@ -528,6 +531,7 @@ def _do_featureunion(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, params, Xs, ys, fit_params, n_splits, error_score, step_fields_lk, fit_params_lk, field_to_index, step_name, False, True) + print('fitts outs', fits, out_Xs) fit_steps.append(fits) 
tr_Xs.append(out_Xs) @@ -561,6 +565,7 @@ def _do_featureunion(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, seen = {} for steps, Xs, wt, (w, wl), nsamp in zip(zip(*fit_steps), zip(*tr_Xs), weight_tokens, weights, n_samples): + print('sX,w,wwl, nsamp', steps, Xs, wt, (w, wl), nsamp) if (steps, wt) in seen: out_append(seen[steps, wt]) else: @@ -810,7 +815,7 @@ def fit(self, X, y=None, groups=None, **fit_params): cache_cv=self.cache_cv) self.dask_graph_ = dsk self.n_splits_ = n_splits - + print('fit vars(self)', vars(self)) n_jobs = _normalize_n_jobs(self.n_jobs) scheduler = _normalize_scheduler(self.scheduler, n_jobs) diff --git a/dask_searchcv/utils.py b/dask_searchcv/utils.py index ae8b157..d585d57 100644 --- a/dask_searchcv/utils.py +++ b/dask_searchcv/utils.py @@ -9,7 +9,6 @@ from sklearn.pipeline import Pipeline as sk_Pipeline def is_pipeline(estimator): - print('is_pipeline', estimator) if isinstance(estimator, sk_Pipeline): ret = True try: @@ -17,7 +16,6 @@ def is_pipeline(estimator): ret = isinstance(estimator, elm_Pipeline) except: ret = False - print('is_pipeline', estimator, ret) return ret def _get_est_type(est): From ce34117a23c4ecc2543b32683bd6787102960ced Mon Sep 17 00:00:00 2001 From: Peter Steinberg Date: Thu, 26 Oct 2017 16:53:20 -0700 Subject: [PATCH 03/16] remove print statements --- dask_searchcv/methods.py | 23 ++------------- dask_searchcv/model_selection.py | 49 ++++---------------------------- dask_searchcv/utils.py | 5 ++-- 3 files changed, 10 insertions(+), 67 deletions(-) diff --git a/dask_searchcv/methods.py b/dask_searchcv/methods.py index 2f2cbcd..6ee6169 100644 --- a/dask_searchcv/methods.py +++ b/dask_searchcv/methods.py @@ -83,19 +83,15 @@ def __init__(self, splits, pairwise=False, cache=True): self.splits = splits self.pairwise = pairwise self.cache = {} if cache else None - print('CVCache', vars(self)) def __reduce__(self): - print('reduce') return (CVCache, (self.splits, self.pairwise, self.cache is not None)) def num_test_samples(self): - print('num_test_samples') return np.array([i.sum() if i.dtype == bool else len(i) for i in pluck(1, self.splits)]) def extract(self, X, y, n, is_x=True, is_train=True): - print('extract') if is_x: if self.pairwise: return self._extract_pairwise(X, y, n, is_train=is_train) @@ -105,7 +101,6 @@ def extract(self, X, y, n, is_x=True, is_train=True): return self._extract(X, y, n, is_x=False, is_train=is_train) def extract_param(self, key, x, n): - print('extract_param', key, x, n) if self.cache is not None and (n, key) in self.cache: return self.cache[n, key] @@ -116,24 +111,20 @@ def extract_param(self, key, x, n): return out def _extract(self, X, y, n, is_x=True, is_train=True): - print('_extract', X, y, n, is_x, is_train) if self.cache is not None and (n, is_x, is_train) in self.cache: return self.cache[n, is_x, is_train] inds = self.splits[n][0] if is_train else self.splits[n][1] post_splits = getattr(self, '_post_splits', None) - print('post', post_splits) if post_splits: - result = post_splits(inds) + result = post_splits(np.array(X)[inds]) else: result = safe_indexing(X if is_x else y, inds) - print('result') if self.cache is not None: self.cache[n, is_x, is_train] = result return result def _extract_pairwise(self, X, y, n, is_train=True): - print('pairwise', X, y, n, is_train) if self.cache is not None and (n, True, is_train) in self.cache: return self.cache[n, True, is_train] @@ -144,7 +135,6 @@ def _extract_pairwise(self, X, y, n, is_train=True): raise ValueError("X should be a square kernel matrix") 
train, test = self.splits[n] post_splits = getattr(self, '_post_splits', None) - print('post_splits', post_splits) result = X[np.ix_(train if is_train else test, train)] if post_splits: result = post_splits(result) @@ -154,18 +144,15 @@ def _extract_pairwise(self, X, y, n, is_train=True): def cv_split(cv, X, y, groups, is_pairwise, cache, sampler): - print('cv, X, y, groups, is_pairwise, cache, sampler', cv, X, y, groups, is_pairwise, cache, sampler) kw = dict(pairwise=is_pairwise, cache=cache) if sampler is None: _check_consistent_length(X, y, groups) if cache and not hasattr(cache, 'extract'): cls = CVCache - kw.pop('sampler') else: cls = cache kw['cache'] = True splits = list(cv.split(X, y, groups)) - print('cls', cls, splits, kw, sampler) if sampler: args = (sampler, splits,) else: @@ -244,13 +231,11 @@ def set_params(est, fields=None, params=None, copy=True): # TODO: rewrite set_params to avoid lock for classes that use the standard # set_params/get_params methods with SET_PARAMS_LOCK: - #print('params sp', fields, est, params) return est.set_params(**params) def fit(est, X, y, error_score='raise', fields=None, params=None, fit_params=None): - #print('estxxxx', est, X, y, fields, params, fit_params) if X is FIT_FAILURE: est, fit_time = FIT_FAILURE, 0.0 else: @@ -272,7 +257,6 @@ def fit(est, X, y, error_score='raise', fields=None, params=None, def fit_transform(est, X, y, error_score='raise', fields=None, params=None, fit_params=None): - print('estftxxx', est, fields, params, fit_params) if X is FIT_FAILURE: est, fit_time, Xt = FIT_FAILURE, 0.0, FIT_FAILURE else: @@ -316,9 +300,6 @@ def score(est_and_time, X_test, y_test, X_train, y_train, scorer): def fit_and_score(est, cv, X, y, n, scorer, error_score='raise', fields=None, params=None, fit_params=None, return_train_score=True): - print('fit_and_score', est, cv, X, y, n, scorer, - error_score, fields, params, - fit_params, return_train_score) X_train = cv.extract(X, y, n, True, True) y_train = cv.extract(X, y, n, False, True) X_test = cv.extract(X, y, n, True, False) @@ -352,7 +333,7 @@ def _store(results, key_name, array, n_splits, n_candidates, def create_cv_results(scores, candidate_params, n_splits, error_score, weights): - print('scores, candidate_params, n_splits, error_score, weights', scores, candidate_params, n_splits, error_score, weights) + if len(scores[0]) == 4: fit_times, test_scores, score_times, train_scores = zip(*scores) else: diff --git a/dask_searchcv/model_selection.py b/dask_searchcv/model_selection.py index 166c1ed..514da8f 100644 --- a/dask_searchcv/model_selection.py +++ b/dask_searchcv/model_selection.py @@ -72,7 +72,6 @@ def build_graph(estimator, cv, scorer, candidate_params, X, y=None, is_pairwise = getattr(estimator, '_pairwise', False) X_name, y_name, groups_name = to_keys(dsk, X, y, groups) n_splits = compute_n_splits(cv, X, y, groups) - print('X_name, y_name, n_splits, is_pairwise, X, y, groups', X_name, y_name, n_splits, is_pairwise, X, y, groups) if fit_params: # A mapping of {name: (name, graph-key)} param_values = to_indexable(*fit_params.values(), allow_scalars=True) @@ -80,7 +79,6 @@ def build_graph(estimator, cv, scorer, candidate_params, X, y=None, zip(fit_params, to_keys(dsk, *param_values))} else: fit_params = {} - print('fit_params', fit_params) fields, tokens, params = normalize_params(candidate_params) main_token = tokenize(normalize_estimator(estimator), fields, params, X_name, y_name, groups_name, fit_params, cv, @@ -119,7 +117,6 @@ def build_graph(estimator, cv, scorer, 
candidate_params, X, y=None, dsk[best_estimator] = (fit_best, clone(estimator), best_params, X_name, y_name, fit_params) keys.append(best_estimator) - print('dsk,keys, n_splits', dsk,keys, n_splits) return dsk, keys, n_splits @@ -162,18 +159,13 @@ def _group_fit_params(steps, fit_params): def do_fit_and_score(dsk, main_token, est, cv, fields, tokens, params, X, y, fit_params, n_splits, error_score, scorer, return_train_score): - print('do_fit_and_score', dsk, main_token, est, cv, fields, tokens, params, - X, y, fit_params, n_splits, error_score, scorer, - return_train_score) if not is_pipeline(est): # Fitting and scoring can all be done as a single task - print('not pipeline') n_and_fit_params = _get_fit_params(cv, fit_params, n_splits) est_type = _get_est_type(est) est_name = '%s-%s' % (est_type, main_token) score_name = '%s-fit-score-%s' % (est_type, main_token) - print('score_name', score_name) dsk[est_name] = est seen = {} @@ -195,7 +187,6 @@ def do_fit_and_score(dsk, main_token, est, cv, fields, tokens, params, m += 1 scores = [k + (n,) for n in range(n_splits) for k in out] else: - print('pipeline', cv, X, y) X_train = (cv_extract, cv, X, y, True, True) X_test = (cv_extract, cv, X, y, True, False) y_train = (cv_extract, cv, X, y, False, True) @@ -204,17 +195,14 @@ def do_fit_and_score(dsk, main_token, est, cv, fields, tokens, params, # Fit the estimator on the training data X_trains = [X_train] * len(params) y_trains = [y_train] * len(params) - print('xtyt', X_trains, y_trains, est, cv, error_score) fit_ests = do_fit(dsk, TokenIterator(main_token), est, cv, fields, tokens, params, X_trains, y_trains, fit_params, n_splits, error_score) - print('fit_ests', fit_ests) score_name = 'score-' + main_token scores = [] scores_append = scores.append for n in range(n_splits): - print('n', n) if return_train_score: xtrain = X_train + (n,) ytrain = y_train + (n,) @@ -223,7 +211,6 @@ def do_fit_and_score(dsk, main_token, est, cv, fields, tokens, params, xtest = X_test + (n,) ytest = y_test + (n,) - print('xtyt2222', xtest, ytest) for (name, m) in fit_ests: dsk[(score_name, m, n)] = (score, (name, m, n), xtest, ytest, xtrain, ytrain, scorer) @@ -233,10 +220,7 @@ def do_fit_and_score(dsk, main_token, est, cv, fields, tokens, params, def do_fit(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, fit_params, n_splits, error_score): - print('do_fit', dsk, next_token, est, cv, fields, tokens, params, Xs, ys, - fit_params, n_splits, error_score) if is_pipeline(est) and params is not None: - print('pppp', params) return _do_pipeline(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, fit_params, n_splits, error_score, False) else: @@ -288,12 +272,12 @@ def do_fit_transform(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, params = tokens = repeat(None) fields = None - name = type(est).__name__.lower() + name = _get_est_type(est) token = next_token(est) fit_Xt_name = '%s-fit-transform-%s' % (name, token) fit_name = '%s-fit-%s' % (name, token) Xt_name = '%s-transform-%s' % (name, token) - est_name = '%s-%s' % (type(est).__name__.lower(), token) + est_name = '%s-%s' % (_get_est_type(est), token) dsk[est_name] = est seen = {} @@ -303,11 +287,9 @@ def do_fit_transform(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, for X, y, t, p in zip(Xs, ys, tokens, params): if (X, y, t) in seen: - print('seen', seen) out_append(seen[X, y, t]) else: for n, fit_params in n_and_fit_params: - print('nfppp', n, fit_params) dsk[(fit_Xt_name, m, n)] = (fit_transform, est_name, X + (n,), y + (n,), 
error_score, fields, p, @@ -356,12 +338,6 @@ def _do_fit_step(dsk, next_token, step, cv, fields, tokens, params, Xs, ys, is_transform): sub_fields, sub_inds = map(list, unzip(step_fields_lk[step_name], 2)) sub_fit_params = fit_params_lk[step_name] - print('sssss', sub_fields, sub_inds, sub_fit_params, step_name in field_to_index, step_name, field_to_index) - nnn = '''next_token, step, cv, fields, tokens, params, Xs, ys,fit_params, n_splits, error_score, step_fields_lk,fit_params_lk, field_to_index, step_name, none_passthrough,is_transform'''.split(',') - print(dict(zip(nnn, (next_token, step, cv, fields, tokens, params, Xs, ys, - fit_params, n_splits, error_score, step_fields_lk, - fit_params_lk, field_to_index, step_name, none_passthrough, - is_transform)))) if step_name in field_to_index: # The estimator may change each call new_fits = {} @@ -369,14 +345,12 @@ def _do_fit_step(dsk, next_token, step, cv, fields, tokens, params, Xs, ys, est_index = field_to_index[step_name] for ids in _group_ids_by_index(est_index, tokens): - print('ids', ids) # Get the estimator for this subgroup sub_est = params[ids[0]][est_index] if sub_est is MISSING: sub_est = step # If an estimator is `None`, there's nothing to do - print('nffff', new_fits) if sub_est is None: nones = dict.fromkeys(ids, None) new_fits.update(nones) @@ -397,7 +371,6 @@ def _do_fit_step(dsk, next_token, step, cv, fields, tokens, params, Xs, ys, sub_tokens = sub_params = None if is_transform: - print('do_fit_transform') sub_fits, sub_Xs = do_fit_transform(dsk, next_token, sub_est, cv, sub_fields, sub_tokens, sub_params, @@ -407,7 +380,6 @@ def _do_fit_step(dsk, next_token, step, cv, fields, tokens, params, Xs, ys, new_Xs.update(zip(ids, sub_Xs)) new_fits.update(zip(ids, sub_fits)) else: - print('do_fit') sub_fits = do_fit(dsk, next_token, sub_est, cv, sub_fields, sub_tokens, sub_params, sub_Xs, sub_ys, sub_fit_params, @@ -447,24 +419,18 @@ def _do_pipeline(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, fit_params, n_splits, error_score, is_transform): if 'steps' in fields: raise NotImplementedError("Setting Pipeline.steps in a gridsearch") - print('fiessss', fields, est, cv, params, fit_params, est.steps) field_to_index, step_fields_lk = _group_subparams(est.steps, fields) fit_params_lk = _group_fit_params(est.steps, fit_params) - print('lllllk', fit_params_lk) # A list of (step, is_transform) instrs = [(s, True) for s in est.steps[:-1]] instrs.append((est.steps[-1], is_transform)) - print('insss', instrs) fit_steps = [] for (step_name, step), transform in instrs: - # print('stepn', step_name, step, transform) fits, Xs = _do_fit_step(dsk, next_token, step, cv, fields, tokens, params, Xs, ys, fit_params, n_splits, error_score, step_fields_lk, fit_params_lk, field_to_index, step_name, True, transform) fit_steps.append(fits) - print('ftt', fits) - print('lennnn', len(fit_steps), fit_steps) # Rebuild the pipelines step_names = [n for n, _ in est.steps] out_ests = [] @@ -473,9 +439,7 @@ def _do_pipeline(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, m = 0 seen = {} for steps in zip(*fit_steps): - print('stepssss', steps, seen) if steps in seen: - # print('continue') out_ests_append(seen[steps]) else: for n in range(n_splits): @@ -531,7 +495,6 @@ def _do_featureunion(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, params, Xs, ys, fit_params, n_splits, error_score, step_fields_lk, fit_params_lk, field_to_index, step_name, False, True) - print('fitts outs', fits, out_Xs) fit_steps.append(fits) 
tr_Xs.append(out_Xs) @@ -565,7 +528,6 @@ def _do_featureunion(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, seen = {} for steps, Xs, wt, (w, wl), nsamp in zip(zip(*fit_steps), zip(*tr_Xs), weight_tokens, weights, n_samples): - print('sX,w,wwl, nsamp', steps, Xs, wt, (w, wl), nsamp) if (steps, wt) in seen: out_append(seen[steps, wt]) else: @@ -803,8 +765,11 @@ def fit(self, X, y=None, groups=None, **fit_params): error_score == 'raise'): raise ValueError("error_score must be the string 'raise' or a" " numeric value.") + candidate_params = list(self._get_param_iterator()) + if not candidate_params: + raise ValueError('_get_param_iterator() failed to yield any parameter sets') dsk, keys, n_splits = build_graph(estimator, self.cv, self.scorer_, - list(self._get_param_iterator()), + candidate_params, X=X, y=y, groups=groups, sampler=self.sampler, fit_params=fit_params, @@ -815,7 +780,6 @@ def fit(self, X, y=None, groups=None, **fit_params): cache_cv=self.cache_cv) self.dask_graph_ = dsk self.n_splits_ = n_splits - print('fit vars(self)', vars(self)) n_jobs = _normalize_n_jobs(self.n_jobs) scheduler = _normalize_scheduler(self.scheduler, n_jobs) @@ -1085,7 +1049,6 @@ def __init__(self, estimator, param_grid, scoring=None, iid=True, self.param_grid = param_grid def _get_param_iterator(self): - print('get') """Return ParameterGrid instance for the given param_grid""" return model_selection.ParameterGrid(self.param_grid) diff --git a/dask_searchcv/utils.py b/dask_searchcv/utils.py index d585d57..f6923f6 100644 --- a/dask_searchcv/utils.py +++ b/dask_searchcv/utils.py @@ -19,11 +19,10 @@ def is_pipeline(estimator): return ret def _get_est_type(est): - if hasattr(est, '_cls') and hasattr(est._cls, '__name__'): - est_type = est._cls.__name__.lower() + if hasattr(est, '_cls_name'): + est_type = est._cls_name.lower() else: est_type = type(est).__name__.lower() - print('_get_est_type', est_type) return est_type From 013b3adff7c7698aa801fc2c880099affaab8bc4 Mon Sep 17 00:00:00 2001 From: Peter Steinberg Date: Tue, 31 Oct 2017 18:08:10 -0700 Subject: [PATCH 04/16] fix usage of isinstance dask.base.Base -> is_dask_collection --- dask_searchcv/model_selection.py | 10 +++++----- dask_searchcv/utils.py | 6 +++--- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/dask_searchcv/model_selection.py b/dask_searchcv/model_selection.py index 514da8f..17d0334 100644 --- a/dask_searchcv/model_selection.py +++ b/dask_searchcv/model_selection.py @@ -8,7 +8,7 @@ import numpy as np import dask -from dask.base import tokenize, Base +from dask.base import tokenize, is_dask_collection from dask.delayed import delayed from dask.threaded import get as threaded_get from dask.utils import derived_from @@ -558,7 +558,7 @@ def check_cv(cv=3, y=None, classifier=False): # If ``cv`` is not an integer, the scikit-learn implementation doesn't # touch the ``y`` object, so passing on a dask object is fine - if not isinstance(y, Base) or not isinstance(cv, numbers.Integral): + if not is_dask_collection(y) or not isinstance(cv, numbers.Integral): return model_selection.check_cv(cv, y, classifier) if classifier: @@ -581,7 +581,7 @@ def compute_n_splits(cv, X, y=None, groups=None): ------- n_splits : int """ - if not any(isinstance(i, Base) for i in (X, y, groups)): + if not any(is_dask_collection(i) for i in (X, y, groups)): return cv.get_n_splits(X, y, groups) if isinstance(cv, (_BaseKFold, BaseShuffleSplit)): @@ -593,12 +593,12 @@ def compute_n_splits(cv, X, y=None, groups=None): elif isinstance(cv, 
_CVIterableWrapper): return len(cv.cv) - elif isinstance(cv, (LeaveOneOut, LeavePOut)) and not isinstance(X, Base): + elif isinstance(cv, (LeaveOneOut, LeavePOut)) and not is_dask_collection(X): # Only `X` is referenced for these classes return cv.get_n_splits(X, None, None) elif (isinstance(cv, (LeaveOneGroupOut, LeavePGroupsOut)) and not - isinstance(groups, Base)): + is_dask_collection(groups)): # Only `groups` is referenced for these classes return cv.get_n_splits(None, None, groups) diff --git a/dask_searchcv/utils.py b/dask_searchcv/utils.py index f6923f6..49feea7 100644 --- a/dask_searchcv/utils.py +++ b/dask_searchcv/utils.py @@ -2,7 +2,7 @@ import copy import dask.array as da -from dask.base import Base, tokenize +from dask.base import is_dask_collection, tokenize from dask.delayed import delayed, Delayed from sklearn.utils.validation import indexable, _is_arraylike @@ -52,7 +52,7 @@ def to_indexable(*args, **kwargs): for x in args: if x is None or isinstance(x, da.Array): yield x - elif isinstance(x, Base): + elif is_dask_collection(x): yield delayed(indexable, pure=True)(x) else: yield indexable(x) @@ -70,7 +70,7 @@ def to_keys(dsk, *args): dsk.update(x.dask) yield x.key else: - assert not isinstance(x, Base) + assert not is_dask_collection(x) key = 'array-' + tokenize(x) dsk[key] = x yield key From 6cb7c8d726c96de8a1079a4b72cb039ad1c68c30 Mon Sep 17 00:00:00 2001 From: Peter Steinberg Date: Thu, 2 Nov 2017 17:40:21 -0700 Subject: [PATCH 05/16] sampler related cross validation changes --- dask_searchcv/methods.py | 50 ++++++++++++++++++++++++-------- dask_searchcv/model_selection.py | 47 ++++++++++++++++++++---------- 2 files changed, 70 insertions(+), 27 deletions(-) diff --git a/dask_searchcv/methods.py b/dask_searchcv/methods.py index 6ee6169..9cf0f56 100644 --- a/dask_searchcv/methods.py +++ b/dask_searchcv/methods.py @@ -1,7 +1,7 @@ from __future__ import absolute_import, division, print_function import warnings -from collections import defaultdict +from collections import defaultdict, Sequence from threading import Lock from timeit import default_timer from distutils.version import LooseVersion @@ -69,6 +69,7 @@ def warn_fit_failure(error_score, e): def check_consistent_length(*arrays): + # TODO - is this function necessary? try: from elm.mldataset import is_mldataset except: @@ -77,6 +78,9 @@ def check_consistent_length(*arrays): return True # TODO ? consequence? 
return _check_consistent_length(*arrays) +def _is_xy_tuple(result): + return isinstance(result, tuple) and len(result) == 2 + class CVCache(object): def __init__(self, splits, pairwise=False, cache=True): @@ -118,10 +122,11 @@ def _extract(self, X, y, n, is_x=True, is_train=True): post_splits = getattr(self, '_post_splits', None) if post_splits: result = post_splits(np.array(X)[inds]) + self.cache[n, True, is_train] = result else: result = safe_indexing(X if is_x else y, inds) - if self.cache is not None: - self.cache[n, is_x, is_train] = result + if self.cache is not None: + self.cache[n, is_x, is_train] = result return result def _extract_pairwise(self, X, y, n, is_train=True): @@ -138,20 +143,36 @@ def _extract_pairwise(self, X, y, n, is_train=True): result = X[np.ix_(train if is_train else test, train)] if post_splits: result = post_splits(result) - if self.cache is not None: - self.cache[n, True, is_train] = result + if _is_xy_tuple(result): + if self.cache is not None: + self.cache[n, True, is_train], self.cache[n, False, is_train] = result + else: + if self.cache is not None: + self.cache[n, True, is_train] = result return result +class CVCacheSampler(CVCache): + def __init__(self, sampler, splits, pairwise=False, cache=True): + self.sampler = sampler + super(CVCacheSampler, self).__init__(splits, pairwise=pairwise, + cache=cache) + + def _post_splits(self, X, y=None, n=None, is_x=True, is_train=False): + if y is not None: + raise ValueError('Expected y to be None (returned by Sampler() instance or similar.') + func = getattr(self.sampler, 'fit_transform', getattr(self.sampler, 'transform', self.sampler)) + return func(X, y=y, is_x=is_x, is_train=is_train) + + def cv_split(cv, X, y, groups, is_pairwise, cache, sampler): kw = dict(pairwise=is_pairwise, cache=cache) - if sampler is None: - _check_consistent_length(X, y, groups) - if cache and not hasattr(cache, 'extract'): - cls = CVCache - else: - cls = cache + if sampler: + cls = CVCacheSampler kw['cache'] = True + else: + cls = CVCache + _check_consistent_length(X, y, groups) splits = list(cv.split(X, y, groups)) if sampler: args = (sampler, splits,) @@ -270,6 +291,8 @@ def fit_transform(est, X, y, error_score='raise', fields=None, params=None, else: est.fit(X, y, **fit_params) Xt = est.transform(X) + if isinstance(Xt, Sequence) and len(Xt) == 2: + Xt, y = Xt except Exception as e: if error_score == 'raise': raise @@ -277,12 +300,15 @@ def fit_transform(est, X, y, error_score='raise', fields=None, params=None, est = Xt = FIT_FAILURE fit_time = default_timer() - start_time - return (est, fit_time), Xt + return (est, fit_time), (Xt, y) def _score(est, X, y, scorer): if est is FIT_FAILURE: return FIT_FAILURE + if y is None and hasattr(X, '__len__') and len(X) == 2: + # TODO is this used? 
+ X, y = X return scorer(est, X) if y is None else scorer(est, X, y) diff --git a/dask_searchcv/model_selection.py b/dask_searchcv/model_selection.py index 17d0334..f52ae6c 100644 --- a/dask_searchcv/model_selection.py +++ b/dask_searchcv/model_selection.py @@ -256,6 +256,14 @@ def do_fit(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, return out +def _split_Xy(X, y): + siz = getattr(X, '__len__', lambda: None)() + new_y = y + if siz == 2: + X, new_y = X + return X, new_y + + def do_fit_transform(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, fit_params, n_splits, error_score): if is_pipeline(est) and params is not None: @@ -276,7 +284,8 @@ def do_fit_transform(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, token = next_token(est) fit_Xt_name = '%s-fit-transform-%s' % (name, token) fit_name = '%s-fit-%s' % (name, token) - Xt_name = '%s-transform-%s' % (name, token) + Xt_name = '%s-transform-X-%s' % (name, token) + yt_name = '%s-transform-y-%s' % (name, token) est_name = '%s-%s' % (_get_est_type(est), token) dsk[est_name] = est @@ -295,12 +304,16 @@ def do_fit_transform(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, error_score, fields, p, fit_params) dsk[(fit_name, m, n)] = (getitem, (fit_Xt_name, m, n), 0) - dsk[(Xt_name, m, n)] = (getitem, (fit_Xt_name, m, n), 1) + Xty = (getitem, (fit_Xt_name, m, n), 1) + dsk[(Xt_name, m, n)] = (getitem, Xty, 0) + dsk[(yt_name, m, n)] = (getitem, Xty, 1) seen[X, y, t] = m out_append(m) m += 1 - return [(fit_name, i) for i in out], [(Xt_name, i) for i in out] + return ([(fit_name, i) for i in out], + [(Xt_name, i) for i in out], + [(yt_name, i) for i in out],) def _group_subparams(steps, fields, ignore=()): @@ -342,6 +355,7 @@ def _do_fit_step(dsk, next_token, step, cv, fields, tokens, params, Xs, ys, # The estimator may change each call new_fits = {} new_Xs = {} + new_ys = {} est_index = field_to_index[step_name] for ids in _group_ids_by_index(est_index, tokens): @@ -371,12 +385,13 @@ def _do_fit_step(dsk, next_token, step, cv, fields, tokens, params, Xs, ys, sub_tokens = sub_params = None if is_transform: - sub_fits, sub_Xs = do_fit_transform(dsk, next_token, + sub_fits, sub_Xs, sub_ys = do_fit_transform(dsk, next_token, sub_est, cv, sub_fields, sub_tokens, sub_params, sub_Xs, sub_ys, sub_fit_params, n_splits, error_score) + new_ys.update(zip(ids, sub_ys)) new_Xs.update(zip(ids, sub_Xs)) new_fits.update(zip(ids, sub_fits)) else: @@ -389,6 +404,7 @@ def _do_fit_step(dsk, next_token, step, cv, fields, tokens, params, Xs, ys, all_ids = list(range(len(Xs))) if is_transform: Xs = get(all_ids, new_Xs) + ys = get(all_ids, new_ys) fits = get(all_ids, new_fits) elif step is None: # Nothing to do @@ -404,7 +420,7 @@ def _do_fit_step(dsk, next_token, step, cv, fields, tokens, params, Xs, ys, sub_tokens = sub_params = None if is_transform: - fits, Xs = do_fit_transform(dsk, next_token, step, cv, + fits, Xs, ys = do_fit_transform(dsk, next_token, step, cv, sub_fields, sub_tokens, sub_params, Xs, ys, sub_fit_params, n_splits, error_score) @@ -412,7 +428,7 @@ def _do_fit_step(dsk, next_token, step, cv, fields, tokens, params, Xs, ys, fits = do_fit(dsk, next_token, step, cv, sub_fields, sub_tokens, sub_params, Xs, ys, sub_fit_params, n_splits, error_score) - return (fits, Xs) if is_transform else (fits, None) + return (fits, Xs, ys) if is_transform else (fits, None, None) def _do_pipeline(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, @@ -426,10 +442,10 @@ def _do_pipeline(dsk, next_token, est, cv, fields, tokens, 
params, Xs, ys, instrs.append((est.steps[-1], is_transform)) fit_steps = [] for (step_name, step), transform in instrs: - fits, Xs = _do_fit_step(dsk, next_token, step, cv, fields, tokens, - params, Xs, ys, fit_params, n_splits, - error_score, step_fields_lk, fit_params_lk, - field_to_index, step_name, True, transform) + fits, Xs, ys = _do_fit_step(dsk, next_token, step, cv, fields, tokens, + params, Xs, ys, fit_params, n_splits, + error_score, step_fields_lk, fit_params_lk, + field_to_index, step_name, True, transform) fit_steps.append(fits) # Rebuild the pipelines step_names = [n for n, _ in est.steps] @@ -491,10 +507,10 @@ def _do_featureunion(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, fit_steps = [] tr_Xs = [] for (step_name, step) in est.transformer_list: - fits, out_Xs = _do_fit_step(dsk, next_token, step, cv, fields, tokens, - params, Xs, ys, fit_params, n_splits, - error_score, step_fields_lk, fit_params_lk, - field_to_index, step_name, False, True) + fits, out_Xs, _ = _do_fit_step(dsk, next_token, step, cv, fields, tokens, + params, Xs, ys, fit_params, n_splits, + error_score, step_fields_lk, fit_params_lk, + field_to_index, step_name, False, True) fit_steps.append(fits) tr_Xs.append(out_Xs) @@ -768,10 +784,11 @@ def fit(self, X, y=None, groups=None, **fit_params): candidate_params = list(self._get_param_iterator()) if not candidate_params: raise ValueError('_get_param_iterator() failed to yield any parameter sets') + sampler = getattr(self, 'sampler', None) dsk, keys, n_splits = build_graph(estimator, self.cv, self.scorer_, candidate_params, X=X, y=y, groups=groups, - sampler=self.sampler, + sampler=sampler, fit_params=fit_params, iid=self.iid, refit=self.refit, From 557e40d15c7530207266b5d99454d6a4e6351702 Mon Sep 17 00:00:00 2001 From: Peter Steinberg Date: Mon, 6 Nov 2017 11:58:06 -0800 Subject: [PATCH 06/16] deduplicate X,y splitting logic --- dask_searchcv/methods.py | 33 ++++++++++++-------------------- dask_searchcv/model_selection.py | 8 -------- dask_searchcv/utils.py | 6 ++++++ 3 files changed, 18 insertions(+), 29 deletions(-) diff --git a/dask_searchcv/methods.py b/dask_searchcv/methods.py index 9cf0f56..d8bd735 100644 --- a/dask_searchcv/methods.py +++ b/dask_searchcv/methods.py @@ -15,10 +15,9 @@ from sklearn.exceptions import FitFailedWarning from sklearn.pipeline import Pipeline, FeatureUnion from sklearn.utils import safe_indexing -from sklearn.utils.validation import (check_consistent_length as _check_consistent_length, - _is_arraylike) +from sklearn.utils.validation import _is_arraylike, check_consistent_length -from .utils import copy_estimator +from .utils import copy_estimator, _split_Xy @@ -67,17 +66,6 @@ def warn_fit_failure(error_score, e): # ----------------------- # - -def check_consistent_length(*arrays): - # TODO - is this function necessary? - try: - from elm.mldataset import is_mldataset - except: - is_mldataset = lambda x: False - if any(is_mldataset(arr) for arr in arrays): - return True # TODO ? consequence? 
- return _check_consistent_length(*arrays) - def _is_xy_tuple(result): return isinstance(result, tuple) and len(result) == 2 @@ -166,18 +154,25 @@ def _post_splits(self, X, y=None, n=None, is_x=True, is_train=False): def cv_split(cv, X, y, groups, is_pairwise, cache, sampler): + print('cv, groups, is_pairwise, cache, sampler', + cv, groups, is_pairwise, cache, sampler) kw = dict(pairwise=is_pairwise, cache=cache) if sampler: + print('if sampler = True') cls = CVCacheSampler kw['cache'] = True else: + print('if sampler = Falsy') cls = CVCache - _check_consistent_length(X, y, groups) + check_consistent_length(X, y, groups) splits = list(cv.split(X, y, groups)) if sampler: + print('if sampler = True') args = (sampler, splits,) else: + print('if sampler = Falsy') args = (splits,) + print('ak', args, kw) return cls(*args, **kw) @@ -291,8 +286,7 @@ def fit_transform(est, X, y, error_score='raise', fields=None, params=None, else: est.fit(X, y, **fit_params) Xt = est.transform(X) - if isinstance(Xt, Sequence) and len(Xt) == 2: - Xt, y = Xt + Xt, y = _split_Xy(Xt, y) except Exception as e: if error_score == 'raise': raise @@ -306,9 +300,7 @@ def fit_transform(est, X, y, error_score='raise', fields=None, params=None, def _score(est, X, y, scorer): if est is FIT_FAILURE: return FIT_FAILURE - if y is None and hasattr(X, '__len__') and len(X) == 2: - # TODO is this used? - X, y = X + X, y = _split_Xy(X, y) return scorer(est, X) if y is None else scorer(est, X, y) @@ -359,7 +351,6 @@ def _store(results, key_name, array, n_splits, n_candidates, def create_cv_results(scores, candidate_params, n_splits, error_score, weights): - if len(scores[0]) == 4: fit_times, test_scores, score_times, train_scores = zip(*scores) else: diff --git a/dask_searchcv/model_selection.py b/dask_searchcv/model_selection.py index d5ab237..773ccc2 100644 --- a/dask_searchcv/model_selection.py +++ b/dask_searchcv/model_selection.py @@ -257,14 +257,6 @@ def do_fit(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, return out -def _split_Xy(X, y): - siz = getattr(X, '__len__', lambda: None)() - new_y = y - if siz == 2: - X, new_y = X - return X, new_y - - def do_fit_transform(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, fit_params, n_splits, error_score): if is_pipeline(est) and params is not None: diff --git a/dask_searchcv/utils.py b/dask_searchcv/utils.py index 9c969e7..d410357 100644 --- a/dask_searchcv/utils.py +++ b/dask_searchcv/utils.py @@ -97,3 +97,9 @@ def copy_estimator(est): def unzip(itbl, n): return zip(*itbl) if itbl else [()] * n + + +def _split_Xy(X, y): + if isinstance(X, (tuple, list)) and len(X) == 2: + X, y = X + return X, y From ea512ae5cf3a34568a5d7e0720b283ff6f8e311d Mon Sep 17 00:00:00 2001 From: Peter Steinberg Date: Tue, 7 Nov 2017 04:42:19 -0800 Subject: [PATCH 07/16] fix test failures related to transformers of None and feature union --- dask_searchcv/methods.py | 22 +++-------- dask_searchcv/model_selection.py | 66 +++++++++++++++++++------------- dask_searchcv/utils.py | 19 ++++++--- 3 files changed, 59 insertions(+), 48 deletions(-) diff --git a/dask_searchcv/methods.py b/dask_searchcv/methods.py index d8bd735..dc9c643 100644 --- a/dask_searchcv/methods.py +++ b/dask_searchcv/methods.py @@ -17,7 +17,7 @@ from sklearn.utils import safe_indexing from sklearn.utils.validation import _is_arraylike, check_consistent_length -from .utils import copy_estimator, _split_Xy +from .utils import copy_estimator, _split_Xy, _is_xy_tuple @@ -65,11 +65,6 @@ def warn_fit_failure(error_score, e): # 
Functions in the graphs # # ----------------------- # - -def _is_xy_tuple(result): - return isinstance(result, tuple) and len(result) == 2 - - class CVCache(object): def __init__(self, splits, pairwise=False, cache=True): self.splits = splits @@ -134,8 +129,9 @@ def _extract_pairwise(self, X, y, n, is_train=True): if _is_xy_tuple(result): if self.cache is not None: self.cache[n, True, is_train], self.cache[n, False, is_train] = result - else: - if self.cache is not None: + elif self.cache is not None: + self.cache[n, True, is_train] = result + elif self.cache is not None: self.cache[n, True, is_train] = result return result @@ -154,25 +150,18 @@ def _post_splits(self, X, y=None, n=None, is_x=True, is_train=False): def cv_split(cv, X, y, groups, is_pairwise, cache, sampler): - print('cv, groups, is_pairwise, cache, sampler', - cv, groups, is_pairwise, cache, sampler) kw = dict(pairwise=is_pairwise, cache=cache) if sampler: - print('if sampler = True') cls = CVCacheSampler kw['cache'] = True else: - print('if sampler = Falsy') cls = CVCache check_consistent_length(X, y, groups) splits = list(cv.split(X, y, groups)) if sampler: - print('if sampler = True') args = (sampler, splits,) else: - print('if sampler = Falsy') args = (splits,) - print('ak', args, kw) return cls(*args, **kw) @@ -273,6 +262,7 @@ def fit(est, X, y, error_score='raise', fields=None, params=None, def fit_transform(est, X, y, error_score='raise', fields=None, params=None, fit_params=None): + new_y = None if X is FIT_FAILURE: est, fit_time, Xt = FIT_FAILURE, 0.0, FIT_FAILURE else: @@ -286,7 +276,7 @@ def fit_transform(est, X, y, error_score='raise', fields=None, params=None, else: est.fit(X, y, **fit_params) Xt = est.transform(X) - Xt, y = _split_Xy(Xt, y) + Xt, y = _split_Xy(Xt, y, typ=(tuple, list)) except Exception as e: if error_score == 'raise': raise diff --git a/dask_searchcv/model_selection.py b/dask_searchcv/model_selection.py index 773ccc2..2b2da1b 100644 --- a/dask_searchcv/model_selection.py +++ b/dask_searchcv/model_selection.py @@ -39,7 +39,7 @@ decompress_params, score, feature_union, feature_union_concat, MISSING) from .utils import (to_indexable, to_keys, unzip, is_dask_collection, - is_pipeline,_get_est_type) + is_pipeline,_get_est_type, _split_Xy) try: from cytoolz import get, pluck @@ -341,7 +341,7 @@ def new_group(): def _do_fit_step(dsk, next_token, step, cv, fields, tokens, params, Xs, ys, fit_params, n_splits, error_score, step_fields_lk, fit_params_lk, field_to_index, step_name, none_passthrough, - is_transform): + is_transform, is_featureunion): sub_fields, sub_inds = map(list, unzip(step_fields_lk[step_name], 2)) sub_fit_params = fit_params_lk[step_name] if step_name in field_to_index: @@ -364,8 +364,10 @@ def _do_fit_step(dsk, next_token, step, cv, fields, tokens, params, Xs, ys, if is_transform: if none_passthrough: new_Xs.update(zip(ids, get(ids, Xs))) + new_ys.update(zip(ids, get(ids, ys))) else: new_Xs.update(nones) + new_ys.update(nones) else: # Extract the proper subset of Xs, ys sub_Xs = get(ids, Xs) @@ -378,26 +380,30 @@ def _do_fit_step(dsk, next_token, step, cv, fields, tokens, params, Xs, ys, sub_tokens = sub_params = None if is_transform: - sub_fits, sub_Xs, sub_ys = do_fit_transform(dsk, next_token, - sub_est, cv, sub_fields, - sub_tokens, sub_params, - sub_Xs, sub_ys, - sub_fit_params, - n_splits, error_score) - new_ys.update(zip(ids, sub_ys)) + out = do_fit_transform(dsk, next_token, + sub_est, cv, sub_fields, + sub_tokens, sub_params, + sub_Xs, sub_ys, + sub_fit_params, + n_splits, 
error_score) + if len(out) == 3: + sub_fits, sub_Xs, sub_ys = out + new_ys.update(zip(ids, sub_ys)) + else: + sub_fits, sub_Xs = out new_Xs.update(zip(ids, sub_Xs)) - new_fits.update(zip(ids, sub_fits)) else: sub_fits = do_fit(dsk, next_token, sub_est, cv, sub_fields, sub_tokens, sub_params, sub_Xs, sub_ys, sub_fit_params, n_splits, error_score) - new_fits.update(zip(ids, sub_fits)) + new_fits.update(zip(ids, sub_fits)) # Extract lists of transformed Xs and fit steps all_ids = list(range(len(Xs))) if is_transform: Xs = get(all_ids, new_Xs) - ys = get(all_ids, new_ys) + if not is_featureunion: + ys = get(all_ids, new_ys) fits = get(all_ids, new_fits) elif step is None: # Nothing to do @@ -413,10 +419,14 @@ def _do_fit_step(dsk, next_token, step, cv, fields, tokens, params, Xs, ys, sub_tokens = sub_params = None if is_transform: - fits, Xs, ys = do_fit_transform(dsk, next_token, step, cv, - sub_fields, sub_tokens, sub_params, - Xs, ys, sub_fit_params, n_splits, - error_score) + out = do_fit_transform(dsk, next_token, step, cv, + sub_fields, sub_tokens, sub_params, + Xs, ys, sub_fit_params, n_splits, + error_score) + if len(out) == 3: + fits, Xs, ys = out + else: + fits, Xs = out else: fits = do_fit(dsk, next_token, step, cv, sub_fields, sub_tokens, sub_params, Xs, ys, sub_fit_params, @@ -435,10 +445,13 @@ def _do_pipeline(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, instrs.append((est.steps[-1], is_transform)) fit_steps = [] for (step_name, step), transform in instrs: - fits, Xs, ys = _do_fit_step(dsk, next_token, step, cv, fields, tokens, - params, Xs, ys, fit_params, n_splits, - error_score, step_fields_lk, fit_params_lk, - field_to_index, step_name, True, transform) + fits, temp_Xs, temp_ys = _do_fit_step(dsk, next_token, step, cv, fields, tokens, + params, Xs, ys, fit_params, n_splits, + error_score, step_fields_lk, fit_params_lk, + field_to_index, step_name, True, transform, False) + if transform: + Xs = temp_Xs + ys = temp_ys fit_steps.append(fits) # Rebuild the pipelines step_names = [n for n, _ in est.steps] @@ -460,7 +473,7 @@ def _do_pipeline(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, m += 1 if is_transform: - return out_ests, Xs + return out_ests, Xs, ys return out_ests @@ -501,9 +514,9 @@ def _do_featureunion(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, tr_Xs = [] for (step_name, step) in est.transformer_list: fits, out_Xs, _ = _do_fit_step(dsk, next_token, step, cv, fields, tokens, - params, Xs, ys, fit_params, n_splits, - error_score, step_fields_lk, fit_params_lk, - field_to_index, step_name, False, True) + params, Xs, ys, fit_params, n_splits, + error_score, step_fields_lk, fit_params_lk, + field_to_index, step_name, False, True, True) fit_steps.append(fits) tr_Xs.append(out_Xs) @@ -535,7 +548,8 @@ def _do_featureunion(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, tr_name = 'feature-union-concat-' + token m = 0 seen = {} - for steps, Xs, wt, (w, wl), nsamp in zip(zip(*fit_steps), zip(*tr_Xs), + for steps, Xs, wt, (w, wl), nsamp in zip(zip(*fit_steps), + zip(*tr_Xs), weight_tokens, weights, n_samples): if (steps, wt) in seen: out_append(seen[steps, wt]) @@ -550,7 +564,7 @@ def _do_featureunion(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, seen[steps, wt] = m out_append(m) m += 1 - return [(fit_name, i) for i in out], [(tr_name, i) for i in out] + return ([(fit_name, i) for i in out], [(tr_name, i) for i in out],) # ------------ # diff --git a/dask_searchcv/utils.py b/dask_searchcv/utils.py index d410357..c843d4c 
100644 --- a/dask_searchcv/utils.py +++ b/dask_searchcv/utils.py @@ -12,13 +12,13 @@ def is_pipeline(estimator): if isinstance(estimator, sk_Pipeline): - ret = True + return True try: from elm.pipeline import Pipeline as elm_Pipeline - ret = isinstance(estimator, elm_Pipeline) + return isinstance(estimator, elm_Pipeline) except: - ret = False - return ret + return False + def _get_est_type(est): if hasattr(est, '_cls_name'): @@ -99,7 +99,14 @@ def unzip(itbl, n): return zip(*itbl) if itbl else [()] * n -def _split_Xy(X, y): - if isinstance(X, (tuple, list)) and len(X) == 2: +def _is_xy_tuple(result, typ=tuple): + if typ and not isinstance(typ, tuple): + typ = (typ,) + typ = typ + (tuple,) + return isinstance(result, typ) and len(result) == 2 + + +def _split_Xy(X, y, typ=tuple): + if _is_xy_tuple(X, typ=typ): X, y = X return X, y From 632ba83e291c565cd6ac8a2b323c4b85de1cbec1 Mon Sep 17 00:00:00 2001 From: Peter Steinberg Date: Tue, 7 Nov 2017 08:34:15 -0800 Subject: [PATCH 08/16] fix pep8 issues found in CI checks --- dask_searchcv/methods.py | 7 +++---- dask_searchcv/model_selection.py | 34 ++++++++++++++++++-------------- dask_searchcv/utils.py | 3 ++- 3 files changed, 24 insertions(+), 20 deletions(-) diff --git a/dask_searchcv/methods.py b/dask_searchcv/methods.py index dc9c643..cb77ad4 100644 --- a/dask_searchcv/methods.py +++ b/dask_searchcv/methods.py @@ -1,7 +1,7 @@ from __future__ import absolute_import, division, print_function import warnings -from collections import defaultdict, Sequence +from collections import defaultdict from threading import Lock from timeit import default_timer from distutils.version import LooseVersion @@ -20,7 +20,6 @@ from .utils import copy_estimator, _split_Xy, _is_xy_tuple - # Copied from scikit-learn/sklearn/utils/fixes.py, can be removed once we drop # support for scikit-learn < 0.18.1 or numpy < 1.12.0. 
if LooseVersion(np.__version__) < '1.12.0': @@ -128,7 +127,8 @@ def _extract_pairwise(self, X, y, n, is_train=True): result = post_splits(result) if _is_xy_tuple(result): if self.cache is not None: - self.cache[n, True, is_train], self.cache[n, False, is_train] = result + (self.cache[n, True, is_train], + self.cache[n, False, is_train]) = result elif self.cache is not None: self.cache[n, True, is_train] = result elif self.cache is not None: @@ -262,7 +262,6 @@ def fit(est, X, y, error_score='raise', fields=None, params=None, def fit_transform(est, X, y, error_score='raise', fields=None, params=None, fit_params=None): - new_y = None if X is FIT_FAILURE: est, fit_time, Xt = FIT_FAILURE, 0.0, FIT_FAILURE else: diff --git a/dask_searchcv/model_selection.py b/dask_searchcv/model_selection.py index 2b2da1b..4029888 100644 --- a/dask_searchcv/model_selection.py +++ b/dask_searchcv/model_selection.py @@ -27,7 +27,7 @@ LeavePGroupsOut, PredefinedSplit, _CVIterableWrapper) -from sklearn.pipeline import Pipeline, FeatureUnion +from sklearn.pipeline import FeatureUnion from sklearn.utils.metaestimators import if_delegate_has_method from sklearn.utils.multiclass import type_of_target from sklearn.utils.validation import _num_samples, check_is_fitted @@ -39,7 +39,7 @@ decompress_params, score, feature_union, feature_union_concat, MISSING) from .utils import (to_indexable, to_keys, unzip, is_dask_collection, - is_pipeline,_get_est_type, _split_Xy) + is_pipeline,_get_est_type) try: from cytoolz import get, pluck @@ -445,10 +445,14 @@ def _do_pipeline(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, instrs.append((est.steps[-1], is_transform)) fit_steps = [] for (step_name, step), transform in instrs: - fits, temp_Xs, temp_ys = _do_fit_step(dsk, next_token, step, cv, fields, tokens, - params, Xs, ys, fit_params, n_splits, - error_score, step_fields_lk, fit_params_lk, - field_to_index, step_name, True, transform, False) + fits, temp_Xs, temp_ys = _do_fit_step(dsk, next_token, step, + cv, fields, tokens, + params, Xs, ys, fit_params, + n_splits, + error_score, step_fields_lk, + fit_params_lk, + field_to_index, step_name, + True, transform, False) if transform: Xs = temp_Xs ys = temp_ys @@ -793,15 +797,15 @@ def fit(self, X, y=None, groups=None, **fit_params): raise ValueError('_get_param_iterator() failed to yield any parameter sets') sampler = getattr(self, 'sampler', None) dsk, keys, n_splits = build_graph(estimator, self.cv, self.scorer_, - candidate_params, - X=X, y=y, groups=groups, - sampler=sampler, - fit_params=fit_params, - iid=self.iid, - refit=self.refit, - error_score=error_score, - return_train_score=self.return_train_score, - cache_cv=self.cache_cv) + candidate_params, + X=X, y=y, groups=groups, + sampler=sampler, + fit_params=fit_params, + iid=self.iid, + refit=self.refit, + error_score=error_score, + return_train_score=self.return_train_score, + cache_cv=self.cache_cv) self.dask_graph_ = dsk self.n_splits_ = n_splits n_jobs = _normalize_n_jobs(self.n_jobs) diff --git a/dask_searchcv/utils.py b/dask_searchcv/utils.py index c843d4c..9849e72 100644 --- a/dask_searchcv/utils.py +++ b/dask_searchcv/utils.py @@ -10,13 +10,14 @@ from sklearn.utils.validation import indexable, _is_arraylike from sklearn.pipeline import Pipeline as sk_Pipeline + def is_pipeline(estimator): if isinstance(estimator, sk_Pipeline): return True try: from elm.pipeline import Pipeline as elm_Pipeline return isinstance(estimator, elm_Pipeline) - except: + except Exception as e: return False From 
9ad1d74c641da763cd5a9ae068a7b3556746a2ab Mon Sep 17 00:00:00 2001 From: Peter Steinberg Date: Tue, 7 Nov 2017 12:43:06 -0800 Subject: [PATCH 09/16] fix pep8 issues --- dask_searchcv/methods.py | 6 ++++-- dask_searchcv/model_selection.py | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/dask_searchcv/methods.py b/dask_searchcv/methods.py index cb77ad4..33f08e6 100644 --- a/dask_searchcv/methods.py +++ b/dask_searchcv/methods.py @@ -144,8 +144,10 @@ def __init__(self, sampler, splits, pairwise=False, cache=True): def _post_splits(self, X, y=None, n=None, is_x=True, is_train=False): if y is not None: - raise ValueError('Expected y to be None (returned by Sampler() instance or similar.') - func = getattr(self.sampler, 'fit_transform', getattr(self.sampler, 'transform', self.sampler)) + raise ValueError('y should be None (found {})'.format(type(y))) + func = getattr(self.sampler, 'fit_transform', None) + if func is None: + func = getattr(self.sampler, 'transform', self.sampler) return func(X, y=y, is_x=is_x, is_train=is_train) diff --git a/dask_searchcv/model_selection.py b/dask_searchcv/model_selection.py index 4029888..8e2ac72 100644 --- a/dask_searchcv/model_selection.py +++ b/dask_searchcv/model_selection.py @@ -39,7 +39,7 @@ decompress_params, score, feature_union, feature_union_concat, MISSING) from .utils import (to_indexable, to_keys, unzip, is_dask_collection, - is_pipeline,_get_est_type) + is_pipeline, _get_est_type) try: from cytoolz import get, pluck From ec1e287e2d5017593a0e8db91bf7e03a8dd04c60 Mon Sep 17 00:00:00 2001 From: Peter Steinberg Date: Tue, 7 Nov 2017 17:19:36 -0800 Subject: [PATCH 10/16] refactor to simplify changes in dask-searchcv --- dask_searchcv/methods.py | 46 +++++++++++--------------------- dask_searchcv/model_selection.py | 6 ++--- 2 files changed, 18 insertions(+), 34 deletions(-) diff --git a/dask_searchcv/methods.py b/dask_searchcv/methods.py index 33f08e6..6a2b5d3 100644 --- a/dask_searchcv/methods.py +++ b/dask_searchcv/methods.py @@ -15,7 +15,7 @@ from sklearn.exceptions import FitFailedWarning from sklearn.pipeline import Pipeline, FeatureUnion from sklearn.utils import safe_indexing -from sklearn.utils.validation import _is_arraylike, check_consistent_length +from sklearn.utils.validation import check_consistent_length, _is_arraylike from .utils import copy_estimator, _split_Xy, _is_xy_tuple @@ -103,12 +103,13 @@ def _extract(self, X, y, n, is_x=True, is_train=True): inds = self.splits[n][0] if is_train else self.splits[n][1] post_splits = getattr(self, '_post_splits', None) if post_splits: + if self.cache in (None, False): + raise ValueError('Must set cache_cv=True with _post_splits') result = post_splits(np.array(X)[inds]) self.cache[n, True, is_train] = result else: result = safe_indexing(X if is_x else y, inds) - if self.cache is not None: - self.cache[n, is_x, is_train] = result + self.cache[n, is_x, is_train] = result return result def _extract_pairwise(self, X, y, n, is_train=True): @@ -136,35 +137,20 @@ def _extract_pairwise(self, X, y, n, is_train=True): return result -class CVCacheSampler(CVCache): - def __init__(self, sampler, splits, pairwise=False, cache=True): - self.sampler = sampler - super(CVCacheSampler, self).__init__(splits, pairwise=pairwise, - cache=cache) - - def _post_splits(self, X, y=None, n=None, is_x=True, is_train=False): - if y is not None: - raise ValueError('y should be None (found {})'.format(type(y))) - func = getattr(self.sampler, 'fit_transform', None) - if func is None: - func = 
getattr(self.sampler, 'transform', self.sampler) - return func(X, y=y, is_x=is_x, is_train=is_train) - - -def cv_split(cv, X, y, groups, is_pairwise, cache, sampler): +def cv_split(cv, X, y, groups, is_pairwise, cache): kw = dict(pairwise=is_pairwise, cache=cache) - if sampler: - cls = CVCacheSampler - kw['cache'] = True - else: - cls = CVCache - check_consistent_length(X, y, groups) splits = list(cv.split(X, y, groups)) - if sampler: - args = (sampler, splits,) - else: - args = (splits,) - return cls(*args, **kw) + if not cache or isinstance(cache, bool): + check_consistent_length(X, y, groups) + return CVCache(list(cv.split(X, y, groups)), is_pairwise, cache) + replace = dict(pairwise=is_pairwise, + cache={}, + splits=splits) + params = tuple(cache.get_params()) + for key, val in replace.items(): + print(key, val) + cache.set_params(**{key:val}) + return cache def cv_n_samples(cvs): diff --git a/dask_searchcv/model_selection.py b/dask_searchcv/model_selection.py index 8e2ac72..29b839b 100644 --- a/dask_searchcv/model_selection.py +++ b/dask_searchcv/model_selection.py @@ -63,7 +63,7 @@ def __call__(self, est): def build_graph(estimator, cv, scorer, candidate_params, X, y=None, - groups=None, sampler=None, fit_params=None, iid=True, refit=True, + groups=None, fit_params=None, iid=True, refit=True, error_score='raise', return_train_score=True, cache_cv=True): dsk = {} @@ -88,7 +88,7 @@ def build_graph(estimator, cv, scorer, candidate_params, X, y=None, cv_name = 'cv-split-' + main_token dsk[cv_name] = (cv_split, cv, X_name, y_name, groups_name, - is_pairwise, cache_cv, sampler) + is_pairwise, cache_cv) if iid: weights = 'cv-n-samples-' + main_token @@ -795,11 +795,9 @@ def fit(self, X, y=None, groups=None, **fit_params): candidate_params = list(self._get_param_iterator()) if not candidate_params: raise ValueError('_get_param_iterator() failed to yield any parameter sets') - sampler = getattr(self, 'sampler', None) dsk, keys, n_splits = build_graph(estimator, self.cv, self.scorer_, candidate_params, X=X, y=y, groups=groups, - sampler=sampler, fit_params=fit_params, iid=self.iid, refit=self.refit, From 6906e83704c9389064ec5cf59640e54de86eabab Mon Sep 17 00:00:00 2001 From: Peter Steinberg Date: Tue, 7 Nov 2017 17:26:00 -0800 Subject: [PATCH 11/16] refactor to simplify changes in dask-searchcv --- dask_searchcv/model_selection.py | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/dask_searchcv/model_selection.py b/dask_searchcv/model_selection.py index 29b839b..9283815 100644 --- a/dask_searchcv/model_selection.py +++ b/dask_searchcv/model_selection.py @@ -64,15 +64,15 @@ def __call__(self, est): def build_graph(estimator, cv, scorer, candidate_params, X, y=None, groups=None, fit_params=None, iid=True, refit=True, - error_score='raise', return_train_score=True, - cache_cv=True): - dsk = {} + error_score='raise', return_train_score=True, cache_cv=True): # "pairwise" estimators require a different graph for CV splitting X, y, groups = to_indexable(X, y, groups) cv = check_cv(cv, y, is_classifier(estimator)) is_pairwise = getattr(estimator, '_pairwise', False) + dsk = {} X_name, y_name, groups_name = to_keys(dsk, X, y, groups) n_splits = compute_n_splits(cv, X, y, groups) + if fit_params: # A mapping of {name: (name, graph-key)} param_values = to_indexable(*fit_params.values(), allow_scalars=True) @@ -80,13 +80,13 @@ def build_graph(estimator, cv, scorer, candidate_params, X, y=None, zip(fit_params, to_keys(dsk, *param_values))} else: fit_params = {} + 
fields, tokens, params = normalize_params(candidate_params) main_token = tokenize(normalize_estimator(estimator), fields, params, X_name, y_name, groups_name, fit_params, cv, error_score == 'raise', return_train_score) cv_name = 'cv-split-' + main_token - dsk[cv_name] = (cv_split, cv, X_name, y_name, groups_name, is_pairwise, cache_cv) @@ -438,8 +438,10 @@ def _do_pipeline(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, fit_params, n_splits, error_score, is_transform): if 'steps' in fields: raise NotImplementedError("Setting Pipeline.steps in a gridsearch") + field_to_index, step_fields_lk = _group_subparams(est.steps, fields) fit_params_lk = _group_fit_params(est.steps, fit_params) + # A list of (step, is_transform) instrs = [(s, True) for s in est.steps[:-1]] instrs.append((est.steps[-1], is_transform)) @@ -457,6 +459,7 @@ def _do_pipeline(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, Xs = temp_Xs ys = temp_ys fit_steps.append(fits) + # Rebuild the pipelines step_names = [n for n, _ in est.steps] out_ests = [] @@ -552,8 +555,7 @@ def _do_featureunion(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, tr_name = 'feature-union-concat-' + token m = 0 seen = {} - for steps, Xs, wt, (w, wl), nsamp in zip(zip(*fit_steps), - zip(*tr_Xs), + for steps, Xs, wt, (w, wl), nsamp in zip(zip(*fit_steps), zip(*tr_Xs), weight_tokens, weights, n_samples): if (steps, wt) in seen: out_append(seen[steps, wt]) @@ -568,7 +570,7 @@ def _do_featureunion(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, seen[steps, wt] = m out_append(m) m += 1 - return ([(fit_name, i) for i in out], [(tr_name, i) for i in out],) + return [(fit_name, i) for i in out], [(tr_name, i) for i in out] # ------------ # @@ -792,11 +794,8 @@ def fit(self, X, y=None, groups=None, **fit_params): error_score == 'raise'): raise ValueError("error_score must be the string 'raise' or a" " numeric value.") - candidate_params = list(self._get_param_iterator()) - if not candidate_params: - raise ValueError('_get_param_iterator() failed to yield any parameter sets') dsk, keys, n_splits = build_graph(estimator, self.cv, self.scorer_, - candidate_params, + list(self._get_param_iterator()), X=X, y=y, groups=groups, fit_params=fit_params, iid=self.iid, From 5940ff168e1d4a49c065b6d7e4aab9673aff4774 Mon Sep 17 00:00:00 2001 From: Peter Steinberg Date: Tue, 7 Nov 2017 17:30:38 -0800 Subject: [PATCH 12/16] refactor to simplify changes in dask-searchcv --- dask_searchcv/methods.py | 28 +++++++--------------------- 1 file changed, 7 insertions(+), 21 deletions(-) diff --git a/dask_searchcv/methods.py b/dask_searchcv/methods.py index 6a2b5d3..9678f61 100644 --- a/dask_searchcv/methods.py +++ b/dask_searchcv/methods.py @@ -19,7 +19,6 @@ from .utils import copy_estimator, _split_Xy, _is_xy_tuple - # Copied from scikit-learn/sklearn/utils/fixes.py, can be removed once we drop # support for scikit-learn < 0.18.1 or numpy < 1.12.0. 
if LooseVersion(np.__version__) < '1.12.0': @@ -64,6 +63,7 @@ def warn_fit_failure(error_score, e): # Functions in the graphs # # ----------------------- # + class CVCache(object): def __init__(self, splits, pairwise=False, cache=True): self.splits = splits @@ -101,14 +101,9 @@ def _extract(self, X, y, n, is_x=True, is_train=True): return self.cache[n, is_x, is_train] inds = self.splits[n][0] if is_train else self.splits[n][1] - post_splits = getattr(self, '_post_splits', None) - if post_splits: - if self.cache in (None, False): - raise ValueError('Must set cache_cv=True with _post_splits') - result = post_splits(np.array(X)[inds]) - self.cache[n, True, is_train] = result - else: - result = safe_indexing(X if is_x else y, inds) + result = safe_indexing(X if is_x else y, inds) + + if self.cache is not None: self.cache[n, is_x, is_train] = result return result @@ -122,20 +117,11 @@ def _extract_pairwise(self, X, y, n, is_train=True): if X.shape[0] != X.shape[1]: raise ValueError("X should be a square kernel matrix") train, test = self.splits[n] - post_splits = getattr(self, '_post_splits', None) result = X[np.ix_(train if is_train else test, train)] - if post_splits: - result = post_splits(result) - if _is_xy_tuple(result): - if self.cache is not None: - (self.cache[n, True, is_train], - self.cache[n, False, is_train]) = result - elif self.cache is not None: - self.cache[n, True, is_train] = result - elif self.cache is not None: - self.cache[n, True, is_train] = result - return result + if self.cache is not None: + self.cache[n, True, is_train] = result + return result def cv_split(cv, X, y, groups, is_pairwise, cache): kw = dict(pairwise=is_pairwise, cache=cache) From 2e1edc969e7dfe645ab874eb895ef7a88bd1887d Mon Sep 17 00:00:00 2001 From: Peter Steinberg Date: Wed, 8 Nov 2017 03:34:32 -0800 Subject: [PATCH 13/16] reduce diff in dask-searchcv -> move cv stuff to elm --- dask_searchcv/methods.py | 1 - dask_searchcv/model_selection.py | 10 +++++----- dask_searchcv/utils.py | 12 +----------- 3 files changed, 6 insertions(+), 17 deletions(-) diff --git a/dask_searchcv/methods.py b/dask_searchcv/methods.py index 9678f61..1e7f1d4 100644 --- a/dask_searchcv/methods.py +++ b/dask_searchcv/methods.py @@ -134,7 +134,6 @@ def cv_split(cv, X, y, groups, is_pairwise, cache): splits=splits) params = tuple(cache.get_params()) for key, val in replace.items(): - print(key, val) cache.set_params(**{key:val}) return cache diff --git a/dask_searchcv/model_selection.py b/dask_searchcv/model_selection.py index 9283815..b0113dd 100644 --- a/dask_searchcv/model_selection.py +++ b/dask_searchcv/model_selection.py @@ -27,7 +27,7 @@ LeavePGroupsOut, PredefinedSplit, _CVIterableWrapper) -from sklearn.pipeline import FeatureUnion +from sklearn.pipeline import FeatureUnion, Pipeline from sklearn.utils.metaestimators import if_delegate_has_method from sklearn.utils.multiclass import type_of_target from sklearn.utils.validation import _num_samples, check_is_fitted @@ -39,7 +39,7 @@ decompress_params, score, feature_union, feature_union_concat, MISSING) from .utils import (to_indexable, to_keys, unzip, is_dask_collection, - is_pipeline, _get_est_type) + _get_est_type) try: from cytoolz import get, pluck @@ -160,7 +160,7 @@ def _group_fit_params(steps, fit_params): def do_fit_and_score(dsk, main_token, est, cv, fields, tokens, params, X, y, fit_params, n_splits, error_score, scorer, return_train_score): - if not is_pipeline(est): + if not isinstance(est, Pipeline): # Fitting and scoring can all be done as a single task 
n_and_fit_params = _get_fit_params(cv, fit_params, n_splits) @@ -221,7 +221,7 @@ def do_fit_and_score(dsk, main_token, est, cv, fields, tokens, params, def do_fit(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, fit_params, n_splits, error_score): - if is_pipeline(est) and params is not None: + if isinstance(est, Pipeline) and params is not None: return _do_pipeline(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, fit_params, n_splits, error_score, False) else: @@ -259,7 +259,7 @@ def do_fit(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, def do_fit_transform(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, fit_params, n_splits, error_score): - if is_pipeline(est) and params is not None: + if isinstance(est, Pipeline) and params is not None: return _do_pipeline(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, fit_params, n_splits, error_score, True) elif isinstance(est, FeatureUnion) and params is not None: diff --git a/dask_searchcv/utils.py b/dask_searchcv/utils.py index 9849e72..c44701a 100644 --- a/dask_searchcv/utils.py +++ b/dask_searchcv/utils.py @@ -8,17 +8,7 @@ from dask.delayed import delayed, Delayed from sklearn.utils.validation import indexable, _is_arraylike -from sklearn.pipeline import Pipeline as sk_Pipeline - - -def is_pipeline(estimator): - if isinstance(estimator, sk_Pipeline): - return True - try: - from elm.pipeline import Pipeline as elm_Pipeline - return isinstance(estimator, elm_Pipeline) - except Exception as e: - return False +from sklearn.pipeline import Pipeline def _get_est_type(est): From e73381dead10851d81be3198676feb8f99e7b4ef Mon Sep 17 00:00:00 2001 From: Peter Steinberg Date: Wed, 8 Nov 2017 03:49:57 -0800 Subject: [PATCH 14/16] pep8 fixes --- dask_searchcv/methods.py | 10 ++++------ dask_searchcv/model_selection.py | 2 +- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/dask_searchcv/methods.py b/dask_searchcv/methods.py index 1e7f1d4..f45c736 100644 --- a/dask_searchcv/methods.py +++ b/dask_searchcv/methods.py @@ -17,7 +17,7 @@ from sklearn.utils import safe_indexing from sklearn.utils.validation import check_consistent_length, _is_arraylike -from .utils import copy_estimator, _split_Xy, _is_xy_tuple +from .utils import copy_estimator, _split_Xy # Copied from scikit-learn/sklearn/utils/fixes.py, can be removed once we drop # support for scikit-learn < 0.18.1 or numpy < 1.12.0. 
@@ -123,18 +123,16 @@ def _extract_pairwise(self, X, y, n, is_train=True): self.cache[n, True, is_train] = result return result + def cv_split(cv, X, y, groups, is_pairwise, cache): - kw = dict(pairwise=is_pairwise, cache=cache) splits = list(cv.split(X, y, groups)) if not cache or isinstance(cache, bool): check_consistent_length(X, y, groups) return CVCache(list(cv.split(X, y, groups)), is_pairwise, cache) - replace = dict(pairwise=is_pairwise, + params = dict(pairwise=is_pairwise, cache={}, splits=splits) - params = tuple(cache.get_params()) - for key, val in replace.items(): - cache.set_params(**{key:val}) + cache.set_params(**params) return cache diff --git a/dask_searchcv/model_selection.py b/dask_searchcv/model_selection.py index b0113dd..ca155c9 100644 --- a/dask_searchcv/model_selection.py +++ b/dask_searchcv/model_selection.py @@ -795,7 +795,7 @@ def fit(self, X, y=None, groups=None, **fit_params): raise ValueError("error_score must be the string 'raise' or a" " numeric value.") dsk, keys, n_splits = build_graph(estimator, self.cv, self.scorer_, - list(self._get_param_iterator()), + list(self._get_param_iterator()), X=X, y=y, groups=groups, fit_params=fit_params, iid=self.iid, From c58293f612e22c78806cac60d3b210c2dfa86e9d Mon Sep 17 00:00:00 2001 From: Peter Steinberg Date: Wed, 8 Nov 2017 08:32:05 -0800 Subject: [PATCH 15/16] remove _get_est_type --- dask_searchcv/model_selection.py | 13 ++++++------- dask_searchcv/utils.py | 8 -------- 2 files changed, 6 insertions(+), 15 deletions(-) diff --git a/dask_searchcv/model_selection.py b/dask_searchcv/model_selection.py index ca155c9..445366f 100644 --- a/dask_searchcv/model_selection.py +++ b/dask_searchcv/model_selection.py @@ -27,7 +27,7 @@ LeavePGroupsOut, PredefinedSplit, _CVIterableWrapper) -from sklearn.pipeline import FeatureUnion, Pipeline +from sklearn.pipeline import Pipeline, FeatureUnion from sklearn.utils.metaestimators import if_delegate_has_method from sklearn.utils.multiclass import type_of_target from sklearn.utils.validation import _num_samples, check_is_fitted @@ -38,8 +38,7 @@ cv_n_samples, cv_extract, cv_extract_params, decompress_params, score, feature_union, feature_union_concat, MISSING) -from .utils import (to_indexable, to_keys, unzip, is_dask_collection, - _get_est_type) +from .utils import to_indexable, to_keys, unzip, is_dask_collection try: from cytoolz import get, pluck @@ -164,7 +163,7 @@ def do_fit_and_score(dsk, main_token, est, cv, fields, tokens, params, # Fitting and scoring can all be done as a single task n_and_fit_params = _get_fit_params(cv, fit_params, n_splits) - est_type = _get_est_type(est) + est_type = type(est).__name__.lower() est_name = '%s-%s' % (est_type, main_token) score_name = '%s-fit-score-%s' % (est_type, main_token) dsk[est_name] = est @@ -232,7 +231,7 @@ def do_fit(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, fields = None token = next_token(est) - est_type = _get_est_type(est) + est_type = type(est).__name__.lower() est_name = '%s-%s' % (est_type, token) fit_name = '%s-fit-%s' % (est_type, token) dsk[est_name] = est @@ -273,13 +272,13 @@ def do_fit_transform(dsk, next_token, est, cv, fields, tokens, params, Xs, ys, params = tokens = repeat(None) fields = None - name = _get_est_type(est) + name = type(est).__name__.lower() token = next_token(est) fit_Xt_name = '%s-fit-transform-%s' % (name, token) fit_name = '%s-fit-%s' % (name, token) Xt_name = '%s-transform-X-%s' % (name, token) yt_name = '%s-transform-y-%s' % (name, token) - est_name = '%s-%s' % 
(_get_est_type(est), token) + est_name = '%s-%s' % (type(est).__name__.lower(), token) dsk[est_name] = est seen = {} diff --git a/dask_searchcv/utils.py b/dask_searchcv/utils.py index c44701a..5afa3bf 100644 --- a/dask_searchcv/utils.py +++ b/dask_searchcv/utils.py @@ -11,14 +11,6 @@ from sklearn.pipeline import Pipeline -def _get_est_type(est): - if hasattr(est, '_cls_name'): - est_type = est._cls_name.lower() - else: - est_type = type(est).__name__.lower() - return est_type - - if LooseVersion(dask.__version__) > '0.15.4': from dask.base import is_dask_collection else: From 6367c219284bd7b6a6a7a976d29bf7feda87a6c4 Mon Sep 17 00:00:00 2001 From: Peter Steinberg Date: Wed, 8 Nov 2017 10:30:02 -0800 Subject: [PATCH 16/16] pep8 fixes --- dask_searchcv/model_selection.py | 3 ++- dask_searchcv/utils.py | 1 - 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dask_searchcv/model_selection.py b/dask_searchcv/model_selection.py index 445366f..0036c63 100644 --- a/dask_searchcv/model_selection.py +++ b/dask_searchcv/model_selection.py @@ -64,9 +64,10 @@ def __call__(self, est): def build_graph(estimator, cv, scorer, candidate_params, X, y=None, groups=None, fit_params=None, iid=True, refit=True, error_score='raise', return_train_score=True, cache_cv=True): - # "pairwise" estimators require a different graph for CV splitting + X, y, groups = to_indexable(X, y, groups) cv = check_cv(cv, y, is_classifier(estimator)) + # "pairwise" estimators require a different graph for CV splitting is_pairwise = getattr(estimator, '_pairwise', False) dsk = {} X_name, y_name, groups_name = to_keys(dsk, X, y, groups) diff --git a/dask_searchcv/utils.py b/dask_searchcv/utils.py index 5afa3bf..fcaec7c 100644 --- a/dask_searchcv/utils.py +++ b/dask_searchcv/utils.py @@ -8,7 +8,6 @@ from dask.delayed import delayed, Delayed from sklearn.utils.validation import indexable, _is_arraylike -from sklearn.pipeline import Pipeline if LooseVersion(dask.__version__) > '0.15.4':
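
A minimal sketch of how a sampler-backed CV cache could plug into the refactored cv_split() above. This is illustrative only and not part of the patch series: after "reduce diff in dask-searchcv -> move cv stuff to elm", the real CVCacheSampler lives outside dask-searchcv, and the names SamplerCVCache and double_sampler below are hypothetical. In its final form cv_split() keeps the plain CVCache path for boolean caches and, for any other cache object, only calls cache.set_params(pairwise=..., cache={}, splits=...) and returns it, so a compatible class needs a set_params() method plus whatever extraction behaviour it wants; build_graph() forwards the cache_cv argument straight into this slot.

    import numpy as np
    from sklearn.model_selection import KFold
    from dask_searchcv.methods import CVCache, cv_split


    class SamplerCVCache(CVCache):
        """Illustrative cache that runs a sampler over each train/test selection."""

        def __init__(self, sampler, splits=None, pairwise=False, cache=True):
            self.sampler = sampler
            self.splits = splits if splits is not None else []
            self.pairwise = pairwise
            self.cache = {} if cache else None

        def set_params(self, **params):
            # cv_split() passes pairwise, cache={} and the computed splits here
            # instead of re-instantiating the object.
            for key, val in params.items():
                setattr(self, key, val)
            return self

        def _extract(self, X, y, n, is_x=True, is_train=True):
            # Same shape as CVCache._extract, but the selected rows are handed
            # to the sampler (mirroring the CVCacheSampler of earlier patches).
            if self.cache is not None and (n, is_x, is_train) in self.cache:
                return self.cache[n, is_x, is_train]
            inds = self.splits[n][0] if is_train else self.splits[n][1]
            func = getattr(self.sampler, 'fit_transform', None)
            if func is None:
                func = getattr(self.sampler, 'transform', self.sampler)
            result = func(np.array(X)[inds], y=y, is_x=is_x, is_train=is_train)
            if self.cache is not None:
                self.cache[n, is_x, is_train] = result
            return result


    def double_sampler(data, **kw):
        # Stand-in for an elm-style sampler that would build an MLDataset from
        # the selected arguments; here it just scales the selected rows.
        return data * 2.0


    X = np.arange(20).reshape(10, 2)
    cvs = cv_split(KFold(n_splits=2), X, None, groups=None, is_pairwise=False,
                   cache=SamplerCVCache(double_sampler))
    print(cvs._extract(X, None, 0, is_x=True, is_train=True))

Used this way, an estimator created with cache_cv=SamplerCVCache(...) would have its per-fold data produced by the sampler rather than by safe_indexing, while passing cache_cv=True or False still exercises the unchanged CVCache path (and the check_consistent_length call is skipped only on the custom-cache branch, which is what allows non-array inputs such as MLDataset).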