Skip to content

Commit

Permalink
refactored recur extractor, added test for basises, cleaned up search…
Browse files Browse the repository at this point in the history
… space
  • Loading branch information
technocreep committed Sep 22, 2023
1 parent 0912891 commit 0b3aa00
Show file tree
Hide file tree
Showing 12 changed files with 373 additions and 396 deletions.
64 changes: 31 additions & 33 deletions fedot_ind/core/models/recurrence/reccurence_extractor.py
Original file line number Diff line number Diff line change
@@ -1,45 +1,43 @@
from functools import partial
from multiprocessing import Pool
from typing import Optional

import numpy as np
from fedot.core.data.data import InputData
from fedot.core.repository.dataset_types import DataTypesEnum
from sklearn.preprocessing import StandardScaler
from fedot.core.operations.operation_parameters import OperationParameters
from joblib import Parallel, delayed
from tqdm import tqdm
from fedot_ind.core.operation.transformation.data.hankel import HankelMatrix
from fedot.core.repository.dataset_types import DataTypesEnum

from fedot_ind.core.metrics.metrics_implementation import *
from fedot_ind.core.models.base_extractor import BaseExtractor
from fedot_ind.core.models.recurrence.sequences import RecurrenceFeatureExtractor
from fedot_ind.core.operation.transformation.data.hankel import HankelMatrix
from fedot_ind.core.operation.transformation.data.kernel_matrix import TSTransformer
from fedot_ind.core.models.recurrence.sequences import ReccurenceFeaturesExtractor


class RecurrenceExtractor(BaseExtractor):
"""Class responsible for recurrence feature generator experiment.
Args:
window_mode: boolean flag - if True, window mode is used. Defaults to False.
use_cache: boolean flag - if True, cache is used. Defaults to False.
Attributes:
transformer: TSTransformer object.
self.extractor: ReccurenceExtractor object.
train_feats: train features.
test_feats: test features.
self.extractor: RecurrenceExtractor object.
self.window_mode: bool, if True, then the window mode is used.
self.min_signal_ratio: float, the minimum signal ratio.
self.max_signal_ratio: float, the maximum signal ratio.
self.rec_metric: str, the metric for calculating the recurrence matrix.
self.window_size: int, the window size.
Example:
from fedot.core.pipelines.pipeline_builder import PipelineBuilder
from examples.fedot.fedot_ex import init_input_data
from fedot_ind.core.architecture.preprocessing.DatasetLoader import DataLoader
from fedot_ind.core.repository.initializer_industrial_models import IndustrialModels
train_data, test_data = DataLoader(dataset_name='Ham').load_data()
with IndustrialModels():
pipeline = PipelineBuilder().add_node('eigen_basis').add_node('recurrence_extractor').add_node(
'rf').build()
input_data = init_input_data(train_data[0], train_data[1])
pipeline.fit(input_data)
features = pipeline.predict(input_data)
print(features)
To use this operation you can create pipeline as follows::
from fedot.core.pipelines.pipeline_builder import PipelineBuilder
from examples.fedot.fedot_ex import init_input_data
from fedot_ind.core.architecture.preprocessing.DatasetLoader import DataLoader
from fedot_ind.core.repository.initializer_industrial_models import IndustrialModels
train_data, test_data = DataLoader(dataset_name='Ham').load_data()
with IndustrialModels():
pipeline = PipelineBuilder().add_node('eigen_basis').add_node('recurrence_extractor').add_node(
'rf').build()
input_data = init_input_data(train_data[0], train_data[1])
pipeline.fit(input_data)
features = pipeline.predict(input_data)
print(features)
"""

def __init__(self, params: Optional[OperationParameters] = None):
Expand All @@ -50,9 +48,9 @@ def __init__(self, params: Optional[OperationParameters] = None):
self.min_signal_ratio = params.get('min_signal_ratio')
self.max_signal_ratio = params.get('max_signal_ratio')
self.rec_metric = params.get('rec_metric')
self.window_size = 10
self.window_size = params.get('window_size')
self.transformer = TSTransformer
self.extractor = ReccurenceFeaturesExtractor
self.extractor = RecurrenceFeatureExtractor

