Skip to content

Commit

Permalink
Avoid metadata requests talking to GCS
Browse files Browse the repository at this point in the history
The GCS Storage would previously fetch the bucket metadata using `get_bucket`, where a call to `bucket` is sufficient to create the python class without doing a request.

Similarly, every `read_file` would `reload` the metadata of the `Blob` in order to fix a `content_type` mismatch.
It is unclear why that was done, but transparent (zlib) decompression should only depend on the `content_encoding`, and the `content_type` should not matter in that case.

Both these changes should make sure that we can read a file with a single request (instead of 3), and store a file with only a single request (instead of 2).
  • Loading branch information
Swatinem committed Sep 5, 2024
1 parent 42f83ec commit 5ebd191
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 41 deletions.
65 changes: 25 additions & 40 deletions shared/storage/gcp.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import gzip
import logging
from typing import IO

import google.cloud.exceptions
from google.cloud import storage
Expand All @@ -26,8 +27,8 @@ def load_credentials(self, gcp_config):
return Credentials.from_service_account_info(gcp_config)

def get_blob(self, bucket_name, path):
bucket = self.storage_client.get_bucket(bucket_name)
return storage.Blob(path, bucket)
bucket = self.storage_client.bucket(bucket_name)
return bucket.blob(path)

def create_root_storage(self, bucket_name="archive", region="us-east-1"):
"""
Expand All @@ -48,29 +49,27 @@ def create_root_storage(self, bucket_name="archive", region="us-east-1"):

def write_file(
self,
bucket_name,
path,
data,
bucket_name: str,
path: str,
data: str | bytes | IO,
reduced_redundancy=False,
*,
is_already_gzipped: bool = False,
):
"""
Writes a new file with the contents of `data`
(What happens if the file already exists?)
Writes a new file with the contents of `data`
(What happens if the file already exists?)
Args:
bucket_name (str): The name of the bucket for the file to be created on
path (str): The desired path of the file
data (str): The data to be written to the file
data: The data to be written to the file
reduced_redundancy (bool): Whether a reduced redundancy mode should be used (default: {False})
is_already_gzipped (bool): Whether the file is already gzipped (default: {False})
Raises:
NotImplementedError: If the current instance did not implement this method
"""
blob = self.get_blob(bucket_name, path)

if isinstance(data, str):
data = data.encode()
if isinstance(data, bytes):
Expand Down Expand Up @@ -108,30 +107,24 @@ def append_to_file(self, bucket_name, path, data):
file_contents = data
return self.write_file(bucket_name, path, file_contents)

def read_file(self, bucket_name, path, file_obj=None, *, retry=0):
def read_file(
self, bucket_name: str, path: str, file_obj: IO | None = None, *, retry=0
) -> bytes:
"""Reads the content of a file
Args:
bucket_name (str): The name of the bucket for the file lives
path (str): The path of the file
Raises:
NotImplementedError: If the current instance did not implement this method
FileNotInStorageError: If the file does not exist
Returns:
bytes : The contents of that file, still encoded as bytes
bytes: The contents of that file, still encoded as bytes
"""
blob = self.get_blob(bucket_name, path)

try:
blob.reload()
if (
blob.content_type == "application/x-gzip"
and blob.content_encoding == "gzip"
):
blob.content_type = "text/plain"
blob.content_encoding = "gzip"
blob.patch()
if file_obj is None:
return blob.download_as_bytes(checksum="crc32c")
else:
Expand All @@ -144,19 +137,18 @@ def read_file(self, bucket_name, path, file_obj=None, *, retry=0):
return self.read_file(bucket_name, path, file_obj, retry=1)
raise

def delete_file(self, bucket_name, path):
def delete_file(self, bucket_name: str, path: str) -> bool:
"""Deletes a single file from the storage (what happens if the file doesnt exist?)
Args:
bucket_name (str): The name of the bucket for the file lives
path (str): The path of the file to be deleted
Raises:
NotImplementedError: If the current instance did not implement this method
FileNotInStorageError: If the file does not exist
Returns:
bool: True if the deletion was succesful
bool: True if the deletion was successful
"""
blob = self.get_blob(bucket_name, path)
try:
Expand All @@ -165,28 +157,25 @@ def delete_file(self, bucket_name, path):
raise FileNotInStorageError(f"File {path} does not exist in {bucket_name}")
return True

def delete_files(self, bucket_name, paths=[]):
def delete_files(self, bucket_name: str, paths: list[str]) -> list[bool]:
"""Batch deletes a list of files from a given bucket
(what happens to the files that don't exist?)
Args:
bucket_name (str): The name of the bucket for the file lives
paths (list): A list of the paths to be deletes (default: {[]})
Raises:
NotImplementedError: If the current instance did not implement this method
Returns:
list: A list of booleans, where each result indicates whether that file was deleted
successfully
"""
bucket = self.storage_client.get_bucket(bucket_name)
blobs = [self.get_blob(bucket_name, path) for path in paths]
bucket = self.storage_client.bucket(bucket_name)
blobs = [bucket.blob(path) for path in paths]
blobs_errored = set()
bucket.delete_blobs(blobs, on_error=blobs_errored.add)
return [b not in blobs_errored for b in blobs]

def list_folder_contents(self, bucket_name, prefix=None, recursive=True):
def list_folder_contents(self, bucket_name: str, prefix=None, recursive=True):
"""List the contents of a specific folder
Attention: google ignores the `recursive` param
Expand All @@ -195,13 +184,9 @@ def list_folder_contents(self, bucket_name, prefix=None, recursive=True):
bucket_name (str): The name of the bucket for the file lives
prefix: The prefix of the files to be listed (default: {None})
recursive: Whether the listing should be recursive (default: {True})
Raises:
NotImplementedError: If the current instance did not implement this method
"""
assert recursive
bucket = self.storage_client.get_bucket(bucket_name)
return (self._blob_to_dict(b) for b in bucket.list_blobs(prefix=prefix))

def _blob_to_dict(self, blob):
return {"name": blob.name, "size": blob.size}
bucket = self.storage_client.bucket(bucket_name)
return (
{"name": b.name, "size": b.size} for b in bucket.list_blobs(prefix=prefix)
)
1 change: 0 additions & 1 deletion tests/unit/storage/test_gcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ def test_read_file_application_gzip(self, request, codecov_vcr):
f, size=f.tell(), rewind=True, content_type="application/x-gzip"
)
content = storage.read_file(bucket_name, path)
print(content)
assert content.decode() == content_to_upload

def test_write_then_delete_file(self, request, codecov_vcr):
Expand Down

0 comments on commit 5ebd191

Please sign in to comment.