Skip to content

Commit

Permalink
feat: Add get_opts and get_ranges
Browse files Browse the repository at this point in the history
  • Loading branch information
kylebarron committed Feb 8, 2024
1 parent 8dd8e9b commit afb7cfd
Show file tree
Hide file tree
Showing 2 changed files with 213 additions and 3 deletions.
62 changes: 61 additions & 1 deletion object-store/python/object_store/_internal.pyi
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, List, Self, Tuple

if TYPE_CHECKING:
import pyarrow.fs as fs
Expand Down Expand Up @@ -32,6 +32,58 @@ class ListResult:
def objects(self) -> list[ObjectMeta]:
"""Object metadata for the listing"""

class GetRange:
"""Request only a portion of an object's bytes."""

@classmethod
def from_bounded(cls, start: int, length: int) -> Self:
"""Request a specific range of bytes
If the given range is zero-length or starts after the end of the object, an
error will be returned. Additionally, if the range ends after the end of the
object, the entire remainder of the object will be returned. Otherwise, the
exact requested range will be returned.
"""
@classmethod
def from_offset(cls, offset: int) -> Self:
"""Request all bytes starting from a given byte offset."""
@classmethod
def from_suffix(cls, suffix: int) -> Self:
"""Request up to the last n bytes."""

class GetOptions:
"""Options for a get request, such as range."""

@property
def if_match(self) -> str | None:
"""
Request will succeed if the ObjectMeta::e_tag matches otherwise returning Error::Precondition
See https://datatracker.ietf.org/doc/html/rfc9110#name-if-match
"""
@property
def if_none_match(self) -> str | None:
"""
Request will succeed if the ObjectMeta::e_tag does not match otherwise returning Error::NotModified
See https://datatracker.ietf.org/doc/html/rfc9110#section-13.1.2
"""
@property
def range(self) -> GetRange | None:
"""Request transfer of only the specified range of bytes otherwise returning Error::NotModified
https://datatracker.ietf.org/doc/html/rfc9110#name-range
"""
@property
def version(self) -> str | None:
"""Request a particular object version."""
@property
def head(self) -> bool:
"""Request transfer of no content
https://datatracker.ietf.org/doc/html/rfc9110#name-head
"""

class ClientOptions:
"""HTTP client configuration for remote object stores"""

Expand Down Expand Up @@ -133,10 +185,18 @@ class ObjectStore:
"""Return the bytes that are stored at the specified location."""
async def get_async(self, location: Path) -> bytes:
"""Return the bytes that are stored at the specified location."""
def get_opts(self, location: Path, options: GetOptions) -> bytes:
"""Return the bytes that are stored at the specified location."""
async def get_opts_async(self, location: Path, options: GetOptions) -> bytes:
"""Return the bytes that are stored at the specified location."""
def get_range(self, location: Path, start: int, length: int) -> bytes:
"""Return the bytes that are stored at the specified location in the given byte range."""
async def get_range_async(self, location: Path, start: int, length: int) -> bytes:
"""Return the bytes that are stored at the specified location in the given byte range."""
def get_ranges(self, location: Path, ranges: List[Tuple[int, int]]) -> bytes:
"""Return the bytes that are stored at the specified location in the given byte range."""
async def get_ranges_async(self, location: Path, ranges: List[Tuple[int, int]]) -> bytes:
"""Return the bytes that are stored at the specified location in the given byte range."""
def put(self, location: Path, bytes: bytes) -> None:
"""Save the provided bytes to the specified location."""
async def put_async(self, location: Path, bytes: bytes) -> None:
Expand Down
154 changes: 152 additions & 2 deletions object-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ use crate::utils::{flatten_list_stream, get_bytes};

use object_store::path::{Error as PathError, Path};
use object_store::{
BackoffConfig, ClientOptions, DynObjectStore, Error as InnerObjectStoreError, ListResult,
ObjectMeta, RetryConfig,
BackoffConfig, ClientOptions, DynObjectStore, Error as InnerObjectStoreError, GetOptions,
GetRange, ListResult, ObjectMeta, RetryConfig,
};
use pyo3::exceptions::{
PyException, PyFileExistsError, PyFileNotFoundError, PyNotImplementedError,
};
use pyo3::prelude::*;
use pyo3::types::PyType;
use pyo3::PyErr;
use tokio::runtime::Runtime;

Expand Down Expand Up @@ -219,6 +220,67 @@ impl From<ListResult> for PyListResult {
}
}

#[pyclass(name = "GetRange")]
#[derive(Debug, Clone)]
pub struct PyGetRange {
range: GetRange,
}

