From 09128917460f2daf5772d974c7d09c1e352f1327 Mon Sep 17 00:00:00 2001 From: vadim_potemkin Date: Wed, 20 Sep 2023 18:30:28 +0200 Subject: [PATCH] added: unit-test and integration, docstrings, init input_data method cleaned up TimeSeriesClassifier_Preset --- fedot_ind/api/utils/input_data.py | 33 +++++++ .../experiment/TimeSeriesClassifierPreset.py | 62 +------------ .../transformation/basis/abstract_basis.py | 5 -- .../transformation/basis/eigen_basis.py | 34 ++----- .../operation/transformation/data/eigen.py | 90 ++++++++++++------- .../operation/transformation/data/hankel.py | 2 - .../transformation/data/kernel_matrix.py | 12 +-- .../test_timeseriesclassifier_preset.py | 35 ++++++++ .../unit/architecture/experiment/__init__.py | 0 .../experiment/test_TimeSeriesClassifier.py | 40 +++++++++ .../test_TimeSeriesClassifierPreset.py | 37 ++++++++ .../experiment/test_TimeSeriesRegression.py | 33 +++++++ .../transformation/basis/__init__.py | 0 .../transformation/basis/test_eigen_basis.py | 56 ++++++++++++ .../operation/transformation/data/__init__.py | 0 .../transformation/data/test_HankelMatrix.py | 57 ++++++++++++ .../transformation/data/test_eigen.py | 36 ++++++++ .../transformation/data/test_kernel_matrix.py | 52 +++++++++++ 18 files changed, 450 insertions(+), 134 deletions(-) create mode 100644 fedot_ind/api/utils/input_data.py create mode 100644 tests/integration/experiment/test_timeseriesclassifier_preset.py create mode 100644 tests/unit/architecture/experiment/__init__.py create mode 100644 tests/unit/architecture/experiment/test_TimeSeriesClassifier.py create mode 100644 tests/unit/architecture/experiment/test_TimeSeriesClassifierPreset.py create mode 100644 tests/unit/architecture/experiment/test_TimeSeriesRegression.py create mode 100644 tests/unit/operation/transformation/basis/__init__.py create mode 100644 tests/unit/operation/transformation/basis/test_eigen_basis.py create mode 100644 tests/unit/operation/transformation/data/__init__.py create mode 100644 tests/unit/operation/transformation/data/test_HankelMatrix.py create mode 100644 tests/unit/operation/transformation/data/test_eigen.py create mode 100644 tests/unit/operation/transformation/data/test_kernel_matrix.py diff --git a/fedot_ind/api/utils/input_data.py b/fedot_ind/api/utils/input_data.py new file mode 100644 index 000000000..60cb3e632 --- /dev/null +++ b/fedot_ind/api/utils/input_data.py @@ -0,0 +1,33 @@ +import numpy as np +import pandas as pd +from fedot.core.data.data import InputData +from fedot.core.repository.dataset_types import DataTypesEnum +from fedot.core.repository.tasks import Task, TaskTypesEnum + + +def init_input_data(X: pd.DataFrame, y: np.ndarray) -> InputData: + """Method for initialization of InputData object from pandas DataFrame and numpy array with target values. + + Args: + X: pandas DataFrame with features + y: numpy array with target values + + Returns: + InputData object convenient for FEDOT framework + + """ + is_multivariate_data = True if isinstance(X.iloc[0, 0], pd.Series) else False + if is_multivariate_data: + input_data = InputData(idx=np.arange(len(X)), + features=np.array(X.values.tolist()), + target=y.reshape(-1, 1), + task=Task(TaskTypesEnum.classification), + data_type=DataTypesEnum.image) + else: + input_data = InputData(idx=np.arange(len(X)), + features=X.values, + target=np.ravel(y).reshape(-1, 1), + task=Task(TaskTypesEnum.classification), + data_type=DataTypesEnum.table) + + return input_data diff --git a/fedot_ind/core/architecture/experiment/TimeSeriesClassifierPreset.py b/fedot_ind/core/architecture/experiment/TimeSeriesClassifierPreset.py index 3723f18cc..90d278e83 100644 --- a/fedot_ind/core/architecture/experiment/TimeSeriesClassifierPreset.py +++ b/fedot_ind/core/architecture/experiment/TimeSeriesClassifierPreset.py @@ -11,16 +11,13 @@ from fedot.core.pipelines.pipeline import Pipeline from fedot.core.pipelines.pipeline_builder import PipelineBuilder from fedot.core.pipelines.tuning.tuner_builder import TunerBuilder -from fedot.core.repository.dataset_types import DataTypesEnum from fedot.core.repository.quality_metrics_repository import ClassificationMetricsEnum -from fedot.core.repository.tasks import Task, TaskTypesEnum from golem.core.tuning.sequential import SequentialTuner -from golem.core.tuning.simultaneous import SimultaneousTuner +from fedot_ind.api.utils.input_data import init_input_data from fedot_ind.api.utils.path_lib import default_path_to_save_results from fedot_ind.api.utils.saver_collections import ResultSaver from fedot_ind.core.metrics.evaluation import PerformanceAnalyzer -from fedot_ind.core.operation.caching import DataCacher from fedot_ind.core.repository.initializer_industrial_models import IndustrialModels np.random.seed(0) @@ -70,47 +67,8 @@ def __init__(self, params: Optional[OperationParameters] = None): self.logger.info(f'TimeSeriesClassifierPreset initialised with [{self.branch_nodes}] nodes and ' f'[{self.tuning_iters}] tuning iterations and [{self.tuning_timeout}] timeout') - def __check_multivariate_data(self, data: pd.DataFrame) -> bool: - """Method for checking if the data is multivariate. - - Args: - X: pandas DataFrame with features - - Returns: - True if data is multivariate, False otherwise - - """ - if isinstance(data.iloc[0, 0], pd.Series): - return True - else: - return False - def _init_input_data(self, X: pd.DataFrame, y: np.ndarray) -> InputData: - """Method for initialization of InputData object from pandas DataFrame and numpy array with target values. - - Args: - X: pandas DataFrame with features - y: numpy array with target values - - Returns: - InputData object convenient for FEDOT framework - - """ - is_multivariate_data = self.__check_multivariate_data(X) - if is_multivariate_data: - input_data = InputData(idx=np.arange(len(X)), - features=np.array(X.values.tolist()), - target=y.reshape(-1, 1), - task=Task(TaskTypesEnum.classification), - data_type=DataTypesEnum.image) - else: - input_data = InputData(idx=np.arange(len(X)), - features=X.values, - target=np.ravel(y).reshape(-1, 1), - task=Task(TaskTypesEnum.classification), - data_type=DataTypesEnum.table) - - return input_data + return init_input_data(X, y) def _build_pipeline(self): """ @@ -232,13 +190,6 @@ def predict(self, features: pd.DataFrame, target: np.array) -> dict: test_data = self._init_input_data(features, target) test_data_preprocessed = self.preprocessing_pipeline.root_node.predict(test_data) - # data_cacher = DataCacher() - # get unique hash of input data - # test_predict_hash = data_cacher.hash_info(data=features) - # compare it to existed hash - # if self.test_predict_hash != test_predict_hash: - # test_data = self._init_input_data(features, target) - # test_data_preprocessed = self.preprocessing_pipeline.root_node.predict(test_data) if test_data.features.shape[0] == 1: test_data_preprocessed.predict = np.squeeze(test_data_preprocessed.predict).reshape(1, -1) @@ -255,16 +206,7 @@ def predict(self, features: pd.DataFrame, target: np.array) -> dict: return self.prediction_label - # else: - # return self.prediction_label - def predict_proba(self, features, target) -> dict: - # data_cacher = DataCacher() - # # get unique hash of input data - # test_predict_hash = data_cacher.hash_info(data=features, - # obj_info_dict=self.__dict__) - # # compare it to existed hash - # if self.test_predict_hash != test_predict_hash: test_data = self._init_input_data(features, target) test_data_preprocessed = self.preprocessing_pipeline.root_node.predict(test_data) self.test_data_preprocessed.predict = np.squeeze(test_data_preprocessed.predict) diff --git a/fedot_ind/core/operation/transformation/basis/abstract_basis.py b/fedot_ind/core/operation/transformation/basis/abstract_basis.py index a9da34cfb..f49252177 100644 --- a/fedot_ind/core/operation/transformation/basis/abstract_basis.py +++ b/fedot_ind/core/operation/transformation/basis/abstract_basis.py @@ -31,11 +31,6 @@ def __init__(self, params: Optional[OperationParameters] = None): def _get_basis(self, data): basis = Either.insert(data).then(self._get_1d_basis if type(data) != list else self._get_multidim_basis).value - - # if type(data) == list: - # basis = self._get_multidim_basis(data) - # else: - # basis = self._get_1d_basis(data) return basis def fit(self, data): diff --git a/fedot_ind/core/operation/transformation/basis/eigen_basis.py b/fedot_ind/core/operation/transformation/basis/eigen_basis.py index 5629e5043..85a676bec 100644 --- a/fedot_ind/core/operation/transformation/basis/eigen_basis.py +++ b/fedot_ind/core/operation/transformation/basis/eigen_basis.py @@ -1,13 +1,11 @@ from typing import Optional, Tuple, TypeVar -from typing import Optional, Tuple, TypeVar import numpy as np import pandas as pd import tensorly as tl from fedot.core.data.data import InputData - from fedot.core.operations.operation_parameters import OperationParameters -from joblib import Parallel, delayed +from joblib import delayed, Parallel from pymonad.either import Either from pymonad.list import ListMonad from scipy import stats @@ -25,12 +23,12 @@ class EigenBasisImplementation(BasisDecompositionImplementation): - """DataDriven basis + """Eigen basis decomposition implementation Example: ts1 = np.random.rand(200) ts2 = np.random.rand(200) ts = [ts1, ts2] - bss = EigenBasisImplementation({'sv_selector': 'median', 'window_size': 30}) + bss = EigenBasisImplementation({'window_size': 30}) basis_multi = bss._transform(ts) basis_1d = bss._transform(ts1) """ @@ -41,11 +39,8 @@ def __init__(self, params: Optional[OperationParameters] = None): self.low_rank_approximation = params.get('low_rank_approximation', True) self.basis = None self.SV_threshold = None - self.sv_selector = 'median' self.svd_estimator = RSVDDecomposition() - self.logging_params.update({'WS': self.window_size, - 'SV_selector': self.sv_selector, - }) + self.logging_params.update({'WS': self.window_size}) def _combine_components(self, predict): count = 0 @@ -77,36 +72,21 @@ def _transform(self, input_data: InputData) -> np.array: features = np.array([series[~np.isnan(series)] for series in features]) if self.SV_threshold is None: - self.SV_threshold = self.get_threshold(data=features, - selector=self.sv_selector) + self.SV_threshold = self.get_threshold(data=features) self.logging_params.update({'SV_thr': self.SV_threshold}) parallel = Parallel(n_jobs=self.n_processes, verbose=0, pre_dispatch="2*n_jobs") v = parallel(delayed(self._transform_one_sample)(sample) for sample in features) predict = np.array(v) - # new_shape = predict[0].shape[0] - # - # reduce_dimension = True - # while reduce_dimension: - # predict = self._combine_components(predict) - # if predict[0].shape[0] == new_shape or predict[0].shape[0] == 1: - # reduce_dimension = False - # new_shape = predict[0].shape[0] - # predict = self._clean_predict(np.array(v)) return predict - def get_threshold(self, data, selector: str): - - selectors = {'median': stats.mode, - 'mode': stats.mode} - + def get_threshold(self, data) -> int: svd_numbers = [] with tqdm(total=len(data), desc='SVD estimation') as pbar: for signal in data: svd_numbers.append(self._transform_one_sample(signal, svd_flag=True)) pbar.update(1) - - return selectors[selector](svd_numbers).mode[0] + return stats.mode(svd_numbers).mode[0] def _transform_one_sample(self, series: np.array, svd_flag: bool = False): trajectory_transformer = HankelMatrix(time_series=series, window_size=self.window_size) diff --git a/fedot_ind/core/operation/transformation/data/eigen.py b/fedot_ind/core/operation/transformation/data/eigen.py index 0d8ad0225..b12cf549d 100644 --- a/fedot_ind/core/operation/transformation/data/eigen.py +++ b/fedot_ind/core/operation/transformation/data/eigen.py @@ -1,18 +1,37 @@ import numpy as np -import pandas as pd -import copy +from typing import List, Tuple -def weighted_inner_product(F_i, F_j, window_length, ts_length): - # Calculate the weights +def weighted_inner_product(F_i: np.ndarray, F_j: np.ndarray, window_length: int, ts_length: int) -> float: + """Calculate the weighted inner product of two vectors. + + Args: + F_i: First vector. + F_j: Second vector. + window_length: Length of the window. + ts_length: Total length of the time series. + + Returns: + Weighted inner product. + """ first = list(np.arange(window_length) + 1) - second = [window_length] * (ts_length - 2*window_length) + second = [window_length] * (ts_length - 2 * window_length) third = list(np.arange(window_length) + 1)[::-1] w = np.array(first + second + third) - return w.dot(F_i * F_j) + return float(w.dot(F_i * F_j)) + +def calculate_matrix_norms(TS_comps: np.ndarray, window_length: int, ts_length: int) -> np.ndarray: + """Calculate matrix norms for the time series components. -def calculate_matrix_norms(TS_comps, window_length, ts_length): + Args: + TS_comps: The time series components. + window_length: Length of the window. + ts_length: Total length of the time series. + + Returns: + Array of matrix norms. + """ r = [] for i in range(TS_comps.shape[1]): r.append(weighted_inner_product(TS_comps[:, i], TS_comps[:, i], window_length, ts_length)) @@ -21,45 +40,48 @@ def calculate_matrix_norms(TS_comps, window_length, ts_length): return F_wnorms -def calculate_corr_matrix(TS_comps, F_wnorms, window_length, ts_length): - Wcorr = np.identity(TS_comps.shape[1]) +def calculate_corr_matrix(ts_comps: np.ndarray, + f_wnorms: np.ndarray, + window_length: int, + ts_length: int) -> Tuple[np.ndarray, List[int]]: + """Calculate the w-correlation matrix for the time series components. + + Args: + ts_comps: The time series components. + f_wnorms: Matrix norms of the time series components. + window_length: Length of the window. + ts_length: Total length of the time series. + + Returns: + W-correlation matrix and a list of component indices. + """ + Wcorr = np.identity(ts_comps.shape[1]) for i in range(Wcorr.shape[0]): for j in range(i + 1, Wcorr.shape[0]): Wcorr[i, j] = abs( - weighted_inner_product(TS_comps[:, i], TS_comps[:, j], window_length, ts_length) * - F_wnorms[i] * F_wnorms[j]) + weighted_inner_product(ts_comps[:, i], ts_comps[:, j], window_length, ts_length) * + f_wnorms[i] * f_wnorms[j]) Wcorr[j, i] = Wcorr[i, j] return Wcorr, [i for i in range(Wcorr.shape[0])] -def combine_eigenvectors(TS_comps, window_length, correlation_level: float = 0.8): - """Calculates the w-correlation matrix for the time series. +def combine_eigenvectors(ts_comps: np.ndarray, window_length: int) -> List[np.ndarray]: + """Combine eigenvectors based on the w-correlation matrix for the time series. Args: - TS_comps (np.ndarray): The time series components. - correlation_level (float): threshold value of Pearson correlation, using for merging eigenvectors. - ts_length (int): The length of TS . - window_length (int): The length of TS window. - + ts_comps (np.ndarray): The time series components. + window_length (int): Length of the window. + Returns: + List[np.ndarray]: List of combined eigenvectors. """ - combined_components = [] - ts_length = TS_comps.shape[0] - # Calculated weighted norms - F_wnorms = calculate_matrix_norms(TS_comps, window_length, ts_length) - - # Calculate Wcorr. - Wcorr, components = calculate_corr_matrix(TS_comps, F_wnorms, window_length, ts_length) - + ts_length = ts_comps.shape[0] + F_wnorms = calculate_matrix_norms(ts_comps, window_length, ts_length) + Wcorr, components = calculate_corr_matrix(ts_comps, F_wnorms, window_length, ts_length) combined_components = [] current_group = [] for i in range(len(components)): - if i == 0 or Wcorr[i, i-1] > correlation_level: - current_group.append(TS_comps[:, i]) - else: - combined_components.append(np.array(current_group).sum(axis=0)) - current_group = [TS_comps[:, i]] - - - + combined_components.append(np.array(current_group).sum(axis=0)) + current_group = [ts_comps[:, i]] + combined_components.append(np.array(current_group).sum(axis=0)) return combined_components diff --git a/fedot_ind/core/operation/transformation/data/hankel.py b/fedot_ind/core/operation/transformation/data/hankel.py index d67a25bc1..d68c1ade4 100644 --- a/fedot_ind/core/operation/transformation/data/hankel.py +++ b/fedot_ind/core/operation/transformation/data/hankel.py @@ -35,7 +35,6 @@ def __check_windows_length(self): if not 2 <= self.__window_length <= self.__ts_length / 2: self.__window_length = int(self.__ts_length / 3) - def __convert_ts_to_array(self): if type(self.__time_series) == pd.DataFrame: self.__time_series = self.__time_series.values.reshape(-1, 1) @@ -44,7 +43,6 @@ def __convert_ts_to_array(self): else: self.__time_series = self.__time_series - def __get_trajectory_matrix(self): if len(self.__time_series.shape) > 1: return [hankel(time_series[:self.__window_length + 1], time_series[self.__window_length:]) for time_series diff --git a/fedot_ind/core/operation/transformation/data/kernel_matrix.py b/fedot_ind/core/operation/transformation/data/kernel_matrix.py index c1edfbcb8..3f873cc64 100644 --- a/fedot_ind/core/operation/transformation/data/kernel_matrix.py +++ b/fedot_ind/core/operation/transformation/data/kernel_matrix.py @@ -13,7 +13,7 @@ def __init__(self, time_series, min_signal_ratio, max_signal_ratio, rec_metric): def ts_to_recurrence_matrix(self, threshold=None): - distance_matrix = pdist(metric=self.rec_metric, X=self.time_series.reshape(-1,1)) + distance_matrix = pdist(metric=self.rec_metric, X=self.time_series.reshape(-1, 1)) distance_matrix = np.ones(shape=distance_matrix.shape[0]) - distance_matrix distance_matrix = self.binarization(distance_matrix, threshold=threshold) self.recurrence_matrix = squareform(distance_matrix) @@ -22,7 +22,7 @@ def ts_to_recurrence_matrix(self, def binarization(self, distance_matrix, threshold): best_threshold_flag = False signal_ratio_list = [] - reccurence_matrix = None + recurrence_matrix = None if threshold is None: for threshold_baseline in self.threshold_baseline: threshold = threshold_baseline @@ -33,10 +33,10 @@ def binarization(self, distance_matrix, threshold): if self.min_signal_ratio < signal_ratio < self.max_signal_ratio: best_ratio = signal_ratio - reccurence_matrix = tmp_array + recurrence_matrix = tmp_array best_threshold_flag = True if signal_ratio > best_ratio: - reccurence_matrix = tmp_array + recurrence_matrix = tmp_array else: signal_ratio_list.append(abs(self.max_signal_ratio - signal_ratio)) @@ -45,8 +45,8 @@ def binarization(self, distance_matrix, threshold): if not best_threshold_flag: distance_matrix[distance_matrix < self.threshold_baseline[0]] = 0.0 distance_matrix[distance_matrix >= self.threshold_baseline[0]] = 1.0 - reccurence_matrix = distance_matrix - return reccurence_matrix + recurrence_matrix = distance_matrix + return recurrence_matrix def get_recurrence_metrics(self): if self.recurrence_matrix is None: diff --git a/tests/integration/experiment/test_timeseriesclassifier_preset.py b/tests/integration/experiment/test_timeseriesclassifier_preset.py new file mode 100644 index 000000000..c3ef7b914 --- /dev/null +++ b/tests/integration/experiment/test_timeseriesclassifier_preset.py @@ -0,0 +1,35 @@ +import pytest + +from fedot_ind.core.architecture.experiment.TimeSeriesClassifierPreset import TimeSeriesClassifierPreset +from fedot_ind.tools.synthetic.ts_datasets_generator import TimeSeriesDatasetsGenerator + + +@pytest.fixture +def dataset(): + (X_train, y_train), (X_test, y_test) = TimeSeriesDatasetsGenerator(num_samples=30, + max_ts_len=50, + n_classes=2, + test_size=0.5).generate_data() + return X_train, y_train, X_test, y_test + + +@pytest.fixture +def classifier_for_fit(): + params = dict(branch_nodes=['eigen_basis'], + dataset='dataset', + tuning_iters=2, + tuning_timeout=2, + model_params={'problem': 'classification', + 'n_jobs': -1, + 'timeout': 1}, + output_folder='.') + return TimeSeriesClassifierPreset(params) + + +def test_fit_predict(classifier_for_fit, dataset): + X_train, y_train, X_test, y_test = dataset + classifier_for_fit.fit(features=X_train, target=y_train) + labels = classifier_for_fit.predict(features=X_test, target=y_test) + assert len(labels) == len(y_test) + assert classifier_for_fit.preprocessing_pipeline.is_fitted is True + assert classifier_for_fit.predictor.current_pipeline.is_fitted is True diff --git a/tests/unit/architecture/experiment/__init__.py b/tests/unit/architecture/experiment/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/unit/architecture/experiment/test_TimeSeriesClassifier.py b/tests/unit/architecture/experiment/test_TimeSeriesClassifier.py new file mode 100644 index 000000000..efbbb8119 --- /dev/null +++ b/tests/unit/architecture/experiment/test_TimeSeriesClassifier.py @@ -0,0 +1,40 @@ +import pytest + +from fedot_ind.core.architecture.experiment.TimeSeriesClassifier import TimeSeriesClassifier + + +@pytest.fixture +def params(): + return dict(task='ts_classification', + dataset='Ham', + strategy='quantile', + use_cache=False, + timeout=1, + n_jobs=-1, + window_mode=True, + window_size=20) + + +@pytest.fixture +def classifier(params): + return TimeSeriesClassifier(params) + + +def test_init(classifier): + assert classifier.strategy == 'quantile' + assert classifier.model_hyperparams is None + assert classifier.generator_runner is None + assert classifier.dataset_name == 'Ham' + assert classifier.output_folder is None + assert classifier.saver is not None + assert classifier.logger is not None + assert classifier.datacheck is not None + assert classifier.prediction_proba is None + assert classifier.test_predict_hash is None + assert classifier.prediction_label is None + assert classifier.predictor is None + assert classifier.y_train is None + assert classifier.train_features is None + assert classifier.test_features is None + assert classifier.input_test_data is None + assert classifier.logger.name == 'TimeSeriesClassifier' diff --git a/tests/unit/architecture/experiment/test_TimeSeriesClassifierPreset.py b/tests/unit/architecture/experiment/test_TimeSeriesClassifierPreset.py new file mode 100644 index 000000000..5464d304c --- /dev/null +++ b/tests/unit/architecture/experiment/test_TimeSeriesClassifierPreset.py @@ -0,0 +1,37 @@ +import pytest +from fedot.core.pipelines.pipeline import Pipeline + +from fedot_ind.core.architecture.experiment.TimeSeriesClassifierPreset import TimeSeriesClassifierPreset +from fedot_ind.tools.synthetic.ts_datasets_generator import TimeSeriesDatasetsGenerator + + +@pytest.fixture +def dataset(): + (X_train, y_train), (X_test, y_test) = TimeSeriesDatasetsGenerator(num_samples=30, + max_ts_len=50, + n_classes=2, + test_size=0.5).generate_data() + return X_train, y_train, X_test, y_test + + +@pytest.fixture +def params(): + return dict(branch_nodes=['eigen_basis'], + dataset='FordA', + model_params={'task': 'classification', + 'n_jobs': -1, + 'timeout': 1}, + output_folder='.') + + +@pytest.fixture +def classifier(params): + return TimeSeriesClassifierPreset(params) + + +def test_init(classifier): + assert classifier.branch_nodes == ['eigen_basis'] + assert classifier.tuning_iters == 30 + assert classifier.tuning_timeout == 15.0 + assert isinstance(classifier.preprocessing_pipeline, Pipeline) + assert classifier.output_folder == '.' diff --git a/tests/unit/architecture/experiment/test_TimeSeriesRegression.py b/tests/unit/architecture/experiment/test_TimeSeriesRegression.py new file mode 100644 index 000000000..39ebfd71d --- /dev/null +++ b/tests/unit/architecture/experiment/test_TimeSeriesRegression.py @@ -0,0 +1,33 @@ +import pytest + +from fedot_ind.core.architecture.experiment.TimeSeriesRegression import TimeSeriesRegression +from fedot_ind.core.models.quantile.quantile_extractor import QuantileExtractor + + +@pytest.fixture +def params(): + return dict(strategy='quantile', + model_params={'problem': 'regression', + 'timeout': 1, + 'n_jobs': 2, + 'metric': 'rmse'}, + generator_class=QuantileExtractor({'window_mode': True, 'window_size': 20}), + use_cache=True, + dataset='ApplianceEnergy', + output_folder='.', + explained_variance=0.9,) + + +@pytest.fixture +def regressor(params): + return TimeSeriesRegression(params) + + +def test_init(regressor): + assert regressor.dataset_name == 'ApplianceEnergy' + assert isinstance(regressor.generator_runner, QuantileExtractor) + assert regressor.strategy == 'quantile' + assert regressor.use_cache is True + assert regressor.pca.n_components == 0.9 + assert regressor.pca.svd_solver == 'full' + assert regressor.model_hyperparams['metric'] == 'rmse' diff --git a/tests/unit/operation/transformation/basis/__init__.py b/tests/unit/operation/transformation/basis/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/unit/operation/transformation/basis/test_eigen_basis.py b/tests/unit/operation/transformation/basis/test_eigen_basis.py new file mode 100644 index 000000000..8dd570afd --- /dev/null +++ b/tests/unit/operation/transformation/basis/test_eigen_basis.py @@ -0,0 +1,56 @@ +import numpy as np +import pytest +from fedot.core.data.data import OutputData + +from fedot_ind.api.utils.input_data import init_input_data +from fedot_ind.core.operation.transformation.basis.eigen_basis import EigenBasisImplementation +from fedot_ind.tools.synthetic.ts_datasets_generator import TimeSeriesDatasetsGenerator + + +@pytest.fixture +def dataset(): + (X_train, y_train), (X_test, y_test) = TimeSeriesDatasetsGenerator(num_samples=20, + max_ts_len=50, + n_classes=2, + test_size=0.5).generate_data() + return X_train, y_train, X_test, y_test + + +@pytest.fixture +def input_test(dataset): + X_train, y_train, X_test, y_test = dataset + input_test_data = init_input_data(X_test, y_test) + return input_test_data + + +@pytest.fixture +def input_train(dataset): + X_train, y_train, X_test, y_test = dataset + input_train_data = init_input_data(X_train, y_train) + return input_train_data + + +def test_transform(input_test): + input_train_data = input_test + basis = EigenBasisImplementation({'window_size': 30}) + train_features = basis.transform(input_data=input_train_data) + assert isinstance(train_features, OutputData) + assert train_features.features.shape[0] == input_train_data.features.shape[0] + + +def test_get_threshold(input_train): + basis = EigenBasisImplementation({'window_size': 30}) + threshold = basis.get_threshold(input_train.features) + assert isinstance(threshold, np.int64) + assert threshold > 0 + assert threshold < input_train.features.shape[1] + + +def test_transform_one_sample(input_train): + basis = EigenBasisImplementation({'window_size': 30}) + basis.SV_threshold = 3 + sample = input_train.features[0] + transformed_sample = basis._transform_one_sample(sample) + assert isinstance(transformed_sample, np.ndarray) + assert transformed_sample.shape[0] == basis.SV_threshold + assert transformed_sample.shape[1] == len(sample) diff --git a/tests/unit/operation/transformation/data/__init__.py b/tests/unit/operation/transformation/data/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/unit/operation/transformation/data/test_HankelMatrix.py b/tests/unit/operation/transformation/data/test_HankelMatrix.py new file mode 100644 index 000000000..fefe13ab8 --- /dev/null +++ b/tests/unit/operation/transformation/data/test_HankelMatrix.py @@ -0,0 +1,57 @@ +import pytest + +from fedot_ind.core.operation.transformation.data.hankel import HankelMatrix +from fedot_ind.tools.synthetic.ts_generator import TimeSeriesGenerator + +TS_LENGTH = 1000 + + +@pytest.fixture +def ts_data(): + ts_config = {'ts_type': 'random_walk', + 'length': TS_LENGTH, + 'start_val': 36.6} + return TimeSeriesGenerator(ts_config).get_ts() + + +def test_valid_trajectory_matrix(ts_data, valid_window_size): + trajectory_matrix = HankelMatrix(time_series=ts_data, + window_size=valid_window_size).trajectory_matrix + + assert trajectory_matrix is not None + assert trajectory_matrix.shape[0] == valid_window_size + 1 + assert trajectory_matrix.shape[1] == TS_LENGTH - valid_window_size + + +def test_zero_trajectory_matrix(ts_data, zero_window_size): + trajectory_matrix = HankelMatrix(time_series=ts_data, + window_size=zero_window_size).trajectory_matrix + + made_up_window = int(TS_LENGTH / 3) + assert trajectory_matrix is not None + assert trajectory_matrix.shape[0] == made_up_window + 1 + assert trajectory_matrix.shape[1] == TS_LENGTH - made_up_window + + +def test_over_half_trajectory_matrix(ts_data, over_half_window_size): + trajectory_matrix = HankelMatrix(time_series=ts_data, + window_size=over_half_window_size).trajectory_matrix + made_up_window = int(TS_LENGTH / 3) + assert trajectory_matrix is not None + assert trajectory_matrix.shape[0] == made_up_window + 1 + assert trajectory_matrix.shape[1] == TS_LENGTH - made_up_window + + +@pytest.fixture +def valid_window_size(): + return 100 + + +@pytest.fixture +def over_half_window_size(): + return TS_LENGTH // 2 + 1 + + +@pytest.fixture +def zero_window_size(): + return 0 diff --git a/tests/unit/operation/transformation/data/test_eigen.py b/tests/unit/operation/transformation/data/test_eigen.py new file mode 100644 index 000000000..33cc2c6fb --- /dev/null +++ b/tests/unit/operation/transformation/data/test_eigen.py @@ -0,0 +1,36 @@ +import numpy as np + +from fedot_ind.core.operation.transformation.data.eigen import calculate_corr_matrix, calculate_matrix_norms, \ + combine_eigenvectors, weighted_inner_product + +SAMPLE_DATA = np.array([1, 2, 3, 4, 5]) +TS_LENGTH = 5 +WINDOW_LENGTH = 2 +N_COMPONENTS = 15 +TS_COMPS = np.random.rand(TS_LENGTH, N_COMPONENTS) + + +def test_weighted_inner_product(): + result = weighted_inner_product(SAMPLE_DATA, SAMPLE_DATA, WINDOW_LENGTH, TS_LENGTH) + assert isinstance(result, float) + + +def test_calculate_matrix_norms(): + result = calculate_matrix_norms(TS_COMPS, WINDOW_LENGTH, TS_LENGTH) + assert isinstance(result, np.ndarray) + + +def test_calculate_corr_matrix(): + result, components = calculate_corr_matrix(ts_comps=TS_COMPS, + f_wnorms=calculate_matrix_norms(TS_COMPS, WINDOW_LENGTH, TS_LENGTH), + window_length=WINDOW_LENGTH, + ts_length=TS_LENGTH) + assert isinstance(result, np.ndarray) + assert isinstance(components, list) + assert np.max(result) <= 1 + assert np.min(result) >= 0 + + +def test_combine_eigenvectors(): + result = combine_eigenvectors(TS_COMPS, WINDOW_LENGTH) + assert isinstance(result, list) diff --git a/tests/unit/operation/transformation/data/test_kernel_matrix.py b/tests/unit/operation/transformation/data/test_kernel_matrix.py new file mode 100644 index 000000000..3615b49a4 --- /dev/null +++ b/tests/unit/operation/transformation/data/test_kernel_matrix.py @@ -0,0 +1,52 @@ +import json + +import numpy as np +import pytest +from scipy.spatial.distance import pdist + +from fedot_ind.api.utils.path_lib import PATH_TO_DEFAULT_PARAMS +from fedot_ind.core.operation.transformation.data.kernel_matrix import TSTransformer +from fedot_ind.tools.synthetic.ts_generator import TimeSeriesGenerator + + +@pytest.fixture +def params(): + random_walk_config = { + 'ts_type': 'random_walk', + 'length': 500, + 'start_val': 36.6} + ts = TimeSeriesGenerator(random_walk_config).get_ts() + + with open(PATH_TO_DEFAULT_PARAMS, 'r') as file: + recurrence_extractor_params = json.load(file)['recurrence_extractor'] + + return dict(time_series=ts, + min_signal_ratio=recurrence_extractor_params['min_signal_ratio'], + max_signal_ratio=recurrence_extractor_params['max_signal_ratio'], + rec_metric=recurrence_extractor_params['rec_metric']) + + +@pytest.fixture +def ts_transformer(params): + return TSTransformer(**params) + + +def test_ts_to_recurrence_matrix(ts_transformer, params): + matrix = ts_transformer.ts_to_recurrence_matrix() + assert matrix.shape[0] == matrix.shape[1] + assert matrix.shape[0] == params['time_series'].shape[0] + + +def test_binarization(ts_transformer, params): + dist_matrix = pdist(metric=ts_transformer.rec_metric, + X=params['time_series'].reshape(-1, 1)) + bin_matrix = ts_transformer.binarization(dist_matrix, threshold=None) + + assert len(bin_matrix.shape) == 1 + assert len(np.unique(bin_matrix)) == 2 + + +def test_get_recurrence_metrics(ts_transformer, params): + matrix = ts_transformer.get_recurrence_metrics() + assert matrix.shape[0] == matrix.shape[1] + assert matrix.shape[0] == params['time_series'].shape[0]