Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support zstd compression in miniostorage #405

Merged
merged 7 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ dependencies = [
"requests>=2.32.3",
"sentry-sdk>=2.18.0",
"sqlalchemy<2",
"zstandard>=0.23.0",
]

[build-system]
Expand Down
145 changes: 93 additions & 52 deletions shared/storage/minio.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import gzip
import datetime
import json
import logging
import os
import shutil
import sys
import tempfile
from io import BytesIO
from typing import BinaryIO, overload
from typing import BinaryIO, Protocol, overload

import zstandard
from minio import Minio
from minio.credentials import (
ChainedProvider,
Expand All @@ -17,13 +17,29 @@
)
from minio.deleteobjects import DeleteObject
from minio.error import MinioException, S3Error
from urllib3.response import HTTPResponse

from shared.storage.base import CHUNK_SIZE, BaseStorageService
Swatinem marked this conversation as resolved.
Show resolved Hide resolved
from shared.storage.base import BaseStorageService
from shared.storage.exceptions import BucketAlreadyExistsError, FileNotInStorageError

log = logging.getLogger(__name__)


class Readable(Protocol):
def read(self, size: int = -1) -> bytes: ...


class GetObjectToFileResponse(Protocol):
bucket_name: str
object_name: str
last_modified: datetime.datetime | None
etag: str
size: int
content_type: str | None
metadata: dict[str, str]
version_id: str | None
Swatinem marked this conversation as resolved.
Show resolved Hide resolved


# Service class for interfacing with codecov's underlying storage layer, minio
class MinioStorageService(BaseStorageService):
def __init__(self, minio_config):
Expand Down Expand Up @@ -57,20 +73,21 @@ def init_minio_client(
region: str = None,
):
"""
Initialize the minio client
Initialize the minio client

`iam_auth` adds support for IAM base authentication in a fallback pattern.
The following will be checked in order:
The following will be checked in order:

* EC2 metadata -- a custom endpoint can be provided, default is None.
* AWS env vars, specifically AWS_ACCESS_KEY and AWS_SECRECT_KEY
* Minio env vars, specifically MINIO_ACCESS_KEY and MINIO_SECRET_KEY
* AWS env vars, specifically AWS_ACCESS_KEY and AWS_SECRECT_KEY

to support backward compatibility, the iam_auth setting should be used in the installation
configuration
to support backward compatibility, the iam_auth setting should be used
in the installation configuration

Args:
host (str): The address of the host where minio lives

port (str): The port number (as str or int should be ok)
access_key (str, optional): The access key (optional if IAM is being used)
secret_key (str, optional): The secret key (optional if IAM is being used)
Expand Down Expand Up @@ -143,50 +160,49 @@ def create_root_storage(self, bucket_name="archive", region="us-east-1"):
# Writes a file to storage will gzip if not compressed already
def write_file(
self,
bucket_name,
path,
data,
reduced_redundancy=False,
bucket_name: str,
path: str,
data: BinaryIO,
reduced_redundancy: bool = False,
*,
is_already_gzipped: bool = False,
is_already_gzipped: bool = False, # deprecated
is_compressed: bool = False,
compression_type: str = "zstd",
):
if isinstance(data, str):
data = data.encode()
if is_already_gzipped:
is_compressed = True
compression_type = "gzip"

if isinstance(data, bytes):
if not is_already_gzipped:
out = BytesIO()
with gzip.GzipFile(fileobj=out, mode="w", compresslevel=9) as gz:
gz.write(data)
else:
out = BytesIO(data)

# get file size
out.seek(0, os.SEEK_END)
out_size = out.tell()
else:
# data is already a file-like object
if not is_already_gzipped:
_, filename = tempfile.mkstemp()
with gzip.open(filename, "wb") as f:
shutil.copyfileobj(data, f)
out = open(filename, "rb")
else:
out = data
if isinstance(data, str):
data = BytesIO(data.encode())

out_size = os.stat(filename).st_size
if not is_compressed:
cctx = zstandard.ZstdCompressor()
Swatinem marked this conversation as resolved.
Show resolved Hide resolved
reader: zstandard.ZstdCompressionReader = cctx.stream_reader(data)
_, filepath = tempfile.mkstemp()
with open(filepath, "wb") as f:
while chunk := reader.read(16384):
f.write(chunk)
data = open(filepath, "rb")

try:
# reset pos for minio reading.
out.seek(0)
out_size = data.seek(0, os.SEEK_END)
Swatinem marked this conversation as resolved.
Show resolved Hide resolved
data.seek(0)

if compression_type == "gzip":
content_encoding = "gzip"
elif compression_type == "zstd":
content_encoding = "zstd"

headers = {"Content-Encoding": content_encoding}

headers = {"Content-Encoding": "gzip"}
if reduced_redundancy:
headers["x-amz-storage-class"] = "REDUCED_REDUNDANCY"

self.minio_client.put_object(
bucket_name,
path,
out,
data,
out_size,
metadata=headers,
content_type="text/plain",
Expand All @@ -195,25 +211,46 @@ def write_file(

except MinioException:
raise
finally:
if not is_compressed:
data.close()
os.unlink(filepath)

@overload
def read_file(self, bucket_name: str, path: str) -> bytes: ...
def read_file(
self, bucket_name: str, path: str, file_obj: None = None
) -> bytes: ...

@overload
def read_file(self, bucket_name: str, path: str, file_obj: BinaryIO) -> None: ...
def read_file(self, bucket_name: str, path: str, file_obj: str) -> None: ...
Swatinem marked this conversation as resolved.
Show resolved Hide resolved

def read_file(self, bucket_name, path, file_obj=None) -> bytes | None:
response = None
try:
res = self.minio_client.get_object(bucket_name, path)
if file_obj is None:
data = BytesIO()
for d in res.stream(CHUNK_SIZE):
data.write(d)
data.seek(0)
return data.getvalue()
headers = {"Accept-Encoding": "gzip, zstd"}
response: HTTPResponse = self.minio_client.get_object(
bucket_name, path, request_headers=headers
)

content_encoding = response.headers.get("Content-Encoding", None)
reader: Readable | None = None
if content_encoding == "zstd":
# we have to manually decompress zstandard compressed data
cctx = zstandard.ZstdDecompressor()
reader = cctx.stream_reader(response)
else:
reader = response

if file_obj:
file_obj.seek(0)
while chunk := reader.read(16384):
file_obj.write(chunk)
return None
else:
for d in res.stream(CHUNK_SIZE):
file_obj.write(d)
res = BytesIO()
while chunk := reader.read(16384):
res.write(chunk)
return res.getvalue()
except S3Error as e:
if e.code == "NoSuchKey":
raise FileNotInStorageError(
Expand All @@ -222,6 +259,10 @@ def read_file(self, bucket_name, path, file_obj=None) -> bytes | None:
raise e
except MinioException:
raise
finally:
if response:
response.close()
response.release_conn()

"""
Deletes file url in specified bucket.
Expand Down
Empty file added tests/unit/storage/__init__.py
Empty file.
Loading