Commit

Merge pull request #119 from valeriupredoi/compression
Support Zlib compression and shuffle filter for local storage
valeriupredoi authored Jul 20, 2023
2 parents bce41b5 + 61c7516 commit a5a8920
Showing 7 changed files with 145 additions and 31 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -21,6 +21,8 @@ tests/test_data/cesm2_native_TREFHT.json
tests/test_data/daily_data_tas.json
tests/test_data/daily_data_ta.json
tests/test_data/daily_data_masked_ta.json
tests/test_data/CMIP6_IPSL-CM6A-LR_tas_tas.json
tests/test_data/obs4MIPS_CERES-EBAF_L3B_Ed2-8_rlut_rlut.json

# test coverage
.coverage
27 changes: 21 additions & 6 deletions activestorage/dummy_data.py
@@ -67,6 +67,15 @@ def make_validrange_ncdata(filename='test_validrange.nc', chunksize=(3, 3, 1), n
return make_ncdata(filename, chunksize, n, compression=None, valid_range=valid_range)


def make_compressed_ncdata(filename='test_vanilla.nc', chunksize=(3, 3, 1), n=10, compression=None, shuffle=False):
"""
Make a compressed and optionally shuffled vanilla test dataset which is
three dimensional with indices and values that aid in testing data
extraction.
"""
return make_ncdata(filename, chunksize, n, compression=compression, shuffle=shuffle)


def make_vanilla_ncdata(filename='test_vanilla.nc', chunksize=(3, 3, 1), n=10):
"""
Make a vanilla test dataset which is three dimensional with indices and values that
@@ -81,7 +90,8 @@ def make_ncdata(filename, chunksize, n, compression=None,
valid_range=None,
valid_min=None,
valid_max=None,
partially_missing_data=False):
partially_missing_data=False,
shuffle=False):
"""
If compression is required, it can be passed in via keyword
and is applied to all variables.
@@ -96,6 +106,8 @@ def make_ncdata(filename, chunksize, n, compression=None,
partially_missing_data = True makes half the data missing so we can
ensure we find some chunks which are all missing ... Can
only be used in combination with a missing value.
shuffle: if True, apply the HDF5 shuffle filter before compression.
"""
if partially_missing_data and not missing:
raise ValueError(f'Missing data value keyword provided and set to {missing} '
@@ -109,16 +121,19 @@ def make_ncdata(filename, chunksize, n, compression=None,
ydim = ds.createDimension("ydim", n)
zdim = ds.createDimension("zdim", n)

x = ds.createVariable("x","i4",("xdim",), fill_value=fillvalue, compression=compression)
y = ds.createVariable("y","i4",("ydim",), fill_value=fillvalue, compression=compression)
z = ds.createVariable("z","i4",("zdim",), fill_value=fillvalue, compression=compression)
x = ds.createVariable("x","i4",("xdim",), fill_value=fillvalue, compression=compression, shuffle=shuffle)
y = ds.createVariable("y","i4",("ydim",), fill_value=fillvalue, compression=compression, shuffle=shuffle)
z = ds.createVariable("z","i4",("zdim",), fill_value=fillvalue, compression=compression, shuffle=shuffle)

for a,s in zip([x, y, z],[1, n, n * n]):
a[:] = dd * s

dvar = ds.createVariable("data","f8", ("xdim","ydim","zdim"),
chunksizes=chunksize, compression=compression,
chunksizes=chunksize,
compression=compression,
shuffle=shuffle,
fill_value=fillvalue)

dvar[:] = data

nm1, nm2 = n - 1, n - 2
@@ -179,7 +194,7 @@ def make_ncdata(filename, chunksize, n, compression=None,
# all important close at the end!!
ds.close()

return mindices, findices, vrindices, vm1indices, vm2indices
return mindices, findices, vrindices, vm1indices, vm2indices


if __name__=="__main__":
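For orientation, a minimal usage sketch of the new helper; the file path and keyword values below are illustrative, not part of the change:

    import netCDF4
    from activestorage.dummy_data import make_compressed_ncdata

    # write a small zlib-compressed, shuffled test file (illustrative path)
    make_compressed_ncdata(filename="/tmp/test_compressed.nc",
                           compression="zlib", shuffle=True)

    # netCDF4 reports which filters ended up on the "data" variable
    with netCDF4.Dataset("/tmp/test_compressed.nc") as ds:
        print(ds.variables["data"].filters())  # expect zlib and shuffle to be True
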
29 changes: 24 additions & 5 deletions activestorage/storage.py
@@ -8,6 +8,8 @@ def reduce_chunk(rfile, offset, size, compression, filters, missing, dtype, shap
rfile - the actual file with the data
offset, size - where and what we want ...
compression - optional `numcodecs.abc.Codec` compression codec
filters - optional list of `numcodecs.abc.Codec` filter codecs
dtype - likely float32 in most cases.
shape - will be a tuple, something like (3,3,1), this is the dimensionality of the
chunk itself
@@ -22,15 +24,12 @@ def reduce_chunk(rfile, offset, size, compression, filters, missing, dtype, shap
"""

if compression is not None:
raise NotImplementedError("Compression is not yet supported!")
if filters is not None:
raise NotImplementedError("Filters are not yet supported!")

#FIXME: for the moment, open the file every time ... we might want to do that, or not
with open(rfile,'rb') as open_file:
# get the data
chunk = read_block(open_file, offset, size)
# reverse any compression and filters
chunk = filter_pipeline(chunk, compression, filters)
# make it a numpy array of bytes
chunk = ensure_ndarray(chunk)
# convert to the appropriate data type
Expand All @@ -51,6 +50,26 @@ def reduce_chunk(rfile, offset, size, compression, filters, missing, dtype, shap
else:
return tmp, None


def filter_pipeline(chunk, compression, filters):
"""
Reverse any compression and filters applied to the chunk.
When a chunk is written, the filters are applied in order, then compression
is applied. For reading, we must reverse this pipeline.
:param chunk: possibly filtered and compressed bytes
:param compression: optional `numcodecs.abc.Codec` compression codec
:param filters: optional list of `numcodecs.abc.Codec` filter codecs
:returns: decompressed and defiltered chunk bytes
"""
if compression is not None:
chunk = compression.decode(chunk)
for filter in reversed(filters or []):
chunk = filter.decode(chunk)
return chunk


def remove_missing(data, missing):
"""
As we are using numpy, we can use a masked array, storage implementations
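The decode order in filter_pipeline mirrors the write path: filters are applied first and compression last when a chunk is written, so reading decompresses first and then undoes the filters in reverse. A minimal round-trip sketch, assuming the numcodecs Zlib and Shuffle codecs:

    import numpy as np
    from numcodecs import Shuffle, Zlib

    from activestorage.storage import filter_pipeline

    data = np.arange(12, dtype="f8")
    filters = [Shuffle(elementsize=data.dtype.itemsize)]
    compression = Zlib(level=1)

    # write path: apply filters in order, then compress
    chunk = data.tobytes()
    for codec in filters:
        chunk = codec.encode(chunk)
    chunk = compression.encode(chunk)

    # read path: decompress, then undo the filters in reverse order
    chunk = filter_pipeline(chunk, compression, filters)
    assert np.array_equal(np.frombuffer(chunk, dtype="f8"), data)
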
98 changes: 98 additions & 0 deletions tests/test_compression.py
@@ -0,0 +1,98 @@
import os
import numpy as np
import pytest

from netCDF4 import Dataset
from pathlib import Path

from activestorage.active import Active
from activestorage.config import *
from activestorage.dummy_data import make_compressed_ncdata

import utils


def check_dataset_filters(temp_file: str, ncvar: str, compression: str, shuffle: bool):
# Sanity check that test data is compressed and filtered as expected.
with Dataset(temp_file) as test_data:
test_data_filters = test_data.variables[ncvar].filters()
print(test_data_filters)
assert test_data_filters[compression]
assert test_data_filters['shuffle'] == shuffle


def create_compressed_dataset(tmp_path: str, compression: str, shuffle: bool):
"""
Make a vanilla test dataset which is compressed and optionally shuffled.
"""
temp_file = str(tmp_path / "test_compression.nc")
test_data = make_compressed_ncdata(filename=temp_file, compression=compression, shuffle=shuffle)

check_dataset_filters(temp_file, "data", compression, shuffle)

test_file = utils.write_to_storage(temp_file)
if USE_S3:
os.remove(temp_file)
return test_file


@pytest.mark.skipif(USE_S3, reason="Compression and filtering not supported in S3 yet")
@pytest.mark.parametrize('compression', ['zlib'])
@pytest.mark.parametrize('shuffle', [False, True])
def test_compression_and_filters(tmp_path: str, compression: str, shuffle: bool):
"""
Test use of datasets with compression and filters applied.
"""
test_file = create_compressed_dataset(tmp_path, compression, shuffle)

active = Active(test_file, 'data', utils.get_storage_type())
active._version = 1
active._method = "min"
result = active[0:2,4:6,7:9]
assert result == 740.0


@pytest.mark.skipif(USE_S3, reason="Compression and filtering not supported in S3 yet")
def test_compression_and_filters_cmip6_data():
"""
Test use of datasets with compression and filters applied for a real
CMIP6 dataset (CMIP6_IPSL-CM6A-LR_tas).
"""
test_file = str(Path(__file__).resolve().parent / 'test_data' / 'CMIP6_IPSL-CM6A-LR_tas.nc')

check_dataset_filters(test_file, "tas", "zlib", False)

with Dataset(test_file) as nc_data:
nc_min = np.min(nc_data["tas"][0:2,4:6,7:9])
print(f"Numpy min from compressed file {nc_min}")

active = Active(test_file, 'tas', utils.get_storage_type())
active._version = 1
active._method = "min"
result = active[0:2,4:6,7:9]
assert nc_min == result
assert result == 239.25946044921875


@pytest.mark.skipif(USE_S3, reason="Compression and filtering not supported in S3 yet")
def test_compression_and_filters_obs4mips_data():
"""
Test use of datasets with compression and filters applied for a real
obs4mips dataset (obs4MIPS_CERES-EBAF_L3B_Ed2-8_rlut.nc) at CMIP5 MIP standard
but with CMIP6-standard file packaging.
"""
test_file = str(Path(__file__).resolve().parent / 'test_data' / 'obs4MIPS_CERES-EBAF_L3B_Ed2-8_rlut.nc')

check_dataset_filters(test_file, "rlut", "zlib", False)

with Dataset(test_file) as nc_data:
nc_min = np.min(nc_data["rlut"][0:2,4:6,7:9])
print(f"Numpy min from compressed file {nc_min}")

active = Active(test_file, 'rlut', utils.get_storage_type())
active._version = 1
active._method = "min"
result = active[0:2,4:6,7:9]
print(nc_min)
assert nc_min == result
assert result == 124.0
Binary file added tests/test_data/CMIP6_IPSL-CM6A-LR_tas.nc
Binary file added tests/test_data/obs4MIPS_CERES-EBAF_L3B_Ed2-8_rlut.nc
20 changes: 0 additions & 20 deletions tests/unit/test_storage.py
@@ -21,26 +21,6 @@ def test_reduce_chunk():
assert rc[0] == -1
assert rc[1] == 15

# with compression
with pytest.raises(NotImplementedError) as exc:
rc = st.reduce_chunk(rfile, offset, size,
compression="Blosc", filters=None,
missing=[None, 2050, None, None],
dtype="i2", shape=(8, 8),
order="C", chunk_selection=slice(0, 2, 1),
method=np.max)
assert str(exc.value) == "Compression is not yet supported!"

# with filters
with pytest.raises(NotImplementedError) as exc:
rc = st.reduce_chunk(rfile, offset, size,
compression=None, filters="filters",
missing=[None, 2050, None, None],
dtype="i2", shape=(8, 8),
order="C", chunk_selection=slice(0, 2, 1),
method=np.max)
assert str(exc.value) == "Filters are not yet supported!"


def test_reduced_chunk_masked_data():
"""Test method with masked data."""
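The removed assertions covered the old NotImplementedError paths; with filter_pipeline in place, reduce_chunk now accepts real codec objects instead of raising. A call-shape sketch with placeholder values, assuming the chunk stored at (offset, size) really was byte-shuffled and then zlib-compressed:

    import numpy as np
    from numcodecs import Shuffle, Zlib

    import activestorage.storage as st

    def reduce_compressed_chunk(rfile, offset, size):
        # placeholder chunk layout: an 8x8 int16 chunk, shuffled then zlib-compressed
        return st.reduce_chunk(rfile, offset, size,
                               compression=Zlib(level=1),
                               filters=[Shuffle(elementsize=2)],
                               missing=[None, 2050, None, None],
                               dtype="i2", shape=(8, 8),
                               order="C", chunk_selection=slice(0, 2, 1),
                               method=np.max)
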