def _generate_features_from_ts(self, ts: np.array):

Expand All @@ -68,7 +66,7 @@ def _generate_features_from_ts(self, ts: np.array):
feature_df = specter.ts_to_recurrence_matrix()

if not self.image_mode:
feature_df = self.extractor(recurrence_matrix=feature_df).recurrence_quantification_analysis()
feature_df = self.extractor(recurrence_matrix=feature_df).quantification_analysis()

features = np.nan_to_num(np.array(list(feature_df.values())))
recurrence_features = InputData(idx=np.arange(len(features)),
Expand All @@ -79,7 +77,7 @@ def _generate_features_from_ts(self, ts: np.array):
supplementary_data={'feature_name': list(feature_df.keys())})
return recurrence_features

def generate_reccurence_features(self, ts: np.array) -> InputData:
def generate_recurrence_features(self, ts: np.array) -> InputData:

if len(ts.shape) == 1:
aggregation_df = self._generate_features_from_ts(ts)
Expand All @@ -90,4 +88,4 @@ def generate_reccurence_features(self, ts: np.array) -> InputData:

def generate_features_from_ts(self, ts_data: np.array,
                              dataset_name: str = None):
    # Thin entry point: forwards ts_data to the recurrence feature generator.
    # dataset_name is accepted but is not used in this body.
    # NOTE(review): the two return lines below look like the before/after
    # sides of the reccurence -> recurrence rename shown in this diff; only
    # the first one can ever execute - confirm which spelling the class defines.
    return self.generate_reccurence_features(ts=ts_data)
    return self.generate_recurrence_features(ts=ts_data)
264 changes: 86 additions & 178 deletions fedot_ind/core/models/recurrence/sequences.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,63 @@
from __future__ import division, print_function

import numpy as np


class ReccurenceFeaturesExtractor:
def __init__(self,
recurrence_matrix: np.ndarray = None):
class RecurrenceFeatureExtractor:
def __init__(self, recurrence_matrix: np.ndarray = None):
    """Keep the recurrence matrix on the instance.

    Args:
        recurrence_matrix: stored as ``self.recurrence_matrix``; the
            analysis methods of this class read it from there.
    """
    self.recurrence_matrix = recurrence_matrix

def calculate_DFD(self, number_of_vectors):
# Calculating the diagonal frequency distribution - P(l)
def quantification_analysis(self, MDL: int = 3, MVL: int = 3, MWVL: int = 2):
    """Compute the recurrence quantification measures of the stored matrix.

    Args:
        MDL: lower line-length bound handed to the diagonal-line measures.
        MVL: lower line-length bound handed to the vertical-line measures.
        MWVL: lower line-length bound handed to the white-vertical-line measures.

    Returns:
        dict with the keys 'RR', 'DET', 'ADLL', 'LDLL', 'DIV', 'EDL', 'LAM',
        'AVLL', 'LVLL', 'EVL', 'AWLL', 'LWLL', 'EWLL', 'RDRR', 'RLD'
        (inserted in that order).
    """
    size = self.recurrence_matrix.shape[0]
    rr = float(np.sum(self.recurrence_matrix)) / np.power(size, 2)

    # Line-length histograms the remaining measures are derived from.
    diag_dist = self.calculate_diagonal_frequency(number_of_vectors=size)
    vert_dist = self.calculate_vertical_frequency(number_of_vectors=size, not_white=1)
    white_dist = self.calculate_vertical_frequency(number_of_vectors=size, not_white=0)

    det = self.laminarity_or_determinism(MDL, size, diag_dist, lam=False)
    lam = self.laminarity_or_determinism(MVL, size, vert_dist, lam=True)

    avg_diag = self.average_line_length(MDL, size, diag_dist)
    avg_vert = self.average_line_length(MVL, size, vert_dist)
    avg_white = self.average_line_length(MWVL, size, white_dist)

    max_diag = self.longest_line_length(diag_dist, size, diag=True)
    max_vert = self.longest_line_length(vert_dist, size, diag=False)
    max_white = self.longest_line_length(white_dist, size, diag=False)

    ent_diag = self.entropy_lines(MDL, size, diag_dist, diag=True)
    ent_vert = self.entropy_lines(MVL, size, vert_dist, diag=False)
    ent_white = self.entropy_lines(MWVL, size, white_dist, diag=False)

    # Built entry by entry so the key order stays exactly as callers expect.
    features = {}
    features['RR'] = rr
    features['DET'] = det
    features['ADLL'] = avg_diag
    features['LDLL'] = max_diag
    features['DIV'] = 1. / max_diag
    features['EDL'] = ent_diag
    features['LAM'] = lam
    features['AVLL'] = avg_vert
    features['LVLL'] = max_vert
    features['EVL'] = ent_vert
    features['AWLL'] = avg_white
    features['LWLL'] = max_white
    features['EWLL'] = ent_white
    features['RDRR'] = det / rr
    features['RLD'] = lam / det
    return features

