Support Zlib compression and shuffle filter for local storage
This change adds support for compressed and filtered data for local
storage. Data in S3 will be addressed separately.

The compression and filters arguments passed to reduce_chunk are
actually numcodecs.abc.Codec instances, so we can use them as black
boxes to decode the compression and filters.

Currently we are testing the Zlib compression algorithm as well as the
HDF5 byte shuffle filter. It's possible that other compression
algorithms and filters will "just work" because the numcodecs.abc.Codec
interface is used to decode the data, but they have not been tested.
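
As an illustration (a minimal sketch, not part of this commit) of the
round trip the Codec interface gives us; Zlib and Shuffle stand in for
whichever codecs are passed, and the 8-byte element size assumes
float64 data:

import numpy as np
from numcodecs import Shuffle, Zlib
from numcodecs.compat import ensure_bytes

original = np.arange(27, dtype="f8").tobytes()

# Write path: apply the shuffle filter first, then compress.
encoded = Zlib(level=1).encode(Shuffle(elementsize=8).encode(original))

# Read path: undo the pipeline in reverse, treating each codec as a black box.
decoded = Shuffle(elementsize=8).decode(Zlib(level=1).decode(encoded))
assert ensure_bytes(decoded) == original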

Closes: #118
markgoddard committed Jul 6, 2023
1 parent 42e6bce commit 6cec850
Showing 4 changed files with 85 additions and 33 deletions.
27 changes: 19 additions & 8 deletions activestorage/dummy_data.py
@@ -66,13 +66,21 @@ def make_validrange_ncdata(filename='test_validrange.nc', chunksize=(3, 3, 1), n
return make_ncdata(filename, chunksize, n, compression=None, valid_range=[-1.0, 1.2 * n ** 3])


def make_compressed_ncdata(filename='test_vanilla.nc', chunksize=(3, 3, 1), n=10, compression=None, shuffle=False):
"""
Make a compressed and optionally shuffled vanilla test dataset which is
three dimensional with indices and values that aid in testing data
extraction.
"""
return make_ncdata(filename, chunksize, n, compression=compression, shuffle=shuffle)


def make_vanilla_ncdata(filename='test_vanilla.nc', chunksize=(3, 3, 1), n=10):
"""
Make a vanilla test dataset which is three dimensional with indices and values that
aid in testing data extraction.
"""
r = make_ncdata(filename, chunksize, n, None, False)
return
return make_ncdata(filename, chunksize, n)


def make_ncdata(filename, chunksize, n, compression=None,
@@ -81,7 +89,8 @@ def make_ncdata(filename, chunksize, n, compression=None,
valid_range=None,
valid_min=None,
valid_max=None,
partially_missing_data=False):
partially_missing_data=False,
shuffle=False):
"""
If compression is required, it can be passed in via keyword
and is applied to all variables.
@@ -96,6 +105,8 @@ def make_ncdata(filename, chunksize, n, compression=None,
partially_missing_data = True makes half the data missing so we can
ensure we find some chunks which are all missing ... Can
only be used in combination with a missing value.
shuffle: if True, apply the HDF5 shuffle filter before compression.
"""
if partially_missing_data and not missing:
raise ValueError(f'Missing data value keyword provided and set to {missing} '
@@ -119,15 +130,15 @@ def make_holes(var, indices, attribute, value, dummy):
ydim = ds.createDimension("ydim", n)
zdim = ds.createDimension("zdim", n)

x = ds.createVariable("x","i4",("xdim",), fill_value=fillvalue, compression=compression)
y = ds.createVariable("y","i4",("ydim",), fill_value=fillvalue, compression=compression)
z = ds.createVariable("z","i4",("zdim",), fill_value=fillvalue, compression=compression)
x = ds.createVariable("x","i4",("xdim",), fill_value=fillvalue, compression=compression, shuffle=shuffle)
y = ds.createVariable("y","i4",("ydim",), fill_value=fillvalue, compression=compression, shuffle=shuffle)
z = ds.createVariable("z","i4",("zdim",), fill_value=fillvalue, compression=compression, shuffle=shuffle)

for a,s in zip([x, y, z],[1, n, n * n]):
a[:] = dd * s

dvar = ds.createVariable("data","f8", ("xdim","ydim","zdim"),
chunksizes=chunksize, compression=compression)
chunksizes=chunksize, compression=compression, shuffle=shuffle)
dvar[:] = data

nm1, nm2 = n - 1, n - 2
@@ -179,7 +190,7 @@ def make_holes(var, indices, attribute, value, dummy):
var = ds.variables['data']
print(f'\nCreated file "{filename}" with a variable called "data" with shape {var.shape} and chunking, compression {var.chunking()},{compression}\n')

return mindices, findices, vrindices, vm1indices, vm2indices
return ds, mindices, findices, vrindices, vm1indices, vm2indices


if __name__=="__main__":
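For reference, a minimal netCDF4 sketch (not part of this commit; the
file name and sizes are illustrative, and the compression keyword
assumes netCDF4 >= 1.6) showing what the new shuffle argument turns on
and how the filters() sanity check used in the tests reads it back:

import netCDF4
import numpy as np

ds = netCDF4.Dataset("example_shuffle.nc", "w")
ds.createDimension("x", 10)
var = ds.createVariable("data", "f8", ("x",), compression="zlib", shuffle=True)
var[:] = np.arange(10, dtype="f8")

# filters() reports the compression and filters applied to the variable,
# e.g. 'zlib': True and 'shuffle': True among its entries.
print(var.filters())
ds.close()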
29 changes: 24 additions & 5 deletions activestorage/storage.py
@@ -8,6 +8,8 @@ def reduce_chunk(rfile, offset, size, compression, filters, missing, dtype, shap
rfile - the actual file with the data
offset, size - where and what we want ...
compression - optional `numcodecs.abc.Codec` compression codec
filters - optional list of `numcodecs.abc.Codec` filter codecs
dtype - likely float32 in most cases.
shape - will be a tuple, something like (3,3,1), this is the dimensionality of the
chunk itself
@@ -22,15 +24,12 @@ def reduce_chunk(rfile, offset, size, compression, filters, missing, dtype, shap
"""

if compression is not None:
raise NotImplementedError("Compression is not yet supported!")
if filters is not None:
raise NotImplementedError("Filters are not yet supported!")

#FIXME: for the moment, open the file every time ... we might want to do that, or not
with open(rfile,'rb') as open_file:
# get the data
chunk = read_block(open_file, offset, size)
# reverse any compression and filters
chunk = filter_pipeline(chunk, compression, filters)
# make it a numpy array of bytes
chunk = ensure_ndarray(chunk)
# convert to the appropriate data type
@@ -51,6 +50,26 @@ def reduce_chunk(rfile, offset, size, compression, filters, missing, dtype, shap
else:
return tmp, None


def filter_pipeline(chunk, compression, filters):
"""
Reverse any compression and filters applied to the chunk.
When a chunk is written, the filters are applied in order, then compression
is applied. For reading, we must reverse this pipeline.
:param chunk: possibly filtered and compressed bytes
:param compression: optional `numcodecs.abc.Codec` compression codec
:param filters: optional list of `numcodecs.abc.Codec` filter codecs
:returns: decompressed and defiltered chunk bytes
"""
if compression is not None:
chunk = compression.decode(chunk)
for filter in reversed(filters or []):
chunk = filter.decode(chunk)
return chunk


def remove_missing(data, missing):
"""
As we are using numpy, we can use a masked array, storage implementations
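A usage sketch of the new filter_pipeline helper (not part of the
commit; the chunk bytes are illustrative and the 8-byte Shuffle element
size assumes float64 data, as in reduce_chunk's read path):

import numpy as np
from numcodecs import Shuffle, Zlib
from numcodecs.compat import ensure_bytes

from activestorage.storage import filter_pipeline

chunk_bytes = np.arange(64, dtype="f8").tobytes()
compression = Zlib()
filters = [Shuffle(elementsize=8)]

# Encode as a writer would: filters in order, then compression ...
encoded = compression.encode(filters[0].encode(chunk_bytes))

# ... and decode with the helper, which reverses that pipeline.
decoded = filter_pipeline(encoded, compression, filters)
assert ensure_bytes(decoded) == chunk_bytes

# filters may also be None, e.g. for a compressed but unshuffled variable.
decoded_nofilter = filter_pipeline(compression.encode(chunk_bytes), compression, None)
assert ensure_bytes(decoded_nofilter) == chunk_bytes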
42 changes: 42 additions & 0 deletions tests/test_compression.py
@@ -0,0 +1,42 @@
import os
import pytest

from activestorage.active import Active
from activestorage.config import *
from activestorage.dummy_data import make_compressed_ncdata

import utils


def create_compressed_dataset(tmp_path: str, compression: str, shuffle: bool):
"""
Make a vanilla test dataset which is compressed and optionally shuffled.
"""
temp_file = str(tmp_path / "test_compression.nc")
test_data = make_compressed_ncdata(filename=temp_file, compression=compression, shuffle=shuffle)

# Sanity check that test data is compressed and filtered as expected.
test_data_filters = test_data[0].variables['data'].filters()
assert test_data_filters[compression]
assert test_data_filters['shuffle'] == shuffle

test_file = utils.write_to_storage(temp_file)
if USE_S3:
os.remove(temp_file)
return test_file


@pytest.mark.skipif(USE_S3, reason="Compression and filtering not supported in S3 yet")
@pytest.mark.parametrize('compression', ['zlib'])
@pytest.mark.parametrize('shuffle', [False, True])
def test_compression_and_filters(tmp_path: str, compression: str, shuffle: bool):
"""
Test use of datasets with compression and filters applied.
"""
test_file = create_compressed_dataset(tmp_path, compression, shuffle)

active = Active(test_file, 'data', utils.get_storage_type())
active._version = 1
active._method = "min"
result = active[0:2,4:6,7:9]
assert result == 740.0
20 changes: 0 additions & 20 deletions tests/unit/test_storage.py
@@ -21,26 +21,6 @@ def test_reduce_chunk():
assert rc[0] == -1
assert rc[1] == 15

# with compression
with pytest.raises(NotImplementedError) as exc:
rc = st.reduce_chunk(rfile, offset, size,
compression="Blosc", filters=None,
missing=[None, 2050, None, None],
dtype="i2", shape=(8, 8),
order="C", chunk_selection=slice(0, 2, 1),
method=np.max)
assert str(exc.value) == "Compression is not yet supported!"

# with filters
with pytest.raises(NotImplementedError) as exc:
rc = st.reduce_chunk(rfile, offset, size,
compression=None, filters="filters",
missing=[None, 2050, None, None],
dtype="i2", shape=(8, 8),
order="C", chunk_selection=slice(0, 2, 1),
method=np.max)
assert str(exc.value) == "Filters are not yet supported!"


def test_reduced_chunk_masked_data():
"""Test method with masked data."""
