From 1ad7fa7736015c3de8622f3507c48161cf1dd0dc Mon Sep 17 00:00:00 2001 From: joseph-sentry Date: Tue, 26 Nov 2024 12:37:55 -0500 Subject: [PATCH 1/6] feat: support zstd compression in miniostorage we want to use zstd compression when compressing files for storage in object storage because it performs better than gzip which is what we were using before these changes are only being made to the minio storage service because we want to consolidate the storage service functionality into this one so both worker and API will be using this backend in the future (API was already using this one) we have to manually decompress the zstd compressed files in read_file but HTTPResponse takes care of it for us if the content encoding of the file is gzip the is_already_gzipped argument is being deprecated in favour of compression_type and is_compressed, also the ability to pass a str to write_file is being deprecated. we're keeping track of the use of these using sentry capture_message --- pyproject.toml | 1 + shared/storage/minio.py | 179 +++++++---- tests/unit/storage/__init__.py | 0 tests/unit/storage/test_minio.py | 503 ++++++++++++++++++------------- uv.lock | 45 +++ 5 files changed, 465 insertions(+), 263 deletions(-) create mode 100644 tests/unit/storage/__init__.py diff --git a/pyproject.toml b/pyproject.toml index 2f271caf..cb0aaf90 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,6 +34,7 @@ dependencies = [ "requests>=2.32.3", "sentry-sdk>=2.18.0", "sqlalchemy<2", + "zstandard==0.23.0", ] [build-system] diff --git a/shared/storage/minio.py b/shared/storage/minio.py index bb65c5d5..bb000241 100644 --- a/shared/storage/minio.py +++ b/shared/storage/minio.py @@ -1,13 +1,15 @@ -import gzip +import datetime import json import logging import os -import shutil import sys import tempfile from io import BytesIO -from typing import BinaryIO, overload +from typing import BinaryIO, Protocol, overload +import sentry_sdk +import sentry_sdk.scope +import zstandard from minio import Minio from minio.credentials import ( ChainedProvider, @@ -17,13 +19,29 @@ ) from minio.deleteobjects import DeleteObject from minio.error import MinioException, S3Error +from urllib3.response import HTTPResponse -from shared.storage.base import CHUNK_SIZE, BaseStorageService +from shared.storage.base import BaseStorageService from shared.storage.exceptions import BucketAlreadyExistsError, FileNotInStorageError log = logging.getLogger(__name__) +class Readable(Protocol): + def read(self, size: int = -1) -> bytes: ... + + +class GetObjectToFileResponse(Protocol): + bucket_name: str + object_name: str + last_modified: datetime.datetime | None + etag: str + size: int + content_type: str | None + metadata: dict[str, str] + version_id: str | None + + # Service class for interfacing with codecov's underlying storage layer, minio class MinioStorageService(BaseStorageService): def __init__(self, minio_config): @@ -57,20 +75,21 @@ def init_minio_client( region: str = None, ): """ - Initialize the minio client + Initialize the minio client `iam_auth` adds support for IAM base authentication in a fallback pattern. - The following will be checked in order: + The following will be checked in order: * EC2 metadata -- a custom endpoint can be provided, default is None. 
- * AWS env vars, specifically AWS_ACCESS_KEY and AWS_SECRECT_KEY * Minio env vars, specifically MINIO_ACCESS_KEY and MINIO_SECRET_KEY + * AWS env vars, specifically AWS_ACCESS_KEY and AWS_SECRECT_KEY - to support backward compatibility, the iam_auth setting should be used in the installation - configuration + to support backward compatibility, the iam_auth setting should be used + in the installation configuration Args: host (str): The address of the host where minio lives + port (str): The port number (as str or int should be ok) access_key (str, optional): The access key (optional if IAM is being used) secret_key (str, optional): The secret key (optional if IAM is being used) @@ -143,50 +162,64 @@ def create_root_storage(self, bucket_name="archive", region="us-east-1"): # Writes a file to storage will gzip if not compressed already def write_file( self, - bucket_name, - path, - data, - reduced_redundancy=False, + bucket_name: str, + path: str, + data: BinaryIO, + reduced_redundancy: bool = False, *, - is_already_gzipped: bool = False, + is_already_gzipped: bool = False, # deprecated + is_compressed: bool = False, + compression_type: str = "zstd", ): + if is_already_gzipped: + log.warning( + "is_already_gzipped is deprecated and will be removed in a future version, instead compress using zstd and use the is_already_zstd_compressed argument" + ) + with sentry_sdk.new_scope() as scope: + scope.set_extra("bucket_name", bucket_name) + scope.set_extra("path", path) + sentry_sdk.capture_message("is_already_gzipped passed with True") + is_compressed = True + compression_type = "gzip" + if isinstance(data, str): - data = data.encode() + log.warning( + "passing data as a str to write_file is deprecated and will be removed in a future version, instead pass an object compliant with the BinaryIO type" + ) + with sentry_sdk.new_scope() as scope: + scope.set_extra("bucket_name", bucket_name) + scope.set_extra("path", path) + sentry_sdk.capture_message("write_file data argument passed as str") - if isinstance(data, bytes): - if not is_already_gzipped: - out = BytesIO() - with gzip.GzipFile(fileobj=out, mode="w", compresslevel=9) as gz: - gz.write(data) - else: - out = BytesIO(data) - - # get file size - out.seek(0, os.SEEK_END) - out_size = out.tell() - else: - # data is already a file-like object - if not is_already_gzipped: - _, filename = tempfile.mkstemp() - with gzip.open(filename, "wb") as f: - shutil.copyfileobj(data, f) - out = open(filename, "rb") - else: - out = data + data = BytesIO(data.encode()) - out_size = os.stat(filename).st_size + if not is_compressed: + cctx = zstandard.ZstdCompressor() + reader: zstandard.ZstdCompressionReader = cctx.stream_reader(data) + _, filepath = tempfile.mkstemp() + with open(filepath, "wb") as f: + while chunk := reader.read(16384): + f.write(chunk) + data = open(filepath, "rb") try: - # reset pos for minio reading. 
- out.seek(0) + out_size = data.seek(0, os.SEEK_END) + data.seek(0) + + if compression_type == "gzip": + content_encoding = "gzip" + elif compression_type == "zstd": + content_encoding = "zstd" + + headers = {"Content-Encoding": content_encoding} - headers = {"Content-Encoding": "gzip"} if reduced_redundancy: headers["x-amz-storage-class"] = "REDUCED_REDUNDANCY" + self.minio_client.put_object( bucket_name, path, - out, + data, out_size, metadata=headers, content_type="text/plain", @@ -195,25 +228,65 @@ def write_file( except MinioException: raise + finally: + if not is_compressed: + data.close() + os.unlink(filepath) @overload - def read_file(self, bucket_name: str, path: str) -> bytes: ... + def read_file( + self, bucket_name: str, path: str, file_obj: None = None + ) -> bytes: ... @overload - def read_file(self, bucket_name: str, path: str, file_obj: BinaryIO) -> None: ... + def read_file(self, bucket_name: str, path: str, file_obj: str) -> None: ... def read_file(self, bucket_name, path, file_obj=None) -> bytes | None: try: - res = self.minio_client.get_object(bucket_name, path) - if file_obj is None: - data = BytesIO() - for d in res.stream(CHUNK_SIZE): - data.write(d) - data.seek(0) - return data.getvalue() + headers = {"Accept-Encoding": "gzip, zstd"} + if file_obj: + _, tmpfilepath = tempfile.mkstemp() + to_file_response: GetObjectToFileResponse = ( + self.minio_client.fget_object( + bucket_name, path, tmpfilepath, request_headers=headers + ) + ) + data = open(tmpfilepath, "rb") + content_encoding = to_file_response.metadata.get( + "Content-Encoding", None + ) + else: + response: HTTPResponse = self.minio_client.get_object( + bucket_name, path, request_headers=headers + ) + data = response + content_encoding = response.headers.get("Content-Encoding", None) + + reader: Readable | None = None + if content_encoding == "gzip": + # HTTPResponse automatically decodes gzipped data for us + # minio_client.fget_object uses HTTPResponse under the hood, + # so this applies to both get_object and fget_object + reader = data + elif content_encoding == "zstd": + # we have to manually decompress zstandard compressed data + cctx = zstandard.ZstdDecompressor() + reader = cctx.stream_reader(data) + else: + with sentry_sdk.new_scope() as scope: + scope.set_extra("bucket_name", bucket_name) + scope.set_extra("path", path) + raise ValueError("Blob does not have Content-Encoding set") + + if file_obj: + while chunk := reader.read(16384): + file_obj.write(chunk) + return None else: - for d in res.stream(CHUNK_SIZE): - file_obj.write(d) + res = BytesIO() + while chunk := reader.read(16384): + res.write(chunk) + return res.getvalue() except S3Error as e: if e.code == "NoSuchKey": raise FileNotInStorageError( @@ -222,6 +295,10 @@ def read_file(self, bucket_name, path, file_obj=None) -> bytes | None: raise e except MinioException: raise + finally: + if file_obj: + data.close() + os.unlink(tmpfilepath) """ Deletes file url in specified bucket. 
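
For reviewers, a minimal round-trip sketch of the behaviour this diff
introduces, mirroring the tests below (the bucket, path, and config
values are illustrative, not taken from the patch):

    storage = MinioStorageService(minio_config)
    # default write path: data is zstd-compressed and stored with
    # Content-Encoding: zstd in the object metadata
    storage.write_file("archivetest", "some/path", "raw report data")
    # read path: zstd objects are decompressed manually via
    # zstandard.ZstdDecompressor; gzip objects are decompressed by
    # urllib3's HTTPResponse itself
    assert storage.read_file("archivetest", "some/path").decode() == "raw report data"
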
diff --git a/tests/unit/storage/__init__.py b/tests/unit/storage/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/storage/test_minio.py b/tests/unit/storage/test_minio.py index 50fe478d..00a32104 100644 --- a/tests/unit/storage/test_minio.py +++ b/tests/unit/storage/test_minio.py @@ -1,231 +1,310 @@ -import os +import gzip import tempfile +from io import BytesIO +from uuid import uuid4 import pytest +import zstandard from shared.storage.exceptions import BucketAlreadyExistsError, FileNotInStorageError from shared.storage.minio import MinioStorageService -from tests.base import BaseTestCase - -minio_config = { - "access_key_id": "codecov-default-key", - "secret_access_key": "codecov-default-secret", - "verify_ssl": False, - "host": "minio", - "port": "9000", - "iam_auth": False, - "iam_endpoint": None, -} - - -class TestMinioStorageService(BaseTestCase): - def test_create_bucket(self, codecov_vcr): - storage = MinioStorageService(minio_config) - bucket_name = "archivetest" - res = storage.create_root_storage(bucket_name, region="") - assert res == {"name": "archivetest"} - - def test_create_bucket_already_exists(self, codecov_vcr): - storage = MinioStorageService(minio_config) - bucket_name = "alreadyexists" - storage.create_root_storage(bucket_name) - with pytest.raises(BucketAlreadyExistsError): - storage.create_root_storage(bucket_name) - - def test_write_then_read_file(self, codecov_vcr): - storage = MinioStorageService(minio_config) - path = "test_write_then_read_file/result" - data = "lorem ipsum dolor test_write_then_read_file á" - bucket_name = "archivetest" - writing_result = storage.write_file(bucket_name, path, data) - assert writing_result - reading_result = storage.read_file(bucket_name, path) - assert reading_result.decode() == data - - def test_write_then_read_file_obj(self, codecov_vcr): - storage = MinioStorageService(minio_config) - path = "test_write_then_read_file/result" - data = "lorem ipsum dolor test_write_then_read_file á" - _, local_path = tempfile.mkstemp() - with open(local_path, "w") as f: - f.write(data) - f = open(local_path, "rb") - bucket_name = "archivetest" - writing_result = storage.write_file(bucket_name, path, f) - assert writing_result - - _, local_path = tempfile.mkstemp() - with open(local_path, "wb") as f: - storage.read_file(bucket_name, path, file_obj=f) - with open(local_path, "rb") as f: - assert f.read().decode() == data - - def test_read_file_does_not_exist(self, request, codecov_vcr): - storage = MinioStorageService(minio_config) - path = f"{request.node.name}/does_not_exist.txt" - bucket_name = "archivetest" - with pytest.raises(FileNotInStorageError): - storage.read_file(bucket_name, path) - - def test_write_then_delete_file(self, request, codecov_vcr): - storage = MinioStorageService(minio_config) - path = f"{request.node.name}/result.txt" - data = "lorem ipsum dolor test_write_then_read_file á" - bucket_name = "archivetest" - writing_result = storage.write_file(bucket_name, path, data) - assert writing_result - deletion_result = storage.delete_file(bucket_name, path) - assert deletion_result is True - with pytest.raises(FileNotInStorageError): - storage.read_file(bucket_name, path) - - def test_delete_file_doesnt_exist(self, request, codecov_vcr): - storage = MinioStorageService(minio_config) - path = f"{request.node.name}/result.txt" - bucket_name = "archivetest" - storage.delete_file(bucket_name, path) - - def test_batch_delete_files(self, request, codecov_vcr): - storage = 
MinioStorageService(minio_config) - path_1 = f"{request.node.name}/result_1.txt" - path_2 = f"{request.node.name}/result_2.txt" - path_3 = f"{request.node.name}/result_3.txt" - paths = [path_1, path_2, path_3] - data = "lorem ipsum dolor test_write_then_read_file á" - bucket_name = "archivetest" - storage.write_file(bucket_name, path_1, data) - storage.write_file(bucket_name, path_3, data) - deletion_result = storage.delete_files(bucket_name, paths) - assert deletion_result == [True, True, True] - for p in paths: - with pytest.raises(FileNotInStorageError): - storage.read_file(bucket_name, p) - - def test_list_folder_contents(self, request, codecov_vcr): - storage = MinioStorageService(minio_config) - path_1 = f"thiago/{request.node.name}/result_1.txt" - path_2 = f"thiago/{request.node.name}/result_2.txt" - path_3 = f"thiago/{request.node.name}/result_3.txt" - path_4 = f"thiago/{request.node.name}/f1/result_1.txt" - path_5 = f"thiago/{request.node.name}/f1/result_2.txt" - path_6 = f"thiago/{request.node.name}/f1/result_3.txt" - all_paths = [path_1, path_2, path_3, path_4, path_5, path_6] - bucket_name = "archivetest" - for i, p in enumerate(all_paths): - data = f"Lorem ipsum on file {p} for {i * 'po'}" - storage.write_file(bucket_name, p, data) - results_1 = list( - storage.list_folder_contents(bucket_name, f"thiago/{request.node.name}") - ) - expected_result_1 = [ - {"name": path_1, "size": 84}, - {"name": path_2, "size": 86}, - {"name": path_3, "size": 87}, - {"name": path_4, "size": 88}, - {"name": path_5, "size": 89}, - {"name": path_6, "size": 90}, - ] - assert sorted(expected_result_1, key=lambda x: x["size"]) == sorted( - results_1, key=lambda x: x["size"] - ) - results_2 = list( - storage.list_folder_contents(bucket_name, f"thiago/{request.node.name}/f1") - ) - expected_result_2 = [ - {"name": path_4, "size": 88}, - {"name": path_5, "size": 89}, - {"name": path_6, "size": 90}, - ] - assert sorted(expected_result_2, key=lambda x: x["size"]) == sorted( - results_2, key=lambda x: x["size"] - ) - """ - Since we cannot rely on `Chain` in the underlying implementation - we cannot ''trick'' minio into using the IAM auth flow while testing, - and therefore have to actually be running on an AWS instance. 
- We can unskip this test after minio fixes their credential - chain problem - """ - - @pytest.mark.skip(reason="Skipping because minio IAM is currently untestable.") - def test_minio_with_iam_flow(self, codecov_vcr, mocker): - mocker.patch.dict( - os.environ, - { - "MINIO_ACCESS_KEY": "codecov-default-key", - "MINIO_SECRET_KEY": "codecov-default-secret", - }, - ) - minio_iam_config = { +BUCKET_NAME = "archivetest" + + +def make_storage() -> MinioStorageService: + return MinioStorageService( + { "access_key_id": "codecov-default-key", "secret_access_key": "codecov-default-secret", "verify_ssl": False, "host": "minio", "port": "9000", - "iam_auth": True, + "iam_auth": False, "iam_endpoint": None, } - bucket_name = "testminiowithiamflow" - storage = MinioStorageService(minio_iam_config) + ) + + +def ensure_bucket(storage: MinioStorageService): + try: + storage.create_root_storage(BUCKET_NAME) + except Exception: + pass + + +def test_create_bucket(): + storage = make_storage() + bucket_name = uuid4().hex + + res = storage.create_root_storage(bucket_name, region="") + assert res == {"name": bucket_name} + + +def test_create_bucket_already_exists(): + storage = make_storage() + bucket_name = uuid4().hex + + storage.create_root_storage(bucket_name) + with pytest.raises(BucketAlreadyExistsError): storage.create_root_storage(bucket_name) - path = "test_write_then_read_file/result" - data = "lorem ipsum dolor test_write_then_read_file á" - writing_result = storage.write_file(bucket_name, path, data) - assert writing_result - reading_result = storage.read_file(bucket_name, path) - assert reading_result.decode() == data - - def test_minio_without_ports(self, mocker): - mocked_minio_client = mocker.patch("shared.storage.minio.Minio") - minio_no_ports_config = { - "access_key_id": "hodor", - "secret_access_key": "haha", - "verify_ssl": False, - "host": "cute_url_no_ports", - "iam_auth": True, - "iam_endpoint": None, - } - storage = MinioStorageService(minio_no_ports_config) - assert storage.minio_config == minio_no_ports_config - mocked_minio_client.assert_called_with( - "cute_url_no_ports", credentials=mocker.ANY, secure=False, region=None - ) - def test_minio_with_ports(self, mocker): - mocked_minio_client = mocker.patch("shared.storage.minio.Minio") - minio_no_ports_config = { - "access_key_id": "hodor", - "secret_access_key": "haha", - "verify_ssl": False, - "host": "cute_url_no_ports", - "port": "9000", - "iam_auth": True, - "iam_endpoint": None, - } - storage = MinioStorageService(minio_no_ports_config) - assert storage.minio_config == minio_no_ports_config - mocked_minio_client.assert_called_with( - "cute_url_no_ports:9000", credentials=mocker.ANY, secure=False, region=None + +def test_write_then_read_file(): + storage = make_storage() + path = f"test_write_then_read_file/{uuid4().hex}" + data = "lorem ipsum dolor test_write_then_read_file á" + + ensure_bucket(storage) + writing_result = storage.write_file(BUCKET_NAME, path, data) + assert writing_result + reading_result = storage.read_file(BUCKET_NAME, path) + assert reading_result.decode() == data + + +def test_write_then_read_file_already_gzipped(): + storage = make_storage() + path = f"test_write_then_read_file_already_gzipped/{uuid4().hex}" + data = BytesIO( + gzip.compress("lorem ipsum dolor test_write_then_read_file á".encode()) + ) + + ensure_bucket(storage) + writing_result = storage.write_file( + BUCKET_NAME, path, data, is_already_gzipped=True + ) + assert writing_result + reading_result = storage.read_file(BUCKET_NAME, path) + assert 
reading_result.decode() == "lorem ipsum dolor test_write_then_read_file á" + + +def test_write_then_read_file_already_zstd(): + storage = make_storage() + path = f"test_write_then_read_file_already_zstd/{uuid4().hex}" + data = BytesIO( + zstandard.compress("lorem ipsum dolor test_write_then_read_file á".encode()) + ) + + ensure_bucket(storage) + writing_result = storage.write_file( + BUCKET_NAME, path, data, compression_type="zstd", is_compressed=True + ) + assert writing_result + reading_result = storage.read_file(BUCKET_NAME, path) + assert reading_result.decode() == "lorem ipsum dolor test_write_then_read_file á" + + +def test_write_then_read_file_obj(): + storage = make_storage() + path = f"test_write_then_read_file/{uuid4().hex}" + data = "lorem ipsum dolor test_write_then_read_file á" + + ensure_bucket(storage) + + _, local_path = tempfile.mkstemp() + with open(local_path, "w") as f: + f.write(data) + with open(local_path, "rb") as f: + writing_result = storage.write_file(BUCKET_NAME, path, f) + assert writing_result + + _, local_path = tempfile.mkstemp() + with open(local_path, "wb") as f: + storage.read_file(BUCKET_NAME, path, file_obj=f) + with open(local_path, "rb") as f: + assert f.read().decode() == data + + +def test_write_then_read_file_obj_already_gzipped(): + storage = make_storage() + path = f"test_write_then_read_file_obj_already_gzipped/{uuid4().hex}" + data = BytesIO( + gzip.compress("lorem ipsum dolor test_write_then_read_file á".encode()) + ) + + ensure_bucket(storage) + + _, local_path = tempfile.mkstemp() + with open(local_path, "wb") as f: + f.write(data.getvalue()) + with open(local_path, "rb") as f: + writing_result = storage.write_file( + BUCKET_NAME, path, f, is_already_gzipped=True ) + assert writing_result - def test_minio_with_region(self, mocker): - mocked_minio_client = mocker.patch("shared.storage.minio.Minio") - minio_no_ports_config = { - "access_key_id": "hodor", - "secret_access_key": "haha", - "verify_ssl": False, - "host": "cute_url_no_ports", - "port": "9000", - "iam_auth": True, - "iam_endpoint": None, - "region": "example", - } - storage = MinioStorageService(minio_no_ports_config) - assert storage.minio_config == minio_no_ports_config - mocked_minio_client.assert_called_with( - "cute_url_no_ports:9000", - credentials=mocker.ANY, - secure=False, - region="example", + _, local_path = tempfile.mkstemp() + with open(local_path, "wb") as f: + storage.read_file(BUCKET_NAME, path, file_obj=f) + with open(local_path, "rb") as f: + assert f.read().decode() == "lorem ipsum dolor test_write_then_read_file á" + + +def test_write_then_read_file_obj_already_zstd(): + storage = make_storage() + path = f"test_write_then_read_file_obj_already_zstd/{uuid4().hex}" + data = BytesIO( + zstandard.compress("lorem ipsum dolor test_write_then_read_file á".encode()) + ) + + ensure_bucket(storage) + + _, local_path = tempfile.mkstemp() + with open(local_path, "wb") as f: + f.write(data.getvalue()) + with open(local_path, "rb") as f: + writing_result = storage.write_file( + BUCKET_NAME, path, f, is_compressed=True, compression_type="zstd" ) + assert writing_result + + _, local_path = tempfile.mkstemp() + with open(local_path, "wb") as f: + storage.read_file(BUCKET_NAME, path, file_obj=f) + with open(local_path, "rb") as f: + assert f.read().decode() == "lorem ipsum dolor test_write_then_read_file á" + + +def test_read_file_does_not_exist(): + storage = make_storage() + path = f"test_read_file_does_not_exist/{uuid4().hex}" + + ensure_bucket(storage) + with 
pytest.raises(FileNotInStorageError): + storage.read_file(BUCKET_NAME, path) + + +def test_write_then_delete_file(): + storage = make_storage() + path = f"test_write_then_delete_file/{uuid4().hex}" + data = "lorem ipsum dolor test_write_then_delete_file á" + + ensure_bucket(storage) + writing_result = storage.write_file(BUCKET_NAME, path, data) + assert writing_result + + deletion_result = storage.delete_file(BUCKET_NAME, path) + assert deletion_result is True + with pytest.raises(FileNotInStorageError): + storage.read_file(BUCKET_NAME, path) + + +def test_batch_delete_files(): + storage = make_storage() + path = f"test_batch_delete_files/{uuid4().hex}" + path_1 = f"{path}/result_1.txt" + path_2 = f"{path}/result_2.txt" + path_3 = f"{path}/result_3.txt" + paths = [path_1, path_2, path_3] + data = "lorem ipsum dolor test_batch_delete_files á" + + ensure_bucket(storage) + storage.write_file(BUCKET_NAME, path_1, data) + storage.write_file(BUCKET_NAME, path_3, data) + + deletion_result = storage.delete_files(BUCKET_NAME, paths) + assert deletion_result == [True, True, True] + for p in paths: + with pytest.raises(FileNotInStorageError): + storage.read_file(BUCKET_NAME, p) + + +def test_list_folder_contents(): + storage = make_storage() + path = f"test_list_folder_contents/{uuid4().hex}" + path_1 = "/result_1.txt" + path_2 = "/result_2.txt" + path_3 = "/result_3.txt" + path_4 = "/x1/result_1.txt" + path_5 = "/x1/result_2.txt" + path_6 = "/x1/result_3.txt" + all_paths = [path_1, path_2, path_3, path_4, path_5, path_6] + + ensure_bucket(storage) + for i, p in enumerate(all_paths): + data = f"Lorem ipsum on file {p} for {i * 'po'}" + storage.write_file(BUCKET_NAME, f"{path}{p}", data) + + results_1 = sorted( + storage.list_folder_contents(BUCKET_NAME, path), + key=lambda x: x["name"], + ) + # NOTE: the `size` here is actually the compressed (currently gzip) size + assert results_1 == [ + {"name": f"{path}{path_1}", "size": 47}, + {"name": f"{path}{path_2}", "size": 49}, + {"name": f"{path}{path_3}", "size": 51}, + {"name": f"{path}{path_4}", "size": 56}, + {"name": f"{path}{path_5}", "size": 58}, + {"name": f"{path}{path_6}", "size": 60}, + ] + + results_2 = sorted( + storage.list_folder_contents(BUCKET_NAME, f"{path}/x1"), + key=lambda x: x["name"], + ) + assert results_2 == [ + {"name": f"{path}{path_4}", "size": 56}, + {"name": f"{path}{path_5}", "size": 58}, + {"name": f"{path}{path_6}", "size": 60}, + ] + + +def test_minio_without_ports(mocker): + mocked_minio_client = mocker.patch("shared.storage.minio.Minio") + minio_no_ports_config = { + "access_key_id": "hodor", + "secret_access_key": "haha", + "verify_ssl": False, + "host": "cute_url_no_ports", + "iam_auth": True, + "iam_endpoint": None, + } + + storage = MinioStorageService(minio_no_ports_config) + assert storage.minio_config == minio_no_ports_config + mocked_minio_client.assert_called_with( + "cute_url_no_ports", credentials=mocker.ANY, secure=False, region=None + ) + + +def test_minio_with_ports(mocker): + mocked_minio_client = mocker.patch("shared.storage.minio.Minio") + minio_no_ports_config = { + "access_key_id": "hodor", + "secret_access_key": "haha", + "verify_ssl": False, + "host": "cute_url_no_ports", + "port": "9000", + "iam_auth": True, + "iam_endpoint": None, + } + + storage = MinioStorageService(minio_no_ports_config) + assert storage.minio_config == minio_no_ports_config + mocked_minio_client.assert_called_with( + "cute_url_no_ports:9000", credentials=mocker.ANY, secure=False, region=None + ) + + +def 
test_minio_with_region(mocker): + mocked_minio_client = mocker.patch("shared.storage.minio.Minio") + minio_no_ports_config = { + "access_key_id": "hodor", + "secret_access_key": "haha", + "verify_ssl": False, + "host": "cute_url_no_ports", + "port": "9000", + "iam_auth": True, + "iam_endpoint": None, + "region": "example", + } + + storage = MinioStorageService(minio_no_ports_config) + assert storage.minio_config == minio_no_ports_config + mocked_minio_client.assert_called_with( + "cute_url_no_ports:9000", + credentials=mocker.ANY, + secure=False, + region="example", + ) diff --git a/uv.lock b/uv.lock index a9e0cbfb..f94cccc8 100644 --- a/uv.lock +++ b/uv.lock @@ -1430,6 +1430,7 @@ dependencies = [ { name = "requests" }, { name = "sentry-sdk" }, { name = "sqlalchemy" }, + { name = "zstandard" }, ] [package.dev-dependencies] @@ -1483,6 +1484,7 @@ requires-dist = [ { name = "requests", specifier = ">=2.32.3" }, { name = "sentry-sdk", specifier = ">=2.18.0" }, { name = "sqlalchemy", specifier = "<2" }, + { name = "zstandard", specifier = "==0.23.0" }, ] [package.metadata.requires-dev] @@ -1686,3 +1688,46 @@ sdist = { url = "https://files.pythonhosted.org/packages/3f/50/bad581df71744867e wheels = [ { url = "https://files.pythonhosted.org/packages/b7/1a/7e4798e9339adc931158c9d69ecc34f5e6791489d469f5e50ec15e35f458/zipp-3.21.0-py3-none-any.whl", hash = "sha256:ac1bbe05fd2991f160ebce24ffbac5f6d11d83dc90891255885223d42b3cd931", size = 9630 }, ] + +[[package]] +name = "zstandard" +version = "0.23.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "cffi", marker = "platform_python_implementation == 'PyPy'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/ed/f6/2ac0287b442160a89d726b17a9184a4c615bb5237db763791a7fd16d9df1/zstandard-0.23.0.tar.gz", hash = "sha256:b2d8c62d08e7255f68f7a740bae85b3c9b8e5466baa9cbf7f57f1cde0ac6bc09", size = 681701 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/7b/83/f23338c963bd9de687d47bf32efe9fd30164e722ba27fb59df33e6b1719b/zstandard-0.23.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:b4567955a6bc1b20e9c31612e615af6b53733491aeaa19a6b3b37f3b65477094", size = 788713 }, + { url = "https://files.pythonhosted.org/packages/5b/b3/1a028f6750fd9227ee0b937a278a434ab7f7fdc3066c3173f64366fe2466/zstandard-0.23.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:1e172f57cd78c20f13a3415cc8dfe24bf388614324d25539146594c16d78fcc8", size = 633459 }, + { url = "https://files.pythonhosted.org/packages/26/af/36d89aae0c1f95a0a98e50711bc5d92c144939efc1f81a2fcd3e78d7f4c1/zstandard-0.23.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b0e166f698c5a3e914947388c162be2583e0c638a4703fc6a543e23a88dea3c1", size = 4945707 }, + { url = "https://files.pythonhosted.org/packages/cd/2e/2051f5c772f4dfc0aae3741d5fc72c3dcfe3aaeb461cc231668a4db1ce14/zstandard-0.23.0-cp312-cp312-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:12a289832e520c6bd4dcaad68e944b86da3bad0d339ef7989fb7e88f92e96072", size = 5306545 }, + { url = "https://files.pythonhosted.org/packages/0a/9e/a11c97b087f89cab030fa71206963090d2fecd8eb83e67bb8f3ffb84c024/zstandard-0.23.0-cp312-cp312-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:d50d31bfedd53a928fed6707b15a8dbeef011bb6366297cc435accc888b27c20", size = 5337533 }, + { url = "https://files.pythonhosted.org/packages/fc/79/edeb217c57fe1bf16d890aa91a1c2c96b28c07b46afed54a5dcf310c3f6f/zstandard-0.23.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = 
"sha256:72c68dda124a1a138340fb62fa21b9bf4848437d9ca60bd35db36f2d3345f373", size = 5436510 }, + { url = "https://files.pythonhosted.org/packages/81/4f/c21383d97cb7a422ddf1ae824b53ce4b51063d0eeb2afa757eb40804a8ef/zstandard-0.23.0-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:53dd9d5e3d29f95acd5de6802e909ada8d8d8cfa37a3ac64836f3bc4bc5512db", size = 4859973 }, + { url = "https://files.pythonhosted.org/packages/ab/15/08d22e87753304405ccac8be2493a495f529edd81d39a0870621462276ef/zstandard-0.23.0-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:6a41c120c3dbc0d81a8e8adc73312d668cd34acd7725f036992b1b72d22c1772", size = 4936968 }, + { url = "https://files.pythonhosted.org/packages/eb/fa/f3670a597949fe7dcf38119a39f7da49a8a84a6f0b1a2e46b2f71a0ab83f/zstandard-0.23.0-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:40b33d93c6eddf02d2c19f5773196068d875c41ca25730e8288e9b672897c105", size = 5467179 }, + { url = "https://files.pythonhosted.org/packages/4e/a9/dad2ab22020211e380adc477a1dbf9f109b1f8d94c614944843e20dc2a99/zstandard-0.23.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:9206649ec587e6b02bd124fb7799b86cddec350f6f6c14bc82a2b70183e708ba", size = 4848577 }, + { url = "https://files.pythonhosted.org/packages/08/03/dd28b4484b0770f1e23478413e01bee476ae8227bbc81561f9c329e12564/zstandard-0.23.0-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:76e79bc28a65f467e0409098fa2c4376931fd3207fbeb6b956c7c476d53746dd", size = 4693899 }, + { url = "https://files.pythonhosted.org/packages/2b/64/3da7497eb635d025841e958bcd66a86117ae320c3b14b0ae86e9e8627518/zstandard-0.23.0-cp312-cp312-musllinux_1_2_ppc64le.whl", hash = "sha256:66b689c107857eceabf2cf3d3fc699c3c0fe8ccd18df2219d978c0283e4c508a", size = 5199964 }, + { url = "https://files.pythonhosted.org/packages/43/a4/d82decbab158a0e8a6ebb7fc98bc4d903266bce85b6e9aaedea1d288338c/zstandard-0.23.0-cp312-cp312-musllinux_1_2_s390x.whl", hash = "sha256:9c236e635582742fee16603042553d276cca506e824fa2e6489db04039521e90", size = 5655398 }, + { url = "https://files.pythonhosted.org/packages/f2/61/ac78a1263bc83a5cf29e7458b77a568eda5a8f81980691bbc6eb6a0d45cc/zstandard-0.23.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:a8fffdbd9d1408006baaf02f1068d7dd1f016c6bcb7538682622c556e7b68e35", size = 5191313 }, + { url = "https://files.pythonhosted.org/packages/e7/54/967c478314e16af5baf849b6ee9d6ea724ae5b100eb506011f045d3d4e16/zstandard-0.23.0-cp312-cp312-win32.whl", hash = "sha256:dc1d33abb8a0d754ea4763bad944fd965d3d95b5baef6b121c0c9013eaf1907d", size = 430877 }, + { url = "https://files.pythonhosted.org/packages/75/37/872d74bd7739639c4553bf94c84af7d54d8211b626b352bc57f0fd8d1e3f/zstandard-0.23.0-cp312-cp312-win_amd64.whl", hash = "sha256:64585e1dba664dc67c7cdabd56c1e5685233fbb1fc1966cfba2a340ec0dfff7b", size = 495595 }, + { url = "https://files.pythonhosted.org/packages/80/f1/8386f3f7c10261fe85fbc2c012fdb3d4db793b921c9abcc995d8da1b7a80/zstandard-0.23.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:576856e8594e6649aee06ddbfc738fec6a834f7c85bf7cadd1c53d4a58186ef9", size = 788975 }, + { url = "https://files.pythonhosted.org/packages/16/e8/cbf01077550b3e5dc86089035ff8f6fbbb312bc0983757c2d1117ebba242/zstandard-0.23.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:38302b78a850ff82656beaddeb0bb989a0322a8bbb1bf1ab10c17506681d772a", size = 633448 }, + { url = 
"https://files.pythonhosted.org/packages/06/27/4a1b4c267c29a464a161aeb2589aff212b4db653a1d96bffe3598f3f0d22/zstandard-0.23.0-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d2240ddc86b74966c34554c49d00eaafa8200a18d3a5b6ffbf7da63b11d74ee2", size = 4945269 }, + { url = "https://files.pythonhosted.org/packages/7c/64/d99261cc57afd9ae65b707e38045ed8269fbdae73544fd2e4a4d50d0ed83/zstandard-0.23.0-cp313-cp313-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:2ef230a8fd217a2015bc91b74f6b3b7d6522ba48be29ad4ea0ca3a3775bf7dd5", size = 5306228 }, + { url = "https://files.pythonhosted.org/packages/7a/cf/27b74c6f22541f0263016a0fd6369b1b7818941de639215c84e4e94b2a1c/zstandard-0.23.0-cp313-cp313-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:774d45b1fac1461f48698a9d4b5fa19a69d47ece02fa469825b442263f04021f", size = 5336891 }, + { url = "https://files.pythonhosted.org/packages/fa/18/89ac62eac46b69948bf35fcd90d37103f38722968e2981f752d69081ec4d/zstandard-0.23.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6f77fa49079891a4aab203d0b1744acc85577ed16d767b52fc089d83faf8d8ed", size = 5436310 }, + { url = "https://files.pythonhosted.org/packages/a8/a8/5ca5328ee568a873f5118d5b5f70d1f36c6387716efe2e369010289a5738/zstandard-0.23.0-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:ac184f87ff521f4840e6ea0b10c0ec90c6b1dcd0bad2f1e4a9a1b4fa177982ea", size = 4859912 }, + { url = "https://files.pythonhosted.org/packages/ea/ca/3781059c95fd0868658b1cf0440edd832b942f84ae60685d0cfdb808bca1/zstandard-0.23.0-cp313-cp313-musllinux_1_1_aarch64.whl", hash = "sha256:c363b53e257246a954ebc7c488304b5592b9c53fbe74d03bc1c64dda153fb847", size = 4936946 }, + { url = "https://files.pythonhosted.org/packages/ce/11/41a58986f809532742c2b832c53b74ba0e0a5dae7e8ab4642bf5876f35de/zstandard-0.23.0-cp313-cp313-musllinux_1_1_x86_64.whl", hash = "sha256:e7792606d606c8df5277c32ccb58f29b9b8603bf83b48639b7aedf6df4fe8171", size = 5466994 }, + { url = "https://files.pythonhosted.org/packages/83/e3/97d84fe95edd38d7053af05159465d298c8b20cebe9ccb3d26783faa9094/zstandard-0.23.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:a0817825b900fcd43ac5d05b8b3079937073d2b1ff9cf89427590718b70dd840", size = 4848681 }, + { url = "https://files.pythonhosted.org/packages/6e/99/cb1e63e931de15c88af26085e3f2d9af9ce53ccafac73b6e48418fd5a6e6/zstandard-0.23.0-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:9da6bc32faac9a293ddfdcb9108d4b20416219461e4ec64dfea8383cac186690", size = 4694239 }, + { url = "https://files.pythonhosted.org/packages/ab/50/b1e703016eebbc6501fc92f34db7b1c68e54e567ef39e6e59cf5fb6f2ec0/zstandard-0.23.0-cp313-cp313-musllinux_1_2_ppc64le.whl", hash = "sha256:fd7699e8fd9969f455ef2926221e0233f81a2542921471382e77a9e2f2b57f4b", size = 5200149 }, + { url = "https://files.pythonhosted.org/packages/aa/e0/932388630aaba70197c78bdb10cce2c91fae01a7e553b76ce85471aec690/zstandard-0.23.0-cp313-cp313-musllinux_1_2_s390x.whl", hash = "sha256:d477ed829077cd945b01fc3115edd132c47e6540ddcd96ca169facff28173057", size = 5655392 }, + { url = "https://files.pythonhosted.org/packages/02/90/2633473864f67a15526324b007a9f96c96f56d5f32ef2a56cc12f9548723/zstandard-0.23.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:fa6ce8b52c5987b3e34d5674b0ab529a4602b632ebab0a93b07bfb4dfc8f8a33", size = 5191299 }, + { url = 
"https://files.pythonhosted.org/packages/b0/4c/315ca5c32da7e2dc3455f3b2caee5c8c2246074a61aac6ec3378a97b7136/zstandard-0.23.0-cp313-cp313-win32.whl", hash = "sha256:a9b07268d0c3ca5c170a385a0ab9fb7fdd9f5fd866be004c4ea39e44edce47dd", size = 430862 }, + { url = "https://files.pythonhosted.org/packages/a2/bf/c6aaba098e2d04781e8f4f7c0ba3c7aa73d00e4c436bcc0cf059a66691d1/zstandard-0.23.0-cp313-cp313-win_amd64.whl", hash = "sha256:f3513916e8c645d0610815c257cbfd3242adfd5c4cfa78be514e5a3ebb42a41b", size = 495578 }, +] From a69e79b89f55c48e482b9745b90a6299d96590da Mon Sep 17 00:00:00 2001 From: joseph-sentry Date: Wed, 27 Nov 2024 12:29:52 -0500 Subject: [PATCH 2/6] fix: address feedback - using fget_object was unecessary since we were streaming the response data regardless - no need for all the warning logs and sentry stuff, we'll just do a 3 step migration in both API and worker (update shared supporting old behaviour, update {api,worker}, remove old behaviour support from shared) - zstandard version pinning can be more flexible - add test for content type = application/x-gzip since there was some specific handling for that in the GCP storage service --- pyproject.toml | 2 +- shared/storage/minio.py | 60 +++++++------------------------- tests/unit/storage/test_minio.py | 26 ++++++++++++++ 3 files changed, 39 insertions(+), 49 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index cb0aaf90..388d8e53 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,7 +34,7 @@ dependencies = [ "requests>=2.32.3", "sentry-sdk>=2.18.0", "sqlalchemy<2", - "zstandard==0.23.0", + "zstandard>=0.23.0", ] [build-system] diff --git a/shared/storage/minio.py b/shared/storage/minio.py index bb000241..d718a788 100644 --- a/shared/storage/minio.py +++ b/shared/storage/minio.py @@ -7,8 +7,6 @@ from io import BytesIO from typing import BinaryIO, Protocol, overload -import sentry_sdk -import sentry_sdk.scope import zstandard from minio import Minio from minio.credentials import ( @@ -172,25 +170,10 @@ def write_file( compression_type: str = "zstd", ): if is_already_gzipped: - log.warning( - "is_already_gzipped is deprecated and will be removed in a future version, instead compress using zstd and use the is_already_zstd_compressed argument" - ) - with sentry_sdk.new_scope() as scope: - scope.set_extra("bucket_name", bucket_name) - scope.set_extra("path", path) - sentry_sdk.capture_message("is_already_gzipped passed with True") is_compressed = True compression_type = "gzip" if isinstance(data, str): - log.warning( - "passing data as a str to write_file is deprecated and will be removed in a future version, instead pass an object compliant with the BinaryIO type" - ) - with sentry_sdk.new_scope() as scope: - scope.set_extra("bucket_name", bucket_name) - scope.set_extra("path", path) - sentry_sdk.capture_message("write_file data argument passed as str") - data = BytesIO(data.encode()) if not is_compressed: @@ -242,43 +225,24 @@ def read_file( def read_file(self, bucket_name: str, path: str, file_obj: str) -> None: ... 
def read_file(self, bucket_name, path, file_obj=None) -> bytes | None: + response = None try: headers = {"Accept-Encoding": "gzip, zstd"} - if file_obj: - _, tmpfilepath = tempfile.mkstemp() - to_file_response: GetObjectToFileResponse = ( - self.minio_client.fget_object( - bucket_name, path, tmpfilepath, request_headers=headers - ) - ) - data = open(tmpfilepath, "rb") - content_encoding = to_file_response.metadata.get( - "Content-Encoding", None - ) - else: - response: HTTPResponse = self.minio_client.get_object( - bucket_name, path, request_headers=headers - ) - data = response - content_encoding = response.headers.get("Content-Encoding", None) + response: HTTPResponse = self.minio_client.get_object( + bucket_name, path, request_headers=headers + ) + content_encoding = response.headers.get("Content-Encoding", None) reader: Readable | None = None - if content_encoding == "gzip": - # HTTPResponse automatically decodes gzipped data for us - # minio_client.fget_object uses HTTPResponse under the hood, - # so this applies to both get_object and fget_object - reader = data - elif content_encoding == "zstd": + if content_encoding == "zstd": # we have to manually decompress zstandard compressed data cctx = zstandard.ZstdDecompressor() - reader = cctx.stream_reader(data) + reader = cctx.stream_reader(response) else: - with sentry_sdk.new_scope() as scope: - scope.set_extra("bucket_name", bucket_name) - scope.set_extra("path", path) - raise ValueError("Blob does not have Content-Encoding set") + reader = response if file_obj: + file_obj.seek(0) while chunk := reader.read(16384): file_obj.write(chunk) return None @@ -296,9 +260,9 @@ def read_file(self, bucket_name, path, file_obj=None) -> bytes | None: except MinioException: raise finally: - if file_obj: - data.close() - os.unlink(tmpfilepath) + if response: + response.close() + response.release_conn() """ Deletes file url in specified bucket. diff --git a/tests/unit/storage/test_minio.py b/tests/unit/storage/test_minio.py index 00a32104..facd301e 100644 --- a/tests/unit/storage/test_minio.py +++ b/tests/unit/storage/test_minio.py @@ -115,6 +115,32 @@ def test_write_then_read_file_obj(): assert f.read().decode() == data +def test_write_then_read_file_obj_x_gzip(): + storage = make_storage() + path = f"test_write_then_read_file_obj_x_gzip/{uuid4().hex}" + compressed = gzip.compress("lorem ipsum dolor test_write_then_read_file á".encode()) + outsize = len(compressed) + data = BytesIO(compressed) + + ensure_bucket(storage) + + headers = {"Content-Encoding": "gzip"} + storage.minio_client.put_object( + BUCKET_NAME, + path, + data, + content_type="application/x-gzip", + metadata=headers, + length=outsize, + ) + + _, local_path = tempfile.mkstemp() + with open(local_path, "wb") as f: + storage.read_file(BUCKET_NAME, path, file_obj=f) + with open(local_path, "rb") as f: + assert f.read().decode() == "lorem ipsum dolor test_write_then_read_file á" + + def test_write_then_read_file_obj_already_gzipped(): storage = make_storage() path = f"test_write_then_read_file_obj_already_gzipped/{uuid4().hex}" From d57953136d14d340983cb5973937006670383193 Mon Sep 17 00:00:00 2001 From: joseph-sentry Date: Thu, 28 Nov 2024 16:24:06 -0500 Subject: [PATCH 3/6] fix: update MinioStorageService - in write file: - data arg is not BinaryIO it's actually bytes | str | IO[bytes] bytes and str are self-explanatory it's just how it's being used currently, so we must support it. 
IO[bytes] is there to support file handles opened with "rb" that
    are being passed and BytesIO objects
  - start accepting a None value for compression_type, which will mean
    no automatic compression even if is_compressed is false
  - do automatic compression using gzip if is_compressed=False and
    compression_type="gzip"
  - in put_object set size = -1 and use a part_size of 20MiB. the
    specific part size is arbitrary. Different sources online suggest
    different numbers. It probably depends on the size of the
    underlying data we're trying to send, but 20MiB seems like a good
    flat number to pick for now.
- in read_file:
  - generally reorganize the function to spend less time under the
    try/except blocks
  - use the CHUNK_SIZE const defined in storage/base for the amount to
    read from the streams
  - accept IO[bytes] for the file_obj since we don't use any of the
    BinaryIO-specific methods
- create GZipStreamReader that takes in an IO[bytes] and implements a
  read() method that reads a certain amount of bytes from the
  IO[bytes], compresses whatever it reads using gzip, and returns the
  result
---
 shared/storage/base.py           |   1 +
 shared/storage/minio.py          | 200 ++++++++++++++++---------------
 tests/unit/storage/test_minio.py |  44 +++++++
 uv.lock                          |   2 +-
 4 files changed, 149 insertions(+), 98 deletions(-)

diff --git a/shared/storage/base.py b/shared/storage/base.py
index 22668ca4..bf8795c5 100644
--- a/shared/storage/base.py
+++ b/shared/storage/base.py
@@ -1,4 +1,5 @@
 CHUNK_SIZE = 1024 * 32
+PART_SIZE = 1024 * 1024 * 20  # 20MiB
 
 
 # Interface class for interfacing with codecov's underlying storage layer
diff --git a/shared/storage/minio.py b/shared/storage/minio.py
index d718a788..8184a9ef 100644
--- a/shared/storage/minio.py
+++ b/shared/storage/minio.py
@@ -1,15 +1,13 @@
-import datetime
+import gzip
 import json
 import logging
-import os
 import sys
-import tempfile
 from io import BytesIO
-from typing import BinaryIO, Protocol, overload
+from typing import IO, BinaryIO, Tuple, cast, overload
 
 import zstandard
 from minio import Minio
-from minio.credentials import (
+from minio.credentials.providers import (
     ChainedProvider,
     EnvAWSProvider,
     EnvMinioProvider,
@@ -17,27 +15,26 @@
 )
 from minio.deleteobjects import DeleteObject
 from minio.error import MinioException, S3Error
-from urllib3.response import HTTPResponse
+from minio.helpers import ObjectWriteResult
+from urllib3 import HTTPResponse
 
-from shared.storage.base import BaseStorageService
+from shared.storage.base import CHUNK_SIZE, PART_SIZE, BaseStorageService
from shared.storage.exceptions import BucketAlreadyExistsError, FileNotInStorageError
 
 log = logging.getLogger(__name__)
 
 
-class Readable(Protocol):
-    def read(self, size: int = -1) -> bytes: ...
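+# each read() call below returns a complete gzip member; RFC 1952 allows
+# members to be concatenated, so compressing chunk-by-chunk still yields
+# one valid gzip stream that decompresses back to the original data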
+class GZipStreamReader: + def __init__(self, fileobj: IO[bytes]): + self.data = fileobj + def read(self, size: int = -1, /) -> bytes: + curr_data = self.data.read(size) -class GetObjectToFileResponse(Protocol): - bucket_name: str - object_name: str - last_modified: datetime.datetime | None - etag: str - size: int - content_type: str | None - metadata: dict[str, str] - version_id: str | None + if not curr_data: + return b"" + + return gzip.compress(curr_data) # Service class for interfacing with codecov's underlying storage layer, minio @@ -65,12 +62,12 @@ def init_minio_client( self, host: str, port: str, - access_key: str = None, - secret_key: str = None, + access_key: str | None = None, + secret_key: str | None = None, verify_ssl: bool = False, iam_auth: bool = False, - iam_endpoint: str = None, - region: str = None, + iam_endpoint: str | None = None, + region: str | None = None, ): """ Initialize the minio client @@ -162,59 +159,59 @@ def write_file( self, bucket_name: str, path: str, - data: BinaryIO, + data: IO[bytes] | str | bytes, reduced_redundancy: bool = False, *, is_already_gzipped: bool = False, # deprecated is_compressed: bool = False, - compression_type: str = "zstd", - ): + compression_type: str | None = "zstd", + ) -> ObjectWriteResult: + if isinstance(data, str): + data = BytesIO(data.encode()) + elif isinstance(data, (bytes, bytearray, memoryview)): + data = BytesIO(data) + if is_already_gzipped: is_compressed = True compression_type = "gzip" - if isinstance(data, str): - data = BytesIO(data.encode()) + if is_compressed: + result = data + else: + if compression_type == "zstd": + cctx = zstandard.ZstdCompressor() + result = cctx.stream_reader(data) - if not is_compressed: - cctx = zstandard.ZstdCompressor() - reader: zstandard.ZstdCompressionReader = cctx.stream_reader(data) - _, filepath = tempfile.mkstemp() - with open(filepath, "wb") as f: - while chunk := reader.read(16384): - f.write(chunk) - data = open(filepath, "rb") - - try: - out_size = data.seek(0, os.SEEK_END) - data.seek(0) - - if compression_type == "gzip": - content_encoding = "gzip" - elif compression_type == "zstd": - content_encoding = "zstd" - - headers = {"Content-Encoding": content_encoding} - - if reduced_redundancy: - headers["x-amz-storage-class"] = "REDUCED_REDUNDANCY" - - self.minio_client.put_object( - bucket_name, - path, - data, - out_size, - metadata=headers, - content_type="text/plain", - ) - return True + elif compression_type == "gzip": + result = GZipStreamReader(data) - except MinioException: - raise - finally: - if not is_compressed: - data.close() - os.unlink(filepath) + else: + result = data + + headers: dict[str, str | list[str] | Tuple[str]] = {} + + if compression_type: + headers["Content-Encoding"] = compression_type + + if reduced_redundancy: + headers["x-amz-storage-class"] = "REDUCED_REDUNDANCY" + + # it's safe to do a BinaryIO cast here because we know that put_object only uses a function of the shape: + # read(self, size: int = -1, /) -> bytes + # GZipStreamReader implements this (we did it ourselves) + # ZstdCompressionReader implements read(): https://github.com/indygreg/python-zstandard/blob/12a80fac558820adf43e6f16206120685b9eb880/zstandard/__init__.pyi#L233C5-L233C49 + # BytesIO implements read(): https://docs.python.org/3/library/io.html#io.BufferedReader.read + # IO[bytes] implements read(): https://github.com/python/cpython/blob/3.13/Lib/typing.py#L3502 + + return self.minio_client.put_object( + bucket_name, + path, + cast(BinaryIO, result), + -1, + metadata=headers, 
+ content_type="text/plain", + part_size=PART_SIZE, + ) @overload def read_file( @@ -222,47 +219,56 @@ def read_file( ) -> bytes: ... @overload - def read_file(self, bucket_name: str, path: str, file_obj: str) -> None: ... + def read_file(self, bucket_name: str, path: str, file_obj: IO[bytes]) -> None: ... - def read_file(self, bucket_name, path, file_obj=None) -> bytes | None: - response = None + def read_file( + self, bucket_name: str, path: str, file_obj: IO[bytes] | None = None + ) -> bytes | None: + headers: dict[str, str | list[str] | Tuple[str]] = { + "Accept-Encoding": "gzip, zstd" + } try: - headers = {"Accept-Encoding": "gzip, zstd"} - response: HTTPResponse = self.minio_client.get_object( - bucket_name, path, request_headers=headers + response = cast( + HTTPResponse, + self.minio_client.get_object( # this returns an HTTPResponse + bucket_name, path, request_headers=headers + ), ) - - content_encoding = response.headers.get("Content-Encoding", None) - reader: Readable | None = None - if content_encoding == "zstd": - # we have to manually decompress zstandard compressed data - cctx = zstandard.ZstdDecompressor() - reader = cctx.stream_reader(response) - else: - reader = response - - if file_obj: - file_obj.seek(0) - while chunk := reader.read(16384): - file_obj.write(chunk) - return None - else: - res = BytesIO() - while chunk := reader.read(16384): - res.write(chunk) - return res.getvalue() except S3Error as e: if e.code == "NoSuchKey": raise FileNotInStorageError( f"File {path} does not exist in {bucket_name}" ) raise e - except MinioException: - raise - finally: - if response: - response.close() - response.release_conn() + if response.headers: + content_encoding = response.headers.get("Content-Encoding", None) + if content_encoding == "zstd": + # we have to manually decompress zstandard compressed data + cctx = zstandard.ZstdDecompressor() + # if the object passed to this has a read method then that's + # all this object will ever need, since it will just call read + # and get the bytes object resulting from it then compress that + # HTTPResponse + reader = cctx.stream_reader(cast(IO[bytes], response)) + else: + reader = response + else: + reader = response + + if file_obj: + file_obj.seek(0) + while chunk := reader.read(CHUNK_SIZE): + file_obj.write(chunk) + response.close() + response.release_conn() + return None + else: + res = BytesIO() + while chunk := reader.read(CHUNK_SIZE): + res.write(chunk) + response.close() + response.release_conn() + return res.getvalue() """ Deletes file url in specified bucket. 
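
A note on the size = -1 / part_size pairing used in put_object above:
minio-py treats a length of -1 plus an explicit part_size as a
streaming multipart upload, so the object can be sent without knowing
the compressed size up front. A standalone sketch of that call shape
(the client, bucket, and object names here are illustrative):

    from io import BytesIO
    from minio import Minio

    client = Minio("minio:9000", access_key="key", secret_key="secret", secure=False)
    # length=-1 marks the size as unknown; minio-py then reads and
    # uploads part_size-byte chunks until the stream's read() returns b""
    client.put_object("archive", "demo/object", BytesIO(b"data" * 1000), -1, part_size=20 * 1024 * 1024)
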
diff --git a/tests/unit/storage/test_minio.py b/tests/unit/storage/test_minio.py
index facd301e..c219830b 100644
--- a/tests/unit/storage/test_minio.py
+++ b/tests/unit/storage/test_minio.py
@@ -115,6 +115,50 @@ def test_write_then_read_file_obj():
         assert f.read().decode() == data
 
 
+def test_write_then_read_file_obj_gzip():
+    storage = make_storage()
+    path = f"test_write_then_read_file_gzip/{uuid4().hex}"
+    data = "lorem ipsum dolor test_write_then_read_file á"
+
+    ensure_bucket(storage)
+
+    _, local_path = tempfile.mkstemp()
+    with open(local_path, "w") as f:
+        f.write(data)
+    with open(local_path, "rb") as f:
+        writing_result = storage.write_file(
+            BUCKET_NAME, path, f, compression_type="gzip"
+        )
+    assert writing_result
+
+    _, local_path = tempfile.mkstemp()
+    with open(local_path, "wb") as f:
+        storage.read_file(BUCKET_NAME, path, file_obj=f)
+    with open(local_path, "rb") as f:
+        assert f.read().decode() == data
+
+
+def test_write_then_read_file_obj_no_compression():
+    storage = make_storage()
+    path = f"test_write_then_read_file_no_compression/{uuid4().hex}"
+    data = "lorem ipsum dolor test_write_then_read_file á"
+
+    ensure_bucket(storage)
+
+    _, local_path = tempfile.mkstemp()
+    with open(local_path, "w") as f:
+        f.write(data)
+    with open(local_path, "rb") as f:
+        writing_result = storage.write_file(BUCKET_NAME, path, f, compression_type=None)
+    assert writing_result
+
+    _, local_path = tempfile.mkstemp()
+    with open(local_path, "wb") as f:
+        storage.read_file(BUCKET_NAME, path, file_obj=f)
+    with open(local_path, "rb") as f:
+        assert f.read().decode() == data
+
+
 def test_write_then_read_file_obj_x_gzip():
     storage = make_storage()
     path = f"test_write_then_read_file_obj_x_gzip/{uuid4().hex}"
diff --git a/uv.lock b/uv.lock
index f94cccc8..fcda4f1b 100644
--- a/uv.lock
+++ b/uv.lock
@@ -1484,7 +1484,7 @@ requires-dist = [
     { name = "requests", specifier = ">=2.32.3" },
     { name = "sentry-sdk", specifier = ">=2.18.0" },
     { name = "sqlalchemy", specifier = "<2" },
-    { name = "zstandard", specifier = "==0.23.0" },
+    { name = "zstandard", specifier = ">=0.23.0" },
 ]
 
 [package.metadata.requires-dev]

From f30ca66f828f00d58b8430bc2efad04625f82658 Mon Sep 17 00:00:00 2001
From: joseph-sentry
Date: Tue, 3 Dec 2024 16:22:35 -0500
Subject: [PATCH 4/6] fix(minio): check urllib3 version in read_file

this is because if urllib3 is >= 2.0.0 and the zstd extra is installed
then it can (and will) decode zstd encoded data when it's used in
get_object

so when we create the MinioStorageService, we check the urllib3 version
and whether it's been installed with the zstd extra

this commit also adds a test to ensure that the gzip compression and
decompression used in the GZipStreamReader actually works
---
 shared/storage/minio.py          | 23 ++++++++++++++++++++++-
 tests/unit/storage/test_minio.py | 18 +++++++++++++++++-
 2 files changed, 39 insertions(+), 2 deletions(-)

diff --git a/shared/storage/minio.py b/shared/storage/minio.py
index 8184a9ef..1e8789c8 100644
--- a/shared/storage/minio.py
+++ b/shared/storage/minio.py
@@ -1,4 +1,5 @@
 import gzip
+import importlib.metadata
 import json
 import logging
 import sys
@@ -37,9 +38,29 @@ def read(self, size: int = -1, /) -> bytes:
         return gzip.compress(curr_data)
 
 
+def zstd_decoded_by_default() -> bool:
+    try:
+        version = importlib.metadata.version("urllib3")
+    except importlib.metadata.PackageNotFoundError:
+        return False
+
+    if version < "2.0.0":
+        return False
+
+    distribution = importlib.metadata.metadata("urllib3")
+    if requires_dist := 
distribution.get_all("Requires-Dist"): + for req in requires_dist: + if "[zstd]" in req: + return True + + return False + + # Service class for interfacing with codecov's underlying storage layer, minio class MinioStorageService(BaseStorageService): def __init__(self, minio_config): + self.zstd_default = zstd_decoded_by_default() + self.minio_config = minio_config log.debug("Connecting to minio with config %s", self.minio_config) @@ -242,7 +263,7 @@ def read_file( raise e if response.headers: content_encoding = response.headers.get("Content-Encoding", None) - if content_encoding == "zstd": + if not self.zstd_default and content_encoding == "zstd": # we have to manually decompress zstandard compressed data cctx = zstandard.ZstdDecompressor() # if the object passed to this has a read method then that's diff --git a/tests/unit/storage/test_minio.py b/tests/unit/storage/test_minio.py index c219830b..9c502ee3 100644 --- a/tests/unit/storage/test_minio.py +++ b/tests/unit/storage/test_minio.py @@ -7,11 +7,27 @@ import zstandard from shared.storage.exceptions import BucketAlreadyExistsError, FileNotInStorageError -from shared.storage.minio import MinioStorageService +from shared.storage.minio import MinioStorageService, zstd_decoded_by_default BUCKET_NAME = "archivetest" +def test_zstd_by_default(): + assert not zstd_decoded_by_default() + + +def test_gzip_stream_compression(): + data = "lorem ipsum dolor test_write_then_read_file á" + + split_data = [data[i : i + 5] for i in range(0, len(data), 5)] + + compressed_pieces: list[bytes] = [ + gzip.compress(piece.encode()) for piece in split_data + ] + + assert gzip.decompress(b"".join(compressed_pieces)) == data.encode() + + def make_storage() -> MinioStorageService: return MinioStorageService( { From f30ca66f828f00d58b8430bc2efad04625f82658 Mon Sep 17 00:00:00 2001 From: joseph-sentry Date: Tue, 3 Dec 2024 16:22:35 -0500 Subject: [PATCH 5/6] feat: add feature flag for new minio storage instead of doing a 0-100 launch of the new minio storage service i'd like to have it so we incrementally ship it using a feature flag. 
so if a repoid is passed to the get_appropriate_storage_service function and the chosen storage is minio, then it will check the use_new_minio feature to decide whether to use the new or old minio storage service as mentioned this will be decided via the repoid (to reduce the impact IF it is broken) changes had to be made to avoid circular imports in the model_utils and rollout_utils files --- shared/django_apps/rollouts/models.py | 2 +- shared/django_apps/utils/model_utils.py | 20 -- shared/django_apps/utils/rollout_utils.py | 21 ++ shared/rollouts/__init__.py | 2 +- shared/rollouts/features.py | 1 + shared/storage/__init__.py | 14 +- shared/storage/new_minio.py | 330 ++++++++++++++++++ tests/unit/storage/test_init.py | 30 ++ tests/unit/storage/test_new_minio.py | 396 ++++++++++++++++++++++ 9 files changed, 790 insertions(+), 26 deletions(-) create mode 100644 shared/django_apps/utils/rollout_utils.py create mode 100644 shared/storage/new_minio.py create mode 100644 tests/unit/storage/test_new_minio.py diff --git a/shared/django_apps/rollouts/models.py b/shared/django_apps/rollouts/models.py index 2c054657..59394003 100644 --- a/shared/django_apps/rollouts/models.py +++ b/shared/django_apps/rollouts/models.py @@ -23,7 +23,7 @@ class RolloutUniverse(models.TextChoices): def default_random_salt(): # to resolve circular dependency - from shared.django_apps.utils.model_utils import default_random_salt + from shared.django_apps.utils.rollout_utils import default_random_salt return default_random_salt() diff --git a/shared/django_apps/utils/model_utils.py b/shared/django_apps/utils/model_utils.py index efb30a14..1067cbe6 100644 --- a/shared/django_apps/utils/model_utils.py +++ b/shared/django_apps/utils/model_utils.py @@ -1,10 +1,8 @@ import json import logging -from random import choice from typing import Any, Callable, Optional from shared.api_archive.archive import ArchiveService -from shared.django_apps.rollouts.models import RolloutUniverse from shared.storage.exceptions import FileNotInStorageError from shared.utils.ReportEncoder import ReportEncoder @@ -148,24 +146,6 @@ def __set__(self, obj, value): setattr(obj, self.cached_value_property_name, value) -def default_random_salt(): - ALPHABET = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" - return "".join([choice(ALPHABET) for _ in range(16)]) - - -def rollout_universe_to_override_string(rollout_universe: RolloutUniverse): - if rollout_universe == RolloutUniverse.OWNER_ID: - return "override_owner_ids" - elif rollout_universe == RolloutUniverse.REPO_ID: - return "override_repo_ids" - elif rollout_universe == RolloutUniverse.EMAIL: - return "override_emails" - elif rollout_universe == RolloutUniverse.ORG_ID: - return "override_org_ids" - else: - return "" - - # This is the place for DB trigger logic that's been moved into code # Owner def get_ownerid_if_member( diff --git a/shared/django_apps/utils/rollout_utils.py b/shared/django_apps/utils/rollout_utils.py new file mode 100644 index 00000000..f4a63561 --- /dev/null +++ b/shared/django_apps/utils/rollout_utils.py @@ -0,0 +1,21 @@ +from random import choice + +from shared.django_apps.rollouts.models import RolloutUniverse + + +def default_random_salt(): + ALPHABET = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" + return "".join([choice(ALPHABET) for _ in range(16)]) + + +def rollout_universe_to_override_string(rollout_universe: RolloutUniverse): + if rollout_universe == RolloutUniverse.OWNER_ID: + return "override_owner_ids" + elif rollout_universe == 
RolloutUniverse.REPO_ID: + return "override_repo_ids" + elif rollout_universe == RolloutUniverse.EMAIL: + return "override_emails" + elif rollout_universe == RolloutUniverse.ORG_ID: + return "override_org_ids" + else: + return "" diff --git a/shared/rollouts/__init__.py b/shared/rollouts/__init__.py index c6290995..94b064d0 100644 --- a/shared/rollouts/__init__.py +++ b/shared/rollouts/__init__.py @@ -17,7 +17,7 @@ Platform, RolloutUniverse, ) -from shared.django_apps.utils.model_utils import rollout_universe_to_override_string +from shared.django_apps.utils.rollout_utils import rollout_universe_to_override_string log = logging.getLogger("__name__") diff --git a/shared/rollouts/features.py b/shared/rollouts/features.py index 8e566462..611c907a 100644 --- a/shared/rollouts/features.py +++ b/shared/rollouts/features.py @@ -2,3 +2,4 @@ BUNDLE_THRESHOLD_FLAG = Feature("bundle_threshold_flag") INCLUDE_GITHUB_COMMENT_ACTIONS_BY_OWNER = Feature("include_github_comment_actions") +USE_NEW_MINIO = Feature("use_new_minio") diff --git a/shared/storage/__init__.py b/shared/storage/__init__.py index edfc5388..2d905c8b 100644 --- a/shared/storage/__init__.py +++ b/shared/storage/__init__.py @@ -1,18 +1,22 @@ from shared.config import get_config +from shared.rollouts.features import USE_NEW_MINIO from shared.storage.aws import AWSStorageService from shared.storage.base import BaseStorageService from shared.storage.fallback import StorageWithFallbackService from shared.storage.gcp import GCPStorageService from shared.storage.minio import MinioStorageService +from shared.storage.new_minio import NewMinioStorageService -def get_appropriate_storage_service() -> BaseStorageService: - chosen_storage = get_config("services", "chosen_storage", default="minio") - return _get_appropriate_storage_service_given_storage(chosen_storage) +def get_appropriate_storage_service( + repoid: int | None = None, +) -> BaseStorageService: + chosen_storage: str = get_config("services", "chosen_storage", default="minio") # type: ignore + return _get_appropriate_storage_service_given_storage(chosen_storage, repoid) def _get_appropriate_storage_service_given_storage( - chosen_storage: str, + chosen_storage: str, repoid: int | None ) -> BaseStorageService: if chosen_storage == "gcp": gcp_config = get_config("services", "gcp", default={}) @@ -28,4 +32,6 @@ def _get_appropriate_storage_service_given_storage( return StorageWithFallbackService(gcp_service, aws_service) else: minio_config = get_config("services", "minio", default={}) + if repoid and USE_NEW_MINIO.check_value(repoid, default=False): + return NewMinioStorageService(minio_config) return MinioStorageService(minio_config) diff --git a/shared/storage/new_minio.py b/shared/storage/new_minio.py new file mode 100644 index 00000000..da52f0c6 --- /dev/null +++ b/shared/storage/new_minio.py @@ -0,0 +1,330 @@ +import gzip +import importlib.metadata +import json +import logging +import sys +from io import BytesIO +from typing import IO, BinaryIO, Tuple, cast, overload + +import zstandard +from minio import Minio +from minio.credentials.providers import ( + ChainedProvider, + EnvAWSProvider, + EnvMinioProvider, + IamAwsProvider, +) +from minio.deleteobjects import DeleteObject +from minio.error import MinioException, S3Error +from minio.helpers import ObjectWriteResult +from urllib3 import HTTPResponse + +from shared.storage.base import CHUNK_SIZE, PART_SIZE, BaseStorageService +from shared.storage.exceptions import BucketAlreadyExistsError, FileNotInStorageError + +log = 
logging.getLogger(__name__)
+
+
+class GZipStreamReader:
+    def __init__(self, fileobj: IO[bytes]):
+        self.data = fileobj
+
+    def read(self, size: int = -1, /) -> bytes:
+        curr_data = self.data.read(size)
+
+        if not curr_data:
+            return b""
+
+        return gzip.compress(curr_data)
+
+
+def zstd_decoded_by_default() -> bool:
+    try:
+        version = importlib.metadata.version("urllib3")
+    except importlib.metadata.PackageNotFoundError:
+        return False
+
+    if version < "2.0.0":
+        return False
+
+    distribution = importlib.metadata.metadata("urllib3")
+    if requires_dist := distribution.get_all("Requires-Dist"):
+        for req in requires_dist:
+            if "[zstd]" in req:
+                return True
+
+    return False
+
+
+# Service class for interfacing with codecov's underlying storage layer, minio
+class NewMinioStorageService(BaseStorageService):
+    def __init__(self, minio_config):
+        self.zstd_default = zstd_decoded_by_default()
+
+        self.minio_config = minio_config
+        log.debug("Connecting to minio with config %s", self.minio_config)
+
+        self.minio_client = self.init_minio_client(
+            self.minio_config["host"],
+            self.minio_config.get("port"),
+            self.minio_config["access_key_id"],
+            self.minio_config["secret_access_key"],
+            self.minio_config["verify_ssl"],
+            self.minio_config.get("iam_auth", False),
+            self.minio_config["iam_endpoint"],
+            self.minio_config.get("region"),
+        )
+        log.debug("Done setting up minio client")
+
+    def client(self):
+        return self.minio_client if self.minio_client else None
+
+    def init_minio_client(
+        self,
+        host: str,
+        port: str,
+        access_key: str | None = None,
+        secret_key: str | None = None,
+        verify_ssl: bool = False,
+        iam_auth: bool = False,
+        iam_endpoint: str | None = None,
+        region: str | None = None,
+    ):
+        """
+        Initialize the minio client
+
+        `iam_auth` adds support for IAM-based authentication in a fallback pattern.
+        The following will be checked in order:
+
+        * EC2 metadata -- a custom endpoint can be provided, default is None.
+        * Minio env vars, specifically MINIO_ACCESS_KEY and MINIO_SECRET_KEY
+        * AWS env vars, specifically AWS_ACCESS_KEY and AWS_SECRET_KEY
+
+        to support backward compatibility, the iam_auth setting should be used
+        in the installation configuration
+
+        Args:
+            host (str): The address of the host where minio lives
+
+            port (str): The port number (as str or int should be ok)
+            access_key (str, optional): The access key (optional if IAM is being used)
+            secret_key (str, optional): The secret key (optional if IAM is being used)
+            verify_ssl (bool, optional): Whether minio should verify ssl
+            iam_auth (bool, optional): Whether to use iam_auth
+            iam_endpoint (str, optional): The endpoint to try to fetch EC2 metadata
+            region (str, optional): The region of the host where minio lives
+        """
+        if port is not None:
+            host = "{}:{}".format(host, port)
+
+        if iam_auth:
+            return Minio(
+                host,
+                secure=verify_ssl,
+                region=region,
+                credentials=ChainedProvider(
+                    providers=[
+                        IamAwsProvider(custom_endpoint=iam_endpoint),
+                        EnvMinioProvider(),
+                        EnvAWSProvider(),
+                    ]
+                ),
+            )
+        return Minio(
+            host,
+            access_key=access_key,
+            secret_key=secret_key,
+            secure=verify_ssl,
+            region=region,
+        )
+
+    # writes the initial storage bucket to storage via minio. 
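+    # the bucket is created with a public read-only (s3:GetObject) policy;
+    # if it already exists, a BucketAlreadyExistsError is raised instead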
+ def create_root_storage(self, bucket_name="archive", region="us-east-1"): + read_only_policy = { + "Statement": [ + { + "Action": ["s3:GetObject"], + "Effect": "Allow", + "Principal": {"AWS": ["*"]}, + "Resource": [f"arn:aws:s3:::{bucket_name}/*"], + } + ], + "Version": "2012-10-17", + } + try: + if not self.minio_client.bucket_exists(bucket_name): + log.debug( + "Making bucket on bucket %s on location %s", bucket_name, region + ) + self.minio_client.make_bucket(bucket_name, location=region) + log.debug("Setting policy") + self.minio_client.set_bucket_policy( + bucket_name, json.dumps(read_only_policy) + ) + log.debug("Done creating root storage") + return {"name": bucket_name} + else: + raise BucketAlreadyExistsError(f"Bucket {bucket_name} already exists") + # todo should only pass or raise + except S3Error as e: + if e.code == "BucketAlreadyOwnedByYou": + raise BucketAlreadyExistsError(f"Bucket {bucket_name} already exists") + elif e.code == "BucketAlreadyExists": + pass + raise + except MinioException: + raise + + # Writes a file to storage will gzip if not compressed already + def write_file( + self, + bucket_name: str, + path: str, + data: IO[bytes] | str | bytes, + reduced_redundancy: bool = False, + *, + is_already_gzipped: bool = False, # deprecated + is_compressed: bool = False, + compression_type: str | None = "zstd", + ) -> ObjectWriteResult: + if isinstance(data, str): + data = BytesIO(data.encode()) + elif isinstance(data, (bytes, bytearray, memoryview)): + data = BytesIO(data) + + if is_already_gzipped: + is_compressed = True + compression_type = "gzip" + + if is_compressed: + result = data + else: + if compression_type == "zstd": + cctx = zstandard.ZstdCompressor() + result = cctx.stream_reader(data) + + elif compression_type == "gzip": + result = GZipStreamReader(data) + + else: + result = data + + headers: dict[str, str | list[str] | Tuple[str]] = {} + + if compression_type: + headers["Content-Encoding"] = compression_type + + if reduced_redundancy: + headers["x-amz-storage-class"] = "REDUCED_REDUNDANCY" + + # it's safe to do a BinaryIO cast here because we know that put_object only uses a function of the shape: + # read(self, size: int = -1, /) -> bytes + # GZipStreamReader implements this (we did it ourselves) + # ZstdCompressionReader implements read(): https://github.com/indygreg/python-zstandard/blob/12a80fac558820adf43e6f16206120685b9eb880/zstandard/__init__.pyi#L233C5-L233C49 + # BytesIO implements read(): https://docs.python.org/3/library/io.html#io.BufferedReader.read + # IO[bytes] implements read(): https://github.com/python/cpython/blob/3.13/Lib/typing.py#L3502 + + return self.minio_client.put_object( + bucket_name, + path, + cast(BinaryIO, result), + -1, + metadata=headers, + content_type="text/plain", + part_size=PART_SIZE, + ) + + @overload + def read_file( + self, bucket_name: str, path: str, file_obj: None = None + ) -> bytes: ... + + @overload + def read_file(self, bucket_name: str, path: str, file_obj: IO[bytes]) -> None: ... 
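+
+    # read_file advertises gzip and zstd via Accept-Encoding; urllib3 will
+    # transparently decode gzip (and zstd too, when it was installed with
+    # its zstd extra -- see zstd_decoded_by_default), otherwise the zstd
+    # case is decompressed manually with a zstandard stream_reader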
+    def read_file(
+        self, bucket_name: str, path: str, file_obj: IO[bytes] | None = None
+    ) -> bytes | None:
+        headers: dict[str, str | list[str] | Tuple[str]] = {
+            "Accept-Encoding": "gzip, zstd"
+        }
+        try:
+            response = cast(
+                HTTPResponse,
+                self.minio_client.get_object(  # this returns an HTTPResponse
+                    bucket_name, path, request_headers=headers
+                ),
+            )
+        except S3Error as e:
+            if e.code == "NoSuchKey":
+                raise FileNotInStorageError(
+                    f"File {path} does not exist in {bucket_name}"
+                )
+            raise e
+        if response.headers:
+            content_encoding = response.headers.get("Content-Encoding", None)
+            if not self.zstd_default and content_encoding == "zstd":
+                # we have to manually decompress zstandard compressed data
+                dctx = zstandard.ZstdDecompressor()
+                # a read() method is all stream_reader ever needs from its
+                # source, so the HTTPResponse can be handed to it directly
+                # and the bytes are decompressed as they are read
+                reader = dctx.stream_reader(cast(IO[bytes], response))
+            else:
+                reader = response
+        else:
+            reader = response
+
+        if file_obj:
+            file_obj.seek(0)
+            while chunk := reader.read(CHUNK_SIZE):
+                file_obj.write(chunk)
+            response.close()
+            response.release_conn()
+            return None
+        else:
+            res = BytesIO()
+            while chunk := reader.read(CHUNK_SIZE):
+                res.write(chunk)
+            response.close()
+            response.release_conn()
+            return res.getvalue()
+
+    """
+    Deletes file url in specified bucket.
+    Returns True on successful deletion,
+    raises a MinioException otherwise.
+    """
+
+    def delete_file(self, bucket_name, url):
+        try:
+            # delete a file given a bucket name and a url
+            self.minio_client.remove_object(bucket_name, url)
+            return True
+        except MinioException:
+            raise
+
+    def delete_files(self, bucket_name, urls=[]):
+        try:
+            for del_err in self.minio_client.remove_objects(
+                bucket_name, [DeleteObject(url) for url in urls]
+            ):
+                print("Deletion error: {}".format(del_err))
+            return [True] * len(urls)
+        except MinioException:
+            raise
+
+    def list_folder_contents(self, bucket_name, prefix=None, recursive=True):
+        return (
+            self.object_to_dict(b)
+            for b in self.minio_client.list_objects(bucket_name, prefix, recursive)
+        )
+
+    def object_to_dict(self, obj):
+        return {"name": obj.object_name, "size": obj.size}
+
+    # TODO remove this function -- just using it for output during testing. 
+ def write(self, string, silence=False): + if not silence: + sys.stdout.write((string or "") + "\n") diff --git a/tests/unit/storage/test_init.py b/tests/unit/storage/test_init.py index c7ae6298..e9a62501 100644 --- a/tests/unit/storage/test_init.py +++ b/tests/unit/storage/test_init.py @@ -1,8 +1,10 @@ +from shared.rollouts.features import USE_NEW_MINIO from shared.storage import get_appropriate_storage_service from shared.storage.aws import AWSStorageService from shared.storage.fallback import StorageWithFallbackService from shared.storage.gcp import GCPStorageService from shared.storage.minio import MinioStorageService +from shared.storage.new_minio import NewMinioStorageService fake_private_key = """-----BEGIN PRIVATE KEY----- MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCnND/Neha4aNJ6 @@ -110,3 +112,31 @@ def test_get_appropriate_storage_service_minio(self, mock_configuration): res = get_appropriate_storage_service() assert isinstance(res, MinioStorageService) assert res.minio_config == minio_config + + def test_get_appropriate_storage_service_new_minio( + self, mock_configuration, mocker + ): + mock_configuration.params["services"] = { + "chosen_storage": "minio", + "gcp": gcp_config, + "aws": aws_config, + "minio": minio_config, + } + mocker.patch.object(USE_NEW_MINIO, "check_value", return_value=True) + res = get_appropriate_storage_service(repoid=123) + assert isinstance(res, NewMinioStorageService) + assert res.minio_config == minio_config + + def test_get_appropriate_storage_service_new_minio_false( + self, mock_configuration, mocker + ): + mock_configuration.params["services"] = { + "chosen_storage": "minio", + "gcp": gcp_config, + "aws": aws_config, + "minio": minio_config, + } + mocker.patch.object(USE_NEW_MINIO, "check_value", return_value=False) + res = get_appropriate_storage_service(repoid=123) + assert isinstance(res, MinioStorageService) + assert res.minio_config == minio_config diff --git a/tests/unit/storage/test_new_minio.py b/tests/unit/storage/test_new_minio.py new file mode 100644 index 00000000..2d3057a9 --- /dev/null +++ b/tests/unit/storage/test_new_minio.py @@ -0,0 +1,396 @@ +import gzip +import tempfile +from io import BytesIO +from uuid import uuid4 + +import pytest +import zstandard + +from shared.storage.exceptions import BucketAlreadyExistsError, FileNotInStorageError +from shared.storage.new_minio import NewMinioStorageService, zstd_decoded_by_default + +BUCKET_NAME = "archivetest" + + +def test_zstd_by_default(): + assert not zstd_decoded_by_default() + + +def test_gzip_stream_compression(): + data = "lorem ipsum dolor test_write_then_read_file á" + + split_data = [data[i : i + 5] for i in range(0, len(data), 5)] + + compressed_pieces: list[bytes] = [ + gzip.compress(piece.encode()) for piece in split_data + ] + + assert gzip.decompress(b"".join(compressed_pieces)) == data.encode() + + +def make_storage() -> NewMinioStorageService: + return NewMinioStorageService( + { + "access_key_id": "codecov-default-key", + "secret_access_key": "codecov-default-secret", + "verify_ssl": False, + "host": "minio", + "port": "9000", + "iam_auth": False, + "iam_endpoint": None, + } + ) + + +def ensure_bucket(storage: NewMinioStorageService): + try: + storage.create_root_storage(BUCKET_NAME) + except Exception: + pass + + +def test_create_bucket(): + storage = make_storage() + bucket_name = uuid4().hex + + res = storage.create_root_storage(bucket_name, region="") + assert res == {"name": bucket_name} + + +def test_create_bucket_already_exists(): + storage = 
make_storage() + bucket_name = uuid4().hex + + storage.create_root_storage(bucket_name) + with pytest.raises(BucketAlreadyExistsError): + storage.create_root_storage(bucket_name) + + +def test_write_then_read_file(): + storage = make_storage() + path = f"test_write_then_read_file/{uuid4().hex}" + data = "lorem ipsum dolor test_write_then_read_file á" + + ensure_bucket(storage) + writing_result = storage.write_file(BUCKET_NAME, path, data) + assert writing_result + reading_result = storage.read_file(BUCKET_NAME, path) + assert reading_result.decode() == data + + +def test_write_then_read_file_already_gzipped(): + storage = make_storage() + path = f"test_write_then_read_file_already_gzipped/{uuid4().hex}" + data = BytesIO( + gzip.compress("lorem ipsum dolor test_write_then_read_file á".encode()) + ) + + ensure_bucket(storage) + writing_result = storage.write_file( + BUCKET_NAME, path, data, is_already_gzipped=True + ) + assert writing_result + reading_result = storage.read_file(BUCKET_NAME, path) + assert reading_result.decode() == "lorem ipsum dolor test_write_then_read_file á" + + +def test_write_then_read_file_already_zstd(): + storage = make_storage() + path = f"test_write_then_read_file_already_zstd/{uuid4().hex}" + data = BytesIO( + zstandard.compress("lorem ipsum dolor test_write_then_read_file á".encode()) + ) + + ensure_bucket(storage) + writing_result = storage.write_file( + BUCKET_NAME, path, data, compression_type="zstd", is_compressed=True + ) + assert writing_result + reading_result = storage.read_file(BUCKET_NAME, path) + assert reading_result.decode() == "lorem ipsum dolor test_write_then_read_file á" + + +def test_write_then_read_file_obj(): + storage = make_storage() + path = f"test_write_then_read_file/{uuid4().hex}" + data = "lorem ipsum dolor test_write_then_read_file á" + + ensure_bucket(storage) + + _, local_path = tempfile.mkstemp() + with open(local_path, "w") as f: + f.write(data) + with open(local_path, "rb") as f: + writing_result = storage.write_file(BUCKET_NAME, path, f) + assert writing_result + + _, local_path = tempfile.mkstemp() + with open(local_path, "wb") as f: + storage.read_file(BUCKET_NAME, path, file_obj=f) + with open(local_path, "rb") as f: + assert f.read().decode() == data + + +def test_write_then_read_file_obj_gzip(): + storage = make_storage() + path = f"test_write_then_read_file_gzip/{uuid4().hex}" + data = "lorem ipsum dolor test_write_then_read_file á" + + ensure_bucket(storage) + + _, local_path = tempfile.mkstemp() + with open(local_path, "w") as f: + f.write(data) + with open(local_path, "rb") as f: + writing_result = storage.write_file( + BUCKET_NAME, path, f, compression_type="gzip" + ) + assert writing_result + + _, local_path = tempfile.mkstemp() + with open(local_path, "wb") as f: + storage.read_file(BUCKET_NAME, path, file_obj=f) + with open(local_path, "rb") as f: + assert f.read().decode() == data + + +def test_write_then_read_file_obj_no_compression(): + storage = make_storage() + path = f"test_write_then_read_file_no_compression/{uuid4().hex}" + data = "lorem ipsum dolor test_write_then_read_file á" + + ensure_bucket(storage) + + _, local_path = tempfile.mkstemp() + with open(local_path, "w") as f: + f.write(data) + with open(local_path, "rb") as f: + writing_result = storage.write_file(BUCKET_NAME, path, f, compression_type=None) + assert writing_result + + _, local_path = tempfile.mkstemp() + with open(local_path, "wb") as f: + storage.read_file(BUCKET_NAME, path, file_obj=f) + with open(local_path, "rb") as f: + assert 
f.read().decode() == data + + +def test_write_then_read_file_obj_x_gzip(): + storage = make_storage() + path = f"test_write_then_read_file_obj_x_gzip/{uuid4().hex}" + compressed = gzip.compress("lorem ipsum dolor test_write_then_read_file á".encode()) + outsize = len(compressed) + data = BytesIO(compressed) + + ensure_bucket(storage) + + headers = {"Content-Encoding": "gzip"} + storage.minio_client.put_object( + BUCKET_NAME, + path, + data, + content_type="application/x-gzip", + metadata=headers, + length=outsize, + ) + + _, local_path = tempfile.mkstemp() + with open(local_path, "wb") as f: + storage.read_file(BUCKET_NAME, path, file_obj=f) + with open(local_path, "rb") as f: + assert f.read().decode() == "lorem ipsum dolor test_write_then_read_file á" + + +def test_write_then_read_file_obj_already_gzipped(): + storage = make_storage() + path = f"test_write_then_read_file_obj_already_gzipped/{uuid4().hex}" + data = BytesIO( + gzip.compress("lorem ipsum dolor test_write_then_read_file á".encode()) + ) + + ensure_bucket(storage) + + _, local_path = tempfile.mkstemp() + with open(local_path, "wb") as f: + f.write(data.getvalue()) + with open(local_path, "rb") as f: + writing_result = storage.write_file( + BUCKET_NAME, path, f, is_already_gzipped=True + ) + assert writing_result + + _, local_path = tempfile.mkstemp() + with open(local_path, "wb") as f: + storage.read_file(BUCKET_NAME, path, file_obj=f) + with open(local_path, "rb") as f: + assert f.read().decode() == "lorem ipsum dolor test_write_then_read_file á" + + +def test_write_then_read_file_obj_already_zstd(): + storage = make_storage() + path = f"test_write_then_read_file_obj_already_zstd/{uuid4().hex}" + data = BytesIO( + zstandard.compress("lorem ipsum dolor test_write_then_read_file á".encode()) + ) + + ensure_bucket(storage) + + _, local_path = tempfile.mkstemp() + with open(local_path, "wb") as f: + f.write(data.getvalue()) + with open(local_path, "rb") as f: + writing_result = storage.write_file( + BUCKET_NAME, path, f, is_compressed=True, compression_type="zstd" + ) + assert writing_result + + _, local_path = tempfile.mkstemp() + with open(local_path, "wb") as f: + storage.read_file(BUCKET_NAME, path, file_obj=f) + with open(local_path, "rb") as f: + assert f.read().decode() == "lorem ipsum dolor test_write_then_read_file á" + + +def test_read_file_does_not_exist(): + storage = make_storage() + path = f"test_read_file_does_not_exist/{uuid4().hex}" + + ensure_bucket(storage) + with pytest.raises(FileNotInStorageError): + storage.read_file(BUCKET_NAME, path) + + +def test_write_then_delete_file(): + storage = make_storage() + path = f"test_write_then_delete_file/{uuid4().hex}" + data = "lorem ipsum dolor test_write_then_delete_file á" + + ensure_bucket(storage) + writing_result = storage.write_file(BUCKET_NAME, path, data) + assert writing_result + + deletion_result = storage.delete_file(BUCKET_NAME, path) + assert deletion_result is True + with pytest.raises(FileNotInStorageError): + storage.read_file(BUCKET_NAME, path) + + +def test_batch_delete_files(): + storage = make_storage() + path = f"test_batch_delete_files/{uuid4().hex}" + path_1 = f"{path}/result_1.txt" + path_2 = f"{path}/result_2.txt" + path_3 = f"{path}/result_3.txt" + paths = [path_1, path_2, path_3] + data = "lorem ipsum dolor test_batch_delete_files á" + + ensure_bucket(storage) + storage.write_file(BUCKET_NAME, path_1, data) + storage.write_file(BUCKET_NAME, path_3, data) + + deletion_result = storage.delete_files(BUCKET_NAME, paths) + assert deletion_result 
== [True, True, True] + for p in paths: + with pytest.raises(FileNotInStorageError): + storage.read_file(BUCKET_NAME, p) + + +def test_list_folder_contents(): + storage = make_storage() + path = f"test_list_folder_contents/{uuid4().hex}" + path_1 = "/result_1.txt" + path_2 = "/result_2.txt" + path_3 = "/result_3.txt" + path_4 = "/x1/result_1.txt" + path_5 = "/x1/result_2.txt" + path_6 = "/x1/result_3.txt" + all_paths = [path_1, path_2, path_3, path_4, path_5, path_6] + + ensure_bucket(storage) + for i, p in enumerate(all_paths): + data = f"Lorem ipsum on file {p} for {i * 'po'}" + storage.write_file(BUCKET_NAME, f"{path}{p}", data) + + results_1 = sorted( + storage.list_folder_contents(BUCKET_NAME, path), + key=lambda x: x["name"], + ) + # NOTE: the `size` here is actually the compressed (currently gzip) size + assert results_1 == [ + {"name": f"{path}{path_1}", "size": 47}, + {"name": f"{path}{path_2}", "size": 49}, + {"name": f"{path}{path_3}", "size": 51}, + {"name": f"{path}{path_4}", "size": 56}, + {"name": f"{path}{path_5}", "size": 58}, + {"name": f"{path}{path_6}", "size": 60}, + ] + + results_2 = sorted( + storage.list_folder_contents(BUCKET_NAME, f"{path}/x1"), + key=lambda x: x["name"], + ) + assert results_2 == [ + {"name": f"{path}{path_4}", "size": 56}, + {"name": f"{path}{path_5}", "size": 58}, + {"name": f"{path}{path_6}", "size": 60}, + ] + + +def test_minio_without_ports(mocker): + mocked_minio_client = mocker.patch("shared.storage.new_minio.Minio") + minio_no_ports_config = { + "access_key_id": "hodor", + "secret_access_key": "haha", + "verify_ssl": False, + "host": "cute_url_no_ports", + "iam_auth": True, + "iam_endpoint": None, + } + + storage = NewMinioStorageService(minio_no_ports_config) + assert storage.minio_config == minio_no_ports_config + mocked_minio_client.assert_called_with( + "cute_url_no_ports", credentials=mocker.ANY, secure=False, region=None + ) + + +def test_minio_with_ports(mocker): + mocked_minio_client = mocker.patch("shared.storage.new_minio.Minio") + minio_no_ports_config = { + "access_key_id": "hodor", + "secret_access_key": "haha", + "verify_ssl": False, + "host": "cute_url_no_ports", + "port": "9000", + "iam_auth": True, + "iam_endpoint": None, + } + + storage = NewMinioStorageService(minio_no_ports_config) + assert storage.minio_config == minio_no_ports_config + mocked_minio_client.assert_called_with( + "cute_url_no_ports:9000", credentials=mocker.ANY, secure=False, region=None + ) + + +def test_minio_with_region(mocker): + mocked_minio_client = mocker.patch("shared.storage.new_minio.Minio") + minio_no_ports_config = { + "access_key_id": "hodor", + "secret_access_key": "haha", + "verify_ssl": False, + "host": "cute_url_no_ports", + "port": "9000", + "iam_auth": True, + "iam_endpoint": None, + "region": "example", + } + + storage = NewMinioStorageService(minio_no_ports_config) + assert storage.minio_config == minio_no_ports_config + mocked_minio_client.assert_called_with( + "cute_url_no_ports:9000", + credentials=mocker.ANY, + secure=False, + region="example", + ) From 54bfa03d146ea887732519d37f5eacdeb2ea7fda Mon Sep 17 00:00:00 2001 From: joseph-sentry Date: Wed, 4 Dec 2024 17:16:32 -0500 Subject: [PATCH 6/6] fix: revert changes to old minio --- shared/storage/minio.py | 208 ++++------- tests/unit/storage/test_minio.py | 589 +++++++++++-------------------- 2 files changed, 284 insertions(+), 513 deletions(-) diff --git a/shared/storage/minio.py b/shared/storage/minio.py index 1e8789c8..58d51c61 100644 --- a/shared/storage/minio.py +++ 
b/shared/storage/minio.py @@ -1,14 +1,15 @@ import gzip -import importlib.metadata import json import logging +import os +import shutil import sys +import tempfile from io import BytesIO -from typing import IO, BinaryIO, Tuple, cast, overload +from typing import BinaryIO, overload -import zstandard from minio import Minio -from minio.credentials.providers import ( +from minio.credentials import ( ChainedProvider, EnvAWSProvider, EnvMinioProvider, @@ -16,51 +17,16 @@ ) from minio.deleteobjects import DeleteObject from minio.error import MinioException, S3Error -from minio.helpers import ObjectWriteResult -from urllib3 import HTTPResponse -from shared.storage.base import CHUNK_SIZE, PART_SIZE, BaseStorageService +from shared.storage.base import CHUNK_SIZE, BaseStorageService from shared.storage.exceptions import BucketAlreadyExistsError, FileNotInStorageError log = logging.getLogger(__name__) -class GZipStreamReader: - def __init__(self, fileobj: IO[bytes]): - self.data = fileobj - - def read(self, size: int = -1, /) -> bytes: - curr_data = self.data.read(size) - - if not curr_data: - return b"" - - return gzip.compress(curr_data) - - -def zstd_decoded_by_default() -> bool: - try: - version = importlib.metadata.version("urllib3") - except importlib.metadata.PackageNotFoundError: - return False - - if version < "2.0.0": - return False - - distribution = importlib.metadata.metadata("urllib3") - if requires_dist := distribution.get_all("Requires-Dist"): - for req in requires_dist: - if "[zstd]" in req: - return True - - return False - - # Service class for interfacing with codecov's underlying storage layer, minio class MinioStorageService(BaseStorageService): def __init__(self, minio_config): - self.zstd_default = zstd_decoded_by_default() - self.minio_config = minio_config log.debug("Connecting to minio with config %s", self.minio_config) @@ -91,21 +57,20 @@ def init_minio_client( region: str | None = None, ): """ - Initialize the minio client + Initialize the minio client `iam_auth` adds support for IAM base authentication in a fallback pattern. - The following will be checked in order: + The following will be checked in order: * EC2 metadata -- a custom endpoint can be provided, default is None. 
- * Minio env vars, specifically MINIO_ACCESS_KEY and MINIO_SECRET_KEY * AWS env vars, specifically AWS_ACCESS_KEY and AWS_SECRECT_KEY + * Minio env vars, specifically MINIO_ACCESS_KEY and MINIO_SECRET_KEY - to support backward compatibility, the iam_auth setting should be used - in the installation configuration + to support backward compatibility, the iam_auth setting should be used in the installation + configuration Args: host (str): The address of the host where minio lives - port (str): The port number (as str or int should be ok) access_key (str, optional): The access key (optional if IAM is being used) secret_key (str, optional): The secret key (optional if IAM is being used) @@ -178,118 +143,89 @@ def create_root_storage(self, bucket_name="archive", region="us-east-1"): # Writes a file to storage will gzip if not compressed already def write_file( self, - bucket_name: str, - path: str, - data: IO[bytes] | str | bytes, - reduced_redundancy: bool = False, + bucket_name, + path, + data, + reduced_redundancy=False, *, - is_already_gzipped: bool = False, # deprecated - is_compressed: bool = False, - compression_type: str | None = "zstd", - ) -> ObjectWriteResult: + is_already_gzipped: bool = False, + ): if isinstance(data, str): - data = BytesIO(data.encode()) - elif isinstance(data, (bytes, bytearray, memoryview)): - data = BytesIO(data) - - if is_already_gzipped: - is_compressed = True - compression_type = "gzip" + data = data.encode() + + out: BinaryIO + if isinstance(data, bytes): + if not is_already_gzipped: + out = BytesIO() + with gzip.GzipFile(fileobj=out, mode="w", compresslevel=9) as gz: + gz.write(data) + else: + out = BytesIO(data) - if is_compressed: - result = data + # get file size + out.seek(0, os.SEEK_END) + out_size = out.tell() else: - if compression_type == "zstd": - cctx = zstandard.ZstdCompressor() - result = cctx.stream_reader(data) - - elif compression_type == "gzip": - result = GZipStreamReader(data) - + # data is already a file-like object + if not is_already_gzipped: + _, filename = tempfile.mkstemp() + with gzip.open(filename, "wb") as f: + shutil.copyfileobj(data, f) + out = open(filename, "rb") else: - result = data + out = data - headers: dict[str, str | list[str] | Tuple[str]] = {} + out_size = os.stat(filename).st_size - if compression_type: - headers["Content-Encoding"] = compression_type - - if reduced_redundancy: - headers["x-amz-storage-class"] = "REDUCED_REDUNDANCY" - - # it's safe to do a BinaryIO cast here because we know that put_object only uses a function of the shape: - # read(self, size: int = -1, /) -> bytes - # GZipStreamReader implements this (we did it ourselves) - # ZstdCompressionReader implements read(): https://github.com/indygreg/python-zstandard/blob/12a80fac558820adf43e6f16206120685b9eb880/zstandard/__init__.pyi#L233C5-L233C49 - # BytesIO implements read(): https://docs.python.org/3/library/io.html#io.BufferedReader.read - # IO[bytes] implements read(): https://github.com/python/cpython/blob/3.13/Lib/typing.py#L3502 + try: + # reset pos for minio reading. 
+ out.seek(0) + + headers = {"Content-Encoding": "gzip"} + if reduced_redundancy: + headers["x-amz-storage-class"] = "REDUCED_REDUNDANCY" + self.minio_client.put_object( + bucket_name, + path, + out, + out_size, + metadata=headers, + content_type="text/plain", + ) + return True - return self.minio_client.put_object( - bucket_name, - path, - cast(BinaryIO, result), - -1, - metadata=headers, - content_type="text/plain", - part_size=PART_SIZE, - ) + except MinioException: + raise @overload - def read_file( - self, bucket_name: str, path: str, file_obj: None = None - ) -> bytes: ... + def read_file(self, bucket_name: str, path: str) -> bytes: ... @overload - def read_file(self, bucket_name: str, path: str, file_obj: IO[bytes]) -> None: ... + def read_file(self, bucket_name: str, path: str, file_obj: BinaryIO) -> None: ... def read_file( - self, bucket_name: str, path: str, file_obj: IO[bytes] | None = None + self, bucket_name: str, path: str, file_obj: BinaryIO | None = None ) -> bytes | None: - headers: dict[str, str | list[str] | Tuple[str]] = { - "Accept-Encoding": "gzip, zstd" - } try: - response = cast( - HTTPResponse, - self.minio_client.get_object( # this returns an HTTPResponse - bucket_name, path, request_headers=headers - ), - ) + res = self.minio_client.get_object(bucket_name, path) + if file_obj is None: + data = BytesIO() + for d in res.stream(CHUNK_SIZE): + data.write(d) + data.seek(0) + return data.getvalue() + else: + for d in res.stream(CHUNK_SIZE): + file_obj.write(d) + return None except S3Error as e: if e.code == "NoSuchKey": raise FileNotInStorageError( f"File {path} does not exist in {bucket_name}" ) raise e - if response.headers: - content_encoding = response.headers.get("Content-Encoding", None) - if not self.zstd_default and content_encoding == "zstd": - # we have to manually decompress zstandard compressed data - cctx = zstandard.ZstdDecompressor() - # if the object passed to this has a read method then that's - # all this object will ever need, since it will just call read - # and get the bytes object resulting from it then compress that - # HTTPResponse - reader = cctx.stream_reader(cast(IO[bytes], response)) - else: - reader = response - else: - reader = response - - if file_obj: - file_obj.seek(0) - while chunk := reader.read(CHUNK_SIZE): - file_obj.write(chunk) - response.close() - response.release_conn() - return None - else: - res = BytesIO() - while chunk := reader.read(CHUNK_SIZE): - res.write(chunk) - response.close() - response.release_conn() - return res.getvalue() + except MinioException: + raise """ Deletes file url in specified bucket. 
diff --git a/tests/unit/storage/test_minio.py b/tests/unit/storage/test_minio.py index 9c502ee3..50fe478d 100644 --- a/tests/unit/storage/test_minio.py +++ b/tests/unit/storage/test_minio.py @@ -1,396 +1,231 @@ -import gzip +import os import tempfile -from io import BytesIO -from uuid import uuid4 import pytest -import zstandard from shared.storage.exceptions import BucketAlreadyExistsError, FileNotInStorageError -from shared.storage.minio import MinioStorageService, zstd_decoded_by_default - -BUCKET_NAME = "archivetest" - - -def test_zstd_by_default(): - assert not zstd_decoded_by_default() - - -def test_gzip_stream_compression(): - data = "lorem ipsum dolor test_write_then_read_file á" - - split_data = [data[i : i + 5] for i in range(0, len(data), 5)] - - compressed_pieces: list[bytes] = [ - gzip.compress(piece.encode()) for piece in split_data - ] - - assert gzip.decompress(b"".join(compressed_pieces)) == data.encode() - +from shared.storage.minio import MinioStorageService +from tests.base import BaseTestCase + +minio_config = { + "access_key_id": "codecov-default-key", + "secret_access_key": "codecov-default-secret", + "verify_ssl": False, + "host": "minio", + "port": "9000", + "iam_auth": False, + "iam_endpoint": None, +} + + +class TestMinioStorageService(BaseTestCase): + def test_create_bucket(self, codecov_vcr): + storage = MinioStorageService(minio_config) + bucket_name = "archivetest" + res = storage.create_root_storage(bucket_name, region="") + assert res == {"name": "archivetest"} + + def test_create_bucket_already_exists(self, codecov_vcr): + storage = MinioStorageService(minio_config) + bucket_name = "alreadyexists" + storage.create_root_storage(bucket_name) + with pytest.raises(BucketAlreadyExistsError): + storage.create_root_storage(bucket_name) + + def test_write_then_read_file(self, codecov_vcr): + storage = MinioStorageService(minio_config) + path = "test_write_then_read_file/result" + data = "lorem ipsum dolor test_write_then_read_file á" + bucket_name = "archivetest" + writing_result = storage.write_file(bucket_name, path, data) + assert writing_result + reading_result = storage.read_file(bucket_name, path) + assert reading_result.decode() == data + + def test_write_then_read_file_obj(self, codecov_vcr): + storage = MinioStorageService(minio_config) + path = "test_write_then_read_file/result" + data = "lorem ipsum dolor test_write_then_read_file á" + _, local_path = tempfile.mkstemp() + with open(local_path, "w") as f: + f.write(data) + f = open(local_path, "rb") + bucket_name = "archivetest" + writing_result = storage.write_file(bucket_name, path, f) + assert writing_result + + _, local_path = tempfile.mkstemp() + with open(local_path, "wb") as f: + storage.read_file(bucket_name, path, file_obj=f) + with open(local_path, "rb") as f: + assert f.read().decode() == data + + def test_read_file_does_not_exist(self, request, codecov_vcr): + storage = MinioStorageService(minio_config) + path = f"{request.node.name}/does_not_exist.txt" + bucket_name = "archivetest" + with pytest.raises(FileNotInStorageError): + storage.read_file(bucket_name, path) + + def test_write_then_delete_file(self, request, codecov_vcr): + storage = MinioStorageService(minio_config) + path = f"{request.node.name}/result.txt" + data = "lorem ipsum dolor test_write_then_read_file á" + bucket_name = "archivetest" + writing_result = storage.write_file(bucket_name, path, data) + assert writing_result + deletion_result = storage.delete_file(bucket_name, path) + assert deletion_result is True + with 
pytest.raises(FileNotInStorageError): + storage.read_file(bucket_name, path) + + def test_delete_file_doesnt_exist(self, request, codecov_vcr): + storage = MinioStorageService(minio_config) + path = f"{request.node.name}/result.txt" + bucket_name = "archivetest" + storage.delete_file(bucket_name, path) + + def test_batch_delete_files(self, request, codecov_vcr): + storage = MinioStorageService(minio_config) + path_1 = f"{request.node.name}/result_1.txt" + path_2 = f"{request.node.name}/result_2.txt" + path_3 = f"{request.node.name}/result_3.txt" + paths = [path_1, path_2, path_3] + data = "lorem ipsum dolor test_write_then_read_file á" + bucket_name = "archivetest" + storage.write_file(bucket_name, path_1, data) + storage.write_file(bucket_name, path_3, data) + deletion_result = storage.delete_files(bucket_name, paths) + assert deletion_result == [True, True, True] + for p in paths: + with pytest.raises(FileNotInStorageError): + storage.read_file(bucket_name, p) + + def test_list_folder_contents(self, request, codecov_vcr): + storage = MinioStorageService(minio_config) + path_1 = f"thiago/{request.node.name}/result_1.txt" + path_2 = f"thiago/{request.node.name}/result_2.txt" + path_3 = f"thiago/{request.node.name}/result_3.txt" + path_4 = f"thiago/{request.node.name}/f1/result_1.txt" + path_5 = f"thiago/{request.node.name}/f1/result_2.txt" + path_6 = f"thiago/{request.node.name}/f1/result_3.txt" + all_paths = [path_1, path_2, path_3, path_4, path_5, path_6] + bucket_name = "archivetest" + for i, p in enumerate(all_paths): + data = f"Lorem ipsum on file {p} for {i * 'po'}" + storage.write_file(bucket_name, p, data) + results_1 = list( + storage.list_folder_contents(bucket_name, f"thiago/{request.node.name}") + ) + expected_result_1 = [ + {"name": path_1, "size": 84}, + {"name": path_2, "size": 86}, + {"name": path_3, "size": 87}, + {"name": path_4, "size": 88}, + {"name": path_5, "size": 89}, + {"name": path_6, "size": 90}, + ] + assert sorted(expected_result_1, key=lambda x: x["size"]) == sorted( + results_1, key=lambda x: x["size"] + ) + results_2 = list( + storage.list_folder_contents(bucket_name, f"thiago/{request.node.name}/f1") + ) + expected_result_2 = [ + {"name": path_4, "size": 88}, + {"name": path_5, "size": 89}, + {"name": path_6, "size": 90}, + ] + assert sorted(expected_result_2, key=lambda x: x["size"]) == sorted( + results_2, key=lambda x: x["size"] + ) -def make_storage() -> MinioStorageService: - return MinioStorageService( - { + """ + Since we cannot rely on `Chain` in the underlying implementation + we cannot ''trick'' minio into using the IAM auth flow while testing, + and therefore have to actually be running on an AWS instance. 
+ We can unskip this test after minio fixes their credential + chain problem + """ + + @pytest.mark.skip(reason="Skipping because minio IAM is currently untestable.") + def test_minio_with_iam_flow(self, codecov_vcr, mocker): + mocker.patch.dict( + os.environ, + { + "MINIO_ACCESS_KEY": "codecov-default-key", + "MINIO_SECRET_KEY": "codecov-default-secret", + }, + ) + minio_iam_config = { "access_key_id": "codecov-default-key", "secret_access_key": "codecov-default-secret", "verify_ssl": False, "host": "minio", "port": "9000", - "iam_auth": False, + "iam_auth": True, "iam_endpoint": None, } - ) - - -def ensure_bucket(storage: MinioStorageService): - try: - storage.create_root_storage(BUCKET_NAME) - except Exception: - pass - - -def test_create_bucket(): - storage = make_storage() - bucket_name = uuid4().hex - - res = storage.create_root_storage(bucket_name, region="") - assert res == {"name": bucket_name} - - -def test_create_bucket_already_exists(): - storage = make_storage() - bucket_name = uuid4().hex - - storage.create_root_storage(bucket_name) - with pytest.raises(BucketAlreadyExistsError): + bucket_name = "testminiowithiamflow" + storage = MinioStorageService(minio_iam_config) storage.create_root_storage(bucket_name) - - -def test_write_then_read_file(): - storage = make_storage() - path = f"test_write_then_read_file/{uuid4().hex}" - data = "lorem ipsum dolor test_write_then_read_file á" - - ensure_bucket(storage) - writing_result = storage.write_file(BUCKET_NAME, path, data) - assert writing_result - reading_result = storage.read_file(BUCKET_NAME, path) - assert reading_result.decode() == data - - -def test_write_then_read_file_already_gzipped(): - storage = make_storage() - path = f"test_write_then_read_file_already_gzipped/{uuid4().hex}" - data = BytesIO( - gzip.compress("lorem ipsum dolor test_write_then_read_file á".encode()) - ) - - ensure_bucket(storage) - writing_result = storage.write_file( - BUCKET_NAME, path, data, is_already_gzipped=True - ) - assert writing_result - reading_result = storage.read_file(BUCKET_NAME, path) - assert reading_result.decode() == "lorem ipsum dolor test_write_then_read_file á" - - -def test_write_then_read_file_already_zstd(): - storage = make_storage() - path = f"test_write_then_read_file_already_zstd/{uuid4().hex}" - data = BytesIO( - zstandard.compress("lorem ipsum dolor test_write_then_read_file á".encode()) - ) - - ensure_bucket(storage) - writing_result = storage.write_file( - BUCKET_NAME, path, data, compression_type="zstd", is_compressed=True - ) - assert writing_result - reading_result = storage.read_file(BUCKET_NAME, path) - assert reading_result.decode() == "lorem ipsum dolor test_write_then_read_file á" - - -def test_write_then_read_file_obj(): - storage = make_storage() - path = f"test_write_then_read_file/{uuid4().hex}" - data = "lorem ipsum dolor test_write_then_read_file á" - - ensure_bucket(storage) - - _, local_path = tempfile.mkstemp() - with open(local_path, "w") as f: - f.write(data) - with open(local_path, "rb") as f: - writing_result = storage.write_file(BUCKET_NAME, path, f) - assert writing_result - - _, local_path = tempfile.mkstemp() - with open(local_path, "wb") as f: - storage.read_file(BUCKET_NAME, path, file_obj=f) - with open(local_path, "rb") as f: - assert f.read().decode() == data - - -def test_write_then_read_file_obj_gzip(): - storage = make_storage() - path = f"test_write_then_read_file_gzip/{uuid4().hex}" - data = "lorem ipsum dolor test_write_then_read_file á" - - ensure_bucket(storage) - - _, local_path = 
tempfile.mkstemp() - with open(local_path, "w") as f: - f.write(data) - with open(local_path, "rb") as f: - writing_result = storage.write_file( - BUCKET_NAME, path, f, compression_type="gzip" + path = "test_write_then_read_file/result" + data = "lorem ipsum dolor test_write_then_read_file á" + writing_result = storage.write_file(bucket_name, path, data) + assert writing_result + reading_result = storage.read_file(bucket_name, path) + assert reading_result.decode() == data + + def test_minio_without_ports(self, mocker): + mocked_minio_client = mocker.patch("shared.storage.minio.Minio") + minio_no_ports_config = { + "access_key_id": "hodor", + "secret_access_key": "haha", + "verify_ssl": False, + "host": "cute_url_no_ports", + "iam_auth": True, + "iam_endpoint": None, + } + storage = MinioStorageService(minio_no_ports_config) + assert storage.minio_config == minio_no_ports_config + mocked_minio_client.assert_called_with( + "cute_url_no_ports", credentials=mocker.ANY, secure=False, region=None ) - assert writing_result - - _, local_path = tempfile.mkstemp() - with open(local_path, "wb") as f: - storage.read_file(BUCKET_NAME, path, file_obj=f) - with open(local_path, "rb") as f: - assert f.read().decode() == data - - -def test_write_then_read_file_obj_no_compression(): - storage = make_storage() - path = f"test_write_then_read_file_no_compression/{uuid4().hex}" - data = "lorem ipsum dolor test_write_then_read_file á" - - ensure_bucket(storage) - _, local_path = tempfile.mkstemp() - with open(local_path, "w") as f: - f.write(data) - with open(local_path, "rb") as f: - writing_result = storage.write_file(BUCKET_NAME, path, f, compression_type=None) - assert writing_result - - _, local_path = tempfile.mkstemp() - with open(local_path, "wb") as f: - storage.read_file(BUCKET_NAME, path, file_obj=f) - with open(local_path, "rb") as f: - assert f.read().decode() == data - - -def test_write_then_read_file_obj_x_gzip(): - storage = make_storage() - path = f"test_write_then_read_file_obj_x_gzip/{uuid4().hex}" - compressed = gzip.compress("lorem ipsum dolor test_write_then_read_file á".encode()) - outsize = len(compressed) - data = BytesIO(compressed) - - ensure_bucket(storage) - - headers = {"Content-Encoding": "gzip"} - storage.minio_client.put_object( - BUCKET_NAME, - path, - data, - content_type="application/x-gzip", - metadata=headers, - length=outsize, - ) - - _, local_path = tempfile.mkstemp() - with open(local_path, "wb") as f: - storage.read_file(BUCKET_NAME, path, file_obj=f) - with open(local_path, "rb") as f: - assert f.read().decode() == "lorem ipsum dolor test_write_then_read_file á" - - -def test_write_then_read_file_obj_already_gzipped(): - storage = make_storage() - path = f"test_write_then_read_file_obj_already_gzipped/{uuid4().hex}" - data = BytesIO( - gzip.compress("lorem ipsum dolor test_write_then_read_file á".encode()) - ) - - ensure_bucket(storage) - - _, local_path = tempfile.mkstemp() - with open(local_path, "wb") as f: - f.write(data.getvalue()) - with open(local_path, "rb") as f: - writing_result = storage.write_file( - BUCKET_NAME, path, f, is_already_gzipped=True + def test_minio_with_ports(self, mocker): + mocked_minio_client = mocker.patch("shared.storage.minio.Minio") + minio_no_ports_config = { + "access_key_id": "hodor", + "secret_access_key": "haha", + "verify_ssl": False, + "host": "cute_url_no_ports", + "port": "9000", + "iam_auth": True, + "iam_endpoint": None, + } + storage = MinioStorageService(minio_no_ports_config) + assert storage.minio_config == 
minio_no_ports_config + mocked_minio_client.assert_called_with( + "cute_url_no_ports:9000", credentials=mocker.ANY, secure=False, region=None ) - assert writing_result - - _, local_path = tempfile.mkstemp() - with open(local_path, "wb") as f: - storage.read_file(BUCKET_NAME, path, file_obj=f) - with open(local_path, "rb") as f: - assert f.read().decode() == "lorem ipsum dolor test_write_then_read_file á" - -def test_write_then_read_file_obj_already_zstd(): - storage = make_storage() - path = f"test_write_then_read_file_obj_already_zstd/{uuid4().hex}" - data = BytesIO( - zstandard.compress("lorem ipsum dolor test_write_then_read_file á".encode()) - ) - - ensure_bucket(storage) - - _, local_path = tempfile.mkstemp() - with open(local_path, "wb") as f: - f.write(data.getvalue()) - with open(local_path, "rb") as f: - writing_result = storage.write_file( - BUCKET_NAME, path, f, is_compressed=True, compression_type="zstd" + def test_minio_with_region(self, mocker): + mocked_minio_client = mocker.patch("shared.storage.minio.Minio") + minio_no_ports_config = { + "access_key_id": "hodor", + "secret_access_key": "haha", + "verify_ssl": False, + "host": "cute_url_no_ports", + "port": "9000", + "iam_auth": True, + "iam_endpoint": None, + "region": "example", + } + storage = MinioStorageService(minio_no_ports_config) + assert storage.minio_config == minio_no_ports_config + mocked_minio_client.assert_called_with( + "cute_url_no_ports:9000", + credentials=mocker.ANY, + secure=False, + region="example", ) - assert writing_result - - _, local_path = tempfile.mkstemp() - with open(local_path, "wb") as f: - storage.read_file(BUCKET_NAME, path, file_obj=f) - with open(local_path, "rb") as f: - assert f.read().decode() == "lorem ipsum dolor test_write_then_read_file á" - - -def test_read_file_does_not_exist(): - storage = make_storage() - path = f"test_read_file_does_not_exist/{uuid4().hex}" - - ensure_bucket(storage) - with pytest.raises(FileNotInStorageError): - storage.read_file(BUCKET_NAME, path) - - -def test_write_then_delete_file(): - storage = make_storage() - path = f"test_write_then_delete_file/{uuid4().hex}" - data = "lorem ipsum dolor test_write_then_delete_file á" - - ensure_bucket(storage) - writing_result = storage.write_file(BUCKET_NAME, path, data) - assert writing_result - - deletion_result = storage.delete_file(BUCKET_NAME, path) - assert deletion_result is True - with pytest.raises(FileNotInStorageError): - storage.read_file(BUCKET_NAME, path) - - -def test_batch_delete_files(): - storage = make_storage() - path = f"test_batch_delete_files/{uuid4().hex}" - path_1 = f"{path}/result_1.txt" - path_2 = f"{path}/result_2.txt" - path_3 = f"{path}/result_3.txt" - paths = [path_1, path_2, path_3] - data = "lorem ipsum dolor test_batch_delete_files á" - - ensure_bucket(storage) - storage.write_file(BUCKET_NAME, path_1, data) - storage.write_file(BUCKET_NAME, path_3, data) - - deletion_result = storage.delete_files(BUCKET_NAME, paths) - assert deletion_result == [True, True, True] - for p in paths: - with pytest.raises(FileNotInStorageError): - storage.read_file(BUCKET_NAME, p) - - -def test_list_folder_contents(): - storage = make_storage() - path = f"test_list_folder_contents/{uuid4().hex}" - path_1 = "/result_1.txt" - path_2 = "/result_2.txt" - path_3 = "/result_3.txt" - path_4 = "/x1/result_1.txt" - path_5 = "/x1/result_2.txt" - path_6 = "/x1/result_3.txt" - all_paths = [path_1, path_2, path_3, path_4, path_5, path_6] - - ensure_bucket(storage) - for i, p in enumerate(all_paths): - data = f"Lorem 
ipsum on file {p} for {i * 'po'}" - storage.write_file(BUCKET_NAME, f"{path}{p}", data) - - results_1 = sorted( - storage.list_folder_contents(BUCKET_NAME, path), - key=lambda x: x["name"], - ) - # NOTE: the `size` here is actually the compressed (currently gzip) size - assert results_1 == [ - {"name": f"{path}{path_1}", "size": 47}, - {"name": f"{path}{path_2}", "size": 49}, - {"name": f"{path}{path_3}", "size": 51}, - {"name": f"{path}{path_4}", "size": 56}, - {"name": f"{path}{path_5}", "size": 58}, - {"name": f"{path}{path_6}", "size": 60}, - ] - - results_2 = sorted( - storage.list_folder_contents(BUCKET_NAME, f"{path}/x1"), - key=lambda x: x["name"], - ) - assert results_2 == [ - {"name": f"{path}{path_4}", "size": 56}, - {"name": f"{path}{path_5}", "size": 58}, - {"name": f"{path}{path_6}", "size": 60}, - ] - - -def test_minio_without_ports(mocker): - mocked_minio_client = mocker.patch("shared.storage.minio.Minio") - minio_no_ports_config = { - "access_key_id": "hodor", - "secret_access_key": "haha", - "verify_ssl": False, - "host": "cute_url_no_ports", - "iam_auth": True, - "iam_endpoint": None, - } - - storage = MinioStorageService(minio_no_ports_config) - assert storage.minio_config == minio_no_ports_config - mocked_minio_client.assert_called_with( - "cute_url_no_ports", credentials=mocker.ANY, secure=False, region=None - ) - - -def test_minio_with_ports(mocker): - mocked_minio_client = mocker.patch("shared.storage.minio.Minio") - minio_no_ports_config = { - "access_key_id": "hodor", - "secret_access_key": "haha", - "verify_ssl": False, - "host": "cute_url_no_ports", - "port": "9000", - "iam_auth": True, - "iam_endpoint": None, - } - - storage = MinioStorageService(minio_no_ports_config) - assert storage.minio_config == minio_no_ports_config - mocked_minio_client.assert_called_with( - "cute_url_no_ports:9000", credentials=mocker.ANY, secure=False, region=None - ) - - -def test_minio_with_region(mocker): - mocked_minio_client = mocker.patch("shared.storage.minio.Minio") - minio_no_ports_config = { - "access_key_id": "hodor", - "secret_access_key": "haha", - "verify_ssl": False, - "host": "cute_url_no_ports", - "port": "9000", - "iam_auth": True, - "iam_endpoint": None, - "region": "example", - } - - storage = MinioStorageService(minio_no_ports_config) - assert storage.minio_config == minio_no_ports_config - mocked_minio_client.assert_called_with( - "cute_url_no_ports:9000", - credentials=mocker.ANY, - secure=False, - region="example", - )
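
as a standalone illustration of the round-trip properties this series
relies on, the sketch below uses only the stdlib plus the zstandard
package added earlier; the drain() helper is hypothetical, mirroring
the CHUNK_SIZE read loop in read_file:

    import gzip
    from io import BytesIO

    import zstandard

    payload = b"lorem ipsum dolor sit amet " * 100

    # gzip: independently compressed chunks concatenate into one valid
    # stream, which is what GZipStreamReader emits one read() at a time
    chunks = [payload[i : i + 64] for i in range(0, len(payload), 64)]
    joined = b"".join(gzip.compress(c) for c in chunks)
    assert gzip.decompress(joined) == payload

    # zstd: stream_reader only needs a read() method on its source, which
    # is how write_file wraps file objects and read_file wraps responses
    def drain(reader) -> bytes:
        out = BytesIO()
        while chunk := reader.read(16384):
            out.write(chunk)
        return out.getvalue()

    compressed = drain(zstandard.ZstdCompressor().stream_reader(BytesIO(payload)))
    restored = drain(zstandard.ZstdDecompressor().stream_reader(BytesIO(compressed)))
    assert restored == payload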