diff --git a/gamma/_base.py b/gamma/_base.py
index 3060945..e8439c8 100644
--- a/gamma/_base.py
+++ b/gamma/_base.py
@@ -7,17 +7,18 @@
 import warnings
 from abc import ABCMeta, abstractmethod
 from time import time
+
 import numpy as np
 from scipy.special import logsumexp
 from sklearn import cluster
-from sklearn.base import BaseEstimator
-from sklearn.base import DensityMixin
+from sklearn.base import BaseEstimator, DensityMixin
 from sklearn.exceptions import ConvergenceWarning
 from sklearn.utils import check_array, check_random_state
 from sklearn.utils.validation import check_is_fitted
 
 from .seismic_ops import initialize_centers
 
+
 def _check_shape(param, param_shape, name):
     """Validate the shape of the input parameter 'param'.
@@ -31,8 +32,9 @@ def _check_shape(param, param_shape, name):
     """
     param = np.array(param)
     if param.shape != param_shape:
-        raise ValueError("The parameter '%s' should have the shape of %s, "
-                         "but got %s" % (name, param_shape, param.shape))
+        raise ValueError(
+            "The parameter '%s' should have the shape of %s, " "but got %s" % (name, param_shape, param.shape)
+        )
 
 def _check_X(X, n_components=None, n_features=None, ensure_min_samples=1):
     """Check the input data X.
@@ -48,16 +50,16 @@ def _check_X(X, n_components=None, n_features=None, ensure_min_samples=1):
     -------
     X : array, shape (n_samples, n_features)
     """
-    X = check_array(X, dtype=[np.float64, np.float32],
-                    ensure_min_samples=ensure_min_samples)
+    X = check_array(X, dtype=[np.float64, np.float32], ensure_min_samples=ensure_min_samples)
 
     if n_components is not None and X.shape[0] < n_components:
-        raise ValueError('Expected n_samples >= n_components ' 'but got n_components = %d, n_samples = %d'
-                         % (n_components, X.shape[0]))
+        raise ValueError(
+            "Expected n_samples >= n_components "
+            "but got n_components = %d, n_samples = %d" % (n_components, X.shape[0])
+        )
     if n_features is not None and X.shape[-1] != n_features:
-        raise ValueError("Expected the input data X have %d features, " "but got %d features"
-                         % (n_features, X.shape[-1]))
+        raise ValueError(
+            "Expected the input data X to have %d features, " "but got %d features" % (n_features, X.shape[-1])
+        )
     return X
 
@@ -68,10 +70,22 @@ class BaseMixture(DensityMixin, BaseEstimator, metaclass=ABCMeta):
     provides basic common methods for mixture models.
""" - def __init__(self, n_components, tol, reg_covar, - max_iter, n_init, init_params, random_state, warm_start, - verbose, verbose_interval, - dummy_comp=False, dummy_prob=0.01, dummy_quantile=0.1): + def __init__( + self, + n_components, + tol, + reg_covar, + max_iter, + n_init, + init_params, + random_state, + warm_start, + verbose, + verbose_interval, + dummy_comp=False, + dummy_prob=0.01, + dummy_quantile=0.1, + ): self.n_components = n_components self.tol = tol self.reg_covar = reg_covar @@ -94,30 +108,29 @@ def _check_initial_parameters(self, X): X : array-like of shape (n_samples, n_features) """ if self.n_components < 1: - raise ValueError("Invalid value for 'n_components': %d " - "Estimation requires at least one component" - % self.n_components) + raise ValueError( + "Invalid value for 'n_components': %d " "Estimation requires at least one component" % self.n_components + ) - if self.tol < 0.: - raise ValueError("Invalid value for 'tol': %.5f " - "Tolerance used by the EM must be non-negative" - % self.tol) + if self.tol < 0.0: + raise ValueError( + "Invalid value for 'tol': %.5f " "Tolerance used by the EM must be non-negative" % self.tol + ) if self.n_init < 1: - raise ValueError("Invalid value for 'n_init': %d " - "Estimation requires at least one run" - % self.n_init) + raise ValueError("Invalid value for 'n_init': %d " "Estimation requires at least one run" % self.n_init) if self.max_iter < 1: - raise ValueError("Invalid value for 'max_iter': %d " - "Estimation requires at least one iteration" - % self.max_iter) + raise ValueError( + "Invalid value for 'max_iter': %d " "Estimation requires at least one iteration" % self.max_iter + ) - if self.reg_covar < 0.: - raise ValueError("Invalid value for 'reg_covar': %.5f " - "regularization on covariance must be " - "non-negative" - % self.reg_covar) + if self.reg_covar < 0.0: + raise ValueError( + "Invalid value for 'reg_covar': %.5f " + "regularization on covariance must be " + "non-negative" % self.reg_covar + ) # Check all the parameters values of the derived class self._check_parameters(X) @@ -145,25 +158,26 @@ def _initialize_parameters(self, X, random_state): """ n_samples, _ = X.shape - if self.init_params == 'kmeans': + if self.init_params == "kmeans": resp = np.zeros((n_samples, self.n_components)) if self.dummy_comp: - label = cluster.KMeans(n_clusters=self.n_components-1, n_init=1, - random_state=random_state).fit(X).labels_ + label = ( + cluster.KMeans(n_clusters=self.n_components - 1, n_init=1, random_state=random_state).fit(X).labels_ + ) else: - label = cluster.KMeans(n_clusters=self.n_components, n_init=1, - random_state=random_state).fit(X).labels_ + label = cluster.KMeans(n_clusters=self.n_components, n_init=1, random_state=random_state).fit(X).labels_ resp[np.arange(n_samples), label] = 1 - elif self.init_params == 'random': + elif self.init_params == "random": resp = random_state.rand(n_samples, self.n_components) resp /= resp.sum(axis=1)[:, np.newaxis] - elif self.init_params == 'centers': + elif self.init_params == "centers": # resp = self._initialize_centers(X, random_state) - resp, centers, means = initialize_centers(X, self.phase_type, self.centers_init, self.station_locs, random_state) + resp, centers, means = initialize_centers( + X, self.phase_type, self.centers_init, self.station_locs, random_state + ) self.centers_init = centers else: - raise ValueError("Unimplemented initialization method '%s'" - % self.init_params) + raise ValueError("Unimplemented initialization method '%s'" % self.init_params) 
         self._initialize(X, resp)
@@ -233,10 +247,10 @@ def fit_predict(self, X, y=None):
         self._check_initial_parameters(X)
 
         # if we enable warm_start, we will have a unique initialisation
-        do_init = not(self.warm_start and hasattr(self, 'converged_'))
+        do_init = not (self.warm_start and hasattr(self, "converged_"))
         n_init = self.n_init if do_init else 1
 
-        max_lower_bound = -np.infty
+        max_lower_bound = -np.inf
         self.converged_ = False
 
         random_state = check_random_state(self.random_state)
@@ -248,15 +262,14 @@
            if do_init:
                self._initialize_parameters(X, random_state)
 
-            lower_bound = (-np.infty if do_init else self.lower_bound_)
+            lower_bound = -np.inf if do_init else self.lower_bound_
 
            for n_iter in range(1, self.max_iter + 1):
                prev_lower_bound = lower_bound
 
                log_prob_norm, log_resp = self._e_step(X)
                self._m_step(X, log_resp)
-                lower_bound = self._compute_lower_bound(
-                    log_resp, log_prob_norm)
+                lower_bound = self._compute_lower_bound(log_resp, log_prob_norm)
 
                change = lower_bound - prev_lower_bound
                self._print_verbose_msg_iter_end(n_iter, change)
@@ -264,7 +277,7 @@
                if abs(change) < self.tol:
                    self.converged_ = True
                    break
-
+
            self._print_verbose_msg_init_end(lower_bound)
 
            if lower_bound > max_lower_bound:
@@ -428,30 +441,36 @@
        if n_samples < 1:
            raise ValueError(
                "Invalid value for 'n_samples': %d . The sampling requires at "
-                "least one sample." % (self.n_components))
+                "least one sample." % (n_samples)
+            )
 
        _, _, n_features = self.means_.shape
        rng = check_random_state(self.random_state)
        n_samples_comp = rng.multinomial(n_samples, self.weights_)
 
-        if self.covariance_type == 'full':
-            X = np.vstack([
-                rng.multivariate_normal(mean, covariance, int(sample))
-                for (mean, covariance, sample) in zip(
-                    self.means_, self.covariances_, n_samples_comp)])
+        if self.covariance_type == "full":
+            X = np.vstack(
+                [
+                    rng.multivariate_normal(mean, covariance, int(sample))
+                    for (mean, covariance, sample) in zip(self.means_, self.covariances_, n_samples_comp)
+                ]
+            )
        elif self.covariance_type == "tied":
-            X = np.vstack([
-                rng.multivariate_normal(mean, self.covariances_, int(sample))
-                for (mean, sample) in zip(
-                    self.means_, n_samples_comp)])
+            X = np.vstack(
+                [
+                    rng.multivariate_normal(mean, self.covariances_, int(sample))
+                    for (mean, sample) in zip(self.means_, n_samples_comp)
+                ]
+            )
        else:
-            X = np.vstack([
-                mean + rng.randn(sample, n_features) * np.sqrt(covariance)
-                for (mean, covariance, sample) in zip(
-                    self.means_, self.covariances_, n_samples_comp)])
+            X = np.vstack(
+                [
+                    mean + rng.randn(sample, n_features) * np.sqrt(covariance)
+                    for (mean, covariance, sample) in zip(self.means_, self.covariances_, n_samples_comp)
+                ]
+            )
 
-        y = np.concatenate([np.full(sample, j, dtype=int)
-                            for j, sample in enumerate(n_samples_comp)])
+        y = np.concatenate([np.full(sample, j, dtype=int) for j, sample in enumerate(n_samples_comp)])
 
        return (X, y)
@@ -515,7 +534,7 @@ def _estimate_log_prob_resp(self, X):
        """
        weighted_log_prob = self._estimate_weighted_log_prob(X)
        log_prob_norm = logsumexp(weighted_log_prob, axis=1)
-        with np.errstate(under='ignore'):
+        with np.errstate(under="ignore"):
            # ignore underflow
            log_resp = weighted_log_prob - log_prob_norm[:, np.newaxis]
        return log_prob_norm, log_resp
@@ -536,8 +555,10 @@ def _print_verbose_msg_iter_end(self, n_iter, diff_ll):
            print("  Iteration %d" % n_iter)
        elif self.verbose >= 2:
            cur_time = time()
-            print("  Iteration %d\t time lapse %.5fs\t ll change %.5f" % (
-                n_iter, cur_time - self._iter_prev_time, diff_ll))
+            print(
+                "  Iteration %d\t time lapse %.5fs\t ll change %.5f"
+                % (n_iter, cur_time - self._iter_prev_time, diff_ll)
+            )
            self._iter_prev_time = cur_time
 
    def _print_verbose_msg_init_end(self, ll):
@@ -545,5 +566,7 @@
        if self.verbose == 1:
            print("Initialization converged: %s" % self.converged_)
        elif self.verbose >= 2:
-            print("Initialization converged: %s\t time lapse %.5fs\t ll %.5f" %
-                  (self.converged_, time() - self._init_prev_time, ll))
+            print(
+                "Initialization converged: %s\t time lapse %.5fs\t ll %.5f"
+                % (self.converged_, time() - self._init_prev_time, ll)
+            )
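# A minimal, standalone sketch (not part of the patch) of the log-space
# normalization that _estimate_log_prob_resp performs: responsibilities are
# computed with logsumexp so that tiny per-component likelihoods do not
# underflow to zero. The toy numbers below are hypothetical.
import numpy as np
from scipy.special import logsumexp

weighted_log_prob = np.array(
    [
        [-1200.0, -1205.0],  # np.exp() of these underflows to 0.0 in float64
        [-3.0, -1.0],
    ]
)  # shape (n_samples, n_components): log of each component's weighted density

log_prob_norm = logsumexp(weighted_log_prob, axis=1)  # log p(x_i), computed stably
log_resp = weighted_log_prob - log_prob_norm[:, np.newaxis]  # log responsibilities

# Each row of exp(log_resp) still sums to 1, even for the first sample, where
# a naive exp-then-normalize would divide 0 by 0.
assert np.allclose(np.exp(log_resp).sum(axis=1), 1.0)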