diff --git a/.gitignore b/.gitignore
index ce17d0db..703cf466 100644
--- a/.gitignore
+++ b/.gitignore
@@ -21,6 +21,8 @@ tests/test_data/cesm2_native_TREFHT.json
 tests/test_data/daily_data_tas.json
 tests/test_data/daily_data_ta.json
 tests/test_data/daily_data_masked_ta.json
+tests/test_data/CMIP6_IPSL-CM6A-LR_tas_tas.json
+tests/test_data/obs4MIPS_CERES-EBAF_L3B_Ed2-8_rlut_rlut.json
 
 # test coverage
 .coverage
diff --git a/activestorage/dummy_data.py b/activestorage/dummy_data.py
index 03e6d598..edfb8282 100644
--- a/activestorage/dummy_data.py
+++ b/activestorage/dummy_data.py
@@ -67,6 +67,15 @@ def make_validrange_ncdata(filename='test_validrange.nc', chunksize=(3, 3, 1), n
     return make_ncdata(filename, chunksize, n, compression=None, valid_range=valid_range)
 
 
+def make_compressed_ncdata(filename='test_vanilla.nc', chunksize=(3, 3, 1), n=10, compression=None, shuffle=False):
+    """
+    Make a compressed and optionally shuffled vanilla test dataset which is
+    three dimensional with indices and values that aid in testing data
+    extraction.
+    """
+    return make_ncdata(filename, chunksize, n, compression=compression, shuffle=shuffle)
+
+
 def make_vanilla_ncdata(filename='test_vanilla.nc', chunksize=(3, 3, 1), n=10):
     """
     Make a vanilla test dataset which is three dimensional with indices and values that
@@ -81,7 +90,8 @@ def make_ncdata(filename, chunksize, n, compression=None,
                 valid_range=None,
                 valid_min=None,
                 valid_max=None,
-                partially_missing_data=False):
+                partially_missing_data=False,
+                shuffle=False):
     """
     If compression is required, it can be passed in via keyword
     and is applied to all variables.
@@ -96,6 +106,8 @@ def make_ncdata(filename, chunksize, n, compression=None,
     partially_missing_data = True makes half the data missing so we can
     ensure we find some chunks which are all missing ... Can only be used
     in combination with a missing value.
+
+    shuffle: if True, apply the HDF5 shuffle filter before compression.
     """
     if partially_missing_data and not missing:
         raise ValueError(f'Missing data value keyword provided and set to {missing} '
@@ -109,16 +121,19 @@ def make_ncdata(filename, chunksize, n, compression=None,
     xdim = ds.createDimension("xdim", n)
     ydim = ds.createDimension("ydim", n)
     zdim = ds.createDimension("zdim", n)
-    x = ds.createVariable("x","i4",("xdim",), fill_value=fillvalue, compression=compression)
-    y = ds.createVariable("y","i4",("ydim",), fill_value=fillvalue, compression=compression)
-    z = ds.createVariable("z","i4",("zdim",), fill_value=fillvalue, compression=compression)
+    x = ds.createVariable("x","i4",("xdim",), fill_value=fillvalue, compression=compression, shuffle=shuffle)
+    y = ds.createVariable("y","i4",("ydim",), fill_value=fillvalue, compression=compression, shuffle=shuffle)
+    z = ds.createVariable("z","i4",("zdim",), fill_value=fillvalue, compression=compression, shuffle=shuffle)
 
     for a,s in zip([x, y, z],[1, n, n * n]):
         a[:] = dd * s
 
     dvar = ds.createVariable("data","f8", ("xdim","ydim","zdim"),
-                             chunksizes=chunksize, compression=compression,
+                             chunksizes=chunksize,
+                             compression=compression,
+                             shuffle=shuffle,
                              fill_value=fillvalue)
+
     dvar[:] = data
 
     nm1, nm2 = n - 1, n - 2
@@ -179,7 +194,7 @@ def make_ncdata(filename, chunksize, n, compression=None,
 
     # all important close at the end!!
     ds.close()
-    return mindices, findices, vrindices, vm1indices, vm2indices 
+    return mindices, findices, vrindices, vm1indices, vm2indices
 
 
 if __name__=="__main__":
diff --git a/activestorage/storage.py b/activestorage/storage.py
index c66f5367..28d91f35 100644
--- a/activestorage/storage.py
+++ b/activestorage/storage.py
@@ -8,6 +8,8 @@ def reduce_chunk(rfile, offset, size, compression, filters, missing, dtype, shap
 
     rfile - the actual file with the data
     offset, size - where and what we want ...
+    compression - optional `numcodecs.abc.Codec` compression codec
+    filters - optional list of `numcodecs.abc.Codec` filter codecs
     dtype - likely float32 in most cases.
     shape - will be a tuple, something like (3,3,1), this is the
             dimensionality of the chunk itself
@@ -22,15 +24,12 @@ def reduce_chunk(rfile, offset, size, compression, filters, missing, dtype, shap
 
     """
 
-    if compression is not None:
-        raise NotImplementedError("Compression is not yet supported!")
-    if filters is not None:
-        raise NotImplementedError("Filters are not yet supported!")
-
     #FIXME: for the moment, open the file every time ... we might want to do that, or not
     with open(rfile,'rb') as open_file:
         # get the data
         chunk = read_block(open_file, offset, size)
+        # reverse any compression and filters
+        chunk = filter_pipeline(chunk, compression, filters)
         # make it a numpy array of bytes
         chunk = ensure_ndarray(chunk)
         # convert to the appropriate data type
@@ -51,6 +50,26 @@ def reduce_chunk(rfile, offset, size, compression, filters, missing, dtype, shap
     else:
         return tmp, None
 
+
+def filter_pipeline(chunk, compression, filters):
+    """
+    Reverse any compression and filters applied to the chunk.
+
+    When a chunk is written, the filters are applied in order, then compression
+    is applied. For reading, we must reverse this pipeline.
+
+    :param chunk: possibly filtered and compressed bytes
+    :param compression: optional `numcodecs.abc.Codec` compression codec
+    :param filters: optional list of `numcodecs.abc.Codec` filter codecs
+    :returns: decompressed and defiltered chunk bytes
+    """
+    if compression is not None:
+        chunk = compression.decode(chunk)
+    for filter in reversed(filters or []):
+        chunk = filter.decode(chunk)
+    return chunk
+
+
 def remove_missing(data, missing):
     """
     As we are using numpy, we can use a masked array, storage implementations
diff --git a/tests/test_compression.py b/tests/test_compression.py
new file mode 100644
index 00000000..e32ab70e
--- /dev/null
+++ b/tests/test_compression.py
@@ -0,0 +1,98 @@
+import os
+import numpy as np
+import pytest
+
+from netCDF4 import Dataset
+from pathlib import Path
+
+from activestorage.active import Active
+from activestorage.config import *
+from activestorage.dummy_data import make_compressed_ncdata
+
+import utils
+
+
+def check_dataset_filters(temp_file: str, ncvar: str, compression: str, shuffle: bool):
+    # Sanity check that test data is compressed and filtered as expected.
+    with Dataset(temp_file) as test_data:
+        test_data_filters = test_data.variables[ncvar].filters()
+        print(test_data_filters)
+        assert test_data_filters[compression]
+        assert test_data_filters['shuffle'] == shuffle
+
+
+def create_compressed_dataset(tmp_path: str, compression: str, shuffle: bool):
+    """
+    Make a vanilla test dataset which is compressed and optionally shuffled.
+    """
+    temp_file = str(tmp_path / "test_compression.nc")
+    test_data = make_compressed_ncdata(filename=temp_file, compression=compression, shuffle=shuffle)
+
+    check_dataset_filters(temp_file, "data", compression, shuffle)
+
+    test_file = utils.write_to_storage(temp_file)
+    if USE_S3:
+        os.remove(temp_file)
+    return test_file
+
+
+@pytest.mark.skipif(USE_S3, reason="Compression and filtering not supported in S3 yet")
+@pytest.mark.parametrize('compression', ['zlib'])
+@pytest.mark.parametrize('shuffle', [False, True])
+def test_compression_and_filters(tmp_path: str, compression: str, shuffle: bool):
+    """
+    Test use of datasets with compression and filters applied.
+    """
+    test_file = create_compressed_dataset(tmp_path, compression, shuffle)
+
+    active = Active(test_file, 'data', utils.get_storage_type())
+    active._version = 1
+    active._method = "min"
+    result = active[0:2,4:6,7:9]
+    assert result == 740.0
+
+
+@pytest.mark.skipif(USE_S3, reason="Compression and filtering not supported in S3 yet")
+def test_compression_and_filters_cmip6_data():
+    """
+    Test use of datasets with compression and filters applied for a real
+    CMIP6 dataset (CMIP6_IPSL-CM6A-LR_tas).
+    """
+    test_file = str(Path(__file__).resolve().parent / 'test_data' / 'CMIP6_IPSL-CM6A-LR_tas.nc')
+
+    check_dataset_filters(test_file, "tas", "zlib", False)
+
+    with Dataset(test_file) as nc_data:
+        nc_min = np.min(nc_data["tas"][0:2,4:6,7:9])
+    print(f"Numpy min from compressed file {nc_min}")
+
+    active = Active(test_file, 'tas', utils.get_storage_type())
+    active._version = 1
+    active._method = "min"
+    result = active[0:2,4:6,7:9]
+    assert nc_min == result
+    assert result == 239.25946044921875
+
+
+@pytest.mark.skipif(USE_S3, reason="Compression and filtering not supported in S3 yet")
+def test_compression_and_filters_obs4mips_data():
+    """
+    Test use of datasets with compression and filters applied for a real
+    obs4mips dataset (obs4MIPS_CERES-EBAF_L3B_Ed2-8_rlut.nc) at CMIP5 MIP standard
+    but with CMIP6-standard file packaging.
+    """
+    test_file = str(Path(__file__).resolve().parent / 'test_data' / 'obs4MIPS_CERES-EBAF_L3B_Ed2-8_rlut.nc')
+
+    check_dataset_filters(test_file, "rlut", "zlib", False)
+
+    with Dataset(test_file) as nc_data:
+        nc_min = np.min(nc_data["rlut"][0:2,4:6,7:9])
+    print(f"Numpy min from compressed file {nc_min}")
+
+    active = Active(test_file, 'rlut', utils.get_storage_type())
+    active._version = 1
+    active._method = "min"
+    result = active[0:2,4:6,7:9]
+    print(nc_min)
+    assert nc_min == result
+    assert result == 124.0
diff --git a/tests/test_data/CMIP6_IPSL-CM6A-LR_tas.nc b/tests/test_data/CMIP6_IPSL-CM6A-LR_tas.nc
new file mode 100644
index 00000000..fcbb8e4f
Binary files /dev/null and b/tests/test_data/CMIP6_IPSL-CM6A-LR_tas.nc differ
diff --git a/tests/test_data/obs4MIPS_CERES-EBAF_L3B_Ed2-8_rlut.nc b/tests/test_data/obs4MIPS_CERES-EBAF_L3B_Ed2-8_rlut.nc
new file mode 100644
index 00000000..c2c935a0
Binary files /dev/null and b/tests/test_data/obs4MIPS_CERES-EBAF_L3B_Ed2-8_rlut.nc differ
diff --git a/tests/unit/test_storage.py b/tests/unit/test_storage.py
index 168e8bbc..22e6dfbb 100644
--- a/tests/unit/test_storage.py
+++ b/tests/unit/test_storage.py
@@ -21,26 +21,6 @@ def test_reduce_chunk():
     assert rc[0] == -1
     assert rc[1] == 15
 
-    # with compression
-    with pytest.raises(NotImplementedError) as exc:
-        rc = st.reduce_chunk(rfile, offset, size,
-                             compression="Blosc", filters=None,
-                             missing=[None, 2050, None, None],
-                             dtype="i2", shape=(8, 8),
-                             order="C", chunk_selection=slice(0, 2, 1),
-                             method=np.max)
-    assert str(exc.value) == "Compression is not yet supported!"
-
-    # with filters
-    with pytest.raises(NotImplementedError) as exc:
-        rc = st.reduce_chunk(rfile, offset, size,
-                             compression=None, filters="filters",
-                             missing=[None, 2050, None, None],
-                             dtype="i2", shape=(8, 8),
-                             order="C", chunk_selection=slice(0, 2, 1),
-                             method=np.max)
-    assert str(exc.value) == "Filters are not yet supported!"
-
 
 def test_reduced_chunk_masked_data():
     """Test method with masked data."""