#[pymethods]
impl PyGetRange {
#[classmethod]
pub fn from_bounded(_cls: &PyType, start: usize, length: usize) -> Self {
Self {
range: GetRange::Bounded(start..start + length),
}
}

#[classmethod]
pub fn from_offset(_cls: &PyType, offset: usize) -> Self {
Self {
range: GetRange::Offset(offset),
}
}

#[classmethod]
pub fn from_suffix(_cls: &PyType, suffix: usize) -> Self {
Self {
range: GetRange::Suffix(suffix),
}
}
}

#[pyclass(name = "GetOptions")]
#[derive(Debug, Clone, Default)]
pub struct PyGetOptions {
#[pyo3(get, set)]
pub if_match: Option<String>,
#[pyo3(get, set)]
pub if_none_match: Option<String>,
// chrono conversions not supported with abi3 feature
// pub if_modified_since: Option<DateTime<Utc>>,
// pub if_unmodified_since: Option<DateTime<Utc>>,
#[pyo3(get, set)]
pub range: Option<PyGetRange>,
#[pyo3(get, set)]
pub version: Option<String>,
#[pyo3(get, set)]
pub head: bool,
}

impl PyGetOptions {
fn get_options(&self) -> GetOptions {
GetOptions {
if_match: self.if_match.clone(),
if_none_match: self.if_none_match.clone(),
range: self.range.as_ref().map(|py_range| py_range.range.clone()),
version: self.version.clone(),
head: self.head,
..Default::default()
}
}
}

#[pyclass(name = "ClientOptions")]
#[derive(Debug, Clone, Default)]
pub struct PyClientOptions {
Expand Down Expand Up @@ -555,6 +617,41 @@ impl PyObjectStore {
})
}

/// Return the bytes that are stored at the specified location.
#[pyo3(text_signature = "($self, location, options)")]
fn get_opts(&self, location: PyPath, options: PyGetOptions) -> PyResult<Cow<[u8]>> {
let get_result = self
.rt
.block_on(self.inner.get_opts(&location.into(), options.get_options()))
.map_err(ObjectStoreError::from)?;
let obj = self
.rt
.block_on(get_result.bytes())
.map_err(ObjectStoreError::from)?;
Ok(Cow::Owned(obj.to_vec()))
}

/// Return the bytes that are stored at the specified location.
#[pyo3(text_signature = "($self, location, options)")]
fn get_opts_async<'a>(
&'a self,
py: Python<'a>,
location: PyPath,
options: PyGetOptions,
) -> PyResult<&PyAny> {
let inner = self.inner.clone();
pyo3_asyncio::tokio::future_into_py(py, async move {
let obj = inner
.get_opts(&location.into(), options.get_options())
.await
.map_err(ObjectStoreError::from)?
.bytes()
.await
.map_err(ObjectStoreError::from)?;
Ok(Cow::<[u8]>::Owned(obj.to_vec()))
})
}

/// Return the bytes that are stored at the specified location in the given byte range
#[pyo3(text_signature = "($self, location, start, length)")]
fn get_range(&self, location: PyPath, start: usize, length: usize) -> PyResult<Cow<[u8]>> {
Expand Down Expand Up @@ -593,6 +690,59 @@ impl PyObjectStore {
})
}

/// Return the bytes that are stored at the specified location in the given byte ranges
#[pyo3(text_signature = "($self, location, ranges)")]
fn get_ranges(
&self,
location: PyPath,
ranges: Vec<(usize, usize)>,
) -> PyResult<Vec<Cow<[u8]>>> {
let ranges = ranges
.into_iter()
.map(|(start, length)| std::ops::Range {
start,
end: start + length,
})
.collect::<Vec<_>>();
let obj = self
.rt
.block_on(self.inner.get_ranges(&location.into(), &ranges))
.map_err(ObjectStoreError::from)?;
Ok(obj
.into_iter()
.map(|bytes| Cow::Owned(bytes.to_vec()))
.collect())
}

/// Return the bytes that are stored at the specified location in the given byte ranges
#[pyo3(text_signature = "($self, location, ranges)")]
fn get_ranges_async<'a>(
&'a self,
py: Python<'a>,
location: PyPath,
ranges: Vec<(usize, usize)>,
) -> PyResult<&PyAny> {
let inner = self.inner.clone();
let ranges = ranges
.into_iter()
.map(|(start, length)| std::ops::Range {
start,
end: start + length,
})
.collect::<Vec<_>>();

pyo3_asyncio::tokio::future_into_py(py, async move {
let obj = inner
.get_ranges(&location.into(), &ranges)
.await
.map_err(ObjectStoreError::from)?;
Ok(obj
.into_iter()
.map(|bytes| Cow::Owned(bytes.to_vec()))
.collect::<Vec<Cow<[u8]>>>())
})
}

/// Return the metadata for the specified location
#[pyo3(text_signature = "($self, location)")]
fn head(&self, location: PyPath) -> PyResult<PyObjectMeta> {
Expand Down

0 comments on commit afb7cfd

Please sign in to comment.