Allowed a dictionary of alternate dataset names to be given by the writer (#91)
evalott100 authored Jun 14, 2024
1 parent c2c8c0b commit 8a6bfc2
Showing 3 changed files with 153 additions and 13 deletions.
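In short: the writer can now be handed a nested mapping of record name to {capture type: dataset name}, and any field not covered by the mapping keeps its default "FIELD.capture" dataset name. A minimal sketch of such a mapping (record and dataset names here are illustrative, not from the diff):

    capture_record_hdf_names = {
        "COUNTER1.OUT": {"Value": "counter1", "Min": "counter1-min"}
    }

With this mapping the writer would create datasets /counter1 and /counter1-min instead of /COUNTER1.OUT.Value and /COUNTER1.OUT.Min.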
12 changes: 6 additions & 6 deletions src/pandablocks/connections.py
@@ -223,14 +223,14 @@ def receive_bytes(self, received: bytes) -> bytes:
                     is_multiline = False
                 else:
                     # Check a correctly formatted response
-                    assert line.startswith("!"), (
-                        "Multiline response %r doesn't start with !" % line
-                    )
+                    assert line.startswith(
+                        "!"
+                    ), f"Multiline response {line} doesn't start with !"
             else:
                 # Single line mode
-                assert not self._lines, (
-                    "Multiline response %s not terminated" % self._lines
-                )
+                assert (
+                    not self._lines
+                ), f"Multiline response {self._lines} not terminated"
                 to_send += self._update_contexts([line])
         return to_send

39 changes: 34 additions & 5 deletions src/pandablocks/hdf.py
@@ -78,13 +78,30 @@ class HDFWriter(Pipeline):
     Args:
         file_names: Iterator of file names. Must be full file paths. Will be called once
             per file created.
+        capture_record_hdf_names: A dictionary of alternate dataset names to use for
+            each field. For example
+
+            .. code-block:: python
+
+                {
+                    "COUNTER1.OUT": {
+                        "Value": "name",
+                        "Min": "name-min",
+                        "Max": "name-max"
+                    }
+                }
     """

-    def __init__(self, file_names: Iterator[str]):
+    def __init__(
+        self,
+        file_names: Iterator[str],
+        capture_record_hdf_names: Dict[str, Dict[str, str]],
+    ):
         super().__init__()
         self.file_names = file_names
         self.hdf_file: Optional[h5py.File] = None
         self.datasets: List[h5py.Dataset] = []
+        self.capture_record_hdf_names = capture_record_hdf_names
         self.what_to_do = {
             StartData: self.open_file,
             list: self.write_frame,
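Construction under the new signature might look like this (a sketch; the file path is hypothetical, the mapping shape follows the docstring above):

    from pandablocks.hdf import HDFWriter

    writer = HDFWriter(
        iter(["/data/run_001.h5"]),  # hypothetical full file path
        {"COUNTER1.OUT": {"Value": "name", "Min": "name-min", "Max": "name-max"}},
    )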
@@ -100,8 +117,12 @@ def create_dataset(self, field: FieldCapture, raw: bool):
         else:
             # No processor, datatype passed through
             dtype = field.type
+        dataset_name = self.capture_record_hdf_names.get(field.name, {}).get(
+            field.capture, f"{field.name}.{field.capture}"
+        )
+
         return self.hdf_file.create_dataset(
-            f"/{field.name}.{field.capture}",
+            f"/{dataset_name}",
             dtype=dtype,
             shape=(0,),
             maxshape=(None,),
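The two chained .get calls give a per-capture fallback: a record missing from the mapping, or a capture type missing from its inner dictionary, falls back to the old "FIELD.capture" name. A standalone sketch of that resolution (resolve_dataset_name is a hypothetical helper, not part of the module):

    def resolve_dataset_name(mapping, name, capture):
        # Mirrors create_dataset: an unmapped record or capture type
        # falls back to the default "<name>.<capture>" dataset name.
        return mapping.get(name, {}).get(capture, f"{name}.{capture}")

    assert resolve_dataset_name({}, "COUNTER1.OUT", "Value") == "COUNTER1.OUT.Value"
    assert resolve_dataset_name(
        {"COUNTER1.OUT": {"Value": "scientific-name"}}, "COUNTER1.OUT", "Value"
    ) == "scientific-name"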
@@ -195,20 +216,27 @@ def scale_data(self, data: FrameData) -> List[np.ndarray]:


 def create_default_pipeline(
-    file_names: Iterator[str], *additional_downstream_pipelines: Pipeline
+    file_names: Iterator[str],
+    capture_record_hdf_names: Dict[str, Dict[str, str]],
+    *additional_downstream_pipelines: Pipeline,
 ) -> List[Pipeline]:
     """Create the default processing pipeline consisting of one `FrameProcessor` and
     one `HDFWriter`. See `create_pipeline` for more details.

     Args:
         file_names: Iterator of file names. Must be full file paths. Will be called once
             per file created. As required by `HDFWriter`.
+        capture_record_hdf_names: A dictionary of dataset names to use for each field.
+            The keys are record names, the values are another dictionary of
+            capture type to dataset name.
         additional_downstream_pipelines: Any number of additional pipelines to add
             downstream.
     """

     return create_pipeline(
-        FrameProcessor(), HDFWriter(file_names), *additional_downstream_pipelines
+        FrameProcessor(),
+        HDFWriter(file_names, capture_record_hdf_names),
+        *additional_downstream_pipelines,
     )
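Wiring this through the default pipeline might look as follows (a sketch; the path and dataset names are illustrative):

    from pandablocks.hdf import create_default_pipeline, stop_pipeline

    pipeline = create_default_pipeline(
        iter(["/data/run_001.h5"]),
        {"COUNTER2.OUT": {"Mean": "flux", "Max": "flux-max", "Min": "flux-min"}},
    )
    # ... feed StartData / FrameData / EndData into pipeline[0].queue ...
    stop_pipeline(pipeline)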


@@ -255,8 +283,9 @@ async def write_hdf_files(
         HDFDataOverrunException: if there is a data overrun.
     """
     counter = 0
+
     end_data = None
-    pipeline = create_default_pipeline(file_names)
+    pipeline = create_default_pipeline(file_names, {})
     try:
         async for data in client.data(scaled=False, flush_period=flush_period):
             pipeline[0].queue.put_nowait(data)
115 changes: 113 additions & 2 deletions tests/test_hdf.py
@@ -2,8 +2,9 @@
 from pathlib import Path

 import numpy as np
+import pytest

-from pandablocks.hdf import Pipeline, create_default_pipeline, stop_pipeline
+from pandablocks.hdf import HDFWriter, Pipeline, create_default_pipeline, stop_pipeline
 from pandablocks.responses import EndData, EndReason, FieldCapture, FrameData, StartData


@@ -21,7 +22,7 @@ def __init__(self):

     try:
         pipeline = create_default_pipeline(
-            iter([Path(tmp_path / "1.h5")]), file_counter
+            iter([Path(tmp_path / "1.h5")]), {}, file_counter
         )

         pipeline[0].queue.put_nowait(
@@ -58,3 +59,113 @@ def __init__(self):
         assert num_written_queue.get() == NUMBER_OF_FRAMES_WRITTEN
     finally:
         stop_pipeline(pipeline)
+
+
+@pytest.mark.parametrize(
+    "capture_record_hdf_names,expected_names",
+    [
+        (
+            {},
+            {
+                "/COUNTER1.OUT.Value",
+                "/COUNTER2.OUT.Mean",
+                "/COUNTER2.OUT.Max",
+                "/COUNTER2.OUT.Min",
+            },
+        ),
+        (
+            {"COUNTER1.OUT": {"Value": "scientific-name"}},
+            {
+                "/scientific-name",
+                "/COUNTER2.OUT.Mean",
+                "/COUNTER2.OUT.Max",
+                "/COUNTER2.OUT.Min",
+            },
+        ),
+        (
+            {
+                "COUNTER2.OUT": {
+                    "Mean": "scientific-name",
+                    "Max": "scientific-name-max",
+                    "Min": "scientific-name-min",
+                }
+            },
+            {
+                "/COUNTER1.OUT.Value",
+                "/scientific-name",
+                "/scientific-name-max",
+                "/scientific-name-min",
+            },
+        ),
+        (
+            {
+                "COUNTER1.OUT": {"Value": "scientific-name1"},
+                "COUNTER2.OUT": {
+                    "Mean": "scientific-name2",
+                    "Max": "scientific-name2-max",
+                    "Min": "scientific-name2-min",
+                },
+            },
+            {
+                "/scientific-name1",
+                "/scientific-name2",
+                "/scientific-name2-max",
+                "/scientific-name2-min",
+            },
+        ),
+    ],
+)
+def test_hdf_writer_uses_alternative_dataset_names(
+    tmp_path, capture_record_hdf_names, expected_names
+):
+    hdf_writer = HDFWriter(
+        iter([str(tmp_path / "test_file.h5")]), capture_record_hdf_names
+    )
+
+    start_data = StartData(
+        fields=[
+            FieldCapture(
+                name="COUNTER1.OUT",
+                type=np.dtype("float64"),
+                capture="Value",
+                scale=1,
+                offset=0,
+                units="",
+            ),
+            FieldCapture(
+                name="COUNTER2.OUT",
+                type=np.dtype("float64"),
+                capture="Min",
+                scale=1,
+                offset=0,
+                units="",
+            ),
+            FieldCapture(
+                name="COUNTER2.OUT",
+                type=np.dtype("float64"),
+                capture="Max",
+                scale=1,
+                offset=0,
+                units="",
+            ),
+            FieldCapture(
+                name="COUNTER2.OUT",
+                type=np.dtype("float64"),
+                capture="Mean",
+                scale=1,
+                offset=0,
+                units="",
+            ),
+        ],
+        missed=0,
+        process="Scaled",
+        format="Framed",
+        sample_bytes=52,
+        arm_time="2024-03-05T20:27:12.607841574Z",
+        start_time="2024-03-05T20:27:12.608875498Z",
+        hw_time_offset_ns=100555,
+    )
+
+    hdf_writer.open_file(start_data)
+
+    assert {dataset.name for dataset in hdf_writer.datasets} == expected_names
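As a further check one could read the file back with h5py once the writer has closed it (a sketch; it assumes the second parametrize case above, that the HDF5 file has been closed, and reuses the pytest tmp_path fixture):

    import h5py

    # Hypothetical read-back check after the HDF5 file has been closed:
    with h5py.File(tmp_path / "test_file.h5", "r") as f:
        assert "scientific-name" in f          # renamed dataset is present
        assert "COUNTER1.OUT.Value" not in f   # default name was not used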
