diff --git a/fedot/api/api_utils/api_composer.py b/fedot/api/api_utils/api_composer.py
index 5a0545a85b..efc34d3a2a 100644
--- a/fedot/api/api_utils/api_composer.py
+++ b/fedot/api/api_utils/api_composer.py
@@ -1,5 +1,6 @@
 import datetime
 import gc
+from copy import deepcopy
 from typing import List, Optional, Sequence, Tuple, Union
 
 from golem.core.log import default_log
@@ -97,7 +98,7 @@ def propose_and_fit_initial_assumption(self, train_data: InputData) -> Tuple[Seq
 
         with self.timer.launch_assumption_fit():
             fitted_assumption = \
-                assumption_handler.fit_assumption_and_check_correctness(initial_assumption[0],
+                assumption_handler.fit_assumption_and_check_correctness(deepcopy(initial_assumption[0]),
                                                                         pipelines_cache=self.pipelines_cache,
                                                                         preprocessing_cache=self.preprocessing_cache,
                                                                         eval_n_jobs=self.params.n_jobs)
diff --git a/fedot/core/composer/gp_composer/specific_operators.py b/fedot/core/composer/gp_composer/specific_operators.py
index 7799fe432f..ed8e11ba7c 100644
--- a/fedot/core/composer/gp_composer/specific_operators.py
+++ b/fedot/core/composer/gp_composer/specific_operators.py
@@ -19,7 +19,9 @@ def parameter_change_mutation(pipeline: Pipeline, requirements, graph_gen_params
     node_mutation_probability = get_mutation_prob(mut_id=parameters.mutation_strength,
                                                   node=pipeline.root_node)
     for node in pipeline.nodes:
-        if random() < node_mutation_probability:
+        lagged = node.operation.metadata.id in ('lagged', 'sparse_lagged', 'exog_ts')
+        do_mutation = random() < (node_mutation_probability * (0.5 if lagged else 1))
+        if do_mutation:
             operation_name = node.operation.operation_type
             current_params = node.parameters
 
diff --git a/fedot/core/operations/evaluation/operation_implementations/data_operations/ts_transformations.py b/fedot/core/operations/evaluation/operation_implementations/data_operations/ts_transformations.py
index 614c52bb2a..7222872be4 100644
--- a/fedot/core/operations/evaluation/operation_implementations/data_operations/ts_transformations.py
+++ b/fedot/core/operations/evaluation/operation_implementations/data_operations/ts_transformations.py
@@ -1,8 +1,11 @@
 from copy import copy, deepcopy
+from random import random
 from typing import Optional, Union
 
 import numpy as np
 import pandas as pd
+
+from fedot.utilities.window_size_selector import WindowSizeSelector, WindowSizeSelectorMethodsEnum
 from golem.core.log import default_log
 from scipy.ndimage import gaussian_filter
 from sklearn.decomposition import TruncatedSVD
@@ -103,9 +106,9 @@ def transform_for_fit(self, input_data: InputData) -> OutputData:
         self._update_column_types(output_data)
         return output_data
 
-    def _check_and_correct_window_size(self, time_series: np.array, forecast_length: int):
+    def _check_and_correct_window_size(self, time_series: np.ndarray, forecast_length: int):
         """ Method check if the length of the time series is not enough for
-            lagged transformation - clip it
+            lagged transformation
 
         Args:
             time_series: time series for transformation
@@ -114,10 +117,24 @@ def _check_and_correct_window_size(self, time_series: np.array, forecast_length:
 
         Returns:
 
         """
+        max_allowed_window_size = max(1, len(time_series) - forecast_length - 1)
+
+        if self.window_size == 0:
+            selector = WindowSizeSelector(method=WindowSizeSelectorMethodsEnum.HAC, window_range=(5, 60))
+            new = int(selector.apply(time_series) * time_series.shape[0] * 0.01)
+            new = min(max_allowed_window_size, new)
+            self.log.message((f"Window size of lagged transformation was changed "
+                              f"by WindowSizeSelector from {self.params.get('window_size')} to {new}"))
+            self.params.update(window_size=new)
 
         # Maximum threshold
-        if self.window_size + forecast_length > len(time_series):
-            raise ValueError(f"Window size is to high ({self.window_size}) for provided data len {len(time_series)}")
+        if self.window_size > max_allowed_window_size:
+            new = int(random() * max_allowed_window_size)
+            new = min(new, max_allowed_window_size)
+            new = max(new, self.window_size_minimum)
+            self.log.info(("Window size of lagged transformation was changed "
+                           f"from {self.params.get('window_size')} to {new}"))
+            self.params.update(window_size=new)
 
         # Minimum threshold
         if self.window_size < self.window_size_minimum:
diff --git a/fedot/core/repository/data/default_operation_params.json b/fedot/core/repository/data/default_operation_params.json
index c3aeb5208c..7ebed0ffd4 100644
--- a/fedot/core/repository/data/default_operation_params.json
+++ b/fedot/core/repository/data/default_operation_params.json
@@ -50,7 +50,7 @@
     "verbose": -1
   },
   "lagged": {
-    "window_size": 10
+    "window_size": 0
   },
   "diff_filter": {
     "window_size": 3,
diff --git a/fedot/utilities/window_size_selector.py b/fedot/utilities/window_size_selector.py
new file mode 100644
index 0000000000..f60302c30c
--- /dev/null
+++ b/fedot/utilities/window_size_selector.py
@@ -0,0 +1,250 @@
+import math
+from enum import Enum, auto
+
+import numpy as np
+import pandas as pd
+from scipy.signal import find_peaks
+from statsmodels.tsa.stattools import acf
+
+
+class WindowSizeSelectorMethodsEnum(Enum):
+    DFF = auto()
+    HAC = auto()
+    MWF = auto()
+    SSS = auto()
+
+
+class WindowSizeSelector:
+    """Class to select an appropriate window size to catch periodicity for time series analysis.
+
+    There are two groups of algorithms implemented:
+
+    Whole-Series-Based (WSB):
+        1. WindowSizeSelectorMethodsEnum.HAC - highest autocorrelation
+        2. WindowSizeSelectorMethodsEnum.DFF - dominant Fourier frequency
+
+    Subsequence-based (SB):
+        1. WindowSizeSelectorMethodsEnum.MWF - multi-window finder
+        2. WindowSizeSelectorMethodsEnum.SSS - summary statistics subsequence
+
+    Args:
+        method: by ``default``, it is WindowSizeSelectorMethodsEnum.DFF.
+        window_range: % of time series length, by ``default`` it is (5, 50).
+
+    Attributes:
+        length_ts(int): length of the time series.
+        window_max(int): maximum window size in real values.
+        window_min(int): minimum window size in real values.
+        dict_methods(dict): dictionary with all implemented methods.
+
+    Example:
+        To find the window size for a single time series::
+
+            ts = np.random.rand(1000)
+            ws_selector = WindowSizeSelector(method=WindowSizeSelectorMethodsEnum.HAC)
+            window_size = ws_selector.get_window_size(time_series=ts)
+
+        To find the window size for multiple time series::
+
+            ts = np.random.rand(1000, 10)
+            ws_selector = WindowSizeSelector(method=WindowSizeSelectorMethodsEnum.HAC)
+            window_size = ws_selector.apply(time_series=ts, average='median')
+
+    Reference:
+        (c) "Window Size Selection in Unsupervised Time Series Analytics: A Review and Benchmark.
+        Arik Ermshaus, Patrick Schafer, and Ulf Leser. 2022"
2022" + """ + + def __init__(self, + method: WindowSizeSelectorMethodsEnum = WindowSizeSelectorMethodsEnum.DFF, + window_range: tuple = (5, 50)): + + if window_range[0] >= window_range[1]: + raise ValueError('Upper bound of window range should be bigger than lower bound') + if window_range[0] < 0: + raise ValueError('Lower bound of window range should be bigger or equal to 0') + if window_range[1] > 100: + raise ValueError('Upper bound of window range should be lower or equal to 100') + + self.dict_methods = {WindowSizeSelectorMethodsEnum.HAC: self.autocorrelation, + WindowSizeSelectorMethodsEnum.DFF: self.dominant_fourier_frequency, + WindowSizeSelectorMethodsEnum.MWF: self.mwf, + WindowSizeSelectorMethodsEnum.SSS: self.summary_statistics_subsequence} + self.wss_algorithm = method + self.window_range = window_range + self.window_max = None + self.window_min = None + self.length_ts = None + + def apply(self, time_series: np.ndarray, average: str = 'median') -> int: + """Method to run WSS class over selected time series in parallel mode via joblib + Args: + time_series: time series to study + average: 'mean' or 'median' to average window size over all time series + Returns: + window_size_selected: value which has been chosen as appropriate window size + """ + methods = {'mean': np.mean, 'median': np.median} + if time_series.ndim == 1: + time_series = time_series.reshape((-1, 1)) + window_list = [self.get_window_size(time_series[:, i].ravel()) for i in range(time_series.shape[1])] + return round(methods[average](window_list)) + + def get_window_size(self, time_series: np.ndarray) -> int: + """Main function to run WSS class over selected time series + Note: + One of the reason of ValueError is that time series size can be equal or smaller than 50. + In case of it try to initially set window_size min and max. + Returns: + window_size_selected: value which has been chosen as appropriate window size + """ + self.length_ts = len(time_series) + + self.window_max = int(round(self.length_ts * self.window_range[1] / 100)) # in real values + self.window_min = int(round(self.length_ts * self.window_range[0] / 100)) # in real values + + window_size_selected = self.dict_methods[self.wss_algorithm](time_series=time_series) + window_size_selected = round(window_size_selected * 100 / self.length_ts) + window_size_selected = max(self.window_range[0], window_size_selected) + window_size_selected = min(self.window_range[1], window_size_selected) + return window_size_selected + + def dominant_fourier_frequency(self, time_series: np.ndarray) -> int: + """ + Method to find dominant fourier frequency in time series and return appropriate window size. It is based on + the assumption that the dominant frequency is the one with the highest magnitude in the Fourier transform. The + window size is then the inverse of the dominant frequency. + """ + fourier = np.fft.fft(time_series) + freq = np.fft.fftfreq(time_series.shape[0], 1) + + magnitudes, window_sizes = [], [] + + for coef, freq in zip(fourier, freq): + if coef and freq > 0: + window_size = int(1 / freq) + mag = math.sqrt(coef.real * coef.real + coef.imag * coef.imag) + + if self.window_min <= window_size < self.window_max: + window_sizes.append(window_size) + magnitudes.append(mag) + if window_sizes and magnitudes: + return window_sizes[np.argmax(magnitudes)] + else: + return self.window_min + + def autocorrelation(self, time_series: np.array) -> int: + """Method to find the highest autocorrelation in time series and return appropriate window size. 
+
+        It is based on the assumption that the lag of the highest autocorrelation coefficient corresponds to the
+        window size that best captures the periodicity of the time series.
+        """
+        ts_len = time_series.shape[0]
+        acf_values = acf(time_series, fft=True, nlags=int(ts_len / 2))
+
+        peaks, _ = find_peaks(acf_values)
+        peaks = peaks[np.logical_and(peaks >= self.window_min, peaks < self.window_max)]
+        corrs = acf_values[peaks]
+
+        if peaks.shape[0] == 0:  # if there are no peaks in the range (window_min, window_max), return window_min
+            return self.window_min
+        return peaks[np.argmax(corrs)]
+
+    def mwf(self, time_series: np.ndarray) -> int:
+        """Method to find the window size that minimizes the moving average residual.
+
+        It is based on the assumption that the window size that best captures the periodicity of the time series
+        is the one that minimizes the difference between the moving average and the time series.
+        """
+        all_averages, window_sizes = [], []
+
+        for w in range(self.window_min, self.window_max, 1):
+            moving_avg = np.array(self.movmean(time_series, w))
+            all_averages.append(moving_avg)
+            window_sizes.append(w)
+
+        moving_avg_residuals = []
+
+        for i, w in enumerate(window_sizes):
+            moving_avg = all_averages[i][:len(all_averages[-1])]
+            moving_avg_residual = np.log(abs(moving_avg - moving_avg.mean()).sum())
+            moving_avg_residuals.append(moving_avg_residual)
+
+        b = (np.diff(np.sign(np.diff(moving_avg_residuals))) > 0).nonzero()[0] + 1  # local minima
+
+        if len(b) == 0:
+            return self.window_min
+        if len(b) < 3:
+            return window_sizes[b[0]]
+
+        reswin = np.array([window_sizes[b[i]] / (i + 1) for i in range(3)])
+        w = np.mean(reswin)
+
+        return int(w)
+
+    def movmean(self, ts, w):
+        """Fast moving average function"""
+        moving_avg = np.cumsum(ts, dtype=float)
+        moving_avg[w:] = moving_avg[w:] - moving_avg[:-w]
+        return moving_avg[w - 1:] / w
+
+    def summary_statistics_subsequence(self, time_series: np.ndarray, threshold: float = 0.89) -> int:
+        """Method to find the window size that maximizes the subsequence unsupervised similarity score (SUSS).
+
+        It is based on the assumption that the window size that best captures the periodicity of the time series
+        is the one that maximizes the similarity between subsequences of the time series.
+        """
+        time_series = (time_series - time_series.min()) / (time_series.max() - time_series.min())
+
+        ts_mean = np.mean(time_series)
+        ts_std = np.std(time_series)
+        ts_min_max = np.max(time_series) - np.min(time_series)
+
+        stats = (ts_mean, ts_std, ts_min_max)
+
+        max_score = self.suss_score(time_series=time_series, window_size=1, stats=stats)
+        min_score = self.suss_score(time_series=time_series, window_size=time_series.shape[0] - 1, stats=stats)
+
+        exp = 0
+
+        # exponential search (to find the window size interval)
+        while True:
+            window_size = 2 ** exp
+
+            if window_size > self.window_max:
+                break
+
+            if window_size < self.window_min:
+                exp += 1
+                continue
+
+            score = 1 - (self.suss_score(time_series, window_size, stats) - min_score) / (max_score - min_score)
+
+            if score > threshold:
+                break
+
+            exp += 1
+
+        lbound, ubound = max(self.window_min, 2 ** (exp - 1)), 2 ** exp + 1
+
+        # binary search (to find the window size within the interval)
+        while lbound <= ubound:
+            window_size = int((lbound + ubound) / 2)
+            score = 1 - (self.suss_score(time_series, window_size, stats) - min_score) / (max_score - min_score)
+
+            if score < threshold:
+                lbound = window_size + 1
+            elif score > threshold:
+                ubound = window_size - 1
+            else:
+                break
+
+        return 2 * lbound
+
+    def suss_score(self, time_series, window_size, stats):
+        roll = pd.Series(time_series).rolling(window_size)
+        ts_mean, ts_std, ts_min_max = stats
+
+        roll_mean = roll.mean().to_numpy()[window_size:]
+        roll_std = roll.std(ddof=0).to_numpy()[window_size:]
+        roll_min = roll.min().to_numpy()[window_size:]
+        roll_max = roll.max().to_numpy()[window_size:]
+
+        X = np.array([
+            roll_mean - ts_mean,
+            roll_std - ts_std,
+            (roll_max - roll_min) - ts_min_max
+        ])
+
+        X = np.sqrt(np.sum(np.square(X), axis=0)) / np.sqrt(window_size)
+
+        return np.mean(X)
diff --git a/test/unit/data_operations/test_data_operation_params.py b/test/unit/data_operations/test_data_operation_params.py
index 082037d743..303b2741c5 100644
--- a/test/unit/data_operations/test_data_operation_params.py
+++ b/test/unit/data_operations/test_data_operation_params.py
@@ -2,7 +2,6 @@
 
 import numpy as np
 import pandas as pd
-import pytest
 
 from fedot.core.data.data import InputData
 from fedot.core.data.data_split import train_test_data_setup
@@ -54,8 +53,8 @@ def test_lagged_with_invalid_params_fit_correctly():
     pipeline = get_ts_pipeline(window_size)
 
     # Fit it
-    with pytest.raises(ValueError):
-        pipeline.fit(ts_input)
+    pipeline.fit(ts_input)
+    assert 1 <= pipeline.nodes[-1].parameters['window_size'] <= len(time_series) - len_forecast
 
 
 def test_ransac_with_invalid_params_fit_correctly():
diff --git a/test/unit/data_operations/test_time_series_operations.py b/test/unit/data_operations/test_time_series_operations.py
index 3c7b071fe1..39ad5e7851 100644
--- a/test/unit/data_operations/test_time_series_operations.py
+++ b/test/unit/data_operations/test_time_series_operations.py
@@ -1,7 +1,13 @@
+import logging
+
 import numpy as np
 import pytest
 
 from fedot.core.data.data_split import train_test_data_setup
+from fedot.core.optimisers.objective import MetricsObjective, PipelineObjectiveEvaluate
+from fedot.core.optimisers.objective.data_source_splitter import DataSourceSplitter
+from fedot.core.pipelines.pipeline_builder import PipelineBuilder
+from fedot.core.pipelines.tuning.tuner_builder import TunerBuilder
 from golem.core.log import default_log
 
 from fedot.core.data.data import InputData
@@ -20,6 +26,25 @@
 _FORECAST_LENGTH = 4
 
 
+def prepare_logging():
+    old_factory = logging.getLogRecordFactory()
+    records = []
+
+    def record_factory(*args, **kwargs):
+        record = old_factory(*args, **kwargs)
+        records.append(record)
+        return record
+
+    logging.setLogRecordFactory(record_factory)
+    return records
+
+
+def check_window_size_selector_logging(records):
+    return ['LaggedTransformationImplementation' in str(record) and
+            'WindowSizeSelector' in str(record)
+            for record in records]
+
+
 def synthetic_univariate_ts():
     """ Method returns InputData for classical time series forecasting task """
     task = Task(TaskTypesEnum.ts_forecasting,
@@ -46,10 +71,15 @@
 
 
 def get_timeseries(length=10, features_count=1,
-                   target_count=1, forecast_length=_FORECAST_LENGTH):
+                   target_count=1, forecast_length=_FORECAST_LENGTH,
+                   random=False):
     task = Task(TaskTypesEnum.ts_forecasting,
                 TsForecastingParams(forecast_length=forecast_length))
-    features = np.arange(0, length * features_count) * 10
+    if random:
+        features = np.random.rand(length, features_count) * 10
+        features = features.ravel() if features_count == 1 else features
+    else:
+        features = np.arange(0, length * features_count) * 10
     if features_count > 1:
         features = np.reshape(features, (features_count, length)).T
         for i in range(features_count):
@@ -238,3 +268,93 @@ def test_lagged_node(length, features_count, target_count, window_size):
     predict = node.predict(test)
 
     assert np.all(predict.predict[-1, :] == np.reshape(test.features[-window_size:].T, (-1, )))
+
+
+def test_lagged_window_size_selector_tune_window_by_default():
+    ts = get_timeseries(length=1000)
+    pipeline = PipelineBuilder().add_sequence('lagged', 'ridge').build()
+    origin_window_size = pipeline.nodes[-1].parameters['window_size']
+    pipeline.fit(ts)
+    new_window_size = pipeline.nodes[-1].parameters['window_size']
+
+    assert origin_window_size != new_window_size
+    assert 0 < new_window_size < ts.features.shape[0]
+
+
+@pytest.mark.parametrize('origin_window_size', [10, 20, 100])
+def test_lagged_window_size_selector_does_not_tune_defined_window(origin_window_size):
+    ts = get_timeseries(length=1000)
+    pipeline = (PipelineBuilder()
+                .add_node('lagged', params={'window_size': origin_window_size})
+                .add_node('ridge').build())
+    assert origin_window_size == pipeline.nodes[-1].parameters['window_size']
+    pipeline.fit(ts)
+    assert origin_window_size == pipeline.nodes[-1].parameters['window_size']
+
+
+@pytest.mark.parametrize('window_size', [10, 20, 100])
+def test_lagged_window_size_selector_does_not_tune_manually_defined_window(window_size):
+    ts = get_timeseries(length=1000)
+    pipeline = PipelineBuilder().add_sequence('lagged', 'ridge').build()
+    pipeline.nodes[-1].parameters = {'window_size': window_size}
+    pipeline.fit(ts)
+    assert window_size == pipeline.nodes[-1].parameters['window_size']
+
+
+@pytest.mark.parametrize('freq', [5, 10, 20])
+def test_lagged_window_size_selector_adequate(freq):
+    ts = get_timeseries(length=1000)
+    time = np.linspace(0, 1, ts.features.shape[0])
+    ts.features = np.sin(2 * np.pi * freq * time)
+
+    pipeline = PipelineBuilder().add_sequence('lagged', 'ridge').build()
+    pipeline.fit(ts)
+
+    window = pipeline.nodes[-1].parameters['window_size']
+    expected_window = ts.features.shape[0] / (freq * 2)
+
+    assert expected_window / 2 <= window <= expected_window * 2
+
+
+@pytest.mark.parametrize('n_jobs', (1, -1))
+def test_evaluation_correctly_work_with_window_size_selector(n_jobs):
+    ts = get_timeseries(length=1000)
+    data_splitter = DataSourceSplitter(cv_folds=3)
+    data_producer = data_splitter.build(ts)
+    objective = MetricsObjective('rmse', False)
+    objective_evaluator = PipelineObjectiveEvaluate(objective=objective,
+                                                    data_producer=data_producer,
+                                                    validation_blocks=data_splitter.validation_blocks,
+                                                    eval_n_jobs=n_jobs)
+    objective_function = objective_evaluator.evaluate
+
+    pipeline = PipelineBuilder().add_sequence('lagged', 'ridge').build()
+
+    # prepare factory to get all records
+    records = prepare_logging()
+
+    # run objective function
+    objective_function(pipeline)
+
+    # check that WindowSizeSelector runs once
+    assert sum(check_window_size_selector_logging(records)) == 1
+
+
+def test_tuner_correctly_work_with_window_size_selector():
+    ts = get_timeseries(length=1000, random=True)
+
+    autotuned_pipeline = PipelineBuilder().add_sequence('lagged', 'ridge').build()
+    autotuned_pipeline.fit(ts)
+    autotuned_window = autotuned_pipeline.nodes[-1].parameters['window_size']
+
+    # prepare factory to get all records
+    records = prepare_logging()
+
+    tuner_tuned_pipeline = PipelineBuilder().add_sequence('lagged', 'ridge').build()
+    tuner = TunerBuilder(task=ts.task).with_iterations(10).build(data=ts)
+    tuned_pipeline = tuner.tune(graph=tuner_tuned_pipeline, show_progress=False)
+    tuner_tuned_window = tuned_pipeline.nodes[-1].parameters['window_size']
+
+    assert autotuned_window != tuner_tuned_window
+    # check that WindowSizeSelector runs twice due to tuner graph copying in initialization
+    assert sum(check_window_size_selector_logging(records)) == 2
diff --git a/test/unit/optimizer/gp_operators/test_mutation.py b/test/unit/optimizer/gp_operators/test_mutation.py
index 8cbdf06a82..d090c47477 100644
--- a/test/unit/optimizer/gp_operators/test_mutation.py
+++ b/test/unit/optimizer/gp_operators/test_mutation.py
@@ -131,8 +131,6 @@ def test_boosting_mutation_for_non_lagged_ts_model():
     """
     graph = PipelineAdapter().restore(get_ts_forecasting_graph())
-
-    boosting_graph = get_ts_forecasting_graph_with_boosting()
 
     requirements = PipelineComposerRequirements(primary=['ridge'], secondary=['ridge'])
 
     pipeline = boosting_mutation(graph,
@@ -143,7 +141,11 @@ def test_boosting_mutation_for_non_lagged_ts_model():
     data_train, data_test = get_ts_data()
     pipeline.fit(data_train)
     result = pipeline.predict(data_test)
-    assert boosting_graph.descriptive_id == pipeline.descriptive_id
+
+    boosting_pipeline = PipelineAdapter().restore(get_ts_forecasting_graph_with_boosting())
+    boosting_pipeline.fit(data_train)
+
+    assert boosting_pipeline.descriptive_id == pipeline.descriptive_id
     assert result is not None
diff --git a/test/unit/utilities/window_size_selector.py b/test/unit/utilities/window_size_selector.py
new file mode 100644
index 0000000000..9dd9c5136c
--- /dev/null
+++ b/test/unit/utilities/window_size_selector.py
@@ -0,0 +1,29 @@
+from itertools import combinations
+
+import numpy as np
+import pytest
+
+from fedot.utilities.window_size_selector import WindowSizeSelector, WindowSizeSelectorMethodsEnum
+
+
+@pytest.mark.parametrize('method', WindowSizeSelectorMethodsEnum)
+@pytest.mark.parametrize(['window_min', 'window_max'],
+                         [sorted(x) for x in combinations(map(int, np.random.rand(5) * 100), 2)] +
+                         [(1, 2), (98, 99), (1, 99)])
+def test_window_size_selector(method, window_min, window_max):
+    selector = WindowSizeSelector(method=method, window_range=(window_min, window_max))
+    ts = np.random.rand(1000)
+
+    assert window_min <= selector.apply(time_series=ts) <= window_max
+
+
+@pytest.mark.parametrize(['window_min', 'window_max'],
+                         list(combinations(map(int, np.random.rand(10) * 200 - 50), 2)) +
+                         [[-1, 10], [10, 5], [95, 105], [-10, -9], [105, 110]])
+def test_window_size_selector_with_incorrect_window_params(window_min, window_max):
+    error = window_min < 0
+    error |= window_max > 100
+    error |= window_min >= window_max
+    if error:
+        with pytest.raises(ValueError):
+            WindowSizeSelector(window_range=(window_min, window_max))
+    else:
+        # valid bounds should construct without raising
+        WindowSizeSelector(window_range=(window_min, window_max))
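
For reference, a minimal usage sketch of the WindowSizeSelector API introduced in
fedot/utilities/window_size_selector.py. Every name used here appears in this diff; the synthetic
series itself is only illustrative:

    import numpy as np

    from fedot.utilities.window_size_selector import WindowSizeSelector, WindowSizeSelectorMethodsEnum

    # noisy sine wave with a period of 100 points
    ts = np.sin(np.linspace(0, 20 * np.pi, 1000)) + np.random.rand(1000) * 0.1

    # HAC = highest autocorrelation; window_range is given in % of the series length
    selector = WindowSizeSelector(method=WindowSizeSelectorMethodsEnum.HAC, window_range=(5, 60))
    window_percent = selector.apply(ts)

    # convert % of the length into a concrete lag count, as _check_and_correct_window_size does above
    window_size = int(window_percent * ts.shape[0] * 0.01)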
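
The practical consequence of switching the default "window_size" for the lagged operation from 10 to 0
is that a plain pipeline fit now triggers the selector automatically, as
test_lagged_window_size_selector_tune_window_by_default asserts. A sketch of that flow; the InputData
construction is an assumption based on common FEDOT usage, everything else comes from this diff:

    import numpy as np

    from fedot.core.data.data import InputData
    from fedot.core.pipelines.pipeline_builder import PipelineBuilder
    from fedot.core.repository.dataset_types import DataTypesEnum
    from fedot.core.repository.tasks import Task, TaskTypesEnum, TsForecastingParams

    task = Task(TaskTypesEnum.ts_forecasting, TsForecastingParams(forecast_length=4))
    series = np.sin(np.linspace(0, 20 * np.pi, 1000))
    ts = InputData(idx=np.arange(series.shape[0]), features=series, target=series,
                   task=task, data_type=DataTypesEnum.ts)  # assumed construction, mirrors the test fixtures

    pipeline = PipelineBuilder().add_sequence('lagged', 'ridge').build()
    pipeline.fit(ts)  # window_size == 0 by default, so a window is selected during fit

    assert 0 < pipeline.nodes[-1].parameters['window_size'] < series.shape[0]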
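
The movmean helper relies on the standard cumulative-sum trick for an O(n) moving average; a small
self-check against np.convolve (the function body is copied from the diff, the comparison is illustrative):

    import numpy as np

    def movmean(ts, w):
        # O(n) moving average via cumulative sums, as in WindowSizeSelector.movmean
        moving_avg = np.cumsum(ts, dtype=float)
        moving_avg[w:] = moving_avg[w:] - moving_avg[:-w]
        return moving_avg[w - 1:] / w

    ts = np.random.rand(100)
    assert np.allclose(movmean(ts, 5), np.convolve(ts, np.ones(5) / 5, mode='valid'))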