Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Zlib compression and shuffle filter for local storage #119

Merged
merged 20 commits into from
Jul 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ tests/test_data/cesm2_native_TREFHT.json
tests/test_data/daily_data_tas.json
tests/test_data/daily_data_ta.json
tests/test_data/daily_data_masked_ta.json
tests/test_data/CMIP6_IPSL-CM6A-LR_tas_tas.json
tests/test_data/obs4MIPS_CERES-EBAF_L3B_Ed2-8_rlut_rlut.json

# test coverage
.coverage
Expand Down
27 changes: 21 additions & 6 deletions activestorage/dummy_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,15 @@ def make_validrange_ncdata(filename='test_validrange.nc', chunksize=(3, 3, 1), n
return make_ncdata(filename, chunksize, n, compression=None, valid_range=valid_range)


def make_compressed_ncdata(filename='test_vanilla.nc', chunksize=(3, 3, 1), n=10, compression=None, shuffle=False):
"""
Make a compressed and optionally shuffled vanilla test dataset which is
three dimensional with indices and values that aid in testing data
extraction.
"""
return make_ncdata(filename, chunksize, n, compression=compression, shuffle=shuffle)


def make_vanilla_ncdata(filename='test_vanilla.nc', chunksize=(3, 3, 1), n=10):
"""
Make a vanilla test dataset which is three dimensional with indices and values that
Expand All @@ -81,7 +90,8 @@ def make_ncdata(filename, chunksize, n, compression=None,
valid_range=None,
valid_min=None,
valid_max=None,
partially_missing_data=False):
partially_missing_data=False,
shuffle=False):
"""
If compression is required, it can be passed in via keyword
and is applied to all variables.
Expand All @@ -96,6 +106,8 @@ def make_ncdata(filename, chunksize, n, compression=None,
partially_missing_data = True makes half the data missing so we can
ensure we find some chunks which are all missing ... Can
only be used in combination with a missing value.
shuffle: if True, apply the HDF5 shuffle filter before compression.
"""
if partially_missing_data and not missing:
raise ValueError(f'Missing data value keyword provided and set to {missing} '
Expand All @@ -109,16 +121,19 @@ def make_ncdata(filename, chunksize, n, compression=None,
ydim = ds.createDimension("ydim", n)
zdim = ds.createDimension("zdim", n)

x = ds.createVariable("x","i4",("xdim",), fill_value=fillvalue, compression=compression)
y = ds.createVariable("y","i4",("ydim",), fill_value=fillvalue, compression=compression)
z = ds.createVariable("z","i4",("zdim",), fill_value=fillvalue, compression=compression)
x = ds.createVariable("x","i4",("xdim",), fill_value=fillvalue, compression=compression, shuffle=shuffle)
y = ds.createVariable("y","i4",("ydim",), fill_value=fillvalue, compression=compression, shuffle=shuffle)
z = ds.createVariable("z","i4",("zdim",), fill_value=fillvalue, compression=compression, shuffle=shuffle)

for a,s in zip([x, y, z],[1, n, n * n]):
a[:] = dd * s

dvar = ds.createVariable("data","f8", ("xdim","ydim","zdim"),
chunksizes=chunksize, compression=compression,
chunksizes=chunksize,
compression=compression,
shuffle=shuffle,
fill_value=fillvalue)

dvar[:] = data

nm1, nm2 = n - 1, n - 2
Expand Down Expand Up @@ -179,7 +194,7 @@ def make_ncdata(filename, chunksize, n, compression=None,
# all important close at the end!!
ds.close()

return mindices, findices, vrindices, vm1indices, vm2indices
return mindices, findices, vrindices, vm1indices, vm2indices


if __name__=="__main__":
Expand Down
29 changes: 24 additions & 5 deletions activestorage/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ def reduce_chunk(rfile, offset, size, compression, filters, missing, dtype, shap
rfile - the actual file with the data
offset, size - where and what we want ...
compression - optional `numcodecs.abc.Codec` compression codec
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really want to pass an actual codec instance, I'd rather have a complete separation of concerns and pass an identifier for a particular codec, just as is done in the NetCDF API ... after all this is our API layer???

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a fair point. I was quite surprised to see an actual codec instance being passed through, although it did make the implementation trivial.

Would it make sense to align the S3 active storage API parameters and the storage driver parameters used here?

Currently I'm using a lower case string to describe the (de)compression algorithm. I need to check that that no parameters are required for other decompression using other algorithms. Filters may need parameters. Shuffle needs to know the data type size, although we do already have that info elsewhere.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'd be good to know how other communities are handling this. How does Zarr doit? We should find out.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In terms of the Zarr Python API, it looks like numcodecs is the interface: https://zarr.readthedocs.io/en/stable/api/codecs.html

I believe it was originally part of zarr, then got extracted into a separate library.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, that's not so helpful I guess as they have the entire block in memory client-side so they can put it all in the stack. Our problem of course is that the client and server are not on the same stack, so we have to go through an API interface. Where we put that is of course a moot point.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd say that the main API to get right is that of the active storage server. It's JSON based, so can't accept a Python object, but in my current implementation I'm just grabbing the codec_id from the numcodecs Codec object, which happens to match my API.

If you want this reduce_chunk interface to form a driver API for this library then it would also be important to keep it simple and stable, although currently all "drivers" are in-tree in this repo.

Is this something we should iterate on from here, with an issue in the backlog?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To follow up with a bit more info on Zarr, the compression algorithm, filters, missing data, etc. end up in the JSON file:

"b/.zarray": "{\"chunks\":[32],\"compressor\":{\"id\":\"zlib\",\"level\":1},\"dtype\":\"<f8\",\"fill_value\":9.969209968386869e+36,\"filters\":null,\"order\":\"C\",\"shape\":[32],\"zarr_format\":2}",

Each compression algorithm has an ID and a dict of parameters than are used to instantiate one of the Codec objects, with a registry interface that looks like it would accept the compressor dict: https://github.com/zarr-developers/numcodecs/blob/350cbd1eb6751cb32e66aa5a8bc4002077083a83/numcodecs/registry.py

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

filters - optional list of `numcodecs.abc.Codec` filter codecs
dtype - likely float32 in most cases.
shape - will be a tuple, something like (3,3,1), this is the dimensionality of the
chunk itself
Expand All @@ -22,15 +24,12 @@ def reduce_chunk(rfile, offset, size, compression, filters, missing, dtype, shap
"""

if compression is not None:
raise NotImplementedError("Compression is not yet supported!")
if filters is not None:
raise NotImplementedError("Filters are not yet supported!")

#FIXME: for the moment, open the file every time ... we might want to do that, or not
with open(rfile,'rb') as open_file:
# get the data
chunk = read_block(open_file, offset, size)
# reverse any compression and filters
chunk = filter_pipeline(chunk, compression, filters)
# make it a numpy array of bytes
chunk = ensure_ndarray(chunk)
# convert to the appropriate data type
Expand All @@ -51,6 +50,26 @@ def reduce_chunk(rfile, offset, size, compression, filters, missing, dtype, shap
else:
return tmp, None


def filter_pipeline(chunk, compression, filters):
"""
Reverse any compression and filters applied to the chunk.
When a chunk is written, the filters are applied in order, then compression
is applied. For reading, we must reverse this pipeline.
:param chunk: possibly filtered and compressed bytes
:param compression: optional `numcodecs.abc.Codec` compression codec
:param filters: optional list of `numcodecs.abc.Codec` filter codecs
:returns: decompressed and defiltered chunk bytes
"""
if compression is not None:
chunk = compression.decode(chunk)
for filter in reversed(filters or []):
chunk = filter.decode(chunk)
return chunk


def remove_missing(data, missing):
"""
As we are using numpy, we can use a masked array, storage implementations
Expand Down
98 changes: 98 additions & 0 deletions tests/test_compression.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import os
import numpy as np
import pytest

from netCDF4 import Dataset
from pathlib import Path

from activestorage.active import Active
from activestorage.config import *
from activestorage.dummy_data import make_compressed_ncdata

import utils


def check_dataset_filters(temp_file: str, ncvar: str, compression: str, shuffle: bool):
# Sanity check that test data is compressed and filtered as expected.
with Dataset(temp_file) as test_data:
test_data_filters = test_data.variables[ncvar].filters()
print(test_data_filters)
assert test_data_filters[compression]
assert test_data_filters['shuffle'] == shuffle


def create_compressed_dataset(tmp_path: str, compression: str, shuffle: bool):
"""
Make a vanilla test dataset which is compressed and optionally shuffled.
"""
temp_file = str(tmp_path / "test_compression.nc")
test_data = make_compressed_ncdata(filename=temp_file, compression=compression, shuffle=shuffle)

check_dataset_filters(temp_file, "data", compression, shuffle)

test_file = utils.write_to_storage(temp_file)
if USE_S3:
os.remove(temp_file)
return test_file


@pytest.mark.skipif(USE_S3, reason="Compression and filtering not supported in S3 yet")
@pytest.mark.parametrize('compression', ['zlib'])
@pytest.mark.parametrize('shuffle', [False, True])
def test_compression_and_filters(tmp_path: str, compression: str, shuffle: bool):
"""
Test use of datasets with compression and filters applied.
"""
test_file = create_compressed_dataset(tmp_path, compression, shuffle)

active = Active(test_file, 'data', utils.get_storage_type())
active._version = 1
active._method = "min"
result = active[0:2,4:6,7:9]
assert result == 740.0


@pytest.mark.skipif(USE_S3, reason="Compression and filtering not supported in S3 yet")
def test_compression_and_filters_cmip6_data():
"""
Test use of datasets with compression and filters applied for a real
CMIP6 dataset (CMIP6_IPSL-CM6A-LR_tas).
"""
test_file = str(Path(__file__).resolve().parent / 'test_data' / 'CMIP6_IPSL-CM6A-LR_tas.nc')

check_dataset_filters(test_file, "tas", "zlib", False)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check suggests this data is not compressed.

Output from filters:

{'zlib': False, 'szip': False, 'zstd': False, 'bzip2': False, 'blosc': False, 'shuffle': False, 'complevel': 0, 'fletcher32': False}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same for obs4mips

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agh! what a palaver - iris saves cubes with no compression by default - good spotting it, man! Let me plug in zlib at save point 👍


with Dataset(test_file) as nc_data:
nc_min = np.min(nc_data["tas"][0:2,4:6,7:9])
print(f"Numpy min from compressed file {nc_min}")

active = Active(test_file, 'tas', utils.get_storage_type())
active._version = 1
active._method = "min"
result = active[0:2,4:6,7:9]
assert nc_min == result
assert result == 239.25946044921875


@pytest.mark.skipif(USE_S3, reason="Compression and filtering not supported in S3 yet")
def test_compression_and_filters_obs4mips_data():
"""
Test use of datasets with compression and filters applied for a real
obs4mips dataset (obs4MIPS_CERES-EBAF_L3B_Ed2-8_rlut.nc) at CMIP5 MIP standard
but with CMIP6-standard file packaging.
"""
test_file = str(Path(__file__).resolve().parent / 'test_data' / 'obs4MIPS_CERES-EBAF_L3B_Ed2-8_rlut.nc')

check_dataset_filters(test_file, "rlut", "zlib", False)

with Dataset(test_file) as nc_data:
nc_min = np.min(nc_data["rlut"][0:2,4:6,7:9])
print(f"Numpy min from compressed file {nc_min}")

active = Active(test_file, 'rlut', utils.get_storage_type())
active._version = 1
active._method = "min"
result = active[0:2,4:6,7:9]
print(nc_min)
assert nc_min == result
assert result == 124.0
Binary file added tests/test_data/CMIP6_IPSL-CM6A-LR_tas.nc
Binary file not shown.
Binary file not shown.
20 changes: 0 additions & 20 deletions tests/unit/test_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,6 @@ def test_reduce_chunk():
assert rc[0] == -1
assert rc[1] == 15

# with compression
with pytest.raises(NotImplementedError) as exc:
rc = st.reduce_chunk(rfile, offset, size,
compression="Blosc", filters=None,
missing=[None, 2050, None, None],
dtype="i2", shape=(8, 8),
order="C", chunk_selection=slice(0, 2, 1),
method=np.max)
assert str(exc.value) == "Compression is not yet supported!"

# with filters
with pytest.raises(NotImplementedError) as exc:
rc = st.reduce_chunk(rfile, offset, size,
compression=None, filters="filters",
missing=[None, 2050, None, None],
dtype="i2", shape=(8, 8),
order="C", chunk_selection=slice(0, 2, 1),
method=np.max)
assert str(exc.value) == "Filters are not yet supported!"


def test_reduced_chunk_masked_data():
"""Test method with masked data."""
Expand Down