Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic support for HDF5 filters #350

Merged
merged 4 commits into from
Aug 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 66 additions & 26 deletions kerchunk/hdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class SingleHdf5ToZarr:
to disable
storage_options: dict
passed to fsspec if h5f is a str
error: "warn" (default) | "pdb" | "ignore"
error: "warn" (default) | "pdb" | "ignore" | "raise"
vlen_encode: ["embed", "null", "leave", "encode"]
What to do with VLEN string variables or columns of tabular variables
leave: pass through the 16byte garbage IDs unaffected, but requires no codec
Expand Down Expand Up @@ -189,40 +189,84 @@ def _transfer_attrs(
f"TypeError transferring attr, skipping:\n {n}@{h5obj.name} = {v} ({type(v)})"
)

def _decode_filters(self, h5obj: Union[h5py.Dataset, h5py.Group]):
    """Translate the HDF5 filter pipeline of ``h5obj`` into numcodecs codecs.

    Parameters
    ----------
    h5obj: h5py.Dataset or h5py.Group
        The HDF5 object whose filter pipeline should be decoded.

    Returns
    -------
    list of numcodecs codecs, in the order they appear in the HDF5
    filter pipeline (shuffle first, if present).

    Raises
    ------
    RuntimeError
        For filters kerchunk cannot represent: scaleoffset, szip, lzf,
        lz4 (32004), bitshuffle (32008), or any unrecognised filter id.
    """
    if h5obj.scaleoffset:
        raise RuntimeError(
            f"{h5obj.name} uses HDF5 scaleoffset filter - not supported by kerchunk"
        )
    if h5obj.compression in ("szip", "lzf"):
        raise RuntimeError(
            f"{h5obj.name} uses szip or lzf compression - not supported by kerchunk"
        )
    filters = []
    if h5obj.shuffle and h5obj.dtype.kind != "O":
        # cannot use shuffle if we materialised objects
        filters.append(numcodecs.Shuffle(elementsize=h5obj.dtype.itemsize))
    for filter_id, properties in h5obj._filters.items():
        fid = str(filter_id)
        if fid == "32001":
            # Registered HDF5 filter id 32001 is blosc; the properties
            # tuple encodes (reserved, reserved, bytes_per_num,
            # total_bytes, clevel, shuffle, compressor-index).
            blosc_compressors = (
                "blosclz",
                "lz4",
                "lz4hc",
                "snappy",
                "zlib",
                "zstd",
            )
            (
                _1,
                _2,
                bytes_per_num,
                total_bytes,
                clevel,
                shuffle,
                compressor,
            ) = properties
            pars = dict(
                blocksize=total_bytes,
                clevel=clevel,
                shuffle=shuffle,
                cname=blosc_compressors[compressor],
            )
            filters.append(numcodecs.Blosc(**pars))
        elif fid == "32015":
            # zstd: first property element is the compression level
            filters.append(numcodecs.Zstd(level=properties[0]))
        elif fid == "gzip":
            # h5py reports gzip by name; properties is the integer level
            filters.append(numcodecs.Zlib(level=properties))
        elif fid == "32004":
            raise RuntimeError(
                f"{h5obj.name} uses lz4 compression - not supported by kerchunk"
            )
        elif fid == "32008":
            raise RuntimeError(
                f"{h5obj.name} uses bitshuffle compression - not supported by kerchunk"
            )
        else:
            # BUG FIX: removed a leftover breakpoint() that would hang any
            # non-interactive run, and dropped the reference to
            # self.decoders, which is not defined anywhere in this class
            # and would have raised AttributeError while formatting the
            # error message. Also fixed the "kerchunk.," typo.
            raise RuntimeError(
                f"{h5obj.name} uses filter id {filter_id} with properties"
                f" {properties}, not supported by kerchunk"
            )
    return filters

def _translator(self, name: str, h5obj: Union[h5py.Dataset, h5py.Group]):
"""Produce Zarr metadata for all groups and datasets in the HDF5 file."""
try: # method must not raise exception
kwargs = {}
if isinstance(h5obj, h5py.Dataset):
lggr.debug(f"HDF5 dataset: {h5obj.name}")
lggr.debug(f"HDF5 compression: {h5obj.compression}")
if h5obj.id.get_create_plist().get_layout() == h5py.h5d.COMPACT:
# Only do if h5obj.nbytes < self.inline??
kwargs["data"] = h5obj[:]

