diff --git a/README.md b/README.md
index 47ee47fa..e024cc27 100644
--- a/README.md
+++ b/README.md
@@ -126,6 +126,7 @@ The paper references and links are all listed at the bottom of this file.
 | Neural Net | iTransformer🧑‍🔧[^24] | ✅ | | | | | `2024 - ICLR` |
 | Neural Net | ModernTCN[^38] | ✅ | | | | | `2024 - ICLR` |
 | Neural Net | ImputeFormer🧑‍🔧[^34] | ✅ | | | | | `2024 - KDD` |
+| Neural Net | SegRNN[^42] | ✅ | | | | | `2023 - arXiv` |
 | Neural Net | SAITS[^1] | ✅ | | | | | `2023 - ESWA` |
 | Neural Net | FreTS🧑‍🔧[^23] | ✅ | | | | | `2023 - NeurIPS` |
 | Neural Net | Koopa🧑‍🔧[^29] | ✅ | | | | | `2023 - NeurIPS` |
@@ -509,3 +510,6 @@ Time-Series.AI
 [^41]: Xu, Z., Zeng, A., & Xu, Q. (2024).
 [FITS: Modeling Time Series with 10k parameters](https://openreview.net/forum?id=bWcnvZ3qMb). *ICLR 2024*.
+[^42]: Lin, S., Lin, W., Wu, W., Zhao, F., Mo, R., & Zhang, H. (2023).
+[SegRNN: Segment Recurrent Neural Network for Long-Term Time Series Forecasting](https://arxiv.org/abs/2308.11200).
+*arXiv 2023*.
diff --git a/pypots/imputation/__init__.py b/pypots/imputation/__init__.py
index 6600dcfd..9ce1d867 100644
--- a/pypots/imputation/__init__.py
+++ b/pypots/imputation/__init__.py
@@ -38,6 +38,7 @@
 from .imputeformer import ImputeFormer
 from .timemixer import TimeMixer
 from .moderntcn import ModernTCN
+from .segrnn import SegRNN

 # naive imputation methods
 from .locf import LOCF
@@ -87,4 +88,5 @@
     "Lerp",
     "TEFN",
     "CSAI",
+    "SegRNN",
 ]
diff --git a/pypots/imputation/segrnn/__init__.py b/pypots/imputation/segrnn/__init__.py
new file mode 100644
index 00000000..243345a8
--- /dev/null
+++ b/pypots/imputation/segrnn/__init__.py
@@ -0,0 +1,24 @@
+"""
+The package including the modules of SegRNN.
+
+Refer to the paper
+`Lin, Shengsheng and Lin, Weiwei and Wu, Wentai and Zhao, Feiyu and Mo, Ruichao and Zhang, Haotong.
+SegRNN: Segment recurrent neural network for long-term time series forecasting.
+arXiv preprint arXiv:2308.11200.
+<https://arxiv.org/abs/2308.11200>`_
+
+Notes
+-----
+This implementation is inspired by the official one https://github.com/lss-1138/SegRNN
+
+"""
+
+# Created by Shengsheng Lin
+
+
+
+from .model import SegRNN
+
+__all__ = [
+    "SegRNN",
+]
diff --git a/pypots/imputation/segrnn/core.py b/pypots/imputation/segrnn/core.py
new file mode 100644
index 00000000..c1d5f3a2
--- /dev/null
+++ b/pypots/imputation/segrnn/core.py
@@ -0,0 +1,59 @@
+"""
+The core wrapper assembles the submodules of the SegRNN imputation model
+and takes over the forward progress of the algorithm.
+"""
+
+# Created by Shengsheng Lin
+
+from typing import Optional
+
+from typing import Callable
+import torch.nn as nn
+
+from ...nn.modules.segrnn import BackboneSegRNN
+from ...nn.modules.saits import SaitsLoss
+
+class _SegRNN(nn.Module):
+    def __init__(
+        self,
+        n_steps: int,
+        n_features: int,
+        seg_len: int = 24,
+        d_model: int = 512,
+        dropout: float = 0.5,
+        ORT_weight: float = 1,
+        MIT_weight: float = 1,
+    ):
+        super().__init__()
+
+        self.n_steps = n_steps
+        self.n_features = n_features
+        self.seg_len = seg_len
+        self.d_model = d_model
+        self.dropout = dropout
+
+        self.backbone = BackboneSegRNN(n_steps, n_features, seg_len, d_model, dropout)
+
+        # apply the SAITS loss function to SegRNN on the imputation task
+        self.saits_loss_func = SaitsLoss(ORT_weight, MIT_weight)
+
+    def forward(self, inputs: dict, training: bool = True) -> dict:
+        X, missing_mask = inputs["X"], inputs["missing_mask"]
+
+        reconstruction = self.backbone(X)
+
+        imputed_data = missing_mask * X + (1 - missing_mask) * reconstruction
+        results = {
+            "imputed_data": imputed_data,
+        }
+
+        # if in training mode, return results with losses
+        if training:
+            X_ori, indicating_mask = inputs["X_ori"], inputs["indicating_mask"]
+            loss, ORT_loss, MIT_loss = self.saits_loss_func(reconstruction, X_ori, missing_mask, indicating_mask)
+            results["ORT_loss"] = ORT_loss
+            results["MIT_loss"] = MIT_loss
+            # `loss` is always the item for backward propagating to update the model
+            results["loss"] = loss
+
+        return results
diff --git a/pypots/imputation/segrnn/data.py b/pypots/imputation/segrnn/data.py
new file mode 100644
index 00000000..a9eb728e
--- /dev/null
+++ b/pypots/imputation/segrnn/data.py
@@ -0,0 +1,21 @@
+"""
+Dataset class for the imputation model SegRNN.
+"""
+
+# Created by Shengsheng Lin
+
+from typing import Union
+
+from pypots.imputation.saits.data import DatasetForSAITS
+
+
+class DatasetForSegRNN(DatasetForSAITS):
+    def __init__(
+        self,
+        data: Union[dict, str],
+        return_X_ori: bool,
+        return_y: bool,
+        file_type: str = "hdf5",
+        rate: float = 0.2,
+    ):
+        super().__init__(data, return_X_ori, return_y, file_type, rate)
diff --git a/pypots/imputation/segrnn/model.py b/pypots/imputation/segrnn/model.py
new file mode 100644
index 00000000..6e687084
--- /dev/null
+++ b/pypots/imputation/segrnn/model.py
@@ -0,0 +1,296 @@
+"""
+The implementation of SegRNN for the partially-observed time-series imputation task.
+
+"""
+
+# Created by Shengsheng Lin
+
+from typing import Union, Optional
+
+import numpy as np
+import torch
+from torch.utils.data import DataLoader
+
+from .core import _SegRNN
+from .data import DatasetForSegRNN
+from ..base import BaseNNImputer
+from ...data.checking import key_in_data_set
+from ...data.dataset import BaseDataset
+from ...optim.adam import Adam
+from ...optim.base import Optimizer
+
+
+class SegRNN(BaseNNImputer):
+    """The PyTorch implementation of the SegRNN model.
+    SegRNN is originally proposed by Shengsheng Lin et al. in :cite:`lin2023segrnn`.
+    See details in https://arxiv.org/abs/2308.11200 or https://github.com/lss-1138/SegRNN.
+
+    Parameters
+    ----------
+    n_steps :
+        The number of time steps in the time-series data sample.
+
+    n_features :
+        The number of features in the time-series data sample.
+
+    seg_len :
+        The length of each segment the input series is split into before being fed to the RNN; n_steps must be divisible by seg_len.
+
+    d_model :
+        The dimension of the RNN cell.
+
+    dropout :
+        The dropout rate of the output layer of SegRNN.
+
+    ORT_weight :
+        The weight for the ORT loss, the same as SAITS.
+
+    MIT_weight :
+        The weight for the MIT loss, the same as SAITS.
+
+    batch_size :
+        The batch size for training and evaluating the model.
+
+    epochs :
+        The number of epochs for training the model.
+
+    patience :
+        The patience for the early-stopping mechanism. Given a positive integer, the training process will be
+        stopped when the model does not perform better after that number of epochs.
+        Leaving it default as None will disable the early-stopping.
+
+    optimizer :
+        The optimizer for model training.
+        If not given, will use a default Adam optimizer.
+
+    num_workers :
+        The number of subprocesses to use for data loading.
+        `0` means data loading will be in the main process, i.e. there won't be subprocesses.
+
+    device :
+        The device for the model to run on. It can be a string, a :class:`torch.device` object, or a list of them.
+        If not given, will try to use CUDA devices first (will use the default CUDA device if there are multiple),
+        then CPUs, considering CUDA and CPU are so far the main devices for people to train ML models.
+        If given a list of devices, e.g. ['cuda:0', 'cuda:1'] or [torch.device('cuda:0'), torch.device('cuda:1')], the
+        model will be trained in parallel on the multiple devices (so far only parallel training on CUDA devices is supported).
+        Other devices like Google TPU and Apple Silicon accelerator MPS may be added in the future.
+
+    saving_path :
+        The path for automatically saving model checkpoints and tensorboard files (i.e. loss values recorded during
+        training into a tensorboard file). Will not save if not given.
+
+    model_saving_strategy :
+        The strategy to save model checkpoints. It has to be one of [None, "best", "better", "all"].
+        No model will be saved when it is set as None.
+        The "best" strategy will only automatically save the best model after the training finished.
+        The "better" strategy will automatically save the model during training whenever the model performs
+        better than in previous epochs.
+        The "all" strategy will save every model after each training epoch.
+
+    verbose :
+        Whether to print out the training logs during the training process.
+    """
+
+    def __init__(
+        self,
+        n_steps: int,
+        n_features: int,
+        seg_len: int = 24,
+        d_model: int = 512,
+        dropout: float = 0.5,
+        ORT_weight: float = 1,
+        MIT_weight: float = 1,
+        batch_size: int = 32,
+        epochs: int = 100,
+        patience: int = None,
+        optimizer: Optional[Optimizer] = Adam(),
+        num_workers: int = 0,
+        device: Optional[Union[str, torch.device, list]] = None,
+        saving_path: str = None,
+        model_saving_strategy: Optional[str] = "best",
+        verbose: bool = True,
+    ):
+        super().__init__(
+            batch_size,
+            epochs,
+            patience,
+            num_workers,
+            device,
+            saving_path,
+            model_saving_strategy,
+            verbose,
+        )
+
+        self.n_steps = n_steps
+        self.n_features = n_features
+        # model hyper-parameters
+        self.seg_len = seg_len
+        self.d_model = d_model
+        self.dropout = dropout
+        self.ORT_weight = ORT_weight
+        self.MIT_weight = MIT_weight
+
+        # set up the model
+        self.model = _SegRNN(
+            self.n_steps,
+            self.n_features,
+            self.seg_len,
+            self.d_model,
+            self.dropout,
+            self.ORT_weight,
+            self.MIT_weight,
+        )
+        self._send_model_to_given_device()
+        self._print_model_size()
+
+        # set up the optimizer
+        self.optimizer = optimizer
+        self.optimizer.init_optimizer(self.model.parameters())
+
+    def _assemble_input_for_training(self, data: list) -> dict:
+        (
+            indices,
+            X,
+            missing_mask,
+            X_ori,
+            indicating_mask,
+        ) = self._send_data_to_given_device(data)
+
+        inputs = {
+            "X": X,
+            "missing_mask": missing_mask,
+            "X_ori": X_ori,
+            "indicating_mask": indicating_mask,
+        }
+
+        return inputs
+
+    def _assemble_input_for_validating(self, data: list) -> dict:
+        return self._assemble_input_for_training(data)
+
+    def _assemble_input_for_testing(self, data: list) -> dict:
+        indices, X, missing_mask = self._send_data_to_given_device(data)
+
+        inputs = {
+            "X": X,
+            "missing_mask": missing_mask,
+        }
+
+        return inputs
+
+    def fit(
+        self,
+        train_set: Union[dict, str],
+        val_set: Optional[Union[dict, str]] = None,
+        file_type: str = "hdf5",
+    ) -> None:
+        # Step 1: wrap the input data with classes Dataset and DataLoader
+        training_set = DatasetForSegRNN(train_set, return_X_ori=False, return_y=False, file_type=file_type)
+        training_loader = DataLoader(
+            training_set,
+            batch_size=self.batch_size,
+            shuffle=True,
+            num_workers=self.num_workers,
+        )
+        val_loader = None
+        if val_set is not None:
+            if not key_in_data_set("X_ori", val_set):
+                raise ValueError("val_set must contain 'X_ori' for model validation.")
+            val_set = DatasetForSegRNN(val_set, return_X_ori=True, return_y=False, file_type=file_type)
+            val_loader = DataLoader(
+                val_set,
+                batch_size=self.batch_size,
+                shuffle=False,
+                num_workers=self.num_workers,
+            )
+
+        # Step 2: train the model and freeze it
+        self._train_model(training_loader, val_loader)
+        self.model.load_state_dict(self.best_model_dict)
+        self.model.eval()  # set the model as eval status to freeze it.
+
+        # Step 3: save the model if necessary
+        self._auto_save_model_if_necessary(confirm_saving=self.model_saving_strategy == "best")
+
+    def predict(
+        self,
+        test_set: Union[dict, str],
+        file_type: str = "hdf5",
+    ) -> dict:
+        """Make predictions for the input data with the trained model.
+
+        Parameters
+        ----------
+        test_set : dict or str
+            The dataset for model testing, should be a dictionary including the key 'X',
+            or a path string locating a data file supported by PyPOTS (e.g. h5 file).
+            If it is a dict, X should be array-like of shape [n_samples, sequence length (n_steps), n_features],
+            which is the time-series data for testing. It can contain missing values, and these are exactly what
+            the model will impute.
+            If it is a path string, the path should point to a data file, e.g. an h5 file, which contains
+            key-value pairs like a dict, and it has to include the key 'X'.
+
+        file_type :
+            The type of the given file if test_set is a path string.
+
+        Returns
+        -------
+        result_dict :
+            The dictionary containing the imputation results and latent variables if necessary.
+
+        """
+        # Step 1: wrap the input data with classes Dataset and DataLoader
+        self.model.eval()  # set the model as eval status to freeze it.
+        test_set = BaseDataset(
+            test_set,
+            return_X_ori=False,
+            return_X_pred=False,
+            return_y=False,
+            file_type=file_type,
+        )
+        test_loader = DataLoader(
+            test_set,
+            batch_size=self.batch_size,
+            shuffle=False,
+            num_workers=self.num_workers,
+        )
+        imputation_collector = []
+
+        # Step 2: process the data with the model
+        with torch.no_grad():
+            for idx, data in enumerate(test_loader):
+                inputs = self._assemble_input_for_testing(data)
+                results = self.model.forward(inputs, training=False)
+                imputation_collector.append(results["imputed_data"])
+
+        # Step 3: output collection and return
+        imputation = torch.cat(imputation_collector).cpu().detach().numpy()
+        result_dict = {
+            "imputation": imputation,
+        }
+        return result_dict
+
+    def impute(
+        self,
+        test_set: Union[dict, str],
+        file_type: str = "hdf5",
+    ) -> np.ndarray:
+        """Impute missing values in the given data with the trained model.
+
+        Parameters
+        ----------
+        test_set :
+            The data samples for testing, should be array-like of shape [n_samples, sequence length (n_steps),
+            n_features], or a path string locating a data file, e.g. h5 file.
+
+        file_type :
+            The type of the given file if test_set is a path string.
+
+        Returns
+        -------
+        array-like, shape [n_samples, sequence length (n_steps), n_features],
+            Imputed data.
+        """
+
+        result_dict = self.predict(test_set, file_type=file_type)
+        return result_dict["imputation"]
diff --git a/pypots/nn/modules/segrnn/__init__.py b/pypots/nn/modules/segrnn/__init__.py
new file mode 100644
index 00000000..a367bf8d
--- /dev/null
+++ b/pypots/nn/modules/segrnn/__init__.py
@@ -0,0 +1,23 @@
+"""
+The package including the modules of SegRNN.
+
+Refer to the paper
+`Lin, Shengsheng and Lin, Weiwei and Wu, Wentai and Zhao, Feiyu and Mo, Ruichao and Zhang, Haotong.
+SegRNN: Segment recurrent neural network for long-term time series forecasting.
+arXiv preprint arXiv:2308.11200.
+<https://arxiv.org/abs/2308.11200>`_
+
+Notes
+-----
+This implementation is inspired by the official one https://github.com/lss-1138/SegRNN
+
+"""
+
+# Created by Shengsheng Lin
+
+
+from .backbone import BackboneSegRNN
+
+__all__ = [
+    "BackboneSegRNN",
+]
diff --git a/pypots/nn/modules/segrnn/backbone.py b/pypots/nn/modules/segrnn/backbone.py
new file mode 100644
index 00000000..e0588403
--- /dev/null
+++ b/pypots/nn/modules/segrnn/backbone.py
@@ -0,0 +1,79 @@
+"""
+The backbone module of SegRNN.
+"""
+
+# Created by Shengsheng Lin
+
+from typing import Optional
+
+import torch
+import torch.nn as nn
+
+
+class BackboneSegRNN(nn.Module):
+    def __init__(
+        self,
+        n_steps: int,
+        n_features: int,
+        seg_len: int = 24,
+        d_model: int = 512,
+        dropout: float = 0.5,
+    ):
+        super().__init__()
+
+        self.n_steps = n_steps
+        self.n_features = n_features
+        self.seg_len = seg_len
+        self.d_model = d_model
+        self.dropout = dropout
+
+
+        if n_steps % seg_len:
+            raise ValueError("The sequence length n_steps must be divisible by the segment length seg_len for SegRNN.")
+
+        self.seg_num = self.n_steps // self.seg_len
+        self.valueEmbedding = nn.Sequential(
+            nn.Linear(self.seg_len, self.d_model),
+            nn.ReLU()
+        )
+        self.rnn = nn.GRU(input_size=self.d_model, hidden_size=self.d_model, num_layers=1, bias=True,
+                          batch_first=True, bidirectional=False)
+        self.pos_emb = nn.Parameter(torch.randn(self.seg_num, self.d_model // 2))
+        self.channel_emb = nn.Parameter(torch.randn(self.n_features, self.d_model // 2))
+        self.predict = nn.Sequential(
+            nn.Dropout(self.dropout),
+            nn.Linear(self.d_model, self.seg_len)
+        )
+
+    def forward(self, x):
+        # b: batch_size, c: channel_size (n_features), s: seq_len (n_steps)
+        # d: d_model, w: seg_len, m: seg_num
+        batch_size = x.size(0)
+
+        # normalization and permute    b,s,c -> b,c,s
+        seq_last = x[:, -1:, :].detach()
+        x = (x - seq_last).permute(0, 2, 1)  # b,c,s
+
+        # segment and embedding    b,c,s -> bc,m,w -> bc,m,d
+        x = self.valueEmbedding(x.reshape(-1, self.seg_num, self.seg_len))
+
+        # encoding
+        _, hn = self.rnn(x)  # bc,m,d  1,bc,d
+
+        # m,d//2 -> 1,m,d//2 -> c,m,d//2
+        # c,d//2 -> c,1,d//2 -> c,m,d//2
+        # c,m,d -> cm,1,d -> bcm,1,d
+        pos_emb = torch.cat([
+            self.pos_emb.unsqueeze(0).repeat(self.n_features, 1, 1),
+            self.channel_emb.unsqueeze(1).repeat(1, self.seg_num, 1)
+        ], dim=-1).view(-1, 1, self.d_model).repeat(batch_size, 1, 1)
+
+        _, hy = self.rnn(pos_emb, hn.repeat(1, 1, self.seg_num).view(1, -1, self.d_model))  # bcm,1,d  1,bcm,d
+
+        # 1,bcm,d -> 1,bcm,w -> b,c,s
+        y = self.predict(hy).view(-1, self.n_features, self.n_steps)
+
+        # permute and denorm
+        y = y.permute(0, 2, 1) + seq_last
+
+        return y
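
For reviewers who want to exercise the new model end to end, below is a minimal usage sketch based only on the API added in this diff (`pypots.imputation.SegRNN` with its `fit` and `impute` methods). The toy data shape, the 10% masking rate, and the hyperparameter values are illustrative assumptions, not part of the PR.

```python
import numpy as np

from pypots.imputation import SegRNN

# Toy dataset: 48 samples, 24 time steps, 7 features, with ~10% of values
# randomly set to NaN to simulate missingness (illustrative only).
rng = np.random.default_rng(2023)
X = rng.standard_normal((48, 24, 7)).astype(np.float32)
X[rng.random(X.shape) < 0.1] = np.nan

model = SegRNN(
    n_steps=24,   # must be divisible by seg_len, as checked in BackboneSegRNN
    n_features=7,
    seg_len=12,
    d_model=128,
    dropout=0.5,
    epochs=5,     # kept tiny just to smoke-test the training loop
)

# Training only needs the key "X"; SAITS-style artificial masking for the MIT loss
# is handled internally by DatasetForSegRNN.
model.fit(train_set={"X": X})

# Observed values are kept as-is; missing positions are filled by the model.
imputation = model.impute({"X": X})
print(imputation.shape)  # (48, 24, 7)
```

The snippet skips validation; per `fit` above, passing a `val_set` that contains both `X` and `X_ori` would additionally track validation loss during training.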