Skip to content

Commit

Permalink
added: unit-test and integration, docstrings, init input_data method
Browse files Browse the repository at this point in the history
cleaned up TimeSeriesClassifier_Preset
  • Loading branch information
technocreep committed Sep 20, 2023
1 parent b6545a5 commit 0912891
Show file tree
Hide file tree
Showing 18 changed files with 450 additions and 134 deletions.
33 changes: 33 additions & 0 deletions fedot_ind/api/utils/input_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import numpy as np
import pandas as pd
from fedot.core.data.data import InputData
from fedot.core.repository.dataset_types import DataTypesEnum
from fedot.core.repository.tasks import Task, TaskTypesEnum


def init_input_data(X: pd.DataFrame, y: np.ndarray) -> InputData:
    """Initialise an InputData object from a pandas DataFrame and target values.

    The data is treated as multivariate when the first cell of ``X`` holds a
    ``pd.Series``; only that single cell is inspected. Multivariate data is
    converted through nested lists into one array and tagged as
    ``DataTypesEnum.image``; otherwise ``X.values`` is used as is and tagged
    as ``DataTypesEnum.table``. The task is always classification.

    Args:
        X: pandas DataFrame with features
        y: array-like with target values

    Returns:
        InputData object convenient for FEDOT framework
    """
    is_multivariate_data = isinstance(X.iloc[0, 0], pd.Series)
    if is_multivariate_data:
        features = np.array(X.values.tolist())
        data_type = DataTypesEnum.image
    else:
        features = X.values
        data_type = DataTypesEnum.table

    # The target is flattened the same way in both branches, so a list or
    # Series target works for multivariate data too; for an ndarray this is
    # identical to y.reshape(-1, 1).
    return InputData(idx=np.arange(len(X)),
                     features=features,
                     target=np.ravel(y).reshape(-1, 1),
                     task=Task(TaskTypesEnum.classification),
                     data_type=data_type)
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,13 @@
from fedot.core.pipelines.pipeline import Pipeline
from fedot.core.pipelines.pipeline_builder import PipelineBuilder
from fedot.core.pipelines.tuning.tuner_builder import TunerBuilder
from fedot.core.repository.dataset_types import DataTypesEnum
from fedot.core.repository.quality_metrics_repository import ClassificationMetricsEnum
from fedot.core.repository.tasks import Task, TaskTypesEnum
from golem.core.tuning.sequential import SequentialTuner
from golem.core.tuning.simultaneous import SimultaneousTuner

from fedot_ind.api.utils.input_data import init_input_data
from fedot_ind.api.utils.path_lib import default_path_to_save_results
from fedot_ind.api.utils.saver_collections import ResultSaver
from fedot_ind.core.metrics.evaluation import PerformanceAnalyzer
from fedot_ind.core.operation.caching import DataCacher
from fedot_ind.core.repository.initializer_industrial_models import IndustrialModels

np.random.seed(0)
Expand Down Expand Up @@ -70,47 +67,8 @@ def __init__(self, params: Optional[OperationParameters] = None):
self.logger.info(f'TimeSeriesClassifierPreset initialised with [{self.branch_nodes}] nodes and '
f'[{self.tuning_iters}] tuning iterations and [{self.tuning_timeout}] timeout')

def __check_multivariate_data(self, data: pd.DataFrame) -> bool:
"""Method for checking if the data is multivariate.
Args:
X: pandas DataFrame with features
Returns:
True if data is multivariate, False otherwise
"""
if isinstance(data.iloc[0, 0], pd.Series):
return True
else:
return False

def _init_input_data(self, X: pd.DataFrame, y: np.ndarray) -> InputData:
    """Initialise an InputData object from a pandas DataFrame and target values.

    Args:
        X: pandas DataFrame with features
        y: numpy array with target values

    Returns:
        InputData object convenient for FEDOT framework
    """
    # The construction logic lives in the shared helper imported from
    # fedot_ind.api.utils.input_data; delegating removes the inlined copy
    # that made this call unreachable.
    return init_input_data(X, y)

