Skip to content

Commit

Permalink
WIP: taking a dictionary for dataset names
Browse files Browse the repository at this point in the history
  • Loading branch information
evalott100 committed Jun 6, 2024
1 parent 34f9b20 commit ea83a6a
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 12 deletions.
12 changes: 6 additions & 6 deletions src/pandablocks/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,14 +223,14 @@ def receive_bytes(self, received: bytes) -> bytes:
is_multiline = False
else:
# Check a correctly formatted response
assert line.startswith("!"), (
"Multiline response %r doesn't start with !" % line
)
assert line.startswith(
"!"
), f"Multiline response {line} doesn't start with !"
else:
# Single line mode
assert not self._lines, (
"Multiline response %s not terminated" % self._lines
)
assert (
not self._lines
), f"Multiline response {self._lines} not terminated"
to_send += self._update_contexts([line])
return to_send

Expand Down
26 changes: 20 additions & 6 deletions src/pandablocks/hdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,16 @@ class HDFWriter(Pipeline):
per file created.
"""

def __init__(self, file_names: Iterator[str], dataset_names: Dict[str, str]):
def __init__(
self,
file_names: Iterator[str],
capture_record_hdf_names: Dict[str, Dict[str, str]],
):
super().__init__()
self.file_names = file_names
self.hdf_file: Optional[h5py.File] = None
self.datasets: List[h5py.Dataset] = []
self.dataset_names = dataset_names
self.capture_record_hdf_names = capture_record_hdf_names
self.what_to_do = {
StartData: self.open_file,
list: self.write_frame,
Expand All @@ -101,8 +105,11 @@ def create_dataset(self, field: FieldCapture, raw: bool):
else:
# No processor, datatype passed through
dtype = field.type
dataset_name = self.capture_record_hdf_names.get(field.name, {}).get(
field.capture, f"{field.name}.{field.capture}"
)
return self.hdf_file.create_dataset(
f"/{self.dataset_names.get(field.name, field.name)}.{field.capture}",
f"/{dataset_name}",
dtype=dtype,
shape=(0,),
maxshape=(None,),
Expand Down Expand Up @@ -196,21 +203,27 @@ def scale_data(self, data: FrameData) -> List[np.ndarray]:


def create_default_pipeline(
file_names: Iterator[str], dataset_names: Dict[str, str], *additional_downstream_pipelines: Pipeline
file_names: Iterator[str],
capture_record_hdf_names: Dict[str, Dict[str, str]],
*additional_downstream_pipelines: Pipeline,
) -> List[Pipeline]:
"""Create the default processing pipeline consisting of one `FrameProcessor` and
one `HDFWriter`. See `create_pipeline` for more details.
Args:
file_names: Iterator of file names, as required by `HDFWriter`. Must yield full
file paths. Will be advanced once per file created.
dataset_names: A dictionary of dataset names to use for each field.
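capture_record_hdf_names: A dictionary of dataset names to use for each field.
The keys are record names, and each value is another dictionary mapping
capture type to dataset name. Any field/capture pair without an entry uses
the default dataset name `<field name>.<capture type>`.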
additional_downstream_pipelines: Any number of additional pipelines to add
downstream.
"""

return create_pipeline(
FrameProcessor(), HDFWriter(file_names, dataset_names), *additional_downstream_pipelines
FrameProcessor(),
HDFWriter(file_names, capture_record_hdf_names),
*additional_downstream_pipelines,
)


Expand Down Expand Up @@ -257,6 +270,7 @@ async def write_hdf_files(
HDFDataOverrunException: if there is a data overrun.
"""
counter = 0

end_data = None
pipeline = create_default_pipeline(file_names, {})
try:
Expand Down

0 comments on commit ea83a6a

Please sign in to comment.