Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Export hdf5 to zip #100

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions src/freesas/app/extract_ascii.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@

__author__ = "Jérôme Kieffer"
__license__ = "MIT"
__copyright__ = "2020, ESRF"
__date__ = "15/01/2021"
__copyright__ = "2020-2024, ESRF"
__date__ = "05/12/2024"

import io
import os
Expand Down Expand Up @@ -89,9 +89,16 @@ def extract_averaged(filename):
with Nexus(filename, "r") as nxsr:
entry_grp = nxsr.get_entries()[0]
results["h5path"] = entry_grp.name
nxdata_grp = nxsr.h5[entry_grp.attrs["default"]]
default = entry_grp.attrs["default"]
if posixpath.split(default)[-1] == "hplc":
default = posixpath.join(posixpath.split(default)[0],"results")
print(default)
nxdata_grp = nxsr.h5[default]
signal = nxdata_grp.attrs["signal"]
axis = nxdata_grp.attrs["axes"]
if not isinstance(axis, (str,bytes)):
logger.error(f"There are several curves if the dataset {default} from file {filename}, please use the option --all to extract them all")
sys.exit(1)
results["I"] = nxdata_grp[signal][()]
results["q"] = nxdata_grp[axis][()]
results["std"] = nxdata_grp["errors"][()]
Expand All @@ -102,15 +109,12 @@ def extract_averaged(filename):
results["geometry"] = json.loads(
integration_grp["configuration/data"][()]
)
results["polarization"] = integration_grp[
"configuration/polarization_factor"
][()]
results["polarization"] = integration_grp["configuration/polarization_factor"][()]

instrument_grps = nxsr.get_class(entry_grp, class_type="NXinstrument")
if instrument_grps:
detector_grp = nxsr.get_class(
instrument_grps[0], class_type="NXdetector"
)[0]
detector_grp = nxsr.get_class(instrument_grps[0],
class_type="NXdetector")[0]
results["mask"] = detector_grp["pixel_mask"].attrs["filename"]
sample_grp = nxsr.get_class(entry_grp, class_type="NXsample")[0]
results["sample"] = posixpath.split(sample_grp.name)[-1]
Expand All @@ -119,9 +123,7 @@ def extract_averaged(filename):
results["exposure temperature"] = sample_grp["temperature"][()]
results["concentration"] = sample_grp["concentration"][()]
if "2_correlation_mapping" in entry_grp:
results["to_merge"] = entry_grp[
"2_correlation_mapping/results/to_merge"
][()]
results["to_merge"] = entry_grp["2_correlation_mapping/results/to_merge"][()]
return results


