diff --git a/fedot_ind/core/models/ts_forecasting/markov_ar.py b/fedot_ind/core/models/ts_forecasting/markov_ar.py
index eb9a3f270..0d3c638a8 100644
--- a/fedot_ind/core/models/ts_forecasting/markov_ar.py
+++ b/fedot_ind/core/models/ts_forecasting/markov_ar.py
@@ -75,20 +75,38 @@ def _filling_gaps(res):
     return res
 
 
-class MarkovSwitchBase(ModelImplementation):
+class MarkovReg(ModelImplementation):
     def __init__(self, params: OperationParameters):
         super().__init__(params)
         self.model = None
         self.actual_ts_len = None
-        self.scaler = StandardScaler() if params.get('scaler', 'standard') else _BoxCoxTransformer()
+        self.scaler = None  # StandardScaler() if params.get('scaler', 'standard') else _BoxCoxTransformer()
         self.lambda_param = None
         self.scope = None
-        self.k_regimes = params.get('k_regimes', 2)
-        self.trend = params.get('trend', 'c')
+        self.max_k_regimes = params.get('max_k_regimes', 5)
+        self.k_regimes = 1
+        self.trend = params.get('trend', 'ct')
         self.switching_variance = params.get('switching_variance', True)
+        self.switching_trend = params.get('switching_trend', True)
+        self.forecast_fn = getattr(self, {
+            'ct': '_ct_forecasting',
+            'c': '_c_forecasting',
+            't': '_t_forecasting'
+        }[self.trend]
+        )
+        self.forecast_length
 
-    def _init_fit(self, source_ts, exog):
-        raise NotImplemented
+    def _init_fit(self, endog, exog=None):
+        params = {
+            'switching_trend': self.switching_trend,
+            'switching_variance': self.switching_variance,
+            'trend': self.trend
+        }
+        fitted_model = self._choose_model(endog, model=MarkovRegression,
+                                          max_k=self.max_k_regimes, exog=None,
+                                          **params)
+        self.k_regimes = fitted_model.k_regimes
+        return fitted_model
 
     def _prepare_data(self, input_data: InputData, idx_target: int=None, vars_first: bool=True)-> tuple:
         features = input_data.features[...] #copy
@@ -106,7 +124,8 @@ def _prepare_data(self, input_data: InputData, idx_target: int=None, vars_first:
         else:
             idx_target = idx_target or 0
             features[:, idx_target], features[:, -1] = features[:, -1], features[:, idx_target]
-        features = self.scaler.fit_transform(features)
+        if self.scaler is not None:
+            features = self.scaler.fit_transform(features)
         endog = features[:, -1]
         exog = features[:, :-1]
         return endog, exog
@@ -118,34 +137,109 @@ def fit(self, input_data, idx_target=None, vars_first=True):
 
         :param input_data: data with features, target and ids to process
         """
         endog, exog = self._prepare_data(input_data, idx_target=idx_target, vars_first=vars_first)
-        # self.scaler.fit_transform(input_data.features.reshape(-1, 1)).flatten()
         self.actual_ts_len = len(endog)
         self.model = self._init_fit(endog, exog)
 
         return self.model
+
+    def _choose_model(self, endog, model=None, max_k=5, **params):
+        assert max_k >= 1, 'k_regimes can\'t be less than 1'
+        if not model:
+            model = MarkovRegression
+        fitted_model = None
+        for i in range(2, max_k + 1):
+            try:
+                fitted_model = model(endog, k_regimes=i, **params).fit()
+            except Exception as ex:
+                print(type(ex))
+                continue
+            if 'nan' not in str(fitted_model.summary()):
+                break
+            else:
+                fitted_model = None
+        if fitted_model is None:
+            raise RuntimeError('Model did not converge!')
+        return fitted_model
+
+    def _forecast(self, forecast_length, initial_state):
+        fitted_model = self.model
+        tr_mtr = fitted_model.regime_transition[..., -1]
+        regimes = np.arange(fitted_model.k_regimes)
+        states = [initial_state]
+        for i in range(forecast_length):
+            states.append(
+                np.random.choice(regimes, size=1, p=tr_mtr[:, states[-1]].flatten())
+            )
+        states = np.array(states[1:]).flatten()
+
+        forecast = self.forecast_fn(states, forecast_length)
+        return forecast
+    def _ct_forecasting(self, states, forecast_length):
+        fitted_model = self.model
+
+        slopes = self.parse('slope')[states]
+        consts = self.parse('const')[states]
+
+        start_ind = fitted_model.nobs
+        index = np.arange(start_ind, start_ind + forecast_length)
+        return slopes * index + consts
 
 
-    def predict(self, input_data):
+    def parse(self, param_type: str):
+        p_idx = {
+            'ct': {'const': 0, 'slope': 1, 'sigma2': 2},
+            'c': {'const': 0, 'sigma2': 1},
+            't': {'slope': 0, 'sigma2': 1}
+        }
+        mr = self.model
+        idx = p_idx[self.trend][param_type]
+        tables = mr.summary().tables
+        return np.array([float(tables[i + 1].data[idx + 1][1]) for i in range(mr.k_regimes)])
+
+    def _t_forecasting(self, states, forecast_length):
+        # TODO
+        raise NotImplementedError
+
+    def _c_forecasting(self, states, forecast_length):
+        # TODO
+        raise NotImplementedError
+
+    def last_regime(self):
+        fitted_model = self.model
+        last_regime = np.argmax(fitted_model.smoothed_marginal_probabilities[-2])  # or -1?
+        return last_regime
+
+    def _probabilities_forecast(self, *args, **kwargs):
+        raise NotImplementedError('We didn\'t find any approach to get out-of-sample marginal_probabilities yet!')
+
+    def predict(self, test_data: InputData, output_mode='predictions'):
         """ Method for time series prediction on forecast length
 
         :param input_data: data with features, target and ids to process
         :return output_data: output data with smoothed time series
         """
-        # input_data = copy(input_data)
-        parameters = input_data.task.task_params
-        forecast_length = parameters.forecast_length
+        test_data = copy(test_data)
 
-        # in case in(out) sample forecasting
-        self.handle_new_data(input_data)
-        start_id = self.actual_ts_len
-        end_id = start_id + forecast_length - 1
-
-        predicted = MSARExtension(self.model).predict_out_of_sample()
+        parameters = test_data.task.task_params
+        if hasattr(parameters, 'forecast_length'):  # erase for production
+            forecast_length = parameters.forecast_length
+        else:
+            forecast_length = self.forecast_length
+        initial_state = self.last_regime()
 
-        predict = self.scaler.inverse_transform(np.array([predicted]).ravel().reshape(1, -1))
+        if output_mode == 'predictions':
+            forecast = self._forecast(forecast_length, initial_state)
+        elif output_mode == 'marginal_probabilities':
+            forecast = self._probabilities_forecast(forecast_length, initial_state)
+        else:
+            raise ValueError('Unknown output mode!')
+        # while scaling not needed
+        predict = forecast
+        # predict = self.scaler.inverse_transform(np.array([predicted]).ravel().reshape(1, -1))
 
-        output_data = self._convert_to_output(input_data,
+        output_data = self._convert_to_output(test_data,
                                               predict=predict,
                                               data_type=DataTypesEnum.table)
+
         return output_data
@@ -175,47 +269,8 @@ def predict_for_fit(self, input_data: InputData) -> OutputData:
                                               data_type=DataTypesEnum.table)
         return output_data
 
 
-    def _get_out_of_sample(self, state: int, fitted_model, offset: int=0, noisy=False):
-        # assert trend == 'ct'
-        table = fitted_model.summary().tables[1 + state].data
-        const = float(table[1][1])
-        x1 = float(table[2][1])
-
-        noise = 0
-        if self.switching_variance and noisy:
-            sigma2 = float(table[3][1])
-            noise = np.random.normal(0, np.sqrt(sigma2), 1).item()
-
-        pred = const + (fitted_model.nobs + offset) * x1 + noise
-        return pred
-    @staticmethod
-    def _get_next_state(fitted_model):
-        return np.random.choice(np.arange(fitted_model.k_regimes),
-                                size=1,
-                                p=fitted_model.predicted_marginal_probabilities[-1, :]
-                                ).item()
-
-    def forecast(self, horizon, noisy=False, max_iter=20):
-        model = self.model
-        endog = model.data.endog
-        preds = []
-        while max_iter:
-            max_iter -= 1
-            try:
-                state = self._get_next_state(model)
-                pred = self._get_out_of_sample(state=state, fitted_model=model, offset=0, noisy=noisy)
-                endog = np.append(endog, pred)
-                model = self._init_fit(endog, None)
-                preds.append(pred)
-                if len(preds) == horizon:
-                    break
-            except:
-                print('SVD failed to converge')
-        return preds
-
-
-class MarkovAR(MarkovSwitchBase):
+class MarkovAR(MarkovReg):
     def __init__(self, params: OperationParameters):
         super().__init__(params)
         self.order = params.get('order', 2)
@@ -229,136 +284,8 @@ def _init_fit(self, endog, exog=None):
                                     exog=exog,
                                     switching_variance=False).fit()
 
 
-class MarkovReg(MarkovSwitchBase):
-    def __init__(self, params: OperationParameters):
-        super().__init__(params)
+
 
-    def _init_fit(self, endog, exog=None):
-        return MarkovRegression(endog,
-                                k_regimes=self.k_regimes,
-                                trend = self.trend,
-                                exog=exog,
-                                switching_variance=self.switching_variance).fit()
-
-    # def predict_for_fit(self, input_data: InputData) -> OutputData:
-    #     parameters = input_data.task.task_params
-    #     forecast_length = parameters.forecast_length
-    #     idx = input_data.idx
-    #     target = input_data.target
-
-    #     fitted_values = self.autoreg.predict(start=idx[0], end=idx[-1])
-    #     diff = int(self.actual_ts_len) - len(fitted_values)
-    #     # If first elements skipped
-    #     if diff != 0:
-    #         # Fill nans with first values
-    #         first_element = fitted_values[0]
-    #         first_elements = [first_element] * diff
-    #         first_elements.extend(list(fitted_values))
-
-    #         fitted_values = np.array(first_elements)
-
-    #     _, predict = ts_to_table(idx=idx,
-    #                              time_series=fitted_values,
-    #                              window_size=forecast_length)
-
-    #     new_idx, target_columns = ts_to_table(idx=idx,
-    #                                           time_series=target,
-    #                                           window_size=forecast_length)
-
-    #     input_data.idx = new_idx
-    #     input_data.target = target_columns
-    #     output_data = self._convert_to_output(input_data,
-    #                                           predict=predict,
-    #                                           data_type=DataTypesEnum.table)
-    #     return output_data
-
-
-
-
-# class ExpSmoothingImplementation(ModelImplementation):
-#     """ Exponential smoothing implementation from statsmodels """
-
-#     def __init__(self, params: OperationParameters):
-#         super().__init__(params)
-#         self.model = None
-#         if self.params.get("seasonal"):
-#             self.seasonal_periods = int(self.params.get("seasonal_periods"))
-#         else:
-#             self.seasonal_periods = None
-
-#     def fit(self, input_data):
-#         self.model = ETSModel(
-#             input_data.features.astype("float64"),
-#             error=self.params.get("error"),
-#             trend=self.params.get("trend"),
-#             seasonal=self.params.get("seasonal"),
-#             damped_trend=self.params.get("damped_trend") if self.params.get("trend") else None,
-#             seasonal_periods=self.seasonal_periods
-#         )
-#         self.model = self.model.fit(disp=False)
-#         return self.model
-
-#     def predict(self, input_data):
-#         input_data = copy(input_data)
-#         idx = input_data.idx
-
-#         start_id = idx[0]
-#         end_id = idx[-1]
-#         predictions = self.model.predict(start=start_id,
-#                                          end=end_id)
-#         predict = predictions
-#         predict = np.array(predict).reshape(1, -1)
-#         new_idx = np.arange(start_id, end_id + 1)
-
-#         input_data.idx = new_idx
-
-#         output_data = self._convert_to_output(input_data,
-#                                               predict=predict,
-#                                               data_type=DataTypesEnum.table)
-#         return output_data
-
-#     def predict_for_fit(self, input_data: InputData) -> OutputData:
-#         input_data = copy(input_data)
-#         parameters = input_data.task.task_params
-#         forecast_length = parameters.forecast_length
-#         idx = input_data.idx
-#         target = input_data.target
-
-#         # Indexing for statsmodels is different
-#         start_id = idx[0]
-#         end_id = idx[-1]
-#         predictions = self.model.predict(start=start_id,
-#                                          end=end_id)
-#         _, predict = ts_to_table(idx=idx,
-#                                  time_series=predictions,
-#                                  window_size=forecast_length)
-#         new_idx, target_columns = ts_to_table(idx=idx,
-#                                               time_series=target,
-#                                               window_size=forecast_length)
-
-#         input_data.idx = new_idx
-#         input_data.target = target_columns
-
-#         output_data = self._convert_to_output(input_data,
-#                                               predict=predict,
-#                                               data_type=DataTypesEnum.table)
-#         return output_data
-
-#     @staticmethod
-#     def extract_transition_probabilities(fitted_autoreg, as_series=False):
-#         k = fitted_autoreg.k_regimes
-#         ps = fitted_autoreg.params.iloc[: k * (k - 1)]
-#         rest_ps = 1 - ps.values.reshape(k, -1).sum(1)
-#         rest_ps[rest_ps < 0] = 0  # computational errors sometime lead to sum(probabilities) > 1
-#         ps = np.hstack([ps.values, rest_ps])
-#         if not as_series:
-#             return ps
-#         else:
-#             index = []
-#             for i in range(k):
-#                 for j in range(k):
-#                     index.append(f'p[{j}->{i}]')
-#             res = pd.Series(ps, index=index)
-#             return res
\ No newline at end of file
+
\ No newline at end of file
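
A minimal sketch, not part of the patch, of what the new MarkovReg._forecast / _ct_forecasting pair does: simulate a regime path step by step from the transition matrix, then apply the regime-specific 'ct' trend to the continued time index. The transition matrix and the per-regime const/slope values below are made-up stand-ins for the numbers that parse() would read out of a fitted MarkovRegression summary.

import numpy as np

rng = np.random.default_rng(0)

# Stand-in values; in the patch they come from the fitted model
# (the regime transition matrix, plus parse('const') / parse('slope')).
transition = np.array([[0.95, 0.10],   # column j holds P(next regime | current regime j)
                       [0.05, 0.90]])
consts = np.array([1.0, 5.0])          # per-regime intercepts
slopes = np.array([0.02, -0.03])       # per-regime slopes


def forecast(initial_state, nobs, horizon):
    # Draw the future regime path one step at a time from the transition matrix
    states = [initial_state]
    for _ in range(horizon):
        states.append(rng.choice(2, p=transition[:, states[-1]]))
    states = np.array(states[1:])
    # Continue the in-sample time index and apply the regime-specific trend
    t = np.arange(nobs, nobs + horizon)
    return slopes[states] * t + consts[states]


print(forecast(initial_state=0, nobs=200, horizon=10))

Because a single regime path is sampled, repeated calls give different trajectories; weighting the per-regime trends by the predicted regime probabilities would be the deterministic alternative.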