Merge pull request #128 from WenjieDu/dev
Fix VaDER's failing tests and MRNN's failing training on multiple devices
WenjieDu authored May 22, 2023
2 parents 9cb2762 + 037166e commit 201fe6f
Showing 9 changed files with 188 additions and 82 deletions.
6 changes: 6 additions & 0 deletions pypots/base.py
@@ -96,6 +96,12 @@ def _setup_device(self, device):
             self.device = device
         elif isinstance(device, list):
             # parallel training on multiple CUDA devices
+
+            # ensure the list is not empty
+            assert (
+                len(device) > 0
+            ), "The list of devices should have at least 1 device, but got 0."
+
             device_list = []
             for idx, d in enumerate(device):
                 if isinstance(d, str):
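
For context, a hedged usage sketch (the model class and hyperparameter values are illustrative, not from this diff): PyPOTS models take a device argument, and an empty list now fails fast at construction time instead of surfacing later as an obscure setup error.

from pypots.imputation import SAITS

# hypothetical multi-GPU setup; entries may be device strings or torch.device objects
model = SAITS(
    n_steps=48, n_features=37, n_layers=2, d_model=256, d_inner=128,
    n_heads=4, d_k=64, d_v=64, dropout=0.1,
    device=["cuda:0", "cuda:1"],  # device=[] would now raise an AssertionError
)
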
4 changes: 1 addition & 3 deletions pypots/classification/grud/model.py
@@ -18,12 +18,11 @@
 import torch.nn.functional as F
 from torch.utils.data import DataLoader
 
-from ..base import BaseNNClassifier
 from .data import DatasetForGRUD
+from ..base import BaseNNClassifier
 from ...imputation.brits.modules import TemporalDecay
 from ...optim.adam import Adam
 from ...optim.base import Optimizer
-from ...utils.logging import logger
 
 
 class _GRUD(nn.Module):
@@ -104,7 +103,6 @@ def forward(self, inputs: dict, training: bool = True) -> dict:
             return {"classification_pred": classification_pred}
 
         torch.log(classification_pred)
-        logger.error(f"ZShape {classification_pred.shape}")
         classification_loss = F.nll_loss(
             torch.log(classification_pred), inputs["label"]
         )
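
As an aside, a minimal self-contained sketch (my own example, not from this diff) of the loss pattern kept above: F.nll_loss over log-probabilities equals cross-entropy when the predictions are already softmax outputs.

import torch
import torch.nn.functional as F

logits = torch.randn(4, 3)
probs = torch.softmax(logits, dim=1)  # stand-in for classification_pred
labels = torch.tensor([0, 2, 1, 0])

nll = F.nll_loss(torch.log(probs), labels)
ce = F.cross_entropy(logits, labels)  # identical value, computed from raw logits
assert torch.allclose(nll, ce)
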
13 changes: 8 additions & 5 deletions pypots/classification/raindrop/model.py
@@ -21,22 +21,25 @@
 from torch.nn.parameter import Parameter
 from torch.utils.data import DataLoader
 
+from .modules import (
+    PositionalEncoding,
+    ObservationPropagation,
+)
 from ...classification.base import BaseNNClassifier
 from ...classification.grud.data import DatasetForGRUD
 from ...optim.adam import Adam
 from ...optim.base import Optimizer
 from ...utils.logging import logger
 
 try:
-    from .modules import PositionalEncoding, ObservationPropagation
     from torch_geometric.nn.inits import glorot
 except ImportError as e:
     logger.error(
         f"{e}\n"
-        "torch_geometric is missing, "
+        "Note torch_geometric is missing, "
         "please install it with 'pip install torch_geometric' or 'conda install -c pyg pyg'"
     )
+except NameError as e:
+    logger.error(
+        f"{e}\n"
+        "Note torch_geometric is missing, "
+        "please install it with 'pip install torch_geometric' or 'conda install -c pyg pyg'"
+    )
 
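
For reference, a standalone sketch (my own, simplified from the hunk above) of the optional-dependency guard being extended here: a NameError handler matters because a module that references an optional package at class-definition time can surface the missing dependency as a NameError rather than an ImportError.

import logging

logger = logging.getLogger(__name__)

try:
    from torch_geometric.nn.inits import glorot  # optional dependency
except (ImportError, NameError) as e:  # NameError: see note above
    logger.error(
        f"{e}\nNote torch_geometric is missing, please install it with "
        "'pip install torch_geometric' or 'conda install -c pyg pyg'"
    )
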
6 changes: 3 additions & 3 deletions pypots/clustering/crli/model.py
@@ -77,7 +77,7 @@ def forward(
         self,
         inputs: dict,
         training_object: str = "generator",
-        mode: str = "training",
+        training: bool = True,
     ) -> dict:
         assert training_object in [
             "generator",
@@ -89,7 +89,7 @@
         batch_size, n_steps, n_features = X.shape
         losses = {}
         inputs = self.cluster(inputs, training_object)
-        if mode == "clustering":
+        if not training:
             # if only run clustering, then no need to calculate loss
             return inputs
 
@@ -432,7 +432,7 @@ def cluster(
         with torch.no_grad():
             for idx, data in enumerate(test_loader):
                 inputs = self._assemble_input_for_testing(data)
-                inputs = self.model.cluster(inputs)
+                inputs = self.model.forward(inputs, training=False)
                 latent_collector.append(inputs["fcn_latent"])
 
         latent_collector = torch.cat(latent_collector).cpu().detach().numpy()
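
For illustration, a self-contained sketch (hypothetical TinyModel, not PyPOTS code) of the training-flag convention these hunks adopt: one forward pass serves both phases, and losses are only computed when training=True.

import torch
import torch.nn as nn

class TinyModel(nn.Module):
    def forward(self, inputs: dict, training: bool = True) -> dict:
        inputs["latent"] = torch.tanh(inputs["X"])
        if not training:
            # inference-only call: return early, no loss computation
            return inputs
        inputs["loss"] = inputs["latent"].pow(2).mean()
        return inputs

out = TinyModel()({"X": torch.randn(2, 3)}, training=False)
assert "loss" not in out
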
95 changes: 51 additions & 44 deletions pypots/clustering/vader/model.py
@@ -22,6 +22,7 @@
 
 from .data import DatasetForVaDER
 from .modules import (
+    inverse_softplus,
     GMMLayer,
     PeepholeLSTMCell,
     ImplicitImputation,
@@ -167,9 +168,11 @@ def forward(
         self,
         inputs: dict,
         pretrain: bool = False,
-        mode: str = "training",
+        training: bool = True,
     ) -> dict:
         X, missing_mask = inputs["X"], inputs["missing_mask"]
+        device = X.device
+
         (
             X_reconstructed,
             mu_c,
@@ -180,37 +183,17 @@
             stddev_tilde,
         ) = self.get_results(X, missing_mask)
 
-        if mode == "clustering":
-
-            def func_to_apply(
-                mu_t_: np.ndarray,
-                mu_: np.ndarray,
-                stddev_: np.ndarray,
-                phi_: np.ndarray,
-            ) -> np.ndarray:
-                # the covariance matrix is diagonal, so we can just take the product
-                return np.log(self.eps + phi_) + np.log(
-                    self.eps
-                    + multivariate_normal.pdf(mu_t_, mean=mu_, cov=np.diag(stddev_))
-                )
-
-            mu_tilde = mu_tilde.detach().cpu().numpy()
-            mu = mu_c.detach().cpu().numpy()
-            var = var_c.detach().cpu().numpy()
-            phi = phi_c.detach().cpu().numpy()
-            p = np.array(
-                [
-                    func_to_apply(mu_tilde, mu[i], var[i], phi[i])
-                    for i in np.arange(mu.shape[0])
-                ]
-            )
-            clustering_results = np.argmax(p, axis=0)
-            results = {"clustering_pred": clustering_results}
+        if not training and not pretrain:
+            results = {
+                "mu_tilde": mu_tilde,
+                "mu": mu_c,
+                "var": var_c,
+                "phi": phi_c,
+            }
             # if only run clustering, then no need to calculate loss
             return results
 
-        device = X.device
-
         # calculate the reconstruction loss
         unscaled_reconstruction_loss = cal_mse(X_reconstructed, X, missing_mask)
         reconstruction_loss = (
@@ -283,12 +266,6 @@ def func_to_apply(
             return results
 
 
-def inverse_softplus(x: np.ndarray) -> np.ndarray:
-    b = x < 1e2
-    x[b] = np.log(np.exp(x[b]) - 1.0 + 1e-9)
-    return x
-
-
 class VaDER(BaseNNClusterer):
     """The PyTorch implementation of the VaDER model :cite:`dejong2019VaDER`.
@@ -384,6 +361,11 @@ def __init__(
             saving_path,
             model_saving_strategy,
         )
+
+        assert (
+            pretrain_epochs > 0
+        ), f"pretrain_epochs must be a positive integer, but got {pretrain_epochs}"
+
         self.n_steps = n_steps
         self.n_features = n_features
         self.pretrain_epochs = pretrain_epochs
@@ -492,20 +474,22 @@ def _train_model(
                     mu = gmm.means_
                     var = inverse_softplus(gmm.covariances_)
                     phi = np.log(gmm.weights_ + 1e-9)  # inverse softmax
+                    device = results["z"].device
+
                     # use trained GMM's parameters to init GMM layer's
                     if isinstance(self.device, list):  # if using multi-GPU
                         self.model.module.gmm_layer.set_values(
-                            torch.from_numpy(mu).to(results["z"].device),
-                            torch.from_numpy(var).to(results["z"].device),
-                            torch.from_numpy(phi).to(results["z"].device),
+                            torch.from_numpy(mu).to(device),
+                            torch.from_numpy(var).to(device),
+                            torch.from_numpy(phi).to(device),
                         )
                     else:
                         self.model.gmm_layer.set_values(
-                            torch.from_numpy(mu).to(results["z"].device),
-                            torch.from_numpy(var).to(results["z"].device),
-                            torch.from_numpy(phi).to(results["z"].device),
+                            torch.from_numpy(mu).to(device),
+                            torch.from_numpy(var).to(device),
+                            torch.from_numpy(phi).to(device),
                         )
+
                     try:
                         training_step = 0
                         for epoch in range(self.epochs):
@@ -629,10 +613,33 @@ def cluster(self, X: Union[dict, str], file_type: str = "h5py") -> np.ndarray:
         with torch.no_grad():
             for idx, data in enumerate(test_loader):
                 inputs = self._assemble_input_for_testing(data)
-                results = self.model.forward(inputs, mode="clustering")[
-                    "clustering_pred"
-                ]
-                clustering_results_collector.append(results)
+                results = self.model.forward(inputs, training=False)
+
+                mu_tilde = results["mu_tilde"].cpu().numpy()
+                mu = results["mu"].cpu().numpy()
+                var = results["var"].cpu().numpy()
+                phi = results["phi"].cpu().numpy()
+
+                def func_to_apply(
+                    mu_t_: np.ndarray,
+                    mu_: np.ndarray,
+                    stddev_: np.ndarray,
+                    phi_: np.ndarray,
+                ) -> np.ndarray:
+                    # the covariance matrix is diagonal, so we can just take the product
+                    return np.log(1e-9 + phi_) + np.log(
+                        1e-9
+                        + multivariate_normal.pdf(mu_t_, mean=mu_, cov=np.diag(stddev_))
+                    )
+
+                p = np.array(
+                    [
+                        func_to_apply(mu_tilde, mu[i], var[i], phi[i])
+                        for i in np.arange(mu.shape[0])
+                    ]
+                )
+                clustering_results = np.argmax(p, axis=0)
+                clustering_results_collector.append(clustering_results)
 
         clustering_results = np.concatenate(clustering_results_collector)
         return clustering_results
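
For intuition, the rule the new cluster() body implements: a sample with latent mean mu_tilde is assigned to the mixture component c that maximizes log(phi_c) + log N(mu_tilde; mu_c, diag(var_c)), the unnormalized log-posterior over clusters under the fitted Gaussian mixture. A toy numeric sketch of my own (all values made up):

import numpy as np
from scipy.stats import multivariate_normal

mu_tilde = np.array([[0.1, 0.2]])        # latent means, shape (n_samples, d)
mu = np.array([[0.0, 0.0], [1.0, 1.0]])  # component means, shape (n_clusters, d)
var = np.array([[1.0, 1.0], [1.0, 1.0]]) # diagonal covariances per component
phi = np.array([0.5, 0.5])               # mixture weights

log_post = np.array([
    np.log(1e-9 + phi[i])
    + np.log(1e-9 + multivariate_normal.pdf(mu_tilde, mean=mu[i], cov=np.diag(var[i])))
    for i in range(mu.shape[0])
])
print(log_post.argmax(axis=0))  # cluster index per sample, here [0]
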
7 changes: 7 additions & 0 deletions pypots/clustering/vader/modules.py
@@ -13,13 +13,20 @@
 
 from typing import Tuple, Optional
 
+import numpy as np
 import torch
 import torch.nn as nn
 import torch.nn.functional as F
 from torch.autograd import Variable
 from torch.nn.parameter import Parameter
 
 
+def inverse_softplus(x: np.ndarray) -> np.ndarray:
+    b = x < 1e2
+    x[b] = np.log(np.exp(x[b]) - 1.0 + 1e-9)
+    return x
+
+
 class ImplicitImputation(nn.Module):
     def __init__(self, d_input: int):
         super().__init__()
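
As a side note (my own sketch, not part of the diff): softplus(x) = log(1 + e^x), so its inverse is log(e^y - 1); the 1e-9 guards the log near zero, and entries at or above 1e2 are left unchanged because softplus(x) ≈ x there while np.exp(x) could overflow for large x. A round-trip check:

import numpy as np

def softplus(x: np.ndarray) -> np.ndarray:
    return np.log1p(np.exp(x))

def inverse_softplus(x: np.ndarray) -> np.ndarray:  # as defined above; mutates in place
    b = x < 1e2
    x[b] = np.log(np.exp(x[b]) - 1.0 + 1e-9)
    return x

y = softplus(np.array([0.5, 3.0, 150.0]))
print(np.round(inverse_softplus(y), 4))  # ~ [0.5, 3.0, 150.0]
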
22 changes: 12 additions & 10 deletions pypots/imputation/mrnn/model.py
@@ -35,28 +35,29 @@ def __init__(self, seq_len, feature_num, rnn_hidden_size, device):
 
         self.f_rnn = nn.GRUCell(self.feature_num * 3, self.rnn_hidden_size)
         self.b_rnn = nn.GRUCell(self.feature_num * 3, self.rnn_hidden_size)
-        self.rnn_cells = {"forward": self.f_rnn, "backward": self.b_rnn}
         self.concated_hidden_project = nn.Linear(
             self.rnn_hidden_size * 2, self.feature_num
         )
         self.fcn_regression = FCN_Regression(feature_num, rnn_hidden_size)
 
-    def gene_hidden_states(self, data, direction):
-        values = data[direction]["X"]
-        masks = data[direction]["missing_mask"]
-        deltas = data[direction]["deltas"]
+    def gene_hidden_states(self, inputs, direction):
+        X = inputs[direction]["X"]
+        masks = inputs[direction]["missing_mask"]
+        deltas = inputs[direction]["deltas"]
+        device = X.device
 
         hidden_states_collector = []
-        hidden_state = torch.zeros(
-            (values.size()[0], self.rnn_hidden_size), device=self.device
-        )
+        hidden_state = torch.zeros((X.size()[0], self.rnn_hidden_size), device=device)
 
         for t in range(self.seq_len):
-            x = values[:, t, :]
+            x = X[:, t, :]
             m = masks[:, t, :]
             d = deltas[:, t, :]
             inputs = torch.cat([x, m, d], dim=1)
-            hidden_state = self.rnn_cells[direction](inputs, hidden_state)
+            if direction == "forward":
+                hidden_state = self.f_rnn(inputs, hidden_state)
+            else:
+                hidden_state = self.b_rnn(inputs, hidden_state)
             hidden_states_collector.append(hidden_state)
         return hidden_states_collector
 
@@ -189,6 +190,7 @@ def __init__(
         self.n_features = n_features
         self.rnn_hidden_size = rnn_hidden_size
 
+        # set up the model
         self.model = _MRNN(
             self.n_steps,
             self.n_features,
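
A plausible reading of the first hunk above, with a minimal reproduction of my own (not PyPOTS code): torch.nn.DataParallel only re-binds registered submodules when replicating a module onto each GPU, so cells stashed in a plain Python dict keep pointing at the GPU-0 originals inside every replica, and the hidden state must follow the input tensor's device rather than a fixed self.device.

import torch.nn as nn

class BiRNNCells(nn.Module):
    def __init__(self, d_in: int = 4, d_hidden: int = 8):
        super().__init__()
        self.f_rnn = nn.GRUCell(d_in, d_hidden)  # registered -> replicated per GPU
        self.b_rnn = nn.GRUCell(d_in, d_hidden)
        # BAD under nn.DataParallel: a plain dict is copied by reference, so
        # replicas on other GPUs would still call the GPU-0 cells:
        # self.rnn_cells = {"forward": self.f_rnn, "backward": self.b_rnn}

    def step(self, direction: str, x, h):
        # GOOD: resolve the attribute on self, so each replica uses its own copy
        cell = self.f_rnn if direction == "forward" else self.b_rnn
        return cell(x, h)
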
14 changes: 7 additions & 7 deletions tests/test_forecasting.py
@@ -9,26 +9,26 @@
 
 import pytest
 
-from pypots.data.generating import gene_incomplete_random_walk_dataset
 from pypots.forecasting import BTTF
 from pypots.utils.logging import logger
 from pypots.utils.metrics import cal_mae
+from tests.global_test_config import DATA
 
 EPOCHS = 5
-DATA = gene_incomplete_random_walk_dataset(n_steps=60, n_features=10)
-TEST_SET = {"X": DATA["test_X"][:, :50]}
+N_PRED_STEP = 4
+TEST_SET = {"X": DATA["test_X"][:, :-N_PRED_STEP]}
 
 
 class TestBTTF(unittest.TestCase):
     logger.info("Running tests for a forecasting model BTTF...")
 
     # initialize a BTTF model
     bttf = BTTF(
-        n_steps=50,
+        n_steps=DATA["n_steps"] - N_PRED_STEP,
         n_features=10,
-        pred_step=10,
+        pred_step=N_PRED_STEP,
         rank=10,
-        time_lags=[1, 2, 3, 10, 10 + 1, 10 + 2, 20, 20 + 1, 20 + 2],
+        time_lags=[1, 2, 3, 5, 5 + 1, 5 + 2, 10, 10 + 1, 10 + 2],
         burn_iter=5,
         gibbs_iter=5,
         multi_step=1,
@@ -38,7 +38,7 @@ class TestBTTF(unittest.TestCase):
     def test_0_forecasting(self):
         predictions = self.bttf.forecast(TEST_SET)
         logger.info(f"prediction shape: {predictions.shape}")
-        mae = cal_mae(predictions, DATA["test_X_intact"][:, 50:])
+        mae = cal_mae(predictions, DATA["test_X_intact"][:, -N_PRED_STEP:])
         logger.info(f"prediction MAE: {mae}")
 
 
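
A quick arithmetic check of the new configuration (my own sketch; the concrete n_steps of the shared DATA is an assumption here): the observed window is n_steps - N_PRED_STEP, and every time lag must fit inside it.

n_steps, N_PRED_STEP = 60, 4      # n_steps=60 assumed for illustration
observed = n_steps - N_PRED_STEP  # 56 steps are fed to BTTF
time_lags = [1, 2, 3, 5, 5 + 1, 5 + 2, 10, 10 + 1, 10 + 2]
assert max(time_lags) < observed  # every lag must index into the observed window
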