Skip to content

Commit

Permalink
Merge branch 'main' into azdls-list-start-from
Browse files Browse the repository at this point in the history
  • Loading branch information
alexwilcoxson-rel authored Oct 29, 2024
2 parents c370249 + 3fe8caa commit df6cf90
Show file tree
Hide file tree
Showing 9 changed files with 168 additions and 36 deletions.
5 changes: 4 additions & 1 deletion bindings/c/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@
# under the License.

cmake_minimum_required(VERSION 3.22)
cmake_policy(SET CMP0135 NEW)

project(opendal-c)

if (NOT CMAKE_BUILD_TYPE)
set(CMAKE_BUILD_TYPE "Debug")
endif()

option(TEST_ENABLE_ASAN "Enable AddressSanitizer for tests" OFF)
set(GOOGLETEST_VERSION 1.15.2)

# force the compiler to support these standards
set(CMAKE_C_STANDARD 11)
Expand All @@ -35,6 +36,8 @@ set(CMAKE_CXX_STANDARD_REQUIRED ON)

# fetch google test via GitHub
include(FetchContent)

set(GOOGLETEST_VERSION 1.15.2)
FetchContent_Declare(
googletest
URL https://github.com/google/googletest/archive/refs/tags/v${GOOGLETEST_VERSION}.zip
Expand Down
2 changes: 1 addition & 1 deletion bindings/c/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
- To **build the library and header file**.

```sh
mkdir build && cd build
mkdir -p build && cd build
cmake ..
make
```
Expand Down
4 changes: 2 additions & 2 deletions bindings/python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,8 @@ futures = "0.3.28"
opendal = { version = ">=0", path = "../../core", features = [
"layers-blocking",
] }
pyo3 = { version = "0.21.2", features = ["abi3", "abi3-py311"] }
pyo3-asyncio = { package = "pyo3-asyncio-0-21", version = "0.21", features = [
pyo3 = { version = "0.22.5", features = ["abi3", "abi3-py311"] }
pyo3-async-runtimes = { version = "0.22.0", features = [
"tokio-runtime",
] }
tokio = "1"
Expand Down
4 changes: 1 addition & 3 deletions bindings/python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ description = "Apache OpenDAL™ Python Binding"
license = { text = "Apache-2.0" }
name = "opendal"
readme = "README.md"
# PyO3 doesn't support python 3.13 yet.
# ref: https://github.com/apache/opendal/issues/4268
requires-python = ">=3.11, < 3.13"
requires-python = ">=3.11"

[project.optional-dependencies]
benchmark = [
Expand Down
2 changes: 1 addition & 1 deletion bindings/python/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use pyo3::buffer::PyBuffer;
use pyo3::exceptions::PyIOError;
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3_asyncio::tokio::future_into_py;
use pyo3_async_runtimes::tokio::future_into_py;
use tokio::sync::Mutex;

use crate::*;
Expand Down
2 changes: 1 addition & 1 deletion bindings/python/src/lister.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::sync::Arc;
use futures::TryStreamExt;
use pyo3::exceptions::PyStopAsyncIteration;
use pyo3::prelude::*;
use pyo3_asyncio::tokio::future_into_py;
use pyo3_async_runtimes::tokio::future_into_py;
use tokio::sync::Mutex;

use crate::*;
Expand Down
4 changes: 2 additions & 2 deletions bindings/python/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::time::Duration;
use pyo3::prelude::*;
use pyo3::types::PyBytes;
use pyo3::types::PyDict;
use pyo3_asyncio::tokio::future_into_py;
use pyo3_async_runtimes::tokio::future_into_py;

use crate::*;

Expand All @@ -32,7 +32,7 @@ fn build_operator(
) -> PyResult<ocore::Operator> {
let mut op = ocore::Operator::via_iter(scheme, map).map_err(format_pyerr)?;
if !op.info().full_capability().blocking {
let runtime = pyo3_asyncio::tokio::get_runtime();
let runtime = pyo3_async_runtimes::tokio::get_runtime();
let _guard = runtime.enter();
op = op
.layer(ocore::layers::BlockingLayer::create().expect("blocking layer must be created"));
Expand Down
6 changes: 6 additions & 0 deletions integrations/compat/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,16 @@ repository = "https://github.com/apache/opendal"
rust-version = "1.75"
version = "1.0.0"

[package.metadata.docs.rs]
all-features = true

[features]
v0_50_to_v0_49 = ["dep:opendal_v0_49", "dep:opendal_v0_50"]

[dependencies]
async-trait = "0.1"
opendal_v0_49 = { package = "opendal", version = "0.49", optional = true }
opendal_v0_50 = { package = "opendal", version = "0.50", optional = true, path = "../../core" }

[dev-dependencies]
tokio = { version = "1.41", features = ["full"] }
175 changes: 150 additions & 25 deletions integrations/compat/src/v0_50_to_v0_49.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use opendal_v0_49::raw::{
};
use opendal_v0_49::Buffer;
use std::fmt::{Debug, Formatter};
use std::ops::Deref;
use std::sync::Arc;

/// Convert an opendal v0.50 `Operator` into an opendal v0.49 `Operator` for compatibility.
Expand Down Expand Up @@ -58,13 +59,16 @@ impl<A: Debug> Debug for CompatAccessor<A> {
impl<A: opendal_v0_50::raw::Access> opendal_v0_49::raw::Access for CompatAccessor<A> {
type Reader = CompatWrapper<A::Reader>;
type Writer = CompatWrapper<A::Writer>;
type Lister = CompatWrapper<A::Lister>;
type Lister = CompatListWrapper<A::Lister>;
type BlockingReader = CompatWrapper<A::BlockingReader>;
type BlockingWriter = CompatWrapper<A::BlockingWriter>;
type BlockingLister = CompatWrapper<A::BlockingLister>;
type BlockingLister = CompatListWrapper<A::BlockingLister>;

fn info(&self) -> Arc<AccessorInfo> {
convert::raw_oio_accessor_info_into(self.0.info())
let new_info = self.0.info().deref().clone();
let old_info = convert::raw_oio_accessor_info_into(new_info);

Arc::new(old_info)
}

async fn create_dir(&self, path: &str, _: OpCreateDir) -> opendal_v0_49::Result<RpCreateDir> {
Expand Down Expand Up @@ -127,7 +131,10 @@ impl<A: opendal_v0_50::raw::Access> opendal_v0_49::raw::Access for CompatAccesso
.list(path, convert::raw_op_list_from(args))
.await
.map_err(convert::error_into)?;
Ok((convert::raw_rp_list_into(rp), CompatWrapper(lister)))
Ok((
convert::raw_rp_list_into(rp),
CompatListWrapper(path.to_string(), lister),
))
}

async fn copy(&self, from: &str, to: &str, args: OpCopy) -> opendal_v0_49::Result<RpCopy> {
Expand Down Expand Up @@ -225,7 +232,10 @@ impl<A: opendal_v0_50::raw::Access> opendal_v0_49::raw::Access for CompatAccesso
.0
.blocking_list(path, convert::raw_op_list_from(args))
.map_err(convert::error_into)?;
Ok((convert::raw_rp_list_into(rp), CompatWrapper(lister)))
Ok((
convert::raw_rp_list_into(rp),
CompatListWrapper(path.to_string(), lister),
))
}

fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> opendal_v0_49::Result<RpCopy> {
Expand Down Expand Up @@ -277,16 +287,6 @@ impl<I: opendal_v0_50::raw::oio::Write> opendal_v0_49::raw::oio::Write for Compa
}
}

impl<I: opendal_v0_50::raw::oio::List> opendal_v0_49::raw::oio::List for CompatWrapper<I> {
async fn next(&mut self) -> opendal_v0_49::Result<Option<opendal_v0_49::raw::oio::Entry>> {
self.0
.next()
.await
.map(|v| v.map(convert::raw_oio_entry_into))
.map_err(convert::error_into)
}
}

impl<I: opendal_v0_50::raw::oio::BlockingRead> opendal_v0_49::raw::oio::BlockingRead
for CompatWrapper<I>
{
Expand All @@ -312,14 +312,50 @@ impl<I: opendal_v0_50::raw::oio::BlockingWrite> opendal_v0_49::raw::oio::Blockin
}
}

/// A wrapper to convert `List` from v0.50 to v0.49.
///
/// The first `String` is the path of parent. We save it to check if the entry is itself.
/// In OpenDAL v0.50, lister will return itself, this behavior is different from v0.49.
struct CompatListWrapper<I>(String, I);

impl<I: opendal_v0_50::raw::oio::List> opendal_v0_49::raw::oio::List for CompatListWrapper<I> {
async fn next(&mut self) -> opendal_v0_49::Result<Option<opendal_v0_49::raw::oio::Entry>> {
loop {
let Some(de) = self
.1
.next()
.await
.map(|v| v.map(convert::raw_oio_entry_into))
.map_err(convert::error_into)?
else {
return Ok(None);
};
if de.path() == self.0 {
continue;
}
return Ok(Some(de));
}
}
}

impl<I: opendal_v0_50::raw::oio::BlockingList> opendal_v0_49::raw::oio::BlockingList
for CompatWrapper<I>
for CompatListWrapper<I>
{
fn next(&mut self) -> opendal_v0_49::Result<Option<opendal_v0_49::raw::oio::Entry>> {
self.0
.next()
.map(|v| v.map(convert::raw_oio_entry_into))
.map_err(convert::error_into)
loop {
let Some(de) = self
.1
.next()
.map(|v| v.map(convert::raw_oio_entry_into))
.map_err(convert::error_into)?
else {
return Ok(None);
};
if de.path() == self.0 {
continue;
}
return Ok(Some(de));
}
}
}

Expand All @@ -341,8 +377,8 @@ impl<I: opendal_v0_50::raw::oio::BlockingList> opendal_v0_49::raw::oio::Blocking
/// `transmute` also perform compile time checks to detect any type size mismatch like `OpWrite`
/// in which we added a new field since v0.50.
mod convert {
use opendal_v0_50::Metakey;
use std::mem::transmute;
use std::sync::Arc;

pub fn error_into(e: opendal_v0_50::Error) -> opendal_v0_49::Error {
unsafe { transmute(e) }
Expand All @@ -357,9 +393,61 @@ mod convert {
}

pub fn raw_oio_accessor_info_into(
e: Arc<opendal_v0_50::raw::AccessorInfo>,
) -> Arc<opendal_v0_49::raw::AccessorInfo> {
unsafe { transmute(e) }
e: opendal_v0_50::raw::AccessorInfo,
) -> opendal_v0_49::raw::AccessorInfo {
let mut info = opendal_v0_49::raw::AccessorInfo::default();
info.set_name(e.name())
.set_root(e.root())
.set_scheme(e.scheme().into_static().parse().unwrap())
.set_native_capability(capability_into(e.native_capability()));

info
}

/// opendal_v0_50 added a new field `write_with_if_none_match`.
pub fn capability_into(e: opendal_v0_50::Capability) -> opendal_v0_49::Capability {
opendal_v0_49::Capability {
stat: e.stat,
stat_with_if_match: e.stat_with_if_match,
stat_with_if_none_match: e.stat_with_if_none_match,
stat_with_override_cache_control: e.stat_with_override_cache_control,
stat_with_override_content_disposition: e.stat_with_override_content_disposition,
stat_with_override_content_type: e.stat_with_override_content_type,
read: e.read,
read_with_if_match: e.read_with_if_match,
read_with_if_none_match: e.read_with_if_none_match,
read_with_override_cache_control: e.read_with_override_cache_control,
read_with_override_content_disposition: e.read_with_override_content_disposition,
read_with_override_content_type: e.read_with_override_content_type,
write: e.write,
write_can_multi: e.write_can_multi,
write_can_empty: e.write_can_empty,
write_can_append: e.write_can_append,
write_with_content_type: e.write_with_content_type,
write_with_content_disposition: e.write_with_content_disposition,
write_with_cache_control: e.write_with_cache_control,
write_with_user_metadata: e.write_with_user_metadata,
write_multi_max_size: e.write_multi_max_size,
write_multi_min_size: e.write_multi_min_size,
write_multi_align_size: e.write_multi_align_size,
write_total_max_size: e.write_total_max_size,
create_dir: e.create_dir,
delete: e.delete,
copy: e.copy,
rename: e.rename,
list: e.list,
list_with_limit: e.list_with_limit,
list_with_start_after: e.list_with_start_after,
list_with_recursive: e.list_with_recursive,
presign: e.presign,
presign_read: e.presign_read,
presign_stat: e.presign_stat,
presign_write: e.presign_write,
batch: e.batch,
batch_delete: e.batch_delete,
batch_max_operations: e.batch_max_operations,
blocking: e.blocking,
}
}

pub fn raw_oio_entry_into(e: opendal_v0_50::raw::oio::Entry) -> opendal_v0_49::raw::oio::Entry {
Expand Down Expand Up @@ -419,8 +507,28 @@ mod convert {
unsafe { transmute(e) }
}

/// OpenDAL v0.50's OpList has a new field `version`.
pub fn raw_op_list_from(e: opendal_v0_49::raw::OpList) -> opendal_v0_50::raw::OpList {
unsafe { transmute(e) }
let mut op = opendal_v0_50::raw::OpList::new();

if let Some(v) = e.limit() {
op = op.with_limit(v);
}

if let Some(v) = e.start_after() {
op = op.with_start_after(v);
}

if e.recursive() {
op = op.with_recursive(true);
}

// There is no way for us to convert `metakey` without depending on `flagset`,
// let's just hardcode them.
op = op.with_metakey(Metakey::Mode | Metakey::LastModified);
op = op.with_concurrent(e.concurrent());

op
}

pub fn raw_rp_list_into(e: opendal_v0_50::raw::RpList) -> opendal_v0_49::raw::RpList {
Expand Down Expand Up @@ -459,3 +567,20 @@ mod convert {
unsafe { transmute(e) }
}
}

#[cfg(test)]
mod tests {
use opendal_v0_50 as new_o;

#[tokio::test]
async fn test_read() {
let new_op = new_o::Operator::from_config(new_o::services::MemoryConfig::default())
.unwrap()
.finish();
let old_op = super::v0_50_to_v0_49(new_op);

old_op.write("test", "hello, world!").await.unwrap();
let bs = old_op.read("test").await.unwrap();
assert_eq!(String::from_utf8_lossy(&bs.to_vec()), "hello, world!");
}
}

0 comments on commit df6cf90

Please sign in to comment.