def calculate_vertical_frequency(self, number_of_vectors, not_white: int):
    """Histogram the lengths of runs of ``not_white`` along each matrix row index.

    Args:
        number_of_vectors: number of rows/columns of ``self.recurrence_matrix`` to scan.
        not_white: cell value that counts as part of a run.

    Returns:
        Array of length ``number_of_vectors + 1``; entry ``k`` is the number of
        runs of exactly ``k`` consecutive matching cells.
    """
    histogram = np.zeros(number_of_vectors + 1)
    last_col = number_of_vectors - 1
    for row in range(number_of_vectors):
        run = 0
        for col in range(number_of_vectors):
            if self.recurrence_matrix[row, col] != not_white:
                # A non-matching cell closes the current run, if there is one.
                if run != 0:
                    histogram[run] += 1.0
                    run = 0
                continue
            run += 1
            if col == last_col:
                # A run that reaches the border is recorded as well.
                histogram[run] += 1.0
    return histogram

def calculate_diagonal_frequency(self, number_of_vectors):
diagonal_frequency_distribution = np.zeros(number_of_vectors + 1)
for i in range(number_of_vectors - 1, -1, -1):
diagonal_line_length = 0
Expand Down Expand Up @@ -36,176 +85,35 @@ def calculate_DFD(self, number_of_vectors):
diagonal_line_length = 0
return diagonal_frequency_distribution

def calculate_VFD(self, number_of_vectors):
    """Return the vertical frequency distribution P(v).

    Entry ``k`` of the returned array (length ``number_of_vectors + 1``) is
    the number of runs of exactly ``k`` consecutive cells equal to 1 found
    while scanning ``self.recurrence_matrix[i, j]`` over ``j`` for each ``i``.
    """
    histogram = np.zeros(number_of_vectors + 1)
    final_index = number_of_vectors - 1
    for i in range(number_of_vectors):
        streak = 0
        for j in range(number_of_vectors):
            if self.recurrence_matrix[i, j] != 1:
                # Streak interrupted: record it once, then start over.
                if streak != 0:
                    histogram[streak] += 1.0
                    streak = 0
                continue
            streak += 1
            if j == final_index:
                histogram[streak] += 1.0
    return histogram

def calculate_WVFD(self, number_of_vectors):
    """Return the white vertical frequency distribution P(w).

    Entry ``k`` of the returned array (length ``number_of_vectors + 1``) is
    the number of runs of exactly ``k`` consecutive cells equal to 0 found
    while scanning ``self.recurrence_matrix[i, j]`` over ``j`` for each ``i``.
    """
    histogram = np.zeros(number_of_vectors + 1)
    final_index = number_of_vectors - 1
    for i in range(number_of_vectors):
        gap = 0
        for j in range(number_of_vectors):
            if self.recurrence_matrix[i, j] != 0:
                # A non-zero cell ends the current white run, if any.
                if gap != 0:
                    histogram[gap] += 1.0
                    gap = 0
                continue
            gap += 1
            if j == final_index:
                histogram[gap] += 1.0
    return histogram

