Skip to content

Commit

Permalink
Refactor data file path code
Browse files Browse the repository at this point in the history
Remove repetitive code to get appropriate data files
  • Loading branch information
ashenoy463 committed May 2, 2024
1 parent 78ed8ac commit 6652953
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 85 deletions.
1 change: 0 additions & 1 deletion mdx/format/__init__.py

This file was deleted.

151 changes: 82 additions & 69 deletions mdx/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
import pandas as pd
import io
import yaml
from .format.meta import FormatMeta
from mdx.models.core import check_path
from mdx.models.meta import FormatMeta
from pydantic import PositiveInt, ValidationError, validate_call
from typing import Union

Expand All @@ -25,31 +26,6 @@ class InvalidChunks(Exception):
pass


# Validators


@validate_call
def decide_chunks(
    given_chunks: Union[list[int], int], meta_chunks: PositiveInt
) -> list[int]:
    """
    Check a requested chunk selection against the chunk count in metadata.

    Args:
        given_chunks: a single chunk index or a list of chunk indices
        meta_chunks: total number of chunks recorded in the metadata

    Returns:
        The validated list of chunk indices; every available chunk when
        no selection was supplied.

    Raises:
        InvalidChunks: when a requested index lies outside [0, meta_chunks).
    """
    valid_chunks = list(range(meta_chunks))
    # Normalise a bare index into a one-element list
    if type(given_chunks) is int:
        given_chunks = [given_chunks]
    if given_chunks is None:
        return valid_chunks
    if not set(given_chunks) <= set(valid_chunks):
        raise InvalidChunks(
            f"Valid chunks are in range [0,...,{valid_chunks[-1]}]. Was supplied {given_chunks}"
        )
    return given_chunks


