diff --git a/rul_datasets/adaption.py b/rul_datasets/adaption.py
index 2088d8c..dc6e21b 100644
--- a/rul_datasets/adaption.py
+++ b/rul_datasets/adaption.py
@@ -2,16 +2,14 @@
 
 import warnings
 from copy import deepcopy
-from typing import List, Optional, Any, Tuple, Callable, Sequence, Union, cast
+from typing import List, Optional, Any, Tuple, Callable, Sequence, cast
 
 import numpy as np
 import pytorch_lightning as pl
 import torch
-from torch.utils.data import DataLoader, Dataset
-from torch.utils.data.dataset import ConcatDataset, TensorDataset
+from torch.utils.data import DataLoader, Dataset, ConcatDataset
 
-from rul_datasets import utils
-from rul_datasets.core import PairedRulDataset, RulDataModule
+from rul_datasets.core import PairedRulDataset, RulDataModule, RulDataset
 
 
 class DomainAdaptionDataModule(pl.LightningDataModule):
@@ -235,7 +233,7 @@ class LatentAlignDataModule(DomainAdaptionDataModule):
         >>> fd2 = rul_datasets.CmapssReader(fd=2, percent_broken=0.8)
         >>> src = rul_datasets.RulDataModule(fd1, 32)
         >>> trg = rul_datasets.RulDataModule(fd2, 32)
-        >>> dm = rul_datasets.LatentAlignDataModule(src, trg, split_by_max_rul=125)
+        >>> dm = rul_datasets.LatentAlignDataModule(src, trg, split_by_max_rul=True)
         >>> dm.prepare_data()
         >>> dm.setup()
         >>> train_1_2 = dm.train_dataloader()
@@ -286,12 +284,15 @@ def __init__(
 
     def _get_training_dataset(self) -> "AdaptionDataset":
         source_healthy, source_degraded = split_healthy(
-            *self.source.load_split("dev"), by_max_rul=True
+            *self.source.data["dev"], by_max_rul=True
+        )
+        target_features, target_labels = (  # reload only if needed to save memory
+            self.target.data["dev"]
+            if not self.inductive
+            else self.target.load_split("test", alias="dev")
         )
         target_healthy, target_degraded = split_healthy(
-            *self.target.load_split("test" if self.inductive else "dev", alias="dev"),
-            self.split_by_max_rul,
-            self.split_by_steps,
+            target_features, target_labels, self.split_by_max_rul, self.split_by_steps
         )
         healthy: Dataset = ConcatDataset([source_healthy, target_healthy])
         dataset = AdaptionDataset(source_degraded, target_degraded, healthy)
@@ -300,11 +301,11 @@ def _get_training_dataset(self) -> "AdaptionDataset":
 
 
 def split_healthy(
-    features: Union[List[np.ndarray], List[torch.Tensor]],
-    targets: Union[List[np.ndarray], List[torch.Tensor]],
+    features: List[np.ndarray],
+    targets: List[np.ndarray],
     by_max_rul: bool = False,
     by_steps: Optional[int] = None,
-) -> Tuple[TensorDataset, TensorDataset]:
+) -> Tuple[RulDataset, RulDataset]:
     """
     Split the feature and target time series into healthy and degrading parts and
     return a dataset of each.
@@ -329,19 +330,13 @@ def split_healthy(
     if not by_max_rul and (by_steps is None):
         raise ValueError("Either 'by_max_rul' or 'by_steps' need to be set.")
 
-    if isinstance(features[0], np.ndarray):
-        features, targets = cast(Tuple[List[np.ndarray], ...], (features, targets))
-        _features, _targets = utils.to_tensor(features, targets)
-    else:
-        _features, _targets = cast(Tuple[List[torch.Tensor], ...], (features, targets))
-
     healthy = []
     degraded = []
-    for feature, target in zip(_features, _targets):
+    for feature, target in zip(features, targets):
         sections = _get_sections(by_max_rul, by_steps, target)
-        healthy_feat, degraded_feat = torch.split(feature, sections)
-        healthy_target, degraded_target = torch.split(target, sections)
-        degradation_steps = torch.arange(1, len(degraded_target) + 1)
+        healthy_feat, degraded_feat = np.split(feature, sections)
+        healthy_target, degraded_target = np.split(target, sections)
+        degradation_steps = np.arange(1, len(degraded_target) + 1)
         healthy.append((healthy_feat, healthy_target))
         degraded.append((degraded_feat, degradation_steps, degraded_target))
 
@@ -352,22 +347,22 @@ def split_healthy(
 
 
 def _get_sections(
-    by_max_rul: bool, by_steps: Optional[int], target: torch.Tensor
+    by_max_rul: bool, by_steps: Optional[int], target: np.ndarray
 ) -> List[int]:
     # cast is needed for mypy and has no runtime effect
     if by_max_rul:
-        split_idx = cast(int, target.flip(0).argmax().item())
-        sections = [len(target) - split_idx, split_idx]
+        split_idx = cast(int, np.flip(target, axis=0).argmax())
+        sections = [len(target) - split_idx]
     else:
         by_steps = min(cast(int, by_steps), len(target))
-        sections = [by_steps, len(target) - by_steps]
+        sections = [by_steps]
 
     return sections
 
 
-def _to_dataset(data: Sequence[Tuple[torch.Tensor, ...]]) -> TensorDataset:
-    tensor_data = [torch.cat(h) for h in zip(*data)]
-    dataset = TensorDataset(*tensor_data)
+def _to_dataset(data: Sequence[Tuple[np.ndarray, ...]]) -> RulDataset:
+    features, *targets = list(zip(*data))
+    dataset = RulDataset(features, *targets)
 
     return dataset
 
diff --git a/rul_datasets/baseline.py b/rul_datasets/baseline.py
index 253655d..3b8faee 100644
--- a/rul_datasets/baseline.py
+++ b/rul_datasets/baseline.py
@@ -140,7 +140,6 @@ def __init__(
         self.min_distance = min_distance
         self.distance_mode = distance_mode
        self.window_size = self.unfailed.reader.window_size
-        self.source = unfailed_data_module
 
         self._check_loaders()
 
@@ -209,7 +208,8 @@ def prepare_data(self, *args, **kwargs):
         self.unfailed.reader.prepare_data()
 
     def setup(self, stage: Optional[str] = None):
-        self.source.setup(stage)
+        self.unfailed.setup(stage)
+        self.failed.setup(stage)
 
     def train_dataloader(self, *args, **kwargs) -> DataLoader:
         return DataLoader(
@@ -220,9 +220,9 @@ def val_dataloader(self, *args, **kwargs) -> List[DataLoader]:
         combined_loader = DataLoader(
             self._get_paired_dataset("val"), batch_size=self.batch_size, pin_memory=True
         )
-        source_loader = self.source.val_dataloader()
+        unfailed_loader = self.unfailed.val_dataloader()
 
-        return [combined_loader, source_loader]
+        return [combined_loader, unfailed_loader]
 
     def _get_paired_dataset(self, split: str) -> PairedRulDataset:
         deterministic = split == "val"
diff --git a/rul_datasets/core.py b/rul_datasets/core.py
index 08734f2..3618123 100644
--- a/rul_datasets/core.py
+++ b/rul_datasets/core.py
@@ -6,7 +6,12 @@
 import numpy as np
 import pytorch_lightning as pl
 import torch
-from torch.utils.data import DataLoader, IterableDataset, TensorDataset, get_worker_info
+from torch.utils.data import (
+    DataLoader,
+    IterableDataset,
+    get_worker_info,
+    Dataset,
+)
 
 from rul_datasets import utils
 from rul_datasets.reader import AbstractReader
@@ -60,7 +65,7 @@ class RulDataModule(pl.LightningDataModule):
         >>> dm = rul_datasets.RulDataModule(cmapss, 32, degraded_only=["val", "test"])
     """
 
-    _data: Dict[str, Tuple[torch.Tensor, torch.Tensor]]
+    _data: Dict[str, Tuple[List[np.ndarray], List[np.ndarray]]]
 
     def __init__(
         self,
@@ -126,7 +131,7 @@ def __init__(
         self.save_hyperparameters(hparams)
 
     @property
-    def data(self) -> Dict[str, Tuple[torch.Tensor, torch.Tensor]]:
+    def data(self) -> Dict[str, Tuple[List[np.ndarray], List[np.ndarray]]]:
         """
         A dictionary of the training, validation and test splits.
 
@@ -222,9 +227,9 @@ def setup(self, stage: Optional[str] = None) -> None:
             stage: Ignored. Only for adhering to parent class interface.
         """
         self._data = {
-            "dev": self._setup_split("dev"),
-            "val": self._setup_split("val"),
-            "test": self._setup_split("test"),
+            "dev": self.load_split("dev"),
+            "val": self.load_split("val"),
+            "test": self.load_split("test"),
         }
 
     def load_split(
@@ -232,7 +237,7 @@ def load_split(
         split: str,
         alias: Optional[str] = None,
         degraded_only: Optional[bool] = None,
-    ) -> Tuple[List[torch.Tensor], List[torch.Tensor]]:
+    ) -> Tuple[List[np.ndarray], List[np.ndarray]]:
         """
         Load a split from the underlying reader and apply the feature extractor.
 
@@ -254,27 +259,15 @@ def load_split(
         """
         features, targets = self.reader.load_split(split, alias)
         features, targets = self._apply_feature_extractor_per_run(features, targets)
-        tensor_features, tensor_targets = utils.to_tensor(features, targets)
         if degraded_only is None:
             degraded_only = (
                 self.degraded_only is not None
                 and (alias or split) in self.degraded_only
             )
         if degraded_only:
-            self._filter_out_healthy(tensor_features, tensor_targets)
-
-        return tensor_features, tensor_targets
-
-    def _setup_split(
-        self, split: str, alias: Optional[str] = None
-    ) -> Tuple[torch.Tensor, torch.Tensor]:
-        features, targets = self.load_split(split, alias)
-        if features:
-            cat_features, cat_targets = torch.cat(features), torch.cat(targets)
-        else:
-            cat_features, cat_targets = torch.empty(0, 0, 0), torch.empty(0)
+            self._filter_out_healthy(features, targets)
 
-        return cat_features, cat_targets
+        return features, targets
 
     def _apply_feature_extractor_per_run(
         self, features: List[np.ndarray], targets: List[np.ndarray]
@@ -296,7 +289,7 @@ def _extract_and_window(
 
         return features, targets
 
-    def _filter_out_healthy(self, tensor_features, tensor_targets):
+    def _filter_out_healthy(self, features, targets):
         if self.reader.max_rul is not None:
             thresh = self.reader.max_rul
         elif hasattr(self.reader, "norm_rul") and self.reader.norm_rul:
@@ -306,10 +299,10 @@ def _filter_out_healthy(self, tensor_features, tensor_targets):
                 "Cannot filter degraded samples if no max_rul is set and "
                 "norm_rul is False."
             )
-        for i in range(len(tensor_targets)):
-            degraded = tensor_targets[i] < thresh
-            tensor_features[i] = tensor_features[i][degraded]
-            tensor_targets[i] = tensor_targets[i][degraded]
+        for i in range(len(targets)):
+            degraded = targets[i] < thresh
+            features[i] = features[i][degraded]
+            targets[i] = targets[i][degraded]
 
     def train_dataloader(self, *args: Any, **kwargs: Any) -> DataLoader:
         """
@@ -387,7 +380,7 @@ def test_dataloader(self, *args: Any, **kwargs: Any) -> DataLoader:
             pin_memory=True,
         )
 
-    def to_dataset(self, split: str, alias: Optional[str] = None) -> TensorDataset:
+    def to_dataset(self, split: str, alias: Optional[str] = None) -> "RulDataset":
         """
         Create a dataset of a split.
 
@@ -408,14 +401,66 @@ def to_dataset(self, split: str, alias: Optional[str] = None) -> TensorDataset:
         if (alias is None) or (split == alias):
             features, targets = self._data[split]
         else:
-            features, targets = self._setup_split(split, alias)
-        split_dataset = TensorDataset(features, targets)
+            features, targets = self.load_split(split, alias)
+        split_dataset = RulDataset(features, targets)
 
         return split_dataset
 
 
+class RulDataset(Dataset):
+    """Internal dataset to hold multiple runs.
+
+    Its length is the sum of all runs' lengths.
+    """
+
+    def __init__(
+        self,
+        features: List[np.ndarray],
+        *targets: List[np.ndarray],
+        copy_tensors: bool = False,
+    ) -> None:
+        """
+        Create a new dataset from multiple runs.
+
+        If `copy_tensors` is true, the tensors are copied to avoid side effects when
+        modifying them. Otherwise, the tensors use the same memory as the original
+        Numpy arrays to save space.
+
+        Args:
+            features: The features of each run.
+            targets: The targets of each run.
+            copy_tensors: Whether to copy the tensors or not.
+        """
+        super().__init__()
+
+        self.features = features
+        self.targets = targets
+        self.copy_tensors = copy_tensors
+
+    def __getitem__(self, index: int) -> Tuple[torch.Tensor, ...]:
+        if isinstance(index, slice):
+            raise NotImplementedError("Slicing is not supported by this dataset.")
+        for i in range(len(self.features)):
+            if index < len(self.features[i]):
+                tensor_feat = utils.feature_to_tensor(
+                    self.features[i][index], copy=self.copy_tensors
+                )
+                tensor_tar = tuple(torch.as_tensor(t[i][index]) for t in self.targets)
+                return tensor_feat, *tensor_tar
+            else:
+                index -= len(self.features[i])
+
+        raise IndexError(f"Index {index} out of range.")
+
+    def __len__(self) -> int:
+        return sum(len(f) for f in self.features)
+
+
 class PairedRulDataset(IterableDataset):
-    """A dataset of sample pairs drawn from the same time series."""
+    """A dataset of sample pairs drawn from the same time series.
+
+    The dataset uses the runs exactly as loaded by the passed data modules. Options
+    like `degraded_only` need to be set there."""
 
     def __init__(
         self,
@@ -425,7 +470,6 @@ def __init__(
         min_distance: int,
         deterministic: bool = False,
         mode: str = "linear",
-        degraded_only: bool = False,
     ):
         super().__init__()
 
@@ -435,14 +479,13 @@ def __init__(
         self.num_samples = num_samples
         self.deterministic = deterministic
         self.mode = mode
-        self.degraded_only = degraded_only
 
         for dm in self.dms:
             dm.check_compatibility(self.dms[0])
 
         self._run_domain_idx: np.ndarray
-        self._features: List[torch.Tensor]
-        self._labels: List[torch.Tensor]
+        self._features: List[np.ndarray]
+        self._labels: List[np.ndarray]
         self._prepare_datasets()
 
         self._max_rul = self._get_max_rul()
@@ -474,9 +517,7 @@ def _prepare_datasets(self):
         features = []
         labels = []
         for domain_idx, dm in enumerate(self.dms):
-            run_features, run_labels = dm.load_split(
-                self.split, degraded_only=self.degraded_only
-            )
+            run_features, run_labels = dm.data[self.split]
            for feat, lab in zip(run_features, run_labels):
                 if len(feat) > self.min_distance:
                     run_domain_idx.append(domain_idx)
@@ -518,7 +559,7 @@ def __next__(self) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tens
 
     def _get_pair_idx(self) -> Tuple[int, int, int, Union[int, float], int]:
         chosen_run_idx = self._rng.integers(0, len(self._features))
-        domain_label = self._run_domain_idx[chosen_run_idx]
+        domain_label = cast(int, self._run_domain_idx[chosen_run_idx])
 
         chosen_run = self._features[chosen_run_idx]
         run_length = chosen_run.shape[0]
@@ -537,7 +578,7 @@ def _get_pair_idx(self) -> Tuple[int, int, int, Union[int, float], int]:
 
     def _get_pair_idx_piecewise(self) -> Tuple[int, int, int, Union[int, float], int]:
         chosen_run_idx = self._rng.integers(0, len(self._features))
-        domain_label = self._run_domain_idx[chosen_run_idx]
+        domain_label = cast(int, self._run_domain_idx[chosen_run_idx])
 
         chosen_run = self._features[chosen_run_idx]
         run_length = chosen_run.shape[0]
@@ -559,7 +600,7 @@ def _get_pair_idx_piecewise(self) -> Tuple[int, int, int, Union[int, float], int
 
     def _get_labeled_pair_idx(self) -> Tuple[int, int, int, Union[int, float], int]:
         chosen_run_idx = self._rng.integers(0, len(self._features))
-        domain_label = self._run_domain_idx[chosen_run_idx]
+        domain_label = cast(int, self._run_domain_idx[chosen_run_idx])
 
         chosen_run = self._features[chosen_run_idx]
         chosen_labels = self._labels[chosen_run_idx]
@@ -579,14 +620,14 @@ def _get_labeled_pair_idx(self) -> Tuple[int, int, int, Union[int, float], int]:
 
     def _build_pair(
         self,
-        run: torch.Tensor,
+        run: np.ndarray,
         anchor_idx: int,
         query_idx: int,
         distance: Union[int, float],
         domain_label: int,
     ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
-        anchors = run[anchor_idx]
-        queries = run[query_idx]
+        anchors = utils.feature_to_tensor(run[anchor_idx], torch.float32)
+        queries = utils.feature_to_tensor(run[query_idx], torch.float32)
         domain_tensor = torch.tensor(domain_label, dtype=torch.float)
         distances = torch.tensor(distance, dtype=torch.float)
         if self._max_rul is not None:  # normalize only if max_rul is set
diff --git a/rul_datasets/utils.py b/rul_datasets/utils.py
index 233ffd6..68469ca 100644
--- a/rul_datasets/utils.py
+++ b/rul_datasets/utils.py
@@ -1,12 +1,12 @@
 import os
-from typing import List, Optional, Callable, Dict, Tuple
+import tempfile
+from typing import List, Optional, Callable, Dict, Tuple, Literal
 
 import numpy as np
 import requests  # type: ignore
 import torch
 from tqdm import tqdm  # type: ignore
 
-
 GDRIVE_URL_BASE = "https://docs.google.com/uc?export=download"
 
 
@@ -60,7 +60,12 @@ def get_targets_from_file_paths(
     return targets
 
 
-def extract_windows(seq: np.ndarray, window_size: int, dilation: int = 1) -> np.ndarray:
+def extract_windows(
+    seq: np.ndarray,
+    window_size: int,
+    dilation: int = 1,
+    mode: Literal["memory", "memmap"] = "memory",
+) -> np.ndarray:
     """
     Extract sliding windows from a sequence.
 
@@ -77,15 +82,26 @@ def extract_windows(seq: np.ndarray, window_size: int, dilation: int = 1) -> np.
         seq: sequence to extract windows from
         window_size: length of the sliding window
         dilation: dilation of the sliding window
+        mode: create windows either in memory or on disk
 
     Returns:
         array of sliding windows
     """
-    if window_size > len(seq):
+    if (window_size * dilation) > len(seq):
         raise ValueError(
-            f"Cannot extract windows of size {window_size} with dilation {dilation}"
+            f"Cannot extract windows of size {window_size} with dilation {dilation} "
             f"from a sequence of length {len(seq)}."
         )
+    if mode == "memory":
+        windows = _extract_windows_in_memory(seq, window_size, dilation)
+    elif mode == "memmap":
+        windows = _extract_windows_memmap(seq, window_size, dilation)
+    else:
+        raise ValueError(f"Unknown mode {mode}.")
+
+    return windows
+
+def _extract_windows_in_memory(seq, window_size, dilation):
     num_frames = seq.shape[0] - (window_size - 1) * dilation
     window_idx = np.arange(window_size)[None, :] * dilation
     window_idx = window_idx + np.arange(num_frames)[:, None]
@@ -94,6 +110,23 @@ def extract_windows(seq: np.ndarray, window_size: int, dilation: int = 1) -> np.
     return windows
 
 
+def _extract_windows_memmap(seq, window_size, dilation):
+    num_frames = seq.shape[0] - (window_size - 1) * dilation
+    window_idx = np.arange(window_size)[None, :] * dilation
+    with tempfile.NamedTemporaryFile() as tmp_file:
+        windows = np.memmap(
+            tmp_file.name,
+            dtype=np.float32,
+            mode="w+",
+            shape=(num_frames, window_size, *seq.shape[1:]),
+        )
+        for i in range(num_frames):
+            windows[i] = seq[window_idx + i]
+        windows.flush()
+
+    return windows
+
+
 def download_file(url: str, save_path: str) -> None:
     response = requests.get(url, stream=True)
     if not response.status_code == 200:
@@ -132,18 +165,19 @@ def _write_content(response: requests.Response, save_path: str) -> None:
 
 
 def to_tensor(
-    features: List[np.ndarray], *targets: List[np.ndarray]
+    features: List[np.ndarray], *targets: List[np.ndarray], copy: bool = False
 ) -> Tuple[List[torch.Tensor], ...]:
     dtype = torch.float32
-    tensor_feats = [feature_to_tensor(f, dtype) for f in features]
-    tensor_targets = [
-        [torch.tensor(t, dtype=dtype) for t in target] for target in targets
-    ]
+    tensor_feats = [feature_to_tensor(f, dtype, copy) for f in features]
+    convert: Callable = torch.tensor if copy else torch.as_tensor  # type: ignore
+    tensor_targets = [[convert(t, dtype=dtype) for t in target] for target in targets]
 
     return tensor_feats, *tensor_targets
 
 
-def feature_to_tensor(features: np.ndarray, dtype: torch.dtype) -> torch.Tensor:
+def feature_to_tensor(
+    features: np.ndarray, dtype: torch.dtype = torch.float32, copy: bool = False
+) -> torch.Tensor:
     """
     Convert a numpy array to a torch tensor of `dtype` and swap the last dimensions.
 
@@ -154,5 +188,10 @@ def feature_to_tensor(features: np.ndarray, dtype: torch.dtype) -> torch.Tensor:
     Args:
         features: numpy array to convert
         dtype: dtype of the resulting tensor
+        copy: whether to copy the array before converting it
     """
-    return torch.transpose(torch.tensor(features, dtype=dtype), -1, -2)
+    if copy:
+        features = np.copy(features)
+    tensor = torch.transpose(torch.as_tensor(features, dtype=dtype), -1, -2)
+
+    return tensor
diff --git a/tests/templates.py b/tests/templates.py
index f01df86..c9da0c4 100644
--- a/tests/templates.py
+++ b/tests/templates.py
@@ -7,7 +7,7 @@
 from pytorch_lightning import LightningDataModule
 from torch.utils.data import TensorDataset
 
-from rul_datasets.core import PairedRulDataset
+from rul_datasets.core import PairedRulDataset, RulDataset
 from rul_datasets.reader import AbstractReader
 
 
@@ -40,7 +40,7 @@ def _check_paired_shapes(self, data):
         self.assertEqual(torch.Size(()), domain_labels.shape)
 
     def _check_tensor_dataset(self, data):
-        self.assertIsInstance(data, TensorDataset)
+        self.assertIsInstance(data, RulDataset)
         self._check_cmapss_shapes(data)
 
     def _check_cmapss_shapes(self, data):
diff --git a/tests/test_adaption.py b/tests/test_adaption.py
index 0c70ff2..d7b9690 100644
--- a/tests/test_adaption.py
+++ b/tests/test_adaption.py
@@ -375,9 +375,9 @@ def test_latent_align_data_module(mock_split_healthy, by_max_rul, by_steps):
     source = mock.MagicMock(core.RulDataModule)
     source.batch_size = 32
     source.reader.window_size = 30
-    source.load_split.return_value = ([torch.zeros(1)],) * 2
+    source.data = {"dev": ([torch.zeros(1)],) * 2}
     target = mock.MagicMock(core.RulDataModule)
-    target.load_split.return_value = ([torch.ones(1)],) * 2
+    target.data = {"dev": ([torch.ones(1)],) * 2}
 
     dm = adaption.LatentAlignDataModule(
         source, target, split_by_max_rul=by_max_rul, split_by_steps=by_steps
@@ -388,13 +388,13 @@ def test_latent_align_data_module(mock_split_healthy, by_max_rul, by_steps):
     mock_split_healthy.assert_has_calls(
         [
             mock.call(
-                source.load_split.return_value[0],
-                source.load_split.return_value[1],
+                source.data["dev"][0],
+                source.data["dev"][1],
                 by_max_rul=True,
             ),
             mock.call(
-                target.load_split.return_value[0],
-                target.load_split.return_value[1],
+                target.data["dev"][0],
+                target.data["dev"][1],
                 by_max_rul,
                 by_steps,
             ),
@@ -406,14 +406,24 @@
     "rul_datasets.adaption.split_healthy",
     return_value=(TensorDataset(torch.zeros(1)),) * 2,
 )
-@pytest.mark.parametrize(["inductive", "exp_split"], [(True, "test"), (False, "dev")])
-def test_latent_align_data_module_inductive(_, inductive, exp_split):
-    source = mock.MagicMock(core.RulDataModule)
+@pytest.mark.parametrize("inductive", [True, False])
+def test_latent_align_data_module_inductive(_, inductive, mocker):
+    source = mocker.MagicMock(core.RulDataModule)
     source.batch_size = 32
     source.reader.window_size = 30
-    source.load_split.return_value = ([torch.zeros(1)],) * 2
-    target = mock.MagicMock(core.RulDataModule)
-    target.load_split.return_value = ([torch.zeros(1)],) * 2
+    source.data.__getitem__.return_value = (
+        mock.sentinel.source_features,
+        mocker.sentinel.source_targets,
+    )
+    target = mocker.MagicMock(core.RulDataModule)
+    target.load_split.return_value = (
+        mock.sentinel.target_features,
+        mocker.sentinel.target_targets,
+    )
+    target.data.__getitem__.return_value = (
+        mock.sentinel.target_features,
+        mocker.sentinel.target_targets,
+    )
 
     dm = adaption.LatentAlignDataModule(
         source, target, inductive=inductive, split_by_max_rul=True
@@ -421,8 +431,12 @@ def test_latent_align_data_module_inductive(_, inductive, exp_split):
 
     dm.train_dataloader()
 
-    source.load_split.assert_called_once_with("dev")
-    target.load_split.assert_called_once_with(exp_split, alias="dev")
+    source.data.__getitem__.assert_called_once_with("dev")
+    if inductive:
+        # data should only be reloaded if necessary
+        target.load_split.assert_called_once_with("test", alias="dev")
+    else:
+        target.data.__getitem__.assert_called_once_with("dev")
 
 
 def test_latent_align_with_dummy():
@@ -439,15 +453,10 @@ def test_latent_align_with_dummy():
     assert len(batch) == 6
 
 
-@pytest.mark.parametrize(
-    ["features", "targets"],
-    [
-        ([np.random.randn(11, 100, 2)], [np.minimum(np.arange(11)[::-1], 5)]),
-        ([torch.randn(11, 2, 100)], [torch.clamp_max(torch.arange(11).flip(0), 5)]),
-    ],
-)
 @pytest.mark.parametrize(["by_max_rul", "by_steps"], [(True, None), (False, 6)])
-def test_split_healthy(features, targets, by_max_rul, by_steps):
+def test_split_healthy(by_max_rul, by_steps):
+    features = [np.random.randn(11, 100, 2)]
+    targets = [np.minimum(np.arange(11)[::-1], 5)]
     healthy, degraded = adaption.split_healthy(features, targets, by_max_rul, by_steps)
 
     assert len(healthy) == 6
diff --git a/tests/test_baseline.py b/tests/test_baseline.py
index 0ec60c2..ae71385 100644
--- a/tests/test_baseline.py
+++ b/tests/test_baseline.py
@@ -11,59 +11,63 @@
 from tests.templates import PretrainingDataModuleTemplate
 
 
-class TestBaselineDataModule(unittest.TestCase):
-    def setUp(self):
-        self.mock_loader = mock.MagicMock(name="AbstractLoader")
-        self.mock_loader.fd = 1
-        self.mock_loader.fds = [1, 2, 3]
-        self.mock_loader.hparams = {
-            "fd": self.mock_loader.fd,
-            "window_size": self.mock_loader.window_size,
+class TestBaselineDataModule:
+    @pytest.fixture()
+    def mock_reader(self):
+        mock_reader = mock.MagicMock(name="AbstractLoader")
+        mock_reader.fd = 1
+        mock_reader.fds = [1, 2, 3]
+        mock_reader.hparams = {
+            "fd": mock_reader.fd,
+            "window_size": mock_reader.window_size,
         }
-        self.mock_runs = [np.zeros((1, 1, 1))], [np.zeros(1)]
-        self.mock_loader.load_split.return_value = self.mock_runs
+        mock_runs = [np.zeros((1, 1, 1))], [np.zeros(1)]
+        mock_reader.load_split.return_value = mock_runs
+
+        return mock_reader
+
+    @pytest.fixture()
+    def dataset(self, mock_reader):
+        base_module = rul_datasets.RulDataModule(mock_reader, batch_size=16)
+        dataset = rul_datasets.BaselineDataModule(base_module)
+        dataset.prepare_data()
+        dataset.setup()
+
+        return dataset
+
+    def test_test_sets_created_correctly(self, mock_reader, dataset):
+        for fd in mock_reader.fds:
+            assert fd in dataset.subsets
+            assert fd == dataset.subsets[fd].reader.fd
+            if fd == dataset.data_module.reader.fd:
+                assert dataset.data_module is dataset.subsets[fd]
+            else:
+                assert dataset.subsets[fd].reader.percent_fail_runs is None
+                assert dataset.subsets[fd].reader.percent_broken is None
 
-        self.base_module = rul_datasets.RulDataModule(self.mock_loader, batch_size=16)
-        self.dataset = rul_datasets.BaselineDataModule(self.base_module)
-        self.dataset.prepare_data()
-        self.dataset.setup()
+    def test_selected_source_on_train(self, dataset, mocker):
+        mocker.patch.object(
+            dataset.data_module, "train_dataloader", return_value=mocker.sentinel.dl
+        )
+        assert dataset.train_dataloader() is mocker.sentinel.dl
 
-    def test_test_sets_created_correctly(self):
-        for fd in self.mock_loader.fds:
-            self.assertIn(fd, self.dataset.subsets)
-            self.assertEqual(fd, self.dataset.subsets[fd].reader.fd)
-            if fd == self.dataset.data_module.reader.fd:
-                self.assertIs(self.dataset.data_module, self.dataset.subsets[fd])
-            else:
-                self.assertIsNone(self.dataset.subsets[fd].reader.percent_fail_runs)
-                self.assertIsNone(self.dataset.subsets[fd].reader.percent_broken)
-
-    def test_selected_source_on_train(self):
-        baseline_train_dataset = self.dataset.train_dataloader().dataset
-        source_train_dataset = self.dataset.data_module.train_dataloader().dataset
-        self._assert_datasets_equal(baseline_train_dataset, source_train_dataset)
-
-    def test_selected_source_on_val(self):
-        baseline_val_dataset = self.dataset.val_dataloader().dataset
-        source_val_dataset = self.dataset.data_module.val_dataloader().dataset
-        self._assert_datasets_equal(baseline_val_dataset, source_val_dataset)
-
-    def test_selected_all_on_test(self):
-        baseline_test_loaders = self.dataset.test_dataloader()
-        for fd, baseline_test_loader in enumerate(baseline_test_loaders, start=1):
-            baseline_test_dataset = baseline_test_loader.dataset
-            test_dataset = self.dataset.subsets[fd].test_dataloader().dataset
-            self._assert_datasets_equal(baseline_test_dataset, test_dataset)
-
-    def _assert_datasets_equal(self, baseline_dataset, inner_dataset):
-        num_samples = len(baseline_dataset)
-        baseline_data = baseline_dataset[:num_samples]
-        inner_data = inner_dataset[:num_samples]
-        for baseline, inner in zip(baseline_data, inner_data):
-            self.assertEqual(0, torch.sum(baseline - inner))
-
-    def test_hparams(self):
-        self.assertDictEqual(self.base_module.hparams, self.dataset.hparams)
+    def test_selected_source_on_val(self, dataset, mocker):
+        mocker.patch.object(
+            dataset.data_module, "val_dataloader", return_value=mocker.sentinel.dl
+        )
+        assert dataset.val_dataloader() is mocker.sentinel.dl
+
+    def test_selected_all_on_test(self, dataset, mocker):
+        for fd in [1, 2, 3]:
+            sentinel = getattr(mocker.sentinel, f"dl_{fd}")
+            mocker.patch.object(
+                dataset.subsets[fd], "test_dataloader", return_value=sentinel
+            )
+        for fd, baseline_test_loader in enumerate(dataset.test_dataloader(), start=1):
+            assert baseline_test_loader is getattr(mocker.sentinel, f"dl_{fd}")
+
+    def test_hparams(self, dataset):
+        assert dataset.hparams == dataset.data_module.hparams
 
 
 class TestPretrainingBaselineDataModuleFullData(
diff --git a/tests/test_core.py b/tests/test_core.py
index cca7d1e..bbe6fa9 100644
--- a/tests/test_core.py
+++ b/tests/test_core.py
@@ -9,7 +9,7 @@
 import torch
 from torch.utils.data import DataLoader, RandomSampler, SequentialSampler, TensorDataset
 
-from rul_datasets import core, reader, RulDataModule
+from rul_datasets import core, reader, RulDataModule, utils
 
 
 @pytest.fixture()
@@ -79,7 +79,6 @@ def test_setup(self, mock_loader, mock_runs):
         mock_loader.load_split.assert_has_calls(
             [mock.call("dev", None), mock.call("val", None), mock.call("test", None)]
         )
-        mock_runs = tuple(torch.tensor(np.concatenate(r)) for r in mock_runs)
         assert dataset._data == {"dev": mock_runs, "val": mock_runs, "test": mock_runs}
 
     @pytest.mark.parametrize("split", ["dev", "val", "test"])
@@ -211,8 +210,8 @@ def test_to_dataset(self, mock_loader):
 
         for i, split in enumerate(["dev", "val", "test"]):
             tensor_dataset = dataset.to_dataset(split)
-            assert isinstance(tensor_dataset, TensorDataset)
-            assert i == len(tensor_dataset.tensors[0])
+            assert isinstance(tensor_dataset, core.RulDataset)
+            assert i == len(tensor_dataset.features)
 
     def test_check_compatability(self, mock_loader):
         fe = lambda x: np.mean(x, axis=2)
@@ -307,6 +306,10 @@ def __init__(self, length, norm_rul=False):
                 [np.zeros((100, self.window_size, 5))],
                 [np.clip(np.arange(100, 0, step=-1), a_min=None, a_max=125) / norm],
             ),
+            "test": (
+                [np.zeros((10, self.window_size, 5))],
+                [np.clip(np.arange(10, 0, step=-1), a_min=None, a_max=125) / norm],
+            ),
         }
 
     @property
@@ -361,6 +364,8 @@ class DummyRulShortRuns(reader.AbstractReader):
                 np.ones(1) * 500,
             ],
         ),
+        "val": ([], []),
+        "test": ([], []),
     }
 
     @property
@@ -381,10 +386,7 @@ def prepare_data(self):
         pass
 
     def load_complete_split(self, split, alias):
-        if not split == "dev":
-            raise ValueError(f"DummyRulShortRuns does not have a '{split}' split")
-
-        return self.data["dev"]
+        return self.data[split]
 
     def load_split(self, split, alias):
         return self.load_complete_split(split, alias)
@@ -397,17 +399,26 @@ def length():
 
 @pytest.fixture
 def cmapss_normal(length):
-    return RulDataModule(DummyRul(length), 32)
+    dm = RulDataModule(DummyRul(length), 32)
+    dm.setup()
+
+    return dm
 
 
 @pytest.fixture
 def cmapss_normed(length):
-    return RulDataModule(DummyRul(length, norm_rul=True), 32)
+    dm = RulDataModule(DummyRul(length, norm_rul=True), 32)
+    dm.setup()
+
+    return dm
 
 
 @pytest.fixture
 def cmapss_short():
-    return RulDataModule(DummyRulShortRuns(), 32)
+    dm = RulDataModule(DummyRulShortRuns(), 32)
+    dm.setup()
+
+    return dm
 
 
 class TestPairedDataset:
@@ -461,8 +472,8 @@ def test_sampled_data(self, cmapss_short):
         for i, sample in enumerate(data):
             idx = 3 * i
             expected_run = data._features[fixed_idx[idx]]
-            expected_anchor = torch.tensor(expected_run[fixed_idx[idx + 1]])
-            expected_query = torch.tensor(expected_run[fixed_idx[idx + 2]])
+            expected_anchor = utils.feature_to_tensor(expected_run[fixed_idx[idx + 1]])
+            expected_query = utils.feature_to_tensor(expected_run[fixed_idx[idx + 2]])
             expected_distance = min(125, fixed_idx[idx + 2] - fixed_idx[idx + 1]) / 125
             expected_domain_idx = 0
             assert 0 == torch.dist(expected_anchor, sample[0])
@@ -580,17 +591,9 @@ def test_compatability_check(self):
             RulDataModule(DummyRulShortRuns(window_size=20), 32),
         ]
         for dm in dms:
+            dm.setup()
             dm.check_compatibility = mock_check_compat
         core.PairedRulDataset(dms, "dev", 1000, 1)
 
         assert 2 == mock_check_compat.call_count
-
-    @pytest.mark.parametrize("degraded_only", [True, False])
-    def test_degraded_only(self, degraded_only, cmapss_normal, mocker):
-        spy_load_split = mocker.spy(cmapss_normal, "load_split")
-        core.PairedRulDataset(
-            [cmapss_normal], "dev", 1000, 1, degraded_only=degraded_only
-        )
-
-        spy_load_split.assert_called_with("dev", degraded_only=degraded_only)
diff --git a/tests/test_utils.py b/tests/test_utils.py
index f1aef27..73829e3 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -1,5 +1,6 @@
 import os
 import random
+import sys
 
 import numpy as np
 import numpy.testing as npt
@@ -65,6 +66,34 @@ def test_extract_windows(window_size, dilation):
         npt.assert_equal(windows[i], expected_window)
 
 
+def test_extract_windows_memmap_identical(tmp_path):
+    inputs = np.random.randn(100, 16)
+
+    windows = utils.extract_windows(inputs, 10, 1)
+    windows_memmap = utils.extract_windows(inputs, 10, 1, mode="memmap")
+
+    npt.assert_almost_equal(windows, windows_memmap)
+
+
+def test_extract_windows_memmap_auto_deletes():
+    tmp_file_name = None
+
+    def _extract_tmp_file_name(event_name, args):
+        """Grabs temporary file name from 'tempfile.mkstemp' audit event."""
+        if event_name == "tempfile.mkstemp":
+            nonlocal tmp_file_name
+            tmp_file_name = args[0]
+
+    sys.addaudithook(_extract_tmp_file_name)
+    inputs = np.random.randn(100, 16)
+
+    windows = utils.extract_windows(inputs, 10, 1, mode="memmap")
+
+    windows.max()  # check if memmap is accessible
+    del windows
+    assert not os.path.exists(tmp_file_name)  # check if memmap is deleted
+
+
 @pytest.mark.parametrize("num_targets", [0, 1, 2])
 @pytest.mark.parametrize("num_batch_dims", [0, 1, 2, 3])
 def test_to_tensor(num_targets, num_batch_dims):
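A minimal usage sketch of the API after this change (illustrative only; it assumes the CmapssReader shown in the docstring example above is available and that its data has been downloaded):

    import numpy as np
    import rul_datasets
    from rul_datasets import utils

    # Sliding windows can now be materialized on disk via the new `mode` argument.
    seq = np.random.randn(1000, 14)
    windows = utils.extract_windows(seq, window_size=30, mode="memmap")

    # RulDataModule now keeps numpy arrays per run; the new core.RulDataset converts
    # them to torch tensors lazily on access instead of concatenating them up front.
    dm = rul_datasets.RulDataModule(rul_datasets.CmapssReader(fd=1), batch_size=32)
    dm.prepare_data()
    dm.setup()
    dataset = dm.to_dataset("dev")  # core.RulDataset spanning all runs
    features, targets = dataset[0]  # tensors are created on access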