From 0b3aa00d9957abfc98e3307d539751f73af44036 Mon Sep 17 00:00:00 2001
From: vadim_potemkin
Date: Fri, 22 Sep 2023 14:11:59 +0200
Subject: [PATCH] refactored recurrence extractor, added tests for bases,
 cleaned up search space

---
 .../models/recurrence/reccurence_extractor.py |  64 ++---
 fedot_ind/core/models/recurrence/sequences.py | 264 ++++++------------
 .../topological/topological_extractor.py      |  33 +--
 .../operation/transformation/basis/fourier.py |  21 +-
 .../operation/transformation/basis/wavelet.py |   8 +-
 .../data/default_operation_params.json        |   2 +-
 fedot_ind/core/tuning/search_space.py         |  17 +-
 tests/unit/models/test_feature_extraction.py  | 137 ---------
 tests/unit/models/test_quantile_extractor.py  |  56 ++++
 .../unit/models/test_topological_extractor.py |  44 +++
 .../basis/test_fourier_basis.py               |  46 +++
 .../basis/test_wavelet_basis.py               |  77 +++++
 12 files changed, 373 insertions(+), 396 deletions(-)
 delete mode 100644 tests/unit/models/test_feature_extraction.py
 create mode 100644 tests/unit/models/test_quantile_extractor.py
 create mode 100644 tests/unit/models/test_topological_extractor.py
 create mode 100644 tests/unit/operation/transformation/basis/test_fourier_basis.py
 create mode 100644 tests/unit/operation/transformation/basis/test_wavelet_basis.py

diff --git a/fedot_ind/core/models/recurrence/reccurence_extractor.py b/fedot_ind/core/models/recurrence/reccurence_extractor.py
index cb59dab7c..9de4d85af 100644
--- a/fedot_ind/core/models/recurrence/reccurence_extractor.py
+++ b/fedot_ind/core/models/recurrence/reccurence_extractor.py
@@ -1,45 +1,43 @@
-from functools import partial
-from multiprocessing import Pool
 from typing import Optional
 
-import numpy as np
 from fedot.core.data.data import InputData
-from fedot.core.repository.dataset_types import DataTypesEnum
-from sklearn.preprocessing import StandardScaler
 from fedot.core.operations.operation_parameters import OperationParameters
-from joblib import Parallel, delayed
-from tqdm import tqdm
-from fedot_ind.core.operation.transformation.data.hankel import HankelMatrix
+from fedot.core.repository.dataset_types import DataTypesEnum
+
 from fedot_ind.core.metrics.metrics_implementation import *
 from fedot_ind.core.models.base_extractor import BaseExtractor
+from fedot_ind.core.models.recurrence.sequences import RecurrenceFeatureExtractor
+from fedot_ind.core.operation.transformation.data.hankel import HankelMatrix
 from fedot_ind.core.operation.transformation.data.kernel_matrix import TSTransformer
-from fedot_ind.core.models.recurrence.sequences import ReccurenceFeaturesExtractor
 
 
 class RecurrenceExtractor(BaseExtractor):
     """Class responsible for recurrence feature generator experiment.
-    Args:
-        window_mode: boolean flag - if True, window mode is used. Defaults to False.
-        use_cache: boolean flag - if True, cache is used. Defaults to False.
+
+    Attributes:
         transformer: TSTransformer object.
-        self.extractor: ReccurenceExtractor object.
-        train_feats: train features.
-        test_feats: test features.
+        self.extractor: RecurrenceFeatureExtractor object.
+        self.window_mode: bool, if True, then the window mode is used.
+        self.min_signal_ratio: float, the minimum signal ratio.
+        self.max_signal_ratio: float, the maximum signal ratio.
+        self.rec_metric: str, the metric for calculating the recurrence matrix.
+        self.window_size: int, the window size.
+
     Example:
-        from fedot.core.pipelines.pipeline_builder import PipelineBuilder
-        from examples.fedot.fedot_ex import init_input_data
-        from fedot_ind.core.architecture.preprocessing.DatasetLoader import DataLoader
-        from fedot_ind.core.repository.initializer_industrial_models import IndustrialModels
-
-        train_data, test_data = DataLoader(dataset_name='Ham').load_data()
-        with IndustrialModels():
-            pipeline = PipelineBuilder().add_node('eigen_basis').add_node('recurrence_extractor').add_node(
-                'rf').build()
-            input_data = init_input_data(train_data[0], train_data[1])
-            pipeline.fit(input_data)
-            features = pipeline.predict(input_data)
-            print(features)
+        To use this operation you can create a pipeline as follows::
+            from fedot.core.pipelines.pipeline_builder import PipelineBuilder
+            from examples.fedot.fedot_ex import init_input_data
+            from fedot_ind.core.architecture.preprocessing.DatasetLoader import DataLoader
+            from fedot_ind.core.repository.initializer_industrial_models import IndustrialModels
+
+            train_data, test_data = DataLoader(dataset_name='Ham').load_data()
+            with IndustrialModels():
+                pipeline = PipelineBuilder().add_node('eigen_basis').add_node('recurrence_extractor').add_node(
+                    'rf').build()
+                input_data = init_input_data(train_data[0], train_data[1])
+                pipeline.fit(input_data)
+                features = pipeline.predict(input_data)
+                print(features)
     """
 
     def __init__(self, params: Optional[OperationParameters] = None):
@@ -50,9 +48,9 @@ def __init__(self, params: Optional[OperationParameters] = None):
         self.min_signal_ratio = params.get('min_signal_ratio')
         self.max_signal_ratio = params.get('max_signal_ratio')
         self.rec_metric = params.get('rec_metric')
-        self.window_size = 10
+        self.window_size = params.get('window_size')
         self.transformer = TSTransformer
-        self.extractor = ReccurenceFeaturesExtractor
+        self.extractor = RecurrenceFeatureExtractor
 
     def _generate_features_from_ts(self, ts: np.array):
 
@@ -68,7 +66,7 @@ def _generate_features_from_ts(self, ts: np.array):
             feature_df = specter.ts_to_recurrence_matrix()
 
         if not self.image_mode:
-            feature_df = self.extractor(recurrence_matrix=feature_df).recurrence_quantification_analysis()
+            feature_df = self.extractor(recurrence_matrix=feature_df).quantification_analysis()
             features = np.nan_to_num(np.array(list(feature_df.values())))
 
         recurrence_features = InputData(idx=np.arange(len(features)),
@@ -79,7 +77,7 @@ def _generate_features_from_ts(self, ts: np.array):
                                         supplementary_data={'feature_name': list(feature_df.keys())})
         return recurrence_features
 
-    def generate_reccurence_features(self, ts: np.array) -> InputData:
+    def generate_recurrence_features(self, ts: np.array) -> InputData:
 
         if len(ts.shape) == 1:
             aggregation_df = self._generate_features_from_ts(ts)
@@ -90,4 +88,4 @@ def generate_reccurence_features(self, ts: np.array) -> InputData:
 
     def generate_features_from_ts(self, ts_data: np.array, dataset_name: str = None):
-        return self.generate_reccurence_features(ts=ts_data)
+        return self.generate_recurrence_features(ts=ts_data)
diff --git a/fedot_ind/core/models/recurrence/sequences.py b/fedot_ind/core/models/recurrence/sequences.py
index b2db6330d..b1134b7a6 100644
--- a/fedot_ind/core/models/recurrence/sequences.py
+++ b/fedot_ind/core/models/recurrence/sequences.py
@@ -1,14 +1,63 @@
 from __future__ import division, print_function
+
 import numpy as np
 
 
-class ReccurenceFeaturesExtractor:
-    def __init__(self,
-                 recurrence_matrix: np.ndarray = None):
+class RecurrenceFeatureExtractor:
+    def __init__(self, recurrence_matrix: np.ndarray = None):
self.recurrence_matrix = recurrence_matrix - def calculate_DFD(self, number_of_vectors): - # Calculating the diagonal frequency distribution - P(l) + def quantification_analysis(self, MDL: int = 3, MVL: int = 3, MWVL: int = 2): + + n_vectors = self.recurrence_matrix.shape[0] + recurrence_rate = float(np.sum(self.recurrence_matrix)) / np.power(n_vectors, 2) + + diagonal_frequency_dist = self.calculate_diagonal_frequency(number_of_vectors=n_vectors) + vertical_frequency_dist = self.calculate_vertical_frequency(number_of_vectors=n_vectors, not_white=1) + white_vertical_frequency_dist = self.calculate_vertical_frequency(number_of_vectors=n_vectors, + not_white=0) + + determinism = self.laminarity_or_determinism(MDL, n_vectors, diagonal_frequency_dist, lam=False) + laminarity = self.laminarity_or_determinism(MVL, n_vectors, vertical_frequency_dist, lam=True) + + average_diagonal_line_length = self.average_line_length(MDL, n_vectors, diagonal_frequency_dist) + average_vertical_line_length = self.average_line_length(MVL, n_vectors, vertical_frequency_dist) + average_white_vertical_line_length = self.average_line_length(MWVL, n_vectors, white_vertical_frequency_dist) + + longest_diagonal_line_length = self.longest_line_length(diagonal_frequency_dist, n_vectors, diag=True) + longest_vertical_line_length = self.longest_line_length(vertical_frequency_dist, n_vectors, diag=False) + longest_white_vertical_line_length = self.longest_line_length(white_vertical_frequency_dist, + n_vectors, diag=False) + + entropy_diagonal_lines = self.entropy_lines(MDL, n_vectors, diagonal_frequency_dist, diag=True) + entropy_vertical_lines = self.entropy_lines(MVL, n_vectors, vertical_frequency_dist, diag=False) + entropy_white_vertical_lines = self.entropy_lines(MWVL, n_vectors, + white_vertical_frequency_dist, diag=False) + + return {'RR': recurrence_rate, 'DET': determinism, 'ADLL': average_diagonal_line_length, + 'LDLL': longest_diagonal_line_length, 'DIV': 1. 
/ longest_diagonal_line_length, + 'EDL': entropy_diagonal_lines, 'LAM': laminarity, 'AVLL': average_vertical_line_length, + 'LVLL': longest_vertical_line_length, 'EVL': entropy_vertical_lines, + 'AWLL': average_white_vertical_line_length, 'LWLL': longest_white_vertical_line_length, + 'EWLL': entropy_white_vertical_lines, 'RDRR': determinism / recurrence_rate, + 'RLD': laminarity / determinism} + + def calculate_vertical_frequency(self, number_of_vectors, not_white: int): + vertical_frequency_distribution = np.zeros(number_of_vectors + 1) + for i in range(number_of_vectors): + vertical_line_length = 0 + for j in range(number_of_vectors): + if self.recurrence_matrix[i, j] == not_white: + vertical_line_length += 1 + if j == (number_of_vectors - 1): + vertical_frequency_distribution[vertical_line_length] += 1.0 + else: + if vertical_line_length != 0: + vertical_frequency_distribution[vertical_line_length] += 1.0 + vertical_line_length = 0 + return vertical_frequency_distribution + + def calculate_diagonal_frequency(self, number_of_vectors): diagonal_frequency_distribution = np.zeros(number_of_vectors + 1) for i in range(number_of_vectors - 1, -1, -1): diagonal_line_length = 0 @@ -36,176 +85,35 @@ def calculate_DFD(self, number_of_vectors): diagonal_line_length = 0 return diagonal_frequency_distribution - def calculate_VFD(self, number_of_vectors): - # Calculating the vertical frequency distribution - P(v) - vertical_frequency_distribution = np.zeros(number_of_vectors + 1) - for i in range(number_of_vectors): - vertical_line_length = 0 - for j in range(number_of_vectors): - if self.recurrence_matrix[i, j] == 1: - vertical_line_length += 1 - if j == (number_of_vectors - 1): - vertical_frequency_distribution[vertical_line_length] += 1.0 - else: - if vertical_line_length != 0: - vertical_frequency_distribution[vertical_line_length] += 1.0 - vertical_line_length = 0 - return vertical_frequency_distribution - - def calculate_WVFD(self, number_of_vectors): - # Calculating the white vertical frequency distribution - P(w) - white_vertical_frequency_distribution = np.zeros(number_of_vectors + 1) - for i in range(number_of_vectors): - white_vertical_line_length = 0 - for j in range(number_of_vectors): - if self.recurrence_matrix[i, j] == 0: - white_vertical_line_length += 1 - if j == (number_of_vectors - 1): - white_vertical_frequency_distribution[white_vertical_line_length] += 1.0 - else: - if white_vertical_line_length != 0: - white_vertical_frequency_distribution[white_vertical_line_length] += 1.0 - white_vertical_line_length = 0 - return white_vertical_frequency_distribution - - def calculate_EVWL(self, white_vertical_frequency_distribution, MWVL, number_of_vectors): - longest_white_vertical_line_length = 1 - # Calculating the longest white vertical line length - Wmax - for w in range(number_of_vectors, 0, -1): - if white_vertical_frequency_distribution[w] != 0: - longest_white_vertical_line_length = w - break - - # Calculating the entropy white vertical lines - Wentr - sum_white_vertical_frequency_distribution = float( - np.sum(white_vertical_frequency_distribution[MWVL:])) - entropy_white_vertical_lines = 0 - for w in range(MWVL, number_of_vectors + 1): - if white_vertical_frequency_distribution[w] != 0: - entropy_white_vertical_lines += (white_vertical_frequency_distribution[ - w] / sum_white_vertical_frequency_distribution) * np.log( - white_vertical_frequency_distribution[w] / sum_white_vertical_frequency_distribution) - entropy_white_vertical_lines *= -1 - - return entropy_white_vertical_lines, 
longest_white_vertical_line_length - - def recurrence_quantification_analysis(self, - MDL=3, - MVL=3, - MWVL=2): - # Calculating the number of states - N - number_of_vectors = self.recurrence_matrix.shape[0] - - # Calculating the recurrence rate - RR - recurrence_rate = float(np.sum(self.recurrence_matrix)) / np.power(number_of_vectors, 2) - - diagonal_frequency_distribution = self.calculate_DFD( - number_of_vectors=number_of_vectors) - - vertical_frequency_distribution = self.calculate_VFD(number_of_vectors=number_of_vectors) - - white_vertical_frequency_distribution = self.calculate_WVFD(number_of_vectors=number_of_vectors) - - # Calculating the determinism - DET - numerator = np.sum( - [l * diagonal_frequency_distribution[l] for l in range(MDL, number_of_vectors)]) - denominator = np.sum([l * diagonal_frequency_distribution[l] for l in range(1, number_of_vectors)]) - determinism = numerator / denominator - - # Calculating the average diagonal line length - L - numerator = np.sum( - [l * diagonal_frequency_distribution[l] for l in range(MDL, number_of_vectors)]) - denominator = np.sum( - [diagonal_frequency_distribution[l] for l in range(MDL, number_of_vectors)]) - average_diagonal_line_length = numerator / denominator - longest_diagonal_line_length = 1 - - # Calculating the longest diagonal line length - Lmax - for l in range(number_of_vectors - 1, 0, -1): - if diagonal_frequency_distribution[l] != 0: - longest_diagonal_line_length = l - break - - # Calculating the divergence - DIV - divergence = 1. / longest_diagonal_line_length - - # Calculating the entropy diagonal lines - Lentr - sum_diagonal_frequency_distribution = float( - np.sum(diagonal_frequency_distribution[MDL:-1])) - entropy_diagonal_lines = 0 - for l in range(MDL, number_of_vectors): - if diagonal_frequency_distribution[l] != 0: - entropy_diagonal_lines += (diagonal_frequency_distribution[ - l] / sum_diagonal_frequency_distribution) * np.log( - diagonal_frequency_distribution[l] / sum_diagonal_frequency_distribution) - entropy_diagonal_lines *= -1 - - # Calculating the ratio determinism_recurrence - DET/RR - ratio_determinism_recurrence_rate = determinism / recurrence_rate - - # Calculating the laminarity - LAM - numerator = np.sum([v * vertical_frequency_distribution[v] for v in - range(MVL, number_of_vectors + 1)]) - denominator = np.sum([v * vertical_frequency_distribution[v] for v in range(1, number_of_vectors + 1)]) - laminarity = numerator / denominator - - # Calculating the average vertical line length - V - numerator = np.sum([v * vertical_frequency_distribution[v] for v in - range(MVL, number_of_vectors + 1)]) - denominator = np.sum( - [vertical_frequency_distribution[v] for v in range(MVL, number_of_vectors + 1)]) - average_vertical_line_length = numerator / denominator - - longest_vertical_line_length = 1 - # Calculating the longest vertical line length - Vmax - for v in range(number_of_vectors, 0, -1): - if vertical_frequency_distribution[v] != 0: - longest_vertical_line_length = v - break - - # Calculating the entropy vertical lines - Ventr - sum_vertical_frequency_distribution = float( - np.sum(vertical_frequency_distribution[MVL:])) - entropy_vertical_lines = 0 - for v in range(MVL, number_of_vectors + 1): - if vertical_frequency_distribution[v] != 0: - entropy_vertical_lines += (vertical_frequency_distribution[ - v] / sum_vertical_frequency_distribution) * np.log( - vertical_frequency_distribution[v] / sum_vertical_frequency_distribution) - entropy_vertical_lines *= -1 - - # Calculatint the ratio 
laminarity_determinism - LAM/DET
-        ratio_laminarity_determinism = laminarity / determinism
-
-        # Calculating the average white vertical line length - W
-        numerator = np.sum([w * white_vertical_frequency_distribution[w] for w in
-                            range(MWVL, number_of_vectors + 1)])
-        denominator = np.sum([white_vertical_frequency_distribution[w] for w in
-                              range(MWVL, number_of_vectors + 1)])
-        average_white_vertical_line_length = numerator / denominator
-
-        entropy_white_vertical_lines, longest_white_vertical_line_length = self.calculate_EVWL(
-            white_vertical_frequency_distribution=white_vertical_frequency_distribution,
-            MWVL=MWVL,
-            number_of_vectors=number_of_vectors)
-
-        feature_dict = {
-            # 'DFD': diagonal_frequency_distribution,
-            # 'VFD': vertical_frequency_distribution,
-            # 'WVFD': white_vertical_frequency_distribution,
-            'RR': recurrence_rate,
-            'DET': determinism,
-            'ADLL': average_diagonal_line_length,
-            'LDLL': longest_diagonal_line_length,
-            'Div': divergence,
-            'EDL': entropy_diagonal_lines,
-            'Lam': laminarity,
-            'AVLL': average_vertical_line_length,
-            'LVLL': longest_vertical_line_length,
-            'EVL': entropy_vertical_lines,
-            'AWLL': average_white_vertical_line_length,
-            'LWLL': longest_white_vertical_line_length,
-            'EWLL': entropy_white_vertical_lines,
-            'RDRR': ratio_determinism_recurrence_rate,
-            'RLD': ratio_laminarity_determinism}
-        return feature_dict
+    def entropy_lines(self, factor, number_of_vectors, distribution, diag: bool):
+        if diag:
+            sum_frequency_distribution = float(np.sum(distribution[factor:-1]))
+        else:
+            number_of_vectors = number_of_vectors + 1
+            sum_frequency_distribution = float(np.sum(distribution[factor:]))
+
+        entropy_lines = 0
+        for i in range(factor, number_of_vectors):
+            if distribution[i] != 0:
+                entropy_lines += (distribution[i] / sum_frequency_distribution) * \
+                                 np.log(distribution[i] / sum_frequency_distribution)
+        return -entropy_lines
+
+    def laminarity_or_determinism(self, factor, number_of_vectors, distribution, lam: bool):
+        if lam:
+            number_of_vectors = number_of_vectors + 1
+        numerator = np.sum([i * distribution[i] for i in range(factor, number_of_vectors)])
+        denominator = np.sum([i * distribution[i] for i in range(1, number_of_vectors)])
+        return numerator / denominator
+
+    def longest_line_length(self, frequency_distribution, number_of_vectors, diag: bool):
+        longest_line_length = 1
+        for i in range(number_of_vectors - 1 if diag else number_of_vectors, 0, -1):
+            if frequency_distribution[i] != 0:
+                return i
+        return longest_line_length
+
+    def average_line_length(self, factor, number_of_vectors, distribution):
+        numerator = np.sum([i * distribution[i] for i in range(factor, number_of_vectors + 1)])
+        denominator = np.sum([distribution[i] for i in range(factor, number_of_vectors + 1)])
+        return numerator / denominator
diff --git a/fedot_ind/core/models/topological/topological_extractor.py b/fedot_ind/core/models/topological/topological_extractor.py
index f27a1368e..b7ebf2a1b 100644
--- a/fedot_ind/core/models/topological/topological_extractor.py
+++ b/fedot_ind/core/models/topological/topological_extractor.py
@@ -41,23 +41,25 @@ class TopologicalExtractor(BaseExtractor):
     """Class for extracting topological features from time series data.
+
     Args:
         params: parameters for operation
 
-    Example:
-        from fedot.core.pipelines.pipeline_builder import PipelineBuilder
-        from examples.fedot.fedot_ex import init_input_data
-        from fedot_ind.core.architecture.preprocessing.DatasetLoader import DataLoader
-        from fedot_ind.core.repository.initializer_industrial_models import IndustrialModels
-
-        train_data, test_data = DataLoader(dataset_name='Ham').load_data()
-        with IndustrialModels():
-            pipeline = PipelineBuilder().add_node('eigen_basis').add_node('topological_extractor').add_node(
-                'rf').build()
-            input_data = init_input_data(train_data[0], train_data[1])
-            pipeline.fit(input_data)
-            features = pipeline.predict(input_data)
-            print(features)
+    Example:
+        To use this operation you can create a pipeline as follows::
+            from fedot.core.pipelines.pipeline_builder import PipelineBuilder
+            from fedot_ind.api.utils.input_data import init_input_data
+            from fedot_ind.core.architecture.preprocessing.DatasetLoader import DataLoader
+            from fedot_ind.core.repository.initializer_industrial_models import IndustrialModels
+
+            train_data, test_data = DataLoader(dataset_name='Ham').load_data()
+            with IndustrialModels():
+                pipeline = PipelineBuilder().add_node('eigen_basis').add_node('topological_extractor').add_node(
+                    'rf').build()
+                input_data = init_input_data(train_data[0], train_data[1])
+                pipeline.fit(input_data)
+                features = pipeline.predict(input_data)
+                print(features)
     """
 
     def __init__(self, params: Optional[OperationParameters] = None):
@@ -102,7 +104,7 @@ def generate_topological_features(self, ts: np.array,
                                       persistence_params: dict = None) -> InputData:
 
         if persistence_params is not None:
-            self._evaluate_persistence_params(ts)
+            self.__evaluate_persistence_params(ts)
 
         if len(ts.shape) == 1:
             aggregation_df = self._generate_features_from_ts(ts, persistence_params)
@@ -150,4 +152,3 @@ def get_embedding_params_from_batch(self, ts_data: pd.DataFrame, method: str = '
 
     @staticmethod
     def _mode(arr: list) -> int:
         return int(stats.mode(arr)[0][0])
-
diff --git a/fedot_ind/core/operation/transformation/basis/fourier.py b/fedot_ind/core/operation/transformation/basis/fourier.py
index 4687b7b48..ba15bb031 100644
--- a/fedot_ind/core/operation/transformation/basis/fourier.py
+++ b/fedot_ind/core/operation/transformation/basis/fourier.py
@@ -1,6 +1,8 @@
 from typing import Optional
+
 import numpy as np
 from fedot.core.operations.operation_parameters import OperationParameters
+
 from fedot_ind.core.operation.transformation.basis.abstract_basis import BasisDecompositionImplementation
 
 
@@ -11,36 +13,25 @@ class FourierBasisImplementation(BasisDecompositionImplementation):
             ts1 = np.random.rand(200)
             ts2 = np.random.rand(200)
             ts = [ts1, ts2]
-            bss = FourierBasisImplementation({'spectrum_type': 'real'})
-            basis_multi = bss._transform(ts)
-            basis_1d = bss._transform(ts1)
+            bss = FourierBasisImplementation({'threshold': 20000})
+            basis_multi = bss.transform(ts)
+            basis_1d = bss.transform(ts1)
 
     """
 
     def __init__(self, params: Optional[OperationParameters] = None):
         super().__init__(params)
-        self.spectrum_type = params.get('spectrum_type')
         self.threshold = params.get('threshold')
 
         self.basis = None
         self.logging_params.update({'threshold': self.threshold})
 
-    def low_pass(self, input_data):
+    def _decompose_signal(self, input_data):
         fourier_coef = np.fft.rfft(input_data)
         frequencies = np.fft.rfftfreq(input_data.size, d=2e-3 / input_data.size)
         fourier_coef[frequencies > self.threshold] = 0
         return np.fft.irfft(fourier_coef)
 
-    def _decompose_signal(self, input_data):
-        spectrum = 
np.fft.fft(input_data) - if self.spectrum_type == 'imaginary': - spectrum = spectrum.imag - elif self.spectrum_type == 'smoothed': - spectrum = self.low_pass(input_data) - else: - spectrum = spectrum.real - return spectrum - def _transform_one_sample(self, series: np.array): return self._get_basis(series) diff --git a/fedot_ind/core/operation/transformation/basis/wavelet.py b/fedot_ind/core/operation/transformation/basis/wavelet.py index b9d14cf1c..b8b84564b 100644 --- a/fedot_ind/core/operation/transformation/basis/wavelet.py +++ b/fedot_ind/core/operation/transformation/basis/wavelet.py @@ -1,4 +1,4 @@ -from typing import Optional +from typing import Optional, Tuple import numpy as np import pywt from fedot.core.operations.operation_parameters import OperationParameters @@ -27,7 +27,7 @@ def __init__(self, params: Optional[OperationParameters] = None): self.continuous_wavelets = pywt.wavelist(kind='continuous') self.scales = [2, 4, 10, 20] - def _decompose_signal(self, input_data): + def _decompose_signal(self, input_data) -> Tuple[np.array, np.array]: if self.wavelet in self.discrete_wavelets: high_freq, low_freq = pywt.dwt(input_data, self.wavelet, 'smooth') else: @@ -39,7 +39,7 @@ def _decompose_signal(self, input_data): low_freq = low_freq[np.newaxis, :] return high_freq, low_freq - def _decomposing_level(self): + def _decomposing_level(self) -> int: """The level of decomposition of the time series. Returns: @@ -50,7 +50,7 @@ def _decomposing_level(self): def _transform_one_sample(self, series: np.array): return self._get_basis(series) - def _get_1d_basis(self, data): + def _get_1d_basis(self, data) -> np.array: decompose = lambda signal: ListMonad(self._decompose_signal(signal)) threshold = lambda Monoid: ListMonad([Monoid[0][ diff --git a/fedot_ind/core/repository/data/default_operation_params.json b/fedot_ind/core/repository/data/default_operation_params.json index 319ad2e23..46716bd2c 100644 --- a/fedot_ind/core/repository/data/default_operation_params.json +++ b/fedot_ind/core/repository/data/default_operation_params.json @@ -167,7 +167,7 @@ }, "recurrence_extractor": { "window_size": 20, - "win_mode": true, + "window_mode": true, "min_signal_ratio": 0.5, "max_signal_ratio": 0.75, "rec_metric": "euclidean" diff --git a/fedot_ind/core/tuning/search_space.py b/fedot_ind/core/tuning/search_space.py index 5104a8d08..2e2387300 100644 --- a/fedot_ind/core/tuning/search_space.py +++ b/fedot_ind/core/tuning/search_space.py @@ -6,24 +6,18 @@ industrial_search_space = { 'eigen_basis': - { - # 'sv_selector': {'hyperopt-dist': hp.choice, 'sampling-scope': [['median', '0.75%', '0.25%']]}, - 'window_size': {'hyperopt-dist': hp.choice, 'sampling-scope': [[x for x in range(5, 50, 5)]]}}, + {'window_size': {'hyperopt-dist': hp.choice, 'sampling-scope': [[x for x in range(5, 50, 5)]]}}, 'wavelet_basis': {'n_components': {'hyperopt-dist': hp.uniformint, 'sampling-scope': [2, 10]}, 'wavelet': {'hyperopt-dist': hp.choice, 'sampling-scope': [['mexh', 'shan', 'morl', 'cmor', 'fbsp', 'db5', 'sym5']]}}, 'fourier_basis': - {'spectrum': {'hyperopt-dist': hp.choice, 'sampling-scope': [['smoothed']]}, - 'threshold': {'hyperopt-dist': hp.uniformint, 'sampling-scope': [10000, 50000]}}, - + {'threshold': {'hyperopt-dist': hp.uniformint, 'sampling-scope': [10000, 50000]}}, 'quantile_extractor': {'window_mode': {'hyperopt-dist': hp.choice, 'sampling-scope': [[True, True]]}, - # 'var_threshold': {'hyperopt-dist': hp.choice, 'sampling-scope': [np.linspace(0, 0.02, 35)]}, 'window_size': {'hyperopt-dist': hp.choice, 
'sampling-scope': [[x for x in range(1, 50, 3)]]}}, - 'recurrence_extractor': - {'win_mode': (hp.choice, [[True, False]]), + {'window_mode': (hp.choice, [[True, False]]), 'window_size': (hp.uniformint, [1, 50]), 'min_signal_ratio': (hp.uniform, [0, 0.5]), 'max_signal_ratio': (hp.uniform, [0.5, 1]), @@ -32,9 +26,8 @@ {'wavelet': {'hyperopt-dist': hp.choice, 'sampling-scope': [['mexh', 'shan', 'morl', 'cmor', 'fbsp', 'db5', 'sym5']]}}, 'data_driven_basis_for_forecasting': - { - 'window_size': {'hyperopt-dist': hp.uniformint, 'sampling-scope': [5, 200]}} -} + {'window_size': {'hyperopt-dist': hp.uniformint, 'sampling-scope': [5, 200]}} + } def get_industrial_search_space(self): diff --git a/tests/unit/models/test_feature_extraction.py b/tests/unit/models/test_feature_extraction.py deleted file mode 100644 index 3f76d03c1..000000000 --- a/tests/unit/models/test_feature_extraction.py +++ /dev/null @@ -1,137 +0,0 @@ -import json -import os - -import pytest - - -from fedot_ind.api.utils.path_lib import PATH_TO_DEFAULT_PARAMS, PROJECT_PATH -from fedot_ind.core.models.recurrence.reccurence_extractor import RecurrenceExtractor -from fedot_ind.core.models.quantile.quantile_extractor import QuantileExtractor -from fedot_ind.core.models.topological.topological_extractor import TopologicalExtractor -from fedot_ind.core.architecture.preprocessing.DatasetLoader import DataLoader - - -@pytest.fixture() -def feature_generators_list(): - return [QuantileExtractor, RecurrenceExtractor, TopologicalExtractor] - - -@pytest.fixture() -def get_multilabel_data(): - path_to_local_folder = os.path.join(PROJECT_PATH, 'tests', 'data', 'datasets', 'classification_multi') - - (train_features, train_target), (test_features, test_target) = DataLoader(dataset_name='ECG5000_small', - folder=path_to_local_folder).load_data() - return (train_features, train_target), (test_features, test_target) - - -@pytest.fixture() -def get_binary_data(): - path_to_local_folder = os.path.join(PROJECT_PATH, 'tests', 'data', 'datasets', 'classification_binary') - - (train_features, train_target), (test_features, test_target) = DataLoader(dataset_name='ECG200_small', - folder=path_to_local_folder).load_data() - return (train_features, train_target), (test_features, test_target) - - -def get_generator_params(generator_name: str): - with open(PATH_TO_DEFAULT_PARAMS, 'r') as file: - _feature_gen_params = json.load(file) - params = _feature_gen_params[f'{generator_name}_extractor'] - return params - - -@pytest.fixture() -def get_topological_extractor(): - _params = get_generator_params('topological') - return TopologicalExtractor(_params) - - -@pytest.fixture() -def get_quantile_extractor(): - _params = get_generator_params('quantile') - return QuantileExtractor(_params) - - -@pytest.fixture() -def get_recurrence_extractor(): - _params = get_generator_params('recurrence') - return RecurrenceExtractor(_params) - - -def test_topological_binary(get_binary_data, get_topological_extractor): - (train_features, train_target), (test_features, test_target) = get_binary_data - model = get_topological_extractor - - train_features = model.generate_features_from_ts(train_features) - test_features = model.generate_features_from_ts(test_features) - - assert train_features is not None - assert test_features is not None - assert train_features.shape[0] == train_target.shape[0] - assert test_features.shape[0] == test_target.shape[0] - - -def test_topological_multilabel(get_multilabel_data, get_topological_extractor): - (train_features, train_target), (test_features, 
test_target) = get_multilabel_data - model = get_topological_extractor - - train_features = model.generate_features_from_ts(train_features) - test_features = model.generate_features_from_ts(test_features) - - assert train_features is not None - assert test_features is not None - assert train_features.shape[0] == train_target.shape[0] - assert test_features.shape[0] == test_target.shape[0] - - -def test_stats_binary(get_binary_data, get_quantile_extractor): - (train_features, train_target), (test_features, test_target) = get_binary_data - model = get_quantile_extractor - - train_features = model.generate_features_from_ts(train_features) - test_features = model.generate_features_from_ts(test_features) - - assert train_features is not None - assert test_features is not None - assert train_features.shape[0] == train_target.shape[0] - assert test_features.shape[0] == test_target.shape[0] - - -def test_stats_multilabel(get_multilabel_data, get_quantile_extractor): - (train_features, train_target), (test_features, test_target) = get_multilabel_data - model = get_quantile_extractor - - train_features = model.generate_features_from_ts(train_features) - test_features = model.generate_features_from_ts(test_features) - - assert train_features is not None - assert test_features is not None - assert train_features.shape[0] == train_target.shape[0] - assert test_features.shape[0] == test_target.shape[0] - - -def test_recurrence_binary(get_binary_data, get_recurrence_extractor): - (train_features, train_target), (test_features, test_target) = get_binary_data - model = get_recurrence_extractor - - train_features = model.generate_features_from_ts(train_features) - test_features = model.generate_features_from_ts(test_features) - - assert train_features is not None - assert test_features is not None - assert train_features.shape[0] == train_target.shape[0] - assert test_features.shape[0] == test_target.shape[0] - - -def test_recurrence_multilabel(get_multilabel_data, get_recurrence_extractor): - (train_features, train_target), (test_features, test_target) = get_multilabel_data - model = get_recurrence_extractor - - train_features = model.generate_features_from_ts(train_features) - test_features = model.generate_features_from_ts(test_features) - - assert train_features is not None - assert test_features is not None - assert train_features.shape[0] == train_target.shape[0] - assert test_features.shape[0] == test_target.shape[0] diff --git a/tests/unit/models/test_quantile_extractor.py b/tests/unit/models/test_quantile_extractor.py new file mode 100644 index 000000000..91d797290 --- /dev/null +++ b/tests/unit/models/test_quantile_extractor.py @@ -0,0 +1,56 @@ +import math + +import numpy as np +import pytest +from fedot.core.data.data import OutputData + +from fedot_ind.api.utils.input_data import init_input_data +from fedot_ind.core.models.quantile.quantile_extractor import QuantileExtractor +from fedot_ind.tools.synthetic.ts_datasets_generator import TimeSeriesDatasetsGenerator +from fedot_ind.core.models.quantile.stat_methods import stat_methods, stat_methods_global + + +FEATURES = list(stat_methods.keys()) + list(stat_methods_global.keys()) + + +def dataset(n_classes): + (X_train, y_train), (X_test, y_test) = TimeSeriesDatasetsGenerator(num_samples=20, + max_ts_len=50, + n_classes=n_classes, + test_size=0.5).generate_data() + return X_train, y_train, X_test, y_test + + +@pytest.fixture +def input_data(): + N_CLASSES = np.random.choice([2, 3]) + X_train, y_train, X_test, y_test = dataset(N_CLASSES) + 
input_train_data = init_input_data(X_train, y_train) + return input_train_data + + +@pytest.fixture +def quantile_extractor(): + return QuantileExtractor({'window_mode': False}) + + +@pytest.fixture +def quantile_extractor_window(): + return QuantileExtractor({'window_mode': True, 'window_size': 20}) + + +def test_transform(quantile_extractor, input_data): + train_features = quantile_extractor.transform(input_data=input_data) + assert train_features is not None + assert isinstance(train_features, OutputData) + assert len(FEATURES) == train_features.predict.shape[1] + + +def test_transform_window(quantile_extractor_window, input_data): + train_features_window = quantile_extractor_window.transform(input_data=input_data) + window = quantile_extractor_window.window_size + len_ts = input_data.features.shape[1] + expected_n_features = len(stat_methods_global.keys()) + math.ceil(len_ts / (len_ts*window/100)) * len(stat_methods.keys()) + assert train_features_window is not None + assert isinstance(train_features_window, OutputData) + assert expected_n_features == train_features_window.predict.shape[1] diff --git a/tests/unit/models/test_topological_extractor.py b/tests/unit/models/test_topological_extractor.py new file mode 100644 index 000000000..f40cd7f86 --- /dev/null +++ b/tests/unit/models/test_topological_extractor.py @@ -0,0 +1,44 @@ +import math + +import numpy as np +import pytest +from fedot.core.data.data import InputData, OutputData + +from fedot_ind.api.utils.input_data import init_input_data +from fedot_ind.core.models.topological.topological_extractor import TopologicalExtractor +from fedot_ind.tools.synthetic.ts_datasets_generator import TimeSeriesDatasetsGenerator + + +def dataset(n_classes): + (X_train, y_train), (X_test, y_test) = TimeSeriesDatasetsGenerator(num_samples=20, + max_ts_len=50, + n_classes=n_classes, + test_size=0.5).generate_data() + return X_train, y_train, X_test, y_test + + +@pytest.fixture +def input_data(): + N_CLASSES = np.random.choice([2, 3]) + X_train, y_train, X_test, y_test = dataset(N_CLASSES) + input_train_data = init_input_data(X_train, y_train) + return input_train_data + + +@pytest.fixture +def topological_extractor(): + return TopologicalExtractor({'window_size': 50}) + + +def test_transform(topological_extractor, input_data): + train_features = topological_extractor.transform(input_data=input_data) + assert train_features is not None + assert isinstance(train_features, OutputData) + + +def test_generate_topological_features(topological_extractor, input_data): + sample = input_data.features[0] + train_features = topological_extractor.generate_topological_features(sample) + assert train_features is not None + assert isinstance(train_features, InputData) + assert train_features.features.shape[0] == 1 diff --git a/tests/unit/operation/transformation/basis/test_fourier_basis.py b/tests/unit/operation/transformation/basis/test_fourier_basis.py new file mode 100644 index 000000000..138e467d4 --- /dev/null +++ b/tests/unit/operation/transformation/basis/test_fourier_basis.py @@ -0,0 +1,46 @@ +import numpy as np +import pytest +from fedot.core.data.data import OutputData + +from fedot_ind.api.utils.input_data import init_input_data +from fedot_ind.core.operation.transformation.basis.fourier import FourierBasisImplementation +from fedot_ind.tools.synthetic.ts_datasets_generator import TimeSeriesDatasetsGenerator + + +@pytest.fixture +def dataset(): + (X_train, y_train), (X_test, y_test) = TimeSeriesDatasetsGenerator(num_samples=20, + max_ts_len=50, + 
n_classes=2, + test_size=0.5).generate_data() + return X_train, y_train, X_test, y_test + + +@pytest.fixture +def input_train(dataset): + X_train, y_train, X_test, y_test = dataset + input_train_data = init_input_data(X_train, y_train) + return input_train_data + + +def test_transform(input_train): + basis = FourierBasisImplementation({"threshold": 20000}) + train_features = basis.transform(input_data=input_train) + assert isinstance(train_features, OutputData) + assert train_features.features.shape[0] == input_train.features.shape[0] + + +def test_transform_one_sample(input_train): + basis = FourierBasisImplementation({"threshold": 20000}) + sample = input_train.features[0] + transformed_sample = basis._transform_one_sample(sample) + assert isinstance(transformed_sample, np.ndarray) + assert transformed_sample.shape[0] == len(sample) + + +def test_decompose_signal(input_train): + basis = FourierBasisImplementation({"threshold": 20000}) + sample = input_train.features[0] + transformed_sample = basis._decompose_signal(sample) + assert isinstance(transformed_sample, np.ndarray) + assert transformed_sample.shape[0] == len(sample) diff --git a/tests/unit/operation/transformation/basis/test_wavelet_basis.py b/tests/unit/operation/transformation/basis/test_wavelet_basis.py new file mode 100644 index 000000000..5faaf05ac --- /dev/null +++ b/tests/unit/operation/transformation/basis/test_wavelet_basis.py @@ -0,0 +1,77 @@ +import numpy as np +import pytest +import pywt +from fedot.core.data.data import OutputData + +from fedot_ind.api.utils.input_data import init_input_data +from fedot_ind.core.operation.transformation.basis.wavelet import WaveletBasisImplementation +from fedot_ind.tools.synthetic.ts_datasets_generator import TimeSeriesDatasetsGenerator + + +WAVELETS = ['mexh', 'shan', 'morl', 'cmor', 'fbsp', 'db5', 'sym5'] +N_COMPONENTS = list(range(2, 12, 2)) + +@pytest.fixture +def dataset(): + (X_train, y_train), (X_test, y_test) = TimeSeriesDatasetsGenerator(num_samples=20, + max_ts_len=50, + n_classes=2, + test_size=0.5).generate_data() + return X_train, y_train, X_test, y_test + + +@pytest.fixture +def input_train(dataset): + X_train, y_train, X_test, y_test = dataset + input_train_data = init_input_data(X_train, y_train) + return input_train_data + + +def test_transform(input_train): + wavelet = np.random.choice(WAVELETS) + n_comps = np.random.choice(N_COMPONENTS) + basis = WaveletBasisImplementation({"wavelet": wavelet, "n_components": n_comps}) + train_features = basis.transform(input_data=input_train) + assert isinstance(train_features, OutputData) + assert train_features.features.shape[0] == input_train.features.shape[0] + + +def test_decompose_signal(input_train): + wavelet = np.random.choice(WAVELETS) + n_comps = np.random.choice(N_COMPONENTS) + basis = WaveletBasisImplementation({"wavelet": wavelet, "n_components": n_comps}) + sample = input_train.features[0] + transformed_sample = basis._decompose_signal(sample) + assert isinstance(transformed_sample, tuple) + assert len(transformed_sample) == 2 + + +def test_decomposing_level(input_train): + wavelet = np.random.choice(WAVELETS) + n_comps = np.random.choice(N_COMPONENTS) + basis = WaveletBasisImplementation({"wavelet": wavelet, "n_components": n_comps}) + sample = input_train.features[0] + discrete_wavelets = pywt.wavelist(kind='discrete') + basis.time_series = sample + basis.wavelet = np.random.choice(discrete_wavelets) + decomposing_level = basis._decomposing_level() + assert isinstance(decomposing_level, int) + assert 
decomposing_level > 0
+
+
+def test_transform_one_sample(input_train):
+    wavelet = np.random.choice(WAVELETS)
+    n_comps = np.random.choice(N_COMPONENTS)
+    basis = WaveletBasisImplementation({"wavelet": wavelet, "n_components": n_comps})
+    sample = input_train.features[0]
+    transformed_sample = basis._transform_one_sample(sample)
+    assert isinstance(transformed_sample, np.ndarray)
+
+
+def test_get_1d_basis(input_train):
+    wavelet = np.random.choice(WAVELETS)
+    n_comps = np.random.choice(N_COMPONENTS)
+    basis = WaveletBasisImplementation({"wavelet": wavelet, "n_components": n_comps})
+    sample = input_train.features[0]
+    extracted_basis = basis._get_1d_basis(sample)
+    assert isinstance(extracted_basis, np.ndarray)
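
For reference, a minimal sketch of how the renamed RecurrenceFeatureExtractor is
called after this patch; the 4x4 binary matrix below is a made-up toy input (not
project data) and the default minimum line lengths (MDL=3, MVL=3, MWVL=2) apply::

    import numpy as np

    from fedot_ind.core.models.recurrence.sequences import RecurrenceFeatureExtractor

    # Toy binary recurrence matrix: 1 marks a recurrent pair of states, 0 a non-recurrent one.
    toy_matrix = np.array([[1, 1, 0, 0],
                           [1, 1, 1, 0],
                           [0, 1, 1, 1],
                           [0, 0, 1, 1]])

    extractor = RecurrenceFeatureExtractor(recurrence_matrix=toy_matrix)
    # quantification_analysis() replaces the old recurrence_quantification_analysis()
    # and returns the RQA measures as a flat dict ('RR', 'DET', 'LAM', entropies, ...).
    rqa_features = extractor.quantification_analysis()
    print(rqa_features['RR'], rqa_features['DET'])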
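
After the patch, FourierBasisImplementation._decompose_signal is simply the former
low_pass: a hard cut of Fourier coefficients above the threshold. A self-contained
numpy sketch of that behaviour (the d=2e-3 / size sample spacing is taken from the
patched code; passing n=ts.size to irfft is added here only so that odd-length
series round-trip to their original length)::

    import numpy as np

    def low_pass(ts: np.ndarray, threshold: float) -> np.ndarray:
        coef = np.fft.rfft(ts)
        frequencies = np.fft.rfftfreq(ts.size, d=2e-3 / ts.size)
        coef[frequencies > threshold] = 0   # zero out everything above the cut-off
        return np.fft.irfft(coef, n=ts.size)

    ts = np.sin(np.linspace(0, 8 * np.pi, 200)) + 0.3 * np.random.rand(200)
    smoothed = low_pass(ts, threshold=20000)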
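
The new test_transform_window derives its expected feature count from window_size
interpreted as a percentage of the series length. A worked check of that arithmetic
under the test's own numbers (series length 50, window_size=20)::

    import math

    # step per window = len_ts * window / 100 samples
    len_ts, window = 50, 20
    n_windows = math.ceil(len_ts / (len_ts * window / 100))  # ceil(50 / 10) -> 5
    assert n_windows == 5
    # expected_n_features = len(stat_methods_global) + n_windows * len(stat_methods)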
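
WaveletBasisImplementation._decompose_signal now carries a Tuple[np.array, np.array]
annotation; a pywt-only sketch of the two branches it switches between (toy signal,
wavelet names taken from the WAVELETS list in the new tests; note that pywt.dwt
returns the approximation coefficients first and the detail coefficients second)::

    import numpy as np
    import pywt

    ts = np.random.rand(50)

    # Discrete branch: one level of DWT with 'smooth' signal extension, as in the class.
    approx, detail = pywt.dwt(ts, 'db5', 'smooth')  # pywt returns (cA, cD)

    # Continuous branch: CWT over the class's fixed scales [2, 4, 10, 20].
    coeffs, _ = pywt.cwt(ts, scales=[2, 4, 10, 20], wavelet='mexh')
    print(approx.shape, detail.shape, coeffs.shape)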