class Simulation:
"""
Representation of an entire simulation run, with its metadata.
Expand Down Expand Up @@ -86,7 +62,7 @@ def __init__(

self.eager = eager
self.block = block_size
self.chunks = decide_chunks(chunks, self.meta["partition"]["n_chunks"])
self.chunks = self.__decide_chunks(chunks, self.meta["partition"]["n_chunks"])

self.trajectory = None
self.bonds = None
Expand All @@ -100,24 +76,44 @@ def __init__(

# End user methods

def read_bonds(self, data_path: os.PathLike = None) -> None:
def read_trajectory(
self, data_path: os.PathLike = None, atomic_format: str = "frame"
) -> None:
"""
Read bond files, parse and store in class attribute
Read trajectory files, parse and store in class attribute
Args:
data_path (os.PathLike): alternate base path containing chosen chunks
atomic_format: format to project trajectories into
"""
base_path = self.meta["data_path"] if data_path is None else data_path
file_paths = [
os.path.join(
base_path,
f"{chunk}/dat_bonds_{self.meta['sim_id']}_{chunk}.reaxff",
corpus = (
db.read_text(
self.__get_data_files(data_path, "trajectory"),
linedelimiter="TIMESTEP",
blocksize=self.block,
)
for chunk in self.chunks
]
.remove(lambda x: x == "ITEM: TIMESTEP")
.map(lambda x: x.split("ITEM: "))
.map(lambda x: x[:-1] if (x[-1] == "TIMESTEP") else x)
.map(self.__process_traj_step, atomic_format=atomic_format)
# .distinct(key=lambda x: x["timestep"]) ; causes memory leak on nanosecond scale data
)

self.trajectory = corpus.compute() if self.eager else corpus

def read_bonds(self, data_path: os.PathLike = None) -> None:
"""
Read bond files, parse and store in class attribute
Args:
data_path (os.PathLike): alternate base path containing chosen chunks
"""
corpus = (
db.read_text(file_paths, linedelimiter="# Timestep", blocksize=self.block)
db.read_text(
self.__get_data_files(data_path, "bonds"),
linedelimiter="# Timestep",
blocksize=self.block,
)
.remove(lambda x: x == "# Timestep")
.map(
lambda x: [
Expand All @@ -133,36 +129,6 @@ def read_bonds(self, data_path: os.PathLike = None) -> None:

self.bonds = corpus.compute() if self.eager else corpus

def read_trajectory(
self, data_path: os.PathLike = None, atomic_format: str = "frame"
) -> None:
"""
Read trajectory files, parse and store in class attribute
Args:
data_path (os.PathLike): alternate base path containing chosen chunks
atomic_format: format to project trajectories into
"""
base_path = self.meta["data_path"] if data_path is None else data_path
file_paths = [
os.path.join(
base_path,
f"{chunk}/dat_trajectory_{self.meta['sim_id']}_{chunk}.dump",
)
for chunk in self.chunks
]

corpus = (
db.read_text(file_paths, linedelimiter="TIMESTEP", blocksize=self.block)
.remove(lambda x: x == "ITEM: TIMESTEP")
.map(lambda x: x.split("ITEM: "))
.map(lambda x: x[:-1] if (x[-1] == "TIMESTEP") else x)
.map(self.__process_traj_step, atomic_format=atomic_format)
# .distinct(key=lambda x: x["timestep"]) ; causes memory leak on nanosecond scale data
)

self.trajectory = corpus.compute() if self.eager else corpus

def read_species(self):
    # TODO: placeholder — species-file parsing is not implemented yet
    pass

Expand Down Expand Up @@ -241,8 +207,6 @@ def __process_bond_step(self, step_text: str):
"""
Parse raw bond data text of one frame into chosen format
"""
# TODO Leverage symmetry and halve time
# atomids start from 0 (actualid -1)
timestep = int(step_text.pop(0))

i, j, v = [], [], []
Expand All @@ -260,3 +224,52 @@ def __process_bond_step(self, step_text: str):
shape=(self.meta["box"]["n_atoms"], self.meta["box"]["n_atoms"]),
),
}

# Helper methods

@validate_call
def __decide_chunks(
    self, given_chunks: Union[list[int], int, None], meta_chunks: PositiveInt
) -> list[int]:
    """
    Validate the requested chunks against the chunk count in metadata.

    Args:
        given_chunks: a single chunk index, a list of chunk indices, or
            None to select every available chunk
        meta_chunks: total number of chunks recorded in the metadata

    Returns:
        The validated list of chunk indices.

    Raises:
        InvalidChunks: when a requested index lies outside [0, meta_chunks).
    """
    valid_chunks = list(range(meta_chunks))
    # None means "use every chunk". The previous annotation did not admit
    # None, so @validate_call rejected it before this default path could
    # ever run; Union[..., None] makes the documented behaviour reachable.
    if given_chunks is None:
        return valid_chunks
    # Normalise a bare index into a one-element list
    if isinstance(given_chunks, int):
        given_chunks = [given_chunks]
    if set(given_chunks) <= set(valid_chunks):
        return given_chunks
    raise InvalidChunks(
        f"Valid chunks are in range [0,...,{valid_chunks[-1]}]. Was supplied {given_chunks}"
    )

@validate_call
def __get_data_files(
    self, data_path: Union[None, os.PathLike], type: str, exts=None
):
    """
    Build and validate the per-chunk file paths for one kind of output.

    Args:
        data_path: alternate base path containing the chosen chunks;
            falls back to the metadata's data_path when None
        type: kind of data file ("trajectory", "bonds", "species",
            "log_out")
        exts: optional mapping of data kind -> file extension,
            overriding the built-in defaults

    Returns:
        List of existence-checked paths, one per selected chunk.
    """
    root = self.meta["data_path"] if data_path is None else data_path

    default_exts = {
        "trajectory": "dump",
        "bonds": "reaxff",
        "species": "out",
        "log_out": "",
    }
    ext_map = default_exts if exts is None else exts
    # NOTE(review): "log_out" maps to an empty extension, which yields a
    # filename ending in a bare "." — confirm that is the on-disk naming.
    suffix = ext_map[type]

    paths = []
    for chunk in self.chunks:
        candidate = os.path.join(
            root,
            f"{chunk}/dat_{type}_{self.meta['sim_id']}_{chunk}.{suffix}",
        )
        # check_path raises if the file does not exist
        paths.append(check_path(candidate))
    return paths
3 changes: 3 additions & 0 deletions mdx/models/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# NOTE(review): __all__ is left empty, so `from mdx.models import *`
# re-exports nothing even though core's names are pulled in below —
# confirm this is intentional.
__all__ = []

from .core import *
13 changes: 13 additions & 0 deletions mdx/models/core.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from pydantic.functional_validators import AfterValidator
from typing_extensions import Annotated
import os

# ValidPath


def check_path(path: os.PathLike) -> os.PathLike:
    """
    Validate that *path* exists on disk and return it unchanged.

    Args:
        path: filesystem path to check

    Returns:
        The same path, when it exists.

    Raises:
        ValueError: when the path does not exist.
    """
    # Raise explicitly instead of `assert`: assertions are stripped under
    # `python -O`, which would silently disable this validation. ValueError
    # keeps pydantic AfterValidator semantics (it is converted into a
    # ValidationError, just as AssertionError was).
    if not os.path.exists(path):
        raise ValueError(f"{path} is not a valid path")
    return path


ValidPath = Annotated[os.PathLike, AfterValidator(check_path)]
18 changes: 3 additions & 15 deletions mdx/format/meta.py → mdx/models/meta.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
from pydantic import BaseModel, PositiveFloat, PositiveInt, ValidationError
from typing import Literal, Dict, Any, Optional, Union
from pydantic import BaseModel, PositiveFloat, PositiveInt
from typing import Literal, Dict, Any, Optional
from datetime import datetime

import os
from pydantic.functional_validators import AfterValidator
from typing_extensions import Annotated
from mdx.models.core import ValidPath

# Allowed values

Expand All @@ -23,15 +20,6 @@
]
# fmt: on

# Validators


def check_path(path: os.PathLike) -> os.PathLike:
assert os.path.exists(path), f"{path} is not a valid path"
return path


ValidPath = Annotated[os.PathLike, AfterValidator(check_path)]

# Format for simulation metadata

Expand Down

0 comments on commit 6652953

Please sign in to comment.