Skip to content

Commit

Permalink
Merge pull request #232 from NCAS-CMS/add_mock_s3_test
Browse files Browse the repository at this point in the history
Add mock s3 test
  • Loading branch information
valeriupredoi authored Jan 14, 2025
2 parents 582137b + ea54fda commit 0e43748
Show file tree
Hide file tree
Showing 4 changed files with 259 additions and 0 deletions.
1 change: 1 addition & 0 deletions environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ dependencies:
# see github.com/zarr-developers/zarr-python/issues/1362
- zarr >=2.13.6 # KVStore to FSStore
# Python packages for testing
- moto # mock S3 tests
- pytest
- pytest-cov >=2.10.1
- pytest-html !=2.1.0
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
# pin Zarr to use new FSStore instead of KVStore
'zarr>=2.13.3', # github.com/zarr-developers/zarr-python/issues/1362
# for testing
'moto', # mock S3 tests
'pytest',
'pytest-cov>=2.10.1',
'pytest-html!=2.1.0',
Expand Down
108 changes: 108 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import os
import s3fs
import pathlib
import json
import moto
import pytest

from moto.moto_server.threaded_moto_server import ThreadedMotoServer


# some spoofy server parameters
# test parameters; don't modify these
# port/endpoint of the local ThreadedMotoServer started by the s3_base
# fixture below; tests connect here instead of real AWS
port = 5555
endpoint_uri = "http://127.0.0.1:%s/" % port
# bucket names created by the s3fs_s3 fixture
test_bucket_name = "test"
versioned_bucket_name = "test-versioned"
secure_bucket_name = "test-secure"

def get_boto3_client():
    """Return a synchronous botocore S3 client pointed at the moto endpoint."""
    # NB: we use the sync botocore client for setup; imported lazily so the
    # module can be imported without botocore present
    from botocore.session import Session

    return Session().create_client("s3", endpoint_url=endpoint_uri)


@pytest.fixture(scope="module")
def s3_base():
    """Start a writable local (moto) S3 server for the whole test module.

    Module scope means all tests re-use one MotoServer instance.
    Lifted from https://github.com/fsspec/s3fs/blob/main/s3fs/tests/test_s3fs.py
    """
    server = ThreadedMotoServer(ip_address="127.0.0.1", port=port)
    server.start()
    # The user ID and secret key are needed when accessing a public bucket;
    # since our S3 FS and bucket are not actually on an AWS system they can
    # hold bogus values — only set them when the environment has none.
    os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "foo")
    os.environ.setdefault("AWS_ACCESS_KEY_ID", "foo")
    os.environ.pop("AWS_PROFILE", None)

    print("server up")
    yield
    print("moto done")
    server.stop()


@pytest.fixture()
def s3fs_s3(s3_base):
    """
    Create a fully functional "virtual" S3 FileSystem compatible with fsspec/s3fs.

    Method inspired by https://github.com/fsspec/s3fs/blob/main/s3fs/tests/test_s3fs.py
    The S3 FS, being AWS-like but not actually physically deployed anywhere,
    still needs all the usual user IDs, secret keys, endpoint URLs etc; the
    setup uses the ACL=public configuration (public-read, or public-read-write).
    Public DOES NOT mean anon=True, but rather the "All Users" group, see
    https://docs.aws.amazon.com/AmazonS3/latest/userguide/acl-overview.html
    Access permission to this group allows anyone with AWS credentials to
    access the resource; requests may be signed (authenticated) or not.
    In the secure bucket, uploads must be encrypted with AWS-KMS, see
    https://docs.aws.amazon.com/kms/latest/developerguide/overview.html
    """
    client = get_boto3_client()

    # see note above about ACL=public-read
    client.create_bucket(Bucket=test_bucket_name, ACL="public-read")

    # versioned bucket: same ACL, plus object versioning switched on
    client.create_bucket(Bucket=versioned_bucket_name, ACL="public-read")
    client.put_bucket_versioning(
        Bucket=versioned_bucket_name, VersioningConfiguration={"Status": "Enabled"}
    )

    # initialize secure bucket: a bucket policy denies any PutObject that is
    # not encrypted server-side with aws:kms
    client.create_bucket(Bucket=secure_bucket_name, ACL="public-read")
    policy = json.dumps(
        {
            "Version": "2012-10-17",
            "Id": "PutObjPolicy",
            "Statement": [
                {
                    "Sid": "DenyUnEncryptedObjectUploads",
                    "Effect": "Deny",
                    "Principal": "*",
                    "Action": "s3:PutObject",
                    "Resource": "arn:aws:s3:::{bucket_name}/*".format(
                        bucket_name=secure_bucket_name
                    ),
                    "Condition": {
                        "StringNotEquals": {
                            "s3:x-amz-server-side-encryption": "aws:kms"
                        }
                    },
                }
            ],
        }
    )

    client.put_bucket_policy(Bucket=secure_bucket_name, Policy=policy)
    # drop any cached S3FileSystem instances so we get one bound to the
    # moto endpoint, then invalidate its listing cache before handing it out
    s3fs.S3FileSystem.clear_instance_cache()
    s3 = s3fs.S3FileSystem(anon=False, client_kwargs={"endpoint_url": endpoint_uri})
    s3.invalidate_cache()

    yield s3
