diff --git a/src/pandablocks/connections.py b/src/pandablocks/connections.py
index 2f6440ec..059f4d0e 100644
--- a/src/pandablocks/connections.py
+++ b/src/pandablocks/connections.py
@@ -223,14 +223,14 @@ def receive_bytes(self, received: bytes) -> bytes:
                     is_multiline = False
                 else:
                     # Check a correctly formatted response
-                    assert line.startswith("!"), (
-                        "Multiline response %r doesn't start with !" % line
-                    )
+                    assert line.startswith(
+                        "!"
+                    ), f"Multiline response {line} doesn't start with !"
             else:
                 # Single line mode
-                assert not self._lines, (
-                    "Multiline response %s not terminated" % self._lines
-                )
+                assert (
+                    not self._lines
+                ), f"Multiline response {self._lines} not terminated"
                 to_send += self._update_contexts([line])
         return to_send
 
diff --git a/src/pandablocks/hdf.py b/src/pandablocks/hdf.py
index 1ab31270..8008478f 100644
--- a/src/pandablocks/hdf.py
+++ b/src/pandablocks/hdf.py
@@ -80,12 +80,16 @@ class HDFWriter(Pipeline):
         per file created.
     """
 
-    def __init__(self, file_names: Iterator[str], dataset_names: Dict[str, str]):
+    def __init__(
+        self,
+        file_names: Iterator[str],
+        capture_record_hdf_names: Dict[str, Dict[str, str]],
+    ):
         super().__init__()
         self.file_names = file_names
         self.hdf_file: Optional[h5py.File] = None
         self.datasets: List[h5py.Dataset] = []
-        self.dataset_names = dataset_names
+        self.capture_record_hdf_names = capture_record_hdf_names
         self.what_to_do = {
             StartData: self.open_file,
             list: self.write_frame,
@@ -101,8 +105,11 @@ def create_dataset(self, field: FieldCapture, raw: bool):
         else:
             # No processor, datatype passed through
             dtype = field.type
+        dataset_name = self.capture_record_hdf_names.get(field.name, {}).get(
+            field.capture, f"{field.name}.{field.capture}"
+        )
         return self.hdf_file.create_dataset(
-            f"/{self.dataset_names.get(field.name, field.name)}.{field.capture}",
+            f"/{dataset_name}",
             dtype=dtype,
             shape=(0,),
             maxshape=(None,),
@@ -196,7 +203,9 @@ def scale_data(self, data: FrameData) -> List[np.ndarray]:
 
 
 def create_default_pipeline(
-    file_names: Iterator[str], dataset_names: Dict[str, str], *additional_downstream_pipelines: Pipeline
+    file_names: Iterator[str],
+    capture_record_hdf_names: Dict[str, Dict[str, str]],
+    *additional_downstream_pipelines: Pipeline,
 ) -> List[Pipeline]:
     """Create the default processing pipeline consisting of one `FrameProcessor`
     and one `HDFWriter`. See `create_pipeline` for more details.
@@ -204,13 +213,17 @@
     Args:
         file_names: Iterator of file names. Must be full file paths. Will be called
             once per file created. As required by `HDFWriter`.
-        dataset_names: A dictionary of dataset names to use for each field.
+        capture_record_hdf_names: A dictionary of dataset names to use for each field.
+            The keys are record names, the values are another dictionary of
+            capture type to dataset name.
         additional_downstream_pipelines: Any number of additional pipelines to add
             downstream.
     """
     return create_pipeline(
-        FrameProcessor(), HDFWriter(file_names, dataset_names), *additional_downstream_pipelines
+        FrameProcessor(),
+        HDFWriter(file_names, capture_record_hdf_names),
+        *additional_downstream_pipelines,
     )
 
 
@@ -257,6 +270,7 @@ async def write_hdf_files(
         HDFDataOverrunException: if there is a data overrun.
     """
     counter = 0
+    end_data = None
    pipeline = create_default_pipeline(file_names, {})
    try:
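For context, a minimal sketch of how the new `capture_record_hdf_names` mapping is intended to be used. The record and dataset names below, and the `resolve` helper, are invented for illustration and are not part of this change; the lookup logic mirrors the new code in `HDFWriter.create_dataset`.

```python
# Hypothetical mapping: record name -> {capture type -> HDF5 dataset name}.
capture_record_hdf_names = {
    "COUNTER1.OUT": {
        "Value": "counter1",
        "Mean": "counter1_mean",
    },
}


def resolve(field_name: str, capture: str) -> str:
    # Same lookup as HDFWriter.create_dataset: use the override if present,
    # otherwise fall back to the previous "<field>.<capture>" naming scheme.
    return capture_record_hdf_names.get(field_name, {}).get(
        capture, f"{field_name}.{capture}"
    )


assert resolve("COUNTER1.OUT", "Mean") == "counter1_mean"  # overridden name
assert resolve("COUNTER2.OUT", "Value") == "COUNTER2.OUT.Value"  # default name
```

The same dictionary is what `create_default_pipeline(file_names, capture_record_hdf_names, ...)` now forwards to `HDFWriter`; passing `{}` (as `write_hdf_files` does above) keeps the previous default dataset naming.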