Skip to content

Commit

Permalink
basic oos for markov, not implemented probabilities change
Browse files Browse the repository at this point in the history
  • Loading branch information
leostre committed Jun 5, 2024
1 parent 2129988 commit 248deea
Showing 1 changed file with 117 additions and 190 deletions.
307 changes: 117 additions & 190 deletions fedot_ind/core/models/ts_forecasting/markov_ar.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,20 +75,38 @@ def _filling_gaps(res):
return res


class MarkovSwitchBase(ModelImplementation):
class MarkovReg(ModelImplementation):
def __init__(self, params: OperationParameters):
super().__init__(params)
self.model = None
self.actual_ts_len = None
self.scaler = StandardScaler() if params.get('scaler', 'standard') else _BoxCoxTransformer()
self.scaler = None # StandardScaler() if params.get('scaler', 'standard') else _BoxCoxTransformer()
self.lambda_param = None
self.scope = None
self.k_regimes = params.get('k_regimes', 2)
self.trend = params.get('trend', 'c')
self.max_k_regimes = params.get('max_k_regimes', 5)
self.k_regimes = 1
self.trend = params.get('trend', 'ct')
self.switching_variance = params.get('switching_variance', True)
self.switching_trend = params.get('switching_trend', True)
self.forecast_fn = getattr(self, {
'ct': '_ct_forecasting',
'c': '_c_forecasting',
't': '_t_forecasting'
}[self.trend]
)
self.forecast_length

def _init_fit(self, source_ts, exog):
raise NotImplemented
def _init_fit(self, endog, exog=None):
params = {
'switching_trend': self.switching_trend,
'switching_variance': self.switching_variance,
'trend': self.trend
}
fitted_model = self._choose_model(endog, model=MarkovRegression,
max_k=self.max_k_regimes, exog=None,
**params)
self.k_regimes = fitted_model.k_regimes
return fitted_model