149 changes: 149 additions & 0 deletions tests/unit/test_mock_s3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
import os
import s3fs
import pathlib
import pytest
import h5netcdf

from tempfile import NamedTemporaryFile
from activestorage.active import load_from_s3


# needed by the spoofed s3 filesystem
# must match the port used by the ThreadedMotoServer in tests/conftest.py
port = 5555
endpoint_uri = "http://127.0.0.1:%s/" % port


def test_s3fs_s3(s3fs_s3):
    """Test mock S3 filesystem constructor."""
    # the fixture yields an entire mock S3 filesystem
    fs = s3fs_s3

    # explore its attributes and methods
    print(dir(fs))

    # signed (non-anonymous), non-versioned, and bound to the moto endpoint
    assert not fs.anon
    assert not fs.version_aware
    assert fs.client_kwargs == {'endpoint_url': 'http://127.0.0.1:5555/'}


def spoof_boto3_s3(bucket, file_name, file_path):
    """Upload, download and re-open a local file through a pure boto3 mock S3.

    This is a pure boto3 implementation, kept just in case we ever need it.
    NOTE: boto3 is deliberately NOT a package dependency yet, so it is
    imported locally and only when this helper is actually called (currently
    only by a skipped test).

    Parameters
    ----------
    bucket: name of the (mock) S3 bucket to use.
    file_name: key under which the file is stored in the bucket.
    file_path: local path of the file to upload.

    Returns
    -------
    The 'ResponseMetadata' dict of the put request, or None when *file_path*
    does not exist (the original code raised NameError in that case).
    """
    # local import: boto3 is not a declared dependency of this package
    import boto3

    res = None

    # "put" file
    if os.path.exists(file_path):
        with open(file_path, "rb") as file_contents:
            conn = boto3.session.Session()
            s3 = conn.resource('s3')
            # renamed from `object`, which shadowed the builtin
            s3_object = s3.Object(bucket, file_name)
            result = s3_object.put(Body=file_contents)
            res = result.get('ResponseMetadata')
            if res.get('HTTPStatusCode') == 200:
                print('File Uploaded Successfully')
            else:
                print('File Not Uploaded Successfully')

        # "download" file (only meaningful after a successful upload; the
        # original ran this unconditionally and crashed on a missing file)
        s3 = boto3.resource('s3')
        # arg0: file in bucket; arg1: file to download to
        target_file = "test.nc"
        s3.Bucket(bucket).download_file(file_name, target_file)
        print(os.path.isfile(target_file))

        # "access" file "remotely" with s3fs
        with open('testobj.nc', 'wb') as ncdata:
            s3_object.download_fileobj(ncdata)
        with open('testobj.nc', 'rb') as ncdata:
            ncfile = h5netcdf.File(ncdata, 'r', invalid_netcdf=True)
            print(ncfile)

    return res


