Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mapping parser bug fixed #8

Open
wants to merge 17 commits into
base: mapping_parser
Choose a base branch
from
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ maintainers = [
{ name = "Joseph Rudzinski", email = "[email protected]" },
]
license = { file = "LICENSE" }
dependencies = ["nomad-simulations"]
dependencies = ["nomad-simulations", "nomad-schema-plugin-simulation-workflow"]

[project.urls]
Repository = "https://github.com/FAIRmat-NFDI/nomad-parser-h5md"
Expand Down
4 changes: 2 additions & 2 deletions src/nomad_parser_h5md/parsers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ def load(self):
mainfile_contents_dict={'__has_all_keys': ['h5md']},
mainfile_mime_re='(application/x-hdf)',
mainfile_name_re=r'^.*\.(h5|hdf5)$',
parser_class_name='nomad_parser_h5md.parsers.h5md_parser.H5MDParser', # for running mapping parser with nomad parse
# parser_class_name='nomad_parser_h5md.parsers.parser.H5MDParser',
# parser_class_name='nomad_parser_h5md.parsers.h5md_parser.H5MDParser', # for running mapping parser with nomad parse
parser_class_name='nomad_parser_h5md.parsers.parser.H5MDParser',
code_name='H5MD',
code_category='MD code',
metadata={
Expand Down
238 changes: 216 additions & 22 deletions src/nomad_parser_h5md/parsers/h5md_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,29 @@
import pint

from nomad.parsing.file_parser.mapping_parser import HDF5Parser, MetainfoParser, Path
from nomad_parser_h5md.schema_packages.schema import Simulation
from nomad_parser_h5md.schema_packages.schema import Simulation, MolecularDynamics
from nomad_parser_h5md.parsers.mdparserutils import MDParser
from nomad.units import ureg

from h5py import Group


class H5MDH5Parser(HDF5Parser):
trajectory_steps: List[int] = []
output_steps: List[int] = []
observables: Dict[str, Any] = {}

def get_value(self, name: str, dct: Dict[str, Any]) -> Any:
    """Resolve the value stored under ``name`` in an H5MD source dict.

    Entries that are not dicts are returned unchanged. Dict entries are
    read through ``self.value_key``; when present, the entry's
    ``unit`` and ``unit_factor`` attributes (prefixed with
    ``self.attribute_prefix``) are applied to the raw value.

    Returns None when the entry is missing or carries no value.
    """
    value = dct.get(name, {})
    if not isinstance(value, dict):
        # scalar / array stored directly, no attributes to apply
        return value
    value = value.get(self.value_key)
    if value is None:
        return None
    unit = dct.get(name, {}).get(f'{self.attribute_prefix}unit')
    if unit:
        value = value * ureg(unit)
    factor = dct.get(name, {}).get(f'{self.attribute_prefix}unit_factor')
    if factor:
        value = value * factor
    return value
Expand All @@ -29,19 +36,90 @@ def get_source(self, parent: Dict[str, Any], path: str):
return source
return self.get_source(source, path_segments[1])

def map_value(self, source: Dict[str, Any], **kwargs) -> Any:
    """Fetch the value named by the ``key`` kwarg from ``source``.

    Returns None when no ``key`` is provided; otherwise delegates to
    ``get_value`` so unit attributes are applied consistently.
    """
    key = kwargs.get('key')
    if key is None:
        return None
    return self.get_value(key, source)

def get_system_hierarchy(
    self,
    particlesgroup: Dict[str, Any],
) -> List[Dict[str, Any]]:
    """Build the model-system hierarchy from an H5MD ``particles_group`` dict.

    Each entry of ``particlesgroup`` becomes one branch dict with the
    standard keys (``branch_label``, ``atom_indices``, ``type``,
    ``is_molecule``), any remaining keys collected under
    ``custom_system_attributes``, and nested ``particles_group`` entries
    recursed into as child branches.

    NOTE(review): reconstructed from work-in-progress code that referenced
    undefined names and returned an empty list — confirm the returned
    structure against the target schema before relying on it.
    """
    data: List[Dict[str, Any]] = []
    for key, group in particlesgroup.items():
        # copy so popping standard keys does not mutate the source dict
        group = dict(group)
        entry: Dict[str, Any] = {}
        entry['branch_label'] = group.pop('label', None)
        entry['atom_indices'] = group.pop('indices', None)
        # TODO remove the deprecated below from the test file
        entry['type'] = group.pop('type', None)  # ? deprecate?
        entry['is_molecule'] = group.pop('is_molecule', None)  # ? deprecate?
        group.pop('formula', None)  # covered in normalization now
        subgroup = group.pop('particles_group', None)

        # remaining keys are custom attributes; split off pint units
        entry['custom_system_attributes'] = []
        for attr_key, val in group.items():
            units = val.units if hasattr(val, 'units') else None
            magnitude = val.magnitude if units is not None else val
            entry['custom_system_attributes'].append(
                {'name': attr_key, 'value': magnitude, 'unit': units}
            )

        # recurse into the next branch level
        if subgroup:
            entry['particles_group'] = self.get_system_hierarchy(subgroup)

        data.append(entry)
    return data

def get_system_steps(self, source: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return ``{'step', 'time'}`` dicts for the trajectory steps.

    Only steps present in ``self.trajectory_steps`` are kept; returns
    an empty list when the source has no step/time data.

    TODO attach the system hierarchy (see ``get_system_hierarchy``) to
    the first step once the mapping annotation supports it.
    """
    steps = self.get_value('step', source)
    times = self.get_value('time', source)
    if steps is None or times is None:
        # no step/time record in this particles group
        return []
    return [
        {'step': step, 'time': times[n]}
        for n, step in enumerate(steps)
        if step in self.trajectory_steps
    ]

def get_step_data(self, data: Dict[str, Any], step: int) -> Dict[str, Any]:
step_data = {}
value = self.get_value('value', data)
steps = data.get('step')
steps = self.get_value('step', data)
if value is None or steps is None:
return step_data
index = steps.index(step)
Expand Down Expand Up @@ -80,20 +158,46 @@ def to_species_labels(self, source: List[str]) -> List[Dict[str, Any]]:
return [{'label': s} for s in source]

def get_output_steps(self, source: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Collect the union of step/time pairs over all configurational observables.

    Walks the observables tree; dicts without an ``@type`` attribute are
    treated as containers and recursed into. Inconsistent step-time
    combinations between observables are logged as errors.

    Returns a list of ``{'step': ..., 'time': ...}`` dicts.
    """
    output_steps: Dict[Any, Any] = {}

    def get_steps(dct: Dict[str, Any]) -> Dict[str, Any]:
        # map step -> time for a single observable; {} when absent/invalid
        steps = self.get_value('step', dct)
        if steps is None:
            return {}
        times = self.get_value('time', dct)
        if times is None or len(steps) != len(times):
            self.logger.error(
                'Inconsistent step-time combinations in observable data.'
            )
            return {}
        return {step: times[n] for n, step in enumerate(steps)}

    def get_observable_steps(src: Dict[str, Any], acc: Dict[str, Any]) -> None:
        for _, val in src.items():
            if not isinstance(val, dict):
                continue
            observable_type = val.get('@type')
            if not observable_type:
                # untyped group: recurse into nested observables
                get_observable_steps(val, acc)
            elif observable_type == 'configurational':
                steps = get_steps(val)
                # times must agree with steps already collected
                if not all(
                    time == acc[step]
                    for step, time in steps.items()
                    if step in acc
                ):
                    self.logger.error(
                        'Inconsistent step-time combinations in observable data.'
                    )
                acc.update(steps)

    get_observable_steps(source, output_steps)
    return [{'step': step, 'time': time} for step, time in output_steps.items()]

def get_contributions(
self, source: Dict[str, Any], **kwargs
Expand All @@ -117,16 +221,100 @@ def get_output_data(self, source: Dict[str, Any], **kwargs) -> pint.Quantity:
return source['value']
if source.get('step') is None or kwargs.get('path') is None:
return
observable_type = kwargs.get('observable_type')
if observable_type is None or observable_type not in [
'configurational',
'ensemble_average',
'correlation_function',
]:
self.logger.warning(
'Invalid or no obervable type defined in the schema annotation '
f'for {source.keys()},skipping this observable.'
)
return
print(source.keys())

source_data = self.get_source(self.data, kwargs['path'])
if source_data.get('@type') != observable_type:
return
return self.get_step_data(source_data, source['step']).get('value')

def get_custom_outputs(
    self, source: Dict[str, Any], **kwargs
) -> List[Dict[str, Any]]:
    """Collect non-standard output quantities at the current step.

    Reads the group at ``kwargs['path']``, filters entries by the
    optional ``include``/``exclude``/``observable_type`` kwargs, and
    returns ``{'name', 'value', ...}`` dicts; pint quantities are split
    into a plain magnitude plus a ``unit`` string.
    """
    path = kwargs.get('path')
    if path is None or source.get('step') is None:
        return []

    source_data = self.get_source(self.data, path)
    include = kwargs.get('include')
    exclude = kwargs.get('exclude')
    observable_type = kwargs.get('observable_type')
    outputs: List[Dict[str, Any]] = []
    for name, entry in source_data.items():
        if (include and name not in include) or (exclude and name in exclude):
            continue
        if observable_type is not None and entry.get('@type') != observable_type:
            continue
        step_data = self.get_step_data(entry, source['step'])
        value = step_data.get('value')
        if value is None:
            continue
        if isinstance(value, pint.Quantity):
            # serialize quantities as magnitude + unit string
            step_data['unit'] = str(value.units)
            step_data['value'] = value.magnitude
        outputs.append({'name': name, **step_data})
    return outputs

# def get_parameters(self, parameter_group: Group, path: str) -> Dict:
# param_dict: Dict[Any, Any] = {}
# for key, val in parameter_group.items():
# path_key = f'{path}.{key}'
# if isinstance(val, Group):
# param_dict[key] = self.get_parameters(val, path_key)
# else:
# param_dict[key] = self._data_parser.get(path_key)
# if isinstance(param_dict[key], str):
# param_dict[key] = (
# param_dict[key].upper()
# if key == 'thermodynamic_ensemble'
# else param_dict[key].lower()
# )
# elif isinstance(param_dict[key], (int, np.int32, np.int64)):
# param_dict[key] = param_dict[key].item()
# return param_dict

def get_md_parameters(
    self, source: Dict[str, Any], **kwargs
) -> List[Dict[str, Any]]:
    """Extract molecular-dynamics parameter metadata from ``kwargs['path']``.

    Currently a stub that always returns an empty list.
    TODO port the ``force_calculations`` / ``workflow`` parameter
    extraction from the legacy parser (see the commented-out
    ``get_parameters`` above) once the target annotations are settled.
    """
    if kwargs.get('path') is None:
        return []
    # TODO read self.get_source(self.data, kwargs['path']) and map the
    # parameter groups; intentionally empty for now.
    return []


class H5MDParser(MDParser):
def __init__(self) -> None:
    # Compose the HDF5 reader with two metainfo writers: the simulation
    # parser fills the archive `data` section and the workflow parser
    # fills `workflow2` (both wired up in `write_to_archive`).
    super().__init__()
    self.h5_parser = H5MDH5Parser()
    self.simulation_parser = MetainfoParser()
    self.workflow_parser = MetainfoParser()

def write_to_archive(self) -> None:
# create h5 parser
Expand All @@ -137,16 +325,22 @@ def write_to_archive(self) -> None:
)
self.h5_parser.trajectory_steps = self.trajectory_steps

# create metainfo parser
# TODO consider using a single parser for the whole archive
# create metainfo parsers
self.simulation_parser.annotation_key = 'hdf5'
data = Simulation()
self.simulation_parser.data_object = data
simulation_data = Simulation()
self.simulation_parser.data_object = simulation_data
self.workflow_parser.annotation_key = 'hdf5'
workflow_data = MolecularDynamics()
self.workflow_parser.data_object = workflow_data

# map from h5 source to metainfo target
self.h5_parser.convert(self.simulation_parser)
self.h5_parser.convert(self.workflow_parser)

# assign simulation to archive data
self.archive.data = self.simulation_parser.data_object
self.archive.workflow2 = self.workflow_parser.data_object

self.h5_parser.close()

Expand Down
Loading