From 3b22b03cf3d906567238a7ae94f025be73ef8c6d Mon Sep 17 00:00:00 2001 From: joseph-sentry <136376984+joseph-sentry@users.noreply.github.com> Date: Thu, 5 Dec 2024 09:48:37 -0500 Subject: [PATCH] feat: support zstd compression in miniostorage (#405) * feat: support zstd compression in miniostorage we want to use zstd compression when compressing files for storage in object storage because it performs better than gzip, which is what we were using before. these changes are only being made to the minio storage service because we want to consolidate the storage service functionality into this one, so both worker and API will be using this backend in the future (API was already using this one). we have to manually decompress the zstd compressed files in read_file, but HTTPResponse takes care of it for us if the content encoding of the file is gzip. the is_already_gzipped argument is being deprecated in favour of compression_type and is_compressed, and the ability to pass a str to write_file is also being deprecated. we're keeping track of the use of these using sentry capture_message * fix: address feedback - using fget_object was unnecessary since we were streaming the response data regardless - no need for all the warning logs and sentry stuff, we'll just do a 3-step migration in both API and worker (update shared supporting old behaviour, update {api,worker}, remove old behaviour support from shared) - zstandard version pinning can be more flexible - add test for content type = application/x-gzip since there was some specific handling for that in the GCP storage service * fix: update MinioStorageService - in write_file: - the data arg is not BinaryIO, it's actually bytes | str | IO[bytes]. bytes and str are self-explanatory, it's just how it's being used currently, so we must support it. IO[bytes] is there to support file handles opened with "rb" that are being passed, and BytesIO objects - start accepting a None value for compression_type, which means no automatic compression even if is_compressed is False - do automatic compression using gzip if is_compressed=False and compression_type="gzip" - in put_object set size = -1 and use a part_size of 20MiB. the specific part size is arbitrary. Different sources online suggest different numbers. It probably depends on the size of the underlying data we're trying to send, but 20MiB seems like a good flat number to pick for now. - in read_file: - generally reorganize the function to spend less time under the try/except blocks - use the CHUNK_SIZE const defined in storage/base for the amount to read from the streams - accept IO[bytes] for the file_obj since we don't use any of the BinaryIO-specific methods - create GZipStreamReader that takes in an IO[bytes] and implements a read() method that reads a certain number of bytes from the IO[bytes], compresses whatever it reads using gzip, and returns the result * fix(minio): check urllib3 version in read_file this is because if urllib3 is >= 2.0.0 and the zstd extra is installed, then it is capable of decoding zstd encoded data (and will) when it's used in get_object, so when we create the MinioStorageService we check the urllib3 version and whether it's been installed with the zstd extra. this commit also adds a test to ensure that the gzip compression and decompression used in the GZipStreamReader actually works * feat: add feature flag for new minio storage instead of doing a 0-100 launch of the new minio storage service i'd like to have it so we incrementally ship it using a feature flag (see the sketches below).
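to make the new write/read behaviour concrete, a minimal round-trip sketch (the bucket and path are made up, and it assumes a reachable minio with an existing bucket, configured like this patch's tests):

    from shared.storage.new_minio import NewMinioStorageService

    # config shaped like the one used in the tests below (local dev defaults)
    storage = NewMinioStorageService(
        {
            "host": "minio",
            "port": "9000",
            "access_key_id": "codecov-default-key",
            "secret_access_key": "codecov-default-secret",
            "verify_ssl": False,
            "iam_auth": False,
            "iam_endpoint": None,
        }
    )
    # write_file compresses with zstd by default (compression_type="zstd");
    # read_file transparently decompresses on the way back out
    storage.write_file("archive", "some/path.txt", b"lorem ipsum")
    assert storage.read_file("archive", "some/path.txt") == b"lorem ipsum"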
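and the flag gating itself, roughly (a sketch of the intended call pattern; the repoid is made up):

    from shared.storage import get_appropriate_storage_service

    # with chosen_storage set to "minio", the repoid decides which backend we get:
    # NewMinioStorageService when USE_NEW_MINIO.check_value(repoid, default=False)
    # passes, and the old MinioStorageService otherwise (or when no repoid is given)
    storage = get_appropriate_storage_service(repoid=123)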
so if a repoid is passed to the get_appropriate_storage_service function and the chosen storage is minio, then it will check the use_new_minio feature to decide whether to use the new or old minio storage service. as mentioned, this will be decided via the repoid (to reduce the impact IF it is broken). changes had to be made to avoid circular imports in the model_utils and rollout_utils files * fix: revert changes to old minio --- pyproject.toml | 1 + shared/django_apps/rollouts/models.py | 2 +- shared/django_apps/utils/model_utils.py | 20 -- shared/django_apps/utils/rollout_utils.py | 21 ++ shared/rollouts/__init__.py | 2 +- shared/rollouts/features.py | 1 + shared/storage/__init__.py | 14 +- shared/storage/base.py | 1 + shared/storage/new_minio.py | 330 ++++++++++++++++++ tests/unit/storage/__init__.py | 0 tests/unit/storage/test_init.py | 30 ++ tests/unit/storage/test_new_minio.py | 396 ++++++++++++++++++++++ uv.lock | 45 +++ 13 files changed, 837 insertions(+), 26 deletions(-) create mode 100644 shared/django_apps/utils/rollout_utils.py create mode 100644 shared/storage/new_minio.py create mode 100644 tests/unit/storage/__init__.py create mode 100644 tests/unit/storage/test_new_minio.py diff --git a/pyproject.toml b/pyproject.toml index fcbbe631..a07b664f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,6 +34,7 @@ dependencies = [ "requests>=2.32.3", "sentry-sdk>=2.13.0", "sqlalchemy<2", + "zstandard>=0.23.0", ] [build-system] diff --git a/shared/django_apps/rollouts/models.py b/shared/django_apps/rollouts/models.py index 2c054657..59394003 100644 --- a/shared/django_apps/rollouts/models.py +++ b/shared/django_apps/rollouts/models.py @@ -23,7 +23,7 @@ class RolloutUniverse(models.TextChoices): def default_random_salt(): # to resolve circular dependency - from shared.django_apps.utils.model_utils import default_random_salt + from shared.django_apps.utils.rollout_utils import default_random_salt return default_random_salt() diff --git a/shared/django_apps/utils/model_utils.py b/shared/django_apps/utils/model_utils.py index efb30a14..1067cbe6 100644 --- a/shared/django_apps/utils/model_utils.py +++ b/shared/django_apps/utils/model_utils.py @@ -1,10 +1,8 @@ import json import logging -from random import choice from typing import Any, Callable, Optional from shared.api_archive.archive import ArchiveService -from shared.django_apps.rollouts.models import RolloutUniverse from shared.storage.exceptions import FileNotInStorageError from shared.utils.ReportEncoder import ReportEncoder @@ -148,24 +146,6 @@ def __set__(self, obj, value): setattr(obj, self.cached_value_property_name, value) -def default_random_salt(): - ALPHABET = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" - return "".join([choice(ALPHABET) for _ in range(16)]) - - -def rollout_universe_to_override_string(rollout_universe: RolloutUniverse): - if rollout_universe == RolloutUniverse.OWNER_ID: - return "override_owner_ids" - elif rollout_universe == RolloutUniverse.REPO_ID: - return "override_repo_ids" - elif rollout_universe == RolloutUniverse.EMAIL: - return "override_emails" - elif rollout_universe == RolloutUniverse.ORG_ID: - return "override_org_ids" - else: - return "" - - # This is the place for DB trigger logic that's been moved into code # Owner def get_ownerid_if_member( diff --git a/shared/django_apps/utils/rollout_utils.py b/shared/django_apps/utils/rollout_utils.py new file mode 100644 index 00000000..f4a63561 --- /dev/null +++ b/shared/django_apps/utils/rollout_utils.py @@ -0,0 +1,21 @@ +from
random import choice + +from shared.django_apps.rollouts.models import RolloutUniverse + + +def default_random_salt(): + ALPHABET = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" + return "".join([choice(ALPHABET) for _ in range(16)]) + + +def rollout_universe_to_override_string(rollout_universe: RolloutUniverse): + if rollout_universe == RolloutUniverse.OWNER_ID: + return "override_owner_ids" + elif rollout_universe == RolloutUniverse.REPO_ID: + return "override_repo_ids" + elif rollout_universe == RolloutUniverse.EMAIL: + return "override_emails" + elif rollout_universe == RolloutUniverse.ORG_ID: + return "override_org_ids" + else: + return "" diff --git a/shared/rollouts/__init__.py b/shared/rollouts/__init__.py index c6290995..94b064d0 100644 --- a/shared/rollouts/__init__.py +++ b/shared/rollouts/__init__.py @@ -17,7 +17,7 @@ Platform, RolloutUniverse, ) -from shared.django_apps.utils.model_utils import rollout_universe_to_override_string +from shared.django_apps.utils.rollout_utils import rollout_universe_to_override_string log = logging.getLogger("__name__") diff --git a/shared/rollouts/features.py b/shared/rollouts/features.py index 8e566462..611c907a 100644 --- a/shared/rollouts/features.py +++ b/shared/rollouts/features.py @@ -2,3 +2,4 @@ BUNDLE_THRESHOLD_FLAG = Feature("bundle_threshold_flag") INCLUDE_GITHUB_COMMENT_ACTIONS_BY_OWNER = Feature("include_github_comment_actions") +USE_NEW_MINIO = Feature("use_new_minio") diff --git a/shared/storage/__init__.py b/shared/storage/__init__.py index edfc5388..2d905c8b 100644 --- a/shared/storage/__init__.py +++ b/shared/storage/__init__.py @@ -1,18 +1,22 @@ from shared.config import get_config +from shared.rollouts.features import USE_NEW_MINIO from shared.storage.aws import AWSStorageService from shared.storage.base import BaseStorageService from shared.storage.fallback import StorageWithFallbackService from shared.storage.gcp import GCPStorageService from shared.storage.minio import MinioStorageService +from shared.storage.new_minio import NewMinioStorageService -def get_appropriate_storage_service() -> BaseStorageService: - chosen_storage = get_config("services", "chosen_storage", default="minio") - return _get_appropriate_storage_service_given_storage(chosen_storage) +def get_appropriate_storage_service( + repoid: int | None = None, +) -> BaseStorageService: + chosen_storage: str = get_config("services", "chosen_storage", default="minio") # type: ignore + return _get_appropriate_storage_service_given_storage(chosen_storage, repoid) def _get_appropriate_storage_service_given_storage( - chosen_storage: str, + chosen_storage: str, repoid: int | None ) -> BaseStorageService: if chosen_storage == "gcp": gcp_config = get_config("services", "gcp", default={}) @@ -28,4 +32,6 @@ def _get_appropriate_storage_service_given_storage( return StorageWithFallbackService(gcp_service, aws_service) else: minio_config = get_config("services", "minio", default={}) + if repoid and USE_NEW_MINIO.check_value(repoid, default=False): + return NewMinioStorageService(minio_config) return MinioStorageService(minio_config) diff --git a/shared/storage/base.py b/shared/storage/base.py index 57bd1c5b..ac6421c7 100644 --- a/shared/storage/base.py +++ b/shared/storage/base.py @@ -1,6 +1,7 @@ from typing import BinaryIO, overload CHUNK_SIZE = 1024 * 32 +PART_SIZE = 1024 * 1024 * 20 # 20MiB # Interface class for interfacing with codecov's underlying storage layer diff --git a/shared/storage/new_minio.py b/shared/storage/new_minio.py new file mode 
100644 index 00000000..da52f0c6 --- /dev/null +++ b/shared/storage/new_minio.py @@ -0,0 +1,330 @@ +import gzip +import importlib.metadata +import json +import logging +import sys +from io import BytesIO +from typing import IO, BinaryIO, Tuple, cast, overload + +import zstandard +from minio import Minio +from minio.credentials.providers import ( + ChainedProvider, + EnvAWSProvider, + EnvMinioProvider, + IamAwsProvider, +) +from minio.deleteobjects import DeleteObject +from minio.error import MinioException, S3Error +from minio.helpers import ObjectWriteResult +from urllib3 import HTTPResponse + +from shared.storage.base import CHUNK_SIZE, PART_SIZE, BaseStorageService +from shared.storage.exceptions import BucketAlreadyExistsError, FileNotInStorageError + +log = logging.getLogger(__name__) + + +class GZipStreamReader: + def __init__(self, fileobj: IO[bytes]): + self.data = fileobj + + def read(self, size: int = -1, /) -> bytes: + curr_data = self.data.read(size) + + if not curr_data: + return b"" + + return gzip.compress(curr_data) + + +def zstd_decoded_by_default() -> bool: + try: + version = importlib.metadata.version("urllib3") + except importlib.metadata.PackageNotFoundError: + return False + + if version < "2.0.0": + return False + + distribution = importlib.metadata.metadata("urllib3") + if requires_dist := distribution.get_all("Requires-Dist"): + for req in requires_dist: + if "[zstd]" in req: + return True + + return False + + +# Service class for interfacing with codecov's underlying storage layer, minio +class NewMinioStorageService(BaseStorageService): + def __init__(self, minio_config): + self.zstd_default = zstd_decoded_by_default() + + self.minio_config = minio_config + log.debug("Connecting to minio with config %s", self.minio_config) + + self.minio_client = self.init_minio_client( + self.minio_config["host"], + self.minio_config.get("port"), + self.minio_config["access_key_id"], + self.minio_config["secret_access_key"], + self.minio_config["verify_ssl"], + self.minio_config.get("iam_auth", False), + self.minio_config["iam_endpoint"], + self.minio_config.get("region"), + ) + log.debug("Done setting up minio client") + + def client(self): + return self.minio_client if self.minio_client else None + + def init_minio_client( + self, + host: str, + port: str, + access_key: str | None = None, + secret_key: str | None = None, + verify_ssl: bool = False, + iam_auth: bool = False, + iam_endpoint: str | None = None, + region: str | None = None, + ): + """ + Initialize the minio client + + `iam_auth` adds support for IAM base authentication in a fallback pattern. + The following will be checked in order: + + * EC2 metadata -- a custom endpoint can be provided, default is None. 
+ * Minio env vars, specifically MINIO_ACCESS_KEY and MINIO_SECRET_KEY + * AWS env vars, specifically AWS_ACCESS_KEY and AWS_SECRET_KEY + + To support backward compatibility, the iam_auth setting should be used + in the installation configuration + + Args: + host (str): The address of the host where minio lives + + port (str): The port number (as str or int should be ok) + access_key (str, optional): The access key (optional if IAM is being used) + secret_key (str, optional): The secret key (optional if IAM is being used) + verify_ssl (bool, optional): Whether minio should verify ssl + iam_auth (bool, optional): Whether to use iam_auth + iam_endpoint (str, optional): The endpoint to try to fetch EC2 metadata + region (str, optional): The region of the host where minio lives + """ + if port is not None: + host = "{}:{}".format(host, port) + + if iam_auth: + return Minio( + host, + secure=verify_ssl, + region=region, + credentials=ChainedProvider( + providers=[ + IamAwsProvider(custom_endpoint=iam_endpoint), + EnvMinioProvider(), + EnvAWSProvider(), + ] + ), + ) + return Minio( + host, + access_key=access_key, + secret_key=secret_key, + secure=verify_ssl, + region=region, + ) + + # writes the initial storage bucket to storage via minio. + def create_root_storage(self, bucket_name="archive", region="us-east-1"): + read_only_policy = { + "Statement": [ + { + "Action": ["s3:GetObject"], + "Effect": "Allow", + "Principal": {"AWS": ["*"]}, + "Resource": [f"arn:aws:s3:::{bucket_name}/*"], + } + ], + "Version": "2012-10-17", + } + try: + if not self.minio_client.bucket_exists(bucket_name): + log.debug( + "Making bucket on bucket %s on location %s", bucket_name, region + ) + self.minio_client.make_bucket(bucket_name, location=region) + log.debug("Setting policy") + self.minio_client.set_bucket_policy( + bucket_name, json.dumps(read_only_policy) + ) + log.debug("Done creating root storage") + return {"name": bucket_name} + else: + raise BucketAlreadyExistsError(f"Bucket {bucket_name} already exists") + # todo should only pass or raise + except S3Error as e: + if e.code == "BucketAlreadyOwnedByYou": + raise BucketAlreadyExistsError(f"Bucket {bucket_name} already exists") + elif e.code == "BucketAlreadyExists": + pass + raise + except MinioException: + raise + + # Writes a file to storage; compresses it with compression_type (zstd by default) unless it is already compressed + def write_file( + self, + bucket_name: str, + path: str, + data: IO[bytes] | str | bytes, + reduced_redundancy: bool = False, + *, + is_already_gzipped: bool = False, # deprecated + is_compressed: bool = False, + compression_type: str | None = "zstd", + ) -> ObjectWriteResult: + if isinstance(data, str): + data = BytesIO(data.encode()) + elif isinstance(data, (bytes, bytearray, memoryview)): + data = BytesIO(data) + + if is_already_gzipped: + is_compressed = True + compression_type = "gzip" + + if is_compressed: + result = data + else: + if compression_type == "zstd": + cctx = zstandard.ZstdCompressor() + result = cctx.stream_reader(data) + + elif compression_type == "gzip": + result = GZipStreamReader(data) + + else: + result = data + + headers: dict[str, str | list[str] | Tuple[str]] = {} + + if compression_type: + headers["Content-Encoding"] = compression_type + + if reduced_redundancy: + headers["x-amz-storage-class"] = "REDUCED_REDUNDANCY" + + # it's safe to do a BinaryIO cast here because we know that put_object only uses a function of the shape: + # read(self, size: int = -1, /) -> bytes + # GZipStreamReader implements this (we did it ourselves) + # ZstdCompressionReader
implements read(): https://github.com/indygreg/python-zstandard/blob/12a80fac558820adf43e6f16206120685b9eb880/zstandard/__init__.pyi#L233C5-L233C49 + # BytesIO implements read(): https://docs.python.org/3/library/io.html#io.BufferedReader.read + # IO[bytes] implements read(): https://github.com/python/cpython/blob/3.13/Lib/typing.py#L3502 + + return self.minio_client.put_object( + bucket_name, + path, + cast(BinaryIO, result), + -1, + metadata=headers, + content_type="text/plain", + part_size=PART_SIZE, + ) + + @overload + def read_file( + self, bucket_name: str, path: str, file_obj: None = None + ) -> bytes: ... + + @overload + def read_file(self, bucket_name: str, path: str, file_obj: IO[bytes]) -> None: ... + + def read_file( + self, bucket_name: str, path: str, file_obj: IO[bytes] | None = None + ) -> bytes | None: + headers: dict[str, str | list[str] | Tuple[str]] = { + "Accept-Encoding": "gzip, zstd" + } + try: + response = cast( + HTTPResponse, + self.minio_client.get_object( # this returns an HTTPResponse + bucket_name, path, request_headers=headers + ), + ) + except S3Error as e: + if e.code == "NoSuchKey": + raise FileNotInStorageError( + f"File {path} does not exist in {bucket_name}" + ) + raise e + if response.headers: + content_encoding = response.headers.get("Content-Encoding", None) + if not self.zstd_default and content_encoding == "zstd": + # we have to manually decompress zstandard compressed data + cctx = zstandard.ZstdDecompressor() + # stream_reader only needs an object with a read method, so we + # can wrap the HTTPResponse directly: it will call read on it + # and decompress the bytes it gets back + reader = cctx.stream_reader(cast(IO[bytes], response)) + else: + reader = response + else: + reader = response + + if file_obj: + file_obj.seek(0) + while chunk := reader.read(CHUNK_SIZE): + file_obj.write(chunk) + response.close() + response.release_conn() + return None + else: + res = BytesIO() + while chunk := reader.read(CHUNK_SIZE): + res.write(chunk) + response.close() + response.release_conn() + return res.getvalue() + + """ + Deletes file url in specified bucket. + Returns true on successful + deletion, raises otherwise. + """ + + def delete_file(self, bucket_name, url): + try: + # delete a file given a bucket name and a url + self.minio_client.remove_object(bucket_name, url) + return True + except MinioException: + raise + + def delete_files(self, bucket_name, urls=[]): + try: + for del_err in self.minio_client.remove_objects( + bucket_name, [DeleteObject(url) for url in urls] + ): + print("Deletion error: {}".format(del_err)) + return [True] * len(urls) + except MinioException: + raise + + def list_folder_contents(self, bucket_name, prefix=None, recursive=True): + return ( + self.object_to_dict(b) + for b in self.minio_client.list_objects(bucket_name, prefix, recursive) + ) + + def object_to_dict(self, obj): + return {"name": obj.object_name, "size": obj.size} + + # TODO remove this function -- just using it for output during testing.
+ def write(self, string, silence=False): + if not silence: + sys.stdout.write((string or "") + "\n") diff --git a/tests/unit/storage/__init__.py b/tests/unit/storage/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/storage/test_init.py b/tests/unit/storage/test_init.py index c7ae6298..e9a62501 100644 --- a/tests/unit/storage/test_init.py +++ b/tests/unit/storage/test_init.py @@ -1,8 +1,10 @@ +from shared.rollouts.features import USE_NEW_MINIO from shared.storage import get_appropriate_storage_service from shared.storage.aws import AWSStorageService from shared.storage.fallback import StorageWithFallbackService from shared.storage.gcp import GCPStorageService from shared.storage.minio import MinioStorageService +from shared.storage.new_minio import NewMinioStorageService fake_private_key = """-----BEGIN PRIVATE KEY----- MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCnND/Neha4aNJ6 @@ -110,3 +112,31 @@ def test_get_appropriate_storage_service_minio(self, mock_configuration): res = get_appropriate_storage_service() assert isinstance(res, MinioStorageService) assert res.minio_config == minio_config + + def test_get_appropriate_storage_service_new_minio( + self, mock_configuration, mocker + ): + mock_configuration.params["services"] = { + "chosen_storage": "minio", + "gcp": gcp_config, + "aws": aws_config, + "minio": minio_config, + } + mocker.patch.object(USE_NEW_MINIO, "check_value", return_value=True) + res = get_appropriate_storage_service(repoid=123) + assert isinstance(res, NewMinioStorageService) + assert res.minio_config == minio_config + + def test_get_appropriate_storage_service_new_minio_false( + self, mock_configuration, mocker + ): + mock_configuration.params["services"] = { + "chosen_storage": "minio", + "gcp": gcp_config, + "aws": aws_config, + "minio": minio_config, + } + mocker.patch.object(USE_NEW_MINIO, "check_value", return_value=False) + res = get_appropriate_storage_service(repoid=123) + assert isinstance(res, MinioStorageService) + assert res.minio_config == minio_config diff --git a/tests/unit/storage/test_new_minio.py b/tests/unit/storage/test_new_minio.py new file mode 100644 index 00000000..2d3057a9 --- /dev/null +++ b/tests/unit/storage/test_new_minio.py @@ -0,0 +1,396 @@ +import gzip +import tempfile +from io import BytesIO +from uuid import uuid4 + +import pytest +import zstandard + +from shared.storage.exceptions import BucketAlreadyExistsError, FileNotInStorageError +from shared.storage.new_minio import NewMinioStorageService, zstd_decoded_by_default + +BUCKET_NAME = "archivetest" + + +def test_zstd_by_default(): + assert not zstd_decoded_by_default() + + +def test_gzip_stream_compression(): + data = "lorem ipsum dolor test_write_then_read_file á" + + split_data = [data[i : i + 5] for i in range(0, len(data), 5)] + + compressed_pieces: list[bytes] = [ + gzip.compress(piece.encode()) for piece in split_data + ] + + assert gzip.decompress(b"".join(compressed_pieces)) == data.encode() + + +def make_storage() -> NewMinioStorageService: + return NewMinioStorageService( + { + "access_key_id": "codecov-default-key", + "secret_access_key": "codecov-default-secret", + "verify_ssl": False, + "host": "minio", + "port": "9000", + "iam_auth": False, + "iam_endpoint": None, + } + ) + + +def ensure_bucket(storage: NewMinioStorageService): + try: + storage.create_root_storage(BUCKET_NAME) + except Exception: + pass + + +def test_create_bucket(): + storage = make_storage() + bucket_name = uuid4().hex + + res = 
storage.create_root_storage(bucket_name, region="") + assert res == {"name": bucket_name} + + +def test_create_bucket_already_exists(): + storage = make_storage() + bucket_name = uuid4().hex + + storage.create_root_storage(bucket_name) + with pytest.raises(BucketAlreadyExistsError): + storage.create_root_storage(bucket_name) + + +def test_write_then_read_file(): + storage = make_storage() + path = f"test_write_then_read_file/{uuid4().hex}" + data = "lorem ipsum dolor test_write_then_read_file á" + + ensure_bucket(storage) + writing_result = storage.write_file(BUCKET_NAME, path, data) + assert writing_result + reading_result = storage.read_file(BUCKET_NAME, path) + assert reading_result.decode() == data + + +def test_write_then_read_file_already_gzipped(): + storage = make_storage() + path = f"test_write_then_read_file_already_gzipped/{uuid4().hex}" + data = BytesIO( + gzip.compress("lorem ipsum dolor test_write_then_read_file á".encode()) + ) + + ensure_bucket(storage) + writing_result = storage.write_file( + BUCKET_NAME, path, data, is_already_gzipped=True + ) + assert writing_result + reading_result = storage.read_file(BUCKET_NAME, path) + assert reading_result.decode() == "lorem ipsum dolor test_write_then_read_file á" + + +def test_write_then_read_file_already_zstd(): + storage = make_storage() + path = f"test_write_then_read_file_already_zstd/{uuid4().hex}" + data = BytesIO( + zstandard.compress("lorem ipsum dolor test_write_then_read_file á".encode()) + ) + + ensure_bucket(storage) + writing_result = storage.write_file( + BUCKET_NAME, path, data, compression_type="zstd", is_compressed=True + ) + assert writing_result + reading_result = storage.read_file(BUCKET_NAME, path) + assert reading_result.decode() == "lorem ipsum dolor test_write_then_read_file á" + + +def test_write_then_read_file_obj(): + storage = make_storage() + path = f"test_write_then_read_file/{uuid4().hex}" + data = "lorem ipsum dolor test_write_then_read_file á" + + ensure_bucket(storage) + + _, local_path = tempfile.mkstemp() + with open(local_path, "w") as f: + f.write(data) + with open(local_path, "rb") as f: + writing_result = storage.write_file(BUCKET_NAME, path, f) + assert writing_result + + _, local_path = tempfile.mkstemp() + with open(local_path, "wb") as f: + storage.read_file(BUCKET_NAME, path, file_obj=f) + with open(local_path, "rb") as f: + assert f.read().decode() == data + + +def test_write_then_read_file_obj_gzip(): + storage = make_storage() + path = f"test_write_then_read_file_gzip/{uuid4().hex}" + data = "lorem ipsum dolor test_write_then_read_file á" + + ensure_bucket(storage) + + _, local_path = tempfile.mkstemp() + with open(local_path, "w") as f: + f.write(data) + with open(local_path, "rb") as f: + writing_result = storage.write_file( + BUCKET_NAME, path, f, compression_type="gzip" + ) + assert writing_result + + _, local_path = tempfile.mkstemp() + with open(local_path, "wb") as f: + storage.read_file(BUCKET_NAME, path, file_obj=f) + with open(local_path, "rb") as f: + assert f.read().decode() == data + + +def test_write_then_read_file_obj_no_compression(): + storage = make_storage() + path = f"test_write_then_read_file_no_compression/{uuid4().hex}" + data = "lorem ipsum dolor test_write_then_read_file á" + + ensure_bucket(storage) + + _, local_path = tempfile.mkstemp() + with open(local_path, "w") as f: + f.write(data) + with open(local_path, "rb") as f: + writing_result = storage.write_file(BUCKET_NAME, path, f, compression_type=None) + assert writing_result + + _, local_path = 
tempfile.mkstemp() + with open(local_path, "wb") as f: + storage.read_file(BUCKET_NAME, path, file_obj=f) + with open(local_path, "rb") as f: + assert f.read().decode() == data + + +def test_write_then_read_file_obj_x_gzip(): + storage = make_storage() + path = f"test_write_then_read_file_obj_x_gzip/{uuid4().hex}" + compressed = gzip.compress("lorem ipsum dolor test_write_then_read_file á".encode()) + outsize = len(compressed) + data = BytesIO(compressed) + + ensure_bucket(storage) + + headers = {"Content-Encoding": "gzip"} + storage.minio_client.put_object( + BUCKET_NAME, + path, + data, + content_type="application/x-gzip", + metadata=headers, + length=outsize, + ) + + _, local_path = tempfile.mkstemp() + with open(local_path, "wb") as f: + storage.read_file(BUCKET_NAME, path, file_obj=f) + with open(local_path, "rb") as f: + assert f.read().decode() == "lorem ipsum dolor test_write_then_read_file á" + + +def test_write_then_read_file_obj_already_gzipped(): + storage = make_storage() + path = f"test_write_then_read_file_obj_already_gzipped/{uuid4().hex}" + data = BytesIO( + gzip.compress("lorem ipsum dolor test_write_then_read_file á".encode()) + ) + + ensure_bucket(storage) + + _, local_path = tempfile.mkstemp() + with open(local_path, "wb") as f: + f.write(data.getvalue()) + with open(local_path, "rb") as f: + writing_result = storage.write_file( + BUCKET_NAME, path, f, is_already_gzipped=True + ) + assert writing_result + + _, local_path = tempfile.mkstemp() + with open(local_path, "wb") as f: + storage.read_file(BUCKET_NAME, path, file_obj=f) + with open(local_path, "rb") as f: + assert f.read().decode() == "lorem ipsum dolor test_write_then_read_file á" + + +def test_write_then_read_file_obj_already_zstd(): + storage = make_storage() + path = f"test_write_then_read_file_obj_already_zstd/{uuid4().hex}" + data = BytesIO( + zstandard.compress("lorem ipsum dolor test_write_then_read_file á".encode()) + ) + + ensure_bucket(storage) + + _, local_path = tempfile.mkstemp() + with open(local_path, "wb") as f: + f.write(data.getvalue()) + with open(local_path, "rb") as f: + writing_result = storage.write_file( + BUCKET_NAME, path, f, is_compressed=True, compression_type="zstd" + ) + assert writing_result + + _, local_path = tempfile.mkstemp() + with open(local_path, "wb") as f: + storage.read_file(BUCKET_NAME, path, file_obj=f) + with open(local_path, "rb") as f: + assert f.read().decode() == "lorem ipsum dolor test_write_then_read_file á" + + +def test_read_file_does_not_exist(): + storage = make_storage() + path = f"test_read_file_does_not_exist/{uuid4().hex}" + + ensure_bucket(storage) + with pytest.raises(FileNotInStorageError): + storage.read_file(BUCKET_NAME, path) + + +def test_write_then_delete_file(): + storage = make_storage() + path = f"test_write_then_delete_file/{uuid4().hex}" + data = "lorem ipsum dolor test_write_then_delete_file á" + + ensure_bucket(storage) + writing_result = storage.write_file(BUCKET_NAME, path, data) + assert writing_result + + deletion_result = storage.delete_file(BUCKET_NAME, path) + assert deletion_result is True + with pytest.raises(FileNotInStorageError): + storage.read_file(BUCKET_NAME, path) + + +def test_batch_delete_files(): + storage = make_storage() + path = f"test_batch_delete_files/{uuid4().hex}" + path_1 = f"{path}/result_1.txt" + path_2 = f"{path}/result_2.txt" + path_3 = f"{path}/result_3.txt" + paths = [path_1, path_2, path_3] + data = "lorem ipsum dolor test_batch_delete_files á" + + ensure_bucket(storage) + storage.write_file(BUCKET_NAME, 
path_1, data) + storage.write_file(BUCKET_NAME, path_3, data) + + deletion_result = storage.delete_files(BUCKET_NAME, paths) + assert deletion_result == [True, True, True] + for p in paths: + with pytest.raises(FileNotInStorageError): + storage.read_file(BUCKET_NAME, p) + + +def test_list_folder_contents(): + storage = make_storage() + path = f"test_list_folder_contents/{uuid4().hex}" + path_1 = "/result_1.txt" + path_2 = "/result_2.txt" + path_3 = "/result_3.txt" + path_4 = "/x1/result_1.txt" + path_5 = "/x1/result_2.txt" + path_6 = "/x1/result_3.txt" + all_paths = [path_1, path_2, path_3, path_4, path_5, path_6] + + ensure_bucket(storage) + for i, p in enumerate(all_paths): + data = f"Lorem ipsum on file {p} for {i * 'po'}" + storage.write_file(BUCKET_NAME, f"{path}{p}", data) + + results_1 = sorted( + storage.list_folder_contents(BUCKET_NAME, path), + key=lambda x: x["name"], + ) + # NOTE: the `size` here is actually the compressed (currently zstd) size + assert results_1 == [ + {"name": f"{path}{path_1}", "size": 47}, + {"name": f"{path}{path_2}", "size": 49}, + {"name": f"{path}{path_3}", "size": 51}, + {"name": f"{path}{path_4}", "size": 56}, + {"name": f"{path}{path_5}", "size": 58}, + {"name": f"{path}{path_6}", "size": 60}, + ] + + results_2 = sorted( + storage.list_folder_contents(BUCKET_NAME, f"{path}/x1"), + key=lambda x: x["name"], + ) + assert results_2 == [ + {"name": f"{path}{path_4}", "size": 56}, + {"name": f"{path}{path_5}", "size": 58}, + {"name": f"{path}{path_6}", "size": 60}, + ] + + +def test_minio_without_ports(mocker): + mocked_minio_client = mocker.patch("shared.storage.new_minio.Minio") + minio_no_ports_config = { + "access_key_id": "hodor", + "secret_access_key": "haha", + "verify_ssl": False, + "host": "cute_url_no_ports", + "iam_auth": True, + "iam_endpoint": None, + } + + storage = NewMinioStorageService(minio_no_ports_config) + assert storage.minio_config == minio_no_ports_config + mocked_minio_client.assert_called_with( + "cute_url_no_ports", credentials=mocker.ANY, secure=False, region=None + ) + + +def test_minio_with_ports(mocker): + mocked_minio_client = mocker.patch("shared.storage.new_minio.Minio") + minio_no_ports_config = { + "access_key_id": "hodor", + "secret_access_key": "haha", + "verify_ssl": False, + "host": "cute_url_no_ports", + "port": "9000", + "iam_auth": True, + "iam_endpoint": None, + } + + storage = NewMinioStorageService(minio_no_ports_config) + assert storage.minio_config == minio_no_ports_config + mocked_minio_client.assert_called_with( + "cute_url_no_ports:9000", credentials=mocker.ANY, secure=False, region=None + ) + + +def test_minio_with_region(mocker): + mocked_minio_client = mocker.patch("shared.storage.new_minio.Minio") + minio_no_ports_config = { + "access_key_id": "hodor", + "secret_access_key": "haha", + "verify_ssl": False, + "host": "cute_url_no_ports", + "port": "9000", + "iam_auth": True, + "iam_endpoint": None, + "region": "example", + } + + storage = NewMinioStorageService(minio_no_ports_config) + assert storage.minio_config == minio_no_ports_config + mocked_minio_client.assert_called_with( + "cute_url_no_ports:9000", + credentials=mocker.ANY, + secure=False, + region="example", + ) diff --git a/uv.lock b/uv.lock index e19cd618..37332381 100644 --- a/uv.lock +++ b/uv.lock @@ -1430,6 +1430,7 @@ dependencies = [ { name = "requests" }, { name = "sentry-sdk" }, { name = "sqlalchemy" }, + { name = "zstandard" }, ] [package.dev-dependencies] @@ -1483,6 +1484,7 @@ requires-dist = [ { name = "requests", specifier =
">=2.32.3" }, { name = "sentry-sdk", specifier = ">=2.13.0" }, { name = "sqlalchemy", specifier = "<2" }, + { name = "zstandard", specifier = ">=0.23.0" }, ] [package.metadata.requires-dev] @@ -1686,3 +1688,46 @@ sdist = { url = "https://files.pythonhosted.org/packages/3f/50/bad581df71744867e wheels = [ { url = "https://files.pythonhosted.org/packages/b7/1a/7e4798e9339adc931158c9d69ecc34f5e6791489d469f5e50ec15e35f458/zipp-3.21.0-py3-none-any.whl", hash = "sha256:ac1bbe05fd2991f160ebce24ffbac5f6d11d83dc90891255885223d42b3cd931", size = 9630 }, ] + +[[package]] +name = "zstandard" +version = "0.23.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "cffi", marker = "platform_python_implementation == 'PyPy'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/ed/f6/2ac0287b442160a89d726b17a9184a4c615bb5237db763791a7fd16d9df1/zstandard-0.23.0.tar.gz", hash = "sha256:b2d8c62d08e7255f68f7a740bae85b3c9b8e5466baa9cbf7f57f1cde0ac6bc09", size = 681701 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/7b/83/f23338c963bd9de687d47bf32efe9fd30164e722ba27fb59df33e6b1719b/zstandard-0.23.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:b4567955a6bc1b20e9c31612e615af6b53733491aeaa19a6b3b37f3b65477094", size = 788713 }, + { url = "https://files.pythonhosted.org/packages/5b/b3/1a028f6750fd9227ee0b937a278a434ab7f7fdc3066c3173f64366fe2466/zstandard-0.23.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:1e172f57cd78c20f13a3415cc8dfe24bf388614324d25539146594c16d78fcc8", size = 633459 }, + { url = "https://files.pythonhosted.org/packages/26/af/36d89aae0c1f95a0a98e50711bc5d92c144939efc1f81a2fcd3e78d7f4c1/zstandard-0.23.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b0e166f698c5a3e914947388c162be2583e0c638a4703fc6a543e23a88dea3c1", size = 4945707 }, + { url = "https://files.pythonhosted.org/packages/cd/2e/2051f5c772f4dfc0aae3741d5fc72c3dcfe3aaeb461cc231668a4db1ce14/zstandard-0.23.0-cp312-cp312-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:12a289832e520c6bd4dcaad68e944b86da3bad0d339ef7989fb7e88f92e96072", size = 5306545 }, + { url = "https://files.pythonhosted.org/packages/0a/9e/a11c97b087f89cab030fa71206963090d2fecd8eb83e67bb8f3ffb84c024/zstandard-0.23.0-cp312-cp312-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:d50d31bfedd53a928fed6707b15a8dbeef011bb6366297cc435accc888b27c20", size = 5337533 }, + { url = "https://files.pythonhosted.org/packages/fc/79/edeb217c57fe1bf16d890aa91a1c2c96b28c07b46afed54a5dcf310c3f6f/zstandard-0.23.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:72c68dda124a1a138340fb62fa21b9bf4848437d9ca60bd35db36f2d3345f373", size = 5436510 }, + { url = "https://files.pythonhosted.org/packages/81/4f/c21383d97cb7a422ddf1ae824b53ce4b51063d0eeb2afa757eb40804a8ef/zstandard-0.23.0-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:53dd9d5e3d29f95acd5de6802e909ada8d8d8cfa37a3ac64836f3bc4bc5512db", size = 4859973 }, + { url = "https://files.pythonhosted.org/packages/ab/15/08d22e87753304405ccac8be2493a495f529edd81d39a0870621462276ef/zstandard-0.23.0-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:6a41c120c3dbc0d81a8e8adc73312d668cd34acd7725f036992b1b72d22c1772", size = 4936968 }, + { url = "https://files.pythonhosted.org/packages/eb/fa/f3670a597949fe7dcf38119a39f7da49a8a84a6f0b1a2e46b2f71a0ab83f/zstandard-0.23.0-cp312-cp312-musllinux_1_1_x86_64.whl", hash = 
"sha256:40b33d93c6eddf02d2c19f5773196068d875c41ca25730e8288e9b672897c105", size = 5467179 }, + { url = "https://files.pythonhosted.org/packages/4e/a9/dad2ab22020211e380adc477a1dbf9f109b1f8d94c614944843e20dc2a99/zstandard-0.23.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:9206649ec587e6b02bd124fb7799b86cddec350f6f6c14bc82a2b70183e708ba", size = 4848577 }, + { url = "https://files.pythonhosted.org/packages/08/03/dd28b4484b0770f1e23478413e01bee476ae8227bbc81561f9c329e12564/zstandard-0.23.0-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:76e79bc28a65f467e0409098fa2c4376931fd3207fbeb6b956c7c476d53746dd", size = 4693899 }, + { url = "https://files.pythonhosted.org/packages/2b/64/3da7497eb635d025841e958bcd66a86117ae320c3b14b0ae86e9e8627518/zstandard-0.23.0-cp312-cp312-musllinux_1_2_ppc64le.whl", hash = "sha256:66b689c107857eceabf2cf3d3fc699c3c0fe8ccd18df2219d978c0283e4c508a", size = 5199964 }, + { url = "https://files.pythonhosted.org/packages/43/a4/d82decbab158a0e8a6ebb7fc98bc4d903266bce85b6e9aaedea1d288338c/zstandard-0.23.0-cp312-cp312-musllinux_1_2_s390x.whl", hash = "sha256:9c236e635582742fee16603042553d276cca506e824fa2e6489db04039521e90", size = 5655398 }, + { url = "https://files.pythonhosted.org/packages/f2/61/ac78a1263bc83a5cf29e7458b77a568eda5a8f81980691bbc6eb6a0d45cc/zstandard-0.23.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:a8fffdbd9d1408006baaf02f1068d7dd1f016c6bcb7538682622c556e7b68e35", size = 5191313 }, + { url = "https://files.pythonhosted.org/packages/e7/54/967c478314e16af5baf849b6ee9d6ea724ae5b100eb506011f045d3d4e16/zstandard-0.23.0-cp312-cp312-win32.whl", hash = "sha256:dc1d33abb8a0d754ea4763bad944fd965d3d95b5baef6b121c0c9013eaf1907d", size = 430877 }, + { url = "https://files.pythonhosted.org/packages/75/37/872d74bd7739639c4553bf94c84af7d54d8211b626b352bc57f0fd8d1e3f/zstandard-0.23.0-cp312-cp312-win_amd64.whl", hash = "sha256:64585e1dba664dc67c7cdabd56c1e5685233fbb1fc1966cfba2a340ec0dfff7b", size = 495595 }, + { url = "https://files.pythonhosted.org/packages/80/f1/8386f3f7c10261fe85fbc2c012fdb3d4db793b921c9abcc995d8da1b7a80/zstandard-0.23.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:576856e8594e6649aee06ddbfc738fec6a834f7c85bf7cadd1c53d4a58186ef9", size = 788975 }, + { url = "https://files.pythonhosted.org/packages/16/e8/cbf01077550b3e5dc86089035ff8f6fbbb312bc0983757c2d1117ebba242/zstandard-0.23.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:38302b78a850ff82656beaddeb0bb989a0322a8bbb1bf1ab10c17506681d772a", size = 633448 }, + { url = "https://files.pythonhosted.org/packages/06/27/4a1b4c267c29a464a161aeb2589aff212b4db653a1d96bffe3598f3f0d22/zstandard-0.23.0-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d2240ddc86b74966c34554c49d00eaafa8200a18d3a5b6ffbf7da63b11d74ee2", size = 4945269 }, + { url = "https://files.pythonhosted.org/packages/7c/64/d99261cc57afd9ae65b707e38045ed8269fbdae73544fd2e4a4d50d0ed83/zstandard-0.23.0-cp313-cp313-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:2ef230a8fd217a2015bc91b74f6b3b7d6522ba48be29ad4ea0ca3a3775bf7dd5", size = 5306228 }, + { url = "https://files.pythonhosted.org/packages/7a/cf/27b74c6f22541f0263016a0fd6369b1b7818941de639215c84e4e94b2a1c/zstandard-0.23.0-cp313-cp313-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:774d45b1fac1461f48698a9d4b5fa19a69d47ece02fa469825b442263f04021f", size = 5336891 }, + { url = 
"https://files.pythonhosted.org/packages/fa/18/89ac62eac46b69948bf35fcd90d37103f38722968e2981f752d69081ec4d/zstandard-0.23.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6f77fa49079891a4aab203d0b1744acc85577ed16d767b52fc089d83faf8d8ed", size = 5436310 }, + { url = "https://files.pythonhosted.org/packages/a8/a8/5ca5328ee568a873f5118d5b5f70d1f36c6387716efe2e369010289a5738/zstandard-0.23.0-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:ac184f87ff521f4840e6ea0b10c0ec90c6b1dcd0bad2f1e4a9a1b4fa177982ea", size = 4859912 }, + { url = "https://files.pythonhosted.org/packages/ea/ca/3781059c95fd0868658b1cf0440edd832b942f84ae60685d0cfdb808bca1/zstandard-0.23.0-cp313-cp313-musllinux_1_1_aarch64.whl", hash = "sha256:c363b53e257246a954ebc7c488304b5592b9c53fbe74d03bc1c64dda153fb847", size = 4936946 }, + { url = "https://files.pythonhosted.org/packages/ce/11/41a58986f809532742c2b832c53b74ba0e0a5dae7e8ab4642bf5876f35de/zstandard-0.23.0-cp313-cp313-musllinux_1_1_x86_64.whl", hash = "sha256:e7792606d606c8df5277c32ccb58f29b9b8603bf83b48639b7aedf6df4fe8171", size = 5466994 }, + { url = "https://files.pythonhosted.org/packages/83/e3/97d84fe95edd38d7053af05159465d298c8b20cebe9ccb3d26783faa9094/zstandard-0.23.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:a0817825b900fcd43ac5d05b8b3079937073d2b1ff9cf89427590718b70dd840", size = 4848681 }, + { url = "https://files.pythonhosted.org/packages/6e/99/cb1e63e931de15c88af26085e3f2d9af9ce53ccafac73b6e48418fd5a6e6/zstandard-0.23.0-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:9da6bc32faac9a293ddfdcb9108d4b20416219461e4ec64dfea8383cac186690", size = 4694239 }, + { url = "https://files.pythonhosted.org/packages/ab/50/b1e703016eebbc6501fc92f34db7b1c68e54e567ef39e6e59cf5fb6f2ec0/zstandard-0.23.0-cp313-cp313-musllinux_1_2_ppc64le.whl", hash = "sha256:fd7699e8fd9969f455ef2926221e0233f81a2542921471382e77a9e2f2b57f4b", size = 5200149 }, + { url = "https://files.pythonhosted.org/packages/aa/e0/932388630aaba70197c78bdb10cce2c91fae01a7e553b76ce85471aec690/zstandard-0.23.0-cp313-cp313-musllinux_1_2_s390x.whl", hash = "sha256:d477ed829077cd945b01fc3115edd132c47e6540ddcd96ca169facff28173057", size = 5655392 }, + { url = "https://files.pythonhosted.org/packages/02/90/2633473864f67a15526324b007a9f96c96f56d5f32ef2a56cc12f9548723/zstandard-0.23.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:fa6ce8b52c5987b3e34d5674b0ab529a4602b632ebab0a93b07bfb4dfc8f8a33", size = 5191299 }, + { url = "https://files.pythonhosted.org/packages/b0/4c/315ca5c32da7e2dc3455f3b2caee5c8c2246074a61aac6ec3378a97b7136/zstandard-0.23.0-cp313-cp313-win32.whl", hash = "sha256:a9b07268d0c3ca5c170a385a0ab9fb7fdd9f5fd866be004c4ea39e44edce47dd", size = 430862 }, + { url = "https://files.pythonhosted.org/packages/a2/bf/c6aaba098e2d04781e8f4f7c0ba3c7aa73d00e4c436bcc0cf059a66691d1/zstandard-0.23.0-cp313-cp313-win_amd64.whl", hash = "sha256:f3513916e8c645d0610815c257cbfd3242adfd5c4cfa78be514e5a3ebb42a41b", size = 495578 }, +]