Skip to content

Commit

Permalink
simplify via conftest and add wee test for s3
Browse files Browse the repository at this point in the history
  • Loading branch information
valeriupredoi committed Jan 14, 2025
1 parent bedf57f commit 353a836
Showing 1 changed file with 10 additions and 78 deletions.
88 changes: 10 additions & 78 deletions tests/unit/test_mock_s3.py
Original file line number Diff line number Diff line change
@@ -1,97 +1,29 @@
import os
import s3fs
import pathlib
import json
import moto
import pytest
import h5netcdf

from tempfile import NamedTemporaryFile
from moto.moto_server.threaded_moto_server import ThreadedMotoServer
from activestorage.active import load_from_s3


# some spoofy server parameters
# needed by the spoofed s3 filesystem
port = 5555
endpoint_uri = "http://127.0.0.1:%s/" % port
test_bucket_name = "test"
versioned_bucket_name = "test-versioned"
secure_bucket_name = "test-secure"

def get_boto3_client():
from botocore.session import Session

# NB: we use the sync botocore client for setup
session = Session()
return session.create_client("s3", endpoint_url=endpoint_uri)

@pytest.fixture(scope="module")
def s3_base():
# writable local S3 system

# This fixture is module-scoped, meaning that we can re-use the MotoServer across all tests
#####
# lifted from https://github.com/fsspec/s3fs/blob/main/s3fs/tests/test_s3fs.py
#####
server = ThreadedMotoServer(ip_address="127.0.0.1", port=port)
server.start()
if "AWS_SECRET_ACCESS_KEY" not in os.environ:
os.environ["AWS_SECRET_ACCESS_KEY"] = "foo"
if "AWS_ACCESS_KEY_ID" not in os.environ:
os.environ["AWS_ACCESS_KEY_ID"] = "foo"
os.environ.pop("AWS_PROFILE", None)

print("server up")
yield
print("moto done")
server.stop()


@pytest.fixture()
def s3fs_s3(s3_base):
"""
Create a fully functional "virtual" S3 FileSystem compatible with fsspec/s3fs.
Method inspired by https://github.com/fsspec/s3fs/blob/main/s3fs/tests/test_s3fs.py
"""
client = get_boto3_client()
client.create_bucket(Bucket=test_bucket_name, ACL="public-read")

client.create_bucket(Bucket=versioned_bucket_name, ACL="public-read")
client.put_bucket_versioning(
Bucket=versioned_bucket_name, VersioningConfiguration={"Status": "Enabled"}
)

# initialize secure bucket
client.create_bucket(Bucket=secure_bucket_name, ACL="public-read")
policy = json.dumps(
{
"Version": "2012-10-17",
"Id": "PutObjPolicy",
"Statement": [
{
"Sid": "DenyUnEncryptedObjectUploads",
"Effect": "Deny",
"Principal": "*",
"Action": "s3:PutObject",
"Resource": "arn:aws:s3:::{bucket_name}/*".format(
bucket_name=secure_bucket_name
),
"Condition": {
"StringNotEquals": {
"s3:x-amz-server-side-encryption": "aws:kms"
}
},
}
],
}
)
def test_s3fs_s3(s3fs_s3):
"""Test mock S3 filesystem constructor."""
# this is an entire mock S3 FS
mock_s3_filesystem = s3fs_s3

client.put_bucket_policy(Bucket=secure_bucket_name, Policy=policy)
s3fs.S3FileSystem.clear_instance_cache()
s3 = s3fs.S3FileSystem(anon=False, client_kwargs={"endpoint_url": endpoint_uri})
s3.invalidate_cache()
# explore its attributes and methods
print(dir(mock_s3_filesystem))

yield s3
assert not mock_s3_filesystem.anon
assert not mock_s3_filesystem.version_aware
assert mock_s3_filesystem.client_kwargs == {'endpoint_url': 'http://127.0.0.1:5555/'}


def spoof_boto3_s3(bucket, file_name, file_path):
Expand Down

0 comments on commit 353a836

Please sign in to comment.