Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support zstd compression in miniostorage #405

Merged
merged 7 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ dependencies = [
"requests>=2.32.3",
"sentry-sdk>=2.18.0",
"sqlalchemy<2",
"zstandard>=0.23.0",
]

[build-system]
Expand Down
1 change: 1 addition & 0 deletions shared/storage/base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
CHUNK_SIZE = 1024 * 32
PART_SIZE = 1024 * 1024 * 20 # 20MiB


# Interface class for interfacing with codecov's underlying storage layer
Expand Down
193 changes: 120 additions & 73 deletions shared/storage/minio.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,42 @@
import gzip
import json
import logging
import os
import shutil
import sys
import tempfile
from io import BytesIO
from typing import BinaryIO, overload
from typing import IO, BinaryIO, Tuple, cast, overload

import zstandard
from minio import Minio
from minio.credentials import (
from minio.credentials.providers import (
ChainedProvider,
EnvAWSProvider,
EnvMinioProvider,
IamAwsProvider,
)
from minio.deleteobjects import DeleteObject
from minio.error import MinioException, S3Error
from minio.helpers import ObjectWriteResult
from urllib3 import HTTPResponse

from shared.storage.base import CHUNK_SIZE, BaseStorageService
Swatinem marked this conversation as resolved.
Show resolved Hide resolved
from shared.storage.base import CHUNK_SIZE, PART_SIZE, BaseStorageService
from shared.storage.exceptions import BucketAlreadyExistsError, FileNotInStorageError

log = logging.getLogger(__name__)


class GZipStreamReader:
    """Read-through wrapper that gzip-compresses a raw byte stream chunk by chunk.

    Each call to ``read`` pulls one chunk from the underlying file object and
    returns it as an independent gzip member; the concatenation of all chunks
    is therefore a valid multi-member gzip stream.
    """

    def __init__(self, fileobj: IO[bytes]):
        # underlying uncompressed byte source
        self.data = fileobj

    def read(self, size: int = -1, /) -> bytes:
        chunk = self.data.read(size)
        # empty chunk signals EOF on the source; propagate it unchanged
        return gzip.compress(chunk) if chunk else b""
Swatinem marked this conversation as resolved.
Show resolved Hide resolved


# Service class for interfacing with codecov's underlying storage layer, minio
class MinioStorageService(BaseStorageService):
def __init__(self, minio_config):
Expand All @@ -49,28 +62,29 @@ def init_minio_client(
self,
host: str,
port: str,
access_key: str = None,
secret_key: str = None,
access_key: str | None = None,
secret_key: str | None = None,
verify_ssl: bool = False,
iam_auth: bool = False,
iam_endpoint: str = None,
region: str = None,
iam_endpoint: str | None = None,
region: str | None = None,
):
"""
Initialize the minio client
Initialize the minio client

`iam_auth` adds support for IAM base authentication in a fallback pattern.
The following will be checked in order:
The following will be checked in order:

* EC2 metadata -- a custom endpoint can be provided, default is None.
* AWS env vars, specifically AWS_ACCESS_KEY and AWS_SECRET_KEY
* Minio env vars, specifically MINIO_ACCESS_KEY and MINIO_SECRET_KEY
* AWS env vars, specifically AWS_ACCESS_KEY and AWS_SECRET_KEY

to support backward compatibility, the iam_auth setting should be used in the installation
configuration
to support backward compatibility, the iam_auth setting should be used
in the installation configuration

Args:
host (str): The address of the host where minio lives

port (str): The port number (as str or int should be ok)
access_key (str, optional): The access key (optional if IAM is being used)
secret_key (str, optional): The secret key (optional if IAM is being used)
Expand Down Expand Up @@ -143,85 +157,118 @@ def create_root_storage(self, bucket_name="archive", region="us-east-1"):
# Writes a file to storage will gzip if not compressed already
def write_file(
self,
bucket_name,
path,
data,
reduced_redundancy=False,
bucket_name: str,
path: str,
data: IO[bytes] | str | bytes,
reduced_redundancy: bool = False,
*,
is_already_gzipped: bool = False,
):
is_already_gzipped: bool = False, # deprecated
is_compressed: bool = False,
compression_type: str | None = "zstd",
) -> ObjectWriteResult:
if isinstance(data, str):
data = data.encode()
data = BytesIO(data.encode())
elif isinstance(data, (bytes, bytearray, memoryview)):
data = BytesIO(data)

if isinstance(data, bytes):
if not is_already_gzipped:
out = BytesIO()
with gzip.GzipFile(fileobj=out, mode="w", compresslevel=9) as gz:
gz.write(data)
else:
out = BytesIO(data)
if is_already_gzipped:
is_compressed = True
compression_type = "gzip"

# get file size
out.seek(0, os.SEEK_END)
out_size = out.tell()
if is_compressed:
result = data
else:
# data is already a file-like object
if not is_already_gzipped:
_, filename = tempfile.mkstemp()
with gzip.open(filename, "wb") as f:
shutil.copyfileobj(data, f)
out = open(filename, "rb")
if compression_type == "zstd":
cctx = zstandard.ZstdCompressor()
result = cctx.stream_reader(data)

elif compression_type == "gzip":
result = GZipStreamReader(data)

else:
out = data
result = data

out_size = os.stat(filename).st_size
headers: dict[str, str | list[str] | Tuple[str]] = {}

try:
# reset pos for minio reading.
out.seek(0)

headers = {"Content-Encoding": "gzip"}
if reduced_redundancy:
headers["x-amz-storage-class"] = "REDUCED_REDUNDANCY"
self.minio_client.put_object(
bucket_name,
path,
out,
out_size,
metadata=headers,
content_type="text/plain",
)
return True
if compression_type:
headers["Content-Encoding"] = compression_type

except MinioException:
raise
if reduced_redundancy:
headers["x-amz-storage-class"] = "REDUCED_REDUNDANCY"

# it's safe to do a BinaryIO cast here because we know that put_object only uses a function of the shape:
# read(self, size: int = -1, /) -> bytes
# GZipStreamReader implements this (we did it ourselves)
# ZstdCompressionReader implements read(): https://github.com/indygreg/python-zstandard/blob/12a80fac558820adf43e6f16206120685b9eb880/zstandard/__init__.pyi#L233C5-L233C49
# BytesIO implements read(): https://docs.python.org/3/library/io.html#io.BufferedReader.read
# IO[bytes] implements read(): https://github.com/python/cpython/blob/3.13/Lib/typing.py#L3502

return self.minio_client.put_object(
bucket_name,
path,
cast(BinaryIO, result),
-1,
metadata=headers,
content_type="text/plain",
part_size=PART_SIZE,
)

@overload
def read_file(self, bucket_name: str, path: str) -> bytes: ...
def read_file(
self, bucket_name: str, path: str, file_obj: None = None
) -> bytes: ...

@overload
def read_file(self, bucket_name: str, path: str, file_obj: BinaryIO) -> None: ...
def read_file(self, bucket_name: str, path: str, file_obj: IO[bytes]) -> None: ...

def read_file(self, bucket_name, path, file_obj=None) -> bytes | None:
def read_file(
self, bucket_name: str, path: str, file_obj: IO[bytes] | None = None
) -> bytes | None:
headers: dict[str, str | list[str] | Tuple[str]] = {
"Accept-Encoding": "gzip, zstd"
}
try:
res = self.minio_client.get_object(bucket_name, path)
if file_obj is None:
data = BytesIO()
for d in res.stream(CHUNK_SIZE):
data.write(d)
data.seek(0)
return data.getvalue()
else:
for d in res.stream(CHUNK_SIZE):
file_obj.write(d)
response = cast(
HTTPResponse,
self.minio_client.get_object( # this returns an HTTPResponse
bucket_name, path, request_headers=headers
),
)
except S3Error as e:
if e.code == "NoSuchKey":
raise FileNotInStorageError(
f"File {path} does not exist in {bucket_name}"
)
raise e
except MinioException:
raise
if response.headers:
content_encoding = response.headers.get("Content-Encoding", None)
if content_encoding == "zstd":
# we have to manually decompress zstandard compressed data
cctx = zstandard.ZstdDecompressor()
Swatinem marked this conversation as resolved.
Show resolved Hide resolved
# if the object passed to this has a read method then that's
# all this object will ever need, since it will just call read
# and get the bytes object resulting from it then compress that
# HTTPResponse
reader = cctx.stream_reader(cast(IO[bytes], response))
else:
reader = response
else:
reader = response

if file_obj:
file_obj.seek(0)
while chunk := reader.read(CHUNK_SIZE):
file_obj.write(chunk)
response.close()
response.release_conn()
return None
else:
res = BytesIO()
while chunk := reader.read(CHUNK_SIZE):
res.write(chunk)
response.close()
response.release_conn()
return res.getvalue()

"""
Deletes file url in specified bucket.
Expand Down
Empty file added tests/unit/storage/__init__.py
Empty file.
Loading