From 6652953e7a6f2447f301d1829a95064aa6063ce3 Mon Sep 17 00:00:00 2001 From: Ayush Shenoy Date: Thu, 2 May 2024 10:31:14 -0400 Subject: [PATCH] Refactor data file path code Remove repetitive code to get appropriate data files --- mdx/format/__init__.py | 1 - mdx/ingest.py | 151 ++++++++++++++++++--------------- mdx/models/__init__.py | 3 + mdx/models/core.py | 13 +++ mdx/{format => models}/meta.py | 18 +--- 5 files changed, 101 insertions(+), 85 deletions(-) delete mode 100644 mdx/format/__init__.py create mode 100644 mdx/models/__init__.py create mode 100644 mdx/models/core.py rename mdx/{format => models}/meta.py (78%) diff --git a/mdx/format/__init__.py b/mdx/format/__init__.py deleted file mode 100644 index a9a2c5b..0000000 --- a/mdx/format/__init__.py +++ /dev/null @@ -1 +0,0 @@ -__all__ = [] diff --git a/mdx/ingest.py b/mdx/ingest.py index e7864e1..ad32c4a 100755 --- a/mdx/ingest.py +++ b/mdx/ingest.py @@ -5,7 +5,8 @@ import pandas as pd import io import yaml -from .format.meta import FormatMeta +from mdx.models.core import check_path +from mdx.models.meta import FormatMeta from pydantic import PositiveInt, ValidationError, validate_call from typing import Union @@ -25,31 +26,6 @@ class InvalidChunks(Exception): pass -# Validators - - -@validate_call -def decide_chunks( - given_chunks: Union[list[int], int], meta_chunks: PositiveInt -) -> list[int]: - """ - Validate given chunks against metadata - """ - if type(given_chunks) is int: - given_chunks = [given_chunks] - valid_chunks = [i for i in range(meta_chunks)] - if given_chunks is not None: - if set(given_chunks) <= set(valid_chunks): - return given_chunks - else: - raise InvalidChunks( - f"Valid chunks are in range [0,...,{valid_chunks[-1]}]. Was supplied {given_chunks}" - ) - # raise ValidationError() - else: - return valid_chunks - - class Simulation: """ Representation of an entire simulation run, with its metadata. @@ -86,7 +62,7 @@ def __init__( self.eager = eager self.block = block_size - self.chunks = decide_chunks(chunks, self.meta["partition"]["n_chunks"]) + self.chunks = self.__decide_chunks(chunks, self.meta["partition"]["n_chunks"]) self.trajectory = None self.bonds = None @@ -100,24 +76,44 @@ def __init__( # End user methods - def read_bonds(self, data_path: os.PathLike = None) -> None: + def read_trajectory( + self, data_path: os.PathLike = None, atomic_format: str = "frame" + ) -> None: """ - Read bond files, parse and store in class attribute + Read trajectory files, parse and store in class attribute Args: data_path (os.PathLike): alternate base path containing chosen chunks + atomic_format: format to project trajectories into """ - base_path = self.meta["data_path"] if data_path is None else data_path - file_paths = [ - os.path.join( - base_path, - f"{chunk}/dat_bonds_{self.meta['sim_id']}_{chunk}.reaxff", + corpus = ( + db.read_text( + self.__get_data_files(data_path, "trajectory"), + linedelimiter="TIMESTEP", + blocksize=self.block, ) - for chunk in self.chunks - ] + .remove(lambda x: x == "ITEM: TIMESTEP") + .map(lambda x: x.split("ITEM: ")) + .map(lambda x: x[:-1] if (x[-1] == "TIMESTEP") else x) + .map(self.__process_traj_step, atomic_format=atomic_format) + # .distinct(key=lambda x: x["timestep"]) ; causes memory leak on nanosecond scale data + ) + + self.trajectory = corpus.compute() if self.eager else corpus + def read_bonds(self, data_path: os.PathLike = None) -> None: + """ + Read bond files, parse and store in class attribute + + Args: + data_path (os.PathLike): alternate base path containing chosen chunks + """ corpus = ( - db.read_text(file_paths, linedelimiter="# Timestep", blocksize=self.block) + db.read_text( + self.__get_data_files(data_path, "bonds"), + linedelimiter="# Timestep", + blocksize=self.block, + ) .remove(lambda x: x == "# Timestep") .map( lambda x: [ @@ -133,36 +129,6 @@ def read_bonds(self, data_path: os.PathLike = None) -> None: self.bonds = corpus.compute() if self.eager else corpus - def read_trajectory( - self, data_path: os.PathLike = None, atomic_format: str = "frame" - ) -> None: - """ - Read trajectory files, parse and store in class attribute - - Args: - data_path (os.PathLike): alternate base path containing chosen chunks - atomic_format: format to project trajectories into - """ - base_path = self.meta["data_path"] if data_path is None else data_path - file_paths = [ - os.path.join( - base_path, - f"{chunk}/dat_trajectory_{self.meta['sim_id']}_{chunk}.dump", - ) - for chunk in self.chunks - ] - - corpus = ( - db.read_text(file_paths, linedelimiter="TIMESTEP", blocksize=self.block) - .remove(lambda x: x == "ITEM: TIMESTEP") - .map(lambda x: x.split("ITEM: ")) - .map(lambda x: x[:-1] if (x[-1] == "TIMESTEP") else x) - .map(self.__process_traj_step, atomic_format=atomic_format) - # .distinct(key=lambda x: x["timestep"]) ; causes memory leak on nanosecond scale data - ) - - self.trajectory = corpus.compute() if self.eager else corpus - def read_species(self): pass @@ -241,8 +207,6 @@ def __process_bond_step(self, step_text: str): """ Parse raw bond data text of one frame into chosen format """ - # TODO Leverage symmetry and halve time - # atomids start from 0 (actualid -1) timestep = int(step_text.pop(0)) i, j, v = [], [], [] @@ -260,3 +224,52 @@ def __process_bond_step(self, step_text: str): shape=(self.meta["box"]["n_atoms"], self.meta["box"]["n_atoms"]), ), } + + # Helper methods + + @validate_call + def __decide_chunks( + self, given_chunks: Union[list[int], int], meta_chunks: PositiveInt + ) -> list[int]: + """ + Validate given chunks against metadata + """ + if type(given_chunks) is int: + given_chunks = [given_chunks] + valid_chunks = [i for i in range(meta_chunks)] + if given_chunks is not None: + if set(given_chunks) <= set(valid_chunks): + return given_chunks + else: + raise InvalidChunks( + f"Valid chunks are in range [0,...,{valid_chunks[-1]}]. Was supplied {given_chunks}" + ) + # raise ValidationError() + else: + return valid_chunks + + @validate_call + def __get_data_files( + self, data_path: Union[None, os.PathLike], type: str, exts=None + ): + """ + Get files across simulation chunks + """ + base_path = self.meta["data_path"] if data_path is None else data_path + + filetypes = ( + {"trajectory": "dump", "bonds": "reaxff", "species": "out", "log_out": ""} + if exts is None + else exts + ) + + file_paths = [ + check_path( + os.path.join( + base_path, + f"{chunk}/dat_{type}_{self.meta['sim_id']}_{chunk}.{filetypes[type]}", + ) + ) + for chunk in self.chunks + ] + return file_paths diff --git a/mdx/models/__init__.py b/mdx/models/__init__.py new file mode 100644 index 0000000..8ad9130 --- /dev/null +++ b/mdx/models/__init__.py @@ -0,0 +1,3 @@ +__all__ = [] + +from .core import * diff --git a/mdx/models/core.py b/mdx/models/core.py new file mode 100644 index 0000000..2bd0836 --- /dev/null +++ b/mdx/models/core.py @@ -0,0 +1,13 @@ +from pydantic.functional_validators import AfterValidator +from typing_extensions import Annotated +import os + +# ValidPath + + +def check_path(path: os.PathLike) -> os.PathLike: + assert os.path.exists(path), f"{path} is not a valid path" + return path + + +ValidPath = Annotated[os.PathLike, AfterValidator(check_path)] diff --git a/mdx/format/meta.py b/mdx/models/meta.py similarity index 78% rename from mdx/format/meta.py rename to mdx/models/meta.py index cafb068..de42376 100644 --- a/mdx/format/meta.py +++ b/mdx/models/meta.py @@ -1,10 +1,7 @@ -from pydantic import BaseModel, PositiveFloat, PositiveInt, ValidationError -from typing import Literal, Dict, Any, Optional, Union +from pydantic import BaseModel, PositiveFloat, PositiveInt +from typing import Literal, Dict, Any, Optional from datetime import datetime - -import os -from pydantic.functional_validators import AfterValidator -from typing_extensions import Annotated +from mdx.models.core import ValidPath # Allowed values @@ -23,15 +20,6 @@ ] # fmt: on -# Validators - - -def check_path(path: os.PathLike) -> os.PathLike: - assert os.path.exists(path), f"{path} is not a valid path" - return path - - -ValidPath = Annotated[os.PathLike, AfterValidator(check_path)] # Format for simulation metadata