From 14be82633aba4790e824ee229a1ec119b2ba72bf Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Thu, 8 Feb 2024 10:19:07 -0500 Subject: [PATCH 01/67] Initial object-store implementation --- src/zarr/v3/store/object_store.py | 78 +++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) create mode 100644 src/zarr/v3/store/object_store.py diff --git a/src/zarr/v3/store/object_store.py b/src/zarr/v3/store/object_store.py new file mode 100644 index 0000000000..3790f621be --- /dev/null +++ b/src/zarr/v3/store/object_store.py @@ -0,0 +1,78 @@ +from __future__ import annotations + +import asyncio +from typing import List, Optional, Tuple + +from object_store import ObjectStore as _ObjectStore +from object_store import Path as ObjectPath + +from zarr.v3.abc.store import Store + + +class ObjectStore(Store): + supports_writes: bool = True + supports_partial_writes: bool = False + supports_listing: bool = True + + store: _ObjectStore + + def init(self, store: _ObjectStore): + self.store = store + + def __str__(self) -> str: + return f"object://{self.store}" + + def __repr__(self) -> str: + return f"ObjectStore({repr(str(self))})" + + async def get( + self, key: str, byte_range: Optional[Tuple[int, Optional[int]]] = None + ) -> Optional[bytes]: + if byte_range is None: + return await self.store.get_async(ObjectPath(key)) + + start, end = byte_range + if end is None: + # Have to wrap a separate object-store function to support this + raise NotImplementedError + + return await self.store.get_range_async(ObjectPath(key), start, end - start) + + async def get_partial_values( + self, key_ranges: List[Tuple[str, Tuple[int, int]]] + ) -> List[bytes]: + # TODO: use rust-based concurrency inside object-store + futs = [self.get(key, byte_range=byte_range) for (key, byte_range) in key_ranges] + + # Seems like a weird type match where `get()` returns `Optional[bytes]` but + # `get_partial_values` is non-optional? 
+ return await asyncio.gather(*futs) # type: ignore + + async def exists(self, key: str) -> bool: + try: + _ = await self.store.head_async(ObjectPath(key)) + return True + except FileNotFoundError: + return False + + async def set(self, key: str, value: bytes) -> None: + await self.store.put_async(ObjectPath(key), value) + + async def delete(self, key: str) -> None: + await self.store.delete_async(ObjectPath(key)) + + async def set_partial_values(self, key_start_values: List[Tuple[str, int, bytes]]) -> None: + raise NotImplementedError + + async def list(self) -> List[str]: + objects = await self.store.list_async(None) + return [str(obj.location) for obj in objects] + + async def list_prefix(self, prefix: str) -> List[str]: + objects = await self.store.list_async(ObjectPath(prefix)) + return [str(obj.location) for obj in objects] + + async def list_dir(self, prefix: str) -> List[str]: + list_result = await self.store.list_with_delimiter_async(ObjectPath(prefix)) + common_prefixes = set(list_result.common_prefixes) + return [str(obj.location) for obj in list_result.objects if obj not in common_prefixes] From afa79af04462778d1f69e5f739db20e09b7c7609 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Tue, 27 Feb 2024 17:10:07 -0500 Subject: [PATCH 02/67] Update src/zarr/v3/store/object_store.py Co-authored-by: Deepak Cherian --- src/zarr/v3/store/object_store.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/zarr/v3/store/object_store.py b/src/zarr/v3/store/object_store.py index 3790f621be..a834b44102 100644 --- a/src/zarr/v3/store/object_store.py +++ b/src/zarr/v3/store/object_store.py @@ -16,7 +16,7 @@ class ObjectStore(Store): store: _ObjectStore - def init(self, store: _ObjectStore): + def __init__(self, store: _ObjectStore): self.store = store def __str__(self) -> str: From f5c884bc3626dc47f327d868d996b244850aa526 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Tue, 22 Oct 2024 14:03:32 -0400 Subject: [PATCH 03/67] update --- 
src/zarr/storage/object_store.py | 141 ++++++++++++++++++++++++++++++ src/zarr/v3/store/object_store.py | 78 ----------------- 2 files changed, 141 insertions(+), 78 deletions(-) create mode 100644 src/zarr/storage/object_store.py delete mode 100644 src/zarr/v3/store/object_store.py diff --git a/src/zarr/storage/object_store.py b/src/zarr/storage/object_store.py new file mode 100644 index 0000000000..862aa3c74d --- /dev/null +++ b/src/zarr/storage/object_store.py @@ -0,0 +1,141 @@ +from __future__ import annotations + +import asyncio +from collections import defaultdict +from collections.abc import Iterable +from typing import TYPE_CHECKING, Any + +import object_store_rs as obs + +from zarr.abc.store import ByteRangeRequest, Store +from zarr.core.buffer import Buffer +from zarr.core.buffer.core import BufferPrototype + +if TYPE_CHECKING: + from collections.abc import AsyncGenerator, Coroutine, Iterable + from typing import Any + + from object_store_rs.store import ObjectStore as _ObjectStore + + from zarr.core.buffer import Buffer, BufferPrototype + from zarr.core.common import BytesLike + + +class ObjectStore(Store): + store: _ObjectStore + + def __init__(self, store: _ObjectStore) -> None: + self.store = store + + def __str__(self) -> str: + return f"object://{self.store}" + + def __repr__(self) -> str: + return f"ObjectStore({self!r})" + + async def get( + self, key: str, prototype: BufferPrototype, byte_range: ByteRangeRequest | None = None + ) -> Buffer: + if byte_range is None: + resp = await obs.get_async(self.store, key) + return await resp.bytes_async() + + pass + + raise NotImplementedError + + async def get_partial_values( + self, + prototype: BufferPrototype, + key_ranges: Iterable[tuple[str, ByteRangeRequest]], + ) -> list[Buffer | None]: + # TODO: this is a bit hacky and untested. ObjectStore has a `get_ranges` method + # that will additionally merge nearby ranges, but it's _per_ file. 
So we need to + # split these key_ranges into **per-file** key ranges, and then reassemble the + # results in the original order. + key_ranges = list(key_ranges) + + per_file_requests: dict[str, list[tuple[int | None, int | None, int]]] = defaultdict(list) + for idx, (path, range_) in enumerate(key_ranges): + per_file_requests[path].append((range_[0], range_[1], idx)) + + futs: list[Coroutine[Any, Any, list[bytes]]] = [] + for path, ranges in per_file_requests.items(): + offsets = [r[0] for r in ranges] + lengths = [r[1] - r[0] for r in ranges] + fut = obs.get_ranges_async(self.store, path, offsets=offsets, lengths=lengths) + futs.append(fut) + + result = await asyncio.gather(*futs) + + output_buffers: list[bytes] = [b""] * len(key_ranges) + for per_file_request, buffers in zip(per_file_requests.items(), result, strict=True): + path, ranges = per_file_request + for buffer, ranges_ in zip(buffers, ranges, strict=True): + initial_index = ranges_[2] + output_buffers[initial_index] = buffer + + return output_buffers + + async def exists(self, key: str) -> bool: + try: + await obs.head_async(self.store, key) + except FileNotFoundError: + return False + else: + return True + + @property + def supports_writes(self) -> bool: + return True + + async def set(self, key: str, value: Buffer) -> None: + buf = value.to_bytes() + await obs.put_async(self.store, key, buf) + + # TODO: + # async def set_if_not_exists(self, key: str, value: Buffer) -> None: + + @property + def supports_deletes(self) -> bool: + return True + + async def delete(self, key: str) -> None: + await obs.delete_async(self.store, key) + + @property + def supports_partial_writes(self) -> bool: + return False + + async def set_partial_values( + self, key_start_values: Iterable[tuple[str, int, BytesLike]] + ) -> None: + raise NotImplementedError + + @property + def supports_listing(self) -> bool: + return True + + def list(self) -> AsyncGenerator[str, None]: + # object-store-rs does not yet support list results as 
an async generator + # https://github.com/apache/arrow-rs/issues/6587 + objects = obs.list(self.store) + paths = [object["path"] for object in objects] + # Not sure how to convert list to async generator + return paths + + def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]: + # object-store-rs does not yet support list results as an async generator + # https://github.com/apache/arrow-rs/issues/6587 + objects = obs.list(self.store, prefix=prefix) + paths = [object["path"] for object in objects] + # Not sure how to convert list to async generator + return paths + + def list_dir(self, prefix: str) -> AsyncGenerator[str, None]: + # object-store-rs does not yet support list results as an async generator + # https://github.com/apache/arrow-rs/issues/6587 + objects = obs.list_with_delimiter(self.store, prefix=prefix) + paths = [object["path"] for object in objects["objects"]] + # Not sure how to convert list to async generator + return paths diff --git a/src/zarr/v3/store/object_store.py b/src/zarr/v3/store/object_store.py deleted file mode 100644 index a834b44102..0000000000 --- a/src/zarr/v3/store/object_store.py +++ /dev/null @@ -1,78 +0,0 @@ -from __future__ import annotations - -import asyncio -from typing import List, Optional, Tuple - -from object_store import ObjectStore as _ObjectStore -from object_store import Path as ObjectPath - -from zarr.v3.abc.store import Store - - -class ObjectStore(Store): - supports_writes: bool = True - supports_partial_writes: bool = False - supports_listing: bool = True - - store: _ObjectStore - - def __init__(self, store: _ObjectStore): - self.store = store - - def __str__(self) -> str: - return f"object://{self.store}" - - def __repr__(self) -> str: - return f"ObjectStore({repr(str(self))})" - - async def get( - self, key: str, byte_range: Optional[Tuple[int, Optional[int]]] = None - ) -> Optional[bytes]: - if byte_range is None: - return await self.store.get_async(ObjectPath(key)) - - start, end = byte_range - if end 
is None: - # Have to wrap a separate object-store function to support this - raise NotImplementedError - - return await self.store.get_range_async(ObjectPath(key), start, end - start) - - async def get_partial_values( - self, key_ranges: List[Tuple[str, Tuple[int, int]]] - ) -> List[bytes]: - # TODO: use rust-based concurrency inside object-store - futs = [self.get(key, byte_range=byte_range) for (key, byte_range) in key_ranges] - - # Seems like a weird type match where `get()` returns `Optional[bytes]` but - # `get_partial_values` is non-optional? - return await asyncio.gather(*futs) # type: ignore - - async def exists(self, key: str) -> bool: - try: - _ = await self.store.head_async(ObjectPath(key)) - return True - except FileNotFoundError: - return False - - async def set(self, key: str, value: bytes) -> None: - await self.store.put_async(ObjectPath(key), value) - - async def delete(self, key: str) -> None: - await self.store.delete_async(ObjectPath(key)) - - async def set_partial_values(self, key_start_values: List[Tuple[str, int, bytes]]) -> None: - raise NotImplementedError - - async def list(self) -> List[str]: - objects = await self.store.list_async(None) - return [str(obj.location) for obj in objects] - - async def list_prefix(self, prefix: str) -> List[str]: - objects = await self.store.list_async(ObjectPath(prefix)) - return [str(obj.location) for obj in objects] - - async def list_dir(self, prefix: str) -> List[str]: - list_result = await self.store.list_with_delimiter_async(ObjectPath(prefix)) - common_prefixes = set(list_result.common_prefixes) - return [str(obj.location) for obj in list_result.objects if obj not in common_prefixes] From af2a39b52ed3d5ac2f33d2c280ac5e0675e67a2d Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Fri, 1 Nov 2024 16:54:54 -0400 Subject: [PATCH 04/67] Handle list streams --- src/zarr/storage/object_store.py | 56 +++++++++++++++++++------------- 1 file changed, 34 insertions(+), 22 deletions(-) diff --git 
a/src/zarr/storage/object_store.py b/src/zarr/storage/object_store.py index 862aa3c74d..2bf709e470 100644 --- a/src/zarr/storage/object_store.py +++ b/src/zarr/storage/object_store.py @@ -5,7 +5,7 @@ from collections.abc import Iterable from typing import TYPE_CHECKING, Any -import object_store_rs as obs +import obstore as obs from zarr.abc.store import ByteRangeRequest, Store from zarr.core.buffer import Buffer @@ -15,7 +15,8 @@ from collections.abc import AsyncGenerator, Coroutine, Iterable from typing import Any - from object_store_rs.store import ObjectStore as _ObjectStore + from obstore import ListStream, ObjectMeta + from obstore.store import ObjectStore as _ObjectStore from zarr.core.buffer import Buffer, BufferPrototype from zarr.core.common import BytesLike @@ -93,8 +94,9 @@ async def set(self, key: str, value: Buffer) -> None: buf = value.to_bytes() await obs.put_async(self.store, key, buf) - # TODO: - # async def set_if_not_exists(self, key: str, value: Buffer) -> None: + async def set_if_not_exists(self, key: str, value: Buffer) -> None: + buf = value.to_bytes() + await obs.put_async(self.store, key, buf, mode="create") @property def supports_deletes(self) -> bool: @@ -117,25 +119,35 @@ def supports_listing(self) -> bool: return True def list(self) -> AsyncGenerator[str, None]: - # object-store-rs does not yet support list results as an async generator - # https://github.com/apache/arrow-rs/issues/6587 - objects = obs.list(self.store) - paths = [object["path"] for object in objects] - # Not sure how to convert list to async generator - return paths + objects: ListStream[list[ObjectMeta]] = obs.list(self.store) + return _transform_list(objects) def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]: - # object-store-rs does not yet support list results as an async generator - # https://github.com/apache/arrow-rs/issues/6587 - objects = obs.list(self.store, prefix=prefix) - paths = [object["path"] for object in objects] - # Not sure how to 
convert list to async generator - return paths + objects: ListStream[list[ObjectMeta]] = obs.list(self.store, prefix=prefix) + return _transform_list(objects) def list_dir(self, prefix: str) -> AsyncGenerator[str, None]: - # object-store-rs does not yet support list results as an async generator - # https://github.com/apache/arrow-rs/issues/6587 - objects = obs.list_with_delimiter(self.store, prefix=prefix) - paths = [object["path"] for object in objects["objects"]] - # Not sure how to convert list to async generator - return paths + objects: ListStream[list[ObjectMeta]] = obs.list(self.store, prefix=prefix) + return _transform_list_dir(objects, prefix) + + +async def _transform_list( + list_stream: AsyncGenerator[list[ObjectMeta], None], +) -> AsyncGenerator[str, None]: + async for batch in list_stream: + for item in batch: + yield item["path"] + + +async def _transform_list_dir( + list_stream: AsyncGenerator[list[ObjectMeta], None], prefix: str +) -> AsyncGenerator[str, None]: + # We assume that the underlying object-store implementation correctly handles the + # prefix, so we don't double-check that the returned results actually start with the + # given prefix. + prefix_len = len(prefix) + async for batch in list_stream: + for item in batch: + # Yield this item if "/" does not exist after the prefix. 
+ if "/" not in item["path"][prefix_len:]: + yield item["path"] From d7cfbee1f37fe17f7c92335b8f1a569f88963608 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Fri, 1 Nov 2024 17:24:57 -0400 Subject: [PATCH 05/67] Update get --- src/zarr/storage/object_store.py | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/src/zarr/storage/object_store.py b/src/zarr/storage/object_store.py index 2bf709e470..8f40cd49d4 100644 --- a/src/zarr/storage/object_store.py +++ b/src/zarr/storage/object_store.py @@ -39,11 +39,22 @@ async def get( ) -> Buffer: if byte_range is None: resp = await obs.get_async(self.store, key) - return await resp.bytes_async() - - pass - - raise NotImplementedError + return prototype.buffer.from_buffer(memoryview(await resp.bytes_async())) # type: ignore not assignable to buffer + + start, end = byte_range + if start is not None and end is not None: + resp = await obs.get_range_async(self.store, key, start=start, end=end) + return prototype.buffer.from_buffer(memoryview(await resp.bytes_async())) # type: ignore not assignable to buffer + elif start is not None: + if start >= 0: + # Offset request + resp = await obs.get_async(self.store, key, options={"range": {"offset": start}}) + else: + resp = await obs.get_async(self.store, key, options={"range": {"suffix": start}}) + + return prototype.buffer.from_buffer(memoryview(await resp.bytes_async())) # type: ignore not assignable to buffer + else: + raise ValueError(f"Unexpected input to `get`: {start=}, {end=}") async def get_partial_values( self, From cb40015f77d4e3df11f6801906ca1b3eb2201a47 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Fri, 1 Nov 2024 17:29:10 -0400 Subject: [PATCH 06/67] wip refactor get_partial_values --- src/zarr/storage/object_store.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/zarr/storage/object_store.py b/src/zarr/storage/object_store.py index 8f40cd49d4..ac56affa2d 100644 --- 
a/src/zarr/storage/object_store.py +++ b/src/zarr/storage/object_store.py @@ -15,6 +15,7 @@ from collections.abc import AsyncGenerator, Coroutine, Iterable from typing import Any + from obstore import Buffer as ObjectStoreBuffer from obstore import ListStream, ObjectMeta from obstore.store import ObjectStore as _ObjectStore @@ -71,21 +72,21 @@ async def get_partial_values( for idx, (path, range_) in enumerate(key_ranges): per_file_requests[path].append((range_[0], range_[1], idx)) - futs: list[Coroutine[Any, Any, list[bytes]]] = [] + futs: list[Coroutine[Any, Any, list[ObjectStoreBuffer]]] = [] for path, ranges in per_file_requests.items(): - offsets = [r[0] for r in ranges] - lengths = [r[1] - r[0] for r in ranges] - fut = obs.get_ranges_async(self.store, path, offsets=offsets, lengths=lengths) + starts = [r[0] for r in ranges] + ends = [r[1] for r in ranges] + fut = obs.get_ranges_async(self.store, path, starts=starts, ends=ends) futs.append(fut) result = await asyncio.gather(*futs) - output_buffers: list[bytes] = [b""] * len(key_ranges) + output_buffers: list[type[BufferPrototype]] = [b""] * len(key_ranges) for per_file_request, buffers in zip(per_file_requests.items(), result, strict=True): path, ranges = per_file_request for buffer, ranges_ in zip(buffers, ranges, strict=True): initial_index = ranges_[2] - output_buffers[initial_index] = buffer + output_buffers[initial_index] = prototype.buffer.from_buffer(memoryview(buffer)) return output_buffers From b97645016717c37c6fa0a86c0edeb727fada10f0 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Thu, 7 Nov 2024 15:35:44 -0500 Subject: [PATCH 07/67] Fixes to _get_partial_values --- src/zarr/storage/object_store.py | 186 ++++++++++++++++++++++++++----- 1 file changed, 156 insertions(+), 30 deletions(-) diff --git a/src/zarr/storage/object_store.py b/src/zarr/storage/object_store.py index ac56affa2d..7383ec3ac7 100644 --- a/src/zarr/storage/object_store.py +++ b/src/zarr/storage/object_store.py @@ -3,7 +3,7 @@ import 
asyncio from collections import defaultdict from collections.abc import Iterable -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, TypedDict import obstore as obs @@ -15,8 +15,7 @@ from collections.abc import AsyncGenerator, Coroutine, Iterable from typing import Any - from obstore import Buffer as ObjectStoreBuffer - from obstore import ListStream, ObjectMeta + from obstore import ListStream, ObjectMeta, OffsetRange, SuffixRange from obstore.store import ObjectStore as _ObjectStore from zarr.core.buffer import Buffer, BufferPrototype @@ -62,33 +61,7 @@ async def get_partial_values( prototype: BufferPrototype, key_ranges: Iterable[tuple[str, ByteRangeRequest]], ) -> list[Buffer | None]: - # TODO: this is a bit hacky and untested. ObjectStore has a `get_ranges` method - # that will additionally merge nearby ranges, but it's _per_ file. So we need to - # split these key_ranges into **per-file** key ranges, and then reassemble the - # results in the original order. 
- key_ranges = list(key_ranges) - - per_file_requests: dict[str, list[tuple[int | None, int | None, int]]] = defaultdict(list) - for idx, (path, range_) in enumerate(key_ranges): - per_file_requests[path].append((range_[0], range_[1], idx)) - - futs: list[Coroutine[Any, Any, list[ObjectStoreBuffer]]] = [] - for path, ranges in per_file_requests.items(): - starts = [r[0] for r in ranges] - ends = [r[1] for r in ranges] - fut = obs.get_ranges_async(self.store, path, starts=starts, ends=ends) - futs.append(fut) - - result = await asyncio.gather(*futs) - - output_buffers: list[type[BufferPrototype]] = [b""] * len(key_ranges) - for per_file_request, buffers in zip(per_file_requests.items(), result, strict=True): - path, ranges = per_file_request - for buffer, ranges_ in zip(buffers, ranges, strict=True): - initial_index = ranges_[2] - output_buffers[initial_index] = prototype.buffer.from_buffer(memoryview(buffer)) - - return output_buffers + return await _get_partial_values(self.store, prototype=prototype, key_ranges=key_ranges) async def exists(self, key: str) -> bool: try: @@ -163,3 +136,156 @@ async def _transform_list_dir( # Yield this item if "/" does not exist after the prefix. if "/" not in item["path"][prefix_len:]: yield item["path"] + + +class BoundedRequest(TypedDict): + """Range request with a known start and end byte. + + These requests can be multiplexed natively on the Rust side with + `obstore.get_ranges_async`. + """ + + original_request_index: int + """The positional index in the original key_ranges input""" + + start: int + """Start byte offset.""" + + end: int + """End byte offset.""" + + +class OtherRequest(TypedDict): + """Offset or suffix range requests. + + These requests cannot be concurrent on the Rust side, and each need their own call + to `obstore.get_async`, passing in the `range` parameter. 
+ """ + + original_request_index: int + """The positional index in the original key_ranges input""" + + path: str + """The path to request from.""" + + range: OffsetRange | SuffixRange + """The range request type.""" + + +class Response(TypedDict): + """A response buffer associated with the original index that it should be restored to.""" + + original_request_index: int + """The positional index in the original key_ranges input""" + + buffer: Buffer + """The buffer returned from obstore's range request.""" + + +async def _make_bounded_requests( + store: obs.store.ObjectStore, + path: str, + requests: list[BoundedRequest], + prototype: BufferPrototype, +) -> list[Response]: + """Make all bounded requests for a specific file. + + `obstore.get_ranges_async` allows for making concurrent requests for multiple ranges + within a single file, and will e.g. merge concurrent requests. This only uses one + single Python coroutine. + """ + + starts = [r["start"] for r in requests] + ends = [r["end"] for r in requests] + responses = await obs.get_ranges_async(store, path=path, starts=starts, ends=ends) + + buffer_responses: list[Response] = [] + for request, response in zip(requests, responses, strict=True): + buffer_responses.append( + { + "original_request_index": request["original_request_index"], + "buffer": prototype.buffer.from_bytes(memoryview(response)), + } + ) + + return buffer_responses + + +async def _make_other_request( + store: obs.store.ObjectStore, + request: OtherRequest, + prototype: BufferPrototype, +) -> list[Response]: + """Make suffix or offset requests. + + We return a `list[Response]` for symmetry with `_make_bounded_requests` so that all + futures can be gathered together. 
+ """ + resp = await obs.get_async(store, request["path"], options={"range": request["range"]}) + buffer = await resp.bytes_async() + return [ + { + "original_request_index": request["original_request_index"], + "buffer": prototype.buffer.from_bytes(buffer), + } + ] + + +async def _get_partial_values( + store: obs.store.ObjectStore, + prototype: BufferPrototype, + key_ranges: Iterable[tuple[str, ByteRangeRequest]], +) -> list[Buffer | None]: + """Make multiple range requests. + + ObjectStore has a `get_ranges` method that will additionally merge nearby ranges, + but it's _per_ file. So we need to split these key_ranges into **per-file** key + ranges, and then reassemble the results in the original order. + + We separate into different requests: + + - One call to `obstore.get_ranges_async` **per target file** + - One call to `obstore.get_async` for each other request. + """ + key_ranges = list(key_ranges) + per_file_bounded_requests: dict[str, list[BoundedRequest]] = defaultdict(list) + other_requests: list[OtherRequest] = [] + + for idx, (path, (start, end)) in enumerate(key_ranges): + if start is None: + raise ValueError("Cannot pass `None` for the start of the range request.") + + if end is not None: + # This is a bounded request with known start and end byte. 
+ per_file_bounded_requests[path].append( + {"original_request_index": idx, "start": start, "end": end} + ) + elif end is None and start < 0: + # Suffix request from the end + other_requests.append( + {"original_request_index": idx, "path": path, "range": {"suffix": abs(start)}} + ) + elif end is None and start > 0: + # Offset request to the end + other_requests.append( + {"original_request_index": idx, "path": path, "range": {"offset": start}} + ) + else: + raise ValueError(f"Unsupported range input: {start=}, {end=}") + + futs: list[Coroutine[Any, Any, list[Response]]] = [] + for path, bounded_ranges in per_file_bounded_requests.items(): + futs.append(_make_bounded_requests(store, path, bounded_ranges, prototype)) + + for request in other_requests: + futs.append(_make_other_request(store, request, prototype)) # noqa: PERF401 + + buffers: list[Buffer | None] = [None] * len(key_ranges) + + # TODO: this gather a list of list of Response; not sure if there's a way to + # unpack these lists inside of an `asyncio.gather`? 
+ for responses in await asyncio.gather(*futs): + for resp in responses: + buffers[resp["original_request_index"]] = resp["buffer"] + + return buffers From f2c827dea81b0c5d8d6c83a525b402b4fe59960b Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Thu, 7 Nov 2024 15:38:54 -0500 Subject: [PATCH 08/67] Fix constructing prototype from get --- src/zarr/storage/object_store.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/zarr/storage/object_store.py b/src/zarr/storage/object_store.py index 7383ec3ac7..dd56697321 100644 --- a/src/zarr/storage/object_store.py +++ b/src/zarr/storage/object_store.py @@ -39,12 +39,12 @@ async def get( ) -> Buffer: if byte_range is None: resp = await obs.get_async(self.store, key) - return prototype.buffer.from_buffer(memoryview(await resp.bytes_async())) # type: ignore not assignable to buffer + return prototype.buffer.from_bytes(await resp.bytes_async()) start, end = byte_range if start is not None and end is not None: resp = await obs.get_range_async(self.store, key, start=start, end=end) - return prototype.buffer.from_buffer(memoryview(await resp.bytes_async())) # type: ignore not assignable to buffer + return prototype.buffer.from_bytes(memoryview(resp)) elif start is not None: if start >= 0: # Offset request @@ -52,7 +52,7 @@ async def get( else: resp = await obs.get_async(self.store, key, options={"range": {"suffix": start}}) - return prototype.buffer.from_buffer(memoryview(await resp.bytes_async())) # type: ignore not assignable to buffer + return prototype.buffer.from_bytes(await resp.bytes_async()) else: raise ValueError(f"Unexpected input to `get`: {start=}, {end=}") @@ -265,7 +265,7 @@ async def _get_partial_values( other_requests.append( {"original_request_index": idx, "path": path, "range": {"suffix": abs(start)}} ) - elif end is None and start > 0: + elif end is None and start >= 0: # Offset request to the end other_requests.append( {"original_request_index": idx, "path": path, "range": {"offset": 
start}} From 5c8903f99bc647469f9ef7638ab57e7061b50662 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Thu, 7 Nov 2024 15:39:29 -0500 Subject: [PATCH 09/67] lint --- src/zarr/storage/object_store.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/zarr/storage/object_store.py b/src/zarr/storage/object_store.py index dd56697321..7945a29568 100644 --- a/src/zarr/storage/object_store.py +++ b/src/zarr/storage/object_store.py @@ -260,12 +260,12 @@ async def _get_partial_values( per_file_bounded_requests[path].append( {"original_request_index": idx, "start": start, "end": end} ) - elif end is None and start < 0: + elif start < 0: # Suffix request from the end other_requests.append( {"original_request_index": idx, "path": path, "range": {"suffix": abs(start)}} ) - elif end is None and start >= 0: + elif start >= 0: # Offset request to the end other_requests.append( {"original_request_index": idx, "path": path, "range": {"offset": start}} From 8bb252e045f95c29d05b8613e491d079d578e4c3 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Mon, 18 Nov 2024 17:13:06 +0000 Subject: [PATCH 10/67] Add docstring --- src/zarr/storage/object_store.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/zarr/storage/object_store.py b/src/zarr/storage/object_store.py index 7945a29568..a915696a16 100644 --- a/src/zarr/storage/object_store.py +++ b/src/zarr/storage/object_store.py @@ -23,6 +23,14 @@ class ObjectStore(Store): + """A Zarr store that uses obstore for fast read/write from AWS, GCP, and Azure. + + Parameters + ---------- + store : obstore.store.ObjectStore + An obstore store instance that is set up with the proper credentials. 
+ """ + store: _ObjectStore def __init__(self, store: _ObjectStore) -> None: From 559eafde892f16a3cdaa02d6336f85fb52a16c3c Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Mon, 18 Nov 2024 17:14:35 +0000 Subject: [PATCH 11/67] Make names private --- src/zarr/storage/object_store.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/src/zarr/storage/object_store.py b/src/zarr/storage/object_store.py index a915696a16..2a728470ad 100644 --- a/src/zarr/storage/object_store.py +++ b/src/zarr/storage/object_store.py @@ -146,7 +146,7 @@ async def _transform_list_dir( yield item["path"] -class BoundedRequest(TypedDict): +class _BoundedRequest(TypedDict): """Range request with a known start and end byte. These requests can be multiplexed natively on the Rust side with @@ -163,7 +163,7 @@ class BoundedRequest(TypedDict): """End byte offset.""" -class OtherRequest(TypedDict): +class _OtherRequest(TypedDict): """Offset or suffix range requests. These requests cannot be concurrent on the Rust side, and each need their own call @@ -180,7 +180,7 @@ class OtherRequest(TypedDict): """The range request type.""" -class Response(TypedDict): +class _Response(TypedDict): """A response buffer associated with the original index that it should be restored to.""" original_request_index: int @@ -193,9 +193,9 @@ class Response(TypedDict): async def _make_bounded_requests( store: obs.store.ObjectStore, path: str, - requests: list[BoundedRequest], + requests: list[_BoundedRequest], prototype: BufferPrototype, -) -> list[Response]: +) -> list[_Response]: """Make all bounded requests for a specific file. 
`obstore.get_ranges_async` allows for making concurrent requests for multiple ranges @@ -207,7 +207,7 @@ async def _make_bounded_requests( ends = [r["end"] for r in requests] responses = await obs.get_ranges_async(store, path=path, starts=starts, ends=ends) - buffer_responses: list[Response] = [] + buffer_responses: list[_Response] = [] for request, response in zip(requests, responses, strict=True): buffer_responses.append( { @@ -221,12 +221,12 @@ async def _make_bounded_requests( async def _make_other_request( store: obs.store.ObjectStore, - request: OtherRequest, + request: _OtherRequest, prototype: BufferPrototype, -) -> list[Response]: +) -> list[_Response]: """Make suffix or offset requests. - We return a `list[Response]` for symmetry with `_make_bounded_requests` so that all + We return a `list[_Response]` for symmetry with `_make_bounded_requests` so that all futures can be gathered together. """ resp = await obs.get_async(store, request["path"], options={"range": request["range"]}) @@ -256,8 +256,8 @@ async def _get_partial_values( - One call to `obstore.get_async` for each other request. 
""" key_ranges = list(key_ranges) - per_file_bounded_requests: dict[str, list[BoundedRequest]] = defaultdict(list) - other_requests: list[OtherRequest] = [] + per_file_bounded_requests: dict[str, list[_BoundedRequest]] = defaultdict(list) + other_requests: list[_OtherRequest] = [] for idx, (path, (start, end)) in enumerate(key_ranges): if start is None: @@ -281,7 +281,7 @@ async def _get_partial_values( else: raise ValueError(f"Unsupported range input: {start=}, {end=}") - futs: list[Coroutine[Any, Any, list[Response]]] = [] + futs: list[Coroutine[Any, Any, list[_Response]]] = [] for path, bounded_ranges in per_file_bounded_requests.items(): futs.append(_make_bounded_requests(store, path, bounded_ranges, prototype)) From 5486e69081455bf4ef85102f98fbb6ae59cf2470 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Mon, 18 Nov 2024 17:35:21 +0000 Subject: [PATCH 12/67] Implement eq --- src/zarr/storage/object_store.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/zarr/storage/object_store.py b/src/zarr/storage/object_store.py index 2a728470ad..b7b9902aa0 100644 --- a/src/zarr/storage/object_store.py +++ b/src/zarr/storage/object_store.py @@ -32,6 +32,13 @@ class ObjectStore(Store): """ store: _ObjectStore + """The underlying obstore instance.""" + + def __eq__(self, value: object) -> bool: + if not isinstance(value, ObjectStore): + return False + + return self.store.__eq__(value.store) def __init__(self, store: _ObjectStore) -> None: self.store = store From 9a05c017e8755e941f89d7f1129efb05364c87e6 Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Mon, 18 Nov 2024 13:19:14 -0500 Subject: [PATCH 13/67] Add obstore as a test dep --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 09797ae3d4..92c5812c4d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -70,6 +70,7 @@ test = [ "mypy", "hypothesis", "universal-pathlib", + "obstore==0.3.0b5", ] jupyter = [ From 
56b7a0bd365d4ee2f80530d357f37b8fc472d29b Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Mon, 18 Nov 2024 13:21:52 -0500 Subject: [PATCH 14/67] Run store tests on ObjectStore --- tests/test_store/test_object.py | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 tests/test_store/test_object.py diff --git a/tests/test_store/test_object.py b/tests/test_store/test_object.py new file mode 100644 index 0000000000..f4ff234313 --- /dev/null +++ b/tests/test_store/test_object.py @@ -0,0 +1,7 @@ +from zarr.core.buffer import cpu +from zarr.storage.object_store import ObjectStore +from zarr.testing.store import StoreTests + + +class TestObjectStore(StoreTests[ObjectStore, cpu.Buffer]): + store_cls = ObjectStore From b38ada1082bf7729106175336b6b1812dee66524 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Thu, 21 Nov 2024 14:55:01 +0000 Subject: [PATCH 15/67] import or skip --- tests/test_store/test_object.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/test_store/test_object.py b/tests/test_store/test_object.py index f4ff234313..0fa5a0a098 100644 --- a/tests/test_store/test_object.py +++ b/tests/test_store/test_object.py @@ -1,3 +1,8 @@ +# ruff: noqa: E402 +import pytest + +pytest.importorskip("obstore") + from zarr.core.buffer import cpu from zarr.storage.object_store import ObjectStore from zarr.testing.store import StoreTests From ab00b46daff61054595ea57eebff1772b1d67658 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Fri, 22 Nov 2024 14:52:54 +0000 Subject: [PATCH 16/67] Bump obstore beta version --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 92c5812c4d..41fbd70b15 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -70,7 +70,7 @@ test = [ "mypy", "hypothesis", "universal-pathlib", - "obstore==0.3.0b5", + "obstore==0.3.0b8", ] jupyter = [ From 9c65e4d50fffaabbd286e8f6be0dac10e5e4a518 Mon Sep 17 00:00:00 2001 From: Kyle 
Barron Date: Fri, 22 Nov 2024 15:25:03 +0000 Subject: [PATCH 17/67] bump pre-commit --- .pre-commit-config.yaml | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index dd9660fa5f..df4f35a48a 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -9,9 +9,9 @@ repos: - repo: https://github.com/astral-sh/ruff-pre-commit rev: v0.7.3 hooks: - - id: ruff - args: ["--fix", "--show-fixes"] - - id: ruff-format + - id: ruff + args: ["--fix", "--show-fixes"] + - id: ruff-format - repo: https://github.com/codespell-project/codespell rev: v2.3.0 hooks: @@ -20,7 +20,7 @@ repos: - repo: https://github.com/pre-commit/pre-commit-hooks rev: v5.0.0 hooks: - - id: check-yaml + - id: check-yaml - repo: https://github.com/pre-commit/mirrors-mypy rev: v1.13.0 hooks: @@ -35,6 +35,7 @@ repos: - numpy - typing_extensions - universal-pathlib + - obstore==0.3.0-beta.8 # Tests - pytest - repo: https://github.com/scientific-python/cookie From 77d7c127a0f4208d14c0b6f279cdb6d850319f26 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Thu, 5 Dec 2024 13:49:26 -0500 Subject: [PATCH 18/67] Add read_only param for __init__ --- src/zarr/storage/object_store.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/zarr/storage/object_store.py b/src/zarr/storage/object_store.py index b7b9902aa0..af84904589 100644 --- a/src/zarr/storage/object_store.py +++ b/src/zarr/storage/object_store.py @@ -40,9 +40,11 @@ def __eq__(self, value: object) -> bool: return self.store.__eq__(value.store) - def __init__(self, store: _ObjectStore) -> None: + def __init__(self, store: _ObjectStore, *, read_only: bool = False) -> None: self.store = store + super().__init__(read_only=read_only) + def __str__(self) -> str: return f"object://{self.store}" From 44184260c0238bd6273664e45815c7537064a9b3 Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Fri, 13 Dec 2024 12:31:44 -0500 
Subject: [PATCH 19/67] Bump obstore --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 41fbd70b15..05425f7fbb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -70,7 +70,7 @@ test = [ "mypy", "hypothesis", "universal-pathlib", - "obstore==0.3.0b8", + "obstore==0.3.0b9", ] jupyter = [ From 7a7117401b9493938a5a23b5482c4af941d7d2c5 Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Fri, 13 Dec 2024 14:52:39 -0500 Subject: [PATCH 20/67] Add fixtures for object store tests --- tests/test_store/test_object.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/tests/test_store/test_object.py b/tests/test_store/test_object.py index 0fa5a0a098..31a3fa0ebc 100644 --- a/tests/test_store/test_object.py +++ b/tests/test_store/test_object.py @@ -1,7 +1,7 @@ # ruff: noqa: E402 import pytest -pytest.importorskip("obstore") +obstore = pytest.importorskip("obstore") from zarr.core.buffer import cpu from zarr.storage.object_store import ObjectStore @@ -10,3 +10,13 @@ class TestObjectStore(StoreTests[ObjectStore, cpu.Buffer]): store_cls = ObjectStore + buffer_cls = cpu.Buffer + + @pytest.fixture + def store_kwargs(self, tmpdir) -> dict[str, str | bool]: + store = obstore.store.LocalStore(prefix=tmpdir) + return {"store": store, "read_only": False} + + @pytest.fixture + def store(self, store_kwargs: dict[str, str | bool]) -> ObjectStore: + return self.store_cls(**store_kwargs) From e73bcc996f4bcf9738492db467f0582eeb785645 Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Fri, 13 Dec 2024 14:55:19 -0500 Subject: [PATCH 21/67] Cast return from __eq__ as bool --- src/zarr/storage/object_store.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/zarr/storage/object_store.py b/src/zarr/storage/object_store.py index af84904589..95386bb2bf 100644 --- a/src/zarr/storage/object_store.py +++ 
b/src/zarr/storage/object_store.py @@ -38,7 +38,7 @@ def __eq__(self, value: object) -> bool: if not isinstance(value, ObjectStore): return False - return self.store.__eq__(value.store) + return bool(self.store.__eq__(value.store)) def __init__(self, store: _ObjectStore, *, read_only: bool = False) -> None: self.store = store From c2cd6b88283840a06ee6e13d4fe57b10d920fa22 Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Fri, 13 Dec 2024 15:00:47 -0500 Subject: [PATCH 22/67] Avoid recursion error on repr --- src/zarr/storage/object_store.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/zarr/storage/object_store.py b/src/zarr/storage/object_store.py index 95386bb2bf..bebd30f55f 100644 --- a/src/zarr/storage/object_store.py +++ b/src/zarr/storage/object_store.py @@ -49,7 +49,7 @@ def __str__(self) -> str: return f"object://{self.store}" def __repr__(self) -> str: - return f"ObjectStore({self!r})" + return f"ObjectStore({self})" async def get( self, key: str, prototype: BufferPrototype, byte_range: ByteRangeRequest | None = None From a95ec59b4e5f198ca10dae5898e930019d78fdc9 Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Fri, 13 Dec 2024 15:25:43 -0500 Subject: [PATCH 23/67] Check store type at runtime --- src/zarr/storage/object_store.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/zarr/storage/object_store.py b/src/zarr/storage/object_store.py index bebd30f55f..ee13364991 100644 --- a/src/zarr/storage/object_store.py +++ b/src/zarr/storage/object_store.py @@ -41,8 +41,19 @@ def __eq__(self, value: object) -> bool: return bool(self.store.__eq__(value.store)) def __init__(self, store: _ObjectStore, *, read_only: bool = False) -> None: + if not isinstance( + store, + ( + obs.store.AzureStore, + obs.store.GCSStore, + obs.store.HTTPStore, + obs.store.S3Store, + obs.store.LocalStore, + obs.store.MemoryStore, + ), + ): + 
raise TypeError(f"expected ObjectStore class, got {store!r}") self.store = store - super().__init__(read_only=read_only) def __str__(self) -> str: From ca261b115b26daf0b5a1fdfa417df9c2e9792082 Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Fri, 13 Dec 2024 16:31:28 -0500 Subject: [PATCH 24/67] Check if store is writable for setting or deleting objects --- src/zarr/storage/object_store.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/zarr/storage/object_store.py b/src/zarr/storage/object_store.py index ee13364991..744f6bdc3e 100644 --- a/src/zarr/storage/object_store.py +++ b/src/zarr/storage/object_store.py @@ -104,10 +104,12 @@ def supports_writes(self) -> bool: return True async def set(self, key: str, value: Buffer) -> None: + self._check_writable() buf = value.to_bytes() await obs.put_async(self.store, key, buf) async def set_if_not_exists(self, key: str, value: Buffer) -> None: + self._check_writable() buf = value.to_bytes() await obs.put_async(self.store, key, buf, mode="create") @@ -116,6 +118,7 @@ def supports_deletes(self) -> bool: return True async def delete(self, key: str) -> None: + self._check_writable() await obs.delete_async(self.store, key) @property From 0eb416aaf80c1469a7b79e2b503cee167824a7a9 Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Fri, 13 Dec 2024 16:57:19 -0500 Subject: [PATCH 25/67] Add test for object store repr --- tests/test_store/test_object.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/test_store/test_object.py b/tests/test_store/test_object.py index 31a3fa0ebc..b891cb3e35 100644 --- a/tests/test_store/test_object.py +++ b/tests/test_store/test_object.py @@ -20,3 +20,9 @@ def store_kwargs(self, tmpdir) -> dict[str, str | bool]: @pytest.fixture def store(self, store_kwargs: dict[str, str | bool]) -> ObjectStore: return self.store_cls(**store_kwargs) + + def test_store_repr(self, store: ObjectStore) -> None: + 
from fnmatch import fnmatch + + pattern = "ObjectStore(object://LocalStore(file:///*))" + assert fnmatch(f"{store!r}", pattern) From 247432f2cf10a0bd76c4f6438da2bee40f9caec9 Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Fri, 13 Dec 2024 17:29:08 -0500 Subject: [PATCH 26/67] Add attribute tests --- tests/test_store/test_object.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/test_store/test_object.py b/tests/test_store/test_object.py index b891cb3e35..da435ba9d7 100644 --- a/tests/test_store/test_object.py +++ b/tests/test_store/test_object.py @@ -26,3 +26,12 @@ def test_store_repr(self, store: ObjectStore) -> None: pattern = "ObjectStore(object://LocalStore(file:///*))" assert fnmatch(f"{store!r}", pattern) + + def test_store_supports_writes(self, store: ObjectStore) -> None: + assert store.supports_writes + + def test_store_supports_partial_writes(self, store: ObjectStore) -> None: + assert not store.supports_partial_writes + + def test_store_supports_listing(self, store: ObjectStore) -> None: + assert store.supports_listing From 4b31b3c8a809cabc4da0f41b4b233f7969ab88f4 Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Fri, 13 Dec 2024 20:59:42 -0500 Subject: [PATCH 27/67] Add get and set methods to test class --- src/zarr/testing/store.py | 4 ++-- tests/test_store/test_object.py | 18 +++++++++++++++++- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/src/zarr/testing/store.py b/src/zarr/testing/store.py index d26d83e566..ddd4667017 100644 --- a/src/zarr/testing/store.py +++ b/src/zarr/testing/store.py @@ -23,7 +23,7 @@ class StoreTests(Generic[S, B]): async def set(self, store: S, key: str, value: Buffer) -> None: """ Insert a value into a storage backend, with a specific key. - This should not not use any store methods. Bypassing the store methods allows them to be + This should not use any store methods. 
Bypassing the store methods allows them to be tested. """ raise NotImplementedError @@ -31,7 +31,7 @@ async def set(self, store: S, key: str, value: Buffer) -> None: async def get(self, store: S, key: str) -> Buffer: """ Retrieve a value from a storage backend, by key. - This should not not use any store methods. Bypassing the store methods allows them to be + This should not use any store methods. Bypassing the store methods allows them to be tested. """ diff --git a/tests/test_store/test_object.py b/tests/test_store/test_object.py index da435ba9d7..9799e19ac8 100644 --- a/tests/test_store/test_object.py +++ b/tests/test_store/test_object.py @@ -3,10 +3,14 @@ obstore = pytest.importorskip("obstore") -from zarr.core.buffer import cpu +import re + +from zarr.core.buffer import Buffer, cpu from zarr.storage.object_store import ObjectStore from zarr.testing.store import StoreTests +PATTERN = r"file://(/[\w/.-]+)" + class TestObjectStore(StoreTests[ObjectStore, cpu.Buffer]): store_cls = ObjectStore @@ -21,6 +25,18 @@ def store_kwargs(self, tmpdir) -> dict[str, str | bool]: def store(self, store_kwargs: dict[str, str | bool]) -> ObjectStore: return self.store_cls(**store_kwargs) + async def get(self, store: ObjectStore, key: str) -> Buffer: + # TODO: There must be a better way to get the path to the store + store_path = re.search(PATTERN, str(store)).group(1) + new_local_store = obstore.store.LocalStore(prefix=store_path) + return self.buffer_cls.from_bytes(obstore.get(new_local_store, key)) + + async def set(self, store: ObjectStore, key: str, value: Buffer) -> None: + # TODO: There must be a better way to get the path to the store + store_path = re.search(PATTERN, str(store)).group(1) + new_local_store = obstore.store.LocalStore(prefix=store_path) + obstore.put(new_local_store, key, value.to_bytes()) + def test_store_repr(self, store: ObjectStore) -> None: from fnmatch import fnmatch From d49d1ff903a51e68bd6d80502b319eb24cec79e9 Mon Sep 17 00:00:00 2001 From: Max 
Jones <14077947+maxrjones@users.noreply.github.com> Date: Fri, 13 Dec 2024 21:16:59 -0500 Subject: [PATCH 28/67] Raise an exception for previously set key --- src/zarr/testing/store.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/zarr/testing/store.py b/src/zarr/testing/store.py index ddd4667017..6a0f73c00a 100644 --- a/src/zarr/testing/store.py +++ b/src/zarr/testing/store.py @@ -308,8 +308,9 @@ async def test_set_if_not_exists(self, store: S) -> None: data_buf = self.buffer_cls.from_bytes(b"0000") await self.set(store, key, data_buf) - new = self.buffer_cls.from_bytes(b"1111") - await store.set_if_not_exists("k", new) # no error + with pytest.raises(Exception): + new = self.buffer_cls.from_bytes(b"1111") + await store.set_if_not_exists("k", new) result = await store.get(key, default_buffer_prototype()) assert result == data_buf From c2ebc8ffe73eef6110a0a75e43560f9776e9b258 Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Sat, 14 Dec 2024 15:39:06 -0500 Subject: [PATCH 29/67] Update src/zarr/testing/store.py Co-authored-by: Davis Bennett --- src/zarr/testing/store.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/zarr/testing/store.py b/src/zarr/testing/store.py index 6a0f73c00a..7b70ccc51b 100644 --- a/src/zarr/testing/store.py +++ b/src/zarr/testing/store.py @@ -310,7 +310,7 @@ async def test_set_if_not_exists(self, store: S) -> None: with pytest.raises(Exception): new = self.buffer_cls.from_bytes(b"1111") - await store.set_if_not_exists("k", new) + await store.set_if_not_exists(key, new) result = await store.get(key, default_buffer_prototype()) assert result == data_buf From eb76698a18001aba21429b5b798e342f789d7bca Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Sat, 14 Dec 2024 21:05:38 -0500 Subject: [PATCH 30/67] Update _transform_list_dir to not remove all items --- src/zarr/storage/object_store.py | 9 +++++---- 1 file
changed, 5 insertions(+), 4 deletions(-) diff --git a/src/zarr/storage/object_store.py b/src/zarr/storage/object_store.py index 744f6bdc3e..13535c6e67 100644 --- a/src/zarr/storage/object_store.py +++ b/src/zarr/storage/object_store.py @@ -161,12 +161,13 @@ async def _transform_list_dir( # We assume that the underlying object-store implementation correctly handles the # prefix, so we don't double-check that the returned results actually start with the # given prefix. - prefix_len = len(prefix) + prefix_len = len(prefix) + 1 # If one is not added to the length, all items will contain "/" async for batch in list_stream: for item in batch: - # Yield this item if "/" does not exist after the prefix. - if "/" not in item["path"][prefix_len:]: - yield item["path"] + # Yield this item if "/" does not exist after the prefix + item_path = item["path"][prefix_len:] + if "/" not in item_path: + yield item_path class _BoundedRequest(TypedDict): From f310260df9cf75dc0822f8c9511e29bf4320e5a2 Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Sat, 14 Dec 2024 21:10:42 -0500 Subject: [PATCH 31/67] Return bytes from GetResult --- tests/test_store/test_object.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_store/test_object.py b/tests/test_store/test_object.py index 9799e19ac8..ca6a1b176f 100644 --- a/tests/test_store/test_object.py +++ b/tests/test_store/test_object.py @@ -29,7 +29,7 @@ async def get(self, store: ObjectStore, key: str) -> Buffer: # TODO: There must be a better way to get the path to the store store_path = re.search(PATTERN, str(store)).group(1) new_local_store = obstore.store.LocalStore(prefix=store_path) - return self.buffer_cls.from_bytes(obstore.get(new_local_store, key)) + return self.buffer_cls.from_bytes(obstore.get(new_local_store, key).bytes()) async def set(self, store: ObjectStore, key: str, value: Buffer) -> None: # TODO: There must be a better way to get the path to the store From 
86951b8eb6f87459d77318d37360b6424d2df49c Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Sat, 14 Dec 2024 22:38:54 -0500 Subject: [PATCH 32/67] Don't raise an exception on set_if_not_exists --- src/zarr/storage/object_store.py | 4 +++- src/zarr/testing/store.py | 5 ++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/zarr/storage/object_store.py b/src/zarr/storage/object_store.py index 13535c6e67..9bc3526058 100644 --- a/src/zarr/storage/object_store.py +++ b/src/zarr/storage/object_store.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import contextlib from collections import defaultdict from collections.abc import Iterable from typing import TYPE_CHECKING, Any, TypedDict @@ -111,7 +112,8 @@ async def set(self, key: str, value: Buffer) -> None: async def set_if_not_exists(self, key: str, value: Buffer) -> None: self._check_writable() buf = value.to_bytes() - await obs.put_async(self.store, key, buf, mode="create") + with contextlib.suppress(obs.exceptions.AlreadyExistsError): + await obs.put_async(self.store, key, buf, mode="create") @property def supports_deletes(self) -> bool: diff --git a/src/zarr/testing/store.py b/src/zarr/testing/store.py index 7b70ccc51b..ddd4667017 100644 --- a/src/zarr/testing/store.py +++ b/src/zarr/testing/store.py @@ -308,9 +308,8 @@ async def test_set_if_not_exists(self, store: S) -> None: data_buf = self.buffer_cls.from_bytes(b"0000") await self.set(store, key, data_buf) - with pytest.raises(Exception): - new = self.buffer_cls.from_bytes(b"1111") - await store.set_if_not_exists(key, new) + new = self.buffer_cls.from_bytes(b"1111") + await store.set_if_not_exists("k", new) # no error result = await store.get(key, default_buffer_prototype()) assert result == data_buf From f989884820e6a5a8e1a35dc1139752fd57e63aef Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Sat, 14 Dec 2024 22:41:17 -0500 Subject: [PATCH 
33/67] Remove test that stores empty file --- src/zarr/testing/store.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/zarr/testing/store.py b/src/zarr/testing/store.py index ddd4667017..54e5e7e9f0 100644 --- a/src/zarr/testing/store.py +++ b/src/zarr/testing/store.py @@ -103,14 +103,14 @@ def test_store_supports_listing(self, store: S) -> None: raise NotImplementedError @pytest.mark.parametrize("key", ["c/0", "foo/c/0.0", "foo/0/0"]) - @pytest.mark.parametrize("data", [b"\x01\x02\x03\x04", b""]) @pytest.mark.parametrize("byte_range", [None, (0, None), (1, None), (1, 2), (None, 1)]) async def test_get( - self, store: S, key: str, data: bytes, byte_range: None | tuple[int | None, int | None] + self, store: S, key: str, byte_range: None | tuple[int | None, int | None] ) -> None: """ Ensure that data can be read from the store using the store.get method. """ + data = b"\x01\x02\x03\x04" data_buf = self.buffer_cls.from_bytes(data) await self.set(store, key, data_buf) observed = await store.get(key, prototype=default_buffer_prototype(), byte_range=byte_range) From 40e1b25bf6ffd66572fd1c081a2ddbedc2aaefd3 Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Sat, 14 Dec 2024 22:50:06 -0500 Subject: [PATCH 34/67] Handle None as start or end of byte range request --- src/zarr/storage/object_store.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/zarr/storage/object_store.py b/src/zarr/storage/object_store.py index 9bc3526058..8304e578f1 100644 --- a/src/zarr/storage/object_store.py +++ b/src/zarr/storage/object_store.py @@ -71,17 +71,22 @@ async def get( return prototype.buffer.from_bytes(await resp.bytes_async()) start, end = byte_range + if (start is None or start == 0) and end is None: + resp = await obs.get_async(self.store, key) + return prototype.buffer.from_bytes(await resp.bytes_async()) if start is not None and end is not None: resp = await 
obs.get_range_async(self.store, key, start=start, end=end) return prototype.buffer.from_bytes(memoryview(resp)) elif start is not None: - if start >= 0: + if start > 0: # Offset request resp = await obs.get_async(self.store, key, options={"range": {"offset": start}}) else: resp = await obs.get_async(self.store, key, options={"range": {"suffix": start}}) - return prototype.buffer.from_bytes(await resp.bytes_async()) + elif end is not None: + resp = await obs.get_range_async(self.store, key, start=0, end=end) + return prototype.buffer.from_bytes(memoryview(resp)) else: raise ValueError(f"Unexpected input to `get`: {start=}, {end=}") From 26fa37ef277f24f0f81d3d1e6309c26ed3fb9fdb Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Fri, 10 Jan 2025 17:52:58 -0500 Subject: [PATCH 35/67] Use new ByteRequest syntax --- src/zarr/storage/object_store.py | 88 +++++++++++++++++++------------- 1 file changed, 52 insertions(+), 36 deletions(-) diff --git a/src/zarr/storage/object_store.py b/src/zarr/storage/object_store.py index 8304e578f1..71c4c36215 100644 --- a/src/zarr/storage/object_store.py +++ b/src/zarr/storage/object_store.py @@ -8,7 +8,13 @@ import obstore as obs -from zarr.abc.store import ByteRangeRequest, Store +from zarr.abc.store import ( + ByteRequest, + OffsetByteRequest, + RangeByteRequest, + Store, + SuffixByteRequest, +) from zarr.core.buffer import Buffer from zarr.core.buffer.core import BufferPrototype @@ -64,36 +70,33 @@ def __repr__(self) -> str: return f"ObjectStore({self})" async def get( - self, key: str, prototype: BufferPrototype, byte_range: ByteRangeRequest | None = None + self, key: str, prototype: BufferPrototype, byte_range: ByteRequest | None = None ) -> Buffer: if byte_range is None: resp = await obs.get_async(self.store, key) return prototype.buffer.from_bytes(await resp.bytes_async()) - - start, end = byte_range - if (start is None or start == 0) and end is None: - resp = await 
obs.get_async(self.store, key) - return prototype.buffer.from_bytes(await resp.bytes_async()) - if start is not None and end is not None: - resp = await obs.get_range_async(self.store, key, start=start, end=end) + elif isinstance(byte_range, RangeByteRequest): + resp = await obs.get_range_async( + self.store, key, start=byte_range.start, end=byte_range.end + ) return prototype.buffer.from_bytes(memoryview(resp)) - elif start is not None: - if start > 0: - # Offset request - resp = await obs.get_async(self.store, key, options={"range": {"offset": start}}) - else: - resp = await obs.get_async(self.store, key, options={"range": {"suffix": start}}) + elif isinstance(byte_range, OffsetByteRequest): + resp = await obs.get_async( + self.store, key, options={"range": {"offset": byte_range.offset}} + ) + return prototype.buffer.from_bytes(await resp.bytes_async()) + elif isinstance(byte_range, SuffixByteRequest): + resp = await obs.get_async( + self.store, key, options={"range": {"suffix": byte_range.suffix}} + ) return prototype.buffer.from_bytes(await resp.bytes_async()) - elif end is not None: - resp = await obs.get_range_async(self.store, key, start=0, end=end) - return prototype.buffer.from_bytes(memoryview(resp)) else: - raise ValueError(f"Unexpected input to `get`: {start=}, {end=}") + raise ValueError(f"Unexpected input to `get`: {byte_range}") async def get_partial_values( self, prototype: BufferPrototype, - key_ranges: Iterable[tuple[str, ByteRangeRequest]], + key_ranges: Iterable[tuple[str, ByteRequest | None]], ) -> list[Buffer | None]: return await _get_partial_values(self.store, prototype=prototype, key_ranges=key_ranges) @@ -260,7 +263,10 @@ async def _make_other_request( We return a `list[_Response]` for symmetry with `_make_bounded_requests` so that all futures can be gathered together. 
""" - resp = await obs.get_async(store, request["path"], options={"range": request["range"]}) + if request["range"] is None: + resp = await obs.get_async(store, request["path"]) + else: + resp = await obs.get_async(store, request["path"], options={"range": request["range"]}) buffer = await resp.bytes_async() return [ { @@ -273,7 +279,7 @@ async def _make_other_request( async def _get_partial_values( store: obs.store.ObjectStore, prototype: BufferPrototype, - key_ranges: Iterable[tuple[str, ByteRangeRequest]], + key_ranges: Iterable[tuple[str, ByteRequest | None]], ) -> list[Buffer | None]: """Make multiple range requests. @@ -290,27 +296,37 @@ async def _get_partial_values( per_file_bounded_requests: dict[str, list[_BoundedRequest]] = defaultdict(list) other_requests: list[_OtherRequest] = [] - for idx, (path, (start, end)) in enumerate(key_ranges): - if start is None: - raise ValueError("Cannot pass `None` for the start of the range request.") - - if end is not None: - # This is a bounded request with known start and end byte. 
+ for idx, (path, byte_range) in enumerate(key_ranges): + if byte_range is None: + other_requests.append( + { + "original_request_index": idx, + "path": path, + "range": None, + } + ) + elif isinstance(byte_range, RangeByteRequest): per_file_bounded_requests[path].append( - {"original_request_index": idx, "start": start, "end": end} + {"original_request_index": idx, "start": byte_range.start, "end": byte_range.end} ) - elif start < 0: - # Suffix request from the end + elif isinstance(byte_range, OffsetByteRequest): other_requests.append( - {"original_request_index": idx, "path": path, "range": {"suffix": abs(start)}} + { + "original_request_index": idx, + "path": path, + "range": {"offset": byte_range.offset}, + } ) - elif start >= 0: - # Offset request to the end + elif isinstance(byte_range, SuffixByteRequest): other_requests.append( - {"original_request_index": idx, "path": path, "range": {"offset": start}} + { + "original_request_index": idx, + "path": path, + "range": {"suffix": byte_range.suffix}, + } ) else: - raise ValueError(f"Unsupported range input: {start=}, {end=}") + raise ValueError(f"Unsupported range input: {byte_range}") futs: list[Coroutine[Any, Any, list[_Response]]] = [] for path, bounded_ranges in per_file_bounded_requests.items(): From 315e22ef21e19972c51de6cf1b75b5e7ea5d889d Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Fri, 10 Jan 2025 18:03:20 -0500 Subject: [PATCH 36/67] Raise not implemented error on pickling --- src/zarr/storage/object_store.py | 6 ++++++ tests/test_store/test_object.py | 6 ++++++ 2 files changed, 12 insertions(+) diff --git a/src/zarr/storage/object_store.py b/src/zarr/storage/object_store.py index 71c4c36215..45e7b708d1 100644 --- a/src/zarr/storage/object_store.py +++ b/src/zarr/storage/object_store.py @@ -69,6 +69,12 @@ def __str__(self) -> str: def __repr__(self) -> str: return f"ObjectStore({self})" + def __getstate__(self) -> None: + raise 
NotImplementedError("Pickling has not been implement for ObjectStore") + + def __setstate__(self) -> None: + raise NotImplementedError("Pickling has not been implement for ObjectStore") + async def get( self, key: str, prototype: BufferPrototype, byte_range: ByteRequest | None = None ) -> Buffer: diff --git a/tests/test_store/test_object.py b/tests/test_store/test_object.py index ca6a1b176f..70a105c986 100644 --- a/tests/test_store/test_object.py +++ b/tests/test_store/test_object.py @@ -3,6 +3,7 @@ obstore = pytest.importorskip("obstore") +import pickle import re from zarr.core.buffer import Buffer, cpu @@ -51,3 +52,8 @@ def test_store_supports_partial_writes(self, store: ObjectStore) -> None: def test_store_supports_listing(self, store: ObjectStore) -> None: assert store.supports_listing + + @pytest.mark.xfail(reason="Not Implemented") + def test_serializable_store(self, store: ObjectStore) -> None: + foo = pickle.dumps(store) + assert pickle.loads(foo) == store From fc930297c52092a628490b81072c14680212f848 Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Thu, 16 Jan 2025 20:15:14 -0500 Subject: [PATCH 37/67] Bump obstore --- pyproject.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 46cb0e35d1..d369e3ffc2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -57,7 +57,7 @@ keywords = ["Python", "compressed", "ndimensional-arrays", "zarr"] # User extras remote = [ "fsspec>=2023.10.0", - "obstore==0.3.0b10", + "obstore==0.3.0", ] gpu = [ @@ -77,7 +77,7 @@ test = [ "mypy", "hypothesis", "universal-pathlib", - "obstore==0.3.0b10", + "obstore==0.3.0", ] optional = ["rich", "universal-pathlib"] docs = [ From 1b9f9f2d52d6ebceebde87ecf97810826219b88c Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Thu, 16 Jan 2025 21:11:06 -0500 Subject: [PATCH 38/67] Catch allowed exceptions --- src/zarr/storage/object_store.py | 51 
+++++++++++++++++++------------- 1 file changed, 30 insertions(+), 21 deletions(-) diff --git a/src/zarr/storage/object_store.py b/src/zarr/storage/object_store.py index 45e7b708d1..28e3f00159 100644 --- a/src/zarr/storage/object_store.py +++ b/src/zarr/storage/object_store.py @@ -28,6 +28,12 @@ from zarr.core.buffer import Buffer, BufferPrototype from zarr.core.common import BytesLike +ALLOWED_EXCEPTIONS: tuple[type[Exception], ...] = ( + FileNotFoundError, + IsADirectoryError, + NotADirectoryError, +) + class ObjectStore(Store): """A Zarr store that uses obstore for fast read/write from AWS, GCP, and Azure. @@ -77,27 +83,30 @@ def __setstate__(self) -> None: async def get( self, key: str, prototype: BufferPrototype, byte_range: ByteRequest | None = None - ) -> Buffer: - if byte_range is None: - resp = await obs.get_async(self.store, key) - return prototype.buffer.from_bytes(await resp.bytes_async()) - elif isinstance(byte_range, RangeByteRequest): - resp = await obs.get_range_async( - self.store, key, start=byte_range.start, end=byte_range.end - ) - return prototype.buffer.from_bytes(memoryview(resp)) - elif isinstance(byte_range, OffsetByteRequest): - resp = await obs.get_async( - self.store, key, options={"range": {"offset": byte_range.offset}} - ) - return prototype.buffer.from_bytes(await resp.bytes_async()) - elif isinstance(byte_range, SuffixByteRequest): - resp = await obs.get_async( - self.store, key, options={"range": {"suffix": byte_range.suffix}} - ) - return prototype.buffer.from_bytes(await resp.bytes_async()) - else: - raise ValueError(f"Unexpected input to `get`: {byte_range}") + ) -> Buffer | None: + try: + if byte_range is None: + resp = await obs.get_async(self.store, key) + return prototype.buffer.from_bytes(await resp.bytes_async()) + elif isinstance(byte_range, RangeByteRequest): + resp = await obs.get_range_async( + self.store, key, start=byte_range.start, end=byte_range.end + ) + return prototype.buffer.from_bytes(memoryview(resp)) + elif 
isinstance(byte_range, OffsetByteRequest): + resp = await obs.get_async( + self.store, key, options={"range": {"offset": byte_range.offset}} + ) + return prototype.buffer.from_bytes(await resp.bytes_async()) + elif isinstance(byte_range, SuffixByteRequest): + resp = await obs.get_async( + self.store, key, options={"range": {"suffix": byte_range.suffix}} + ) + return prototype.buffer.from_bytes(await resp.bytes_async()) + else: + raise ValueError(f"Unexpected input to `get`: {byte_range}") + except ALLOWED_EXCEPTIONS: + return None async def get_partial_values( self, From b18de3877df147a7fbc6a82345722bb5dad3718a Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Tue, 4 Feb 2025 13:17:33 -0500 Subject: [PATCH 39/67] WIP --- pyproject.toml | 2 +- src/zarr/storage/object_store.py | 33 ++++++++++++++++++++++---------- 2 files changed, 24 insertions(+), 11 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index b16d3c86e8..fd8e91e83c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -86,7 +86,6 @@ test = [ "mypy", "hypothesis", "universal-pathlib", - "obstore==0.3.0", ] optional = ["rich", "universal-pathlib"] docs = [ @@ -213,6 +212,7 @@ dependencies = [ 'universal_pathlib @ git+https://github.com/fsspec/universal_pathlib', 'typing_extensions @ git+https://github.com/python/typing_extensions', 'donfig @ git+https://github.com/pytroll/donfig', + "obstore @ git+https://github.com/developmentseed/obstore@main#subdirectory=obstore", # test deps 'hypothesis', 'pytest', diff --git a/src/zarr/storage/object_store.py b/src/zarr/storage/object_store.py index 28e3f00159..081bc9a404 100644 --- a/src/zarr/storage/object_store.py +++ b/src/zarr/storage/object_store.py @@ -168,7 +168,7 @@ def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]: return _transform_list(objects) def list_dir(self, prefix: str) -> AsyncGenerator[str, None]: - objects: ListStream[list[ObjectMeta]] = obs.list(self.store, prefix=prefix) + objects: 
ListStream[list[ObjectMeta]] = obs.list_with_delimiter(self.store, prefix=prefix) return _transform_list_dir(objects, prefix) @@ -179,20 +179,33 @@ async def _transform_list( for item in batch: yield item["path"] - +# ['zarr.json', 'c/0', 'c/1'] -> ['zarr.json'] +# ['zarr.json', 'c/0', 'c/1'] -> ['zarr.json, 'c'] | ['zarr.json, 'c', 'c'] async def _transform_list_dir( - list_stream: AsyncGenerator[list[ObjectMeta], None], prefix: str + list_stream: obstore.ListResult , None], prefix: str ) -> AsyncGenerator[str, None]: # We assume that the underlying object-store implementation correctly handles the # prefix, so we don't double-check that the returned results actually start with the # given prefix. - prefix_len = len(prefix) + 1 # If one is not added to the length, all items will contain "/" - async for batch in list_stream: - for item in batch: - # Yield this item if "/" does not exist after the prefix - item_path = item["path"][prefix_len:] - if "/" not in item_path: - yield item_path + prefixes = list_stream["common_prefixes"] + objects = [obj["path"].lstrip(prefix) for obj in list_stream["objects"]] + full = prefixes + objects + for item in full: + yield item + + # prefix_len = len(prefix) + # async for batch in list_stream: + # returned_prefixes = [] + # for item in batch: + # # Yield this item if "/" does not exist after the prefix + # item_path = item["path"][prefix_len:] + # if "/" not in item_path: + # yield item_path + # else: + # prefix = item_path.split("/")[0] + # if prefix in returned_prefixes: + # returned_prefixes.append(prefix) + # yield prefix class _BoundedRequest(TypedDict): From 6976738e9c0f6d0aadf9e23dbf4623437a8e70be Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Tue, 4 Feb 2025 13:25:33 -0500 Subject: [PATCH 40/67] Fix list dir --- src/zarr/storage/object_store.py | 37 ++++++++++++-------------------- 1 file changed, 14 insertions(+), 23 deletions(-) diff --git a/src/zarr/storage/object_store.py b/src/zarr/storage/object_store.py index 
081bc9a404..1a68450f8b 100644 --- a/src/zarr/storage/object_store.py +++ b/src/zarr/storage/object_store.py @@ -22,7 +22,7 @@ from collections.abc import AsyncGenerator, Coroutine, Iterable from typing import Any - from obstore import ListStream, ObjectMeta, OffsetRange, SuffixRange + from obstore import ListResult, ListStream, ObjectMeta, OffsetRange, SuffixRange from obstore.store import ObjectStore as _ObjectStore from zarr.core.buffer import Buffer, BufferPrototype @@ -168,8 +168,8 @@ def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]: return _transform_list(objects) def list_dir(self, prefix: str) -> AsyncGenerator[str, None]: - objects: ListStream[list[ObjectMeta]] = obs.list_with_delimiter(self.store, prefix=prefix) - return _transform_list_dir(objects, prefix) + coroutine = obs.list_with_delimiter_async(self.store, prefix=prefix) + return _transform_list_dir(coroutine, prefix) async def _transform_list( @@ -179,34 +179,25 @@ async def _transform_list( for item in batch: yield item["path"] + # ['zarr.json', 'c/0', 'c/1'] -> ['zarr.json'] -# ['zarr.json', 'c/0', 'c/1'] -> ['zarr.json, 'c'] | ['zarr.json, 'c', 'c'] +# ['zarr.json', 'c/0', 'c/1'] -> ['zarr.json, 'c'] async def _transform_list_dir( - list_stream: obstore.ListResult , None], prefix: str + list_result_coroutine: Coroutine[Any, Any, ListResult], prefix: str ) -> AsyncGenerator[str, None]: + """ + Transform the result of list_with_delimiter into an async generator of prefixes and paths. + """ + list_result = await list_result_coroutine + # We assume that the underlying object-store implementation correctly handles the # prefix, so we don't double-check that the returned results actually start with the # given prefix. 
- prefixes = list_stream["common_prefixes"] - objects = [obj["path"].lstrip(prefix) for obj in list_stream["objects"]] - full = prefixes + objects - for item in full: + prefixes = list_result["common_prefixes"] + objects = [obj["path"].lstrip(prefix) for obj in list_result["objects"]] + for item in prefixes + objects: yield item - # prefix_len = len(prefix) - # async for batch in list_stream: - # returned_prefixes = [] - # for item in batch: - # # Yield this item if "/" does not exist after the prefix - # item_path = item["path"][prefix_len:] - # if "/" not in item_path: - # yield item_path - # else: - # prefix = item_path.split("/")[0] - # if prefix in returned_prefixes: - # returned_prefixes.append(prefix) - # yield prefix - class _BoundedRequest(TypedDict): """Range request with a known start and end byte. From f287780ef16867dbf9d7f75b83c002a3a711883e Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Wed, 5 Feb 2025 10:00:11 -0500 Subject: [PATCH 41/67] Fix failing tests --- pyproject.toml | 2 +- src/zarr/storage/object_store.py | 12 ++++++++---- tests/test_store/test_object.py | 8 ++++---- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index fd8e91e83c..a8417c1c22 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -64,7 +64,7 @@ keywords = ["Python", "compressed", "ndimensional-arrays", "zarr"] # User extras remote = [ "fsspec>=2023.10.0", - "obstore==0.3.0", + "obstore==0.4.0b2", ] gpu = [ diff --git a/src/zarr/storage/object_store.py b/src/zarr/storage/object_store.py index 1a68450f8b..8917480dad 100644 --- a/src/zarr/storage/object_store.py +++ b/src/zarr/storage/object_store.py @@ -25,7 +25,7 @@ from obstore import ListResult, ListStream, ObjectMeta, OffsetRange, SuffixRange from obstore.store import ObjectStore as _ObjectStore - from zarr.core.buffer import Buffer, BufferPrototype + from zarr.core.buffer import BufferPrototype from zarr.core.common import BytesLike 
ALLOWED_EXCEPTIONS: tuple[type[Exception], ...] = ( @@ -104,7 +104,7 @@ async def get( ) return prototype.buffer.from_bytes(await resp.bytes_async()) else: - raise ValueError(f"Unexpected input to `get`: {byte_range}") + raise ValueError(f"Unexpected byte_range, got {byte_range}") except ALLOWED_EXCEPTIONS: return None @@ -129,6 +129,10 @@ def supports_writes(self) -> bool: async def set(self, key: str, value: Buffer) -> None: self._check_writable() + if not isinstance(value, Buffer): + raise TypeError( + f"ObjectStore.set(): `value` must be a Buffer instance. Got an instance of {type(value)} instead." + ) buf = value.to_bytes() await obs.put_async(self.store, key, buf) @@ -193,8 +197,8 @@ async def _transform_list_dir( # We assume that the underlying object-store implementation correctly handles the # prefix, so we don't double-check that the returned results actually start with the # given prefix. - prefixes = list_result["common_prefixes"] - objects = [obj["path"].lstrip(prefix) for obj in list_result["objects"]] + prefixes = [obj.lstrip(prefix).lstrip("/") for obj in list_result["common_prefixes"]] + objects = [obj["path"].lstrip(prefix).lstrip("/") for obj in list_result["objects"]] for item in prefixes + objects: yield item diff --git a/tests/test_store/test_object.py b/tests/test_store/test_object.py index 70a105c986..1200f9fb98 100644 --- a/tests/test_store/test_object.py +++ b/tests/test_store/test_object.py @@ -10,7 +10,7 @@ from zarr.storage.object_store import ObjectStore from zarr.testing.store import StoreTests -PATTERN = r"file://(/[\w/.-]+)" +PATTERN = r'LocalStore\("([^"]+)"\)' class TestObjectStore(StoreTests[ObjectStore, cpu.Buffer]): @@ -27,13 +27,13 @@ def store(self, store_kwargs: dict[str, str | bool]) -> ObjectStore: return self.store_cls(**store_kwargs) async def get(self, store: ObjectStore, key: str) -> Buffer: - # TODO: There must be a better way to get the path to the store + # TODO: Use new store.prefix in obstore 0.4.0 store_path = 
re.search(PATTERN, str(store)).group(1) new_local_store = obstore.store.LocalStore(prefix=store_path) return self.buffer_cls.from_bytes(obstore.get(new_local_store, key).bytes()) async def set(self, store: ObjectStore, key: str, value: Buffer) -> None: - # TODO: There must be a better way to get the path to the store + # TODO: Use new store.prefix in obstore 0.4.0 store_path = re.search(PATTERN, str(store)).group(1) new_local_store = obstore.store.LocalStore(prefix=store_path) obstore.put(new_local_store, key, value.to_bytes()) @@ -41,7 +41,7 @@ async def set(self, store: ObjectStore, key: str, value: Buffer) -> None: def test_store_repr(self, store: ObjectStore) -> None: from fnmatch import fnmatch - pattern = "ObjectStore(object://LocalStore(file:///*))" + pattern = "ObjectStore(object://LocalStore(*))" assert fnmatch(f"{store!r}", pattern) def test_store_supports_writes(self, store: ObjectStore) -> None: From 08b7771143778116c68527a4667977fefdf7f51a Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Wed, 5 Feb 2025 10:32:52 -0500 Subject: [PATCH 42/67] Make module structure consistent with other stores --- src/zarr/storage/__init__.py | 2 ++ .../storage/{object_store.py => _object.py} | 30 ++++++++++++++++--- tests/test_store/test_object.py | 2 +- 3 files changed, 29 insertions(+), 5 deletions(-) rename src/zarr/storage/{object_store.py => _object.py} (94%) diff --git a/src/zarr/storage/__init__.py b/src/zarr/storage/__init__.py index 649857f773..ba9a57fcae 100644 --- a/src/zarr/storage/__init__.py +++ b/src/zarr/storage/__init__.py @@ -8,6 +8,7 @@ from zarr.storage._local import LocalStore from zarr.storage._logging import LoggingStore from zarr.storage._memory import GpuMemoryStore, MemoryStore +from zarr.storage._object import ObjectStore from zarr.storage._wrapper import WrapperStore from zarr.storage._zip import ZipStore @@ -17,6 +18,7 @@ "LocalStore", "LoggingStore", "MemoryStore", + "ObjectStore", "StoreLike", 
"StorePath", "WrapperStore", diff --git a/src/zarr/storage/object_store.py b/src/zarr/storage/_object.py similarity index 94% rename from src/zarr/storage/object_store.py rename to src/zarr/storage/_object.py index 8917480dad..f5ce945df3 100644 --- a/src/zarr/storage/object_store.py +++ b/src/zarr/storage/_object.py @@ -36,12 +36,19 @@ class ObjectStore(Store): - """A Zarr store that uses obstore for fast read/write from AWS, GCP, and Azure. + """A Zarr store that uses obstore for fast read/write from AWS, GCP, Azure. Parameters ---------- store : obstore.store.ObjectStore An obstore store instance that is set up with the proper credentials. + read_only : bool + Whether to open the store in read-only mode. + + Warnings + -------- + ObjectStore is experimental and subject to API changes without notice. Please + raise an issue with any comments/concerns about the store. """ store: _ObjectStore @@ -84,6 +91,7 @@ def __setstate__(self) -> None: async def get( self, key: str, prototype: BufferPrototype, byte_range: ByteRequest | None = None ) -> Buffer | None: + # docstring inherited try: if byte_range is None: resp = await obs.get_async(self.store, key) @@ -113,9 +121,11 @@ async def get_partial_values( prototype: BufferPrototype, key_ranges: Iterable[tuple[str, ByteRequest | None]], ) -> list[Buffer | None]: + # docstring inherited return await _get_partial_values(self.store, prototype=prototype, key_ranges=key_ranges) async def exists(self, key: str) -> bool: + # docstring inherited try: await obs.head_async(self.store, key) except FileNotFoundError: @@ -125,9 +135,11 @@ async def exists(self, key: str) -> bool: @property def supports_writes(self) -> bool: + # docstring inherited return True async def set(self, key: str, value: Buffer) -> None: + # docstring inherited self._check_writable() if not isinstance(value, Buffer): raise TypeError( @@ -137,6 +149,7 @@ async def set(self, key: str, value: Buffer) -> None: await obs.put_async(self.store, key, buf) async def 
set_if_not_exists(self, key: str, value: Buffer) -> None: + # docstring inherited self._check_writable() buf = value.to_bytes() with contextlib.suppress(obs.exceptions.AlreadyExistsError): @@ -144,34 +157,42 @@ async def set_if_not_exists(self, key: str, value: Buffer) -> None: @property def supports_deletes(self) -> bool: + # docstring inherited return True async def delete(self, key: str) -> None: + # docstring inherited self._check_writable() await obs.delete_async(self.store, key) @property def supports_partial_writes(self) -> bool: + # docstring inherited return False async def set_partial_values( self, key_start_values: Iterable[tuple[str, int, BytesLike]] ) -> None: + # docstring inherited raise NotImplementedError @property def supports_listing(self) -> bool: + # docstring inherited return True def list(self) -> AsyncGenerator[str, None]: + # docstring inherited objects: ListStream[list[ObjectMeta]] = obs.list(self.store) return _transform_list(objects) def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]: + # docstring inherited objects: ListStream[list[ObjectMeta]] = obs.list(self.store, prefix=prefix) return _transform_list(objects) def list_dir(self, prefix: str) -> AsyncGenerator[str, None]: + # docstring inherited coroutine = obs.list_with_delimiter_async(self.store, prefix=prefix) return _transform_list_dir(coroutine, prefix) @@ -179,18 +200,19 @@ def list_dir(self, prefix: str) -> AsyncGenerator[str, None]: async def _transform_list( list_stream: AsyncGenerator[list[ObjectMeta], None], ) -> AsyncGenerator[str, None]: + """ + Transform the result of list into an async generator of paths. 
+ """ async for batch in list_stream: for item in batch: yield item["path"] -# ['zarr.json', 'c/0', 'c/1'] -> ['zarr.json'] -# ['zarr.json', 'c/0', 'c/1'] -> ['zarr.json, 'c'] async def _transform_list_dir( list_result_coroutine: Coroutine[Any, Any, ListResult], prefix: str ) -> AsyncGenerator[str, None]: """ - Transform the result of list_with_delimiter into an async generator of prefixes and paths. + Transform the result of list_with_delimiter into an async generator of paths. """ list_result = await list_result_coroutine diff --git a/tests/test_store/test_object.py b/tests/test_store/test_object.py index 1200f9fb98..daa0f8f66a 100644 --- a/tests/test_store/test_object.py +++ b/tests/test_store/test_object.py @@ -7,7 +7,7 @@ import re from zarr.core.buffer import Buffer, cpu -from zarr.storage.object_store import ObjectStore +from zarr.storage import ObjectStore from zarr.testing.store import StoreTests PATTERN = r'LocalStore\("([^"]+)"\)' From 8038b947d2540db3e59a6edba5b6e26d43dcf07f Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Wed, 5 Feb 2025 10:37:00 -0500 Subject: [PATCH 43/67] Remove override of pytest parameterization --- src/zarr/testing/store.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/zarr/testing/store.py b/src/zarr/testing/store.py index f798d1dd93..7d47143781 100644 --- a/src/zarr/testing/store.py +++ b/src/zarr/testing/store.py @@ -152,7 +152,6 @@ async def test_get(self, store: S, key: str, data: bytes, byte_range: ByteReques """ Ensure that data can be read from the store using the store.get method. 
""" - data = b"\x01\x02\x03\x04" data_buf = self.buffer_cls.from_bytes(data) await self.set(store, key, data_buf) observed = await store.get(key, prototype=default_buffer_prototype(), byte_range=byte_range) From b65d439dacf6f301682f1ecdbfc6cb1cfb6560de Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Wed, 5 Feb 2025 12:10:33 -0500 Subject: [PATCH 44/67] Use store prefix --- pyproject.toml | 3 +-- tests/test_store/test_object.py | 23 ++++++++++------------- 2 files changed, 11 insertions(+), 15 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index a8417c1c22..581d7dcbf3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -64,8 +64,7 @@ keywords = ["Python", "compressed", "ndimensional-arrays", "zarr"] # User extras remote = [ "fsspec>=2023.10.0", - "obstore==0.4.0b2", - + "obstore==0.4.0b3", ] gpu = [ "cupy-cuda12x", diff --git a/tests/test_store/test_object.py b/tests/test_store/test_object.py index daa0f8f66a..39b2c575db 100644 --- a/tests/test_store/test_object.py +++ b/tests/test_store/test_object.py @@ -1,25 +1,24 @@ # ruff: noqa: E402 +import pickle +from typing import Any + import pytest obstore = pytest.importorskip("obstore") - -import pickle -import re +from obstore.store import LocalStore from zarr.core.buffer import Buffer, cpu from zarr.storage import ObjectStore from zarr.testing.store import StoreTests -PATTERN = r'LocalStore\("([^"]+)"\)' - class TestObjectStore(StoreTests[ObjectStore, cpu.Buffer]): store_cls = ObjectStore buffer_cls = cpu.Buffer @pytest.fixture - def store_kwargs(self, tmpdir) -> dict[str, str | bool]: - store = obstore.store.LocalStore(prefix=tmpdir) + def store_kwargs(self, tmpdir) -> dict[str, Any]: + store = LocalStore(prefix=tmpdir) return {"store": store, "read_only": False} @pytest.fixture @@ -27,15 +26,13 @@ def store(self, store_kwargs: dict[str, str | bool]) -> ObjectStore: return self.store_cls(**store_kwargs) async def get(self, store: ObjectStore, key: str) -> Buffer: - # TODO: Use new store.prefix in obstore 
0.4.0 - store_path = re.search(PATTERN, str(store)).group(1) - new_local_store = obstore.store.LocalStore(prefix=store_path) + assert isinstance(store.store, LocalStore) + new_local_store = LocalStore(prefix=store.store.prefix) return self.buffer_cls.from_bytes(obstore.get(new_local_store, key).bytes()) async def set(self, store: ObjectStore, key: str, value: Buffer) -> None: - # TODO: Use new store.prefix in obstore 0.4.0 - store_path = re.search(PATTERN, str(store)).group(1) - new_local_store = obstore.store.LocalStore(prefix=store_path) + assert isinstance(store.store, LocalStore) + new_local_store = LocalStore(prefix=store.store.prefix) obstore.put(new_local_store, key, value.to_bytes()) def test_store_repr(self, store: ObjectStore) -> None: From 1fa01259381f264232ec145e1c6b787249d503c7 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Wed, 5 Feb 2025 12:10:41 -0500 Subject: [PATCH 45/67] Remove xfail for pickle --- tests/test_store/test_object.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_store/test_object.py b/tests/test_store/test_object.py index 39b2c575db..0f74ee19ec 100644 --- a/tests/test_store/test_object.py +++ b/tests/test_store/test_object.py @@ -50,7 +50,6 @@ def test_store_supports_partial_writes(self, store: ObjectStore) -> None: def test_store_supports_listing(self, store: ObjectStore) -> None: assert store.supports_listing - @pytest.mark.xfail(reason="Not Implemented") def test_serializable_store(self, store: ObjectStore) -> None: foo = pickle.dumps(store) assert pickle.loads(foo) == store From b0b9d56b6cd38e0df62baf397e8cafdbe327e0dc Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Wed, 5 Feb 2025 12:11:07 -0500 Subject: [PATCH 46/67] Restore pickle xfail --- tests/test_store/test_object.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_store/test_object.py b/tests/test_store/test_object.py index 0f74ee19ec..39b2c575db 100644 --- a/tests/test_store/test_object.py +++ b/tests/test_store/test_object.py @@ -50,6 +50,7 
@@ def test_store_supports_partial_writes(self, store: ObjectStore) -> None: def test_store_supports_listing(self, store: ObjectStore) -> None: assert store.supports_listing + @pytest.mark.xfail(reason="Not Implemented") def test_serializable_store(self, store: ObjectStore) -> None: foo = pickle.dumps(store) assert pickle.loads(foo) == store From aa5bdac04643e0297088434ef21ee17cac5cd491 Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Wed, 5 Feb 2025 14:54:09 -0500 Subject: [PATCH 47/67] Mark xfail --- src/zarr/testing/store.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/zarr/testing/store.py b/src/zarr/testing/store.py index 7d47143781..ef9aab2c3c 100644 --- a/src/zarr/testing/store.py +++ b/src/zarr/testing/store.py @@ -5,7 +5,7 @@ from abc import abstractmethod from typing import TYPE_CHECKING, Generic, TypeVar -from zarr.storage import WrapperStore +from zarr.storage import ObjectStore, WrapperStore if TYPE_CHECKING: from typing import Any @@ -152,6 +152,10 @@ async def test_get(self, store: S, key: str, data: bytes, byte_range: ByteReques """ Ensure that data can be read from the store using the store.get method. 
""" + if isinstance(store, ObjectStore) and not data: + pytest.xfail( + "Obstore does not allow loading invalid ranges - see https://github.com/apache/arrow-rs/pull/6751#issuecomment-2494702657" + ) data_buf = self.buffer_cls.from_bytes(data) await self.set(store, key, data_buf) observed = await store.get(key, prototype=default_buffer_prototype(), byte_range=byte_range) From 01de03d0b3065b2c6dd46b110387bea7d84001b4 Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Wed, 5 Feb 2025 14:55:36 -0500 Subject: [PATCH 48/67] Mark xfails --- src/zarr/testing/store.py | 2 ++ tests/test_store/test_object.py | 6 ------ 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/src/zarr/testing/store.py b/src/zarr/testing/store.py index ef9aab2c3c..a7df9874e1 100644 --- a/src/zarr/testing/store.py +++ b/src/zarr/testing/store.py @@ -100,6 +100,8 @@ def test_store_eq(self, store: S, store_kwargs: dict[str, Any]) -> None: assert store == store2 def test_serializable_store(self, store: S) -> None: + if isinstance(store, ObjectStore): + pytest.xfail("Serialization not implemented for ObjectStore") new_store: S = pickle.loads(pickle.dumps(store)) assert new_store == store assert new_store.read_only == store.read_only diff --git a/tests/test_store/test_object.py b/tests/test_store/test_object.py index 39b2c575db..1701819e3d 100644 --- a/tests/test_store/test_object.py +++ b/tests/test_store/test_object.py @@ -1,5 +1,4 @@ # ruff: noqa: E402 -import pickle from typing import Any import pytest @@ -49,8 +48,3 @@ def test_store_supports_partial_writes(self, store: ObjectStore) -> None: def test_store_supports_listing(self, store: ObjectStore) -> None: assert store.supports_listing - - @pytest.mark.xfail(reason="Not Implemented") - def test_serializable_store(self, store: ObjectStore) -> None: - foo = pickle.dumps(store) - assert pickle.loads(foo) == store From fdd7f8eb22228420713dec97c938f630ba8e7d95 Mon Sep 17 00:00:00 2001 From: Max Jones 
<14077947+maxrjones@users.noreply.github.com> Date: Wed, 5 Feb 2025 15:23:15 -0500 Subject: [PATCH 49/67] Serialization should now be supported by obstore --- src/zarr/storage/_object.py | 6 ------ src/zarr/testing/store.py | 2 -- 2 files changed, 8 deletions(-) diff --git a/src/zarr/storage/_object.py b/src/zarr/storage/_object.py index f5ce945df3..7b6061d69d 100644 --- a/src/zarr/storage/_object.py +++ b/src/zarr/storage/_object.py @@ -82,12 +82,6 @@ def __str__(self) -> str: def __repr__(self) -> str: return f"ObjectStore({self})" - def __getstate__(self) -> None: - raise NotImplementedError("Pickling has not been implement for ObjectStore") - - def __setstate__(self) -> None: - raise NotImplementedError("Pickling has not been implement for ObjectStore") - async def get( self, key: str, prototype: BufferPrototype, byte_range: ByteRequest | None = None ) -> Buffer | None: diff --git a/src/zarr/testing/store.py b/src/zarr/testing/store.py index a7df9874e1..ef9aab2c3c 100644 --- a/src/zarr/testing/store.py +++ b/src/zarr/testing/store.py @@ -100,8 +100,6 @@ def test_store_eq(self, store: S, store_kwargs: dict[str, Any]) -> None: assert store == store2 def test_serializable_store(self, store: S) -> None: - if isinstance(store, ObjectStore): - pytest.xfail("Serialization not implemented for ObjectStore") new_store: S = pickle.loads(pickle.dumps(store)) assert new_store == store assert new_store.read_only == store.read_only From 73fa4595112ec91fc8c00b3eb8733891378afef3 Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Wed, 5 Feb 2025 15:37:27 -0500 Subject: [PATCH 50/67] Update equality --- src/zarr/storage/_object.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/zarr/storage/_object.py b/src/zarr/storage/_object.py index 7b6061d69d..a2c9633448 100644 --- a/src/zarr/storage/_object.py +++ b/src/zarr/storage/_object.py @@ -55,10 +55,7 @@ class ObjectStore(Store): """The underlying obstore 
instance.""" def __eq__(self, value: object) -> bool: - if not isinstance(value, ObjectStore): - return False - - return bool(self.store.__eq__(value.store)) + return isinstance(value, type(self)) and self.store == value.store def __init__(self, store: _ObjectStore, *, read_only: bool = False) -> None: if not isinstance( From 6af1e8506440636250dc9b010f82ad79bc24ae1e Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Wed, 5 Feb 2025 16:00:03 -0500 Subject: [PATCH 51/67] Bump obstore --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 581d7dcbf3..983fc59815 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -64,7 +64,7 @@ keywords = ["Python", "compressed", "ndimensional-arrays", "zarr"] # User extras remote = [ "fsspec>=2023.10.0", - "obstore==0.4.0b3", + "obstore==0.4.0b4", ] gpu = [ "cupy-cuda12x", From f4bb423ff2f5eba3221db89eb849d503a7590933 Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Wed, 5 Feb 2025 16:01:37 -0500 Subject: [PATCH 52/67] Fix serialization --- src/zarr/storage/_object.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/zarr/storage/_object.py b/src/zarr/storage/_object.py index a2c9633448..ae1db4c6ce 100644 --- a/src/zarr/storage/_object.py +++ b/src/zarr/storage/_object.py @@ -2,6 +2,7 @@ import asyncio import contextlib +import pickle from collections import defaultdict from collections.abc import Iterable from typing import TYPE_CHECKING, Any, TypedDict @@ -70,8 +71,8 @@ def __init__(self, store: _ObjectStore, *, read_only: bool = False) -> None: ), ): raise TypeError(f"expected ObjectStore class, got {store!r}") - self.store = store super().__init__(read_only=read_only) + self.store = store def __str__(self) -> str: return f"object://{self.store}" @@ -79,6 +80,13 @@ def __str__(self) -> str: def __repr__(self) -> str: return f"ObjectStore({self})" + 
def __getstate__(self) -> dict: + self.__dict__.update({"store": pickle.dumps(self.store)}) + return self.__dict__.copy() + + def __setstate__(self, state: dict) -> None: + self.__dict__.update(state) + async def get( self, key: str, prototype: BufferPrototype, byte_range: ByteRequest | None = None ) -> Buffer | None: From 1f7df5226edee9f04bc308aacfb316c7c8f3520e Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Thu, 6 Feb 2025 12:08:21 -0500 Subject: [PATCH 53/67] Improve equality checking --- src/zarr/storage/_object.py | 57 +++++++++++++++++++++++++++++++++++-- 1 file changed, 55 insertions(+), 2 deletions(-) diff --git a/src/zarr/storage/_object.py b/src/zarr/storage/_object.py index ae1db4c6ce..565aab9a03 100644 --- a/src/zarr/storage/_object.py +++ b/src/zarr/storage/_object.py @@ -5,9 +5,17 @@ import pickle from collections import defaultdict from collections.abc import Iterable -from typing import TYPE_CHECKING, Any, TypedDict +from typing import TYPE_CHECKING, Any, TypedDict, TypeVar import obstore as obs +from obstore.store import ( + AzureStore, + GCSStore, + HTTPStore, + LocalStore, + MemoryStore, + S3Store, +) from zarr.abc.store import ( ByteRequest, @@ -35,6 +43,8 @@ NotADirectoryError, ) +S = TypeVar("S", bound=Store) + class ObjectStore(Store): """A Zarr store that uses obstore for fast read/write from AWS, GCP, Azure. 
@@ -56,7 +66,50 @@ class ObjectStore(Store): """The underlying obstore instance.""" def __eq__(self, value: object) -> bool: - return isinstance(value, type(self)) and self.store == value.store + if not isinstance(self.store, type(value.store)): + return False + if not self.read_only == value.read_only: + return False + + match value.store: + case AzureStore(): + if ( + (self.store.config != value.store.config) + or (self.store.client_options != value.store.client_options) + or (self.store.prefix != value.store.prefix) + or (self.store.retry_config != value.store.retry_config) + ): + return False + case GCSStore(): + if ( + (self.store.config != value.store.config) + or (self.store.client_options != value.store.client_options) + or (self.store.prefix != value.store.prefix) + or (self.store.retry_config != value.store.retry_config) + ): + return False + case S3Store(): + if ( + (self.store.config != value.store.config) + or (self.store.client_options != value.store.client_options) + or (self.store.prefix != value.store.prefix) + or (self.store.retry_config != value.store.retry_config) + ): + return False + case HTTPStore(): + if ( + (self.store.url != value.store.url) + or (self.store.client_options != value.store.client_options) + or (self.store.retry_config != value.store.retry_config) + ): + return False + case LocalStore(): + if self.store.prefix != value.store.prefix: + return False + case MemoryStore(): + if self.store is not value.store: + return False # Two memory stores can't be equal because we can't pickle memory stores + return True def __init__(self, store: _ObjectStore, *, read_only: bool = False) -> None: if not isinstance( From 12ccb499283b0e1881174e1984ddbe182758b570 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Thu, 6 Feb 2025 12:45:58 -0500 Subject: [PATCH 54/67] Update src/zarr/storage/_object.py Co-authored-by: Max Jones <14077947+maxrjones@users.noreply.github.com> --- src/zarr/storage/_object.py | 2 +- 1 file changed, 1 insertion(+), 1 
deletion(-) diff --git a/src/zarr/storage/_object.py b/src/zarr/storage/_object.py index 565aab9a03..0be533687d 100644 --- a/src/zarr/storage/_object.py +++ b/src/zarr/storage/_object.py @@ -133,7 +133,7 @@ def __str__(self) -> str: def __repr__(self) -> str: return f"ObjectStore({self})" - def __getstate__(self) -> dict: + def __getstate__(self) -> dict[Any, Any]: self.__dict__.update({"store": pickle.dumps(self.store)}) return self.__dict__.copy() From 25676a39177f217297847d0fb17f47fe10474694 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Thu, 6 Feb 2025 12:46:05 -0500 Subject: [PATCH 55/67] Update src/zarr/storage/_object.py Co-authored-by: Max Jones <14077947+maxrjones@users.noreply.github.com> --- src/zarr/storage/_object.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/zarr/storage/_object.py b/src/zarr/storage/_object.py index 0be533687d..4fb8ab8c92 100644 --- a/src/zarr/storage/_object.py +++ b/src/zarr/storage/_object.py @@ -137,7 +137,7 @@ def __getstate__(self) -> dict[Any, Any]: self.__dict__.update({"store": pickle.dumps(self.store)}) return self.__dict__.copy() - def __setstate__(self, state: dict) -> None: + def __setstate__(self, state: dict[Any, Any]) -> None: self.__dict__.update(state) async def get( From 4d15e439acd5775e257789e46f38d333678fed4f Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Thu, 6 Feb 2025 12:49:24 -0500 Subject: [PATCH 56/67] update __eq__ --- src/zarr/storage/_object.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/zarr/storage/_object.py b/src/zarr/storage/_object.py index 4fb8ab8c92..d17cff229a 100644 --- a/src/zarr/storage/_object.py +++ b/src/zarr/storage/_object.py @@ -66,6 +66,9 @@ class ObjectStore(Store): """The underlying obstore instance.""" def __eq__(self, value: object) -> bool: + if not isinstance(value, ObjectStore): + return False + if not isinstance(self.store, type(value.store)): return False if not self.read_only == value.read_only: @@ -73,6 +76,7 @@ def 
__eq__(self, value: object) -> bool: match value.store: case AzureStore(): + assert isinstance(self.store, AzureStore) if ( (self.store.config != value.store.config) or (self.store.client_options != value.store.client_options) @@ -81,6 +85,7 @@ def __eq__(self, value: object) -> bool: ): return False case GCSStore(): + assert isinstance(self.store, GCSStore) if ( (self.store.config != value.store.config) or (self.store.client_options != value.store.client_options) @@ -89,6 +94,7 @@ def __eq__(self, value: object) -> bool: ): return False case S3Store(): + assert isinstance(self.store, S3Store) if ( (self.store.config != value.store.config) or (self.store.client_options != value.store.client_options) @@ -97,6 +103,7 @@ def __eq__(self, value: object) -> bool: ): return False case HTTPStore(): + assert isinstance(self.store, HTTPStore) if ( (self.store.url != value.store.url) or (self.store.client_options != value.store.client_options) @@ -104,6 +111,7 @@ def __eq__(self, value: object) -> bool: ): return False case LocalStore(): + assert isinstance(self.store, LocalStore) if self.store.prefix != value.store.prefix: return False case MemoryStore(): From 9a250578558fa39f4dab63f8f4057399f6fd642b Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Thu, 6 Feb 2025 13:05:32 -0500 Subject: [PATCH 57/67] Fix serialization --- src/zarr/storage/_object.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/zarr/storage/_object.py b/src/zarr/storage/_object.py index d17cff229a..c5aff2804f 100644 --- a/src/zarr/storage/_object.py +++ b/src/zarr/storage/_object.py @@ -142,10 +142,12 @@ def __repr__(self) -> str: return f"ObjectStore({self})" def __getstate__(self) -> dict[Any, Any]: - self.__dict__.update({"store": pickle.dumps(self.store)}) - return self.__dict__.copy() + state = self.__dict__.copy() + state["store"] = pickle.dumps(self.store) + return state def __setstate__(self, state: dict[Any, Any]) -> None: + 
state["store"] = pickle.loads(state["store"]) self.__dict__.update(state) async def get( From 6f5f960032f1f90b937a338c143908a0b0572088 Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Fri, 7 Feb 2025 14:12:51 -0500 Subject: [PATCH 58/67] Expand test coverage --- tests/test_store/test_object.py | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/tests/test_store/test_object.py b/tests/test_store/test_object.py index 1701819e3d..e3b2f73ca4 100644 --- a/tests/test_store/test_object.py +++ b/tests/test_store/test_object.py @@ -4,7 +4,7 @@ import pytest obstore = pytest.importorskip("obstore") -from obstore.store import LocalStore +from obstore.store import LocalStore, MemoryStore from zarr.core.buffer import Buffer, cpu from zarr.storage import ObjectStore @@ -43,8 +43,29 @@ def test_store_repr(self, store: ObjectStore) -> None: def test_store_supports_writes(self, store: ObjectStore) -> None: assert store.supports_writes - def test_store_supports_partial_writes(self, store: ObjectStore) -> None: + async def test_store_supports_partial_writes(self, store: ObjectStore) -> None: assert not store.supports_partial_writes + with pytest.raises(NotImplementedError): + await store.set_partial_values([("foo", 0, b"\x01\x02\x03\x04")]) def test_store_supports_listing(self, store: ObjectStore) -> None: assert store.supports_listing + + def test_store_equal(self, store: ObjectStore) -> None: + """Test store equality""" + # Test equality against a different instance type + assert store != 0 + # Test equality against a different store type + new_memory_store = ObjectStore(MemoryStore()) + assert store != new_memory_store + # Test equality against a read only store + new_local_store = ObjectStore(LocalStore(prefix=store.store.prefix), read_only=True) + assert store != new_local_store + # Test two memory stores cannot be equal + second_memory_store = ObjectStore(MemoryStore()) + assert 
new_memory_store != second_memory_store + + def test_store_init_raises(self) -> None: + """Test __init__ raises appropriate error for improper store type""" + with pytest.raises(TypeError): + ObjectStore("path/to/store") From 8a0125773fa51419f7433faeff1c1c72137c450b Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Fri, 7 Feb 2025 17:09:56 -0500 Subject: [PATCH 59/67] Add docs --- docs/user-guide/storage.rst | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/docs/user-guide/storage.rst b/docs/user-guide/storage.rst index 46505271b4..bf5aca6dcc 100644 --- a/docs/user-guide/storage.rst +++ b/docs/user-guide/storage.rst @@ -99,6 +99,25 @@ Zarr data (metadata and chunks) to a dictionary.: >>> zarr.create_array(store=store, shape=(2,), dtype='float64') +Object Store +~~~~~~~~~~~~ + + +:class:`zarr.storage.ObjectStore` stores the contents of the Zarr hierarchy using any ObjectStore +`storage implementation `_, such as +AWS S3, Google Cloud Storage, and Azure Blob Storage. This store is backed by `obstore `_, which +builds on the production quality Rust library `object_store `_. + + + >>> from obstore.store import MemoryStore + >>> store = zarr.storage.ObjectStore(MemoryStore()) + >>> zarr.create_array(store=store, shape=(2,), dtype='float64') + + + +.. warning:: + The :class:`zarr.storage.ObjectStore` class is experimental. + .. 
_user-guide-custom-stores: Developing custom stores From cdd956ff3f70d449cd58955510aa32d4b29affb8 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Mon, 10 Feb 2025 10:43:00 -0500 Subject: [PATCH 60/67] Bump to 0.4.0 --- .pre-commit-config.yaml | 8 ++++---- pyproject.toml | 3 +-- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index e9e3add566..f986713c9a 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -19,8 +19,8 @@ repos: - repo: https://github.com/pre-commit/pre-commit-hooks rev: v5.0.0 hooks: - - id: check-yaml - - id: trailing-whitespace + - id: check-yaml + - id: trailing-whitespace - repo: https://github.com/pre-commit/mirrors-mypy rev: v1.14.1 hooks: @@ -31,10 +31,10 @@ repos: - packaging - donfig - numcodecs[crc32c] - - numpy==2.1 # until https://github.com/numpy/numpy/issues/28034 is resolved + - numpy==2.1 # until https://github.com/numpy/numpy/issues/28034 is resolved - typing_extensions - universal-pathlib - - obstore==0.3.0-beta.8 + - obstore==0.4.0 # Tests - pytest - repo: https://github.com/scientific-python/cookie diff --git a/pyproject.toml b/pyproject.toml index cca35da99a..83ce495171 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -64,7 +64,7 @@ keywords = ["Python", "compressed", "ndimensional-arrays", "zarr"] # User extras remote = [ "fsspec>=2023.10.0", - "obstore==0.4.0b4", + "obstore==0.4.0", ] gpu = [ "cupy-cuda12x", @@ -212,7 +212,6 @@ dependencies = [ 'universal_pathlib @ git+https://github.com/fsspec/universal_pathlib', 'typing_extensions @ git+https://github.com/python/typing_extensions', 'donfig @ git+https://github.com/pytroll/donfig', - "obstore @ git+https://github.com/developmentseed/obstore@main#subdirectory=obstore", # test deps 'hypothesis', 'pytest', From 5cc35081a0e7ed2536646ef71ad3d1552aba26eb Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Mon, 10 Feb 2025 16:28:07 -0500 Subject: [PATCH 61/67] Satisfy mypy --- 
src/zarr/storage/_object.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/zarr/storage/_object.py b/src/zarr/storage/_object.py index c5aff2804f..9c30e6acf8 100644 --- a/src/zarr/storage/_object.py +++ b/src/zarr/storage/_object.py @@ -157,22 +157,22 @@ async def get( try: if byte_range is None: resp = await obs.get_async(self.store, key) - return prototype.buffer.from_bytes(await resp.bytes_async()) + return prototype.buffer.from_bytes(await resp.bytes_async()) # type: ignore[arg-type] elif isinstance(byte_range, RangeByteRequest): - resp = await obs.get_range_async( + bytes = await obs.get_range_async( self.store, key, start=byte_range.start, end=byte_range.end ) - return prototype.buffer.from_bytes(memoryview(resp)) + return prototype.buffer.from_bytes(bytes) # type: ignore[arg-type] elif isinstance(byte_range, OffsetByteRequest): resp = await obs.get_async( self.store, key, options={"range": {"offset": byte_range.offset}} ) - return prototype.buffer.from_bytes(await resp.bytes_async()) + return prototype.buffer.from_bytes(await resp.bytes_async()) # type: ignore[arg-type] elif isinstance(byte_range, SuffixByteRequest): resp = await obs.get_async( self.store, key, options={"range": {"suffix": byte_range.suffix}} ) - return prototype.buffer.from_bytes(await resp.bytes_async()) + return prototype.buffer.from_bytes(await resp.bytes_async()) # type: ignore[arg-type] else: raise ValueError(f"Unexpected byte_range, got {byte_range}") except ALLOWED_EXCEPTIONS: @@ -260,7 +260,7 @@ def list_dir(self, prefix: str) -> AsyncGenerator[str, None]: async def _transform_list( - list_stream: AsyncGenerator[list[ObjectMeta], None], + list_stream: ListStream[list[ObjectMeta]], ) -> AsyncGenerator[str, None]: """ Transform the result of list into an async generator of paths. 
@@ -271,7 +271,7 @@ async def _transform_list( async def _transform_list_dir( - list_result_coroutine: Coroutine[Any, Any, ListResult], prefix: str + list_result_coroutine: Coroutine[Any, Any, ListResult[list[ObjectMeta]]], prefix: str ) -> AsyncGenerator[str, None]: """ Transform the result of list_with_delimiter into an async generator of paths. @@ -317,7 +317,7 @@ class _OtherRequest(TypedDict): path: str """The path to request from.""" - range: OffsetRange | SuffixRange + range: OffsetRange | SuffixRange | None """The range request type.""" @@ -353,7 +353,7 @@ async def _make_bounded_requests( buffer_responses.append( { "original_request_index": request["original_request_index"], - "buffer": prototype.buffer.from_bytes(memoryview(response)), + "buffer": prototype.buffer.from_bytes(response), # type: ignore[arg-type] } ) @@ -378,7 +378,7 @@ async def _make_other_request( return [ { "original_request_index": request["original_request_index"], - "buffer": prototype.buffer.from_bytes(buffer), + "buffer": prototype.buffer.from_bytes(buffer), # type: ignore[arg-type] } ] From c06467cdde2f15569f751b2aa0ea435f95fa0b3f Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Mon, 10 Feb 2025 16:36:12 -0500 Subject: [PATCH 62/67] Remove re-export from `__init__.py` --- docs/user-guide/storage.rst | 8 +++++--- src/zarr/storage/__init__.py | 2 -- src/zarr/storage/{_object.py => obstore.py} | 10 +++++----- 3 files changed, 10 insertions(+), 10 deletions(-) rename src/zarr/storage/{_object.py => obstore.py} (98%) diff --git a/docs/user-guide/storage.rst b/docs/user-guide/storage.rst index bf5aca6dcc..cf715ba4d0 100644 --- a/docs/user-guide/storage.rst +++ b/docs/user-guide/storage.rst @@ -103,20 +103,22 @@ Object Store ~~~~~~~~~~~~ -:class:`zarr.storage.ObjectStore` stores the contents of the Zarr hierarchy using any ObjectStore +:class:`zarr.storage.obstore.ObjectStore` stores the contents of the Zarr hierarchy using any ObjectStore `storage implementation `_, such as AWS S3, 
Google Cloud Storage, and Azure Blob Storage. This store is backed by `obstore `_, which builds on the production quality Rust library `object_store `_. + >>> from zarr.storage.obstore import ObjectStore >>> from obstore.store import MemoryStore - >>> store = zarr.storage.ObjectStore(MemoryStore()) + >>> + >>> store = ObjectStore(MemoryStore()) >>> zarr.create_array(store=store, shape=(2,), dtype='float64') .. warning:: - The :class:`zarr.storage.ObjectStore` class is experimental. + The :class:`zarr.storage.obstore.ObjectStore` class is experimental. .. _user-guide-custom-stores: diff --git a/src/zarr/storage/__init__.py b/src/zarr/storage/__init__.py index ba9a57fcae..649857f773 100644 --- a/src/zarr/storage/__init__.py +++ b/src/zarr/storage/__init__.py @@ -8,7 +8,6 @@ from zarr.storage._local import LocalStore from zarr.storage._logging import LoggingStore from zarr.storage._memory import GpuMemoryStore, MemoryStore -from zarr.storage._object import ObjectStore from zarr.storage._wrapper import WrapperStore from zarr.storage._zip import ZipStore @@ -18,7 +17,6 @@ "LocalStore", "LoggingStore", "MemoryStore", - "ObjectStore", "StoreLike", "StorePath", "WrapperStore", diff --git a/src/zarr/storage/_object.py b/src/zarr/storage/obstore.py similarity index 98% rename from src/zarr/storage/_object.py rename to src/zarr/storage/obstore.py index 9c30e6acf8..2f4986f147 100644 --- a/src/zarr/storage/_object.py +++ b/src/zarr/storage/obstore.py @@ -5,7 +5,7 @@ import pickle from collections import defaultdict from collections.abc import Iterable -from typing import TYPE_CHECKING, Any, TypedDict, TypeVar +from typing import TYPE_CHECKING, Any, TypedDict import obstore as obs from obstore.store import ( @@ -37,14 +37,14 @@ from zarr.core.buffer import BufferPrototype from zarr.core.common import BytesLike -ALLOWED_EXCEPTIONS: tuple[type[Exception], ...] = ( +__all__ = ["ObjectStore"] + +_ALLOWED_EXCEPTIONS: tuple[type[Exception], ...] 
= ( FileNotFoundError, IsADirectoryError, NotADirectoryError, ) -S = TypeVar("S", bound=Store) - class ObjectStore(Store): """A Zarr store that uses obstore for fast read/write from AWS, GCP, Azure. @@ -175,7 +175,7 @@ async def get( return prototype.buffer.from_bytes(await resp.bytes_async()) # type: ignore[arg-type] else: raise ValueError(f"Unexpected byte_range, got {byte_range}") - except ALLOWED_EXCEPTIONS: + except _ALLOWED_EXCEPTIONS: return None async def get_partial_values( From 9a2ae396f4c9c936fc96b1d5060fa5fe4630b802 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Mon, 10 Feb 2025 16:40:55 -0500 Subject: [PATCH 63/67] fix imports --- src/zarr/testing/store.py | 3 ++- tests/test_store/test_object.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/zarr/testing/store.py b/src/zarr/testing/store.py index f9ac37c216..c6e6e2a82c 100644 --- a/src/zarr/testing/store.py +++ b/src/zarr/testing/store.py @@ -5,7 +5,8 @@ from abc import abstractmethod from typing import TYPE_CHECKING, Generic, TypeVar -from zarr.storage import ObjectStore, WrapperStore +from zarr.storage import WrapperStore +from zarr.storage.obstore import ObjectStore if TYPE_CHECKING: from typing import Any diff --git a/tests/test_store/test_object.py b/tests/test_store/test_object.py index e3b2f73ca4..f00ee11229 100644 --- a/tests/test_store/test_object.py +++ b/tests/test_store/test_object.py @@ -7,7 +7,7 @@ from obstore.store import LocalStore, MemoryStore from zarr.core.buffer import Buffer, cpu -from zarr.storage import ObjectStore +from zarr.storage.obstore import ObjectStore from zarr.testing.store import StoreTests From b8bb0c34c3262076a0f698c296f674f5cd27eb79 Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Mon, 10 Feb 2025 16:49:22 -0500 Subject: [PATCH 64/67] Add obstore to upstream env --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 83ce495171..77fc733fbf 
100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -212,6 +212,7 @@ dependencies = [ 'universal_pathlib @ git+https://github.com/fsspec/universal_pathlib', 'typing_extensions @ git+https://github.com/python/typing_extensions', 'donfig @ git+https://github.com/pytroll/donfig', + 'obstore @ git+https://github.com/developmentseed/obstore@main#subdirectory=obstore # test deps 'hypothesis', 'pytest', From 2978ce7780ed426474f48a287b174dad06fbd5a6 Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Mon, 10 Feb 2025 16:50:18 -0500 Subject: [PATCH 65/67] Update pyproject.toml --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 77fc733fbf..b5cc30e4b0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -212,7 +212,7 @@ dependencies = [ 'universal_pathlib @ git+https://github.com/fsspec/universal_pathlib', 'typing_extensions @ git+https://github.com/python/typing_extensions', 'donfig @ git+https://github.com/pytroll/donfig', - 'obstore @ git+https://github.com/developmentseed/obstore@main#subdirectory=obstore + 'obstore @ git+https://github.com/developmentseed/obstore@main#subdirectory=obstore', # test deps 'hypothesis', 'pytest', From 44654021579944c6ea51896173bca8e2196df761 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Wed, 12 Feb 2025 16:29:12 -0500 Subject: [PATCH 66/67] Import obstore inside class --- docs/user-guide/storage.rst | 6 +-- src/zarr/storage/__init__.py | 2 + src/zarr/storage/{obstore.py => _obstore.py} | 54 ++++++++++++++------ src/zarr/testing/store.py | 3 +- tests/test_store/test_object.py | 2 +- 5 files changed, 45 insertions(+), 22 deletions(-) rename src/zarr/storage/{obstore.py => _obstore.py} (94%) diff --git a/docs/user-guide/storage.rst b/docs/user-guide/storage.rst index cf715ba4d0..ef0f12e7ff 100644 --- a/docs/user-guide/storage.rst +++ b/docs/user-guide/storage.rst @@ -103,13 +103,13 @@ Object Store ~~~~~~~~~~~~ 
-:class:`zarr.storage.obstore.ObjectStore` stores the contents of the Zarr hierarchy using any ObjectStore +:class:`zarr.storage.ObjectStore` stores the contents of the Zarr hierarchy using any ObjectStore `storage implementation `_, such as AWS S3, Google Cloud Storage, and Azure Blob Storage. This store is backed by `obstore `_, which builds on the production quality Rust library `object_store `_. - >>> from zarr.storage.obstore import ObjectStore + >>> from zarr.storage import ObjectStore >>> from obstore.store import MemoryStore >>> >>> store = ObjectStore(MemoryStore()) @@ -118,7 +118,7 @@ builds on the production quality Rust library `object_store bool: + from obstore.store import ( + AzureStore, + GCSStore, + HTTPStore, + LocalStore, + MemoryStore, + S3Store, + ) + if not isinstance(value, ObjectStore): return False @@ -119,7 +118,11 @@ def __eq__(self, value: object) -> bool: return False # Two memory stores can't be equal because we can't pickle memory stores return True - def __init__(self, store: _ObjectStore, *, read_only: bool = False) -> None: + def __init__(self, store: _UpstreamObjectStore, *, read_only: bool = False) -> None: + import obstore as obs + + self.obs = obs + if not isinstance( store, ( @@ -154,6 +157,8 @@ async def get( self, key: str, prototype: BufferPrototype, byte_range: ByteRequest | None = None ) -> Buffer | None: # docstring inherited + import obstore as obs + try: if byte_range is None: resp = await obs.get_async(self.store, key) @@ -188,6 +193,8 @@ async def get_partial_values( async def exists(self, key: str) -> bool: # docstring inherited + import obstore as obs + try: await obs.head_async(self.store, key) except FileNotFoundError: @@ -202,6 +209,8 @@ def supports_writes(self) -> bool: async def set(self, key: str, value: Buffer) -> None: # docstring inherited + import obstore as obs + self._check_writable() if not isinstance(value, Buffer): raise TypeError( @@ -212,6 +221,8 @@ async def set(self, key: str, value: Buffer) -> 
None: async def set_if_not_exists(self, key: str, value: Buffer) -> None: # docstring inherited + import obstore as obs + self._check_writable() buf = value.to_bytes() with contextlib.suppress(obs.exceptions.AlreadyExistsError): @@ -224,6 +235,8 @@ def supports_deletes(self) -> bool: async def delete(self, key: str) -> None: # docstring inherited + import obstore as obs + self._check_writable() await obs.delete_async(self.store, key) @@ -245,16 +258,22 @@ def supports_listing(self) -> bool: def list(self) -> AsyncGenerator[str, None]: # docstring inherited + import obstore as obs + objects: ListStream[list[ObjectMeta]] = obs.list(self.store) return _transform_list(objects) def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]: # docstring inherited + import obstore as obs + objects: ListStream[list[ObjectMeta]] = obs.list(self.store, prefix=prefix) return _transform_list(objects) def list_dir(self, prefix: str) -> AsyncGenerator[str, None]: # docstring inherited + import obstore as obs + coroutine = obs.list_with_delimiter_async(self.store, prefix=prefix) return _transform_list_dir(coroutine, prefix) @@ -332,7 +351,7 @@ class _Response(TypedDict): async def _make_bounded_requests( - store: obs.store.ObjectStore, + store: _UpstreamObjectStore, path: str, requests: list[_BoundedRequest], prototype: BufferPrototype, @@ -343,6 +362,7 @@ async def _make_bounded_requests( within a single file, and will e.g. merge concurrent requests. This only uses one single Python coroutine. 
""" + import obstore as obs starts = [r["start"] for r in requests] ends = [r["end"] for r in requests] @@ -361,7 +381,7 @@ async def _make_bounded_requests( async def _make_other_request( - store: obs.store.ObjectStore, + store: _UpstreamObjectStore, request: _OtherRequest, prototype: BufferPrototype, ) -> list[_Response]: @@ -370,6 +390,8 @@ async def _make_other_request( We return a `list[_Response]` for symmetry with `_make_bounded_requests` so that all futures can be gathered together. """ + import obstore as obs + if request["range"] is None: resp = await obs.get_async(store, request["path"]) else: @@ -384,7 +406,7 @@ async def _make_other_request( async def _get_partial_values( - store: obs.store.ObjectStore, + store: _UpstreamObjectStore, prototype: BufferPrototype, key_ranges: Iterable[tuple[str, ByteRequest | None]], ) -> list[Buffer | None]: diff --git a/src/zarr/testing/store.py b/src/zarr/testing/store.py index c6e6e2a82c..f9ac37c216 100644 --- a/src/zarr/testing/store.py +++ b/src/zarr/testing/store.py @@ -5,8 +5,7 @@ from abc import abstractmethod from typing import TYPE_CHECKING, Generic, TypeVar -from zarr.storage import WrapperStore -from zarr.storage.obstore import ObjectStore +from zarr.storage import ObjectStore, WrapperStore if TYPE_CHECKING: from typing import Any diff --git a/tests/test_store/test_object.py b/tests/test_store/test_object.py index f00ee11229..e3b2f73ca4 100644 --- a/tests/test_store/test_object.py +++ b/tests/test_store/test_object.py @@ -7,7 +7,7 @@ from obstore.store import LocalStore, MemoryStore from zarr.core.buffer import Buffer, cpu -from zarr.storage.obstore import ObjectStore +from zarr.storage import ObjectStore from zarr.testing.store import StoreTests From 6599d105063090f8e87e3cb5460edb13b14112a4 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Thu, 13 Feb 2025 10:19:50 -0500 Subject: [PATCH 67/67] remove typo --- src/zarr/storage/_obstore.py | 2 -- 1 file changed, 2 deletions(-) diff --git 
a/src/zarr/storage/_obstore.py b/src/zarr/storage/_obstore.py index f8ee996e0b..b2e34b1fd0 100644 --- a/src/zarr/storage/_obstore.py +++ b/src/zarr/storage/_obstore.py @@ -121,8 +121,6 @@ def __eq__(self, value: object) -> bool: def __init__(self, store: _UpstreamObjectStore, *, read_only: bool = False) -> None: import obstore as obs - self.obs = obs - if not isinstance( store, (