def calculate_EVWL(self, white_vertical_frequency_distribution, MWVL, number_of_vectors):
    """Return the entropy and the longest length of white vertical lines.

    Args:
        white_vertical_frequency_distribution: histogram indexed by line length.
        MWVL: first length included in the entropy.
        number_of_vectors: largest length index that is inspected.

    Returns:
        Tuple ``(entropy, longest_length)``; ``longest_length`` is 1 when no
        entry in ``1..number_of_vectors`` is non-zero.
    """
    dist = white_vertical_frequency_distribution

    # Wmax: highest populated bin, searched from the top down.
    longest = next((w for w in range(number_of_vectors, 0, -1) if dist[w] != 0), 1)

    # Wentr: Shannon entropy of the histogram restricted to lengths >= MWVL.
    total = float(np.sum(dist[MWVL:]))
    entropy = 0
    for w in range(MWVL, number_of_vectors + 1):
        count = dist[w]
        if count == 0:
            continue
        share = count / total
        entropy += share * np.log(share)

    return entropy * -1, longest

def recurrence_quantification_analysis(self,
                                       MDL=3,
                                       MVL=3,
                                       MWVL=2):
    """Compute the recurrence quantification measures of the stored matrix.

    Args:
        MDL: first diagonal line length included in DET/ADLL/EDL.
        MVL: first vertical line length included in LAM/AVLL/EVL.
        MWVL: first white vertical line length included in AWLL/EWLL.

    Returns:
        dict with the keys 'RR', 'DET', 'ADLL', 'LDLL', 'Div', 'EDL', 'Lam',
        'AVLL', 'LVLL', 'EVL', 'AWLL', 'LWLL', 'EWLL', 'RDRR', 'RLD'.
    """

    def weighted_sum(dist, start, stop):
        # Sum of length * count over the half-open length range [start, stop).
        return np.sum([k * dist[k] for k in range(start, stop)])

    def count_sum(dist, start, stop):
        # Number of lines whose length lies in [start, stop).
        return np.sum([dist[k] for k in range(start, stop)])

    def longest(dist, top):
        # Highest populated length at or below ``top``; 1 when none is found.
        for k in range(top, 0, -1):
            if dist[k] != 0:
                return k
        return 1

    def entropy(dist, total, start, stop):
        # Shannon entropy of dist[start:stop] normalised by ``total``.
        acc = 0
        for k in range(start, stop):
            if dist[k] != 0:
                share = dist[k] / total
                acc += share * np.log(share)
        return acc * -1

    # N - number of states, RR - recurrence rate.
    n = self.recurrence_matrix.shape[0]
    rr = float(np.sum(self.recurrence_matrix)) / np.power(n, 2)

    diag = self.calculate_DFD(number_of_vectors=n)
    vert = self.calculate_VFD(number_of_vectors=n)
    white = self.calculate_WVFD(number_of_vectors=n)

    # Diagonal measures stop before index n.
    det = weighted_sum(diag, MDL, n) / weighted_sum(diag, 1, n)
    adll = weighted_sum(diag, MDL, n) / count_sum(diag, MDL, n)
    ldll = longest(diag, n - 1)
    div = 1. / ldll
    edl = entropy(diag, float(np.sum(diag[MDL:-1])), MDL, n)
    rdrr = det / rr

    # Vertical measures include index n.
    lam = weighted_sum(vert, MVL, n + 1) / weighted_sum(vert, 1, n + 1)
    avll = weighted_sum(vert, MVL, n + 1) / count_sum(vert, MVL, n + 1)
    lvll = longest(vert, n)
    evl = entropy(vert, float(np.sum(vert[MVL:])), MVL, n + 1)
    rld = lam / det

    # White vertical measures.
    awll = weighted_sum(white, MWVL, n + 1) / count_sum(white, MWVL, n + 1)
    ewll, lwll = self.calculate_EVWL(
        white_vertical_frequency_distribution=white,
        MWVL=MWVL,
        number_of_vectors=n)

    return {
        'RR': rr,
        'DET': det,
        'ADLL': adll,
        'LDLL': ldll,
        'Div': div,
        'EDL': edl,
        'Lam': lam,
        'AVLL': avll,
        'LVLL': lvll,
        'EVL': evl,
        'AWLL': awll,
        'LWLL': lwll,
        'EWLL': ewll,
        'RDRR': rdrr,
        'RLD': rld}
