diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 94d5342486..f986713c9a 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -8,9 +8,9 @@ repos:
   - repo: https://github.com/astral-sh/ruff-pre-commit
     rev: v0.9.4
     hooks:
-    - id: ruff
-      args: ["--fix", "--show-fixes"]
-    - id: ruff-format
+      - id: ruff
+        args: ["--fix", "--show-fixes"]
+      - id: ruff-format
   - repo: https://github.com/codespell-project/codespell
     rev: v2.4.1
     hooks:
@@ -19,8 +19,8 @@ repos:
   - repo: https://github.com/pre-commit/pre-commit-hooks
     rev: v5.0.0
     hooks:
-    - id: check-yaml
-    - id: trailing-whitespace
+      - id: check-yaml
+      - id: trailing-whitespace
   - repo: https://github.com/pre-commit/mirrors-mypy
     rev: v1.14.1
     hooks:
@@ -31,9 +31,10 @@ repos:
           - packaging
           - donfig
           - numcodecs[crc32c]
-          - numpy==2.1 # until https://github.com/numpy/numpy/issues/28034 is resolved
+          - numpy==2.1  # until https://github.com/numpy/numpy/issues/28034 is resolved
           - typing_extensions
           - universal-pathlib
+          - obstore==0.4.0
           # Tests
           - pytest
   - repo: https://github.com/scientific-python/cookie
diff --git a/docs/user-guide/storage.rst b/docs/user-guide/storage.rst
index 46505271b4..ef0f12e7ff 100644
--- a/docs/user-guide/storage.rst
+++ b/docs/user-guide/storage.rst
@@ -99,6 +99,27 @@ Zarr data (metadata and chunks) to a dictionary.:
 
     >>> zarr.create_array(store=store, shape=(2,), dtype='float64')
 
+Object Store
+~~~~~~~~~~~~
+
+:class:`zarr.storage.ObjectStore` stores the contents of the Zarr hierarchy using any ObjectStore
+`storage implementation <https://developmentseed.org/obstore/latest/api/store/>`_, such as
+AWS S3, Google Cloud Storage, and Azure Blob Storage. This store is backed by
+`obstore <https://developmentseed.org/obstore/latest/>`_, which builds on the
+production-quality Rust library `object_store <https://docs.rs/object_store/latest/object_store/>`_.
+
+    >>> from zarr.storage import ObjectStore
+    >>> from obstore.store import MemoryStore
+    >>>
+    >>> store = ObjectStore(MemoryStore())
+    >>> zarr.create_array(store=store, shape=(2,), dtype='float64')
+
+.. warning::
+
+   The :class:`zarr.storage.ObjectStore` class is experimental.
+
 .. _user-guide-custom-stores:
 
 Developing custom stores
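For reviewers who want to try the new store end to end, here is a minimal sketch (not part of the diff) that mirrors the docs example above but persists to the local filesystem through obstore's `LocalStore`; the scratch-directory path is illustrative:

    import tempfile

    import zarr
    from obstore.store import LocalStore
    from zarr.storage import ObjectStore

    # Wrap an obstore LocalStore rooted at a throwaway directory (hypothetical path).
    root = tempfile.mkdtemp()
    store = ObjectStore(LocalStore(prefix=root))

    # Create and fill a small array through the normal zarr API.
    arr = zarr.create_array(store=store, shape=(2,), dtype="float64")
    arr[:] = [1.0, 2.0]
    print(arr[:])  # -> [1. 2.]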
diff --git a/pyproject.toml b/pyproject.toml
index ab285ff7ff..b5cc30e4b0 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -64,6 +64,7 @@ keywords = ["Python", "compressed", "ndimensional-arrays", "zarr"]
 # User extras
 remote = [
     "fsspec>=2023.10.0",
+    "obstore==0.4.0",
 ]
 gpu = [
     "cupy-cuda12x",
@@ -211,6 +212,7 @@ dependencies = [
     'universal_pathlib @ git+https://github.com/fsspec/universal_pathlib',
     'typing_extensions @ git+https://github.com/python/typing_extensions',
     'donfig @ git+https://github.com/pytroll/donfig',
+    'obstore @ git+https://github.com/developmentseed/obstore@main#subdirectory=obstore',
     # test deps
     'hypothesis',
     'pytest',
diff --git a/src/zarr/storage/__init__.py b/src/zarr/storage/__init__.py
index 649857f773..6721139375 100644
--- a/src/zarr/storage/__init__.py
+++ b/src/zarr/storage/__init__.py
@@ -8,6 +8,7 @@
 from zarr.storage._local import LocalStore
 from zarr.storage._logging import LoggingStore
 from zarr.storage._memory import GpuMemoryStore, MemoryStore
+from zarr.storage._obstore import ObjectStore
 from zarr.storage._wrapper import WrapperStore
 from zarr.storage._zip import ZipStore
 
@@ -17,6 +18,7 @@
     "LocalStore",
     "LoggingStore",
     "MemoryStore",
+    "ObjectStore",
     "StoreLike",
     "StorePath",
     "WrapperStore",
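Before the implementation below, a quick orientation: the new module's core job is translating Zarr's `ByteRequest` variants into the range options obstore understands. A hedged sketch of that mapping (the helper name is mine, not part of the module; it mirrors the dispatch in `ObjectStore.get()` below):

    from zarr.abc.store import (
        ByteRequest,
        OffsetByteRequest,
        RangeByteRequest,
        SuffixByteRequest,
    )

    def to_obstore_range(byte_range: ByteRequest | None) -> dict[str, int] | None:
        """Sketch of the ByteRequest -> obstore range-option translation."""
        if byte_range is None:
            return None  # full-object read
        if isinstance(byte_range, RangeByteRequest):
            # Bounded reads go through obs.get_range_async(start=..., end=...)
            return {"start": byte_range.start, "end": byte_range.end}
        if isinstance(byte_range, OffsetByteRequest):
            return {"offset": byte_range.offset}  # read from offset to EOF
        if isinstance(byte_range, SuffixByteRequest):
            return {"suffix": byte_range.suffix}  # read the last N bytes
        raise ValueError(f"Unexpected byte_range, got {byte_range}")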
+ """ + + store: _UpstreamObjectStore + """The underlying obstore instance.""" + + def __eq__(self, value: object) -> bool: + from obstore.store import ( + AzureStore, + GCSStore, + HTTPStore, + LocalStore, + MemoryStore, + S3Store, + ) + + if not isinstance(value, ObjectStore): + return False + + if not isinstance(self.store, type(value.store)): + return False + if not self.read_only == value.read_only: + return False + + match value.store: + case AzureStore(): + assert isinstance(self.store, AzureStore) + if ( + (self.store.config != value.store.config) + or (self.store.client_options != value.store.client_options) + or (self.store.prefix != value.store.prefix) + or (self.store.retry_config != value.store.retry_config) + ): + return False + case GCSStore(): + assert isinstance(self.store, GCSStore) + if ( + (self.store.config != value.store.config) + or (self.store.client_options != value.store.client_options) + or (self.store.prefix != value.store.prefix) + or (self.store.retry_config != value.store.retry_config) + ): + return False + case S3Store(): + assert isinstance(self.store, S3Store) + if ( + (self.store.config != value.store.config) + or (self.store.client_options != value.store.client_options) + or (self.store.prefix != value.store.prefix) + or (self.store.retry_config != value.store.retry_config) + ): + return False + case HTTPStore(): + assert isinstance(self.store, HTTPStore) + if ( + (self.store.url != value.store.url) + or (self.store.client_options != value.store.client_options) + or (self.store.retry_config != value.store.retry_config) + ): + return False + case LocalStore(): + assert isinstance(self.store, LocalStore) + if self.store.prefix != value.store.prefix: + return False + case MemoryStore(): + if self.store is not value.store: + return False # Two memory stores can't be equal because we can't pickle memory stores + return True + + def __init__(self, store: _UpstreamObjectStore, *, read_only: bool = False) -> None: + import obstore as obs + + if not isinstance( + store, + ( + obs.store.AzureStore, + obs.store.GCSStore, + obs.store.HTTPStore, + obs.store.S3Store, + obs.store.LocalStore, + obs.store.MemoryStore, + ), + ): + raise TypeError(f"expected ObjectStore class, got {store!r}") + super().__init__(read_only=read_only) + self.store = store + + def __str__(self) -> str: + return f"object://{self.store}" + + def __repr__(self) -> str: + return f"ObjectStore({self})" + + def __getstate__(self) -> dict[Any, Any]: + state = self.__dict__.copy() + state["store"] = pickle.dumps(self.store) + return state + + def __setstate__(self, state: dict[Any, Any]) -> None: + state["store"] = pickle.loads(state["store"]) + self.__dict__.update(state) + + async def get( + self, key: str, prototype: BufferPrototype, byte_range: ByteRequest | None = None + ) -> Buffer | None: + # docstring inherited + import obstore as obs + + try: + if byte_range is None: + resp = await obs.get_async(self.store, key) + return prototype.buffer.from_bytes(await resp.bytes_async()) # type: ignore[arg-type] + elif isinstance(byte_range, RangeByteRequest): + bytes = await obs.get_range_async( + self.store, key, start=byte_range.start, end=byte_range.end + ) + return prototype.buffer.from_bytes(bytes) # type: ignore[arg-type] + elif isinstance(byte_range, OffsetByteRequest): + resp = await obs.get_async( + self.store, key, options={"range": {"offset": byte_range.offset}} + ) + return prototype.buffer.from_bytes(await resp.bytes_async()) # type: ignore[arg-type] + elif isinstance(byte_range, 
+                resp = await obs.get_async(
+                    self.store, key, options={"range": {"suffix": byte_range.suffix}}
+                )
+                return prototype.buffer.from_bytes(await resp.bytes_async())  # type: ignore[arg-type]
+            else:
+                raise ValueError(f"Unexpected byte_range, got {byte_range}")
+        except _ALLOWED_EXCEPTIONS:
+            return None
+
+    async def get_partial_values(
+        self,
+        prototype: BufferPrototype,
+        key_ranges: Iterable[tuple[str, ByteRequest | None]],
+    ) -> list[Buffer | None]:
+        # docstring inherited
+        return await _get_partial_values(self.store, prototype=prototype, key_ranges=key_ranges)
+
+    async def exists(self, key: str) -> bool:
+        # docstring inherited
+        import obstore as obs
+
+        try:
+            await obs.head_async(self.store, key)
+        except FileNotFoundError:
+            return False
+        else:
+            return True
+
+    @property
+    def supports_writes(self) -> bool:
+        # docstring inherited
+        return True
+
+    async def set(self, key: str, value: Buffer) -> None:
+        # docstring inherited
+        import obstore as obs
+
+        self._check_writable()
+        if not isinstance(value, Buffer):
+            raise TypeError(
+                f"ObjectStore.set(): `value` must be a Buffer instance. Got an instance of {type(value)} instead."
+            )
+        buf = value.to_bytes()
+        await obs.put_async(self.store, key, buf)
+
+    async def set_if_not_exists(self, key: str, value: Buffer) -> None:
+        # docstring inherited
+        import obstore as obs
+
+        self._check_writable()
+        buf = value.to_bytes()
+        with contextlib.suppress(obs.exceptions.AlreadyExistsError):
+            await obs.put_async(self.store, key, buf, mode="create")
+
+    @property
+    def supports_deletes(self) -> bool:
+        # docstring inherited
+        return True
+
+    async def delete(self, key: str) -> None:
+        # docstring inherited
+        import obstore as obs
+
+        self._check_writable()
+        await obs.delete_async(self.store, key)
+
+    @property
+    def supports_partial_writes(self) -> bool:
+        # docstring inherited
+        return False
+
+    async def set_partial_values(
+        self, key_start_values: Iterable[tuple[str, int, BytesLike]]
+    ) -> None:
+        # docstring inherited
+        raise NotImplementedError
+
+    @property
+    def supports_listing(self) -> bool:
+        # docstring inherited
+        return True
+
+    def list(self) -> AsyncGenerator[str, None]:
+        # docstring inherited
+        import obstore as obs
+
+        objects: ListStream[list[ObjectMeta]] = obs.list(self.store)
+        return _transform_list(objects)
+
+    def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]:
+        # docstring inherited
+        import obstore as obs
+
+        objects: ListStream[list[ObjectMeta]] = obs.list(self.store, prefix=prefix)
+        return _transform_list(objects)
+
+    def list_dir(self, prefix: str) -> AsyncGenerator[str, None]:
+        # docstring inherited
+        import obstore as obs
+
+        coroutine = obs.list_with_delimiter_async(self.store, prefix=prefix)
+        return _transform_list_dir(coroutine, prefix)
+
+
+async def _transform_list(
+    list_stream: ListStream[list[ObjectMeta]],
+) -> AsyncGenerator[str, None]:
+    """
+    Transform the result of list into an async generator of paths.
+    """
+    async for batch in list_stream:
+        for item in batch:
+            yield item["path"]
+ """ + list_result = await list_result_coroutine + + # We assume that the underlying object-store implementation correctly handles the + # prefix, so we don't double-check that the returned results actually start with the + # given prefix. + prefixes = [obj.lstrip(prefix).lstrip("/") for obj in list_result["common_prefixes"]] + objects = [obj["path"].lstrip(prefix).lstrip("/") for obj in list_result["objects"]] + for item in prefixes + objects: + yield item + + +class _BoundedRequest(TypedDict): + """Range request with a known start and end byte. + + These requests can be multiplexed natively on the Rust side with + `obstore.get_ranges_async`. + """ + + original_request_index: int + """The positional index in the original key_ranges input""" + + start: int + """Start byte offset.""" + + end: int + """End byte offset.""" + + +class _OtherRequest(TypedDict): + """Offset or suffix range requests. + + These requests cannot be concurrent on the Rust side, and each need their own call + to `obstore.get_async`, passing in the `range` parameter. + """ + + original_request_index: int + """The positional index in the original key_ranges input""" + + path: str + """The path to request from.""" + + range: OffsetRange | SuffixRange | None + """The range request type.""" + + +class _Response(TypedDict): + """A response buffer associated with the original index that it should be restored to.""" + + original_request_index: int + """The positional index in the original key_ranges input""" + + buffer: Buffer + """The buffer returned from obstore's range request.""" + + +async def _make_bounded_requests( + store: _UpstreamObjectStore, + path: str, + requests: list[_BoundedRequest], + prototype: BufferPrototype, +) -> list[_Response]: + """Make all bounded requests for a specific file. + + `obstore.get_ranges_async` allows for making concurrent requests for multiple ranges + within a single file, and will e.g. merge concurrent requests. This only uses one + single Python coroutine. + """ + import obstore as obs + + starts = [r["start"] for r in requests] + ends = [r["end"] for r in requests] + responses = await obs.get_ranges_async(store, path=path, starts=starts, ends=ends) + + buffer_responses: list[_Response] = [] + for request, response in zip(requests, responses, strict=True): + buffer_responses.append( + { + "original_request_index": request["original_request_index"], + "buffer": prototype.buffer.from_bytes(response), # type: ignore[arg-type] + } + ) + + return buffer_responses + + +async def _make_other_request( + store: _UpstreamObjectStore, + request: _OtherRequest, + prototype: BufferPrototype, +) -> list[_Response]: + """Make suffix or offset requests. + + We return a `list[_Response]` for symmetry with `_make_bounded_requests` so that all + futures can be gathered together. + """ + import obstore as obs + + if request["range"] is None: + resp = await obs.get_async(store, request["path"]) + else: + resp = await obs.get_async(store, request["path"], options={"range": request["range"]}) + buffer = await resp.bytes_async() + return [ + { + "original_request_index": request["original_request_index"], + "buffer": prototype.buffer.from_bytes(buffer), # type: ignore[arg-type] + } + ] + + +async def _get_partial_values( + store: _UpstreamObjectStore, + prototype: BufferPrototype, + key_ranges: Iterable[tuple[str, ByteRequest | None]], +) -> list[Buffer | None]: + """Make multiple range requests. + + ObjectStore has a `get_ranges` method that will additionally merge nearby ranges, + but it's _per_ file. 
+
+
+async def _get_partial_values(
+    store: _UpstreamObjectStore,
+    prototype: BufferPrototype,
+    key_ranges: Iterable[tuple[str, ByteRequest | None]],
+) -> list[Buffer | None]:
+    """Make multiple range requests.
+
+    ObjectStore has a `get_ranges` method that will additionally merge nearby ranges,
+    but it's _per_ file. So we need to split these key_ranges into **per-file** key
+    ranges, and then reassemble the results in the original order.
+
+    We separate into different requests:
+
+    - One call to `obstore.get_ranges_async` **per target file**
+    - One call to `obstore.get_async` for each other request.
+    """
+    key_ranges = list(key_ranges)
+    per_file_bounded_requests: dict[str, list[_BoundedRequest]] = defaultdict(list)
+    other_requests: list[_OtherRequest] = []
+
+    for idx, (path, byte_range) in enumerate(key_ranges):
+        if byte_range is None:
+            other_requests.append(
+                {
+                    "original_request_index": idx,
+                    "path": path,
+                    "range": None,
+                }
+            )
+        elif isinstance(byte_range, RangeByteRequest):
+            per_file_bounded_requests[path].append(
+                {"original_request_index": idx, "start": byte_range.start, "end": byte_range.end}
+            )
+        elif isinstance(byte_range, OffsetByteRequest):
+            other_requests.append(
+                {
+                    "original_request_index": idx,
+                    "path": path,
+                    "range": {"offset": byte_range.offset},
+                }
+            )
+        elif isinstance(byte_range, SuffixByteRequest):
+            other_requests.append(
+                {
+                    "original_request_index": idx,
+                    "path": path,
+                    "range": {"suffix": byte_range.suffix},
+                }
+            )
+        else:
+            raise ValueError(f"Unsupported range input: {byte_range}")
+
+    futs: list[Coroutine[Any, Any, list[_Response]]] = []
+    for path, bounded_ranges in per_file_bounded_requests.items():
+        futs.append(_make_bounded_requests(store, path, bounded_ranges, prototype))
+
+    for request in other_requests:
+        futs.append(_make_other_request(store, request, prototype))  # noqa: PERF401
+
+    buffers: list[Buffer | None] = [None] * len(key_ranges)
+
+    # TODO: this gathers a list of lists of _Response; not sure if there's a way to
+    # unpack these lists inside of an `asyncio.gather`?
+    for responses in await asyncio.gather(*futs):
+        for resp in responses:
+            buffers[resp["original_request_index"]] = resp["buffer"]
+
+    return buffers
diff --git a/src/zarr/testing/store.py b/src/zarr/testing/store.py
index 112f6261e9..f9ac37c216 100644
--- a/src/zarr/testing/store.py
+++ b/src/zarr/testing/store.py
@@ -5,7 +5,7 @@
 from abc import abstractmethod
 from typing import TYPE_CHECKING, Generic, TypeVar
 
-from zarr.storage import WrapperStore
+from zarr.storage import ObjectStore, WrapperStore
 
 if TYPE_CHECKING:
     from typing import Any
@@ -42,7 +42,7 @@ class StoreTests(Generic[S, B]):
     async def set(self, store: S, key: str, value: Buffer) -> None:
         """
         Insert a value into a storage backend, with a specific key.
-        This should not not use any store methods. Bypassing the store methods allows them to be
+        This should not use any store methods. Bypassing the store methods allows them to be
         tested.
         """
         ...
@@ -51,7 +51,7 @@ async def set(self, store: S, key: str, value: Buffer) -> None:
     async def get(self, store: S, key: str) -> Buffer:
         """
         Retrieve a value from a storage backend, by key.
-        This should not not use any store methods. Bypassing the store methods allows them to be
+        This should not use any store methods. Bypassing the store methods allows them to be
         tested.
         """
         ...
@@ -158,6 +158,10 @@ async def test_get(self, store: S, key: str, data: bytes, byte_range: ByteReques
         """
         Ensure that data can be read from the store using the store.get method.
         """
+        if isinstance(store, ObjectStore) and not data:
+            pytest.xfail(
+                "Obstore does not allow loading invalid ranges - see https://github.com/apache/arrow-rs/pull/6751#issuecomment-2494702657"
+            )
         data_buf = self.buffer_cls.from_bytes(data)
         await self.set(store, key, data_buf)
         observed = await store.get(key, prototype=default_buffer_prototype(), byte_range=byte_range)
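The `xfail` above guards a real upstream quirk: a ranged read of a zero-length object errors in obstore/object_store rather than returning an empty buffer, as it would with zarr's other stores. A hedged reproduction sketch (the exact exception type comes from obstore, so it is caught broadly here):

    import asyncio

    import obstore as obs
    from obstore.store import MemoryStore

    async def main() -> None:
        store = MemoryStore()
        await obs.put_async(store, "empty", b"")
        try:
            # Any bounded range is out of bounds for an empty object.
            await obs.get_range_async(store, "empty", start=0, end=1)
        except Exception as exc:
            print(f"ranged read failed as expected: {exc!r}")

    asyncio.run(main())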
""" + if isinstance(store, ObjectStore) and not data: + pytest.xfail( + "Obstore does not allow loading invalid ranges - see https://github.com/apache/arrow-rs/pull/6751#issuecomment-2494702657" + ) data_buf = self.buffer_cls.from_bytes(data) await self.set(store, key, data_buf) observed = await store.get(key, prototype=default_buffer_prototype(), byte_range=byte_range) diff --git a/tests/test_store/test_object.py b/tests/test_store/test_object.py new file mode 100644 index 0000000000..e3b2f73ca4 --- /dev/null +++ b/tests/test_store/test_object.py @@ -0,0 +1,71 @@ +# ruff: noqa: E402 +from typing import Any + +import pytest + +obstore = pytest.importorskip("obstore") +from obstore.store import LocalStore, MemoryStore + +from zarr.core.buffer import Buffer, cpu +from zarr.storage import ObjectStore +from zarr.testing.store import StoreTests + + +class TestObjectStore(StoreTests[ObjectStore, cpu.Buffer]): + store_cls = ObjectStore + buffer_cls = cpu.Buffer + + @pytest.fixture + def store_kwargs(self, tmpdir) -> dict[str, Any]: + store = LocalStore(prefix=tmpdir) + return {"store": store, "read_only": False} + + @pytest.fixture + def store(self, store_kwargs: dict[str, str | bool]) -> ObjectStore: + return self.store_cls(**store_kwargs) + + async def get(self, store: ObjectStore, key: str) -> Buffer: + assert isinstance(store.store, LocalStore) + new_local_store = LocalStore(prefix=store.store.prefix) + return self.buffer_cls.from_bytes(obstore.get(new_local_store, key).bytes()) + + async def set(self, store: ObjectStore, key: str, value: Buffer) -> None: + assert isinstance(store.store, LocalStore) + new_local_store = LocalStore(prefix=store.store.prefix) + obstore.put(new_local_store, key, value.to_bytes()) + + def test_store_repr(self, store: ObjectStore) -> None: + from fnmatch import fnmatch + + pattern = "ObjectStore(object://LocalStore(*))" + assert fnmatch(f"{store!r}", pattern) + + def test_store_supports_writes(self, store: ObjectStore) -> None: + assert store.supports_writes + + async def test_store_supports_partial_writes(self, store: ObjectStore) -> None: + assert not store.supports_partial_writes + with pytest.raises(NotImplementedError): + await store.set_partial_values([("foo", 0, b"\x01\x02\x03\x04")]) + + def test_store_supports_listing(self, store: ObjectStore) -> None: + assert store.supports_listing + + def test_store_equal(self, store: ObjectStore) -> None: + """Test store equality""" + # Test equality against a different instance type + assert store != 0 + # Test equality against a different store type + new_memory_store = ObjectStore(MemoryStore()) + assert store != new_memory_store + # Test equality against a read only store + new_local_store = ObjectStore(LocalStore(prefix=store.store.prefix), read_only=True) + assert store != new_local_store + # Test two memory stores cannot be equal + second_memory_store = ObjectStore(MemoryStore()) + assert new_memory_store != second_memory_store + + def test_store_init_raises(self) -> None: + """Test __init__ raises appropriate error for improper store type""" + with pytest.raises(TypeError): + ObjectStore("path/to/store")