Skip to content

Commit

Permalink
Merge pull request #77 from markgoddard/main
Browse files Browse the repository at this point in the history
Add initial support for S3 active storage
  • Loading branch information
valeriupredoi authored May 9, 2023
2 parents 79f48af + 7ab0332 commit 3332d12
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 7 deletions.
28 changes: 21 additions & 7 deletions activestorage/active.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import os
import numpy as np
import pathlib

#FIXME: Consider using h5py throughout, for more generality
from netCDF4 import Dataset
from zarr.indexing import (
OrthogonalIndexer,
)
from activestorage.config import *
from activestorage.s3 import reduce_chunk as s3_reduce_chunk
from activestorage.storage import reduce_chunk
from activestorage import netcdf_to_zarr as nz

Expand Down Expand Up @@ -333,13 +336,24 @@ def _process_chunk(self, fsref, chunk_coords, chunk_selection, out, counts,
key = f"{self.ncvar}/{coord}"
rfile, offset, size = tuple(fsref[key])

# note there is an ongoing discussion about this interface, and what it returns
# see https://github.com/valeriupredoi/PyActiveStorage/issues/33
# so neither the returned data or the interface should be considered stable
# although we will version changes.
tmp, count = reduce_chunk(rfile, offset, size, compressor, filters, missing,
self.zds._dtype, self.zds._chunks, self.zds._order,
chunk_selection, method=self.method)
if USE_S3:
object = os.path.basename(rfile)
tmp, count = s3_reduce_chunk(S3_ACTIVE_STORAGE_URL, S3_ACCESS_KEY,
S3_SECRET_KEY, S3_URL, S3_BUCKET,
object, offset, size,
compressor, filters, missing,
self.zds._dtype, self.zds._chunks,
self.zds._order, chunk_selection,
operation=self._method)
else:
# note there is an ongoing discussion about this interface, and what it returns
# see https://github.com/valeriupredoi/PyActiveStorage/issues/33
# so neither the returned data nor the interface should be considered stable
# although we will version changes.
tmp, count = reduce_chunk(rfile, offset, size, compressor, filters,
missing, self.zds._dtype,
self.zds._chunks, self.zds._order,
chunk_selection, method=self.method)

if self.method is not None:
out.append(tmp)
Expand Down
19 changes: 19 additions & 0 deletions activestorage/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# This file contains configuration for PyActiveStorage.
#
# These are plain module-level constants; activestorage/active.py and the
# tests pull them in with ``from activestorage.config import *``.
# NOTE(review): the endpoints below point at localhost, so these values look
# like defaults for a local development setup — confirm before relying on
# them elsewhere.

# Whether to use the S3 Active Storage interface.
# When False, chunk reductions go through the local storage ``reduce_chunk``.
USE_S3 = False

# URL of S3 Active Storage server.
S3_ACTIVE_STORAGE_URL = "http://localhost:8080"

# URL of S3 object store.
S3_URL = "http://localhost:9000"

# S3 access key / username.
# NOTE(review): credentials are hard-coded here; presumably local test
# defaults — do not commit real secrets to this file.
S3_ACCESS_KEY = "minioadmin"

# S3 secret key / password.
S3_SECRET_KEY = "minioadmin"

# S3 bucket.
S3_BUCKET = "pyactivestorage"
127 changes: 127 additions & 0 deletions activestorage/s3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
"""S3 active storage module."""

import http.client
import json
import requests
import numpy as np
import sys


def reduce_chunk(server, username, password, source, bucket, object, offset,
                 size, compression, filters, missing, dtype, shape, order,
                 chunk_selection, operation):
    """Run a reduction over one chunk via the S3 Active Storage server.

    :param server: S3 active storage server URL
    :param username: S3 username / access key
    :param password: S3 password / secret key
    :param source: S3 URL
    :param bucket: S3 bucket
    :param object: S3 object
    :param offset: offset of data in object
    :param size: size of data in object
    :param compression: name of compression, unsupported (must be None)
    :param filters: name of filters, unsupported (must be None)
    :param missing: 4-tuple describing missing data, unsupported
    :param dtype: data type; its ``name`` attribute is sent to the server
    :param shape: will be a tuple, something like (3,3,1), this is the
                  dimensionality of the chunk itself
    :param order: typically 'C' for c-type ordering
    :param chunk_selection: N-tuple where N is the length of `shape`, and each
                            item is an integer or slice. e.g. (slice(0, 2,
                            1), slice(1, 3, 1), slice(0, 1, 1))
                            this defines the part of the chunk which is to be
                            obtained or operated upon.
    :param operation: name of operation to perform; a falsy value requests
                      a plain "select", and "mean" is requested as "sum"
    :returns: a ``(result, count)`` tuple. ``result`` is the decoded response
              as a numpy array or scalar. ``count`` is the result of a
              separate "count" request when ``operation`` is "mean", and
              ``None`` for every other operation.
    :raises NotImplementedError: if compression or filters are given
    :raises S3ActiveStorageError: if the request to S3 active storage fails
    """
    # Reject unsupported codecs before any network traffic happens.
    unsupported_checks = (
        (compression, "Compression is not yet supported!"),
        (filters, "Filters are not yet supported!"),
    )
    for value, message in unsupported_checks:
        if value is not None:
            raise NotImplementedError(message)

    payload = build_request_data(source, bucket, object, offset, size,
                                 compression, filters, missing, dtype, shape,
                                 order, chunk_selection)

    # A mean is requested as a sum; the matching count is fetched below.
    if operation == "mean":
        api_operation = "sum"
    else:
        api_operation = operation or "select"

    response = request(f'{server}/v1/{api_operation}/', username, password,
                       payload)

    if not response.ok:
        # decode_and_raise_error raises; the return only mirrors the
        # fall-through of the failure branch.
        decode_and_raise_error(response)
        return None

    # FIXME: Return count from mean
    result = decode_result(response)
    count = None
    if operation == "mean":
        # Second round trip: same chunk, "count" operation.
        count = reduce_chunk(server, username, password, source, bucket,
                             object, offset, size, compression, filters,
                             missing, dtype, shape, order, chunk_selection,
                             "count")[0]
    return result, count


def encode_selection(selection):
    """Convert a chunk selection into nested lists that JSON can carry.

    Each ``slice`` becomes ``[start, stop, step]``. Any other item is treated
    as a single index ``i`` and becomes ``[i, i + 1, 1]``.
    """
    encoded = []
    for item in selection:
        if isinstance(item, slice):
            triple = [item.start, item.stop, item.step]
        else:
            # Integer - select single value
            triple = [item, item + 1, 1]
        encoded.append(triple)
    return encoded


def build_request_data(source: str, bucket: str, object: str, offset: int,
                       size: int, compression, filters, missing, dtype, shape,
                       order, selection) -> dict:
    """Assemble the JSON body for an S3 active storage API request.

    ``dtype`` is sent by its ``name``. ``shape`` and ``selection`` are only
    included when truthy, and any field whose value is ``None`` is dropped.
    ``compression``, ``filters`` and ``missing`` are accepted but not yet
    sent.
    """
    # TODO: compression, filters, missing
    fields = [
        ('source', source),
        ('bucket', bucket),
        ('object', object),
        ('dtype', dtype.name),
        ('offset', offset),
        ('size', size),
        ('order', order),
    ]
    if shape:
        fields.append(("shape", shape))
    if selection:
        fields.append(("selection", encode_selection(selection)))
    # Omit unset fields from the request body.
    return {key: value for key, value in fields if value is not None}


def request(url: str, username: str, password: str, request_data: dict,
            timeout=None):
    """POST a request to an S3 active storage API and return the response.

    :param url: full URL of the API operation endpoint
    :param username: S3 username / access key, sent as part of ``auth``
    :param password: S3 password / secret key, sent as part of ``auth``
    :param request_data: request body, serialised as JSON
    :param timeout: optional timeout forwarded to ``requests.post``. The
                    default of ``None`` waits indefinitely, as before; pass
                    a number of seconds (or a ``(connect, read)`` tuple) to
                    avoid hanging on an unresponsive server.
    :returns: the ``requests`` response object; the HTTP status is not
              checked here, callers inspect ``response.ok``
    """
    return requests.post(
        url,
        json=request_data,
        auth=(username, password),
        timeout=timeout,
    )


def decode_result(response):
    """Turn a successful response body into a numpy array or scalar.

    The element type and shape are read from the ``x-activestorage-dtype``
    and ``x-activestorage-shape`` response headers; the shape header is
    parsed as JSON.
    """
    headers = response.headers
    element_type = headers['x-activestorage-dtype']
    target_shape = json.loads(headers['x-activestorage-shape'])
    flat = np.frombuffer(response.content, dtype=element_type)
    return flat.reshape(target_shape)


class S3ActiveStorageError(Exception):
    """Raised when a request to S3 Active Storage fails."""

    def __init__(self, status_code, error):
        """Build the message from the HTTP status code and error text."""
        message = f"S3 Active Storage error: HTTP {status_code}: {error}"
        super().__init__(message)


def decode_and_raise_error(response):
    """Decode an error response and raise S3ActiveStorageError.

    The response body is parsed as JSON and re-serialised into the exception
    message. If the body is not valid JSON the message is "-" and the decode
    error is chained as the cause.

    :param response: the failed response (needs ``json()`` and
                     ``status_code``)
    :raises S3ActiveStorageError: always
    """
    try:
        payload = response.json()
    except ValueError as exc:
        # Catch ValueError rather than requests.exceptions.JSONDecodeError:
        # every JSON decode error raised by ``Response.json()`` derives from
        # ValueError, whereas the requests-specific class only exists in
        # newer requests releases.
        raise S3ActiveStorageError(response.status_code, "-") from exc
    # Raised outside the ``try`` so the handler above is never consulted for
    # our own exception.
    raise S3ActiveStorageError(response.status_code, json.dumps(payload))
1 change: 1 addition & 0 deletions environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ dependencies:
- netcdf4
- numpy
- pip !=21.3
- s3fs
# pin Zarr to avoid using old KVStore interface
# see github.com/zarr-developers/zarr-python/issues/1362
- zarr >=2.13.6 # KVStore to FSStore
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
'kerchunk',
'netcdf4',
'numpy',
's3fs',
# pin Zarr to use new FSStore instead of KVStore
'zarr>=2.13.3', # github.com/zarr-developers/zarr-python/issues/1362
# for testing
Expand Down
17 changes: 17 additions & 0 deletions tests/test_bigger_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
import numpy as np
from netCDF4 import Dataset
from pathlib import Path
import s3fs

from activestorage.active import Active
from activestorage.config import *


@pytest.fixture
Expand Down Expand Up @@ -85,13 +87,28 @@ def create_hyb_pres_file_with_a(dataset, short_name):
'p0: p0 a: a_bnds b: b_bnds ps: ps')


