diff --git a/src/pandablocks/connections.py b/src/pandablocks/connections.py index 2f6440ec..059f4d0e 100644 --- a/src/pandablocks/connections.py +++ b/src/pandablocks/connections.py @@ -223,14 +223,14 @@ def receive_bytes(self, received: bytes) -> bytes: is_multiline = False else: # Check a correctly formatted response - assert line.startswith("!"), ( - "Multiline response %r doesn't start with !" % line - ) + assert line.startswith( + "!" + ), f"Multiline response {line} doesn't start with !" else: # Single line mode - assert not self._lines, ( - "Multiline response %s not terminated" % self._lines - ) + assert ( + not self._lines + ), f"Multiline response {self._lines} not terminated" to_send += self._update_contexts([line]) return to_send diff --git a/src/pandablocks/hdf.py b/src/pandablocks/hdf.py index 00d47b73..ca7abc7f 100644 --- a/src/pandablocks/hdf.py +++ b/src/pandablocks/hdf.py @@ -78,13 +78,30 @@ class HDFWriter(Pipeline): Args: file_names: Iterator of file names. Must be full file paths. Will be called once per file created. + capture_record_hdf_names: A dictionary of alternate dataset names to use for + each field. For example + + .. code-block:: python + + { + "COUNTER1.OUT": { + "Value": "name", + "Min": "name-min", + "Max": "name-max" + } + } """ - def __init__(self, file_names: Iterator[str]): + def __init__( + self, + file_names: Iterator[str], + capture_record_hdf_names: Dict[str, Dict[str, str]], + ): super().__init__() self.file_names = file_names self.hdf_file: Optional[h5py.File] = None self.datasets: List[h5py.Dataset] = [] + self.capture_record_hdf_names = capture_record_hdf_names self.what_to_do = { StartData: self.open_file, list: self.write_frame, @@ -100,8 +117,12 @@ def create_dataset(self, field: FieldCapture, raw: bool): else: # No processor, datatype passed through dtype = field.type + dataset_name = self.capture_record_hdf_names.get(field.name, {}).get( + field.capture, f"{field.name}.{field.capture}" + ) + return self.hdf_file.create_dataset( - f"/{field.name}.{field.capture}", + f"/{dataset_name}", dtype=dtype, shape=(0,), maxshape=(None,), @@ -195,7 +216,9 @@ def scale_data(self, data: FrameData) -> List[np.ndarray]: def create_default_pipeline( - file_names: Iterator[str], *additional_downstream_pipelines: Pipeline + file_names: Iterator[str], + capture_record_hdf_names: Dict[str, Dict[str, str]], + *additional_downstream_pipelines: Pipeline, ) -> List[Pipeline]: """Create the default processing pipeline consisting of one `FrameProcessor` and one `HDFWriter`. See `create_pipeline` for more details. @@ -203,12 +226,17 @@ def create_default_pipeline( Args: file_names: Iterator of file names. Must be full file paths. Will be called once per file created. As required by `HDFWriter`. + capture_record_hdf_names: A dictionary of dataset names to use for each field. + The keys are record names, the values are another dictionary of + capture type to dataset name. additional_downstream_pipelines: Any number of additional pipelines to add downstream. """ return create_pipeline( - FrameProcessor(), HDFWriter(file_names), *additional_downstream_pipelines + FrameProcessor(), + HDFWriter(file_names, capture_record_hdf_names), + *additional_downstream_pipelines, ) @@ -255,8 +283,9 @@ async def write_hdf_files( HDFDataOverrunException: if there is a data overrun. """ counter = 0 + end_data = None - pipeline = create_default_pipeline(file_names) + pipeline = create_default_pipeline(file_names, {}) try: async for data in client.data(scaled=False, flush_period=flush_period): pipeline[0].queue.put_nowait(data) diff --git a/tests/test_hdf.py b/tests/test_hdf.py index 362e8da4..290491ac 100644 --- a/tests/test_hdf.py +++ b/tests/test_hdf.py @@ -2,8 +2,9 @@ from pathlib import Path import numpy as np +import pytest -from pandablocks.hdf import Pipeline, create_default_pipeline, stop_pipeline +from pandablocks.hdf import HDFWriter, Pipeline, create_default_pipeline, stop_pipeline from pandablocks.responses import EndData, EndReason, FieldCapture, FrameData, StartData @@ -21,7 +22,7 @@ def __init__(self): try: pipeline = create_default_pipeline( - iter([Path(tmp_path / "1.h5")]), file_counter + iter([Path(tmp_path / "1.h5")]), {}, file_counter ) pipeline[0].queue.put_nowait( @@ -58,3 +59,113 @@ def __init__(self): assert num_written_queue.get() == NUMBER_OF_FRAMES_WRITTEN finally: stop_pipeline(pipeline) + + +@pytest.mark.parametrize( + "capture_record_hdf_names,expected_names", + [ + ( + {}, + { + "/COUNTER1.OUT.Value", + "/COUNTER2.OUT.Mean", + "/COUNTER2.OUT.Max", + "/COUNTER2.OUT.Min", + }, + ), + ( + {"COUNTER1.OUT": {"Value": "scientific-name"}}, + { + "/scientific-name", + "/COUNTER2.OUT.Mean", + "/COUNTER2.OUT.Max", + "/COUNTER2.OUT.Min", + }, + ), + ( + { + "COUNTER2.OUT": { + "Mean": "scientific-name", + "Max": "scientific-name-max", + "Min": "scientific-name-min", + } + }, + { + "/COUNTER1.OUT.Value", + "/scientific-name", + "/scientific-name-max", + "/scientific-name-min", + }, + ), + ( + { + "COUNTER1.OUT": {"Value": "scientific-name1"}, + "COUNTER2.OUT": { + "Mean": "scientific-name2", + "Max": "scientific-name2-max", + "Min": "scientific-name2-min", + }, + }, + { + "/scientific-name1", + "/scientific-name2", + "/scientific-name2-max", + "/scientific-name2-min", + }, + ), + ], +) +def test_hdf_writer_uses_alternative_dataset_names( + tmp_path, capture_record_hdf_names, expected_names +): + hdf_writer = HDFWriter( + iter([str(tmp_path / "test_file.h5")]), capture_record_hdf_names + ) + + start_data = StartData( + fields=[ + FieldCapture( + name="COUNTER1.OUT", + type=np.dtype("float64"), + capture="Value", + scale=1, + offset=0, + units="", + ), + FieldCapture( + name="COUNTER2.OUT", + type=np.dtype("float64"), + capture="Min", + scale=1, + offset=0, + units="", + ), + FieldCapture( + name="COUNTER2.OUT", + type=np.dtype("float64"), + capture="Max", + scale=1, + offset=0, + units="", + ), + FieldCapture( + name="COUNTER2.OUT", + type=np.dtype("float64"), + capture="Mean", + scale=1, + offset=0, + units="", + ), + ], + missed=0, + process="Scaled", + format="Framed", + sample_bytes=52, + arm_time="2024-03-05T20:27:12.607841574Z", + start_time="2024-03-05T20:27:12.608875498Z", + hw_time_offset_ns=100555, + ) + + hdf_writer.open_file(start_data) + + assert {dataset.name for dataset in hdf_writer.datasets} == expected_names