From e1dc731e8960cdb728767b10502058bae7bac041 Mon Sep 17 00:00:00 2001
From: Valeriu Predoi
Date: Thu, 9 Jan 2025 12:54:25 +0000
Subject: [PATCH] removed unneeded test file

---
 tests/unit/test_s3fs.py | 2903 ---------------------------------------
 1 file changed, 2903 deletions(-)
 delete mode 100644 tests/unit/test_s3fs.py

diff --git a/tests/unit/test_s3fs.py b/tests/unit/test_s3fs.py
deleted file mode 100644
index bc08c16..0000000
--- a/tests/unit/test_s3fs.py
+++ /dev/null
@@ -1,2903 +0,0 @@
-# -*- coding: utf-8 -*-
-import asyncio
-import errno
-import datetime
-from contextlib import contextmanager
-import json
-from concurrent.futures import ProcessPoolExecutor
-import io
-import os
-import random
-import requests
-import time
-import sys
-import pytest
-import moto
-from moto.moto_server.threaded_moto_server import ThreadedMotoServer
-from itertools import chain
-import fsspec.core
-from dateutil.tz import tzutc
-
-import s3fs.core
-from s3fs.core import S3FileSystem
-from s3fs.utils import ignoring, SSEParams
-from botocore.exceptions import NoCredentialsError
-from fsspec.asyn import sync
-from fsspec.callbacks import Callback
-from packaging import version
-
-test_bucket_name = "test"
-secure_bucket_name = "test-secure"
-versioned_bucket_name = "test-versioned"
-files = {
-    "test/accounts.1.json": (
-        b'{"amount": 100, "name": "Alice"}\n'
-        b'{"amount": 200, "name": "Bob"}\n'
-        b'{"amount": 300, "name": "Charlie"}\n'
-        b'{"amount": 400, "name": "Dennis"}\n'
-    ),
-    "test/accounts.2.json": (
-        b'{"amount": 500, "name": "Alice"}\n'
-        b'{"amount": 600, "name": "Bob"}\n'
-        b'{"amount": 700, "name": "Charlie"}\n'
-        b'{"amount": 800, "name": "Dennis"}\n'
-    ),
-}
-
-csv_files = {
-    "2014-01-01.csv": (
-        b"name,amount,id\n" b"Alice,100,1\n" b"Bob,200,2\n" b"Charlie,300,3\n"
-    ),
-    "2014-01-02.csv": (b"name,amount,id\n"),
-    "2014-01-03.csv": (
-        b"name,amount,id\n" b"Dennis,400,4\n" b"Edith,500,5\n" b"Frank,600,6\n"
-    ),
-}
-text_files = {
-    "nested/file1": b"hello\n",
-    "nested/file2": b"world",
-    "nested/nested2/file1": b"hello\n",
-    "nested/nested2/file2": b"world",
-}
-glob_files = {"file.dat": b"", "filexdat": b""}
-a = test_bucket_name + "/tmp/test/a"
-b = test_bucket_name + "/tmp/test/b"
-c = test_bucket_name + "/tmp/test/c"
-d = test_bucket_name + "/tmp/test/d"
-port = 5555
-endpoint_uri = "http://127.0.0.1:%s/" % port
-
-
-@pytest.fixture(scope="module")
-def s3_base():
-    # writable local S3 system
-
-    # This fixture is module-scoped, meaning that we can re-use the MotoServer across all tests
-    server = ThreadedMotoServer(ip_address="127.0.0.1", port=port)
-    server.start()
-    if "AWS_SECRET_ACCESS_KEY" not in os.environ:
-        os.environ["AWS_SECRET_ACCESS_KEY"] = "foo"
-    if "AWS_ACCESS_KEY_ID" not in os.environ:
-        os.environ["AWS_ACCESS_KEY_ID"] = "foo"
-    os.environ.pop("AWS_PROFILE", None)
-
-    print("server up")
-    yield
-    print("moto done")
-    server.stop()
-
-
-@pytest.fixture(autouse=True)
-def reset_s3_fixture():
-    # We reuse the MotoServer for all tests
-    # But we do want a clean state for every test
-    try:
-        requests.post(f"{endpoint_uri}/moto-api/reset")
-    except:
-        pass
-
-
-def get_boto3_client():
-    from botocore.session import Session
-
-    # NB: we use the sync botocore client for setup
-    session = Session()
-    return session.create_client("s3", endpoint_url=endpoint_uri)
-
-
-@pytest.fixture()
-def s3(s3_base):
-    client = get_boto3_client()
-    client.create_bucket(Bucket=test_bucket_name, ACL="public-read")
-
-    client.create_bucket(Bucket=versioned_bucket_name,
ACL="public-read") - client.put_bucket_versioning( - Bucket=versioned_bucket_name, VersioningConfiguration={"Status": "Enabled"} - ) - - # initialize secure bucket - client.create_bucket(Bucket=secure_bucket_name, ACL="public-read") - policy = json.dumps( - { - "Version": "2012-10-17", - "Id": "PutObjPolicy", - "Statement": [ - { - "Sid": "DenyUnEncryptedObjectUploads", - "Effect": "Deny", - "Principal": "*", - "Action": "s3:PutObject", - "Resource": "arn:aws:s3:::{bucket_name}/*".format( - bucket_name=secure_bucket_name - ), - "Condition": { - "StringNotEquals": { - "s3:x-amz-server-side-encryption": "aws:kms" - } - }, - } - ], - } - ) - client.put_bucket_policy(Bucket=secure_bucket_name, Policy=policy) - for flist in [files, csv_files, text_files, glob_files]: - for f, data in flist.items(): - client.put_object(Bucket=test_bucket_name, Key=f, Body=data) - - S3FileSystem.clear_instance_cache() - s3 = S3FileSystem(anon=False, client_kwargs={"endpoint_url": endpoint_uri}) - s3.invalidate_cache() - yield s3 - - -@contextmanager -def expect_errno(expected_errno): - """Expect an OSError and validate its errno code.""" - with pytest.raises(OSError) as error: - yield - assert error.value.errno == expected_errno, "OSError has wrong error code." - - -def test_simple(s3): - data = b"a" * (10 * 2**20) - - with s3.open(a, "wb") as f: - f.write(data) - - with s3.open(a, "rb") as f: - out = f.read(len(data)) - assert len(data) == len(out) - assert out == data - - -def test_with_size(s3): - data = b"a" * (10 * 2**20) - - with s3.open(a, "wb") as f: - f.write(data) - - with s3.open(a, "rb", size=100) as f: - assert f.size == 100 - out = f.read() - assert len(out) == 100 - - -@pytest.mark.parametrize("default_cache_type", ["none", "bytes", "mmap", "readahead"]) -def test_default_cache_type(s3, default_cache_type): - data = b"a" * (10 * 2**20) - s3 = S3FileSystem( - anon=False, - default_cache_type=default_cache_type, - client_kwargs={"endpoint_url": endpoint_uri}, - ) - - with s3.open(a, "wb") as f: - f.write(data) - - with s3.open(a, "rb") as f: - assert isinstance(f.cache, fsspec.core.caches[default_cache_type]) - out = f.read(len(data)) - assert len(data) == len(out) - assert out == data - - -def test_ssl_off(): - s3 = S3FileSystem(use_ssl=False, client_kwargs={"endpoint_url": endpoint_uri}) - assert s3.s3.meta.endpoint_url.startswith("http://") - - -def test_client_kwargs(): - s3 = S3FileSystem(client_kwargs={"endpoint_url": "http://foo"}) - assert s3.s3.meta.endpoint_url.startswith("http://foo") - - -def test_config_kwargs(): - s3 = S3FileSystem( - config_kwargs={"signature_version": "s3v4"}, - client_kwargs={"endpoint_url": endpoint_uri}, - ) - assert s3.connect().meta.config.signature_version == "s3v4" - - -def test_config_kwargs_class_attributes_default(): - s3 = S3FileSystem(client_kwargs={"endpoint_url": endpoint_uri}) - assert s3.connect().meta.config.connect_timeout == 5 - assert s3.connect().meta.config.read_timeout == 15 - - -def test_config_kwargs_class_attributes_override(): - s3 = S3FileSystem( - config_kwargs={ - "connect_timeout": 60, - "read_timeout": 120, - }, - client_kwargs={"endpoint_url": endpoint_uri}, - ) - assert s3.connect().meta.config.connect_timeout == 60 - assert s3.connect().meta.config.read_timeout == 120 - - -def test_user_session_is_preserved(): - from aiobotocore.session import get_session - - session = get_session() - s3 = S3FileSystem(session=session) - s3.connect() - assert s3.session == session - - -def test_idempotent_connect(s3): - first = s3.s3 - assert 
s3.connect(refresh=True) is not first - - -def test_multiple_objects(s3): - s3.connect() - s3.ls("test") - s32 = S3FileSystem(anon=False, client_kwargs={"endpoint_url": endpoint_uri}) - assert s32.session - assert s3.ls("test") == s32.ls("test") - - -def test_info(s3): - s3.touch(a) - s3.touch(b) - info = s3.info(a) - linfo = s3.ls(a, detail=True)[0] - assert abs(info.pop("LastModified") - linfo.pop("LastModified")).seconds < 1 - info.pop("VersionId") - info.pop("ContentType") - linfo.pop("Key") - linfo.pop("Size") - assert info == linfo - parent = a.rsplit("/", 1)[0] - s3.invalidate_cache() # remove full path from the cache - s3.ls(parent) # fill the cache with parent dir - assert s3.info(a) == s3.dircache[parent][0] # correct value - assert id(s3.info(a)) == id(s3.dircache[parent][0]) # is object from cache - assert id(s3.info(f"/{a}")) == id(s3.dircache[parent][0]) # is object from cache - - new_parent = test_bucket_name + "/foo" - s3.mkdir(new_parent) - with pytest.raises(FileNotFoundError): - s3.info(new_parent) - with pytest.raises(FileNotFoundError): - s3.ls(new_parent) - with pytest.raises(FileNotFoundError): - s3.info(new_parent) - - -def test_info_cached(s3): - path = test_bucket_name + "/tmp/" - fqpath = "s3://" + path - s3.touch(path + "test") - info = s3.info(fqpath) - assert info == s3.info(fqpath) - assert info == s3.info(path) - - -def test_checksum(s3): - bucket = test_bucket_name - d = "checksum" - prefix = d + "/e" - o1 = prefix + "1" - o2 = prefix + "2" - path1 = bucket + "/" + o1 - path2 = bucket + "/" + o2 - - client = s3.s3 - - # init client and files - sync(s3.loop, client.put_object, Bucket=bucket, Key=o1, Body="") - sync(s3.loop, client.put_object, Bucket=bucket, Key=o2, Body="") - - # change one file, using cache - sync(s3.loop, client.put_object, Bucket=bucket, Key=o1, Body="foo") - checksum = s3.checksum(path1) - s3.ls(path1) # force caching - sync(s3.loop, client.put_object, Bucket=bucket, Key=o1, Body="bar") - # refresh == False => checksum doesn't change - assert checksum == s3.checksum(path1) - - # change one file, without cache - sync(s3.loop, client.put_object, Bucket=bucket, Key=o1, Body="foo") - checksum = s3.checksum(path1, refresh=True) - s3.ls(path1) # force caching - sync(s3.loop, client.put_object, Bucket=bucket, Key=o1, Body="bar") - # refresh == True => checksum changes - assert checksum != s3.checksum(path1, refresh=True) - - # Test for nonexistent file - sync(s3.loop, client.put_object, Bucket=bucket, Key=o1, Body="bar") - s3.ls(path1) # force caching - sync(s3.loop, client.delete_object, Bucket=bucket, Key=o1) - with pytest.raises(FileNotFoundError): - s3.checksum(o1, refresh=True) - - # Test multipart upload - upload_id = sync( - s3.loop, - client.create_multipart_upload, - Bucket=bucket, - Key=o1, - )["UploadId"] - etag1 = sync( - s3.loop, - client.upload_part, - Bucket=bucket, - Key=o1, - UploadId=upload_id, - PartNumber=1, - Body="0" * (5 * 1024 * 1024), - )["ETag"] - etag2 = sync( - s3.loop, - client.upload_part, - Bucket=bucket, - Key=o1, - UploadId=upload_id, - PartNumber=2, - Body="0", - )["ETag"] - sync( - s3.loop, - client.complete_multipart_upload, - Bucket=bucket, - Key=o1, - UploadId=upload_id, - MultipartUpload={ - "Parts": [ - {"PartNumber": 1, "ETag": etag1}, - {"PartNumber": 2, "ETag": etag2}, - ] - }, - ) - s3.checksum(path1, refresh=True) - - -def test_multi_checksum(s3): - # Moto accepts the request to add checksum, and accepts the checksum mode, - # but doesn't actually return the checksum - # So, this is mostly a stub 
test - file_key = "checksum" - path = test_bucket_name + "/" + file_key - s3 = S3FileSystem( - anon=False, - client_kwargs={"endpoint_url": endpoint_uri}, - s3_additional_kwargs={"ChecksumAlgorithm": "SHA256"}, - ) - with s3.open( - path, - "wb", - blocksize=5 * 2**20, - ) as f: - f.write(b"0" * (5 * 2**20 + 1)) # starts multipart and puts first part - f.write(b"data") # any extra data - assert s3.cat(path) == b"0" * (5 * 2**20 + 1) + b"data" - FileHead = sync( - s3.loop, - s3.s3.head_object, - Bucket=test_bucket_name, - Key=file_key, - ChecksumMode="ENABLED", - ) - # assert "ChecksumSHA256" in FileHead - - -test_xattr_sample_metadata = {"testxattr": "1"} - - -def test_xattr(s3): - bucket, key = (test_bucket_name, "tmp/test/xattr") - filename = bucket + "/" + key - body = b"aaaa" - public_read_acl = { - "Permission": "READ", - "Grantee": { - "URI": "http://acs.amazonaws.com/groups/global/AllUsers", - "Type": "Group", - }, - } - - resp = sync( - s3.loop, - s3.s3.put_object, - Bucket=bucket, - Key=key, - ACL="public-read", - Metadata=test_xattr_sample_metadata, - Body=body, - ) - - # save etag for later - etag = s3.info(filename)["ETag"] - assert ( - public_read_acl - in sync(s3.loop, s3.s3.get_object_acl, Bucket=bucket, Key=key)["Grants"] - ) - - assert s3.getxattr(filename, "testxattr") == test_xattr_sample_metadata["testxattr"] - assert s3.metadata(filename) == {"testxattr": "1"} # note _ became - - - s3file = s3.open(filename) - assert s3file.getxattr("testxattr") == test_xattr_sample_metadata["testxattr"] - assert s3file.metadata() == {"testxattr": "1"} # note _ became - - - s3file.setxattr(testxattr="2") - assert s3file.getxattr("testxattr") == "2" - s3file.setxattr(**{"testxattr": None}) - assert s3file.metadata() == {} - assert s3.cat(filename) == body - - # check that ACL and ETag are preserved after updating metadata - assert ( - public_read_acl - in sync(s3.loop, s3.s3.get_object_acl, Bucket=bucket, Key=key)["Grants"] - ) - assert s3.info(filename)["ETag"] == etag - - -def test_xattr_setxattr_in_write_mode(s3): - s3file = s3.open(a, "wb") - with pytest.raises(NotImplementedError): - s3file.setxattr(test_xattr="1") - - -@pytest.mark.xfail() -def test_delegate(s3): - out = s3.get_delegated_s3pars() - assert out - assert out["token"] - s32 = S3FileSystem(client_kwargs={"endpoint_url": endpoint_uri}, **out) - assert not s32.anon - assert out == s32.get_delegated_s3pars() - - -def test_not_delegate(): - s3 = S3FileSystem(anon=True, client_kwargs={"endpoint_url": endpoint_uri}) - out = s3.get_delegated_s3pars() - assert out == {"anon": True} - s3 = S3FileSystem( - anon=False, client_kwargs={"endpoint_url": endpoint_uri} - ) # auto credentials - out = s3.get_delegated_s3pars() - assert out == {"anon": False} - - -def test_ls(s3): - assert set(s3.ls("", detail=False)) == { - test_bucket_name, - secure_bucket_name, - versioned_bucket_name, - } - with pytest.raises(FileNotFoundError): - s3.ls("nonexistent") - fn = test_bucket_name + "/test/accounts.1.json" - assert fn in s3.ls(test_bucket_name + "/test", detail=False) - - -def test_pickle(s3): - import pickle - - s32 = pickle.loads(pickle.dumps(s3)) - assert s3.ls("test") == s32.ls("test") - s33 = pickle.loads(pickle.dumps(s32)) - assert s3.ls("test") == s33.ls("test") - - -def test_ls_touch(s3): - assert not s3.exists(test_bucket_name + "/tmp/test") - s3.touch(a) - s3.touch(b) - L = s3.ls(test_bucket_name + "/tmp/test", True) - assert {d["Key"] for d in L} == {a, b} - L = s3.ls(test_bucket_name + "/tmp/test", False) - assert set(L) == {a, 
b} - - -@pytest.mark.parametrize("version_aware", [True, False]) -def test_exists_versioned(s3, version_aware): - """Test to ensure that a prefix exists when using a versioned bucket""" - import uuid - - n = 3 - s3 = S3FileSystem( - anon=False, - version_aware=version_aware, - client_kwargs={"endpoint_url": endpoint_uri}, - ) - segments = [versioned_bucket_name] + [str(uuid.uuid4()) for _ in range(n)] - path = "/".join(segments) - for i in range(2, n + 1): - assert not s3.exists("/".join(segments[:i])) - s3.touch(path) - for i in range(2, n + 1): - assert s3.exists("/".join(segments[:i])) - - -def test_isfile(s3): - assert not s3.isfile("") - assert not s3.isfile("/") - assert not s3.isfile(test_bucket_name) - assert not s3.isfile(test_bucket_name + "/test") - - assert not s3.isfile(test_bucket_name + "/test/foo") - assert s3.isfile(test_bucket_name + "/test/accounts.1.json") - assert s3.isfile(test_bucket_name + "/test/accounts.2.json") - - assert not s3.isfile(a) - s3.touch(a) - assert s3.isfile(a) - - assert not s3.isfile(b) - assert not s3.isfile(b + "/") - s3.mkdir(b) - assert not s3.isfile(b) - assert not s3.isfile(b + "/") - - assert not s3.isfile(c) - assert not s3.isfile(c + "/") - s3.mkdir(c + "/") - assert not s3.isfile(c) - assert not s3.isfile(c + "/") - - -def test_isdir(s3): - assert s3.isdir("") - assert s3.isdir("/") - assert s3.isdir(test_bucket_name) - assert s3.isdir(test_bucket_name + "/test") - - assert not s3.isdir(test_bucket_name + "/test/foo") - assert not s3.isdir(test_bucket_name + "/test/accounts.1.json") - assert not s3.isdir(test_bucket_name + "/test/accounts.2.json") - - assert not s3.isdir(a) - s3.touch(a) - assert not s3.isdir(a) - - assert not s3.isdir(b) - assert not s3.isdir(b + "/") - - assert not s3.isdir(c) - assert not s3.isdir(c + "/") - - # test cache - s3.invalidate_cache() - assert not s3.dircache - s3.ls(test_bucket_name + "/nested") - assert test_bucket_name + "/nested" in s3.dircache - assert not s3.isdir(test_bucket_name + "/nested/file1") - assert not s3.isdir(test_bucket_name + "/nested/file2") - assert s3.isdir(test_bucket_name + "/nested/nested2") - assert s3.isdir(test_bucket_name + "/nested/nested2/") - - -def test_rm(s3): - assert not s3.exists(a) - s3.touch(a) - assert s3.exists(a) - s3.rm(a) - assert not s3.exists(a) - # the API is OK with deleting non-files; maybe this is an effect of using bulk - # with pytest.raises(FileNotFoundError): - # s3.rm(test_bucket_name + '/nonexistent') - with pytest.raises(FileNotFoundError): - s3.rm("nonexistent") - out = s3.rm(test_bucket_name + "/nested", recursive=True) - assert test_bucket_name + "/nested/nested2/file1" in out - assert not s3.exists(test_bucket_name + "/nested/nested2/file1") - - # whole bucket - out = s3.rm(test_bucket_name, recursive=True) - assert test_bucket_name + "/2014-01-01.csv" in out - assert not s3.exists(test_bucket_name + "/2014-01-01.csv") - assert not s3.exists(test_bucket_name) - - -def test_rmdir(s3): - bucket = "test1_bucket" - s3.mkdir(bucket) - s3.rmdir(bucket) - assert bucket not in s3.ls("/") - - # Issue 689, s3fs rmdir command returns error when given a valid s3 path. 
- dir = test_bucket_name + "/dir" - - assert not s3.exists(dir) - with pytest.raises(FileNotFoundError): - s3.rmdir(dir) - - s3.touch(dir + "/file") - assert s3.exists(dir) - assert s3.exists(dir + "/file") - with pytest.raises(FileExistsError): - s3.rmdir(dir) - - with pytest.raises(OSError): - s3.rmdir(test_bucket_name) - - -def test_mkdir(s3): - bucket = "test1_bucket" - s3.mkdir(bucket) - assert bucket in s3.ls("/") - - -def test_mkdir_existing_bucket(s3): - # mkdir called on existing bucket should be no-op and not calling create_bucket - # creating a s3 bucket - bucket = "test1_bucket" - s3.mkdir(bucket) - assert bucket in s3.ls("/") - # a second call. - with pytest.raises(FileExistsError): - s3.mkdir(bucket) - - -def test_mkdir_bucket_and_key_1(s3): - bucket = "test1_bucket" - file = bucket + "/a/b/c" - s3.mkdir(file, create_parents=True) - assert bucket in s3.ls("/") - - -def test_mkdir_bucket_and_key_2(s3): - bucket = "test1_bucket" - file = bucket + "/a/b/c" - with pytest.raises(FileNotFoundError): - s3.mkdir(file, create_parents=False) - assert bucket not in s3.ls("/") - - -def test_mkdir_region_name(s3): - bucket = "test2_bucket" - s3.mkdir(bucket, region_name="eu-central-1") - assert bucket in s3.ls("/") - - -def test_mkdir_client_region_name(s3): - bucket = "test3_bucket" - s3 = S3FileSystem( - anon=False, - client_kwargs={"region_name": "eu-central-1", "endpoint_url": endpoint_uri}, - ) - s3.mkdir(bucket) - assert bucket in s3.ls("/") - - -def test_makedirs(s3): - bucket = "test_makedirs_bucket" - test_file = bucket + "/a/b/c/file" - s3.makedirs(test_file) - assert bucket in s3.ls("/") - - -def test_makedirs_existing_bucket(s3): - bucket = "test_makedirs_bucket" - s3.mkdir(bucket) - assert bucket in s3.ls("/") - test_file = bucket + "/a/b/c/file" - # no-op, and no error. 
- s3.makedirs(test_file) - - -def test_makedirs_pure_bucket_exist_ok(s3): - bucket = "test1_bucket" - s3.mkdir(bucket) - s3.makedirs(bucket, exist_ok=True) - - -def test_makedirs_pure_bucket_error_on_exist(s3): - bucket = "test1_bucket" - s3.mkdir(bucket) - with pytest.raises(FileExistsError): - s3.makedirs(bucket, exist_ok=False) - - -def test_bulk_delete(s3): - with pytest.raises(FileNotFoundError): - s3.rm(["nonexistent/file"]) - filelist = s3.find(test_bucket_name + "/nested") - s3.rm(filelist) - assert not s3.exists(test_bucket_name + "/nested/nested2/file1") - - -@pytest.mark.xfail(reason="anon user is still privileged on moto") -def test_anonymous_access(s3): - with ignoring(NoCredentialsError): - s3 = S3FileSystem(anon=True, client_kwargs={"endpoint_url": endpoint_uri}) - assert s3.ls("") == [] - # TODO: public bucket doesn't work through moto - - with pytest.raises(PermissionError): - s3.mkdir("newbucket") - - -def test_s3_file_access(s3): - fn = test_bucket_name + "/nested/file1" - data = b"hello\n" - assert s3.cat(fn) == data - assert s3.head(fn, 3) == data[:3] - assert s3.tail(fn, 3) == data[-3:] - assert s3.tail(fn, 10000) == data - - -def test_s3_file_info(s3): - fn = test_bucket_name + "/nested/file1" - data = b"hello\n" - assert fn in s3.find(test_bucket_name) - assert s3.exists(fn) - assert not s3.exists(fn + "another") - assert s3.info(fn)["Size"] == len(data) - with pytest.raises(FileNotFoundError): - s3.info(fn + "another") - - -def test_content_type_is_set(s3, tmpdir): - test_file = str(tmpdir) + "/test.json" - destination = test_bucket_name + "/test.json" - open(test_file, "w").write("text") - s3.put(test_file, destination) - assert s3.info(destination)["ContentType"] == "application/json" - - -def test_content_type_is_not_overrided(s3, tmpdir): - test_file = os.path.join(str(tmpdir), "test.json") - destination = os.path.join(test_bucket_name, "test.json") - open(test_file, "w").write("text") - s3.put(test_file, destination, ContentType="text/css") - assert s3.info(destination)["ContentType"] == "text/css" - - -def test_bucket_exists(s3): - assert s3.exists(test_bucket_name) - assert not s3.exists(test_bucket_name + "x") - s3 = S3FileSystem(anon=True, client_kwargs={"endpoint_url": endpoint_uri}) - assert s3.exists(test_bucket_name) - assert not s3.exists(test_bucket_name + "x") - - -def test_du(s3): - d = s3.du(test_bucket_name, total=False) - assert all(isinstance(v, int) and v >= 0 for v in d.values()) - assert test_bucket_name + "/nested/file1" in d - - assert s3.du(test_bucket_name + "/test/", total=True) == sum( - map(len, files.values()) - ) - assert s3.du(test_bucket_name) == s3.du("s3://" + test_bucket_name) - - # Issue 450, s3.du of non-existent directory - dir = test_bucket_name + "/does-not-exist" - assert not s3.exists(dir) - assert s3.du(dir) == 0 - assert s3.du(dir + "/") == 0 - - -def test_s3_ls(s3): - fn = test_bucket_name + "/nested/file1" - assert fn not in s3.ls(test_bucket_name + "/") - assert fn in s3.ls(test_bucket_name + "/nested/") - assert fn in s3.ls(test_bucket_name + "/nested") - assert s3.ls("s3://" + test_bucket_name + "/nested/") == s3.ls( - test_bucket_name + "/nested" - ) - - -def test_s3_big_ls(s3): - for x in range(1200): - s3.touch(test_bucket_name + "/thousand/%i.part" % x) - assert len(s3.find(test_bucket_name)) > 1200 - s3.rm(test_bucket_name + "/thousand/", recursive=True) - assert len(s3.find(test_bucket_name + "/thousand/")) == 0 - - -def test_s3_ls_detail(s3): - L = s3.ls(test_bucket_name + "/nested", detail=True) - assert 
all(isinstance(item, dict) for item in L) - - -def test_s3_glob(s3): - fn = test_bucket_name + "/nested/file1" - assert fn not in s3.glob(test_bucket_name + "/") - assert fn not in s3.glob(test_bucket_name + "/*") - assert fn not in s3.glob(test_bucket_name + "/nested") - assert fn in s3.glob(test_bucket_name + "/nested/*") - assert fn in s3.glob(test_bucket_name + "/nested/file*") - assert fn in s3.glob(test_bucket_name + "/*/*") - assert all( - any(p.startswith(f + "/") or p == f for p in s3.find(test_bucket_name)) - for f in s3.glob(test_bucket_name + "/nested/*") - ) - assert [test_bucket_name + "/nested/nested2"] == s3.glob( - test_bucket_name + "/nested/nested2" - ) - out = s3.glob(test_bucket_name + "/nested/nested2/*") - assert {"test/nested/nested2/file1", "test/nested/nested2/file2"} == set(out) - - with pytest.raises(ValueError): - s3.glob("*") - - # Make sure glob() deals with the dot character (.) correctly. - assert test_bucket_name + "/file.dat" in s3.glob(test_bucket_name + "/file.*") - assert test_bucket_name + "/filexdat" not in s3.glob(test_bucket_name + "/file.*") - - -def test_get_list_of_summary_objects(s3): - L = s3.ls(test_bucket_name + "/test") - - assert len(L) == 2 - assert [l.lstrip(test_bucket_name).lstrip("/") for l in sorted(L)] == sorted( - list(files) - ) - - L2 = s3.ls("s3://" + test_bucket_name + "/test") - - assert L == L2 - - -def test_read_keys_from_bucket(s3): - for k, data in files.items(): - file_contents = s3.cat("/".join([test_bucket_name, k])) - assert file_contents == data - - assert s3.cat("/".join([test_bucket_name, k])) == s3.cat( - "s3://" + "/".join([test_bucket_name, k]) - ) - - -def test_url(s3): - fn = test_bucket_name + "/nested/file1" - url = s3.url(fn, expires=100) - assert "http" in url - import urllib.parse - - components = urllib.parse.urlparse(url) - query = urllib.parse.parse_qs(components.query) - exp = int(query["Expires"][0]) - - delta = abs(exp - time.time() - 100) - assert delta < 5 - - with s3.open(fn) as f: - assert "http" in f.url() - - -def test_seek(s3): - with s3.open(a, "wb") as f: - f.write(b"123") - - with s3.open(a) as f: - assert f.read() == b"123" - - with s3.open(a) as f: - f.seek(1000) - with pytest.raises(ValueError): - f.seek(-1) - with pytest.raises(ValueError): - f.seek(-5, 2) - with pytest.raises(ValueError): - f.seek(0, 10) - f.seek(0) - assert f.read(1) == b"1" - f.seek(0) - assert f.read(1) == b"1" - f.seek(3) - assert f.read(1) == b"" - f.seek(-1, 2) - assert f.read(1) == b"3" - f.seek(-1, 1) - f.seek(-1, 1) - assert f.read(1) == b"2" - for i in range(4): - assert f.seek(i) == i - - -def test_bad_open(s3): - with pytest.raises(ValueError): - s3.open("") - - -def test_copy(s3): - fn = test_bucket_name + "/test/accounts.1.json" - s3.copy(fn, fn + "2") - assert s3.cat(fn) == s3.cat(fn + "2") - - -def test_copy_managed(s3): - data = b"abc" * 12 * 2**20 - fn = test_bucket_name + "/test/biggerfile" - with s3.open(fn, "wb") as f: - f.write(data) - sync(s3.loop, s3._copy_managed, fn, fn + "2", size=len(data), block=5 * 2**20) - assert s3.cat(fn) == s3.cat(fn + "2") - with pytest.raises(ValueError): - sync(s3.loop, s3._copy_managed, fn, fn + "3", size=len(data), block=4 * 2**20) - with pytest.raises(ValueError): - sync(s3.loop, s3._copy_managed, fn, fn + "3", size=len(data), block=6 * 2**30) - - -@pytest.mark.parametrize("recursive", [True, False]) -def test_move(s3, recursive): - fn = test_bucket_name + "/test/accounts.1.json" - data = s3.cat(fn) - s3.mv(fn, fn + "2", recursive=recursive) - assert s3.cat(fn + 
"2") == data - assert not s3.exists(fn) - - -def test_get_put(s3, tmpdir): - test_file = str(tmpdir.join("test.json")) - - s3.get(test_bucket_name + "/test/accounts.1.json", test_file) - data = files["test/accounts.1.json"] - assert open(test_file, "rb").read() == data - s3.put(test_file, test_bucket_name + "/temp") - assert s3.du(test_bucket_name + "/temp", total=False)[ - test_bucket_name + "/temp" - ] == len(data) - assert s3.cat(test_bucket_name + "/temp") == data - - -def test_get_put_big(s3, tmpdir): - test_file = str(tmpdir.join("test")) - data = b"1234567890A" * 2**20 - open(test_file, "wb").write(data) - - s3.put(test_file, test_bucket_name + "/bigfile") - test_file = str(tmpdir.join("test2")) - s3.get(test_bucket_name + "/bigfile", test_file) - assert open(test_file, "rb").read() == data - - -def test_get_put_with_callback(s3, tmpdir): - test_file = str(tmpdir.join("test.json")) - - class BranchingCallback(Callback): - def branch(self, path_1, path_2, kwargs): - kwargs["callback"] = BranchingCallback() - - cb = BranchingCallback() - s3.get(test_bucket_name + "/test/accounts.1.json", test_file, callback=cb) - assert cb.size == 1 - assert cb.value == 1 - - cb = BranchingCallback() - s3.put(test_file, test_bucket_name + "/temp", callback=cb) - assert cb.size == 1 - assert cb.value == 1 - - -def test_get_file_with_callback(s3, tmpdir): - test_file = str(tmpdir.join("test.json")) - - cb = Callback() - s3.get_file(test_bucket_name + "/test/accounts.1.json", test_file, callback=cb) - assert cb.size == os.stat(test_file).st_size - assert cb.value == cb.size - - -def test_get_file_with_kwargs(s3, tmpdir): - test_file = str(tmpdir.join("test.json")) - - get_file_kwargs = {"max_concurency": 1, "random_kwarg": "value"} - s3.get_file( - test_bucket_name + "/test/accounts.1.json", test_file, **get_file_kwargs - ) - - -@pytest.mark.parametrize("size", [2**10, 10 * 2**20]) -def test_put_file_with_callback(s3, tmpdir, size): - test_file = str(tmpdir.join("test.json")) - with open(test_file, "wb") as f: - f.write(b"1234567890A" * size) - - cb = Callback() - s3.put_file(test_file, test_bucket_name + "/temp", callback=cb) - assert cb.size == os.stat(test_file).st_size - assert cb.value == cb.size - - assert s3.size(test_bucket_name + "/temp") == 11 * size - - -@pytest.mark.parametrize("factor", [1, 5, 6]) -def test_put_file_does_not_truncate(s3, tmpdir, factor): - test_file = str(tmpdir.join("test.json")) - - chunksize = 5 * 2**20 - block = b"x" * chunksize - - with open(test_file, "wb") as f: - f.write(block * factor) - - s3.put_file( - test_file, test_bucket_name + "/temp", max_concurrency=5, chunksize=chunksize - ) - assert s3.size(test_bucket_name + "/temp") == factor * chunksize - - -@pytest.mark.parametrize("size", [2**10, 2**20, 10 * 2**20]) -def test_pipe_cat_big(s3, size): - data = b"1234567890A" * size - s3.pipe(test_bucket_name + "/bigfile", data) - assert s3.cat(test_bucket_name + "/bigfile") == data - - -def test_errors(s3): - with pytest.raises(FileNotFoundError): - s3.open(test_bucket_name + "/tmp/test/shfoshf", "rb") - - # This is fine, no need for interleaving directories on S3 - # with pytest.raises((IOError, OSError)): - # s3.touch('tmp/test/shfoshf/x') - - # Deleting nonexistent or zero paths is allowed for now - # with pytest.raises(FileNotFoundError): - # s3.rm(test_bucket_name + '/tmp/test/shfoshf/x') - - with pytest.raises(FileNotFoundError): - s3.mv(test_bucket_name + "/tmp/test/shfoshf/x", "tmp/test/shfoshf/y") - - with pytest.raises(ValueError): - s3.open("x", "rb") - - 
with pytest.raises(FileNotFoundError): - s3.rm("unknown") - - with pytest.raises(ValueError): - with s3.open(test_bucket_name + "/temp", "wb") as f: - f.read() - - with pytest.raises(ValueError): - f = s3.open(test_bucket_name + "/temp", "rb") - f.close() - f.read() - - with pytest.raises(ValueError): - s3.mkdir("/") - - with pytest.raises(ValueError): - s3.find("") - - with pytest.raises(ValueError): - s3.find("s3://") - - -def test_errors_cause_preservings(monkeypatch, s3): - # We translate the error, and preserve the original one - with pytest.raises(FileNotFoundError) as exc: - s3.rm("unknown") - - assert type(exc.value.__cause__).__name__ == "NoSuchBucket" - - async def head_object(*args, **kwargs): - raise NoCredentialsError - - monkeypatch.setattr(type(s3.s3), "head_object", head_object) - - # Since the error is not translate, the __cause__ would - # be None - with pytest.raises(NoCredentialsError) as exc: - s3.info("test/a.txt") - - assert exc.value.__cause__ is None - - -def test_read_small(s3): - fn = test_bucket_name + "/2014-01-01.csv" - with s3.open(fn, "rb", block_size=10, cache_type="bytes") as f: - out = [] - while True: - data = f.read(3) - if data == b"": - break - out.append(data) - assert s3.cat(fn) == b"".join(out) - # cache drop - assert len(f.cache) < len(out) - - -def test_read_s3_block(s3): - data = files["test/accounts.1.json"] - lines = io.BytesIO(data).readlines() - path = test_bucket_name + "/test/accounts.1.json" - assert s3.read_block(path, 1, 35, b"\n") == lines[1] - assert s3.read_block(path, 0, 30, b"\n") == lines[0] - assert s3.read_block(path, 0, 35, b"\n") == lines[0] + lines[1] - assert s3.read_block(path, 0, 5000, b"\n") == data - assert len(s3.read_block(path, 0, 5)) == 5 - assert len(s3.read_block(path, 4, 5000)) == len(data) - 4 - assert s3.read_block(path, 5000, 5010) == b"" - - assert s3.read_block(path, 5, None) == s3.read_block(path, 5, 1000) - - -def test_new_bucket(s3): - assert not s3.exists("new") - s3.mkdir("new") - assert s3.exists("new") - with s3.open("new/temp", "wb") as f: - f.write(b"hello") - with pytest.raises(OSError): - s3.rmdir("new") - - s3.rm("new/temp") - s3.rmdir("new") - assert "new" not in s3.ls("") - assert not s3.exists("new") - with pytest.raises(FileNotFoundError): - s3.ls("new") - - -def test_new_bucket_auto(s3): - assert not s3.exists("new") - with pytest.raises(Exception): - s3.mkdir("new/other", create_parents=False) - s3.mkdir("new/other", create_parents=True) - assert s3.exists("new") - s3.touch("new/afile") - with pytest.raises(Exception): - s3.rm("new") - with pytest.raises(Exception): - s3.rmdir("new") - s3.rm("new", recursive=True) - assert not s3.exists("new") - - -def test_dynamic_add_rm(s3): - s3.mkdir("one") - s3.mkdir("one/two") - assert s3.exists("one") - s3.ls("one") - s3.touch("one/two/file_a") - assert s3.exists("one/two/file_a") - s3.rm("one", recursive=True) - assert not s3.exists("one") - - -def test_write_small(s3): - with s3.open(test_bucket_name + "/test", "wb") as f: - f.write(b"hello") - assert s3.cat(test_bucket_name + "/test") == b"hello" - s3.open(test_bucket_name + "/test", "wb").close() - assert s3.info(test_bucket_name + "/test")["size"] == 0 - - -def test_write_small_with_acl(s3): - bucket, key = (test_bucket_name, "test-acl") - filename = bucket + "/" + key - body = b"hello" - public_read_acl = { - "Permission": "READ", - "Grantee": { - "URI": "http://acs.amazonaws.com/groups/global/AllUsers", - "Type": "Group", - }, - } - - with s3.open(filename, "wb", acl="public-read") as f: - 
f.write(body) - assert s3.cat(filename) == body - - assert ( - public_read_acl - in sync(s3.loop, s3.s3.get_object_acl, Bucket=bucket, Key=key)["Grants"] - ) - - -def test_write_large(s3): - "flush() chunks buffer when processing large singular payload" - mb = 2**20 - payload_size = int(2.5 * 5 * mb) - payload = b"0" * payload_size - - with s3.open(test_bucket_name + "/test", "wb") as fd: - fd.write(payload) - - assert s3.cat(test_bucket_name + "/test") == payload - assert s3.info(test_bucket_name + "/test")["size"] == payload_size - - -def test_write_limit(s3): - "flush() respects part_max when processing large singular payload" - mb = 2**20 - block_size = 15 * mb - payload_size = 44 * mb - payload = b"0" * payload_size - - with s3.open(test_bucket_name + "/test", "wb", blocksize=block_size) as fd: - fd.write(payload) - - assert s3.cat(test_bucket_name + "/test") == payload - - assert s3.info(test_bucket_name + "/test")["size"] == payload_size - - -def test_write_small_secure(s3): - s3 = S3FileSystem( - s3_additional_kwargs={"ServerSideEncryption": "aws:kms"}, - client_kwargs={"endpoint_url": endpoint_uri}, - ) - s3.mkdir("mybucket") - with s3.open("mybucket/test", "wb") as f: - f.write(b"hello") - assert s3.cat("mybucket/test") == b"hello" - sync(s3.loop, s3.s3.head_object, Bucket="mybucket", Key="test") - - -def test_write_large_secure(s3): - # build our own s3fs with the relevant additional kwarg - s3 = S3FileSystem( - s3_additional_kwargs={"ServerSideEncryption": "AES256"}, - client_kwargs={"endpoint_url": endpoint_uri}, - ) - s3.mkdir("mybucket") - - with s3.open("mybucket/myfile", "wb") as f: - f.write(b"hello hello" * 10**6) - - assert s3.cat("mybucket/myfile") == b"hello hello" * 10**6 - - -def test_write_fails(s3): - with pytest.raises(ValueError): - s3.touch(test_bucket_name + "/temp") - s3.open(test_bucket_name + "/temp", "rb").write(b"hello") - with pytest.raises(ValueError): - s3.open(test_bucket_name + "/temp", "wb", block_size=10) - f = s3.open(test_bucket_name + "/temp", "wb") - f.close() - with pytest.raises(ValueError): - f.write(b"hello") - with pytest.raises(FileNotFoundError): - s3.open("nonexistentbucket/temp", "wb").close() - - -def test_write_blocks(s3): - with s3.open(test_bucket_name + "/temp", "wb", block_size=5 * 2**20) as f: - f.write(b"a" * 2 * 2**20) - assert f.buffer.tell() == 2 * 2**20 - assert not (f.parts) - f.flush() - assert f.buffer.tell() == 2 * 2**20 - assert not (f.parts) - f.write(b"a" * 2 * 2**20) - f.write(b"a" * 2 * 2**20) - assert f.mpu - assert f.parts - assert s3.info(test_bucket_name + "/temp")["size"] == 6 * 2**20 - with s3.open(test_bucket_name + "/temp2", "wb", block_size=10 * 2**20) as f: - f.write(b"a" * 15 * 2**20) - assert f.buffer.tell() == 0 - assert s3.info(test_bucket_name + "/temp2")["size"] == 15 * 2**20 - - -def test_readline(s3): - all_items = chain.from_iterable( - [files.items(), csv_files.items(), text_files.items()] - ) - for k, data in all_items: - with s3.open("/".join([test_bucket_name, k]), "rb") as f: - result = f.readline() - expected = data.split(b"\n")[0] + (b"\n" if data.count(b"\n") else b"") - assert result == expected - - -def test_readline_empty(s3): - data = b"" - with s3.open(a, "wb") as f: - f.write(data) - with s3.open(a, "rb") as f: - result = f.readline() - assert result == data - - -def test_readline_blocksize(s3): - data = b"ab\n" + b"a" * (10 * 2**20) + b"\nab" - with s3.open(a, "wb") as f: - f.write(data) - with s3.open(a, "rb") as f: - result = f.readline() - expected = b"ab\n" - assert result == 
expected - - result = f.readline() - expected = b"a" * (10 * 2**20) + b"\n" - assert result == expected - - result = f.readline() - expected = b"ab" - assert result == expected - - -def test_next(s3): - expected = csv_files["2014-01-01.csv"].split(b"\n")[0] + b"\n" - with s3.open(test_bucket_name + "/2014-01-01.csv") as f: - result = next(f) - assert result == expected - - -def test_iterable(s3): - data = b"abc\n123" - with s3.open(a, "wb") as f: - f.write(data) - with s3.open(a) as f, io.BytesIO(data) as g: - for froms3, fromio in zip(f, g): - assert froms3 == fromio - f.seek(0) - assert f.readline() == b"abc\n" - assert f.readline() == b"123" - f.seek(1) - assert f.readline() == b"bc\n" - - with s3.open(a) as f: - out = list(f) - with s3.open(a) as f: - out2 = f.readlines() - assert out == out2 - assert b"".join(out) == data - - -def test_readable(s3): - with s3.open(a, "wb") as f: - assert not f.readable() - - with s3.open(a, "rb") as f: - assert f.readable() - - -def test_seekable(s3): - with s3.open(a, "wb") as f: - assert not f.seekable() - - with s3.open(a, "rb") as f: - assert f.seekable() - - -def test_writable(s3): - with s3.open(a, "wb") as f: - assert f.writable() - - with s3.open(a, "rb") as f: - assert not f.writable() - - -def test_merge(s3): - with s3.open(a, "wb") as f: - f.write(b"a" * 10 * 2**20) - - with s3.open(b, "wb") as f: - f.write(b"a" * 10 * 2**20) - s3.merge(test_bucket_name + "/joined", [a, b]) - assert s3.info(test_bucket_name + "/joined")["size"] == 2 * 10 * 2**20 - - -def test_append(s3): - data = text_files["nested/file1"] - with s3.open(test_bucket_name + "/nested/file1", "ab") as f: - assert f.tell() == len(data) # append, no write, small file - assert s3.cat(test_bucket_name + "/nested/file1") == data - with s3.open(test_bucket_name + "/nested/file1", "ab") as f: - f.write(b"extra") # append, write, small file - assert s3.cat(test_bucket_name + "/nested/file1") == data + b"extra" - - with s3.open(a, "wb") as f: - f.write(b"a" * 10 * 2**20) - with s3.open(a, "ab") as f: - pass # append, no write, big file - data = s3.cat(a) - assert len(data) == 10 * 2**20 and set(data) == set(b"a") - - with s3.open(a, "ab") as f: - assert f.parts is None - f._initiate_upload() - assert f.parts - assert f.tell() == 10 * 2**20 - f.write(b"extra") # append, small write, big file - data = s3.cat(a) - assert len(data) == 10 * 2**20 + len(b"extra") - assert data[-5:] == b"extra" - - with s3.open(a, "ab") as f: - assert f.tell() == 10 * 2**20 + 5 - f.write(b"b" * 10 * 2**20) # append, big write, big file - assert f.tell() == 20 * 2**20 + 5 - data = s3.cat(a) - assert len(data) == 10 * 2**20 + len(b"extra") + 10 * 2**20 - assert data[10 * 2**20 : 10 * 2**20 + 5] == b"extra" - assert set(data[-10 * 2**20 :]) == set(b"b") - - # Keep Head Metadata - head = dict( - CacheControl="public", - ContentDisposition="string", - ContentEncoding="gzip", - ContentLanguage="ru-RU", - ContentType="text/csv", - Expires=datetime.datetime(2015, 1, 1, 0, 0, tzinfo=tzutc()), - Metadata={"string": "string"}, - ServerSideEncryption="AES256", - StorageClass="REDUCED_REDUNDANCY", - WebsiteRedirectLocation="https://www.example.com/", - ) - with s3.open(a, "wb", **head) as f: - f.write(b"data") - - with s3.open(a, "ab") as f: - f.write(b"other") - - with s3.open(a) as f: - filehead = { - k: v - for k, v in f._call_s3( - "head_object", f.kwargs, Bucket=f.bucket, Key=f.key - ).items() - if k in head - } - assert filehead == head - - -def test_bigger_than_block_read(s3): - with s3.open(test_bucket_name + 
"/2014-01-01.csv", "rb", block_size=3) as f: - out = [] - while True: - data = f.read(20) - out.append(data) - if len(data) == 0: - break - assert b"".join(out) == csv_files["2014-01-01.csv"] - - -def test_current(s3): - s3._cache.clear() - s3 = S3FileSystem(client_kwargs={"endpoint_url": endpoint_uri}) - assert s3.current() is s3 - assert S3FileSystem.current() is s3 - - -def test_array(s3): - from array import array - - data = array("B", [65] * 1000) - - with s3.open(a, "wb") as f: - f.write(data) - - with s3.open(a, "rb") as f: - out = f.read() - assert out == b"A" * 1000 - - -def _get_s3_id(s3): - return id(s3.s3) - - -@pytest.mark.parametrize( - "method", - [ - "spawn", - pytest.param( - "forkserver", - marks=pytest.mark.skipif( - sys.platform.startswith("win"), - reason="'forkserver' not available on windows", - ), - ), - ], -) -def test_no_connection_sharing_among_processes(s3, method): - import multiprocessing as mp - - ctx = mp.get_context(method) - executor = ProcessPoolExecutor(mp_context=ctx) - conn_id = executor.submit(_get_s3_id, s3).result() - assert id(s3.connect()) != conn_id, "Processes should not share S3 connections." - - -@pytest.mark.xfail() -def test_public_file(s3): - # works on real s3, not on moto - test_bucket_name = "s3fs_public_test" - other_bucket_name = "s3fs_private_test" - - s3.touch(test_bucket_name) - s3.touch(test_bucket_name + "/afile") - s3.touch(other_bucket_name, acl="public-read") - s3.touch(other_bucket_name + "/afile", acl="public-read") - - s = S3FileSystem(anon=True, client_kwargs={"endpoint_url": endpoint_uri}) - with pytest.raises(PermissionError): - s.ls(test_bucket_name) - s.ls(other_bucket_name) - - s3.chmod(test_bucket_name, acl="public-read") - s3.chmod(other_bucket_name, acl="private") - with pytest.raises(PermissionError): - s.ls(other_bucket_name, refresh=True) - assert s.ls(test_bucket_name, refresh=True) - - # public file in private bucket - with s3.open(other_bucket_name + "/see_me", "wb", acl="public-read") as f: - f.write(b"hello") - assert s.cat(other_bucket_name + "/see_me") == b"hello" - - -def test_upload_with_s3fs_prefix(s3): - path = "s3://test/prefix/key" - - with s3.open(path, "wb") as f: - f.write(b"a" * (10 * 2**20)) - - with s3.open(path, "ab") as f: - f.write(b"b" * (10 * 2**20)) - - -def test_multipart_upload_blocksize(s3): - blocksize = 5 * (2**20) - expected_parts = 3 - - s3f = s3.open(a, "wb", block_size=blocksize) - for _ in range(3): - data = b"b" * blocksize - s3f.write(data) - - # Ensure that the multipart upload consists of only 3 parts - assert len(s3f.parts) == expected_parts - s3f.close() - - -def test_default_pars(s3): - s3 = S3FileSystem( - default_block_size=20, - default_fill_cache=False, - client_kwargs={"endpoint_url": endpoint_uri}, - ) - fn = test_bucket_name + "/" + list(files)[0] - with s3.open(fn) as f: - assert f.blocksize == 20 - assert f.fill_cache is False - with s3.open(fn, block_size=40, fill_cache=True) as f: - assert f.blocksize == 40 - assert f.fill_cache is True - - -def test_tags(s3): - tagset = {"tag1": "value1", "tag2": "value2"} - fname = list(files)[0] - s3.touch(fname) - s3.put_tags(fname, tagset) - assert s3.get_tags(fname) == tagset - - # Ensure merge mode updates value of existing key and adds new one - new_tagset = {"tag2": "updatedvalue2", "tag3": "value3"} - s3.put_tags(fname, new_tagset, mode="m") - tagset.update(new_tagset) - assert s3.get_tags(fname) == tagset - - -@pytest.mark.parametrize("prefix", ["", "/dir", "/dir/subdir"]) -def test_versions(s3, prefix): - parent = 
versioned_bucket_name + prefix - versioned_file = parent + "/versioned_file" - - s3 = S3FileSystem( - anon=False, version_aware=True, client_kwargs={"endpoint_url": endpoint_uri} - ) - with s3.open(versioned_file, "wb") as fo: - fo.write(b"1") - first_version = fo.version_id - - with s3.open(versioned_file, "wb") as fo: - fo.write(b"2") - second_version = fo.version_id - - assert s3.isfile(versioned_file) - versions = s3.object_version_info(versioned_file) - assert len(versions) == 2 - assert {version["VersionId"] for version in versions} == { - first_version, - second_version, - } - - with s3.open(versioned_file) as fo: - assert fo.version_id == second_version - assert fo.read() == b"2" - - with s3.open(versioned_file, version_id=first_version) as fo: - assert fo.version_id == first_version - assert fo.read() == b"1" - - versioned_file_v1 = f"{versioned_file}?versionId={first_version}" - versioned_file_v2 = f"{versioned_file}?versionId={second_version}" - - assert s3.ls(parent) == [versioned_file] - assert set(s3.ls(parent, versions=True)) == {versioned_file_v1, versioned_file_v2} - - assert s3.exists(versioned_file_v1) - assert s3.info(versioned_file_v1) - assert s3.exists(versioned_file_v2) - assert s3.info(versioned_file_v2) - - -def test_list_versions_many(s3): - # moto doesn't actually behave in the same way that s3 does here so this doesn't test - # anything really in moto 1.2 - s3 = S3FileSystem( - anon=False, version_aware=True, client_kwargs={"endpoint_url": endpoint_uri} - ) - versioned_file = versioned_bucket_name + "/versioned_file2" - for i in range(1200): - with s3.open(versioned_file, "wb") as fo: - fo.write(b"1") - versions = s3.object_version_info(versioned_file) - assert len(versions) == 1200 - - -def test_fsspec_versions_multiple(s3): - """Test that the standard fsspec.core.get_fs_token_paths behaves as expected for versionId urls""" - s3 = S3FileSystem( - anon=False, version_aware=True, client_kwargs={"endpoint_url": endpoint_uri} - ) - versioned_file = versioned_bucket_name + "/versioned_file3" - version_lookup = {} - for i in range(20): - contents = str(i).encode() - with s3.open(versioned_file, "wb") as fo: - fo.write(contents) - version_lookup[fo.version_id] = contents - urls = [ - "s3://{}?versionId={}".format(versioned_file, version) - for version in version_lookup.keys() - ] - fs, token, paths = fsspec.core.get_fs_token_paths( - urls, storage_options=dict(client_kwargs={"endpoint_url": endpoint_uri}) - ) - assert isinstance(fs, S3FileSystem) - assert fs.version_aware - for path in paths: - with fs.open(path, "rb") as fo: - contents = fo.read() - assert contents == version_lookup[fo.version_id] - - -def test_versioned_file_fullpath(s3): - versioned_file = versioned_bucket_name + "/versioned_file_fullpath" - s3 = S3FileSystem( - anon=False, version_aware=True, client_kwargs={"endpoint_url": endpoint_uri} - ) - with s3.open(versioned_file, "wb") as fo: - fo.write(b"1") - # moto doesn't correctly return a versionId for a multipart upload. So we resort to this. 
- # version_id = fo.version_id - versions = s3.object_version_info(versioned_file) - version_ids = [version["VersionId"] for version in versions] - version_id = version_ids[0] - - with s3.open(versioned_file, "wb") as fo: - fo.write(b"2") - - file_with_version = "{}?versionId={}".format(versioned_file, version_id) - - with s3.open(file_with_version, "rb") as fo: - assert fo.version_id == version_id - assert fo.read() == b"1" - - versions = s3.object_version_info(versioned_file) - version_ids = [version["VersionId"] for version in versions] - assert set(s3.ls(versioned_bucket_name, versions=True)) == { - f"{versioned_file}?versionId={vid}" for vid in version_ids - } - - -def test_versions_unaware(s3): - versioned_file = versioned_bucket_name + "/versioned_file3" - s3 = S3FileSystem( - anon=False, version_aware=False, client_kwargs={"endpoint_url": endpoint_uri} - ) - with s3.open(versioned_file, "wb") as fo: - fo.write(b"1") - with s3.open(versioned_file, "wb") as fo: - fo.write(b"2") - - with s3.open(versioned_file) as fo: - assert fo.version_id is None - assert fo.read() == b"2" - - with pytest.raises(ValueError): - with s3.open(versioned_file, version_id="0"): - fo.read() - - -def test_versions_dircached(s3): - versioned_file = versioned_bucket_name + "/dir/versioned_file" - s3 = S3FileSystem( - anon=False, version_aware=True, client_kwargs={"endpoint_url": endpoint_uri} - ) - with s3.open(versioned_file, "wb") as fo: - fo.write(b"1") - first_version = fo.version_id - with s3.open(versioned_file, "wb") as fo: - fo.write(b"2") - second_version = fo.version_id - s3.find(versioned_bucket_name) - cached = s3.dircache[versioned_bucket_name + "/dir"][0] - - assert cached.get("VersionId") == second_version - assert s3.info(versioned_file) == cached - - assert ( - s3.info(versioned_file, version_id=first_version).get("VersionId") - == first_version - ) - assert ( - s3.info(versioned_file, version_id=second_version).get("VersionId") - == second_version - ) - - -def test_text_io__stream_wrapper_works(s3): - """Ensure using TextIOWrapper works.""" - s3.mkdir("bucket") - - with s3.open("bucket/file.txt", "wb") as fd: - fd.write("\u00af\\_(\u30c4)_/\u00af".encode("utf-16-le")) - - with s3.open("bucket/file.txt", "rb") as fd: - with io.TextIOWrapper(fd, "utf-16-le") as stream: - assert stream.readline() == "\u00af\\_(\u30c4)_/\u00af" - - -def test_text_io__basic(s3): - """Text mode is now allowed.""" - s3.mkdir("bucket") - - with s3.open("bucket/file.txt", "w", encoding="utf-8") as fd: - fd.write("\u00af\\_(\u30c4)_/\u00af") - - with s3.open("bucket/file.txt", "r", encoding="utf-8") as fd: - assert fd.read() == "\u00af\\_(\u30c4)_/\u00af" - - -def test_text_io__override_encoding(s3): - """Allow overriding the default text encoding.""" - s3.mkdir("bucket") - - with s3.open("bucket/file.txt", "w", encoding="ibm500") as fd: - fd.write("Hello, World!") - - with s3.open("bucket/file.txt", "r", encoding="ibm500") as fd: - assert fd.read() == "Hello, World!" - - -def test_readinto(s3): - s3.mkdir("bucket") - - with s3.open("bucket/file.txt", "wb") as fd: - fd.write(b"Hello, World!") - - contents = bytearray(15) - - with s3.open("bucket/file.txt", "rb") as fd: - assert fd.readinto(contents) == 13 - - assert contents.startswith(b"Hello, World!") - - -def test_change_defaults_only_subsequent(): - """Test for Issue #135 - - Ensure that changing the default block size doesn't affect existing file - systems that were created using that default. It should only affect file - systems created after the change. 
- """ - try: - S3FileSystem.cachable = False # don't reuse instances with same pars - - fs_default = S3FileSystem(client_kwargs={"endpoint_url": endpoint_uri}) - assert fs_default.default_block_size == 50 * (1024**2) - - fs_overridden = S3FileSystem( - default_block_size=64 * (1024**2), - client_kwargs={"endpoint_url": endpoint_uri}, - ) - assert fs_overridden.default_block_size == 64 * (1024**2) - - # Suppose I want all subsequent file systems to have a block size of 1 GiB - # instead of 5 MiB: - S3FileSystem.default_block_size = 1024**3 - - fs_big = S3FileSystem(client_kwargs={"endpoint_url": endpoint_uri}) - assert fs_big.default_block_size == 1024**3 - - # Test the other file systems created to see if their block sizes changed - assert fs_overridden.default_block_size == 64 * (1024**2) - assert fs_default.default_block_size == 50 * (1024**2) - finally: - S3FileSystem.default_block_size = 5 * (1024**2) - S3FileSystem.cachable = True - - -def test_cache_after_copy(s3): - # https://github.com/dask/dask/issues/5134 - s3.touch("test/afile") - assert "test/afile" in s3.ls("s3://test", False) - s3.cp("test/afile", "test/bfile") - assert "test/bfile" in s3.ls("s3://test", False) - - -def test_autocommit(s3): - auto_file = test_bucket_name + "/auto_file" - committed_file = test_bucket_name + "/commit_file" - aborted_file = test_bucket_name + "/aborted_file" - s3 = S3FileSystem( - anon=False, version_aware=True, client_kwargs={"endpoint_url": endpoint_uri} - ) - - def write_and_flush(path, autocommit): - with s3.open(path, "wb", autocommit=autocommit) as fo: - fo.write(b"1") - return fo - - # regular behavior - fo = write_and_flush(auto_file, autocommit=True) - assert fo.autocommit - assert s3.exists(auto_file) - - fo = write_and_flush(committed_file, autocommit=False) - assert not fo.autocommit - assert not s3.exists(committed_file) - fo.commit() - assert s3.exists(committed_file) - - fo = write_and_flush(aborted_file, autocommit=False) - assert not s3.exists(aborted_file) - fo.discard() - assert not s3.exists(aborted_file) - # Cannot commit a file that was discarded - with pytest.raises(Exception): - fo.commit() - - -def test_autocommit_mpu(s3): - """When not autocommitting we always want to use multipart uploads""" - path = test_bucket_name + "/auto_commit_with_mpu" - with s3.open(path, "wb", autocommit=False) as fo: - fo.write(b"1") - assert fo.mpu is not None - assert len(fo.parts) == 1 - - -def test_touch(s3): - # create - fn = test_bucket_name + "/touched" - assert not s3.exists(fn) - s3.touch(fn) - assert s3.exists(fn) - assert s3.size(fn) == 0 - - # truncates - with s3.open(fn, "wb") as f: - f.write(b"data") - assert s3.size(fn) == 4 - s3.touch(fn, truncate=True) - assert s3.size(fn) == 0 - - # exists error - with s3.open(fn, "wb") as f: - f.write(b"data") - assert s3.size(fn) == 4 - with pytest.raises(ValueError): - s3.touch(fn, truncate=False) - assert s3.size(fn) == 4 - - -def test_touch_versions(s3): - versioned_file = versioned_bucket_name + "/versioned_file" - s3 = S3FileSystem( - anon=False, version_aware=True, client_kwargs={"endpoint_url": endpoint_uri} - ) - - with s3.open(versioned_file, "wb") as fo: - fo.write(b"1") - first_version = fo.version_id - with s3.open(versioned_file, "wb") as fo: - fo.write(b"") - second_version = fo.version_id - - assert s3.isfile(versioned_file) - versions = s3.object_version_info(versioned_file) - assert len(versions) == 2 - assert {version["VersionId"] for version in versions} == { - first_version, - second_version, - } - - with 
s3.open(versioned_file) as fo: - assert fo.version_id == second_version - assert fo.read() == b"" - - with s3.open(versioned_file, version_id=first_version) as fo: - assert fo.version_id == first_version - assert fo.read() == b"1" - - -def test_cat_missing(s3): - fn0 = test_bucket_name + "/file0" - fn1 = test_bucket_name + "/file1" - s3.touch(fn0) - with pytest.raises(FileNotFoundError): - s3.cat([fn0, fn1], on_error="raise") - out = s3.cat([fn0, fn1], on_error="omit") - assert list(out) == [fn0] - out = s3.cat([fn0, fn1], on_error="return") - assert fn1 in out - assert isinstance(out[fn1], FileNotFoundError) - - -def test_get_directories(s3, tmpdir): - s3.touch(test_bucket_name + "/dir/dirkey/key0") - s3.touch(test_bucket_name + "/dir/dirkey/key1") - s3.touch(test_bucket_name + "/dir/dirkey") - s3.touch(test_bucket_name + "/dir/dir/key") - d = str(tmpdir) - - # Target directory with trailing slash - s3.get(test_bucket_name + "/dir/", d, recursive=True) - assert {"dirkey", "dir"} == set(os.listdir(d)) - assert ["key"] == os.listdir(os.path.join(d, "dir")) - assert {"key0", "key1"} == set(os.listdir(os.path.join(d, "dirkey"))) - - local_fs = fsspec.filesystem("file") - local_fs.rm(os.path.join(d, "dir"), recursive=True) - local_fs.rm(os.path.join(d, "dirkey"), recursive=True) - - # Target directory without trailing slash - s3.get(test_bucket_name + "/dir", d, recursive=True) - assert ["dir"] == os.listdir(d) - assert {"dirkey", "dir"} == set(os.listdir(os.path.join(d, "dir"))) - assert {"key0", "key1"} == set(os.listdir(os.path.join(d, "dir", "dirkey"))) - - -def test_seek_reads(s3): - fn = test_bucket_name + "/myfile" - with s3.open(fn, "wb") as f: - f.write(b"a" * 175627146) - with s3.open(fn, "rb", blocksize=100) as f: - f.seek(175561610) - d1 = f.read(65536) - - f.seek(4) - size = 17562198 - d2 = f.read(size) - assert len(d2) == size - - f.seek(17562288) - size = 17562187 - d3 = f.read(size) - assert len(d3) == size - - -def test_connect_many(s3): - from multiprocessing.pool import ThreadPool - - def task(i): - S3FileSystem(anon=False, client_kwargs={"endpoint_url": endpoint_uri}).ls("") - return True - - pool = ThreadPool(processes=20) - out = pool.map(task, range(40)) - assert all(out) - pool.close() - pool.join() - - -def test_requester_pays(s3): - fn = test_bucket_name + "/myfile" - s3 = S3FileSystem(requester_pays=True, client_kwargs={"endpoint_url": endpoint_uri}) - assert s3.req_kw["RequestPayer"] == "requester" - s3.touch(fn) - with s3.open(fn, "rb") as f: - assert f.req_kw["RequestPayer"] == "requester" - - -def test_credentials(): - s3 = S3FileSystem( - key="foo", secret="foo", client_kwargs={"endpoint_url": endpoint_uri} - ) - assert s3.s3._request_signer._credentials.access_key == "foo" - assert s3.s3._request_signer._credentials.secret_key == "foo" - s3 = S3FileSystem( - client_kwargs={ - "aws_access_key_id": "bar", - "aws_secret_access_key": "bar", - "endpoint_url": endpoint_uri, - } - ) - assert s3.s3._request_signer._credentials.access_key == "bar" - assert s3.s3._request_signer._credentials.secret_key == "bar" - s3 = S3FileSystem( - key="foo", - client_kwargs={"aws_secret_access_key": "bar", "endpoint_url": endpoint_uri}, - ) - assert s3.s3._request_signer._credentials.access_key == "foo" - assert s3.s3._request_signer._credentials.secret_key == "bar" - s3 = S3FileSystem( - key="foobar", - secret="foobar", - client_kwargs={ - "aws_access_key_id": "foobar", - "aws_secret_access_key": "foobar", - "endpoint_url": endpoint_uri, - }, - ) - assert 
s3.s3._request_signer._credentials.access_key == "foobar" - assert s3.s3._request_signer._credentials.secret_key == "foobar" - with pytest.raises((TypeError, KeyError)): - # should be TypeError: arg passed twice; but in moto can be KeyError - S3FileSystem( - key="foo", - secret="foo", - client_kwargs={ - "aws_access_key_id": "bar", - "aws_secret_access_key": "bar", - "endpoint_url": endpoint_uri, - }, - ).s3 - - -def test_modified(s3): - dir_path = test_bucket_name + "/modified" - file_path = dir_path + "/file" - - # Test file - s3.touch(file_path) - modified = s3.modified(path=file_path) - assert isinstance(modified, datetime.datetime) - assert modified.tzinfo is not None - - # Test directory - with pytest.raises(IsADirectoryError): - modified = s3.modified(path=dir_path) - - # Test bucket - with pytest.raises(IsADirectoryError): - s3.modified(path=test_bucket_name) - - -def test_async_s3(s3): - async def _(): - s3 = S3FileSystem( - anon=False, - asynchronous=True, - loop=asyncio.get_running_loop(), - client_kwargs={"region_name": "eu-central-1", "endpoint_url": endpoint_uri}, - ) - - fn = test_bucket_name + "/nested/file1" - data = b"hello\n" - - # Works with or without an explicit connect() - await s3._cat_file(fn) - - session = await s3.set_session() # creates client - - assert await s3._cat_file(fn) == data - - assert await s3._cat_file(fn, start=0, end=3) == data[:3] - - # TODO: file IO is *not* async - # with s3.open(fn, "rb") as f: - # assert f.read() == data - - try: - await session.close() - except AttributeError: - # bug in aiobotocore 1.4.1 - await session._endpoint.http_session._session.close() - - asyncio.run(_()) - - -def test_cat_ranges(s3): - data = b"a string to select from" - fn = test_bucket_name + "/parts" - s3.pipe(fn, data) - - assert s3.cat_file(fn) == data - assert s3.cat_file(fn, start=5) == data[5:] - assert s3.cat_file(fn, end=5) == data[:5] - assert s3.cat_file(fn, start=1, end=-1) == data[1:-1] - assert s3.cat_file(fn, start=-5) == data[-5:] - - -def test_async_s3_old(s3): - async def _(): - s3 = S3FileSystem( - anon=False, - asynchronous=True, - loop=asyncio.get_running_loop(), - client_kwargs={"region_name": "eu-central-1", "endpoint_url": endpoint_uri}, - ) - - fn = test_bucket_name + "/nested/file1" - data = b"hello\n" - - # Check old API - session = await s3._connect() - assert await s3._cat_file(fn, start=0, end=3) == data[:3] - try: - await session.close() - except AttributeError: - # bug in aiobotocore 1.4.1 - await session._endpoint.http_session._session.close() - - asyncio.run(_()) - - -def test_via_fsspec(s3): - import fsspec - - s3.mkdir("mine") - with fsspec.open( - "s3://mine/oi", "wb", client_kwargs={"endpoint_url": endpoint_uri} - ) as f: - f.write(b"hello") - with fsspec.open( - "s3://mine/oi", "rb", client_kwargs={"endpoint_url": endpoint_uri} - ) as f: - assert f.read() == b"hello" - - -@pytest.mark.parametrize( - ["raw_url", "expected_url", "expected_version_aware"], - [ - ( - "s3://arn:aws:s3:us-west-2:123456789012:accesspoint/abc/123.jpg", - "arn:aws:s3:us-west-2:123456789012:accesspoint/abc/123.jpg", - False, - ), - ( - "s3://arn:aws:s3:us-west-2:123456789012:accesspoint/abc/123.jpg?versionId=some_version_id", - "arn:aws:s3:us-west-2:123456789012:accesspoint/abc/123.jpg?versionId=some_version_id", - True, - ), - ( - "s3://xyz/abc/123.jpg", - "xyz/abc/123.jpg", - False, - ), - ( - "s3://xyz/abc/123.jpg?versionId=some_version_id", - "xyz/abc/123.jpg?versionId=some_version_id", - True, - ), - ], -) -def test_fsspec_url_to_fs_compatibility( - s3, 
raw_url, expected_url, expected_version_aware -): - import fsspec - - fs, url = fsspec.url_to_fs(raw_url) - assert isinstance(fs, type(s3)) - assert fs.version_aware is expected_version_aware - assert url == expected_url - - -def test_repeat_exists(s3): - fn = "s3://" + test_bucket_name + "/file1" - s3.touch(fn) - - assert s3.exists(fn) - assert s3.exists(fn) - - -def test_with_xzarr(s3): - da = pytest.importorskip("dask.array") - xr = pytest.importorskip("xarray") - name = "sample" - - nana = xr.DataArray(da.random.random((1024, 1024, 10, 9, 1))) - - s3_path = f"{test_bucket_name}/{name}" - s3store = s3.get_mapper(s3_path) - - s3.ls("") - nana.to_dataset().to_zarr(store=s3store, mode="w", consolidated=True, compute=True) - - -def test_async_close(): - async def _(): - loop = asyncio.get_event_loop() - s3 = S3FileSystem(anon=False, asynchronous=True, loop=loop) - await s3._connect() - - fn = test_bucket_name + "/afile" - - async def async_wrapper(): - coros = [ - asyncio.ensure_future(s3._get_file(fn, "/nonexistent/a/b/c"), loop=loop) - for _ in range(3) - ] - completed, pending = await asyncio.wait(coros) - for future in completed: - with pytest.raises(OSError): - future.result() - - await asyncio.gather(*[async_wrapper() for __ in range(2)]) - try: - await s3._s3.close() - except AttributeError: - # bug in aiobotocore 1.4.1 - await s3._s3._endpoint.http_session._session.close() - - asyncio.run(_()) - - -def test_put_single(s3, tmpdir): - fn = os.path.join(str(tmpdir), "dir") - os.mkdir(fn) - open(os.path.join(fn, "abc"), "w").write("text") - - # Put with trailing slash - s3.put(fn + "/", test_bucket_name) # no-op, no files - assert not s3.exists(test_bucket_name + "/abc") - assert not s3.exists(test_bucket_name + "/dir") - - s3.put(fn + "/", test_bucket_name, recursive=True) - assert s3.cat(test_bucket_name + "/abc") == b"text" - - # Put without trailing slash - s3.put(fn, test_bucket_name, recursive=True) - assert s3.cat(test_bucket_name + "/dir/abc") == b"text" - - -def test_shallow_find(s3): - """Test that find method respects maxdepth. - - Verify that the ``find`` method respects the ``maxdepth`` parameter. With - ``maxdepth=1``, the results of ``find`` should be the same as those of - ``ls``, without returning subdirectories. See also issue 378. 
- """ - ls_output = s3.ls(test_bucket_name) - assert sorted(ls_output + [test_bucket_name]) == s3.find( - test_bucket_name, maxdepth=1, withdirs=True - ) - assert ls_output == s3.glob(test_bucket_name + "/*") - - -def test_multi_find(s3): - s3.mkdir("bucket/test") - s3.mkdir("bucket/test/sub") - s3.write_text("bucket/test/file.txt", "some_text") - s3.write_text("bucket/test/sub/file.txt", "some_text") - - out1 = s3.find("bucket", withdirs=True) - out2 = s3.find("bucket", withdirs=True) - assert ( - out1 - == out2 - == [ - "bucket/test", - "bucket/test/file.txt", - "bucket/test/sub", - "bucket/test/sub/file.txt", - ] - ) - out1 = s3.find("bucket", withdirs=False) - out2 = s3.find("bucket", withdirs=False) - assert out1 == out2 == ["bucket/test/file.txt", "bucket/test/sub/file.txt"] - - -def test_version_sizes(s3): - # protect against caching of incorrect version details - s3 = S3FileSystem( - anon=False, version_aware=True, client_kwargs={"endpoint_url": endpoint_uri} - ) - import gzip - - path = f"s3://{versioned_bucket_name}/test.txt.gz" - versions = [ - s3.pipe_file(path, gzip.compress(text)) - for text in ( - b"good morning!", - b"hello!", - b"hi!", - b"hello!", - ) - ] - for version in versions: - version_id = version["VersionId"] - with s3.open(path, version_id=version_id) as f: - with gzip.GzipFile(fileobj=f) as zfp: - zfp.read() - - -def test_find_no_side_effect(s3): - infos1 = s3.find(test_bucket_name, maxdepth=1, withdirs=True, detail=True) - s3.find(test_bucket_name, maxdepth=None, withdirs=True, detail=True) - infos3 = s3.find(test_bucket_name, maxdepth=1, withdirs=True, detail=True) - assert infos1.keys() == infos3.keys() - - -def test_get_file_info_with_selector(s3): - fs = s3 - base_dir = "selector-dir/" - file_a = "selector-dir/test_file_a" - file_b = "selector-dir/test_file_b" - dir_a = "selector-dir/test_dir_a" - file_c = "selector-dir/test_dir_a/test_file_c" - - try: - fs.mkdir(base_dir) - with fs.open(file_a, mode="wb"): - pass - with fs.open(file_b, mode="wb"): - pass - fs.mkdir(dir_a) - with fs.open(file_c, mode="wb"): - pass - - infos = fs.find(base_dir, maxdepth=None, withdirs=True, detail=True) - assert len(infos) == 5 # includes base_dir directory - - for info in infos.values(): - if info["name"].endswith(file_a): - assert info["type"] == "file" - elif info["name"].endswith(file_b): - assert info["type"] == "file" - elif info["name"].endswith(file_c): - assert info["type"] == "file" - elif info["name"].rstrip("/").endswith(dir_a): - assert info["type"] == "directory" - finally: - fs.rm(base_dir, recursive=True) - - -@pytest.mark.xfail( - condition=version.parse(moto.__version__) <= version.parse("1.3.16"), - reason="Moto 1.3.16 is not supporting pre-conditions.", -) -def test_raise_exception_when_file_has_changed_during_reading(s3): - test_file_name = "file1" - test_file = "s3://" + test_bucket_name + "/" + test_file_name - content1 = b"123" - content2 = b"ABCDEFG" - - boto3_client = get_boto3_client() - - def create_file(content: bytes): - boto3_client.put_object( - Bucket=test_bucket_name, Key=test_file_name, Body=content - ) - - create_file(b"123") - - with s3.open(test_file, "rb") as f: - content = f.read() - assert content == content1 - - with s3.open(test_file, "rb") as f: - create_file(content2) - with expect_errno(errno.EBUSY): - f.read() - - -def test_s3fs_etag_preserving_multipart_copy(monkeypatch, s3): - # Set this to a lower value so that we can actually - # test this without creating giant objects in memory - monkeypatch.setattr(s3fs.core, 
"MANAGED_COPY_THRESHOLD", 5 * 2**20) - - test_file1 = test_bucket_name + "/test/multipart-upload.txt" - test_file2 = test_bucket_name + "/test/multipart-upload-copy.txt" - - with s3.open(test_file1, "wb", block_size=5 * 2**21) as stream: - for _ in range(5): - stream.write(b"b" * (stream.blocksize + random.randrange(200))) - - file_1 = s3.info(test_file1) - - s3.copy(test_file1, test_file2) - file_2 = s3.info(test_file2) - s3.rm(test_file2) - - # normal copy() uses a block size of 5GB - assert file_1["ETag"] != file_2["ETag"] - - s3.copy(test_file1, test_file2, preserve_etag=True) - file_2 = s3.info(test_file2) - s3.rm(test_file2) - - # etag preserving copy() determines each part size for the destination - # by checking out the matching part's size on the source - assert file_1["ETag"] == file_2["ETag"] - - s3.rm(test_file1) - - -def test_sync_from_wihin_async(s3): - # if treating as sync but within an even loop, e.g., calling from jupyter; - # IO happens on dedicated thread. - async def f(): - S3FileSystem.clear_instance_cache() - s3 = S3FileSystem(anon=False, client_kwargs={"endpoint_url": endpoint_uri}) - assert s3.ls(test_bucket_name) - - asyncio.run(f()) - - -def test_token_paths(s3): - fs, tok, files = fsspec.get_fs_token_paths( - "s3://" + test_bucket_name + "/*.csv", - storage_options={"client_kwargs": {"endpoint_url": endpoint_uri}}, - ) - assert files - - -def test_same_name_but_no_exact(s3): - s3.touch(test_bucket_name + "/very/similar/prefix1") - s3.touch(test_bucket_name + "/very/similar/prefix2") - s3.touch(test_bucket_name + "/very/similar/prefix3/something") - assert not s3.exists(test_bucket_name + "/very/similar/prefix") - assert not s3.exists(test_bucket_name + "/very/similar/prefi") - assert not s3.exists(test_bucket_name + "/very/similar/pref") - - assert s3.exists(test_bucket_name + "/very/similar/") - assert s3.exists(test_bucket_name + "/very/similar/prefix1") - assert s3.exists(test_bucket_name + "/very/similar/prefix2") - assert s3.exists(test_bucket_name + "/very/similar/prefix3") - assert s3.exists(test_bucket_name + "/very/similar/prefix3/") - assert s3.exists(test_bucket_name + "/very/similar/prefix3/something") - - assert not s3.exists(test_bucket_name + "/very/similar/prefix3/some") - - s3.touch(test_bucket_name + "/starting/very/similar/prefix") - - assert not s3.exists(test_bucket_name + "/starting/very/similar/prefix1") - assert not s3.exists(test_bucket_name + "/starting/very/similar/prefix2") - assert not s3.exists(test_bucket_name + "/starting/very/similar/prefix3") - assert not s3.exists(test_bucket_name + "/starting/very/similar/prefix3/") - assert not s3.exists(test_bucket_name + "/starting/very/similar/prefix3/something") - - assert s3.exists(test_bucket_name + "/starting/very/similar/prefix") - assert s3.exists(test_bucket_name + "/starting/very/similar/prefix/") - - -def test_leading_forward_slash(s3): - s3.touch(test_bucket_name + "/some/file") - assert s3.ls(test_bucket_name + "/some/") - assert s3.exists(test_bucket_name + "/some/file") - assert s3.exists("s3://" + test_bucket_name + "/some/file") - - -def test_lsdir(s3): - # https://github.com/fsspec/s3fs/issues/475 - s3.find(test_bucket_name) - - d = test_bucket_name + "/test" - assert d in s3.ls(test_bucket_name) - - -def test_rm_recursive_folder(s3): - s3.touch(test_bucket_name + "/sub/file") - s3.rm(test_bucket_name + "/sub", recursive=True) - assert not s3.exists(test_bucket_name + "/sub/file") - assert not s3.exists(test_bucket_name + "/sub") - - s3.touch(test_bucket_name + "/sub/file") 
- s3.touch(test_bucket_name + "/sub/") # placeholder - s3.rm(test_bucket_name + "/sub", recursive=True) - assert not s3.exists(test_bucket_name + "/sub/file") - assert not s3.exists(test_bucket_name + "/sub") - - s3.touch(test_bucket_name + "/sub/file") - s3.rm(test_bucket_name, recursive=True) - assert not s3.exists(test_bucket_name + "/sub/file") - assert not s3.exists(test_bucket_name + "/sub") - assert not s3.exists(test_bucket_name) - - -def test_copy_file_without_etag(s3, monkeypatch): - - s3.touch(test_bucket_name + "/copy_tests/file") - s3.ls(test_bucket_name + "/copy_tests/") - - [file] = s3.dircache[test_bucket_name + "/copy_tests"] - - assert file["name"] == test_bucket_name + "/copy_tests/file" - file.pop("ETag") - - assert s3.info(file["name"]).get("ETag", None) is None - - s3.cp_file(file["name"], test_bucket_name + "/copy_tests/file2") - assert s3.info(test_bucket_name + "/copy_tests/file2")["ETag"] is not None - - -def test_find_with_prefix(s3): - for cursor in range(100): - s3.touch(test_bucket_name + f"/prefixes/test_{cursor}") - - s3.touch(test_bucket_name + "/prefixes2") - assert len(s3.find(test_bucket_name + "/prefixes")) == 100 - assert len(s3.find(test_bucket_name, prefix="prefixes")) == 101 - assert len(s3.find(test_bucket_name + "/prefixes", prefix="test2_")) == 0 - - assert len(s3.find(test_bucket_name + "/prefixes/test_")) == 0 - assert len(s3.find(test_bucket_name + "/prefixes", prefix="test_")) == 100 - assert len(s3.find(test_bucket_name + "/prefixes/", prefix="test_")) == 100 - - test_1s = s3.find(test_bucket_name + "/prefixes/test_1") - assert len(test_1s) == 1 - assert test_1s[0] == test_bucket_name + "/prefixes/test_1" - - test_1s = s3.find(test_bucket_name + "/prefixes/", prefix="test_1") - assert len(test_1s) == 11 - assert test_1s == [test_bucket_name + "/prefixes/test_1"] + [ - test_bucket_name + f"/prefixes/test_{cursor}" for cursor in range(10, 20) - ] - assert s3.find(test_bucket_name + "/prefixes/") == s3.find( - test_bucket_name + "/prefixes/", prefix=None - ) - - -def test_list_after_find(s3): - before = s3.ls("s3://test") - s3.invalidate_cache("s3://test/2014-01-01.csv") - s3.find("s3://test/2014-01-01.csv") - after = s3.ls("s3://test") - assert before == after - - -def test_upload_recursive_to_bucket(s3, tmpdir): - # GH#491 - folders = [os.path.join(tmpdir, d) for d in ["outer", "outer/inner"]] - files = [os.path.join(tmpdir, f) for f in ["outer/afile", "outer/inner/bfile"]] - for d in folders: - os.mkdir(d) - for f in files: - open(f, "w").write("hello") - s3.put(folders[0], "newbucket", recursive=True) - - -def test_rm_file(s3): - target = test_bucket_name + "/to_be_removed/file" - s3.touch(target) - s3.rm_file(target) - assert not s3.exists(target) - assert not s3.exists(test_bucket_name + "/to_be_removed") - - -def test_exists_isdir(s3): - bad_path = "s3://nyc-tlc-asdfasdf/trip data/" - assert not s3.exists(bad_path) - assert not s3.isdir(bad_path) - - -def test_list_del_multipart(s3): - path = test_bucket_name + "/afile" - f = s3.open(path, "wb") - f.write(b"0" * 6 * 2**20) - - out = s3.list_multipart_uploads(test_bucket_name) - assert [_ for _ in out if _["Key"] == "afile"] - - s3.clear_multipart_uploads(test_bucket_name) - out = s3.list_multipart_uploads(test_bucket_name) - assert not [_ for _ in out if _["Key"] == "afile"] - - try: - f.close() # may error - except Exception: - pass - - -def test_split_path(s3): - buckets = [ - "my-test-bucket", - "arn:aws:s3:region:123456789012:accesspoint/my-access-point-name", - 
"arn:aws:s3-outposts:region:123456789012:outpost/outpost-id/bucket/my-test-bucket", - "arn:aws:s3-outposts:region:123456789012:outpost/outpost-id/accesspoint/my-accesspoint-name", - "arn:aws:s3-object-lambda:region:123456789012:accesspoint/my-lambda-object-name", - ] - test_key = "my/test/path" - for test_bucket in buckets: - bucket, key, _ = s3.split_path("s3://" + test_bucket + "/" + test_key) - assert bucket == test_bucket - assert key == test_key - - -def test_cp_directory_recursive(s3): - src = test_bucket_name + "/src" - src_file = src + "/file" - s3.mkdir(src) - s3.touch(src_file) - - target = test_bucket_name + "/target" - - # cp without slash - assert not s3.exists(target) - for loop in range(2): - s3.cp(src, target, recursive=True) - assert s3.isdir(target) - - if loop == 0: - correct = [target + "/file"] - assert s3.find(target) == correct - else: - correct = [target + "/file", target + "/src/file"] - assert sorted(s3.find(target)) == correct - - s3.rm(target, recursive=True) - - # cp with slash - assert not s3.exists(target) - for loop in range(2): - s3.cp(src + "/", target, recursive=True) - assert s3.isdir(target) - correct = [target + "/file"] - assert s3.find(target) == correct - - -def test_get_directory_recursive(s3, tmpdir): - src = test_bucket_name + "/src" - src_file = src + "/file" - s3.mkdir(src) - s3.touch(src_file) - - target = os.path.join(tmpdir, "target") - target_fs = fsspec.filesystem("file") - - # get without slash - assert not target_fs.exists(target) - for loop in range(2): - s3.get(src, target, recursive=True) - assert target_fs.isdir(target) - - if loop == 0: - assert target_fs.find(target) == [os.path.join(target, "file")] - else: - assert sorted(target_fs.find(target)) == [ - os.path.join(target, "file"), - os.path.join(target, "src", "file"), - ] - - target_fs.rm(target, recursive=True) - - # get with slash - assert not target_fs.exists(target) - for loop in range(2): - s3.get(src + "/", target, recursive=True) - assert target_fs.isdir(target) - assert target_fs.find(target) == [os.path.join(target, "file")] - - -def test_put_directory_recursive(s3, tmpdir): - src = os.path.join(tmpdir, "src") - src_file = os.path.join(src, "file") - source_fs = fsspec.filesystem("file") - source_fs.mkdir(src) - source_fs.touch(src_file) - - target = test_bucket_name + "/target" - - # put without slash - assert not s3.exists(target) - for loop in range(2): - s3.put(src, target, recursive=True) - assert s3.isdir(target) - - if loop == 0: - assert s3.find(target) == [target + "/file"] - else: - assert sorted(s3.find(target)) == [target + "/file", target + "/src/file"] - - s3.rm(target, recursive=True) - - # put with slash - assert not s3.exists(target) - for loop in range(2): - s3.put(src + "/", target, recursive=True) - assert s3.isdir(target) - assert s3.find(target) == [target + "/file"] - - -def test_cp_two_files(s3): - src = test_bucket_name + "/src" - file0 = src + "/file0" - file1 = src + "/file1" - s3.mkdir(src) - s3.touch(file0) - s3.touch(file1) - - target = test_bucket_name + "/target" - assert not s3.exists(target) - - s3.cp([file0, file1], target) - - assert s3.isdir(target) - assert sorted(s3.find(target)) == [ - target + "/file0", - target + "/file1", - ] - - -def test_async_stream(s3_base): - fn = test_bucket_name + "/target" - data = b"hello world" * 1000 - out = [] - - async def read_stream(): - fs = S3FileSystem( - anon=False, - client_kwargs={"endpoint_url": endpoint_uri}, - skip_instance_cache=True, - ) - await fs._mkdir(test_bucket_name) - await 
fs._pipe(fn, data) - f = await fs.open_async(fn, mode="rb", block_size=1000) - while True: - got = await f.read(1000) - assert f.size == len(data) - assert f.tell() - if not got: - break - out.append(got) - - asyncio.run(read_stream()) - assert b"".join(out) == data - - -def test_rm_invalidates_cache(s3): - # Issue 761: rm_file does not invalidate cache - fn = test_bucket_name + "/2014-01-01.csv" - assert s3.exists(fn) - assert fn in s3.ls(test_bucket_name) - s3.rm(fn) - assert not s3.exists(fn) - assert fn not in s3.ls(test_bucket_name) - - fn = test_bucket_name + "/2014-01-02.csv" - assert s3.exists(fn) - assert fn in s3.ls(test_bucket_name) - s3.rm_file(fn) - assert not s3.exists(fn) - assert fn not in s3.ls(test_bucket_name) - - -def test_cache_handles_find_with_maxdepth(s3): - # Issue 773: invalidate_cache should not be needed when find is called with different maxdepth - base_name = test_bucket_name + "/main" - dir = base_name + "/dir1/fileB" - file = base_name + "/fileA" - s3.touch(dir) - s3.touch(file) - - # Find with maxdepth=None - f = s3.find(base_name, maxdepth=None, withdirs=False) - assert base_name + "/fileA" in f - assert base_name + "/dir1" not in f - assert base_name + "/dir1/fileB" in f - - # Find with maxdepth=1. - # Performed twice with cache invalidated between them which should give same result - for _ in range(2): - f = s3.find(base_name, maxdepth=1, withdirs=True) - assert base_name + "/fileA" in f - assert base_name + "/dir1" in f - assert base_name + "/dir1/fileB" not in f - - s3.invalidate_cache() - - -def test_bucket_versioning(s3): - s3.mkdir("maybe_versioned") - assert not s3.is_bucket_versioned("maybe_versioned") - s3.make_bucket_versioned("maybe_versioned") - assert s3.is_bucket_versioned("maybe_versioned") - s3.make_bucket_versioned("maybe_versioned", False) - assert not s3.is_bucket_versioned("maybe_versioned") - - -@pytest.fixture() -def s3_fixed_upload_size(s3): - s3_fixed = S3FileSystem( - anon=False, - client_kwargs={"endpoint_url": endpoint_uri}, - fixed_upload_size=True, - ) - s3_fixed.invalidate_cache() - yield s3_fixed - - -def test_upload_parts(s3_fixed_upload_size): - with s3_fixed_upload_size.open(a, "wb", block_size=6_000_000) as f: - f.write(b" " * 6_001_000) - assert len(f.buffer.getbuffer()) == 1000 - # check we are at the right position - assert f.tell() == 6_001_000 - # offset is introduced in fsspec.core, but never used. 
- # apparently it should track the offset of the parts that have already been uploaded - assert f.offset == 6_000_000 - f.write(b" " * 6_001_000) - assert len(f.buffer.getbuffer()) == 2000 - assert f.tell() == 2 * 6_001_000 - assert f.offset == 2 * 6_000_000 - - with s3_fixed_upload_size.open(a, "r") as f: - assert len(f.read()) == 6_001_000 * 2 - - -def test_upload_part_with_prime_pads(s3_fixed_upload_size): - block = 6_000_000 - pad1, pad2 = 1013, 1019 # prime pad sizes to exclude divisibility - with s3_fixed_upload_size.open(a, "wb", block_size=block) as f: - f.write(b" " * (block + pad1)) - assert len(f.buffer.getbuffer()) == pad1 - # check we are at the right position - assert f.tell() == block + pad1 - assert f.offset == block - f.write(b" " * (block + pad2)) - assert len(f.buffer.getbuffer()) == pad1 + pad2 - assert f.tell() == 2 * block + pad1 + pad2 - assert f.offset == 2 * block - - with s3_fixed_upload_size.open(a, "r") as f: - assert len(f.read()) == 2 * block + pad1 + pad2 - - -@pytest.mark.asyncio -async def test_invalidate_cache(s3: s3fs.S3FileSystem) -> None: - await s3._call_s3("put_object", Bucket=test_bucket_name, Key="a/b.txt", Body=b"abc") - before = await s3._ls(f"{test_bucket_name}/a/") - assert sorted(before) == ["test/a/b.txt"] - await s3._pipe_file(f"{test_bucket_name}/a/c.txt", data=b"abc") - after = await s3._ls(f"{test_bucket_name}/a/") - assert sorted(after) == ["test/a/b.txt", "test/a/c.txt"] - - -@pytest.mark.xfail(reason="moto doesn't support conditional MPU") -def test_pipe_exclusive_big(s3): - chunksize = 5 * 2**20 # minimum allowed - data = b"x" * chunksize * 3 - s3.pipe(f"{test_bucket_name}/afile", data, mode="overwrite", chunksize=chunksize) - s3.pipe(f"{test_bucket_name}/afile", data, mode="overwrite", chunksize=chunksize) - with pytest.raises(FileExistsError): - s3.pipe(f"{test_bucket_name}/afile", data, mode="create", chunksize=chunksize) - assert not s3.list_multipart_uploads(test_bucket_name) - - -@pytest.mark.xfail(reason="moto doesn't support conditional MPU") -def test_put_exclusive_big(s3, tmpdir): - chunksize = 5 * 2**20 # minimum allowed - data = b"x" * chunksize * 3 - fn = f"{tmpdir}/afile" - with open(fn, "wb") as f: - f.write(data) - s3.put(fn, f"{test_bucket_name}/afile", mode="overwrite", chunksize=chunksize) - s3.put(fn, f"{test_bucket_name}/afile", mode="overwrite", chunksize=chunksize) - with pytest.raises(FileExistsError): - s3.put( - fn, f"{test_bucket_name}/afile", mode="create", chunksize=chunksize - ) - assert not s3.list_multipart_uploads(test_bucket_name)