diff --git a/examples/advanced/additional_learning.py b/examples/advanced/additional_learning.py
index 0d40a56a23..fdd3023258 100644
--- a/examples/advanced/additional_learning.py
+++ b/examples/advanced/additional_learning.py
@@ -4,7 +4,7 @@
 import pandas as pd
 
 from fedot import Fedot
-from fedot.core.operations.atomized_model import AtomizedModel
+from fedot.core.operations.atomized_model.atomized_model import AtomizedModel
 from fedot.core.pipelines.node import PipelineNode
 from fedot.core.pipelines.pipeline import Pipeline
 from fedot.core.pipelines.pipeline_builder import PipelineBuilder
diff --git a/fedot/api/api_utils/api_params_repository.py b/fedot/api/api_utils/api_params_repository.py
index e1626db0b1..4712580bb2 100644
--- a/fedot/api/api_utils/api_params_repository.py
+++ b/fedot/api/api_utils/api_params_repository.py
@@ -1,6 +1,10 @@
 import datetime
 from typing import Sequence
 
+from fedot.core.optimisers.genetic_operators.crossover import fedot_subtree_crossover, fedot_one_point_crossover
+from fedot.core.optimisers.genetic_operators.mutation import fedot_single_edge_mutation, fedot_single_add_mutation, \
+    fedot_single_change_mutation, fedot_single_drop_mutation, insert_atomized_operation, fedot_tree_growth
+from golem.core.optimisers.genetic.operators.crossover import CrossoverTypesEnum
 from golem.core.optimisers.genetic.operators.inheritance import GeneticSchemeTypesEnum
 from golem.core.optimisers.genetic.operators.mutation import MutationTypesEnum
 
@@ -118,6 +122,7 @@ def get_params_for_gp_algorithm_params(self, params: dict) -> dict:
             gp_algorithm_params['genetic_scheme_type'] = GeneticSchemeTypesEnum.steady_state
         gp_algorithm_params['mutation_types'] = ApiParamsRepository._get_default_mutations(self.task_type, params)
+        gp_algorithm_params['crossover_types'] = ApiParamsRepository._get_default_crossovers(self.task_type, params)
         return gp_algorithm_params
 
     @staticmethod
@@ -128,14 +133,41 @@ def _get_default_mutations(task_type: TaskTypesEnum, params) -> Sequence[MutationTypesEnum]:
                      MutationTypesEnum.single_add,
                      MutationTypesEnum.single_edge]
-        # TODO remove workaround after boosting mutation fix
-        # Boosting mutation does not work due to problem with __eq__ with it copy.
-        # ``partial`` refactor to ``def`` does not work
-        # Also boosting mutation does not work by it own.
         if task_type == TaskTypesEnum.ts_forecasting:
+            # TODO remove workaround after boosting mutation fix
+            # Boosting mutation does not work due to a problem with __eq__ on its copy.
+            # A ``partial``-to-``def`` refactor does not help,
+            # and boosting mutation also does not work on its own.
+            # mutations.append(partial(boosting_mutation, params=params))
-            pass
+
+            # TODO remove when testing ends
+            # added for testing purposes
+            mutations = [parameter_change_mutation,
+                         fedot_single_edge_mutation,
+                         fedot_single_add_mutation,
+                         fedot_single_change_mutation,
+                         fedot_single_drop_mutation,
+                         insert_atomized_operation]
+            # mutations = [fedot_single_edge_mutation,
+            #              fedot_single_add_mutation,
+            #              fedot_single_change_mutation,
+            #              fedot_single_drop_mutation,
+            #              fedot_tree_growth,
+            #              insert_atomized_operation]
+            # mutations = [fedot_single_add_mutation,
+            #              fedot_single_change_mutation,
+            #              fedot_single_drop_mutation,
+            #              insert_atomized_operation]
         else:
             mutations.append(add_resample_mutation)
         return mutations
+
+    @staticmethod
+    def _get_default_crossovers(task_type: TaskTypesEnum, params) -> Sequence:
+        if task_type == TaskTypesEnum.ts_forecasting:
+            # crossovers = [fedot_subtree_crossover, fedot_one_point_crossover]  # full set, disabled while testing
+            crossovers = [fedot_one_point_crossover]
+        else:
+            crossovers = [CrossoverTypesEnum.one_point]
+        return crossovers
diff --git a/fedot/api/api_utils/assumptions/task_assumptions.py b/fedot/api/api_utils/assumptions/task_assumptions.py
index 00aea28ca0..55a2adc1eb 100644
--- a/fedot/api/api_utils/assumptions/task_assumptions.py
+++ b/fedot/api/api_utils/assumptions/task_assumptions.py
@@ -2,10 +2,13 @@
 from typing import List
 
 from fedot.api.api_utils.assumptions.operations_filter import OperationsFilter
+from fedot.core.pipelines.node import PipelineNode
+from fedot.core.pipelines.pipeline import Pipeline
 from fedot.core.pipelines.pipeline_builder import PipelineBuilder
 from fedot.core.repository.operation_types_repository import OperationTypesRepository
 from fedot.core.repository.tasks import Task, TaskTypesEnum
 from fedot.utilities.custom_errors import AbstractMethodNotImplementError
+from golem.core.optimisers.opt_graph_builder import OptGraphBuilder
 
 
 class TaskAssumptions:
@@ -64,9 +67,17 @@ def builders(self):
                 .add_branch('polyfit', 'lagged')
                 .grow_branches(None, 'ridge')
                 .join_branches('ridge'),
-            'smoothing_ar':
+            'atomized_ts_differ':
                 PipelineBuilder()
-                .add_sequence('smoothing', 'ar'),
+                .add_branch('lagged', 'lagged')
+                .add_node('ridge', branch_idx=0)
+                .add_node('atomized_ts_differ', branch_idx=1, params={'pipeline': PipelineBuilder()
+                          .add_node('ridge').build()})
+                .join_branches('ridge'),
+            'atomized_ts_to_time':
+                PipelineBuilder()
+                .add_sequence('lagged', 'ridge')
+                .add_node('atomized_ts_to_time', params={'pipeline': PipelineBuilder().add_node('ridge').build()}),
         }
 
     def ensemble_operation(self) -> str:
diff --git a/fedot/core/data/supplementary_data.py b/fedot/core/data/supplementary_data.py
index 77943a28e6..56f719adc6 100644
--- a/fedot/core/data/supplementary_data.py
+++ b/fedot/core/data/supplementary_data.py
@@ -1,5 +1,5 @@
-from dataclasses import dataclass
-from typing import Dict, Optional
+from dataclasses import dataclass, field
+from typing import Dict, Optional, List
 
 import numpy as np
 
@@ -31,6 +31,8 @@ class SupplementaryData:
     col_type_ids: Optional[Dict[str, np.ndarray]] = None
     # Was the data preprocessed before composer
     is_auto_preprocessed: bool = False
+    # time series bias for the time series forecasting problem
+    time_series_bias: List[np.ndarray] = field(default_factory=list)
 
     @property
     def compound_mask(self):
diff --git a/fedot/core/operations/atomized.py b/fedot/core/operations/atomized.py
new file mode 100644
index 0000000000..5b7925d470
--- /dev/null
+++ b/fedot/core/operations/atomized.py
@@ -0,0 +1,14 @@
+from 
fedot.core.operations.model import Model +from fedot.core.repository.operation_types_repository import OperationTypesRepository + + +class Atomized(Model): + """ + Class with fit/predict methods defining the atomized strategy for the task + + :param operation_type: name of the model + """ + + def __init__(self, operation_type: str): + super().__init__(operation_type=operation_type) + self.operations_repo = OperationTypesRepository('atomized') diff --git a/fedot/core/operations/atomized_model/__init__.py b/fedot/core/operations/atomized_model/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/fedot/core/operations/atomized_model.py b/fedot/core/operations/atomized_model/atomized_model.py similarity index 82% rename from fedot/core/operations/atomized_model.py rename to fedot/core/operations/atomized_model/atomized_model.py index 3fce78bc22..b08245c169 100644 --- a/fedot/core/operations/atomized_model.py +++ b/fedot/core/operations/atomized_model/atomized_model.py @@ -1,18 +1,11 @@ from collections import Counter -from datetime import timedelta from functools import reduce from operator import and_, or_ from typing import Any, Callable, Dict, List, Optional, Set, Union -from golem.core.tuning.simultaneous import SimultaneousTuner - from fedot.core.data.data import InputData, OutputData from fedot.core.operations.operation import Operation from fedot.core.operations.operation_parameters import OperationParameters -from fedot.core.pipelines.node import PipelineNode -from fedot.core.pipelines.pipeline import Pipeline -from fedot.core.pipelines.tuning.tuner_builder import TunerBuilder -from fedot.core.repository.metrics_repository import MetricCallable from fedot.core.repository.operation_types_repository import OperationMetaInfo, atomized_model_type @@ -26,6 +19,11 @@ def __init__(self, pipeline: 'Pipeline'): super().__init__(operation_type=atomized_model_type()) self.pipeline = pipeline + @property + def acceptable_task_types(self): + root_operation = self.pipeline.root_node.operation + return root_operation.acceptable_task_types + def fit(self, params: Optional[Union[OperationParameters, dict]], data: InputData) -> ('Pipeline', OutputData): predicted_train = self.pipeline.fit(input_data=data) fitted_atomized_operation = self.pipeline @@ -50,27 +48,31 @@ def predict_for_fit(self, return self.predict(fitted_operation, data, params, output_mode) def fine_tune(self, - metric_function: MetricCallable, + metric_function: 'MetricCallable', input_data: Optional[InputData] = None, iterations: int = 50, timeout: int = 5) -> 'AtomizedModel': """ Method for tuning hyperparameters """ - tuner = TunerBuilder(input_data.task) \ - .with_tuner(SimultaneousTuner) \ - .with_metric(metric_function) \ - .with_iterations(iterations) \ - .with_timeout(timedelta(minutes=timeout)) \ - .build(input_data) - tuned_pipeline = tuner.tune(self.pipeline) - tuned_atomized_model = AtomizedModel(tuned_pipeline) - return tuned_atomized_model + # TODO Fix tuner with atomized model + # cannot be made by that way due to problem with circular import + # TODO add tests for atomized tuning + # origin test was removed + # tuner = TunerBuilder(input_data.task) \ + # .with_tuner(SimultaneousTuner) \ + # .with_metric(metric_function) \ + # .with_iterations(iterations) \ + # .with_timeout(timedelta(minutes=timeout)) \ + # .build(input_data) + # tuned_pipeline = tuner.tune(self.pipeline) + # tuned_atomized_model = AtomizedModel(tuned_pipeline) + # return tuned_atomized_model @property def metadata(self) -> 
OperationMetaInfo: root_node = self.pipeline.root_node def extract_metadata_from_pipeline(attr_name: str, - node_filter: Optional[Callable[[PipelineNode], bool]] = None, + node_filter: Optional[Callable[['PipelineNode'], bool]] = None, reduce_function: Optional[Callable[[Set], Set]] = None) -> List[Any]: """ Extract metadata from atomized pipeline :param attr_name: extracting metadata property @@ -110,8 +112,8 @@ def description(self, operation_params: Optional[dict] = None) -> str: operation_types = map(lambda node: node.operation.operation_type, self.pipeline.nodes) operation_types_dict = dict(Counter(operation_types)) - return f'{operation_type}_length:{operation_length}_depth:{operation_depth}' \ - f'_types:{operation_types_dict}_id:{operation_id}' + return (f'{self.__class__}({operation_type}_length:{operation_length}_depth:{operation_depth}' + f'_types:{operation_types_dict}_id:{operation_id})') @staticmethod def assign_tabular_column_types(output_data: OutputData, diff --git a/fedot/core/operations/atomized_template.py b/fedot/core/operations/atomized_model/atomized_template.py similarity index 97% rename from fedot/core/operations/atomized_template.py rename to fedot/core/operations/atomized_model/atomized_template.py index 98c5f915a7..8f75a3aea9 100644 --- a/fedot/core/operations/atomized_template.py +++ b/fedot/core/operations/atomized_model/atomized_template.py @@ -10,7 +10,7 @@ def __init__(self, node: PipelineNode = None, operation_id: int = None, nodes_fr # Need use the imports inside the class because of the problem of circular imports. from fedot.core.pipelines.pipeline import Pipeline from fedot.core.pipelines.template import PipelineTemplate - from fedot.core.operations.atomized_model import AtomizedModel + from fedot.core.operations.atomized_model.atomized_model import AtomizedModel super().__init__() self.atomized_model_json_path = None diff --git a/fedot/core/operations/evaluation/atomized.py b/fedot/core/operations/evaluation/atomized.py new file mode 100644 index 0000000000..55163f3173 --- /dev/null +++ b/fedot/core/operations/evaluation/atomized.py @@ -0,0 +1,54 @@ +import warnings +from typing import Optional + +from fedot.core.data.data import InputData, OutputData +from fedot.core.operations.evaluation.evaluation_interfaces import EvaluationStrategy, SkLearnEvaluationStrategy +from fedot.core.operations.evaluation.operation_implementations.data_operations.decompose \ + import DecomposerRegImplementation +from fedot.core.operations.evaluation.operation_implementations.data_operations.sklearn_filters \ + import IsolationForestRegImplementation +from fedot.core.operations.evaluation.operation_implementations. \ + data_operations.sklearn_filters import LinearRegRANSACImplementation, NonLinearRegRANSACImplementation +from fedot.core.operations.evaluation.operation_implementations. 
\ + data_operations.sklearn_selectors import LinearRegFSImplementation, NonLinearRegFSImplementation +from fedot.core.operations.evaluation.operation_implementations.models.atomized.atomized_decompose import \ + AtomizedTimeSeriesDecomposer +from fedot.core.operations.evaluation.operation_implementations.models.atomized.atomized_ts_differ import \ + AtomizedTimeSeriesDiffer +from fedot.core.operations.evaluation.operation_implementations.models.atomized.atomized_ts_sampler import \ + AtomizedTimeSeriesSampler +from fedot.core.operations.evaluation.operation_implementations.models.atomized.atomized_ts_scaler import \ + AtomizedTimeSeriesScaler +from fedot.core.operations.evaluation.operation_implementations.models.atomized.atomized_ts_transform_to_time import \ + AtomizedTimeSeriesToTime +from fedot.core.operations.evaluation.operation_implementations.models.knn import FedotKnnRegImplementation +from fedot.core.operations.operation_parameters import OperationParameters +from fedot.utilities.random import ImplementationRandomStateHandler + +warnings.filterwarnings("ignore", category=UserWarning) + + +class FedotAtomizedStrategy(EvaluationStrategy): + _operations_by_types = { + 'atomized_ts_differ': AtomizedTimeSeriesDiffer, + 'atomized_ts_scaler': AtomizedTimeSeriesScaler, + 'atomized_ts_sampler': AtomizedTimeSeriesSampler, + 'atomized_ts_to_time': AtomizedTimeSeriesToTime, + 'atomized_ts_decomposer': AtomizedTimeSeriesDecomposer, + } + + def __init__(self, operation_type: str, params: Optional[OperationParameters] = None): + self.operation_impl = self._convert_to_operation(operation_type) + super().__init__(operation_type, params) + + def fit(self, train_data: InputData): + model = self.operation_impl(self.params_for_fit.get('pipeline')) + return model.fit(train_data) + + def predict(self, trained_operation, predict_data: InputData) -> OutputData: + prediction = trained_operation.predict(predict_data) + return prediction + + def predict_for_fit(self, trained_operation, predict_data: InputData) -> OutputData: + prediction = trained_operation.predict_for_fit(predict_data) + return prediction diff --git a/fedot/core/operations/evaluation/operation_implementations/data_operations/ts_transformations.py b/fedot/core/operations/evaluation/operation_implementations/data_operations/ts_transformations.py index 614c52bb2a..0428c5ed4c 100644 --- a/fedot/core/operations/evaluation/operation_implementations/data_operations/ts_transformations.py +++ b/fedot/core/operations/evaluation/operation_implementations/data_operations/ts_transformations.py @@ -3,6 +3,8 @@ import numpy as np import pandas as pd + +from fedot.utilities.window_size_selector import WindowSizeSelector from golem.core.log import default_log from scipy.ndimage import gaussian_filter from sklearn.decomposition import TruncatedSVD @@ -115,6 +117,11 @@ def _check_and_correct_window_size(self, time_series: np.array, forecast_length: """ + if self.params.get('autotune_window', 0) == 1: + new = int(WindowSizeSelector(method='hac', window_range=(5, 25)) + .get_window_size(time_series) * len(time_series) / 100) + self.params.update(window_size=new, autotune_window=0) + # Maximum threshold if self.window_size + forecast_length > len(time_series): raise ValueError(f"Window size is to high ({self.window_size}) for provided data len {len(time_series)}") diff --git a/fedot/core/operations/evaluation/operation_implementations/models/atomized/__init__.py b/fedot/core/operations/evaluation/operation_implementations/models/atomized/__init__.py new file 
mode 100644 index 0000000000..e69de29bb2 diff --git a/fedot/core/operations/evaluation/operation_implementations/models/atomized/atomized_decompose.py b/fedot/core/operations/evaluation/operation_implementations/models/atomized/atomized_decompose.py new file mode 100644 index 0000000000..31055847fe --- /dev/null +++ b/fedot/core/operations/evaluation/operation_implementations/models/atomized/atomized_decompose.py @@ -0,0 +1,43 @@ +from typing import Optional + +from fedot.core.data.data import InputData, OutputData +from fedot.core.operations.evaluation.operation_implementations.models.atomized.atomized_ts_mixins import \ + AtomizedTimeSeriesBuildFactoriesMixin +from fedot.core.pipelines.node import PipelineNode +from fedot.core.pipelines.pipeline import Pipeline + + +class AtomizedTimeSeriesDecomposer(AtomizedTimeSeriesBuildFactoriesMixin): + def __init__(self, pipeline: Optional['Pipeline'] = None): + if pipeline is None: + pipeline = Pipeline(PipelineNode('ridge')) + self.pipeline = pipeline + + def _decompose(self, data: InputData, fit_stage: bool): + # get merged data from lagged and any model + forecast_length = data.task.task_params.forecast_length + data_from_lagged = data.features[:, :-forecast_length] + data_from_model = data.features[:, -forecast_length:] + new_target = data.target + if fit_stage: + new_target -= data_from_model + + new_data = InputData(idx=data.idx, + features=data_from_lagged, + target=new_target, + data_type=data.data_type, + task=data.task, + supplementary_data=data.supplementary_data) + return new_data + + def fit(self, data: InputData): + new_data = self._decompose(data, fit_stage=True) + self.pipeline.fit(new_data) + return self + + def predict(self, data: InputData) -> OutputData: + new_data = self._decompose(data, fit_stage=False) + return self.pipeline.predict(new_data) + + def predict_for_fit(self, data: InputData) -> OutputData: + return self.predict(data) diff --git a/fedot/core/operations/evaluation/operation_implementations/models/atomized/atomized_ts_differ.py b/fedot/core/operations/evaluation/operation_implementations/models/atomized/atomized_ts_differ.py new file mode 100644 index 0000000000..0c2e0b4eff --- /dev/null +++ b/fedot/core/operations/evaluation/operation_implementations/models/atomized/atomized_ts_differ.py @@ -0,0 +1,65 @@ +from typing import Optional + +import numpy as np + +from fedot.core.data.data import InputData, OutputData +from fedot.core.operations.evaluation.operation_implementations.models.atomized.atomized_ts_mixins import \ + AtomizedTimeSeriesBuildFactoriesMixin +from fedot.core.pipelines.node import PipelineNode +from fedot.core.pipelines.pipeline import Pipeline +from fedot.core.repository.tasks import TaskTypesEnum + + +class AtomizedTimeSeriesDiffer(AtomizedTimeSeriesBuildFactoriesMixin): + """ Get diff of timeseries, train model/forecast, integrate result """ + + def __init__(self, pipeline: Optional['Pipeline'] = None): + if pipeline is None: + pipeline = Pipeline(PipelineNode('ridge')) + self.pipeline = pipeline + + def _diff(self, data: InputData, fit_stage: bool): + new_features = np.diff(data.features, axis=1) + bias = data.features[:, -1:] + + if fit_stage: + target = data.target + if target.ndim == 1: + target = target.reshape((1, -1)) + + new_target = np.diff(np.concatenate([bias, target], axis=1), axis=1) + else: + new_target = data.target + + supplementary_data = data.supplementary_data + # supplementary_data.time_series_bias.append(bias) + + new_data = InputData(idx=data.idx, + features=new_features, + 
target=new_target,
+                             task=data.task,
+                             data_type=data.data_type,
+                             supplementary_data=supplementary_data)
+        return new_data, bias
+
+    def fit(self, data: InputData):
+        # TODO is there a need for unfit?
+        if data.task.task_type is not TaskTypesEnum.ts_forecasting:
+            raise ValueError(f"{self.__class__} supports only time series forecasting task")
+        data, _ = self._diff(data, fit_stage=True)
+        self.pipeline.fit(data)
+        return self
+
+    def predict(self, data: InputData) -> OutputData:
+        new_data, bias = self._diff(data, fit_stage=False)
+        prediction = self.pipeline.predict(new_data)
+        prediction.predict = np.cumsum(prediction.predict.reshape((bias.shape[0], -1)), axis=1) + bias
+
+        prediction.idx = data.idx
+        prediction.target = data.target
+        if prediction.target is not None:
+            prediction.predict = np.reshape(prediction.predict, prediction.target.shape)
+        return prediction
+
+    def predict_for_fit(self, data: InputData) -> OutputData:
+        return self.predict(data)
diff --git a/fedot/core/operations/evaluation/operation_implementations/models/atomized/atomized_ts_mixins.py b/fedot/core/operations/evaluation/operation_implementations/models/atomized/atomized_ts_mixins.py
new file mode 100644
index 0000000000..ddbed9eddf
--- /dev/null
+++ b/fedot/core/operations/evaluation/operation_implementations/models/atomized/atomized_ts_mixins.py
@@ -0,0 +1,14 @@
+from fedot.core.pipelines.pipeline_node_factory import PipelineOptNodeFactory
+from fedot.core.pipelines.random_pipeline_factory import RandomPipelineFactory
+from fedot.core.repository.pipeline_operation_repository import PipelineOperationRepository
+
+
+class AtomizedTimeSeriesBuildFactoriesMixin:
+    """ Adds a `build_factories` method that removes primary nodes from the node factory """
+    @classmethod
+    def build_factories(cls, requirements, graph_generation_params):
+        graph_model_repository = PipelineOperationRepository(operations_by_keys={'primary': requirements.secondary,
+                                                                                 'secondary': requirements.secondary})
+        node_factory = PipelineOptNodeFactory(requirements, graph_generation_params.advisor, graph_model_repository)
+        random_pipeline_factory = RandomPipelineFactory(graph_generation_params.verifier, node_factory)
+        return node_factory, random_pipeline_factory
\ No newline at end of file
diff --git a/fedot/core/operations/evaluation/operation_implementations/models/atomized/atomized_ts_sampler.py b/fedot/core/operations/evaluation/operation_implementations/models/atomized/atomized_ts_sampler.py
new file mode 100644
index 0000000000..7823d34616
--- /dev/null
+++ b/fedot/core/operations/evaluation/operation_implementations/models/atomized/atomized_ts_sampler.py
@@ -0,0 +1,96 @@
+from typing import Union, Optional, Any, Dict
+
+import numpy as np
+
+from fedot.core.data.data import InputData, OutputData
+from fedot.core.operations.atomized_model.atomized_model import AtomizedModel
+from fedot.core.operations.operation_parameters import OperationParameters
+from fedot.core.pipelines.node import PipelineNode
+from fedot.core.pipelines.pipeline import Pipeline
+from fedot.core.repository.tasks import TaskTypesEnum
+
+
+class AtomizedTimeSeriesSampler(AtomizedModel):
+    """ Increase the amount of fitting data for short time series """
+
+    operation_type = 'atomized_ts_sampler'
+
+    def __init__(self, pipeline: Optional['Pipeline'] = None, mode='sparse'):
+        if pipeline is None:
+            pipeline = Pipeline(PipelineNode('ridge'))
+        super().__init__(pipeline=pipeline)
+
+        self.mode = mode
+
+    def _sample(self, data: InputData):
+        # TODO refactor
+        if self.mode == 'sparse':
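+            # assumed intent: split each window into even- and odd-indexed lag columns,
+            # producing two half-length rows per window and so doubling the sample count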
+            features = data.features
+            if features.shape[1] % 2 == 1:
+                features = features[:, 1:]
+            new_features = np.concatenate([features[:, ::2],
+                                           features[:, 1::2]], axis=0)
+
+            new_target = data.target
+            if new_target is not None:
+                if new_target.ndim == 1:
+                    new_target = new_target.reshape((1, -1))
+                new_target = np.concatenate([new_target, new_target], axis=0)
+        else:
+            raise ValueError(f"Unknown mode {self.mode}")
+
+        new_data = InputData(idx=np.arange(new_features.shape[0]),
+                             features=new_features,
+                             target=new_target,
+                             task=data.task,
+                             data_type=data.data_type,
+                             supplementary_data=data.supplementary_data)
+        return new_data
+
+    def fit(self, params: Optional[Union[OperationParameters, dict]], data: InputData):
+        if data.task.task_type is not TaskTypesEnum.ts_forecasting:
+            raise ValueError(f"{self.__class__} supports only time series forecasting task")
+
+        new_data = self._sample(data)
+        return super().fit(params, new_data)
+
+    def _sample_predict(self,
+                        fitted_operation: 'Pipeline',
+                        data: InputData,
+                        params: Optional[Union[OperationParameters, Dict[str, Any]]] = None,
+                        output_mode: str = 'default') -> OutputData:
+        # TODO refactor
+        new_data = self._sample(data)
+        predictions = list()
+        for i in range(new_data.features.shape[0]):
+            new_data1 = InputData(idx=new_data.idx,
+                                  features=new_data.features[i, :].reshape((1, -1)),
+                                  target=new_data.target[i, :] if new_data.target is not None else new_data.target,
+                                  task=new_data.task,
+                                  data_type=new_data.data_type,
+                                  supplementary_data=new_data.supplementary_data)
+            prediction1 = super().predict(fitted_operation=fitted_operation,
+                                          data=new_data1,
+                                          params=params,
+                                          output_mode=output_mode)
+            predictions.append(prediction1)
+
+        predicts = list()
+        limit = int(new_data.features.shape[0] // 2)
+        for i in range(limit):
+            predicts.append((predictions[i].predict + predictions[i + limit].predict) * 0.5)
+        predict = np.concatenate(predicts, axis=0)
+        predict = OutputData(idx=data.idx,
+                             features=data.features,
+                             target=data.target,
+                             predict=predict,
+                             task=data.task,
+                             data_type=data.data_type,
+                             supplementary_data=data.supplementary_data)
+        return predict
+
+    def predict(self, *args, **kwargs) -> OutputData:
+        return self._sample_predict(*args, **kwargs)
+
+    def predict_for_fit(self, *args, **kwargs) -> OutputData:
+        return self._sample_predict(*args, **kwargs)
diff --git a/fedot/core/operations/evaluation/operation_implementations/models/atomized/atomized_ts_scaler.py b/fedot/core/operations/evaluation/operation_implementations/models/atomized/atomized_ts_scaler.py
new file mode 100644
index 0000000000..69e6a60429
--- /dev/null
+++ b/fedot/core/operations/evaluation/operation_implementations/models/atomized/atomized_ts_scaler.py
@@ -0,0 +1,67 @@
+from typing import Union, Optional, Any, Dict
+
+from fedot.core.data.data import InputData, OutputData
+from fedot.core.operations.atomized_model.atomized_model import AtomizedModel
+from fedot.core.operations.operation_parameters import OperationParameters
+from fedot.core.pipelines.node import PipelineNode
+from fedot.core.pipelines.pipeline import Pipeline
+from fedot.core.repository.tasks import TaskTypesEnum
+
+
+class AtomizedTimeSeriesScaler(AtomizedModel):
+    """ Remove the in-window bias from the data and restore it after prediction """
+    # TODO refactor with any sklearn scaler
+
+    operation_type = 'atomized_ts_scaler'
+
+    def __init__(self, pipeline: Optional['Pipeline'] = None):
+        if pipeline is None:
+            pipeline = Pipeline(PipelineNode('ridge'))
+        super().__init__(pipeline=pipeline)
+
+    def _scale(self, data: InputData, fit_stage: bool):
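+        # assumed intent: zero each window at its first value and shift the target by the
+        # last observed value; the stored bias is added back in _sample_predict below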
+ new_features = (data.features - data.features[:, :1])[:, 1:] + + target_bias = data.features[:, -1:] + if fit_stage: + new_target = data.target - target_bias + else: + new_target = data.target + + supplementary_data = data.supplementary_data + supplementary_data.time_series_bias.append(target_bias) + + new_data = InputData(idx=data.idx, + features=new_features, + target=new_target, + task=data.task, + data_type=data.data_type, + supplementary_data=supplementary_data) + return new_data + + def fit(self, params: Optional[Union[OperationParameters, dict]], data: InputData): + if data.task.task_type is not TaskTypesEnum.ts_forecasting: + raise ValueError(f"{self.__class__} supports only time series forecasting task") + return super().fit(params, self._scale(data, fit_stage=True)) + + def _sample_predict(self, + fitted_operation: 'Pipeline', + data: InputData, + params: Optional[Union[OperationParameters, Dict[str, Any]]] = None, + output_mode: str = 'default') -> OutputData: + new_data = self._scale(data, fit_stage=False) + prediction = super().predict(fitted_operation=fitted_operation, + data=new_data, + params=params, + output_mode=output_mode) + bias = prediction.supplementary_data.time_series_bias.pop() + new_predict = prediction.predict.reshape((bias.shape[0], -1)) + bias + new_predict = new_predict.reshape(prediction.predict.shape) + prediction.predict = new_predict + return prediction + + def predict(self, *args, **kwargs) -> OutputData: + return self._sample_predict(*args, **kwargs) + + def predict_for_fit(self, *args, **kwargs) -> OutputData: + return self._sample_predict(*args, **kwargs) diff --git a/fedot/core/operations/evaluation/operation_implementations/models/atomized/atomized_ts_transform_to_time.py b/fedot/core/operations/evaluation/operation_implementations/models/atomized/atomized_ts_transform_to_time.py new file mode 100644 index 0000000000..83f035506a --- /dev/null +++ b/fedot/core/operations/evaluation/operation_implementations/models/atomized/atomized_ts_transform_to_time.py @@ -0,0 +1,83 @@ +from typing import Optional + +import numpy as np + +from fedot.core.data.data import InputData, OutputData +from fedot.core.operations.evaluation.operation_implementations.models.atomized.atomized_ts_mixins import \ + AtomizedTimeSeriesBuildFactoriesMixin +from fedot.core.pipelines.node import PipelineNode +from fedot.core.pipelines.pipeline import Pipeline +from fedot.core.repository.dataset_types import DataTypesEnum +from fedot.core.repository.tasks import TaskTypesEnum, Task + + +class AtomizedTimeSeriesToTime(AtomizedTimeSeriesBuildFactoriesMixin): + """ Predict based on time not on time series """ + + def __init__(self, pipeline: Optional['Pipeline'] = None): + if pipeline is None: + pipeline = Pipeline(PipelineNode('ridge')) + self.pipeline = pipeline + + def _convert_task(self, data: InputData, fit_stage: bool): + # TODO add test that model correctly transform data + forecast_length = data.task.task_params.forecast_length + if fit_stage: + target_ts = np.concatenate([data.target[0, :].ravel(), data.target[1:, -1].ravel()]) + else: + target_ts = None + + dt = data.idx[1] - data.idx[0] + time = np.concatenate([np.arange(data.idx[0] - (data.features.shape[1] - 1) * dt, data.idx[0], dt), + data.idx, + np.arange(data.idx[-1], data.idx[-1] + forecast_length * dt, dt)]) + features = data.features + if features.shape[1] < forecast_length: + # previous model is ts-to-table + # do not use that data + previous_forecast = 0 + elif features.shape[1] == forecast_length: + # previous 
model is the only table-to-table + previous_forecast = np.concatenate([data.features[0, :].ravel(), data.features[1:, -1].ravel()]) + elif features.shape[1] % forecast_length == 0: + # previous models are some models of type table-to-table + features = np.mean(np.reshape(features, + (features.shape[0], forecast_length, -1), + order='F'), axis=2) + previous_forecast = np.concatenate([features[0, :].ravel(), features[1:, -1].ravel()]) + else: + raise ValueError('Previous nodes types cannot be defined') + + if fit_stage: + new_target = (target_ts - previous_forecast).reshape((-1, 1)) + else: + new_target = None + + predict_length = forecast_length + features.shape[0] - 1 + new_data = InputData(idx=time[-predict_length:], + features=time[-predict_length:].reshape((-1, 1)), + target=new_target, + data_type=DataTypesEnum.table, + task=Task(TaskTypesEnum.regression)) + return new_data, previous_forecast + + def fit(self, data: InputData): + new_data, previous_forecast = self._convert_task(data, fit_stage=True) + self.pipeline.fit(new_data) + return self + + def predict(self, data: InputData) -> OutputData: + new_data, previous_forecast = self._convert_task(data, fit_stage=False) + prediction = self.pipeline.predict(new_data) + predict = prediction.predict.ravel() + previous_forecast + + prediction.idx = data.idx + prediction.target = data.target + if prediction.target is not None: + window = data.target.shape[1] + predict = np.array([predict[i:window + i] for i in range(predict.shape[0] - window + 1)]) + prediction.predict = predict + return prediction + + def predict_for_fit(self, data: InputData) -> OutputData: + return self.predict(data) diff --git a/fedot/core/operations/factory.py b/fedot/core/operations/factory.py index d98ef820bf..2bed084c1e 100644 --- a/fedot/core/operations/factory.py +++ b/fedot/core/operations/factory.py @@ -1,3 +1,4 @@ +from fedot.core.operations.atomized import Atomized from fedot.core.operations.automl import AutoML from fedot.core.operations.data_operation import DataOperation from fedot.core.operations.model import Model @@ -15,7 +16,9 @@ class OperationFactory: def __init__(self, operation_name): self.operation_name = operation_name - self.operation_type = self._define_operation_type() + self.operation_type = (OperationTypesRepository('all') + .operation_info_by_id(self.operation_name) + .repository_name) def get_operation(self) -> Operation: """ @@ -30,39 +33,9 @@ def get_operation(self) -> Operation: operation = DataOperation(operation_type=self.operation_name) elif self.operation_type == 'automl': operation = AutoML(operation_type=self.operation_name) + elif self.operation_type == 'atomized': + operation = Atomized(operation_type=self.operation_name) else: raise ValueError(f'Operation type {self.operation_type} is not supported') return operation - - @property - def operation_type_name(self): - return self.operation_type - - def _define_operation_type(self) -> str: - """ - The method determines what type of operations is set for this node - - :return : operations type 'model', 'automl' or 'data_operation' - """ - - # Get available models from model_repository.json file - operations_repo = OperationTypesRepository('data_operation') - operations = operations_repo.operations - if 'automl' in OperationTypesRepository.get_available_repositories(): - automl_repo = OperationTypesRepository('automl') - models_automl = automl_repo.operations - else: - models_automl = [] - - operation_name = get_operation_type_from_id(self.operation_name) - - # If there is a such model 
in the list
-        if any(operation_name == model.id for model in operations):
-            operation_type = 'data_operation'
-        elif any(operation_name == model.id for model in models_automl):
-            operation_type = 'automl'
-        # Otherwise - it is model
-        else:
-            operation_type = 'model'
-        return operation_type
diff --git a/fedot/core/optimisers/genetic_operators/__init__.py b/fedot/core/optimisers/genetic_operators/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/fedot/core/optimisers/genetic_operators/atomized_operators_wrapper.py b/fedot/core/optimisers/genetic_operators/atomized_operators_wrapper.py
new file mode 100644
index 0000000000..a0c6b5f227
--- /dev/null
+++ b/fedot/core/optimisers/genetic_operators/atomized_operators_wrapper.py
@@ -0,0 +1,37 @@
+from typing import Dict
+
+from golem.core.optimisers.graph import OptGraph
+
+
+def extract_graphs_from_atomized(graph: OptGraph) -> Dict[str, OptGraph]:
+    """ Get all graphs from a graph with atomized nodes.
+    Return a dict with node uids (where inner graphs are stored in atomized models)
+    as keys and the graphs themselves as values """
+    graphs = {'': graph}
+    for node in graph.nodes:
+        if 'pipeline' in node.parameters:
+            extracted_graphs = extract_graphs_from_atomized(node.parameters['pipeline'])
+            for k, v in extracted_graphs.items():
+                graphs[k or node.uid] = v
+    return graphs
+
+
+def insert_graphs_to_atomized(full_graph: OptGraph, node_uid: str, graph: OptGraph) -> OptGraph:
+    """ Insert ``graph`` into ``full_graph`` at the atomized model stored in the node with uid ``node_uid`` """
+    if node_uid == '':
+        full_graph = graph
+    else:
+        # look for the node with uid == node_uid
+        nodes = full_graph.nodes[:]
+        while nodes:
+            node = nodes.pop()
+            if node.uid == node_uid:
+                break
+            if 'pipeline' in node.content['params']:
+                nodes.extend(node.content['params']['pipeline'].nodes)
+        else:
+            raise ValueError(f"Unknown node uid: {node_uid}")
+        if 'pipeline' not in node.content['params']:
+            raise ValueError("Cannot insert graph to non-atomized model")
+        node.content['params']['pipeline'] = graph
+    return full_graph
diff --git a/fedot/core/optimisers/genetic_operators/crossover.py b/fedot/core/optimisers/genetic_operators/crossover.py
new file mode 100644
index 0000000000..75fda4ee75
--- /dev/null
+++ b/fedot/core/optimisers/genetic_operators/crossover.py
@@ -0,0 +1,40 @@
+from copy import deepcopy
+from functools import WRAPPER_ASSIGNMENTS
+from random import choice
+
+from typing import Tuple
+
+from fedot.core.optimisers.genetic_operators.atomized_operators_wrapper import \
+    extract_graphs_from_atomized, insert_graphs_to_atomized
+from golem.core.adapter import register_native
+from golem.core.optimisers.genetic.operators.crossover import CrossoverCallable, one_point_crossover, subtree_crossover
+from golem.core.optimisers.graph import OptGraph
+
+
+def atomized_crossover(crossover_fun: CrossoverCallable) -> CrossoverCallable:
+    def crossover_for_atomized_graph(graph1: OptGraph,
+                                     graph2: OptGraph,
+                                     max_depth: int
+                                     ) -> Tuple[OptGraph, OptGraph]:
+        graphs = [deepcopy(graph1), deepcopy(graph2)]
+        graphs1, graphs2 = map(extract_graphs_from_atomized, graphs)
+        node_uid1, graph_for_crossover1 = choice(list(graphs1.items()))
+        node_uid2, graph_for_crossover2 = choice(list(graphs2.items()))
+
+        graph_after_crossover1, graph_after_crossover2 = crossover_fun(graph_for_crossover1,
+                                                                       graph_for_crossover2, max_depth)
+
+        new_graph1 = insert_graphs_to_atomized(graphs[0], node_uid1, graph_after_crossover1)
+        new_graph2 = insert_graphs_to_atomized(graphs[1], node_uid2, graph_after_crossover2)
+        return new_graph1, new_graph2
+
+    # TODO use functools.wraps; currently it breaks something in GOLEM.
+    for attr in WRAPPER_ASSIGNMENTS:
+        setattr(crossover_for_atomized_graph, attr, getattr(crossover_fun, attr))
+    crossover_for_atomized_graph.__wrapped__ = crossover_fun
+
+    return crossover_for_atomized_graph
+
+
+fedot_one_point_crossover = register_native(atomized_crossover(one_point_crossover))
+fedot_subtree_crossover = register_native(atomized_crossover(subtree_crossover))
diff --git a/fedot/core/optimisers/genetic_operators/mutation.py b/fedot/core/optimisers/genetic_operators/mutation.py
new file mode 100644
index 0000000000..997a3735db
--- /dev/null
+++ b/fedot/core/optimisers/genetic_operators/mutation.py
@@ -0,0 +1,131 @@
+from copy import deepcopy
+from functools import WRAPPER_ASSIGNMENTS
+from random import choice
+
+from typing import Callable, Union
+
+from fedot.core.optimisers.genetic_operators.atomized_operators_wrapper import \
+    extract_graphs_from_atomized, insert_graphs_to_atomized
+from fedot.core.pipelines.node import PipelineNode
+from fedot.core.pipelines.pipeline import Pipeline
+from fedot.core.repository.operation_types_repository import OperationTypesRepository
+from golem.core.adapter import register_native
+from golem.core.optimisers.genetic.operators.base_mutations import \
+    single_edge_mutation, single_add_mutation, \
+    single_change_mutation, single_drop_mutation, tree_growth
+from golem.core.optimisers.graph import OptGraph
+from golem.core.optimisers.optimization_parameters import GraphRequirements
+from golem.core.optimisers.optimizer import GraphGenerationParams
+from golem.core.optimisers.genetic.gp_params import GPAlgorithmParameters
+
+
+OperationTypesRepository.init_repository('atomized')
+ATOMIZED_OPERATION_REPOSITORY = OperationTypesRepository('atomized')
+
+MutationFun = Callable[[OptGraph, GraphRequirements, GraphGenerationParams, GPAlgorithmParameters], OptGraph]
+
+
+def atomized_mutation(mutation_fun: MutationFun) -> MutationFun:
+    def mutation_for_atomized_graph(graph: OptGraph,
+                                    requirements: GraphRequirements,
+                                    graph_gen_params: GraphGenerationParams,
+                                    parameters: GPAlgorithmParameters,
+                                    ) -> OptGraph:
+        graph = deepcopy(graph)
+        graphs = extract_graphs_from_atomized(graph)
+        node_uid, graph_to_mutate = choice(list(graphs.items()))
+
+        mutated_graph = mutation_fun(graph_to_mutate,
+                                     requirements=requirements,
+                                     graph_gen_params=graph_gen_params,
+                                     parameters=parameters)
+
+        new_graph = insert_graphs_to_atomized(graph, node_uid, mutated_graph)
+        return new_graph
+
+    # TODO use functools.wraps; currently it breaks something in GOLEM.
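+    # manual equivalent of functools.wraps (see TODO above): copy the wrapped
+    # function's metadata (__name__, __doc__, ...) so GOLEM still sees the original operator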
+ for attr in WRAPPER_ASSIGNMENTS: + setattr(mutation_for_atomized_graph, attr, getattr(mutation_fun, attr)) + mutation_for_atomized_graph.__wrapped__ = mutation_fun + + return mutation_for_atomized_graph + + + +fedot_single_edge_mutation = register_native(atomized_mutation(single_edge_mutation)) +fedot_single_add_mutation = register_native(atomized_mutation(single_add_mutation)) +fedot_single_change_mutation = register_native(atomized_mutation(single_change_mutation)) +fedot_single_drop_mutation = register_native(atomized_mutation(single_drop_mutation)) +fedot_tree_growth = register_native(atomized_mutation(tree_growth)) + + +@atomized_mutation +def insert_atomized_operation(pipeline: Pipeline, + requirements: GraphRequirements, + graph_gen_params: GraphGenerationParams, + parameters: GPAlgorithmParameters, + ) -> Pipeline: + """ Wrap part of pipeline to atomized operation + """ + task_type = graph_gen_params.advisor.task.task_type + atomized_operations = ATOMIZED_OPERATION_REPOSITORY.suitable_operation(task_type=task_type, + tags=['non-default'], + forbidden_tags=['not-for-mutation']) + if atomized_operations: + atomized_operation = choice(atomized_operations) + + info = ATOMIZED_OPERATION_REPOSITORY.operation_info_by_id(atomized_operation) + it, ot = set(info.input_types), set(info.output_types) + + if atomized_operation == 'atomized_ts_decomposer': + lagged_nodes = ('lagged', 'sparse_lagged', 'exog_ts') + nodes = list() + for node in pipeline.nodes: + correct_data_types = (set(node.operation.metadata.input_types) == it and + set(node.operation.metadata.output_types) == ot) + correct_input_nodes = (isinstance(node.nodes_from, list) + and len(node.nodes_from) == 2 + and node.nodes_from[0].operation.operation_type in lagged_nodes + and node.nodes_from[1].operation.metadata.repository_name != 'data_operation') + if correct_data_types and correct_input_nodes: + nodes.append(node) + if not nodes: + root_node = pipeline.root_node + + laggeds = [node for node in pipeline.nodes if node.operation.operation_type in lagged_nodes] + if not laggeds: + laggeds.append(PipelineNode('lagged')) + lagged = choice(laggeds) + + models = [node for node in pipeline.nodes + if node.operation.metadata.repository_name != 'data_operation' + and node != root_node] + if not models: + models.append(PipelineNode('ridge', nodes_from=[lagged])) + model = choice(models) + + nodes = [PipelineNode('ridge', nodes_from=[lagged, model])] + root_node.nodes_from.append(nodes[-1]) + pipeline = Pipeline(root_node) + else: + nodes = list() + for node in pipeline.nodes: + if (set(node.operation.metadata.input_types) == it and + set(node.operation.metadata.output_types) == ot): + nodes.append(node) + + if nodes: + node = choice(nodes) + inner_pipeline = Pipeline(PipelineNode(content=node.content)) + + # create new node_factory and graph_random_factory for new pipeline + strategy = info.current_strategy(graph_gen_params.advisor.task) + operation_class = strategy._operations_by_types[atomized_operation] + node_factory, graph_random_factory = operation_class.build_factories(requirements, graph_gen_params) + inner_pipeline.node_factory = node_factory + inner_pipeline.graph_random_factory = graph_random_factory + + # build new node with inner pipeline + new_node = PipelineNode(content={'name': atomized_operation, 'params': {'pipeline': inner_pipeline}}) + pipeline.update_node(node, new_node) + return pipeline diff --git a/fedot/core/optimisers/objective/data_objective_eval.py b/fedot/core/optimisers/objective/data_objective_eval.py 
index 7c6d125f30..1f2a6e472e 100644
--- a/fedot/core/optimisers/objective/data_objective_eval.py
+++ b/fedot/core/optimisers/objective/data_objective_eval.py
@@ -3,6 +3,7 @@
 from typing import Callable, Iterable, Optional, Tuple
 
 import numpy as np
+
 from golem.core.log import default_log, is_test_session
 from golem.core.optimisers.fitness import Fitness
 from golem.core.optimisers.objective.objective import Objective, to_fitness
diff --git a/fedot/core/pipelines/adapters.py b/fedot/core/pipelines/adapters.py
index d589f55fa4..a7a74459a8 100644
--- a/fedot/core/pipelines/adapters.py
+++ b/fedot/core/pipelines/adapters.py
@@ -1,11 +1,12 @@
 from copy import deepcopy
 from typing import Any, Optional, Dict
 
+from fedot.core.operations.atomized_model.atomized_model import AtomizedModel
 from golem.core.adapter import BaseOptimizationAdapter
 from golem.core.dag.graph_utils import map_dag_nodes
-from golem.core.optimisers.graph import OptGraph, OptNode
+from golem.core.optimisers.graph import OptGraph
 
-from fedot.core.pipelines.node import PipelineNode
+from fedot.core.pipelines.node import PipelineNode, OptNode
 from fedot.core.pipelines.pipeline import Pipeline
 
 
@@ -17,6 +18,8 @@ class PipelineAdapter(BaseOptimizationAdapter[Pipeline]):
     fitted models) that can be used for reconstructing Pipelines.
     """
 
+    # TODO add tests for correct conversion of Atomized
+
     def __init__(self, use_input_preprocessing: bool = True):
         super().__init__(base_graph_class=Pipeline)
 
@@ -25,25 +28,47 @@ def __init__(self, use_input_preprocessing: bool = True):
     @staticmethod
     def _transform_to_opt_node(node: PipelineNode) -> OptNode:
         # Prepare content for nodes, leave only simple data
-        operation_name = str(node.operation)
-        content = {'name': operation_name,
-                   'params': node.parameters,
-                   'metadata': node.metadata}
-        return OptNode(deepcopy(content))
+        content = dict(name=str(node.operation),
+                       params=deepcopy(node.parameters),
+                       metadata=deepcopy(node.metadata))
+
+        # add data about the inner graph if it is an atomized model
+        if isinstance(node.operation, AtomizedModel):
+            content['inner_graph'] = PipelineAdapter()._adapt(node.operation.pipeline)
+            # store the concrete class so that _transform_to_pipeline_node can rebuild the operation
+            content['atomized_class'] = type(node.operation)
+
+        # add data about the inner graph if it is an atomized operation stored in params
+        if 'pipeline' in content['params']:
+            content['params']['pipeline'] = PipelineAdapter()._adapt(content['params']['pipeline'])
+
+        return OptNode(content)
 
     @staticmethod
     def _transform_to_pipeline_node(node: OptNode) -> PipelineNode:
-        # deepcopy to avoid accidental information sharing between opt graphs & pipelines
-        content = deepcopy(node.content)
-        return PipelineNode(operation_type=content['name'], content=content)
+        if 'inner_graph' in node.content:
+            atomized_pipeline = PipelineAdapter()._restore(node.content['inner_graph'])
+            return PipelineNode(node.content['atomized_class'](atomized_pipeline))
+        else:
+            # deepcopy to avoid accidental information sharing between opt graphs & pipelines
+            content = deepcopy(node.content)
+            if 'params' in content and 'pipeline' in content['params']:
+                content['params']['pipeline'] = PipelineAdapter()._restore(content['params']['pipeline'])
+            return PipelineNode(operation_type=content['name'], content=content)
 
     def _adapt(self, adaptee: Pipeline) -> OptGraph:
         adapted_nodes = map_dag_nodes(self._transform_to_opt_node, adaptee.nodes)
-        return OptGraph(adapted_nodes)
+        return OptGraph(adapted_nodes,
+                        node_factory=adaptee.node_factory,
+                        random_graph_factory=adaptee.random_graph_factory)
 
     def _restore(self, opt_graph: OptGraph, metadata: Optional[Dict[str, Any]] = None) -> Pipeline:
         restored_nodes = map_dag_nodes(self._transform_to_pipeline_node, opt_graph.nodes)
-        pipeline = Pipeline(restored_nodes, use_input_preprocessing=self.use_input_preprocessing)
+        pipeline = Pipeline(restored_nodes,
+                            use_input_preprocessing=self.use_input_preprocessing,
+                            node_factory=opt_graph.node_factory,
+                            random_graph_factory=opt_graph.random_graph_factory
+                            )
 
         metadata = metadata or {}
         pipeline.computation_time = metadata.get('computation_time_in_seconds')
diff --git a/fedot/core/pipelines/node.py b/fedot/core/pipelines/node.py
index 86ccb73121..4221bccc73 100644
--- a/fedot/core/pipelines/node.py
+++ b/fedot/core/pipelines/node.py
@@ -4,8 +4,11 @@
 from typing import Any, List, Optional, Tuple, Union
 
 import numpy as np
+
+from fedot.core.operations.atomized_model.atomized_model import AtomizedModel
 from golem.core.dag.linked_graph_node import LinkedGraphNode
 from golem.core.log import default_log
+from golem.core.optimisers.graph import OptNode as GolemOptNode
 from golem.core.optimisers.timer import Timer
 from golem.serializers.serializer import register_serializable
 
@@ -30,6 +33,21 @@ class NodeMetadata:
     metric: Optional[float] = None
 
 
+class OptNode(GolemOptNode):
+    """ Wrapper for GOLEM's OptNode that adds the ability to describe nodes with an AtomizedModel.
+    It is used in PipelineAdapter to convert a graph to the GOLEM representation """
+    def description(self) -> str:
+        # TODO add test
+        node_label = super().description()
+        if 'inner_graph' in self.content:
+            root_nodes = self.content['inner_graph'].root_nodes()
+            node_label = f"{node_label}(INNER{''.join(node.descriptive_id for node in root_nodes)}INNER)"
+        if 'params' in self.content and 'pipeline' in self.content['params']:
+            root_nodes = self.content['params']['pipeline'].root_nodes()
+            node_label = f"{node_label}(INNER{''.join(node.descriptive_id for node in root_nodes)}INNER)"
+        return node_label
+
+
 class PipelineNode(LinkedGraphNode):
     """The class defines the interface of nodes modifying tha data flow in the :class:`Pipeline`
 
@@ -82,6 +100,15 @@ def is_primary(self):
         if not self.nodes_from or len(self.nodes_from) == 0:
             return True
 
+    def description(self) -> str:
+        # TODO add test
+        # TODO there is a `description` in `Operation`; why is it not used?
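+        # assumption: the INNER...INNER markers below keep descriptive_ids unique for
+        # pipelines that differ only in the contents of their atomized inner pipelines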
+ node_label = super().description() + if isinstance(self.operation, AtomizedModel): + root_nodes = self.operation.pipeline.root_nodes() + node_label = f"{node_label}(INNER{''.join(node.descriptive_id for node in root_nodes)}INNER)" + return node_label + def _process_content_init(self, passed_content: dict) -> Operation: """ Updating content in the node """ if isinstance(passed_content['name'], str): @@ -201,7 +228,6 @@ def fit(self, input_data: InputData) -> OutputData: data=input_data) self.fit_time_in_seconds = round(t.seconds_from_start, 3) else: - operation_predict = self.operation.predict_for_fit(fitted_operation=self.fitted_operation, data=input_data, params=self._parameters) diff --git a/fedot/core/pipelines/pipeline.py b/fedot/core/pipelines/pipeline.py index 9f60afc370..cbd14a6f09 100644 --- a/fedot/core/pipelines/pipeline.py +++ b/fedot/core/pipelines/pipeline.py @@ -10,6 +10,8 @@ from golem.core.dag.graph_utils import distance_to_primary_level, graph_structure from golem.core.dag.linked_graph import LinkedGraph from golem.core.log import default_log +from golem.core.optimisers.opt_node_factory import OptNodeFactory +from golem.core.optimisers.random_graph_factory import RandomGraphFactory from golem.core.optimisers.timer import Timer from golem.core.paths import copy_doc from golem.utilities.serializable import Serializable @@ -39,8 +41,16 @@ class Pipeline(GraphDelegate, Serializable): use_input_preprocessing: whether to do input preprocessing or not, ``True`` by default. """ - def __init__(self, nodes: Union[PipelineNode, Sequence[PipelineNode]] = (), use_input_preprocessing: bool = True): - super().__init__(nodes, _graph_nodes_to_pipeline_nodes) + def __init__(self, + nodes: Union[PipelineNode, Sequence[PipelineNode]] = (), + use_input_preprocessing: bool = True, + node_factory: Optional[OptNodeFactory] = None, + random_graph_factory: Optional[RandomGraphFactory] = None, + ): + super().__init__(nodes, + _graph_nodes_to_pipeline_nodes, + node_factory=node_factory, + random_graph_factory=random_graph_factory) self.computation_time = None self.log = default_log(self) diff --git a/fedot/core/pipelines/pipeline_node_factory.py b/fedot/core/pipelines/pipeline_node_factory.py index 81379adac7..ed5d1d8e53 100644 --- a/fedot/core/pipelines/pipeline_node_factory.py +++ b/fedot/core/pipelines/pipeline_node_factory.py @@ -1,4 +1,4 @@ -from random import choice +from random import choice, random from typing import Optional, List from golem.core.optimisers.graph import OptNode @@ -45,8 +45,8 @@ def get_parent_node(self, node: OptNode, is_primary: bool): candidates = self.filter_specific_candidates(candidates) return self._return_node(candidates) - def get_node(self, - is_primary: bool): + def get_node(self, is_primary: Optional[bool] = None): + is_primary = is_primary or bool(random() > 0.5) candidates = self.graph_model_repository.get_operations(is_primary=is_primary) candidates = self.filter_specific_candidates(candidates) return self._return_node(candidates) diff --git a/fedot/core/pipelines/template.py b/fedot/core/pipelines/template.py index b1e53e66c2..ec03f24c6c 100644 --- a/fedot/core/pipelines/template.py +++ b/fedot/core/pipelines/template.py @@ -10,7 +10,7 @@ import numpy as np from golem.core.log import default_log -from fedot.core.operations.atomized_template import AtomizedModelTemplate +from fedot.core.operations.atomized_model.atomized_template import AtomizedModelTemplate from fedot.core.operations.operation_template import OperationTemplate, check_existing_path from 
fedot.core.pipelines.node import PipelineNode diff --git a/fedot/core/pipelines/verification.py b/fedot/core/pipelines/verification.py index 308663c0bf..068641b8af 100644 --- a/fedot/core/pipelines/verification.py +++ b/fedot/core/pipelines/verification.py @@ -25,7 +25,7 @@ has_no_conflicts_with_data_flow, has_no_data_flow_conflicts_in_ts_pipeline, has_primary_nodes, - only_non_lagged_operations_are_primary, has_correct_location_of_resample + only_non_lagged_operations_are_primary, has_correct_location_of_resample, correct_connection_with_atomized ) from fedot.core.repository.tasks import TaskTypesEnum @@ -40,7 +40,8 @@ has_no_conflicts_in_decompose, has_correct_data_connections, has_correct_data_sources, - has_correct_location_of_resample] + has_correct_location_of_resample, + correct_connection_with_atomized] ts_rules = [only_non_lagged_operations_are_primary, has_no_data_flow_conflicts_in_ts_pipeline] diff --git a/fedot/core/pipelines/verification_rules.py b/fedot/core/pipelines/verification_rules.py index 10eed4ddc7..914d68a61a 100644 --- a/fedot/core/pipelines/verification_rules.py +++ b/fedot/core/pipelines/verification_rules.py @@ -1,6 +1,8 @@ +from collections import Counter from typing import Optional -from fedot.core.operations.atomized_model import AtomizedModel +from fedot.core.operations.atomized import Atomized +from fedot.core.operations.atomized_model.atomized_model import AtomizedModel from fedot.core.operations.model import Model from fedot.core.pipelines.node import PipelineNode from fedot.core.pipelines.pipeline import Pipeline @@ -29,6 +31,8 @@ def has_final_operation_as_model(pipeline: Pipeline): root_node = pipeline.root_node if root_node.operation.operation_type == atomized_model_type(): has_final_operation_as_model(root_node.operation.pipeline) + elif isinstance(root_node.operation, Atomized): + has_final_operation_as_model(root_node.content['params']['pipeline']) elif type(root_node.operation) is not Model: raise ValueError(f'{ERROR_PREFIX} Root operation is not a model') return True @@ -63,6 +67,8 @@ def has_correct_data_connections(pipeline: Pipeline): # check atomized pipeline if isinstance(node.operation, AtomizedModel): has_correct_data_connections(node.operation.pipeline) + if 'pipeline' in node.parameters: + has_correct_data_connections(node.parameters['pipeline']) # skip custom node if node.operation.metadata.id == 'custom': @@ -290,6 +296,30 @@ def has_no_conflicts_during_multitask(pipeline: Pipeline): return True +def correct_connection_with_atomized(pipeline: Pipeline): + for node in pipeline.nodes: + if isinstance(node.operation, Atomized): + if node.operation.operation_type == 'atomized_ts_to_time': + counter = Counter(parent.operation.metadata.repository_name for parent in node.nodes_from) + if counter.get('data_operation', 0) > 0: + if counter['data_operation'] > 1 or (counter['data_operation'] != sum(counter.values())): + return False + elif node.operation.operation_type == 'atomized_ts_decomposer': + if not isinstance(node.nodes_from, list) or len(node.nodes_from) != 2: + return False + if not (node.nodes_from[0].operation.operation_type in ('lagged', 'sparse_lagged', 'exog_ts') + and node.nodes_from[1].operation.metadata.repository_name != 'data_operation'): + return False + + inner_pipeline = node.parameters['pipeline'] + for inner_node in inner_pipeline.primary_nodes: + if DataTypesEnum.table not in inner_node.operation.metadata.input_types: + return False + if not correct_connection_with_atomized(inner_pipeline): + return False + return True + 
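+
+# Hypothetical example of a connection the rule above accepts for 'atomized_ts_decomposer':
+# exactly two parents, the first from the lagged family and the second a model, e.g.
+#     PipelineNode('atomized_ts_decomposer',
+#                  nodes_from=[PipelineNode('lagged'), PipelineNode('ridge')])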
+

 def has_no_conflicts_after_class_decompose(pipeline: Pipeline):
     """ After the class_decompose operation, a regression model is required.
diff --git a/fedot/core/repository/data/atomized_model_repository.json b/fedot/core/repository/data/atomized_model_repository.json
new file mode 100644
index 0000000000..b91ceb1c8f
--- /dev/null
+++ b/fedot/core/repository/data/atomized_model_repository.json
@@ -0,0 +1,39 @@
+{
+  "metadata": {
+    "regr": {
+      "accepted_node_types": ["any"],
+      "description": "Implementations of the atomized models for regression and time series forecasting",
+      "forbidden_node_types": "[]",
+      "input_type": "[DataTypesEnum.table]",
+      "output_type": "[DataTypesEnum.table]",
+      "tags": ["ml", "atomized", "non-default"],
+      "tasks": "[TaskTypesEnum.regression, TaskTypesEnum.ts_forecasting]",
+      "strategies": ["fedot.core.operations.evaluation.atomized", "FedotAtomizedStrategy"]
+    }
+  },
+  "operations": {
+    "atomized_ts_differ": {
+      "meta": "regr",
+      "presets": ["fast_train", "ts"],
+      "tags": ["linear"]
+    },
+    "atomized_ts_sampler": {
+      "meta": "regr",
+      "presets": ["fast_train", "ts"],
+      "tags": ["linear", "ts", "not-for-mutation"]
+    },
+    "atomized_ts_scaler": {
+      "meta": "regr",
+      "presets": ["fast_train", "ts"],
+      "tags": ["linear", "ts", "not-for-mutation"]
+    },
+    "atomized_ts_to_time": {
+      "meta": "regr",
+      "presets": ["fast_train", "ts"]
+    },
+    "atomized_ts_decomposer": {
+      "meta": "regr",
+      "presets": ["fast_train", "ts", "not-for-mutation"]
+    }
+  }
+}
\ No newline at end of file
diff --git a/fedot/core/repository/data/data_operation_repository.json b/fedot/core/repository/data/data_operation_repository.json
index 8003ea40ce..bc46839fc6 100644
--- a/fedot/core/repository/data/data_operation_repository.json
+++ b/fedot/core/repository/data/data_operation_repository.json
@@ -224,12 +224,12 @@
     "ransac_lin_reg": {
       "meta": "regression_preprocessing",
       "presets": ["fast_train", "*tree"],
-      "tags": ["affects_target", "linear", "filtering", "correct_params", "non_applicable_for_ts"]
+      "tags": ["affects_target", "linear", "filtering", "non-default", "correct_params", "non_applicable_for_ts"]
     },
     "ransac_non_lin_reg": {
       "meta": "regression_preprocessing",
       "presets": ["*tree"],
-      "tags": ["affects_target", "non_linear", "filtering",
+      "tags": ["affects_target", "non_linear", "filtering", "non-default",
                "correct_params", "non_applicable_for_ts"]
     },
     "isolation_forest_reg": {
diff --git a/fedot/core/repository/data/default_operation_params.json b/fedot/core/repository/data/default_operation_params.json
index c3aeb5208c..1dcbaa732c 100644
--- a/fedot/core/repository/data/default_operation_params.json
+++ b/fedot/core/repository/data/default_operation_params.json
@@ -50,7 +50,8 @@
     "verbose": -1
   },
   "lagged": {
-    "window_size": 10
+    "window_size": 10,
+    "autotune_window": 1
   },
   "diff_filter": {
     "window_size": 3,
diff --git a/fedot/core/repository/operation_types_repository.py b/fedot/core/repository/operation_types_repository.py
index 6555a35242..1c8cf7fbf9 100644
--- a/fedot/core/repository/operation_types_repository.py
+++ b/fedot/core/repository/operation_types_repository.py
@@ -16,7 +16,7 @@
 if TYPE_CHECKING:
     from fedot.core.operations.evaluation.evaluation_interfaces import EvaluationStrategy
 
-AVAILABLE_REPO_NAMES = ['all', 'model', 'data_operation', 'automl']
+AVAILABLE_REPO_NAMES = ['all', 'model', 'data_operation', 'automl', 'atomized']
 
 
 @dataclass
@@ -29,6 +29,7 @@ class OperationMetaInfo:
     allowed_positions: List[str]
     tags: Optional[List[str]] = None
     presets: Optional[List[str]] = None
+    repository_name: Optional[str] = None
 
     def current_strategy(self, task: TaskTypesEnum) -> Optional['EvaluationStrategy']:
         """
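A short usage sketch for the new repository wiring: init_repository comes from the hunk below, while the constructor call and the .operations attribute mirror how the existing repositories are accessed, so treat the exact API as an assumption rather than guaranteed.

    from fedot.core.repository.operation_types_repository import OperationTypesRepository

    # register the new repository file, then inspect its entries
    OperationTypesRepository.init_repository('atomized')
    for meta in OperationTypesRepository('atomized').operations:
        # every OperationMetaInfo now carries the name of its source repository
        print(meta.id, meta.repository_name)  # e.g. 'atomized_ts_differ' 'atomized'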
@@ -74,7 +75,8 @@ class OperationTypesRepository:
         'model': {'file': 'model_repository.json', 'initialized_repo': None, 'default_tags': DEFAULT_MODEL_TAGS},
         'data_operation': {'file': 'data_operation_repository.json', 'initialized_repo': None,
                            'default_tags': DEFAULT_DATA_OPERATION_TAGS},
-        'automl': {'file': 'automl_repository.json', 'initialized_repo': None, 'default_tags': []}
+        'automl': {'file': 'automl_repository.json', 'initialized_repo': None, 'default_tags': []},
+        'atomized': {'file': 'atomized_model_repository.json', 'initialized_repo': None, 'default_tags': []}
     }
 
@@ -112,6 +114,13 @@ def get_available_repositories(cls):
                 operation_types.append(t)
         return operation_types
 
+    @classmethod
+    def init_repository(cls, repo_name: str):
+        if repo_name not in cls.__repository_dict__:
+            raise ValueError(f"Unknown repo {repo_name}")
+        file_name = cls.__repository_dict__[repo_name]['file']
+        cls.assign_repo(repo_name, file_name)
+
     @classmethod
     def init_automl_repository(cls):
         default_automl_repo_file = cls.__repository_dict__['automl']['file']
@@ -205,6 +214,11 @@ def _initialise_repo(cls, repo_path: str) -> List[OperationMetaInfo]:
             # Unit tags
             tags = meta_tags + operation_tags
 
+            repo = [k for k, v in cls.__repository_dict__.items() if os.path.basename(repo_path) == v['file']]
+            if not repo:
+                raise ValueError(f'Repository file {repo_path} is not registered in __repository_dict__')
+            repo = repo[0]
+
             operation = OperationMetaInfo(id=current_operation_key,
                                           input_types=input_type,
                                           output_types=output_type,
@@ -212,7 +226,8 @@
                                           supported_strategies=supported_strategies,
                                           allowed_positions=allowed_positions,
                                           tags=tags,
-                                          presets=presets)
+                                          presets=presets,
+                                          repository_name=repo)
             operations_list.append(operation)
 
     return operations_list
diff --git a/fedot/utilities/window_size_selector.py b/fedot/utilities/window_size_selector.py
new file mode 100644
index 0000000000..6fd58b594c
--- /dev/null
+++ b/fedot/utilities/window_size_selector.py
@@ -0,0 +1,234 @@
+import math
+from typing import Union
+
+import numpy as np
+import pandas as pd
+from scipy.signal import find_peaks
+from statsmodels.tsa.stattools import acf
+
+
+class WindowSizeSelector:
+    """Class to select an appropriate window size to catch periodicity for time series analysis.
+    Two groups of algorithms are implemented:
+    Whole-Series-Based (WSB):
+        1. 'hac' - highest_autocorrelation
+        2. 'dff' - dominant_fourier_frequency
+    Subsequence-Based (SB):
+        1. 'mwf' - multi_window_finder
+        2. 'sss' - summary_statistics_subsequence
+    Args:
+        method: by ``default``, it is 'dff'.
+            You can choose between: 'hac', 'dff', 'sss' or 'mwf'.
+        window_range: % of time series length, by ``default`` it is (5, 50).
+    Attributes:
+        length_ts(int): length of the time_series.
+        window_max(int): maximum window size in real values.
+        window_min(int): minimum window size in real values.
+        dict_methods(dict): dictionary with all implemented methods.
+    Example:
+        To find the window size for a single time series::
+            ts = np.random.rand(1000)
+            ws_selector = WindowSizeSelector(method='hac')
+            window_size = ws_selector.get_window_size(time_series=ts)
+        To find the window size for multiple time series::
+            ts = np.random.rand(1000, 10)
+            ws_selector = WindowSizeSelector(method='hac')
+            window_size = ws_selector.apply(time_series=ts, average='median')
+    Reference:
+        (c) "Window Size Selection in Unsupervised Time Series Analytics: A Review and Benchmark. Arik Ermshaus,
+        Patrick Schafer, and Ulf Leser. 2022"
+    """
+
+    def __init__(self, method: str = 'dff', window_range: tuple = (5, 50)):
+
+        assert window_range[0] < window_range[1], 'Upper bound of window range should be bigger than lower bound'
+
+        self.dict_methods = {'hac': self.autocorrelation,
+                             'dff': self.dominant_fourier_frequency,
+                             'mwf': self.mwf,
+                             'sss': self.summary_statistics_subsequence}
+        self.wss_algorithm = method
+        self.window_range = window_range
+        self.window_max = None
+        self.window_min = None
+        self.length_ts = None
+
+    def apply(self, time_series: Union[pd.DataFrame, np.array], average: str = 'median') -> int:
+        """Method to run the selected WSS algorithm over each time series and average the results
+        Args:
+            time_series: time series to study
+            average: 'mean' or 'median' to average the window size over all time series
+        Returns:
+            window_size_selected: value which has been chosen as the appropriate window size
+        """
+        methods = {'mean': np.mean, 'median': np.median}
+        assert average in methods.keys(), 'Hyperparameters error: `average` should be mean or median'
+
+        if isinstance(time_series, pd.DataFrame):
+            time_series = time_series.values
+
+        window_list = [self.get_window_size(ts) for ts in time_series]
+        return round(methods[average](window_list))
+
+    def get_window_size(self, time_series: np.array) -> int:
+        """Main function to run the selected WSS algorithm over a time series
+        Note:
+            One of the reasons for a ValueError is a time series with 50 or fewer elements.
+            In that case, pass a suitable window_range at initialization.
+        Returns:
+            window_size_selected: selected window size as a percentage of the time series length
+        """
+        if time_series.shape[0] == 1:  # If time series is a part of multivariate one
+            time_series = np.array(time_series[0])
+        self.length_ts = len(time_series)
+
+        self.window_max = int(round(self.length_ts * self.window_range[1] / 100))  # in real values
+        self.window_min = int(round(self.length_ts * self.window_range[0] / 100))  # in real values
+
+        window_size_selected = self.dict_methods[self.wss_algorithm](time_series=time_series)
+        return round(window_size_selected * 100 / self.length_ts)  # in %
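A standalone sanity check (plain NumPy, not part of the patch) of the dominant-frequency idea behind the 'dff' method that follows: for a synthetic sine the recovered window size equals the period.

    import numpy as np

    # a sine with period 50: the strongest positive-frequency FFT
    # coefficient sits at f = 1/50, so the implied window size is 50
    ts = np.sin(2 * np.pi * np.arange(1000) / 50)
    spectrum = np.fft.fft(ts)
    freqs = np.fft.fftfreq(ts.shape[0], 1)
    positive = freqs > 0
    dominant = freqs[positive][np.argmax(np.abs(spectrum[positive]))]
    print(round(1 / dominant))  # -> 50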
+    def dominant_fourier_frequency(self, time_series: np.array) -> int:
+        """
+        Method to find the dominant fourier frequency in a time series and return the appropriate window size. It is
+        based on the assumption that the dominant frequency is the one with the highest magnitude in the Fourier
+        transform. The window size is then the inverse of the dominant frequency.
+        """
+        fourier = np.fft.fft(time_series)
+        freqs = np.fft.fftfreq(time_series.shape[0], 1)
+
+        magnitudes, window_sizes = [], []
+
+        for coef, freq in zip(fourier, freqs):
+            if coef and freq > 0:
+                window_size = int(1 / freq)
+                mag = math.sqrt(coef.real * coef.real + coef.imag * coef.imag)
+
+                if self.window_min <= window_size < self.window_max:
+                    window_sizes.append(window_size)
+                    magnitudes.append(mag)
+
+        return window_sizes[np.argmax(magnitudes)]
+
+    def autocorrelation(self, time_series: np.array) -> int:
+        """Method to find the highest autocorrelation in a time series and return the appropriate window size. It is
+        based on the assumption that the lag with the highest autocorrelation coefficient corresponds to the window
+        size that best captures the periodicity of the time series.
+        """
+        ts_len = time_series.shape[0]
+        acf_values = acf(time_series, fft=True, nlags=int(ts_len / 2))
+
+        peaks, _ = find_peaks(acf_values)
+        peaks = peaks[np.logical_and(peaks >= self.window_min, peaks < self.window_max)]
+        corrs = acf_values[peaks]
+
+        if peaks.shape[0] == 0:  # if there are no peaks in range (window_min, window_max), return window_min
+            return self.window_min
+        return peaks[np.argmax(corrs)]
+
+    def mwf(self, time_series: np.array) -> int:
+        """ Method to find the window size that minimizes the moving average residual. It is based on the assumption
+        that the window size that best captures the periodicity of the time series is the one that minimizes the
+        difference between the moving average and the time series.
+        """
+
+        all_averages, window_sizes = [], []
+
+        for w in range(self.window_min, self.window_max, 1):
+            moving_avg = np.array(self.movmean(time_series, w))
+            all_averages.append(moving_avg)
+            window_sizes.append(w)
+
+        moving_avg_residuals = []
+
+        for i, w in enumerate(window_sizes):
+            moving_avg = all_averages[i][:len(all_averages[-1])]
+            moving_avg_residual = np.log(abs(moving_avg - moving_avg.mean()).sum())
+            moving_avg_residuals.append(moving_avg_residual)
+
+        b = (np.diff(np.sign(np.diff(moving_avg_residuals))) > 0).nonzero()[0] + 1  # local minima
+
+        if len(b) == 0:
+            return self.window_min
+        if len(b) < 3:
+            return window_sizes[b[0]]
+
+        reswin = np.array([window_sizes[b[i]] / (i + 1) for i in range(3)])
+        w = np.mean(reswin)
+
+        return int(w)
+
+    def movmean(self, ts, w):
+        """Fast moving average function"""
+        moving_avg = np.cumsum(ts, dtype=float)
+        moving_avg[w:] = moving_avg[w:] - moving_avg[:-w]
+        return moving_avg[w - 1:] / w
+
+    def summary_statistics_subsequence(self, time_series: np.array, threshold=.89) -> int:
+        """Method to find the window size that maximizes the subsequence unsupervised similarity score (SUSS). It is
+        based on the assumption that the window size that best captures the periodicity of the time series is the one
+        that maximizes the similarity between subsequences of the time series.
+ """ + # lbound = self.window_min + time_series = (time_series - time_series.min()) / (time_series.max() - time_series.min()) + + ts_mean = np.mean(time_series) + ts_std = np.std(time_series) + ts_min_max = np.max(time_series) - np.min(time_series) + + stats = (ts_mean, ts_std, ts_min_max) + + max_score = self.suss_score(time_series=time_series, window_size=1, stats=stats) + min_score = self.suss_score(time_series=time_series, window_size=time_series.shape[0] - 1, stats=stats) + + exp = 0 + + # exponential search (to find window size interval) + while True: + window_size = 2 ** exp + + if window_size < self.window_min: + exp += 1 + continue + + score = 1 - (self.suss_score(time_series, window_size, stats) - min_score) / (max_score - min_score) + + if score > threshold: + break + + exp += 1 + + lbound, ubound = max(self.window_min, 2 ** (exp - 1)), 2 ** exp + 1 + + # binary search (to find window size in interval) + while lbound <= ubound: + window_size = int((lbound + ubound) / 2) + score = 1 - (self.suss_score(time_series, window_size, stats) - min_score) / (max_score - min_score) + + if score < threshold: + lbound = window_size + 1 + elif score > threshold: + ubound = window_size - 1 + else: + break + + return 2 * lbound + + def suss_score(self, time_series, window_size, stats): + roll = pd.Series(time_series).rolling(window_size) + ts_mean, ts_std, ts_min_max = stats + + roll_mean = roll.mean().to_numpy()[window_size:] + roll_std = roll.std(ddof=0).to_numpy()[window_size:] + roll_min = roll.min().to_numpy()[window_size:] + roll_max = roll.max().to_numpy()[window_size:] + + X = np.array([ + roll_mean - ts_mean, + roll_std - ts_std, + (roll_max - roll_min) - ts_min_max + ]) + + X = np.sqrt(np.sum(np.square(X), axis=0)) / np.sqrt(window_size) + + return np.mean(X) \ No newline at end of file diff --git a/test/integration/api/test_main_api.py b/test/integration/api/test_main_api.py index a8ea407373..203d4eb90b 100644 --- a/test/integration/api/test_main_api.py +++ b/test/integration/api/test_main_api.py @@ -1,11 +1,15 @@ import os import shutil from copy import deepcopy +from itertools import chain from typing import Optional import numpy as np import pandas as pd import pytest + +from fedot.core.operations.atomized_model.atomized_model import AtomizedModel +from fedot.core.pipelines.adapters import PipelineAdapter from golem.core.dag.graph_utils import graph_structure from sklearn.datasets import load_iris from sklearn.model_selection import train_test_split @@ -549,6 +553,43 @@ def test_default_forecast(): assert np.array_equal(model.test_data.idx, train_data.idx) +def test_atomized_model_are_mutated(): + # prepare pipeline with atomized model as initial assumption + node = PipelineNode('lagged') + node = PipelineNode('ridge', nodes_from=[node]) + inner_node = PipelineNode('rfr', nodes_from=[PipelineNode('linear')]) + node = PipelineNode(AtomizedModel(Pipeline(inner_node)), nodes_from=[node]) + initial_assumption = Pipeline(node) + + # prepare data + forecast_length = 2 + train_data, test_data, _ = get_dataset('ts_forecasting', + forecast_length=forecast_length, + validation_blocks=1) + # get and fit fedot + model = Fedot(problem='ts_forecasting', + pop_size=10, + num_of_generations=1, + with_tuning=False, + timeout=1, + task_params=TsForecastingParams(forecast_length=forecast_length), + initial_assumption=initial_assumption) + model.fit(train_data) + + # extract descriptive_id of atomized pipeline + pipelines_in_atomized_descriptive_id = [] + for generation in model.history.generations: 
+ for ind in generation: + if 'atomized' in ind.graph.descriptive_id: + ppl = PipelineAdapter()._restore(ind.graph) + for node in ppl.nodes: + if isinstance(node.operation, AtomizedModel): + pipelines_in_atomized_descriptive_id.append(node.operation.pipeline.descriptive_id) + + # check that there are some different atomized pipelines after composition + assert len(set(pipelines_in_atomized_descriptive_id)) > 1 + + @pytest.mark.parametrize('horizon', [1, 2, 3, 4]) def test_forecast_with_different_horizons(horizon): forecast_length = 2 diff --git a/test/integration/models/atomized_models/__init__.py b/test/integration/models/atomized_models/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/test/integration/models/test_atomized_model.py b/test/integration/models/atomized_models/test_atomized_model.py similarity index 85% rename from test/integration/models/test_atomized_model.py rename to test/integration/models/atomized_models/test_atomized_model.py index d66eb3347a..ba587b2b2e 100644 --- a/test/integration/models/test_atomized_model.py +++ b/test/integration/models/atomized_models/test_atomized_model.py @@ -8,7 +8,7 @@ from fedot.core.composer.metrics import RMSE from fedot.core.data.data import InputData -from fedot.core.operations.atomized_model import AtomizedModel +from fedot.core.operations.atomized_model.atomized_model import AtomizedModel from fedot.core.pipelines.node import PipelineNode from fedot.core.pipelines.pipeline import Pipeline from fedot.core.utils import fedot_project_root @@ -211,30 +211,3 @@ def test_create_empty_atomized_model_raised_exception(): with pytest.raises(Exception): empty_pipeline = Pipeline() AtomizedModel(empty_pipeline) - - -def test_fine_tune_atomized_model_correct(): - train_data, test_data = create_input_data() - - atm_model = create_atomized_model() - dummy_atomized_model = create_atomized_model() - - fine_tuned_atomized_model = atm_model.fine_tune(metric_function=RMSE.get_value, - input_data=train_data, - iterations=5, - timeout=1) - dummy_atomized_model.fit(None, train_data) - - fitted_dummy_model, _ = dummy_atomized_model.fit(None, train_data) - fitted_fine_tuned_atomized_model, _ = fine_tuned_atomized_model.fit(None, train_data) - - after_tuning_output = fine_tuned_atomized_model.predict(fitted_fine_tuned_atomized_model, data=test_data) - after_tuning_predicted = after_tuning_output.predict - before_tuning_output = dummy_atomized_model.predict(fitted_dummy_model, data=test_data) - before_tuning_predicted = before_tuning_output.predict - - aft_tun_mse = mean_squared_error(y_true=test_data.target, y_pred=after_tuning_predicted) - bfr_tun_mse = mean_squared_error(y_true=test_data.target, y_pred=before_tuning_predicted) - - deviation = 0.50 * bfr_tun_mse - assert aft_tun_mse <= (bfr_tun_mse + deviation) diff --git a/test/integration/models/atomized_models/test_atomized_ts_operations.py b/test/integration/models/atomized_models/test_atomized_ts_operations.py new file mode 100644 index 0000000000..56d135d666 --- /dev/null +++ b/test/integration/models/atomized_models/test_atomized_ts_operations.py @@ -0,0 +1,62 @@ +import numpy as np +import pytest +from sklearn.metrics import mean_squared_error +from typing import Type + +from fedot.core.data.data import InputData +from fedot.core.data.data_split import train_test_data_setup +from fedot.core.operations.atomized_model.atomized_model import AtomizedModel +from fedot.core.operations.evaluation.operation_implementations.models.atomized.atomized_ts_differ import 
AtomizedTimeSeriesDiffer
+from fedot.core.operations.evaluation.operation_implementations.models.atomized.atomized_ts_sampler import AtomizedTimeSeriesSampler
+from fedot.core.operations.evaluation.operation_implementations.models.atomized.atomized_ts_scaler import AtomizedTimeSeriesScaler
+from fedot.core.pipelines.node import PipelineNode
+from fedot.core.pipelines.pipeline import Pipeline
+from fedot.core.repository.dataset_types import DataTypesEnum
+from fedot.core.repository.tasks import TaskTypesEnum, Task, TsForecastingParams
+from fedot.core.pipelines.ts_wrappers import in_sample_ts_forecast
+
+
+def get_data(fit_length: int, validation_blocks: int = 10, forecasting_length: int = 20):
+    time = np.linspace(0, 10, fit_length)
+    dt = time[1] - time[0]
+    start = time[-1] + dt
+    stop = start + validation_blocks * forecasting_length * dt
+    time = np.concatenate([time, np.arange(start, stop, dt)])
+    time_series = np.sin(time)
+
+    data = InputData(idx=np.arange(len(time_series)),
+                     features=time_series,
+                     target=time_series,
+                     task=Task(TaskTypesEnum.ts_forecasting, TsForecastingParams(forecasting_length)),
+                     data_type=DataTypesEnum.ts)
+    train, test = train_test_data_setup(data, validation_blocks=validation_blocks)
+    return train, test
+
+
+def get_pipeline(atomized: Type[AtomizedModel] = None, model: str = 'rfr'):
+    node = PipelineNode('lagged')
+    if atomized is not None:
+        pipeline = Pipeline(PipelineNode(model))
+        node = PipelineNode(atomized(pipeline), nodes_from=[node])
+    else:
+        node = PipelineNode(model, nodes_from=[node])
+    return Pipeline(node)
+
+
+def predict(pipeline: Pipeline, train: InputData, test: InputData):
+    pipeline.fit(train)
+    return in_sample_ts_forecast(pipeline, test, len(test.target))
+
+
+@pytest.mark.parametrize(('data_length', 'atomized_class'),
+                         [(100, AtomizedTimeSeriesSampler),
+                          (1000, AtomizedTimeSeriesScaler),
+                          (1000, AtomizedTimeSeriesDiffer),
+                          ])
+def test_atomized_operations(data_length: int, atomized_class: Type[AtomizedModel]):
+    train, test = get_data(data_length)
+    atomized_predict = predict(get_pipeline(atomized_class), train, test)
+    simple_predict = predict(get_pipeline(), train, test)
+    # the atomized variant is expected to match or beat the plain pipeline
+    # on this synthetic periodic series
+    assert mean_squared_error(test.target, simple_predict) >= mean_squared_error(test.target, atomized_predict)
diff --git a/test/unit/adapter/test_adapt_pipeline.py b/test/unit/adapter/test_adapt_pipeline.py
index d35b347697..f2e1f7d085 100644
--- a/test/unit/adapter/test_adapt_pipeline.py
+++ b/test/unit/adapter/test_adapt_pipeline.py
@@ -7,11 +7,10 @@
 from golem.core.dag.graph_node import GraphNode
 from golem.core.dag.graph_verifier import GraphVerifier
 from golem.core.dag.verification_rules import DEFAULT_DAG_RULES
-from golem.core.optimisers.graph import OptNode
 
 from fedot.core.operations.operation import Operation
 from fedot.core.pipelines.adapters import PipelineAdapter
-from fedot.core.pipelines.node import PipelineNode
+from fedot.core.pipelines.node import PipelineNode, OptNode
 from fedot.core.pipelines.pipeline import Pipeline
 from fedot.core.pipelines.pipeline_builder import PipelineBuilder
 from test.unit.dag.test_graph_utils import find_first
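The comparison in test_atomized_operations can also be run ad hoc with the helpers defined in that test module; a sketch (the size of the MSE gap depends on the synthetic data):

    # uses get_data/get_pipeline/predict from the test module above
    train, test = get_data(1000)
    atomized_mse = mean_squared_error(test.target, predict(get_pipeline(AtomizedTimeSeriesDiffer), train, test))
    baseline_mse = mean_squared_error(test.target, predict(get_pipeline(), train, test))
    print(baseline_mse - atomized_mse)  # expected to be non-negative per the assertion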
diff --git a/test/unit/optimizer/gp_operators/test_mutation.py b/test/unit/optimizer/gp_operators/test_mutation.py
index 8cbdf06a82..954431e741 100644
--- a/test/unit/optimizer/gp_operators/test_mutation.py
+++ b/test/unit/optimizer/gp_operators/test_mutation.py
@@ -1,7 +1,11 @@
 from copy import deepcopy
+from itertools import chain
 from pathlib import Path
 
 import pytest
+from typing import Any, List, Optional, Type, Callable
+
+from fedot.core.operations.atomized_model.atomized_model import AtomizedModel
 from golem.core.dag.graph_node import GraphNode
 from golem.core.dag.verification_rules import DEFAULT_DAG_RULES
 from golem.core.optimisers.genetic.gp_params import GPAlgorithmParameters
@@ -20,6 +24,8 @@
 from fedot.core.pipelines.pipeline_graph_generation_params import get_pipeline_generation_params
 from fedot.core.repository.operation_types_repository import get_operations_for_task
 from fedot.core.repository.tasks import Task, TaskTypesEnum
+from fedot.core.optimisers.genetic_operators.mutation import fedot_single_edge_mutation, fedot_single_change_mutation, \
+    fedot_single_drop_mutation
 from test.integration.composer.test_composer import to_categorical_codes
 from test.unit.dag.test_graph_utils import find_first
 from test.unit.tasks.test_forecasting import get_ts_data
@@ -39,7 +45,7 @@
     return input_data
 
 
-def get_mutation_obj() -> Mutation:
+def get_mutation_obj(mutation_types: Optional[List[Any]] = None) -> Mutation:
     """
     Function for initializing mutation interface
     """
@@ -51,8 +57,11 @@
     graph_params = get_pipeline_generation_params(requirements=requirements,
                                                   rules_for_constraint=DEFAULT_DAG_RULES,
                                                   task=task)
-    parameters = GPAlgorithmParameters(mutation_strength=MutationStrengthEnum.strong,
-                                       mutation_prob=1)
+    kwargs = dict(mutation_strength=MutationStrengthEnum.strong,
+                  mutation_prob=1)
+    if mutation_types is not None:
+        kwargs = {'mutation_types': mutation_types, **kwargs}
+    parameters = GPAlgorithmParameters(**kwargs)
     mutation = Mutation(parameters, requirements, graph_params)
     return mutation
@@ -104,6 +113,34 @@ def get_ts_forecasting_graph_with_boosting() -> Pipeline:
     return pipeline
 
 
+def get_graph_with_two_nested_atomized_models(atomized_model):
+    """ Builds a pipeline with an atomized model nested inside another atomized model
+    and adapts it to the optimization graph representation """
+    simple_pipeline = (PipelineBuilder()
+                       .add_node('scaling')
+                       .add_branch('linear', 'poly_features')
+                       .grow_branches('rf', 'catboost')
+                       .join_branches('ridge')
+                       .build())
+
+    node1 = PipelineNode('a')
+    node2 = PipelineNode('b', nodes_from=[node1])
+    node3 = PipelineNode(atomized_model(simple_pipeline), nodes_from=[node1])
+    node4 = PipelineNode('c', nodes_from=[node1, node3])
+    node5 = PipelineNode('d', nodes_from=[node2, node4])
+    node6 = PipelineNode('e', nodes_from=[node2, node5])
+    pipeline_with_atomized = Pipeline(node6)
+
+    node1 = PipelineNode('1')
+    node2 = PipelineNode('2', nodes_from=[node1])
+    node3 = PipelineNode(atomized_model(pipeline_with_atomized), nodes_from=[node1])
+    node4 = PipelineNode('3', nodes_from=[node1, node3])
+    node5 = PipelineNode('4', nodes_from=[node2, node4])
+    node6 = PipelineNode('5', nodes_from=[node2, node5])
+    pipeline_with_atomized = Pipeline(node6)
+    return PipelineAdapter().adapt(pipeline_with_atomized)
+
+
 def test_boosting_mutation_for_linear_graph():
     """
     Tests boosting mutation can add correct boosting cascade
@@ -170,3 +207,53 @@
     new_pipeline = adapter.restore(graph)
 
     assert not find_first(new_pipeline, lambda n: type(n) in (GraphNode, OptNode))
+
+
+@pytest.mark.parametrize('atomized_model',
+                         (AtomizedModel, ))
+@pytest.mark.parametrize('mutation_type',
+                         (fedot_single_edge_mutation,
+                          fedot_single_change_mutation,
+                          fedot_single_drop_mutation))
+def test_fedot_mutation_with_atomized_models(atomized_model: Type[AtomizedModel],
+                                             mutation_type: Callable[[OptGraph], OptGraph]):
+
+    def extract_all_graphs(graph: OptGraph):
+        """ Get all graphs from a graph with atomized nodes as a plain list """
+        atomized_nodes = [node for node in graph.nodes if 'atomized' in node.name.lower()]
+        atomized_graphs = list(chain(*[extract_all_graphs(node.content['inner_graph']) for node in atomized_nodes]))
+        return [graph] + atomized_graphs
+
+    def descriptive_id_without_atomized(graph: OptGraph):
+        description = ''
+        nodes = graph.root_nodes()
+        while nodes:
+            node = nodes.pop()
+            if 'inner_graph' in node.content:
+                description += 'atomized'
+            else:
+                description += node.description()
+            nodes.extend(node.nodes_from)
+        return description
+
+    mutation = get_mutation_obj(mutation_types=[mutation_type])
+    # check that mutation_type has been set correctly
+    assert len(mutation.parameters.mutation_types) == 1
+    assert mutation.parameters.mutation_types[0] is mutation_type
+
+    # apply the mutation repeatedly
+    mut = mutation.parameters.mutation_types[0]
+    origin_graphs = extract_all_graphs(get_graph_with_two_nested_atomized_models(atomized_model))
+    origin_descriptive_ids = [descriptive_id_without_atomized(x) for x in origin_graphs]
+    all_mutations = [0, 0, 0]
+    for _ in range(20):
+        graph, _ = mutation._adapt_and_apply_mutation(new_graph=deepcopy(origin_graphs[0]), mutation_type=mut)
+        descriptive_ids = [descriptive_id_without_atomized(x) for x in extract_all_graphs(graph)]
+
+        # check that exactly one mutation happened somewhere in the whole graph
+        assert sum(x != y for x, y in zip(origin_descriptive_ids, descriptive_ids)) == 1
+
+        all_mutations = [x + (y != z) for x, y, z in zip(all_mutations, origin_descriptive_ids, descriptive_ids)]
+
+    # check that each of the three graphs receives at least 10% of the total mutations
+    assert all(x / sum(all_mutations) > 0.1 for x in all_mutations)
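Finally, a minimal sketch of the pattern these tests exercise: an inner pipeline wrapped into AtomizedModel and embedded in an outer forecasting pipeline. The operation names are taken from the tests above; fitting requires a ts_forecasting InputData such as the one built by get_data.

    from fedot.core.operations.atomized_model.atomized_model import AtomizedModel
    from fedot.core.pipelines.node import PipelineNode
    from fedot.core.pipelines.pipeline import Pipeline

    inner = Pipeline(PipelineNode('ridge'))
    outer = Pipeline(PipelineNode(AtomizedModel(inner), nodes_from=[PipelineNode('lagged')]))
    # outer.fit(train_data); outer.predict(test_data)  # with ts InputData as above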