refactor

martindurant committed Aug 31, 2023
1 parent 5ae58e4 commit 99b0f00
Showing 2 changed files with 61 additions and 55 deletions.
110 changes: 56 additions & 54 deletions kerchunk/hdf.py
@@ -54,7 +54,7 @@ class SingleHdf5ToZarr:
         to disable
     storage_options: dict
         passed to fsspec if h5f is a str
-    error: "warn" (default) | "pdb" | "ignore"
+    error: "warn" (default) | "pdb" | "ignore" | "raise"
     vlen_encode: ["embed", "null", "leave", "encode"]
         What to do with VLEN string variables or columns of tabular variables
         leave: pass through the 16byte garbage IDs unaffected, but requires no codec
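
As context for the new mode (a sketch, not part of this commit): with error="raise", translation failures propagate to the caller instead of being warned about, ignored, or handed to pdb. Usage might look like this, where "example.h5" is a hypothetical input file:

```python
import kerchunk.hdf

# error="raise" makes unsupported-filter errors propagate to the caller
h = kerchunk.hdf.SingleHdf5ToZarr("example.h5", error="raise")
try:
    refs = h.translate()
except RuntimeError as err:
    print(f"unsupported HDF5 feature: {err}")
```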
@@ -189,42 +189,64 @@ def _transfer_attrs(
                 f"TypeError transferring attr, skipping:\n {n}@{h5obj.name} = {v} ({type(v)})"
             )
 
-    def _decode_blosc(properties):  # 32001
-        blosc_compressors = ("blosclz", "lz4", "lz4hc", "snappy", "zlib", "zstd")
-        _1, _2, bytes_per_num, total_bytes, clevel, shuffle, compressor = properties
-        return dict(
-            id="blosc",
-            blocksize=total_bytes,
-            clevel=clevel,
-            shuffle=shuffle,
-            cname=blosc_compressors[compressor],
-        )
-
-    def _decode_zstd(properties):  # 32015
-        return dict(
-            id="zstd",
-            level=properties[0],
-        )
-
-    decoders = {
-        "32001": _decode_blosc,
-        "32015": _decode_zstd,
-    }
-
     def _decode_filters(self, h5obj: Union[h5py.Dataset, h5py.Group]):
-        if len(h5obj._filters.keys()) > 1:
-            raise RuntimeError(
-                f"{h5obj.name} uses multiple filters {list(h5obj._filters.keys())}."
-                f" This is not supported by kerchunk."
-            )
+        if h5obj.scaleoffset:
+            raise RuntimeError(
+                f"{h5obj.name} uses HDF5 scaleoffset filter - not supported by kerchunk"
+            )
+        if h5obj.compression in ("szip", "lzf"):
+            raise RuntimeError(
+                f"{h5obj.name} uses szip or lzf compression - not supported by kerchunk"
+            )
+        filters = []
+        if h5obj.shuffle and h5obj.dtype.kind != "O":
+            # cannot use shuffle if we materialised objects
+            filters.append(numcodecs.Shuffle(elementsize=h5obj.dtype.itemsize))
         for filter_id, properties in h5obj._filters.items():
-            if not str(filter_id) in self.decoders.keys():
-                raise RuntimeError(
-                    f"{h5obj.name} uses filter id {filter_id} with properties {properties},"
-                    f" not supported by kerchunk., supported filters are {self.decoders.keys()}"
-                )
-            else:
-                return numcodecs.get_codec(self.decoders[filter_id](properties))
+            if str(filter_id) == "32001":
+                blosc_compressors = (
+                    "blosclz",
+                    "lz4",
+                    "lz4hc",
+                    "snappy",
+                    "zlib",
+                    "zstd",
+                )
+                (
+                    _1,
+                    _2,
+                    bytes_per_num,
+                    total_bytes,
+                    clevel,
+                    shuffle,
+                    compressor,
+                ) = properties
+                pars = dict(
+                    blocksize=total_bytes,
+                    clevel=clevel,
+                    shuffle=shuffle,
+                    cname=blosc_compressors[compressor],
+                )
+                filters.append(numcodecs.Blosc(**pars))
+            elif str(filter_id) == "32015":
+                filters.append(numcodecs.Zstd(level=properties[0]))
+            elif str(filter_id) == "gzip":
+                filters.append(numcodecs.Zlib(level=properties))
+            elif str(filter_id) == "32004":
+                raise RuntimeError(
+                    f"{h5obj.name} uses lz4 compression - not supported by kerchunk"
+                )
+            elif str(filter_id) == "32008":
+                raise RuntimeError(
+                    f"{h5obj.name} uses bitshuffle compression - not supported by kerchunk"
+                )
+            else:
+                raise RuntimeError(
+                    f"{h5obj.name} uses filter id {filter_id} with properties {properties},"
+                    f" not supported by kerchunk."
+                )
+        return filters
 
     def _translator(self, name: str, h5obj: Union[h5py.Dataset, h5py.Group]):
         """Produce Zarr metadata for all groups and datasets in the HDF5 file."""
@@ -236,31 +258,15 @@ def _translator(self, name: str, h5obj: Union[h5py.Dataset, h5py.Group]):
         if h5obj.id.get_create_plist().get_layout() == h5py.h5d.COMPACT:
             # Only do if h5obj.nbytes < self.inline??
             kwargs["data"] = h5obj[:]
+            filters = []
         else:
-            #
-            # check for unsupported HDF encoding/filters
-            #
-            if h5obj.scaleoffset:
-                raise RuntimeError(
-                    f"{h5obj.name} uses HDF5 scaleoffset filter - not supported by kerchunk"
-                )
-            if h5obj.compression in ("szip", "lzf"):
-                raise RuntimeError(
-                    f"{h5obj.name} uses szip or lzf compression - not supported by kerchunk"
-                )
-            if h5obj.compression == "gzip":
-                compression = numcodecs.Zlib(level=h5obj.compression_opts)
-            else:
-                compression = self._decode_filters(h5obj)
-            filters = []
+            filters = self._decode_filters(h5obj)
         dt = None
         # Get storage info of this HDF5 dataset...
         cinfo = self._storage_info(h5obj)
 
         if "data" in kwargs:
             fill = None
-            compression = None
         else:
             # encodings
             if h5obj.dtype.kind in "US":
@@ -418,12 +424,6 @@ def _translator(self, name: str, h5obj: Union[h5py.Dataset, h5py.Group]):
                     fill = None
                 else:
                     raise NotImplementedError
-                # Add filter for shuffle
-                if h5obj.shuffle and h5obj.dtype.kind != "O":
-                    # cannot use shuffle if we materialised objects
-                    filters.append(
-                        numcodecs.Shuffle(elementsize=h5obj.dtype.itemsize)
-                    )
 
         if h5py.h5ds.is_scale(h5obj.id) and not cinfo:
             return
@@ -439,7 +439,7 @@ def _translator(self, name: str, h5obj: Union[h5py.Dataset, h5py.Group]):
             dtype=dt or h5obj.dtype,
             chunks=h5obj.chunks or False,
             fill_value=fill,
-            compression=compression,
+            compression=None,
             filters=filters,
             overwrite=True,
             **kwargs,
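
After this change the whole HDF5 codec chain lives in filters and the dataset-level compressor is always disabled. A minimal standalone zarr sketch of the same idea, with assumed shapes and codecs (zarr also accepts the compression= spelling used above and normalizes it to compressor=):

```python
import numcodecs
import zarr

# Shuffle first, then the decoded compression codec, as _decode_filters orders them
filters = [numcodecs.Shuffle(elementsize=8), numcodecs.Zstd(level=5)]

z = zarr.create(
    shape=(100, 100),
    chunks=(10, 10),
    dtype="f8",
    compressor=None,  # no separate compressor; everything is in filters
    filters=filters,
)
z[:] = 42.0  # round-trips through Shuffle + Zstd
```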
@@ -486,6 +486,8 @@ def _translator(self, name: str, h5obj: Union[h5py.Dataset, h5py.Group]):
                 import pdb
 
                 pdb.post_mortem()
+            elif self.error == "raise":
+                raise
             else:
                 # "warn" or anything else, the default
                 import warnings
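
Schematically (a sketch only; the real handler is the try/except surrounding this method's body), the error option dispatches like this:

```python
import warnings

def dispatch_error(error_mode: str) -> None:
    """Hypothetical free-standing version; call from inside an except block."""
    if error_mode == "ignore":
        pass
    elif error_mode == "pdb":
        import pdb

        pdb.post_mortem()  # drop into the debugger at the failure point
    elif error_mode == "raise":
        raise  # new in this commit: re-raise the active exception
    else:
        # "warn" or anything else, the default
        warnings.warn("could not translate HDF5 node", stacklevel=2)
```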
6 changes: 5 additions & 1 deletion kerchunk/tests/test_hdf.py
@@ -293,7 +293,11 @@ def test_compress():
 
     files = glob.glob(osp.join(here, "hdf5_compression_*.h5"))
     for f in files:
-        h = kerchunk.hdf.SingleHdf5ToZarr(f)
+        h = kerchunk.hdf.SingleHdf5ToZarr(f, error="raise")
+        if "compression_lz4" in f or "compression_bitshuffle" in f:
+            with pytest.raises(RuntimeError):
+                h.translate()
+            continue
         out = h.translate()
         m = fsspec.get_mapper("reference://", fo=out)
         g = zarr.open(m)
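
As an aside (assumed, not part of this commit): fixtures matching the hdf5_compression_*.h5 glob can be generated with h5py's built-in filter options, for example:

```python
import h5py
import numpy as np

# Hypothetical fixture writer; the name matches the glob used in test_compress
data = np.arange(1000.0).reshape(100, 10)
with h5py.File("hdf5_compression_gzip.h5", "w") as f:
    f.create_dataset(
        "data",
        data=data,
        chunks=(10, 10),
        shuffle=True,        # HDF5 shuffle filter -> numcodecs.Shuffle
        compression="gzip",  # -> numcodecs.Zlib via _decode_filters
        compression_opts=4,
    )
```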