@pytest.fixture(scope='session')
def aws_credentials():
    """
    Mocked AWS Credentials for moto.

    NOTE: Used ONLY by the pure boto3 test method spoof_boto3_s3.

    Sets bogus credential environment variables (so no real AWS account can
    ever be reached), writes a throwaway shared-credentials file, and deletes
    that file again at session teardown.
    """
    os.environ['AWS_ACCESS_KEY_ID'] = 'testing'
    os.environ['AWS_SECRET_ACCESS_KEY'] = 'testing'
    os.environ['AWS_SECURITY_TOKEN'] = 'testing'
    os.environ['AWS_SESSION_TOKEN'] = 'testing'
    os.environ['AWS_DEFAULT_REGION'] = 'us-east-1'

    # Create the temp file BEFORE entering the try block: in the original the
    # finally clause referenced `tmp` even when NamedTemporaryFile itself had
    # failed, raising NameError instead of the real error.
    tmp = NamedTemporaryFile(delete=False)
    try:
        tmp.write(b"""[wild weasel]
aws_access_key_id = testing
aws_secret_access_key = testing""")
        tmp.close()
        os.environ['AWS_SHARED_CREDENTIALS_FILE'] = str(tmp.name)
        yield
    finally:
        os.unlink(tmp.name)


@pytest.fixture(scope='function')
def empty_bucket(aws_credentials):
    """Create an empty bucket.

    NOTE: Used ONLY by the pure boto3 test method spoof_boto3_s3.
    """
    # Local imports fix NameErrors in the original: neither `moto` nor
    # `boto3` is imported at module level here (boto3 is deliberately not a
    # package dependency), so import them lazily, only when the fixture runs.
    import boto3
    import moto

    moto_fake = moto.mock_aws()
    try:
        moto_fake.start()
        conn = boto3.resource('s3')
        conn.create_bucket(Bucket="MY_BUCKET")
        yield conn
    finally:
        moto_fake.stop()


@pytest.mark.skip(reason="This test uses the pure boto3 implement which we don't need at the moment.")
def test_s3file_with_pure_boto3(empty_bucket):
    """Exercise the pure boto3+moto spoofing path (kept for reference only)."""
    ncfile = "./tests/test_data/daily_data.nc"
    file_path = pathlib.Path(ncfile)
    file_name = pathlib.Path(ncfile).name
    # partial spoofing with only boto3+moto
    # FIX: the helper is named spoof_boto3_s3; the original called an
    # undefined `spoof_s3`
    result = spoof_boto3_s3("MY_BUCKET", file_name, file_path)
    # FIX: the original used an undefined name `s3`; build the anonymous
    # s3fs filesystem it evidently intended to read the bucket with
    s3 = s3fs.S3FileSystem(anon=True)
    with s3.open(os.path.join("MY_BUCKET", file_name), "rb") as f:
        ncfile = h5netcdf.File(f, 'r', invalid_netcdf=True)
    assert result.get('HTTPStatusCode') == 200


def test_s3file_with_s3fs(s3fs_s3):
    """
    This test spoofs a complete s3fs FileSystem via s3fs_s3,
    creates a mock bucket inside it, then puts a REAL netCDF4 file in it,
    then it loads it as if it was an S3 file. This is proper
    Wild Weasel stuff right here.
    """
    # set up physical file and Path properties
    file_path = pathlib.Path("./tests/test_data/daily_data.nc")
    file_name = file_path.name

    # use mocked s3fs: make the bucket and drop the real file into it
    bucket = "MY_BUCKET"
    s3fs_s3.mkdir(bucket)
    s3fs_s3.put(file_path, bucket)
    s3 = s3fs.S3FileSystem(
        anon=False, version_aware=True, client_kwargs={"endpoint_url": endpoint_uri}
    )
    remote_key = os.path.join("MY_BUCKET", file_name)

    # test load by h5netcdf
    with s3.open(remote_key, "rb") as f:
        print("File path", f.path)
        ncfile = h5netcdf.File(f, 'r', invalid_netcdf=True)
        print("File loaded from spoof S3 with h5netcdf:", ncfile)
        print(ncfile["ta"])
        assert "ta" in ncfile

    # test Active
    storage_options = dict(anon=False, version_aware=True,
                           client_kwargs={"endpoint_url": endpoint_uri})
    with load_from_s3(remote_key, storage_options) as ac_file:
        print(ac_file)
        assert "ta" in ac_file

0 comments on commit 0e43748

Please sign in to comment.