def _prepare_data(self, input_data: InputData, idx_target: int=None, vars_first: bool=True)-> tuple:
features = input_data.features[...] #copy
Expand All @@ -106,7 +124,8 @@ def _prepare_data(self, input_data: InputData, idx_target: int=None, vars_first:
else:
idx_target = idx_target or 0
features[:, idx_target], features[:, -1] = features[:, -1], features[:, idx_target]
features = self.scaler.fit_transform(features)
if self.scaler is not None:
features = self.scaler.fit_transform(features)
endog = features[:, -1]
exog = features[:, :-1]
return endog, exog
Expand All @@ -118,34 +137,109 @@ def fit(self, input_data, idx_target=None, vars_first=True):
:param input_data: data with features, target and ids to process
"""
endog, exog = self._prepare_data(input_data, idx_target=idx_target, vars_first=vars_first)
# self.scaler.fit_transform(input_data.features.reshape(-1, 1)).flatten()
self.actual_ts_len = len(endog)
self.model = self._init_fit(endog, exog)
return self.model

def _choose_model(self, endog, model=None, max_k=5, **params):
assert max_k >= 1, 'k_regimes can\'t be less than 1'
if not model:
model = MarkovRegression
fitted_model = None
for i in range(2, max_k + 1):
try:
fitted_model = model(endog, k_regimes=i, **params).fit()
except Exception as ex:
print(type(ex))
continue
if 'nan' not in str(fitted_model.summary()):
break
else:
fitted_model = None
if fitted_model is None:
raise RuntimeError('Model did not converge!')
return fitted_model

def _forecast(self, forecast_length, initial_state):
fitted_model = self.model
tr_mtr = fitted_model.regime_transition[..., -1]
regimes = np.arange(fitted_model.k_regimes)
states = [initial_state]
for i in range(forecast_length):
states.append(
np.random.choice(regimes, size=1, p=tr_mtr[:, states[-1]].flatten())
)
states = np.array(states[1:]).flatten()

forecast = self.forecast_fn(states, forecast_length)
return forecast

def _ct_forecasting(self, states, forecast_length):
fitted_model = self.model

slopes = self.parse('slope')[states]
consts = self.parse('const')[states]

start_ind = fitted_model.nobs
index = np.arange(start_ind, start_ind + forecast_length)
return slopes * index + consts

def predict(self, input_data):
def parse(self, param_type:str):
p_idx = {
'ct': {'const': 0, 'slope': 1, 'sigma2': 2},
'c': {'const': 0, 'sigma2': 1},
't': {'slope': 0, 'sigma2': 1}
}
mr = self.model
idx = p_idx[self.trend][param_type]
tables = mr.summary().tables
return np.array([float(tables[i + 1].data[idx + 1][1]) for i in range(mr.k_regimes)])

def _t_forecasting(self):
#TODO
raise NotImplemented

def _c_forecasting(self):
#TODO
raise NotImplemented

def last_regime(self):
fitted_model = self.model
last_regime = np.argmax(fitted_model.smoothed_marginal_probabilities[-2]) # or -1?
return last_regime

def _probabilities_forecast(self, *args, **kwargs):
raise NotImplemented('We did\'t find any approach to get out-of-sample marginal_probabilities yet!')

def predict(self, test_data: InputData, output_mode='predictions'):
""" Method for time series prediction on forecast length
:param input_data: data with features, target and ids to process
:return output_data: output data with smoothed time series
"""
# input_data = copy(input_data)
parameters = input_data.task.task_params
forecast_length = parameters.forecast_length
test_data = copy(test_data)

# in case in(out) sample forecasting
self.handle_new_data(input_data)
start_id = self.actual_ts_len
end_id = start_id + forecast_length - 1

predicted = MSARExtension(self.model).predict_out_of_sample()
parameters = test_data.task.task_params
if hasattr(parameters, 'forecast_length'): # erase for production
forecast_length = parameters.forecast_length
else:
forecast = self.forecast_length
initial_state = self.last_regime()

predict = self.scaler.inverse_transform(np.array([predicted]).ravel().reshape(1, -1))
if output_mode == predictions:
forecast = self._forecast(forecast_length, initial_state)
elif output_mode == 'marginal_probabilities':
forecast = self._probabilities_forecast(forecast_length, initial_state)
else:
raise ValueError('Unknown output mode!')
# while scaling not needed
predict = forecast
# predict = self.scaler.inverse_transform(np.array([predicted]).ravel().reshape(1, -1))

output_data = self._convert_to_output(input_data,
output_data = self._convert_to_output(test_data,
predict=predict,
data_type=DataTypesEnum.table)

return output_data


Expand Down Expand Up @@ -175,47 +269,8 @@ def predict_for_fit(self, input_data: InputData) -> OutputData:
data_type=DataTypesEnum.table)
return output_data

def _get_out_of_sample(self, state: int, fitted_model, offset: int=0, noisy=False):
# assert trend == 'ct'
table = fitted_model.summary().tables[1 + state].data
const = float(table[1][1])
x1 = float(table[2][1])

noise = 0
if self.switching_variance and noisy:
sigma2 = float(table[3][1])
noise = np.random.normal(0, np.sqrt(sigma2), 1).item()

pred = const + (fitted_model.nobs + offset) * x1 + noise
return pred

@staticmethod
def _get_next_state(fitted_model):
return np.random.choice(np.arange(fitted_model.k_regimes),
size=1,
p=fitted_model.predicted_marginal_probabilities[-1, :]
).item()

def forecast(self, horizon, noisy=False, max_iter=20):
model = self.model
endog = model.data.endog
preds = []
while max_iter:
max_iter -= 1
try:
state = self._get_next_state(model)
pred = self._get_out_of_sample(state=state, fitted_model=model, offset=0, noisy=noisy)
endog = np.append(endog, pred)
model = self._init_fit(endog, None)
preds.append(pred)
if len(preds) == horizon:
break
except:
print('SVD failed to converge')
return preds


class MarkovAR(MarkovSwitchBase):
class MarkovAR(MarkovReg):
def __init__(self, params: OperationParameters):
super().__init__(params)
self.order = params.get('order', 2)
Expand All @@ -229,136 +284,8 @@ def _init_fit(self, endog, exog=None):
exog=exog,
switching_variance=False).fit()

class MarkovReg(MarkovSwitchBase):
def __init__(self, params: OperationParameters):
super().__init__(params)


def _init_fit(self, endog, exog=None):
return MarkovRegression(endog,
k_regimes=self.k_regimes,
trend = self.trend,
exog=exog,
switching_variance=self.switching_variance).fit()



# def predict_for_fit(self, input_data: InputData) -> OutputData:
# parameters = input_data.task.task_params
# forecast_length = parameters.forecast_length
# idx = input_data.idx
# target = input_data.target

# fitted_values = self.autoreg.predict(start=idx[0], end=idx[-1])
# diff = int(self.actual_ts_len) - len(fitted_values)
# # If first elements skipped
# if diff != 0:
# # Fill nans with first values
# first_element = fitted_values[0]
# first_elements = [first_element] * diff
# first_elements.extend(list(fitted_values))

# fitted_values = np.array(first_elements)

# _, predict = ts_to_table(idx=idx,
# time_series=fitted_values,
# window_size=forecast_length)

# new_idx, target_columns = ts_to_table(idx=idx,
# time_series=target,
# window_size=forecast_length)

# input_data.idx = new_idx
# input_data.target = target_columns
# output_data = self._convert_to_output(input_data,
# predict=predict,
# data_type=DataTypesEnum.table)
# return output_data




# class ExpSmoothingImplementation(ModelImplementation):
# """ Exponential smoothing implementation from statsmodels """

# def __init__(self, params: OperationParameters):
# super().__init__(params)
# self.model = None
# if self.params.get("seasonal"):
# self.seasonal_periods = int(self.params.get("seasonal_periods"))
# else:
# self.seasonal_periods = None

# def fit(self, input_data):
# self.model = ETSModel(
# input_data.features.astype("float64"),
# error=self.params.get("error"),
# trend=self.params.get("trend"),
# seasonal=self.params.get("seasonal"),
# damped_trend=self.params.get("damped_trend") if self.params.get("trend") else None,
# seasonal_periods=self.seasonal_periods
# )
# self.model = self.model.fit(disp=False)
# return self.model

# def predict(self, input_data):
# input_data = copy(input_data)
# idx = input_data.idx

# start_id = idx[0]
# end_id = idx[-1]
# predictions = self.model.predict(start=start_id,
# end=end_id)
# predict = predictions
# predict = np.array(predict).reshape(1, -1)
# new_idx = np.arange(start_id, end_id + 1)

# input_data.idx = new_idx

# output_data = self._convert_to_output(input_data,
# predict=predict,
# data_type=DataTypesEnum.table)
# return output_data

# def predict_for_fit(self, input_data: InputData) -> OutputData:
# input_data = copy(input_data)
# parameters = input_data.task.task_params
# forecast_length = parameters.forecast_length
# idx = input_data.idx
# target = input_data.target

# # Indexing for statsmodels is different
# start_id = idx[0]
# end_id = idx[-1]
# predictions = self.model.predict(start=start_id,
# end=end_id)
# _, predict = ts_to_table(idx=idx,
# time_series=predictions,
# window_size=forecast_length)
# new_idx, target_columns = ts_to_table(idx=idx,
# time_series=target,
# window_size=forecast_length)

# input_data.idx = new_idx
# input_data.target = target_columns

# output_data = self._convert_to_output(input_data,
# predict=predict,
# data_type=DataTypesEnum.table)
# return output_data

# @staticmethod
# def extract_transition_probabilities(fitted_autoreg, as_series=False):
# k = fitted_autoreg.k_regimes
# ps = fitted_autoreg.params.iloc[: k * (k - 1)]
# rest_ps = 1 - ps.values.reshape(k, -1).sum(1)
# rest_ps[rest_ps < 0] = 0 # computational errors sometime lead to sum(probabilities) > 1
# ps = np.hstack([ps.values, rest_ps])
# if not as_series:
# return ps
# else:
# index = []
# for i in range(k):
# for j in range(k):
# index.append(f'p[{j}->{i}]')
# res = pd.Series(ps, index=index)
# return res

0 comments on commit 248deea

Please sign in to comment.