def _build_pipeline(self):
"""
Expand Down Expand Up @@ -232,13 +190,6 @@ def predict(self, features: pd.DataFrame, target: np.array) -> dict:

test_data = self._init_input_data(features, target)
test_data_preprocessed = self.preprocessing_pipeline.root_node.predict(test_data)
# data_cacher = DataCacher()
# get unique hash of input data
# test_predict_hash = data_cacher.hash_info(data=features)
# compare it to existed hash
# if self.test_predict_hash != test_predict_hash:
# test_data = self._init_input_data(features, target)
# test_data_preprocessed = self.preprocessing_pipeline.root_node.predict(test_data)

if test_data.features.shape[0] == 1:
test_data_preprocessed.predict = np.squeeze(test_data_preprocessed.predict).reshape(1, -1)
Expand All @@ -255,16 +206,7 @@ def predict(self, features: pd.DataFrame, target: np.array) -> dict:

return self.prediction_label

# else:
# return self.prediction_label

def predict_proba(self, features, target) -> dict:
# data_cacher = DataCacher()
# # get unique hash of input data
# test_predict_hash = data_cacher.hash_info(data=features,
# obj_info_dict=self.__dict__)
# # compare it to existed hash
# if self.test_predict_hash != test_predict_hash:
test_data = self._init_input_data(features, target)
test_data_preprocessed = self.preprocessing_pipeline.root_node.predict(test_data)
self.test_data_preprocessed.predict = np.squeeze(test_data_preprocessed.predict)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,6 @@ def __init__(self, params: Optional[OperationParameters] = None):

def _get_basis(self, data):
    """Compute the basis for ``data`` with the method matching its container type.

    Args:
        data: a plain ``list`` is routed to ``_get_multidim_basis``;
            any other object is routed to ``_get_1d_basis``.

    Returns:
        The value obtained from the selected method, unwrapped from the
        ``Either`` container.
    """
    # Exact type comparison (not isinstance) preserves the existing dispatch:
    # list subclasses keep taking the 1-D route.
    get_basis = self._get_multidim_basis if type(data) is list else self._get_1d_basis
    return Either.insert(data).then(get_basis).value

def fit(self, data):
Expand Down
34 changes: 7 additions & 27 deletions fedot_ind/core/operation/transformation/basis/eigen_basis.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
from typing import Optional, Tuple, TypeVar
from typing import Optional, Tuple, TypeVar

import numpy as np
import pandas as pd
import tensorly as tl
from fedot.core.data.data import InputData

from fedot.core.operations.operation_parameters import OperationParameters
from joblib import Parallel, delayed
from joblib import delayed, Parallel
from pymonad.either import Either
from pymonad.list import ListMonad
from scipy import stats
Expand All @@ -25,12 +23,12 @@


class EigenBasisImplementation(BasisDecompositionImplementation):
"""DataDriven basis
"""Eigen basis decomposition implementation
Example:
ts1 = np.random.rand(200)
ts2 = np.random.rand(200)
ts = [ts1, ts2]
bss = EigenBasisImplementation({'sv_selector': 'median', 'window_size': 30})
bss = EigenBasisImplementation({'window_size': 30})
basis_multi = bss._transform(ts)
basis_1d = bss._transform(ts1)
"""
Expand All @@ -41,11 +39,8 @@ def __init__(self, params: Optional[OperationParameters] = None):
self.low_rank_approximation = params.get('low_rank_approximation', True)
self.basis = None
self.SV_threshold = None
self.sv_selector = 'median'
self.svd_estimator = RSVDDecomposition()
self.logging_params.update({'WS': self.window_size,
'SV_selector': self.sv_selector,
})
self.logging_params.update({'WS': self.window_size})

def _combine_components(self, predict):
count = 0
Expand Down Expand Up @@ -77,36 +72,21 @@ def _transform(self, input_data: InputData) -> np.array:
features = np.array([series[~np.isnan(series)] for series in features])

if self.SV_threshold is None:
self.SV_threshold = self.get_threshold(data=features,
selector=self.sv_selector)
self.SV_threshold = self.get_threshold(data=features)
self.logging_params.update({'SV_thr': self.SV_threshold})

parallel = Parallel(n_jobs=self.n_processes, verbose=0, pre_dispatch="2*n_jobs")
v = parallel(delayed(self._transform_one_sample)(sample) for sample in features)
predict = np.array(v)
# new_shape = predict[0].shape[0]
#
# reduce_dimension = True
# while reduce_dimension:
# predict = self._combine_components(predict)
# if predict[0].shape[0] == new_shape or predict[0].shape[0] == 1:
# reduce_dimension = False
# new_shape = predict[0].shape[0]
# predict = self._clean_predict(np.array(v))
return predict

def get_threshold(self, data) -> int:
    """Pick a common threshold as the most frequent per-signal estimate.

    Every signal is passed to ``_transform_one_sample`` with
    ``svd_flag=True`` and the mode of the collected values is returned.

    Args:
        data: sized iterable of signals (``len`` is needed for the progress bar)

    Returns:
        The most frequent value among the per-signal estimates.
    """
    svd_numbers = [self._transform_one_sample(signal, svd_flag=True)
                   for signal in tqdm(data, desc='SVD estimation')]

    # SciPy < 1.11 returns the mode as a length-1 array, while SciPy >= 1.11
    # (keepdims=False by default) returns a scalar for 1-D input, on which a
    # bare ``.mode[0]`` raises. np.atleast_1d handles both layouts.
    return np.atleast_1d(stats.mode(svd_numbers).mode)[0]

def _transform_one_sample(self, series: np.array, svd_flag: bool = False):
trajectory_transformer = HankelMatrix(time_series=series, window_size=self.window_size)
Expand Down
90 changes: 56 additions & 34 deletions fedot_ind/core/operation/transformation/data/eigen.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,37 @@
import numpy as np
import pandas as pd
import copy
from typing import List, Tuple