Expand Down
1 change: 1 addition & 0 deletions src/freesas/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ py.install_sources([
'sasio.py',
'transformations.py',
'dnn.py',
'nexus_parser.py',
],
pure: false, # Will be installed next to binaries
subdir: 'freesas' # Folder relative to site-packages to install to
Expand Down
182 changes: 182 additions & 0 deletions src/freesas/nexus_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
__author__ = "Jérôme Kieffer"
__license__ = "MIT"
__copyright__ = "2017, ESRF"
__date__ = "20/01/2025"

import sys, os
import zipfile
import posixpath
import logging
from typing import Union
from silx.io.nxdata import NXdata
from dataclasses import dataclass
import numpy
logger = logging.getLogger(__name__)

try:
import h5py
except ImportError:
logger.error("H5py is mandatory to parse HDF5 files")
h5py = None


@dataclass
class IntegratedPattern:
"""Store one pyFAI integrated pattern"""

point: Union[float, int, None]
radial: numpy.ndarray
intensity: numpy.ndarray
intensity_errors: Union[numpy.ndarray, None]=None
radial_name: str = ""
radial_units: str = ""
intensity_name: str = ""
intensity_units: str = ""

def __repr__(self):
line = f"# {self.radial_name}({self.radial_units}) \t {self.intensity_name}({self.intensity_units})"
if self.intensity_errors is not None:
line += " \t uncertainties"
res = [line]
if self.intensity_errors is None:
for q,i,s in zip(self.radial, self.intensity):
res.append(f"{q} \t {i}")
else:
for q,i,s in zip(self.radial, self.intensity, self.intensity_errors):
res.append(f"{q} \t {i} \t {s}")
return os.linesep.join(res)


def read_nexus_integrated_patterns(group):
"""Read integrated patterns from a HDF5 NXdata group.

It reads from both single (1D signal) or multi (2D signal) NXdata.
:param group : h5py.Group
:return: list of IntegratedPattern instances.
"""
nxdata = NXdata(group)
if not nxdata.is_valid:
raise RuntimeError(
f"Cannot parse NXdata group: {group.file.filename}::{group.name}"
)
if not (nxdata.signal_is_1d or nxdata.signal_is_2d):
raise RuntimeError(
f"Signal is not a 1D or 2D dataset: {group.file.filename}::{group.name}"
)

if nxdata.signal_is_1d:
points = [None]
else: # 2d
if nxdata.axes[0] is None:
points = [None] * nxdata.signal.shape[0]
else:
points = nxdata.axes[0][()]

if nxdata.axes[-1] is None:
radial = numpy.arange(nxdata.signal.shape[1])
radial_units = ""
radial_name = ""
else:
axis_dataset = nxdata.axes[-1]
radial = axis_dataset[()]
radial_name = axis_dataset.name.split("/")[-1]
radial_units = axis_dataset.attrs.get("units", "")

intensities = numpy.atleast_2d(nxdata.signal)
intensity_name = nxdata.signal.name.split("/")[-1]
intensity_units = nxdata.signal.attrs.get("units", "")


if nxdata.errors is None:
errors = [None] * intensities.shape[0]
else:
errors = numpy.atleast_2d(nxdata.errors)

if (len(points), len(radial)) != intensities.shape:
raise RuntimeError("Shape mismatch between axes and signal")

return [IntegratedPattern(
point, radial, intensity, intensity_errors, radial_name, radial_units, intensity_name, intensity_units) for point, intensity, intensity_errors in zip(points, intensities, errors)]


class Tree:
def __init__(self, root=None):
self.root = root or {}
self.skip = set()
def visit_item(self, name, obj):
if name in self.skip:
return
node = self.root
path = [i.replace(" ","_") for i in name.split("/")]
for i in path[:-1]:
if i not in node:
node[i] = {}
node = node[i]
if isinstance(obj, h5py.Group):
if obj.attrs.get("NX_class") == "NXdata" and "errors" in obj:
try:
node[path[-1]] = read_nexus_integrated_patterns(obj)
except (KeyError, OSError) as err:
print(f"{type(err).__name__}: {err} while readding {path}")
for key in obj:
self.skip.add(posixpath.join(name,key))
if isinstance(obj[key], h5py.Group):
for sub in obj[key]:
self.skip.add(posixpath.join(name,key, sub))
else:
node[path[-1]] = {}
if isinstance(obj, h5py.Dataset):
if len(obj.shape) <= 1:
node[path[-1]] = obj[()]

def save(self, filename):
with zipfile.ZipFile(filename, "w") as z:
def write(path, name, obj):
new_path = posixpath.join(path, name)
if isinstance(obj, dict):
if sys.version_info>=(3,11): z.mkdir(new_path)
for key, value in obj.items():
write(new_path, key, value)
elif isinstance(obj, numpy.ndarray):
if obj.ndim == 1:
z.writestr(new_path, os.linesep.join(str(i) for i in obj))
else:
z.writestr(new_path, str(obj))
elif isinstance(obj, list):
if sys.version_info>=(3,11): z.mkdir(new_path)
if len(obj)==1:
fname = new_path+"/biosaxs.dat"
z.writestr(fname, str(obj[0]))
else:
for i,j in enumerate(obj):
fname = new_path+f"/bioxaxs_{i:04d}.dat"
z.writestr(fname, str(j))
elif isinstance(obj, (int, float, numpy.number, bool, numpy.bool)):
z.writestr(new_path, str(obj))
elif isinstance(obj, (str, bytes)):
z.writestr(new_path, obj)
else:
print(f"skip {new_path} for {obj} of type {obj.__class__.__mro__}")

root = self.root
for key, value in root.items():
write("", key, value)
def get(self, path):
node = self.root
for i in path.split("/"):
node = node[i]
return node

def convert_nexus2zip(nexusfile, outfile=None):
""" Convert a nexus-file, as produced by BM29 beamline into a zip file

:param nexusfile: string with the path of the input file
:param outfile: name of the output file, unless, just replace the extension with ".zip"
:return: nothing, maybe an error code ?
"""
tree = Tree()
with h5py.File(nexusfile, "r") as h:
h.visititems(tree.visit_item)
outfile = outfile or (os.path.splitext(nexusfile)[0]+".h5")
tree.save(outfile)

Loading