implement multipart xml uploads
emilong committed Jun 24, 2024
1 parent 025a5de commit 7bcbfd1
Showing 5 changed files with 377 additions and 99 deletions.
1 change: 1 addition & 0 deletions .tool-versions
@@ -0,0 +1 @@
python 3.10.6
43 changes: 43 additions & 0 deletions src/gcp_storage_emulator/checksums.py
@@ -0,0 +1,43 @@
import hashlib
from base64 import b64encode

import google_crc32c

from gcp_storage_emulator.exceptions import Conflict


MD5_CHECKSUM_ERROR = 'Provided MD5 hash "{}" doesn\'t match calculated MD5 hash "{}".'
CRC32C_CHECKSUM_ERROR = 'Provided CRC32C "{}" doesn\'t match calculated CRC32C "{}".'


def _crc32c(content):
    if isinstance(content, str):
        content = content.encode()
    val = google_crc32c.Checksum(content)
    return b64encode(val.digest()).decode("ascii")


def _md5(content):
    if isinstance(content, str):
        content = content.encode()
    return b64encode(hashlib.md5(content).digest()).decode("ascii")


def checksums(content, file_obj):
    crc32c_hash = _crc32c(content)
    obj_crc32c = file_obj.get("crc32c")
    md5_hash = _md5(content)
    obj_md5 = file_obj.get("md5Hash")
    if not obj_crc32c:
        file_obj["crc32c"] = crc32c_hash
    else:
        if obj_crc32c != crc32c_hash:
            raise Conflict(CRC32C_CHECKSUM_ERROR.format(obj_crc32c, crc32c_hash))
    if not obj_md5:
        file_obj["md5Hash"] = md5_hash
    else:
        if obj_md5 != md5_hash:
            raise Conflict(MD5_CHECKSUM_ERROR.format(obj_md5, md5_hash))
    if not file_obj.get("etag"):
        file_obj["etag"] = md5_hash
    return file_obj
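
For reference, checksums() both verifies caller-supplied hashes and backfills any that are missing, raising Conflict on a mismatch. A minimal usage sketch, not part of this commit: the file_obj dict is a hypothetical object resource, and the md5Hash value is the base64-encoded MD5 of b"hello world".

from gcp_storage_emulator.checksums import checksums
from gcp_storage_emulator.exceptions import Conflict

content = b"hello world"
file_obj = {"md5Hash": "XrY7u+Ae7tCTyyK7j1rNww=="}  # matches the content above

file_obj = checksums(content, file_obj)
# crc32c and etag are now filled in alongside the verified md5Hash
assert file_obj["etag"] == file_obj["md5Hash"]

try:
    checksums(b"tampered", {"md5Hash": file_obj["md5Hash"]})
except Conflict as err:
    print(err)  # Provided MD5 hash "..." doesn't match calculated MD5 hash "..."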
154 changes: 104 additions & 50 deletions src/gcp_storage_emulator/handlers/objects.py
@@ -5,6 +5,7 @@
import re
import secrets
import string
import textwrap
import time
import urllib.parse
from base64 import b64encode
@@ -16,6 +17,7 @@
import google_crc32c

from gcp_storage_emulator.exceptions import Conflict, NotFound
from gcp_storage_emulator.checksums import checksums

logger = logging.getLogger("api.object")

@@ -51,10 +53,6 @@
}


MD5_CHECKSUM_ERROR = 'Provided MD5 hash "{}" doesn\'t match calculated MD5 hash "{}".'
CRC32C_CHECKSUM_ERROR = 'Provided CRC32C "{}" doesn\'t match calculated CRC32C "{}".'


class GoogleHTTPStatus(IntEnum):
    def __new__(cls, value, phrase, description=""):
        obj = int.__new__(cls, value)
@@ -76,39 +74,6 @@ def _handle_conflict(response, err):
    response.json(resp)


def _crc32c(content):
    if isinstance(content, str):
        content = content.encode()
    val = google_crc32c.Checksum(content)
    return b64encode(val.digest()).decode("ascii")


def _md5(content):
    if isinstance(content, str):
        content = content.encode()
    return b64encode(hashlib.md5(content).digest()).decode("ascii")


def _checksums(content, file_obj):
    crc32c_hash = _crc32c(content)
    obj_crc32c = file_obj.get("crc32c")
    md5_hash = _md5(content)
    obj_md5 = file_obj.get("md5Hash")
    if not obj_crc32c:
        file_obj["crc32c"] = crc32c_hash
    else:
        if obj_crc32c != crc32c_hash:
            raise Conflict(CRC32C_CHECKSUM_ERROR.format(obj_crc32c, crc32c_hash))
    if not obj_md5:
        file_obj["md5Hash"] = md5_hash
    else:
        if obj_md5 != md5_hash:
            raise Conflict(MD5_CHECKSUM_ERROR.format(obj_md5, md5_hash))
    if not file_obj.get("etag"):
        file_obj["etag"] = md5_hash
    return file_obj


def _patch_object(obj, metadata):
    if metadata:
        obj["metageneration"] = str(int(obj["metageneration"]) + 1)
@@ -173,7 +138,7 @@ def _media_upload(request, response, storage):
        content_type,
        str(len(request.data)),
    )
    obj = _checksums(request.data, obj)
    obj = checksums(request.data, obj)
    storage.create_file(
        request.params["bucket_name"],
        object_id,
@@ -198,7 +163,7 @@ def _multipart_upload(request, response, storage):
        str(len(request.data["content"])),
        request.data["meta"],
    )
    obj = _checksums(request.data["content"], obj)
    obj = checksums(request.data["content"], obj)
    storage.create_file(
        request.params["bucket_name"],
        object_id,
@@ -263,6 +228,93 @@ def _patch(storage, bucket_name, object_id, metadata):
    return None


def _extract_host_bucket(host):
    return re.match(r"^(?P<bucket_name>[^.]+)\.", host).group("bucket_name")


# https://cloud.google.com/storage/docs/xml-api/post-object-multipart
def _xml_initiate_upload(request, response, storage, *args, **kwargs):
    bucket_name = _extract_host_bucket(request.host)
    object_id = request.path.lstrip("/")
    content_type = request.headers.get("content-type")
    # TODO extract metadata from the request headers
    metadata = {}
    try:
        upload_id = storage.create_xml_multipart_upload(
            bucket_name, object_id, content_type, metadata
        )

        xml = """
        <?xml version='1.0' encoding='UTF-8'?>
        <InitiateMultipartUploadResult xmlns='http://s3.amazonaws.com/doc/2006-03-01/'>
        <Bucket>{}</Bucket>
        <Key>{}</Key>
        <UploadId>{}</UploadId>
        </InitiateMultipartUploadResult>
        """.format(
            bucket_name, object_id, upload_id
        )
        response.xml(textwrap.dedent(xml))
    except NotFound:
        response.status = HTTPStatus.NOT_FOUND


# https://cloud.google.com/storage/docs/xml-api/post-object-complete
def _xml_complete_upload(request, response, storage, *args, **kwargs):
    bucket_name = _extract_host_bucket(request.host)
    object_id = request.path.lstrip("/")
    upload_id = request.query.get("uploadId")[0]

    try:
        storage.complete_multipart_upload(upload_id)

        # NOTE the constant etag below - we don't actually calculate it
        xml = """
        <?xml version="1.0" encoding="UTF-8"?>
        <CompleteMultipartUploadResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
        <Location>http://{}/{}</Location>
        <Bucket>{}</Bucket>
        <Key>{}</Key>
        <ETag>"7fc8f92280ac3c975f300cb64412c16f-9"</ETag>
        </CompleteMultipartUploadResult>
        """.format(
            request.host, object_id, bucket_name, object_id
        )
        response.xml(textwrap.dedent(xml))
    except NotFound:
        response.status = HTTPStatus.NOT_FOUND


# https://cloud.google.com/storage/docs/xml-api/post-object-multipart
def xml_multipart_upload(request, response, storage, *args, **kwargs):
    logger.debug("uploads query param: %s", request.query.get("uploads"))
    if request.query.get("uploads") is not None:
        return _xml_initiate_upload(request, response, storage, *args, **kwargs)

    if request.query.get("uploadId") is not None:
        return _xml_complete_upload(request, response, storage, *args, **kwargs)

    # Only know how to handle multipart uploads at the moment, not resumable ones
    response.status = HTTPStatus.BAD_REQUEST


# https://cloud.google.com/storage/docs/xml-api/put-object-multipart
def xml_part_upload(request, response, storage, *args, **kwargs):
    if request.query.get("uploadId") is None or request.query.get("partNumber") is None:
        response.status = HTTPStatus.BAD_REQUEST
        return

    upload_id = request.query.get("uploadId")[0]
    part_number = int(request.query.get("partNumber")[0], 10)
    logger.debug("adding part %s to upload %s", part_number, upload_id)
    try:
        storage.add_to_multipart_upload(upload_id, part_number, request.data)
        # We don't actually do anything with the tag, but return it for compatibility
        response["ETag"] = f'"{hashlib.md5(request.data).hexdigest()}"'
    except NotFound:
        response.status = HTTPStatus.NOT_FOUND
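
Taken together, the handlers above mirror the S3-style flow that the GCS XML API exposes: a POST with the uploads query parameter initiates the upload and returns an UploadId, each part is sent as a PUT with uploadId and partNumber, and a final POST with uploadId completes the object. A rough client-side sketch against a locally running emulator follows; the hostname, port, and the requests dependency are assumptions for illustration, and note that the real API also expects a CompleteMultipartUpload XML body, which the handler above ignores.

import requests

# Assumed endpoint: emulator reachable via host-style bucket addressing,
# e.g. "my-bucket.localhost" resolving to 127.0.0.1; port is illustrative.
base = "http://my-bucket.localhost:9023"
key = "big/object.bin"

# 1. Initiate: POST ?uploads returns an <UploadId> element in the XML body.
init = requests.post(f"{base}/{key}?uploads")
upload_id = init.text.split("<UploadId>")[1].split("</UploadId>")[0]

# 2. Upload parts: one PUT per part, numbered from 1.
for number, data in enumerate([b"a" * 1024, b"b" * 1024], start=1):
    requests.put(
        f"{base}/{key}",
        params={"uploadId": upload_id, "partNumber": number},
        data=data,
    )

# 3. Complete: POST ?uploadId=... assembles the parts into the final object.
requests.post(f"{base}/{key}", params={"uploadId": upload_id})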


def xml_upload(request, response, storage, *args, **kwargs):
    content_type = request.get_header("Content-Type", "application/octet-stream")
    obj = _make_object_resource(
@@ -273,7 +325,7 @@ def xml_upload(request, response, storage, *args, **kwargs):
        str(len(request.data)),
    )
    try:
        obj = _checksums(request.data, obj)
        obj = checksums(request.data, obj)
        storage.create_file(
            request.params["bucket_name"],
            request.params["object_id"],
@@ -329,7 +381,7 @@ def upload_partial(request, response, storage, *args, **kwargs):
    else:
        data = request.data or b""

    obj = _checksums(data, obj)
    obj = checksums(data, obj)
    obj["size"] = str(len(data))
    storage.create_file(obj["bucket"], obj["name"], data, obj, upload_id)
    response.json(obj)
@@ -385,7 +437,7 @@ def copy(request, response, storage, *args, **kwargs):

    file = storage.get_file(request.params["bucket_name"], request.params["object_id"])
    try:
        dest_obj = _checksums(file, dest_obj)
        dest_obj = checksums(file, dest_obj)
        storage.create_file(
            request.params["dest_bucket_name"],
            request.params["dest_object_id"],
@@ -419,19 +471,21 @@ def rewrite(request, response, storage, *args, **kwargs):

    file = storage.get_file(request.params["bucket_name"], request.params["object_id"])
    try:
        dest_obj = _checksums(file, dest_obj)
        dest_obj = checksums(file, dest_obj)
        storage.create_file(
            request.params["dest_bucket_name"],
            request.params["dest_object_id"],
            file,
            dest_obj,
        )
        response.json({
            "resource": dest_obj,
            "written": dest_obj["size"],
            "size": dest_obj["size"],
            "done": True,
        })
        response.json(
            {
                "resource": dest_obj,
                "written": dest_obj["size"],
                "size": dest_obj["size"],
                "done": True,
            }
        )
    except NotFound:
        response.status = HTTPStatus.NOT_FOUND
    except Conflict as err:
@@ -467,7 +521,7 @@ def compose(request, response, storage, *args, **kwargs):
    )

    try:
        dest_obj = _checksums(dest_file, dest_obj)
        dest_obj = checksums(dest_file, dest_obj)
        storage.create_file(
            request.params["bucket_name"],
            request.params["object_id"],
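
One loose end flagged in the diff itself: _xml_complete_upload returns a hardcoded ETag rather than computing one. For reference, the S3 convention this XML API mirrors derives the multipart ETag from the MD5 of the concatenated per-part MD5 digests, suffixed with the part count. A hypothetical helper, not part of this commit, could look like:

import hashlib

def multipart_etag(parts):
    # MD5 of the concatenation of each part's raw MD5 digest, then "-<number of parts>"
    combined = b"".join(hashlib.md5(part).digest() for part in parts)
    return '"{}-{}"'.format(hashlib.md5(combined).hexdigest(), len(parts))

# Example: multipart_etag([b"a" * 1024, b"b" * 1024]) -> '"<32 hex chars>-2"'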
