
It runs via the API, but prediction fails (improper horizon)
leostre committed May 16, 2024
1 parent 4a52200 commit 4a65aa2
Showing 2 changed files with 104 additions and 115 deletions.
211 changes: 99 additions & 112 deletions fedot_ind/core/models/nn/network_impl/deepar.py
@@ -1,5 +1,5 @@
from fedot_ind.core.models.nn.network_impl.base_nn_model import BaseNeuralModel
from typing import Optional, Callable, Any, List, Union
from typing import Optional, Callable, Any, List, Union, Tuple
from fedot.core.operations.operation_parameters import OperationParameters
from fedot.core.data.data import InputData, OutputData
from fedot_ind.core.repository.constanst_repository import CROSS_ENTROPY
@@ -43,13 +43,17 @@ def forward(self, x, normalize=True):
factors = factors[..., None]
means = means[..., None]
return x * factors + means

def scale(self, x):
return (x - self.means) / self.factors
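
A minimal sketch of the affine scaling implied by this hunk, assuming per-series statistics over the last axis (means and factors mirror the attribute names above; this is illustrative, not the repository's exact class):

import torch

x = torch.randn(4, 100)                       # (nseries, length)
means = x.mean(dim=-1, keepdim=True)          # assumed per-series location
factors = x.std(dim=-1, keepdim=True) + 1e-8  # assumed per-series scale
normalized = (x - means) / factors            # what scale() computes
restored = normalized * factors + means       # the inverse, as in forward()
assert torch.allclose(restored, x, atol=1e-5)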

class DeepARModule(Module):
_loss_fns = {
'normal': NormalDistributionLoss
}

def __init__(self, cell_type, input_size, hidden_size, rnn_layers, dropout, distribution):
def __init__(self, cell_type: str, input_size: int, hidden_size: int,
rnn_layers: int, dropout: float, distribution: str):
super().__init__()
self.rnn = {'LSTM': LSTM, 'GRU': GRU, 'RNN': RNN}[cell_type](
input_size = input_size,
@@ -85,61 +89,77 @@ def _decode_whole_seq(self, ts: torch.Tensor, hidden_state: torch.Tensor):
output = self.projector(output)
return output, hidden_state

def forecast(self, prefix: torch.Tensor, horizon, mode='lagged', output_mode='quantiles', **mode_kw):
def forecast(self, prefix: torch.Tensor, horizon: int,
mode: str='lagged', output_mode: str='quantiles', **mode_kw):
self.eval()
forecast = []
if mode == 'lagged':
if self.rnn.input_size != 1 or mode == 'lagged':
with torch.no_grad():
for i in range(horizon):
output = self(prefix)[0]
forecast.append(self._transform_params(output, target_scale=self.target_scale, mode=output_mode, **mode_kw).detach().cpu())
prediction = self._transform_params(output, target_scale=self.target_scale,
mode='predictions')
forecast.append(self._transform_params(output, mode=output_mode, **mode_kw).detach().cpu())
prediction = self._transform_params(output, mode='predictions')
prefix = torch.roll(prefix, -1, dims=-1)
prefix[..., [-1]] = prediction
forecast = torch.stack(forecast)#.squeeze(1).permute(1, 2, 0)
elif mode == 'auto':
assert self.rnn.input_size == 1, "autoregressive mode requires the features not to be lagged"
forecast = torch.stack(forecast, dim=1).squeeze(-1)#.squeeze(1).permute(1, 2, 0)
elif self.rnn.input_size == 1 or mode == 'auto':
# assert self.rnn.input_size == 1, "autoregressive mode requires the features not to be lagged"
forecast = self._autoregressive(prefix, horizon, hidden_state=None,
output_mode=output_mode, **mode_kw)
else:
raise ValueError('Unknown forecasting type!')

return forecast
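
For context, a hedged sketch of the 'lagged' branch above: predict one step, roll the lag window left, write the prediction into the freed slot, and repeat over the horizon (model stands for any one-step forecaster; shapes are assumptions):

import torch

def rolling_forecast(model, prefix: torch.Tensor, horizon: int) -> torch.Tensor:
    # prefix: (batch, n_lags) window of the most recent observations
    model.eval()
    steps = []
    with torch.no_grad():
        for _ in range(horizon):
            step = model(prefix)                 # assumed one-step point forecast, (batch, 1)
            steps.append(step)
            prefix = torch.roll(prefix, -1, dims=-1)
            prefix[..., [-1]] = step             # newest prediction becomes the newest lag
    return torch.stack(steps, dim=1)             # (batch, horizon, 1)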

def _autoregressive(self, prefix: torch.Tensor,
horizon: int, hidden_state: torch.Tensor=None,
output_mode: str='quantiles', **mode_kw):
if hidden_state is None:
hidden_state = self._encode(prefix)
# hidden_state = hidden_state[:, [-1], :]
outputs = []
x = prefix[[-1], ...] # what's the order of rnn processing?
for i in range(horizon):
output, hidden_state = self.rnn(x, hidden_state)
outputs.append(self._transform_params(output, mode=output_mode, **mode_kw).detach().cpu())
x = self._transform_params(output, mode='predictions')
outputs = torch.stack(outputs, dim=1)
return outputs
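
The autoregressive branch keeps an explicit hidden state instead of a lag window; a self-contained sketch with a plain GRU (the projection layer and shapes are assumptions for illustration):

import torch
from torch import nn

rnn = nn.GRU(input_size=1, hidden_size=8, batch_first=True)
proj = nn.Linear(8, 1)

x = torch.randn(2, 1, 1)               # last observed value per series
hidden = None                          # an encoder state could be passed here
outputs = []
for _ in range(5):                     # horizon = 5
    out, hidden = rnn(x, hidden)       # hidden state carries across steps
    x = proj(out)                      # the prediction feeds the next step
    outputs.append(x)
forecast = torch.cat(outputs, dim=1)   # (batch, horizon, 1)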


def forward(self, x: torch.Tensor,
# n_samples: int = None,
mode='raw', **mode_kw):
mode='raw', **mode_kw) -> Tuple[torch.Tensor, torch.Tensor]:
"""
Forward pass
x.size == (nseries, length)
"""
# encode
x = self.scaler(x, normalize=True)
hidden_state = self._encode(x)
# decode

if self.training:
# assert n_samples is None, "cannot sample from decoder when training"
assert mode == 'raw', "cannot use any mode but 'raw' while training"
return self._decode_whole_seq(x, hidden_state)
else:
output, hidden_state = self._decode_whole_seq(x, hidden_state)
return self._transform_params(output,
mode=mode, **mode_kw), hidden_state

def to_quantiles(self, params: torch.Tensor, quantiles=None):
def to_quantiles(self, params: torch.Tensor, quantiles=None) -> torch.Tensor:
if quantiles is None:
quantiles = self.quantiles
distr = self.distribution.map_x_to_distribution(params)
return distr.icdf(quantiles).unsqueeze(1)

def to_samples(self, params: torch.Tensor, n_samples=100):
def to_samples(self, params: torch.Tensor, n_samples=100) -> torch.Tensor:
distr = self.distribution.map_x_to_distribution(params)
return distr.sample((n_samples,)).permute(1, 2, 0) # distr_n x n_samples

def to_predictions(self, params: torch.Tensor):
def to_predictions(self, params: torch.Tensor) -> torch.Tensor:
distr = self.distribution.map_x_to_distribution(params)
return distr.sample((1,)).permute(1, 2, 0) # distr_n x 1

def _transform_params(self, distr_params, mode='raw', **mode_kw):
def _transform_params(self, distr_params, mode='raw', **mode_kw) -> torch.Tensor:
if mode == 'raw':
return distr_params
elif mode == 'quantiles':
@@ -155,55 +175,6 @@ def _transform_params(self, distr_params, mode='raw', **mode_kw):
return transformed
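
to_quantiles, to_samples and to_predictions all follow the same params -> distribution -> statistic pattern; an illustration with torch.distributions.Normal standing in for the repository's distribution class:

import torch
from torch.distributions import Normal

distr = Normal(loc=torch.zeros(3), scale=torch.ones(3))  # 3 series

levels = torch.tensor([0.1, 0.5, 0.9])
quantiles = distr.icdf(levels[:, None])   # (n_levels, n_series) quantile forecasts
samples = distr.sample((100,))            # (100, 3) Monte Carlo paths
point = distr.sample((1,))                # a single sampled path as a point forecast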



def _decode_one(self, x,
idx,
hidden_state,
):
x = x[..., [idx]]
prediction, hidden_state = self._decode_whole_seq(x, hidden_state)
prediction = prediction[:, [0], ...] # select first time step for this index
return prediction, hidden_state

def _decode_autoregressive(
self,
hidden_state: Any,
first_target: Union[List[torch.Tensor], torch.Tensor],
n_decoder_steps: int,
n_samples: int = 1,
**kwargs,
) -> Union[List[torch.Tensor], torch.Tensor]:

# make predictions which are fed into next step
output = []
current_target = first_target
current_hidden_state = hidden_state

normalized_output = [first_target]

for idx in range(n_decoder_steps):
# get lagged targets
current_target, current_hidden_state = self._decode_one(
idx,
# lagged_targets=normalized_output,
hidden_state=current_hidden_state, **kwargs
)

# get prediction and its normalized version for the next step
prediction, current_target = self.output_to_prediction(
current_target,
# target_scale=target_scale,
n_samples=n_samples
)
# save normalized output for lagged targets
normalized_output.append(current_target)
# set output to unnormalized samples, append each target as n_batch_samples x n_random_samples

output.append(prediction)
output = torch.stack(output, dim=1)
return output


class DeepAR(BaseNeuralModel):
"""No exogenous variable support
Variational Inference + Probable Anomaly detection"""
@@ -229,12 +200,10 @@ def __init__(self, params: Optional[OperationParameters] = {}):
###
self.preprocess_to_lagged = False
self.patch_len = params.get('patch_len', None)
self.forecast_mode = params.get('forecast_mode', 'raw')
self.forecast_mode = params.get('forecast_mode', 'predictions')
self.quantiles = params.get('quantiles', None)

self.test_patch_len = None



def _init_model(self, ts) -> tuple:
self.loss_fn = DeepARModule._loss_fns[self.expected_distribution]()
@@ -258,7 +227,7 @@ def _init_model(self, ts) -> tuple:
return self.loss_fn, self.optimizer

def fit(self, input_data: InputData, split_data: bool = False):
train_loader, val_loader = self._prepare_data(input_data, split_data=split_data)
train_loader, val_loader = self._prepare_data(input_data, split_data=split_data, horizon=1)
loss_fn, optimizer = self._init_model(input_data)
self._train_loop(model=self.model,
train_loader=train_loader,
@@ -268,9 +237,8 @@ def fit(self, input_data: InputData, split_data: bool = False):
)
return self
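
Note that fit now prepares the loaders with horizon=1, i.e. one-step-ahead (teacher-forced) training pairs; multi-step forecasts come from the rolling/autoregressive loops at predict time. A sketch of the lagged pairs this presumably yields (illustrative, not the exact output of transform_features_and_target_into_lagged):

import numpy as np

series = np.arange(12, dtype=float)
patch_len, horizon = 4, 1          # fit passes horizon=1
n = len(series) - patch_len - horizon + 1
X = np.stack([series[i:i + patch_len] for i in range(n)])
y = np.stack([series[i + patch_len:i + patch_len + horizon] for i in range(n)])
# X[0] == [0, 1, 2, 3], y[0] == [4.]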

def _prepare_data(self, input_data: InputData, split_data):
def _prepare_data(self, input_data: InputData, split_data, horizon=None):
val_loader = None
# define patch_len
if self.preprocess_to_lagged:
self.patch_len = input_data.features.shape[-1]
train_loader = self.__create_torch_loader(input_data)
@@ -280,51 +248,76 @@ def _prepare_data(self, input_data: InputData, split_data):
method='dff').get_window_size(input_data.features)
self.patch_len = 2 * dominant_window_size
train_loader, val_loader = self._get_train_val_loaders(
input_data.features, self.patch_len, split_data)
input_data.features, self.patch_len, split_data, horizon=horizon)

self.test_patch_len = self.patch_len
return train_loader, val_loader



def _predict_loop(self, test_loader, output_mode):
model = self.model # or model for inference?
output = model.predict(test_loader, output_mode)

y_pred = output #
forecast_idx_predict = np.arange(start=test_data.idx[-self.horizon],
stop=test_data.idx[-self.horizon] +
self.horizon,
def predict(self,
test_data: InputData,
output_mode: str = None):
if not output_mode:
output_mode = self.forecast_mode
# test_loader = self._get_test_loader(test_data)
test_loader, _ = self._prepare_data(test_data, False, 0)
last_patch = test_loader.dataset[-1][0][None, ...]
fcs = self._predict(last_patch, output_mode)

# some logic to select needed ts

# forecast_idx_predict = np.arange(start=test_data.idx[-self.horizon],
# stop=test_data.idx[-self.horizon] +
# self.horizon,
# step=1)
forecast_idx_predict = np.arange(start=test_data.idx[-1],
stop=test_data.idx[-1] + self.horizon,
step=1)
predict = OutputData(
idx=forecast_idx_predict,
task=self.task_type,
predict=y_pred.reshape(1, -1),
predict=fcs.reshape(self.horizon, -1, fcs.size(-1)),
target=self.target,
data_type=DataTypesEnum.table)

return predict
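
A small sketch of the forecast indexing used above, assuming idx is a 1-D array of integer time stamps; the window starts at the last observed stamp, so shift by one if predictions should strictly follow the history:

import numpy as np

idx = np.arange(100, 150)    # observed time index
horizon = 10
forecast_idx = np.arange(start=idx[-1], stop=idx[-1] + horizon, step=1)
# array([149, 150, ..., 158]); use idx[-1] + 1 to avoid overlapping the last observation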


def _predict(self, x, output_mode, **output_kw):
mode = 'lagged' if self.preprocess_to_lagged else 'auto'
x = x.to(default_device())
fc = self.model.forecast(x, self.horizon, mode, output_mode, **output_kw)
return fc

def predict_for_fit(self,
test_data,
output_mode: str='samples'):
y_pred = []
true_mode = self.forecast_mode

self.forecast_mode = output_mode
model = self.model
##########
output_mode: str = 'labels'): # will a signature conflict arise if I drop the kwarg?
output_mode = 'predictions'
# test_loader = self._get_test_loader(test_data)
fcs = self._predict(test_loader, output_mode)

# test_loader = self._get_test_loader(test_data)
test_loader, _ = self._prepare_data(test_data, False, 0)
last_patch = test_loader.dataset[-1][0][None, ...]
fcs = self._predict(last_patch, output_mode)

# some logic to select needed ts

# forecast_idx_predict = np.arange(start=test_data.idx[-self.horizon],
# stop=test_data.idx[-self.horizon] +
# self.horizon,
# step=1)
forecast_idx_predict = np.arange(start=test_data.idx[-1],
stop=test_data.idx[-1] + self.horizon,
step=1)

y_pred = np.array(y_pred)
y_pred = y_pred.squeeze()
forecast_idx_predict = test_data.idx
predict = OutputData(
idx=forecast_idx_predict,
task=self.task_type,
predict=y_pred,
predict=fcs.squeeze().numpy(),
target=self.target,
data_type=DataTypesEnum.table)
self.forecast_mode = true_mode
data_type=DataTypesEnum.table)
return predict


def _train_loop(self, model,
train_loader,
@@ -357,7 +350,7 @@ def _train_loop(self, model,
outputs, *hidden_state = model(batch_x)
# return batch_x, outputs, batch_y

loss = loss_fn(outputs, batch_y)
loss = loss_fn(outputs, batch_y, self.model.scaler)
train_loss.append(loss.item())

loss.backward()
@@ -392,14 +385,6 @@ def _train_loop(self, model,
scheduler.get_last_lr()[0]))
return best_model
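
The loss call above now receives the model's scaler; a hedged sketch of a scaler-aware Gaussian negative log-likelihood, assuming the loss denormalizes the predicted parameters before scoring raw targets (names and signature are assumptions, not the repository's NormalDistributionLoss):

import torch
from torch.distributions import Normal

def scaler_aware_nll(params: torch.Tensor, target: torch.Tensor, scaler) -> torch.Tensor:
    loc, scale = params.unbind(dim=-1)           # predicted distribution parameters
    loc = loc * scaler.factors + scaler.means    # undo the input normalization
    scale = torch.nn.functional.softplus(scale) * scaler.factors + 1e-8
    return -Normal(loc, scale).log_prob(target).mean()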

def __predict_loop(self, test_loader):
outputs = []
with torch.no_grad():
for x_test in test_loader:
outputs.append(self.model.predict(x_test))
output = torch.stack(outputs, dim=0)
return output


@convert_inputdata_to_torch_time_series_dataset
def _create_dataset(self,
@@ -414,7 +399,9 @@ def _get_train_val_loaders(self,
patch_len=None,
split_data: bool = True,
validation_blocks: int = None,
unsqueeze_0=True):
horizon=None):
if not horizon:
horizon = self.horizon
if patch_len is None:
patch_len = self.patch_len
train_data = self.__ts_to_input_data(ts)
@@ -423,17 +410,17 @@ def _get_train_val_loaders(self,
train_data, val_data = train_test_data_setup(
train_data, validation_blocks=validation_blocks)
_, train_data.features, train_data.target = transform_features_and_target_into_lagged(train_data,
self.horizon,
horizon,
patch_len)
_, val_data.features, val_data.target = transform_features_and_target_into_lagged(val_data,
self.horizon,
horizon,
patch_len)
val_loader = self.__create_torch_loader(val_data)
train_loader = self.__create_torch_loader(train_data)
return train_loader, val_loader
else:
_, train_data.features, train_data.target = transform_features_and_target_into_lagged(train_data,
self.horizon,
horizon,
patch_len)
train_loader = self.__create_torch_loader(train_data)
return train_loader, None
@@ -478,7 +465,6 @@ def _get_test_loader(self,
if len(test_data.features.shape) == 1:
test_data.features = test_data.features[None, ...]


if not self.preprocess_to_lagged:
features = HankelMatrix(time_series=test_data.features,
window_size=self.test_patch_len or self.patch_len).trajectory_matrix
@@ -487,6 +473,7 @@ def _get_test_loader(self,
target = torch.from_numpy(DataConverter(
data=features).convert_to_torch_format()).float()
else:
# if True:
features = test_data.features
features = torch.from_numpy(DataConverter(data=features).
convert_to_torch_format()).float()
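
For reference, the trajectory matrix built from HankelMatrix here is a sliding-window (Hankel) view of the series; a NumPy equivalent under that assumption (the repository's class may transpose the layout):

import numpy as np

ts = np.arange(8, dtype=float)
window_size = 4
trajectory = np.lib.stride_tricks.sliding_window_view(ts, window_size)
# trajectory[0] == [0., 1., 2., 3.], trajectory[-1] == [4., 5., 6., 7.]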