Skip to content

Commit

Permalink
Light rearrangement of Markov switching models
Browse files Browse the repository at this point in the history
  • Loading branch information
leostre committed May 23, 2024
1 parent 2a67d69 commit 6e814dc
Showing 1 changed file with 160 additions and 92 deletions.
252 changes: 160 additions & 92 deletions fedot_ind/core/models/ts_forecasting/markov_ar.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import statsmodels.api as sm
from statsmodels.tsa.ar_model import AutoReg
from statsmodels.tsa.regime_switching.markov_autoregression import MarkovAutoregression
from statsmodels.tsa.regime_switching.markov_regression import MarkovRegression

from statsmodels.tsa.exponential_smoothing.ets import ETSModel

from fedot.core.data.data import InputData, OutputData
Expand All @@ -19,8 +21,7 @@

from sklearn.preprocessing import StandardScaler

class MarkovAR(ModelImplementation):

class MarkovSwitchBase(ModelImplementation):
def __init__(self, params: OperationParameters):
super().__init__(params)
self.autoreg = None
Expand All @@ -30,25 +31,46 @@ def __init__(self, params: OperationParameters):
self.scope = None
self.k_regimes = params.get('k_regimes', 2)
self.order = params.get('order', 2)
self.trend = params.get('trend', 'ct')

def fit(self, input_data):
self.trend = params.get('trend', 'c')
self.switching_variance = params.get('switching_variance', True)

def _init_fit(self, source_ts, exog):
raise NotImplemented

def _prepare_data(self, input_data: InputData, idx_target: int=None, vars_first: bool=True)-> tuple:
features = input_data.features[...] #copy
if len(features.shape) == 1:
# univariate
features = features.reshape(-1, 1) if not vars_first else features.reshape(1, -1)
if vars_first: # if true, assuming features are n_variates x series_length
features = features.T
# features: series_length x n_variates

# swap so target is last column
if idx_target is None and input_data.task.task_type == 'ts_forecasting': # then target is not included in features of input_data
features = np.vstack([features, input_data.target.reshape(1, -1)],)
idx_target = features.shape[1] - 1
else:
idx_target = idx_target or 0
features[:, idx_target], features[:, -1] = features[:, -1], features[:, idx_target]
features = self.scaler.fit_transform(features)
endog = features[:, -1]
exog = features[:, :-1]
return endog, exog


def fit(self, input_data, idx_target=None, vars_first=True):
    """Fit the Markov-switching model on the supplied data.

    :param input_data: data with features, target and ids to process
    :param idx_target: optional column index of the target variate
    :param vars_first: whether features are laid out n_variates x series_length
    :return: the fitted statsmodels results object
    """
    target_series, exogenous = self._prepare_data(input_data,
                                                  idx_target=idx_target,
                                                  vars_first=vars_first)
    self.actual_ts_len = len(target_series)
    self.autoreg = self._init_fit(target_series, exogenous)
    return self.autoreg