def weighted_inner_product(F_i: np.ndarray, F_j: np.ndarray, window_length: int, ts_length: int) -> float:
    """Calculate the weighted inner product of two vectors.

    The weights ramp up from 1 to ``window_length``, stay at ``window_length``
    for ``ts_length - 2 * window_length`` positions, then ramp back down to 1,
    so the weight vector has ``ts_length`` entries whenever
    ``ts_length >= 2 * window_length``.

    Args:
        F_i: First vector.
        F_j: Second vector.
        window_length: Length of the window.
        ts_length: Total length of the time series.

    Returns:
        Weighted inner product.
    """
    ramp = np.arange(1, window_length + 1)
    # A negative plateau length collapses to an empty plateau, exactly as the
    # list repetition ``[window_length] * negative`` did.
    plateau = np.full(max(ts_length - 2 * window_length, 0), window_length)
    w = np.concatenate((ramp, plateau, ramp[::-1]))
    return float(w.dot(F_i * F_j))


def calculate_matrix_norms(TS_comps: np.ndarray, window_length: int, ts_length: int) -> np.ndarray:
"""Calculate matrix norms for the time series components.
def calculate_matrix_norms(TS_comps, window_length, ts_length):
Args:
TS_comps: The time series components.
window_length: Length of the window.
ts_length: Total length of the time series.
Returns:
Array of matrix norms.
"""
r = []
for i in range(TS_comps.shape[1]):
r.append(weighted_inner_product(TS_comps[:, i], TS_comps[:, i], window_length, ts_length))
Expand All @@ -21,45 +40,48 @@ def calculate_matrix_norms(TS_comps, window_length, ts_length):
return F_wnorms


def calculate_corr_matrix(ts_comps: np.ndarray,
                          f_wnorms: np.ndarray,
                          window_length: int,
                          ts_length: int) -> Tuple[np.ndarray, List[int]]:
    """Calculate the w-correlation matrix for the time series components.

    The matrix starts as the identity; each off-diagonal entry is the absolute
    value of the weighted inner product of two component columns multiplied by
    ``f_wnorms[i] * f_wnorms[j]``, and the matrix is filled symmetrically.

    Args:
        ts_comps: The time series components, one component per column.
        f_wnorms: Per-component norm factors (presumably the output of
            ``calculate_matrix_norms`` - confirm against the caller).
        window_length: Length of the window.
        ts_length: Total length of the time series.

    Returns:
        W-correlation matrix and a list of component indices.
    """
    n_components = ts_comps.shape[1]
    Wcorr = np.identity(n_components)
    for i in range(n_components):
        # Only the upper triangle is computed; the value is mirrored below.
        for j in range(i + 1, n_components):
            Wcorr[i, j] = abs(
                weighted_inner_product(ts_comps[:, i], ts_comps[:, j], window_length, ts_length) *
                f_wnorms[i] * f_wnorms[j])
            Wcorr[j, i] = Wcorr[i, j]
    return Wcorr, list(range(n_components))


def combine_eigenvectors(TS_comps, window_length, correlation_level: float = 0.8):
"""Calculates the w-correlation matrix for the time series.
def combine_eigenvectors(ts_comps: np.ndarray, window_length: int) -> List[np.ndarray]:
"""Combine eigenvectors based on the w-correlation matrix for the time series.
Args:
TS_comps (np.ndarray): The time series components.
correlation_level (float): threshold value of Pearson correlation, using for merging eigenvectors.
ts_length (int): The length of TS .
window_length (int): The length of TS window.
ts_comps (np.ndarray): The time series components.
window_length (int): Length of the window.
Returns:
List[np.ndarray]: List of combined eigenvectors.
"""
combined_components = []
ts_length = TS_comps.shape[0]
# Calculated weighted norms
F_wnorms = calculate_matrix_norms(TS_comps, window_length, ts_length)

# Calculate Wcorr.
Wcorr, components = calculate_corr_matrix(TS_comps, F_wnorms, window_length, ts_length)

ts_length = ts_comps.shape[0]
F_wnorms = calculate_matrix_norms(ts_comps, window_length, ts_length)
Wcorr, components = calculate_corr_matrix(ts_comps, F_wnorms, window_length, ts_length)
combined_components = []
current_group = []
for i in range(len(components)):
if i == 0 or Wcorr[i, i-1] > correlation_level:
current_group.append(TS_comps[:, i])
else:
combined_components.append(np.array(current_group).sum(axis=0))
current_group = [TS_comps[:, i]]



combined_components.append(np.array(current_group).sum(axis=0))
current_group = [ts_comps[:, i]]
combined_components.append(np.array(current_group).sum(axis=0))
return combined_components
2 changes: 0 additions & 2 deletions fedot_ind/core/operation/transformation/data/hankel.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ def __check_windows_length(self):
if not 2 <= self.__window_length <= self.__ts_length / 2:
self.__window_length = int(self.__ts_length / 3)


def __convert_ts_to_array(self):
if type(self.__time_series) == pd.DataFrame:
self.__time_series = self.__time_series.values.reshape(-1, 1)
Expand All @@ -44,7 +43,6 @@ def __convert_ts_to_array(self):
else:
self.__time_series = self.__time_series


def __get_trajectory_matrix(self):
if len(self.__time_series.shape) > 1:
return [hankel(time_series[:self.__window_length + 1], time_series[self.__window_length:]) for time_series
Expand Down
Loading

0 comments on commit 0912891

Please sign in to comment.