Support GZip/Zlib compression and Shuffle filter for S3
Adds support for netCDF files with Gzip/Zlib compression and/or the byte
shuffle filter when using the S3 storage backend.

The compression and filter integration tests now run when testing
against the S3 storage backend.

Closes #120
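
As a rough illustration of what this change enables (a sketch only: the URI, the
"s3" storage type string and the slice are assumptions for illustration; the
integration tests use utils.write_to_storage() and utils.get_storage_type()
instead), computing a reduction over a zlib-compressed, shuffled variable held
in S3 might look like:

    from activestorage.active import Active

    # Hypothetical S3 URI and variable name, mirroring the test fixtures.
    active = Active("s3://fake-bucket/test_compression.nc", "data", storage_type="s3")
    active._version = 1    # same setting the tests use to enable active storage
    active._method = "min"
    result = active[0:2, 4:6, 7:9]   # reduction is pushed down to Reductionist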
markgoddard committed Aug 15, 2023
1 parent 4ed6003 commit 1d4b67e
Showing 4 changed files with 110 additions and 38 deletions.
5 changes: 0 additions & 5 deletions activestorage/active.py
@@ -110,11 +110,6 @@ def __init__(self, uri, ncvar, storage_type=None, missing_value=None, _FillValue
# FIXME: We do not get the correct byte order on the Zarr Array's dtype
# when using S3, so capture it here.
self._dtype = ds_var.dtype
try:
self._filters = ds_var.filters()
# ds from h5netcdf may not have _filters and other such metadata
except AttributeError:
self._filters = None
if isinstance(ds, Dataset):
self._missing = getattr(ds_var, 'missing_value', None)
self._fillvalue = getattr(ds_var, '_FillValue', None)
32 changes: 24 additions & 8 deletions activestorage/reductionist.py
@@ -4,6 +4,7 @@
import http.client
import json
import requests
import numcodecs
import numpy as np
import sys

@@ -21,8 +22,8 @@ def reduce_chunk(server, username, password, source, bucket, object, offset,
:param object: S3 object
:param offset: offset of data in object
:param size: size of data in object
:param compression: name of compression, unsupported
:param filters: name of filters, unsupported
:param compression: optional `numcodecs.abc.Codec` compression codec
:param filters: optional list of `numcodecs.abc.Codec` filter codecs
:param missing: optional 4-tuple describing missing data
:param dtype: numpy data type
:param shape: will be a tuple, something like (3,3,1), this is the
@@ -38,11 +39,6 @@
:raises ReductionistError: if the request to Reductionist fails
"""

if compression is not None:
raise NotImplementedError("Compression is not yet supported!")
if filters is not None:
raise NotImplementedError("Filters are not yet supported!")

request_data = build_request_data(source, bucket, object, offset, size, compression, filters, missing, dtype, shape, order, chunk_selection)
api_operation = "sum" if operation == "mean" else operation or "select"
url = f'{server}/v1/{api_operation}/'
@@ -77,6 +73,19 @@ def encode_slice(s):
return [encode_slice(s) for s in selection]


def encode_filter(filter):
"""Encode a filter algorithm in a JSON-compatible format."""
if filter.codec_id == "shuffle":
return {"id": filter.codec_id, "element_size": filter.elementsize}
else:
raise ValueError(f"Unsupported filter {filter})")


def encode_filters(filters):
"""Encode the list of filter algorithms in a JSON-compatible format."""
return [encode_filter(filter) for filter in filters or []]


def encode_dvalue(value):
"""Encode a data value in a JSON-compatible format."""
if isinstance(value, np.float32):
@@ -108,7 +117,6 @@ def build_request_data(source: str, bucket: str, object: str, offset: int,
size: int, compression, filters, missing, dtype, shape,
order, selection) -> dict:
"""Build request data for Reductionist API."""
# TODO: compression, filters
request_data = {
'source': source,
'bucket': bucket,
Expand All @@ -123,8 +131,16 @@ def build_request_data(source: str, bucket: str, object: str, offset: int,
request_data["shape"] = shape
if selection:
request_data["selection"] = encode_selection(selection)
if compression:
# Currently the numcodecs codec IDs map to the supported compression
# algorithm names in the active storage server. If that changes we may
# need to maintain a mapping here.
request_data["compression"] = {"id": compression.codec_id}
if filters:
request_data["filters"] = encode_filters(filters)
if any(missing):
request_data["missing"] = encode_missing(missing)

return {k: v for k, v in request_data.items() if v is not None}


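For context on the new request fields, a minimal sketch of the encoding
(illustrative codec parameters; only the zlib compression and shuffle filter
codecs are exercised by this commit's tests):

    import numcodecs
    from activestorage import reductionist

    compression = numcodecs.Zlib(level=1)
    filters = [numcodecs.Shuffle(elementsize=4)]

    # Per build_request_data above, the Reductionist request payload gains:
    #   "compression": {"id": "zlib"}
    #   "filters": [{"id": "shuffle", "element_size": 4}]
    print({"id": compression.codec_id})
    print(reductionist.encode_filters(filters))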
43 changes: 23 additions & 20 deletions tests/test_compression.py
@@ -5,7 +5,7 @@
from netCDF4 import Dataset
from pathlib import Path

from activestorage.active import Active
from activestorage.active import Active, load_from_s3
from activestorage.config import *
from activestorage.dummy_data import make_compressed_ncdata

@@ -14,11 +14,16 @@

def check_dataset_filters(temp_file: str, ncvar: str, compression: str, shuffle: bool):
# Sanity check that test data is compressed and filtered as expected.
with Dataset(temp_file) as test_data:
test_data_filters = test_data.variables[ncvar].filters()
print(test_data_filters)
assert test_data_filters[compression]
assert test_data_filters['shuffle'] == shuffle
if USE_S3:
with load_from_s3(temp_file) as test_data:
# NOTE: h5netcdf thinks zlib is gzip
assert test_data.variables[ncvar].compression == "gzip"
assert test_data.variables[ncvar].shuffle == shuffle
else:
with Dataset(temp_file) as test_data:
test_data_filters = test_data.variables[ncvar].filters()
assert test_data_filters[compression]
assert test_data_filters['shuffle'] == shuffle


def create_compressed_dataset(tmp_path: str, compression: str, shuffle: bool):
@@ -27,16 +32,14 @@ def create_compressed_dataset(tmp_path: str, compression: str, shuffle: bool):
"""
temp_file = str(tmp_path / "test_compression.nc")
test_data = make_compressed_ncdata(filename=temp_file, compression=compression, shuffle=shuffle)

check_dataset_filters(temp_file, "data", compression, shuffle)

test_file = utils.write_to_storage(temp_file)
test_data = utils.write_to_storage(temp_file)
if USE_S3:
os.remove(temp_file)
return test_file

check_dataset_filters(test_data, "data", compression, shuffle)
return test_data


@pytest.mark.skipif(USE_S3, reason="Compression and filtering not supported in S3 yet")
@pytest.mark.parametrize('compression', ['zlib'])
@pytest.mark.parametrize('shuffle', [False, True])
def test_compression_and_filters(tmp_path: str, compression: str, shuffle: bool):
@@ -52,20 +55,20 @@ def test_compression_and_filters(tmp_path: str, compression: str, shuffle: bool)
assert result == 740.0


@pytest.mark.skipif(USE_S3, reason="Compression and filtering not supported in S3 yet")
def test_compression_and_filters_cmip6_data():
"""
Test use of datasets with compression and filters applied for a real
CMIP6 dataset (CMIP6_IPSL-CM6A-LR_tas).
"""
test_file = str(Path(__file__).resolve().parent / 'test_data' / 'CMIP6_IPSL-CM6A-LR_tas.nc')

check_dataset_filters(test_file, "tas", "zlib", False)

with Dataset(test_file) as nc_data:
nc_min = np.min(nc_data["tas"][0:2,4:6,7:9])
print(f"Numpy min from compressed file {nc_min}")

test_file = utils.write_to_storage(test_file)

check_dataset_filters(test_file, "tas", "zlib", False)

active = Active(test_file, 'tas', utils.get_storage_type())
active._version = 1
active._method = "min"
@@ -74,21 +77,21 @@ def test_compression_and_filters_cmip6_data():
assert result == 239.25946044921875


@pytest.mark.skipif(USE_S3, reason="Compression and filtering not supported in S3 yet")
def test_compression_and_filters_obs4mips_data():
"""
Test use of datasets with compression and filters applied for a real
obs4mips dataset (obs4MIPS_CERES-EBAF_L3B_Ed2-8_rlut.nc) at CMIP5 MIP standard
but with CMIP6-standard file packaging.
"""
test_file = str(Path(__file__).resolve().parent / 'test_data' / 'obs4MIPS_CERES-EBAF_L3B_Ed2-8_rlut.nc')

check_dataset_filters(test_file, "rlut", "zlib", False)

with Dataset(test_file) as nc_data:
nc_min = np.min(nc_data["rlut"][0:2,4:6,7:9])
print(f"Numpy min from compressed file {nc_min}")

test_file = utils.write_to_storage(test_file)

check_dataset_filters(test_file, "rlut", "zlib", False)

active = Active(test_file, 'rlut', utils.get_storage_type())
active._version = 1
active._method = "min"
68 changes: 63 additions & 5 deletions tests/unit/test_reductionist.py
@@ -1,4 +1,5 @@
import os
import numcodecs
import numpy as np
import pytest
import requests
@@ -45,8 +46,6 @@ def test_reduce_chunk_defaults(mock_request):
chunk_selection = None
operation = "min"

# no support for compression, filters

tmp, count = reductionist.reduce_chunk(active_url, access_key, secret_key,
s3_url, bucket, object, offset,
size, compression, filters, missing,
@@ -68,6 +67,67 @@ def test_reduce_chunk_defaults(mock_request):
expected_data)


@pytest.mark.parametrize(
"compression, filters",
[
(
numcodecs.Zlib(),
[numcodecs.Shuffle()],
),
]
)
@mock.patch.object(reductionist, 'request')
def test_reduce_chunk_compression(mock_request, compression, filters):
"""Unit test for reduce_chunk with compression and filter arguments."""
result = np.int32(134351386)
response = make_response(result.tobytes(), 200, "int32", "[]", "2")
mock_request.return_value = response

active_url = "https://s3.example.com"
access_key = "fake-access"
secret_key = "fake-secret"
s3_url = "https://active.example.com"
bucket = "fake-bucket"
object = "fake-object"
offset = 2
size = 128
missing = (None, None, None, None)
dtype = np.dtype("int32")
shape = (32, )
order = "C"
chunk_selection = [slice(0, 2, 1)]
operation = "min"

tmp, count = reductionist.reduce_chunk(active_url, access_key, secret_key,
s3_url, bucket, object, offset,
size, compression, filters, missing,
dtype, shape, order,
chunk_selection, operation)

assert tmp == result
assert count == 2

expected_url = f"{active_url}/v1/{operation}/"
expected_data = {
"source": s3_url,
"bucket": bucket,
"object": object,
"dtype": "int32",
"byte_order": sys.byteorder,
"offset": offset,
"size": size,
"order": order,
"shape": shape,
"selection": [[chunk_selection[0].start,
chunk_selection[0].stop,
chunk_selection[0].step]],
"compression": {"id": compression.codec_id},
"filters": [{"id": filter.codec_id, "element_size": filter.elementsize}
for filter in filters],
}
mock_request.assert_called_once_with(expected_url, access_key, secret_key,
expected_data)


@pytest.mark.parametrize(
"missing",
@@ -100,7 +160,7 @@ def test_reduce_chunk_defaults(mock_request):
)
@mock.patch.object(reductionist, 'request')
def test_reduce_chunk_missing(mock_request, missing):
"""Unit test for reduce_chunk."""
"""Unit test for reduce_chunk with missing data."""
reduce_arg, api_arg = missing

result = np.float32(-42.)
Expand All @@ -124,8 +184,6 @@ def test_reduce_chunk_missing(mock_request, missing):
chunk_selection = [slice(0, 2, 1)]
operation = "min"

# no compression, filters

tmp, count = reductionist.reduce_chunk(active_url, access_key, secret_key, s3_url,
bucket, object, offset, size,
compression, filters, missing,