def predict(self, input_data):
""" Method for time series prediction on forecast length
Expand All @@ -73,6 +95,7 @@ def predict(self, input_data):
predict=predict,
data_type=DataTypesEnum.table)
return output_data


def predict_for_fit(self, input_data: InputData) -> OutputData:
input_data = copy(input_data)
Expand All @@ -99,6 +122,42 @@ def predict_for_fit(self, input_data: InputData) -> OutputData:
predict=predict,
data_type=DataTypesEnum.table)
return output_data

def handle_new_data(self, input_data: "InputData"):
    """Refresh the fitted model's endogenous sample with newer observations.

    Used when an already-fitted model must be applied to new data.

    :param input_data: new input_data
    """
    # Nothing to do while the incoming index still overlaps the fitted window.
    if input_data.idx[0] <= self.actual_ts_len:
        return
    # Keep only the most recent `actual_ts_len` observations as the new endog.
    self.autoreg.model.endog = input_data.features[-self.actual_ts_len:]
    # NOTE(review): `_setup_regressors` is a private statsmodels API — confirm
    # it still rebuilds the lagged design matrix on the installed version.
    self.autoreg.model._setup_regressors()

class MarkovAR(MarkovSwitchBase):
    """Markov-switching autoregression backed by statsmodels ``MarkovAutoregression``."""
    # Construction is fully handled by MarkovSwitchBase; the original defined a
    # pass-through __init__, which is simply inherited here.

    def _init_fit(self, endog, exog=None):
        """Build and fit a MarkovAutoregression on the prepared series.

        :param endog: target (endogenous) series
        :param exog: optional exogenous regressors
        :return: fitted statsmodels results object
        """
        # NOTE(review): variance switching is hard-coded off for the AR variant,
        # unlike MarkovReg which honours `self.switching_variance` — confirm intent.
        model = MarkovAutoregression(endog,
                                     k_regimes=self.k_regimes,
                                     order=self.order,
                                     trend=self.trend,
                                     exog=exog,
                                     switching_variance=False)
        return model.fit()

class MarkovReg(MarkovSwitchBase):
    """Markov-switching regression backed by statsmodels ``MarkovRegression``."""
    # Construction is fully handled by MarkovSwitchBase; the original defined a
    # pass-through __init__, which is simply inherited here.

    def _init_fit(self, endog, exog=None):
        """Build and fit a MarkovRegression on the prepared series.

        :param endog: target (endogenous) series
        :param exog: optional exogenous regressors
        :return: fitted statsmodels results object
        """
        model = MarkovRegression(endog,
                                 k_regimes=self.k_regimes,
                                 order=self.order,
                                 trend=self.trend,
                                 exog=exog,
                                 switching_variance=self.switching_variance)
        return model.fit()



# def predict_for_fit(self, input_data: InputData) -> OutputData:
# parameters = input_data.task.task_params
Expand Down Expand Up @@ -132,82 +191,91 @@ def predict_for_fit(self, input_data: InputData) -> OutputData:
# data_type=DataTypesEnum.table)
# return output_data

def handle_new_data(self, input_data: InputData):
    """
    Method to update x samples inside a model (used when we want to use old model to a new data)
    :param input_data: new input_data
    """
    # Only refresh when the new data starts beyond the window the model was fitted on.
    if input_data.idx[0] > self.actual_ts_len:
        # Keep only the most recent `actual_ts_len` observations as the new endog.
        self.autoreg.model.endog = input_data.features[-self.actual_ts_len:]
        # NOTE(review): `_setup_regressors` is a private statsmodels API — may
        # break across statsmodels versions; confirm on upgrade.
        self.autoreg.model._setup_regressors()


class ExpSmoothingImplementation(ModelImplementation):
    """Exponential smoothing (ETS) forecaster backed by statsmodels."""

    def __init__(self, params: OperationParameters):
        super().__init__(params)
        self.model = None
        # The seasonal period is only meaningful when a seasonal component is requested.
        if self.params.get("seasonal"):
            self.seasonal_periods = int(self.params.get("seasonal_periods"))
        else:
            self.seasonal_periods = None

    def fit(self, input_data):
        """Build and fit the ETS model on the training series."""
        trend = self.params.get("trend")
        self.model = ETSModel(
            input_data.features.astype("float64"),
            error=self.params.get("error"),
            trend=trend,
            seasonal=self.params.get("seasonal"),
            # damping only applies when a trend component is present
            damped_trend=self.params.get("damped_trend") if trend else None,
            seasonal_periods=self.seasonal_periods
        )
        self.model = self.model.fit(disp=False)
        return self.model

    def predict(self, input_data):
        """Forecast over the index range carried by ``input_data``."""
        input_data = copy(input_data)
        first_idx = input_data.idx[0]
        last_idx = input_data.idx[-1]
        forecast = np.array(self.model.predict(start=first_idx,
                                               end=last_idx)).reshape(1, -1)
        input_data.idx = np.arange(first_idx, last_idx + 1)
        return self._convert_to_output(input_data,
                                       predict=forecast,
                                       data_type=DataTypesEnum.table)

    def predict_for_fit(self, input_data: InputData) -> OutputData:
        """In-sample prediction arranged into forecast-length windows."""
        input_data = copy(input_data)
        forecast_length = input_data.task.task_params.forecast_length
        idx = input_data.idx
        # Indexing for statsmodels differs from the pipeline's, hence explicit start/end
        predictions = self.model.predict(start=idx[0], end=idx[-1])
        _, predict = ts_to_table(idx=idx,
                                 time_series=predictions,
                                 window_size=forecast_length)
        new_idx, target_columns = ts_to_table(idx=idx,
                                              time_series=input_data.target,
                                              window_size=forecast_length)
        input_data.idx = new_idx
        input_data.target = target_columns
        return self._convert_to_output(input_data,
                                       predict=predict,
                                       data_type=DataTypesEnum.table)
# class ExpSmoothingImplementation(ModelImplementation):
# """ Exponential smoothing implementation from statsmodels """

# def __init__(self, params: OperationParameters):
# super().__init__(params)
# self.model = None
# if self.params.get("seasonal"):
# self.seasonal_periods = int(self.params.get("seasonal_periods"))
# else:
# self.seasonal_periods = None

# def fit(self, input_data):
# self.model = ETSModel(
# input_data.features.astype("float64"),
# error=self.params.get("error"),
# trend=self.params.get("trend"),
# seasonal=self.params.get("seasonal"),
# damped_trend=self.params.get("damped_trend") if self.params.get("trend") else None,
# seasonal_periods=self.seasonal_periods
# )
# self.model = self.model.fit(disp=False)
# return self.model

# def predict(self, input_data):
# input_data = copy(input_data)
# idx = input_data.idx

# start_id = idx[0]
# end_id = idx[-1]
# predictions = self.model.predict(start=start_id,
# end=end_id)
# predict = predictions
# predict = np.array(predict).reshape(1, -1)
# new_idx = np.arange(start_id, end_id + 1)

# input_data.idx = new_idx

# output_data = self._convert_to_output(input_data,
# predict=predict,
# data_type=DataTypesEnum.table)
# return output_data

# def predict_for_fit(self, input_data: InputData) -> OutputData:
# input_data = copy(input_data)
# parameters = input_data.task.task_params
# forecast_length = parameters.forecast_length
# idx = input_data.idx
# target = input_data.target

# # Indexing for statsmodels is different
# start_id = idx[0]
# end_id = idx[-1]
# predictions = self.model.predict(start=start_id,
# end=end_id)
# _, predict = ts_to_table(idx=idx,
# time_series=predictions,
# window_size=forecast_length)
# new_idx, target_columns = ts_to_table(idx=idx,
# time_series=target,
# window_size=forecast_length)

# input_data.idx = new_idx
# input_data.target = target_columns

# output_data = self._convert_to_output(input_data,
# predict=predict,
# data_type=DataTypesEnum.table)
# return output_data

# @staticmethod
# def extract_transition_probabilities(fitted_autoreg, as_series=False):
# k = fitted_autoreg.k_regimes
# ps = fitted_autoreg.params.iloc[: k * (k - 1)]
# rest_ps = 1 - ps.values.reshape(k, -1).sum(1)
# rest_ps[rest_ps < 0] = 0 # computational errors sometime lead to sum(probabilities) > 1
# ps = np.hstack([ps.values, rest_ps])
# if not as_series:
# return ps
# else:
# index = []
# for i in range(k):
# for j in range(k):
# index.append(f'p[{j}->{i}]')
# res = pd.Series(ps, index=index)
# return res

0 comments on commit 6e814dc

Please sign in to comment.