filters = []
else:
#
# check for unsupported HDF encoding/filters
#
if h5obj.scaleoffset:
raise RuntimeError(
f"{h5obj.name} uses HDF5 scaleoffset filter - not supported by kerchunk"
)
if h5obj.compression in ("szip", "lzf"):
raise RuntimeError(
f"{h5obj.name} uses szip or lzf compression - not supported by kerchunk"
)
if h5obj.compression == "gzip":
compression = numcodecs.Zlib(level=h5obj.compression_opts)
else:
compression = None
filters = []
filters = self._decode_filters(h5obj)
dt = None
# Get storage info of this HDF5 dataset...
cinfo = self._storage_info(h5obj)

if "data" in kwargs:
fill = None
compression = None
else:
# encodings
if h5obj.dtype.kind in "US":
Expand Down Expand Up @@ -380,12 +424,6 @@ def _translator(self, name: str, h5obj: Union[h5py.Dataset, h5py.Group]):
fill = None
else:
raise NotImplementedError
# Add filter for shuffle
if h5obj.shuffle and h5obj.dtype.kind != "O":
# cannot use shuffle if we materialised objects
filters.append(
numcodecs.Shuffle(elementsize=h5obj.dtype.itemsize)
)

if h5py.h5ds.is_scale(h5obj.id) and not cinfo:
return
Expand All @@ -401,7 +439,7 @@ def _translator(self, name: str, h5obj: Union[h5py.Dataset, h5py.Group]):
dtype=dt or h5obj.dtype,
chunks=h5obj.chunks or False,
fill_value=fill,
compression=compression,
compression=None,
filters=filters,
overwrite=True,
**kwargs,
Expand Down Expand Up @@ -448,6 +486,8 @@ def _translator(self, name: str, h5obj: Union[h5py.Dataset, h5py.Group]):
import pdb

pdb.post_mortem()
elif self.error == "raise":
raise
else:
# "warn" or anything else, the default
import warnings
Expand Down
17 changes: 17 additions & 0 deletions kerchunk/tests/gen_hdf5.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import numpy
import h5py
import hdf5plugin

# Map of test-case name -> hdf5plugin compression kwargs, used to generate
# small sample files exercising kerchunk's HDF5-filter support. Each entry
# produces one hdf5_compression_<name>.h5 fixture containing arange(100).
compressors = dict(
    zstd=hdf5plugin.Zstd(),
    bitshuffle=hdf5plugin.Bitshuffle(nelems=0, cname="lz4"),
    lz4=hdf5plugin.LZ4(nbytes=0),
    blosc_lz4_bitshuffle=hdf5plugin.Blosc(
        cname="blosclz", clevel=9, shuffle=hdf5plugin.Blosc.BITSHUFFLE
    ),
)

for name, compression_kwargs in compressors.items():
    # Context manager guarantees the file is closed (and flushed) even if
    # create_dataset raises, e.g. when a plugin filter is unavailable.
    with h5py.File(f"hdf5_compression_{name}.h5", "w") as f:
        f.create_dataset("data", data=numpy.arange(100), **compression_kwargs)
Binary file added kerchunk/tests/hdf5_compression_bitshuffle.h5
Binary file not shown.
Binary file not shown.
Binary file added kerchunk/tests/hdf5_compression_lz4.h5
Binary file not shown.
Binary file added kerchunk/tests/hdf5_compression_zstd.h5
Binary file not shown.
16 changes: 16 additions & 0 deletions kerchunk/tests/test_hdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,22 @@ def test_compact():
assert np.allclose(g.ancillary_data.atlas_sdp_gps_epoch[:], 1.19880002e09)


def test_compress():
    """Round-trip the fixture files produced by gen_hdf5.py through kerchunk.

    lz4 and bitshuffle compression are known-unsupported and must raise
    RuntimeError during translation; every other compressor must decode
    back to the original arange(100) data (mean 49.5).
    """
    import glob

    files = glob.glob(osp.join(here, "hdf5_compression_*.h5"))
    # BUG FIX: without this guard the test passes vacuously when the
    # fixture files are missing, because the loop body never executes.
    assert files, "no hdf5_compression_*.h5 fixture files found"
    for f in files:
        h = kerchunk.hdf.SingleHdf5ToZarr(f, error="raise")
        if "compression_lz4" in f or "compression_bitshuffle" in f:
            # unsupported codecs: translation itself must fail loudly
            with pytest.raises(RuntimeError):
                h.translate()
            continue
        out = h.translate()
        m = fsspec.get_mapper("reference://", fo=out)
        g = zarr.open(m)
        assert np.mean(g.data) == 49.5


def test_embed():
fn = osp.join(here, "NEONDSTowerTemperatureData.hdf5")
h = kerchunk.hdf.SingleHdf5ToZarr(fn, vlen_encode="embed")
Expand Down
Loading