def upload_to_s3(server, username, password, bucket, object, rfile):
    """Upload a local file to an S3 object store.

    :param server: S3 endpoint URL
    :param username: S3 access key
    :param password: S3 secret key
    :param bucket: bucket to upload into; created if it does not exist
    :param object: object name within the bucket
    :param rfile: path of the local file to upload
    """
    # S3 keys always use "/" as separator, so build the remote path with
    # posixpath rather than the host OS's os.path.
    import posixpath

    s3_fs = s3fs.S3FileSystem(key=username, secret=password,
                              client_kwargs={'endpoint_url': server})
    # Make sure s3 bucket exists
    try:
        s3_fs.mkdir(bucket)
    except FileExistsError:
        pass

    s3_fs.put_file(rfile, posixpath.join(bucket, object))


def save_cl_file_with_a(tmp_path):
    """Create netcdf file for ``cl`` with ``a`` coordinate.

    The file is written to ``tmp_path / 'common_cl_a.nc'``. When ``USE_S3``
    is set it is also uploaded to the configured S3 bucket under its base
    name.

    :param tmp_path: directory (path-like supporting ``/``) to write into
    :returns: the path of the saved file, as a string
    """
    save_path = tmp_path / 'common_cl_a.nc'
    nc_path = os.fspath(save_path)
    # The context manager closes the dataset even if populating it fails.
    with Dataset(nc_path, mode='w') as dataset:
        create_hyb_pres_file_with_a(dataset, 'cl')
    if USE_S3:
        object = os.path.basename(nc_path)
        upload_to_s3(S3_URL, S3_ACCESS_KEY, S3_SECRET_KEY, S3_BUCKET, object,
                     nc_path)
    print(f"Saved {save_path}")
    return str(save_path)

Expand Down

0 comments on commit 3332d12

Please sign in to comment.