def entropy_lines(self, factor, number_of_vectors, distribution, diag: bool):
    """Return the Shannon entropy of a line-length histogram.

    Args:
        factor: first line length that is included.
        number_of_vectors: matrix size; the last included length is
            ``number_of_vectors - 1`` when ``diag`` is true, otherwise
            ``number_of_vectors``.
        distribution: histogram indexed by line length.
        diag: selects the diagonal variant (last histogram entry excluded).

    Returns:
        Negated sum of ``p * log(p)`` over the non-zero included bins, where
        ``p`` is the bin count divided by the sum of the included slice.
    """
    if diag:
        stop = number_of_vectors
        total = float(np.sum(distribution[factor:-1]))
    else:
        stop = number_of_vectors + 1
        total = float(np.sum(distribution[factor:]))

    accumulated = 0
    for length in range(factor, stop):
        count = distribution[length]
        if count != 0:
            share = count / total
            accumulated += share * np.log(share)
    return -accumulated

def laminarity_or_determinism(self, factor, number_of_vectors, distribution, lam: bool):
    """Return the share of recurrence points lying on lines of length >= ``factor``.

    Args:
        factor: first line length counted in the numerator.
        number_of_vectors: matrix size; lengths up to ``number_of_vectors``
            are used when ``lam`` is true, up to ``number_of_vectors - 1``
            otherwise.
        distribution: histogram indexed by line length.
        lam: true for the laminarity variant, false for determinism.

    Returns:
        Ratio of ``sum(length * count)`` from ``factor`` to the same sum from 1.
    """
    stop = number_of_vectors + 1 if lam else number_of_vectors

    def weighted_sum(start):
        return np.sum([length * distribution[length] for length in range(start, stop)])

    return weighted_sum(factor) / weighted_sum(1)

def longest_line_length(self, frequency_distribution, number_of_vectors, diag: bool):
    """Return the largest line length that has a non-zero count.

    Args:
        frequency_distribution: histogram indexed by line length
            (indices ``0..number_of_vectors``).
        number_of_vectors: size of the recurrence matrix.
        diag: true for the diagonal histogram. The scan then starts at
            ``number_of_vectors - 1`` so the full-length bin is ignored,
            matching how ``entropy_lines`` and ``laminarity_or_determinism``
            bound the diagonal case. For vertical histograms the scan
            starts at ``number_of_vectors``.

    Returns:
        The longest populated length, or 1 when no bin in the scanned range
        is non-zero.
    """
    # Previously ``diag`` was ignored and the scan always began at
    # number_of_vectors, so a populated full-length diagonal bin made the
    # result (and the divergence derived from it) wrong.
    start = number_of_vectors - 1 if diag else number_of_vectors
    for length in range(start, 0, -1):
        if frequency_distribution[length] != 0:
            return length
    return 1

def average_line_length(self, factor, number_of_vectors, distribution):
    """Return the mean length of the lines that are at least ``factor`` long.

    Args:
        factor: first line length that is included.
        number_of_vectors: last line length that is included.
        distribution: histogram indexed by line length.

    Returns:
        ``sum(length * count) / sum(count)`` over lengths
        ``factor..number_of_vectors`` inclusive.
    """
    # NOTE(review): index number_of_vectors is always included here, also for
    # diagonal histograms - confirm that is intended for the diagonal case.
    lengths = range(factor, number_of_vectors + 1)
    weighted_total = np.sum([length * distribution[length] for length in lengths])
    line_count = np.sum([distribution[length] for length in lengths])
    return weighted_total / line_count
Loading

0 comments on commit 0b3aa00

Please sign in to comment.