Skip to content

Commit

Permalink
Merge branch 'apache:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
imWildCat authored Jun 17, 2024
2 parents 1260920 + dbb9b9f commit f4bc0e8
Show file tree
Hide file tree
Showing 95 changed files with 4,102 additions and 3,554 deletions.
207 changes: 167 additions & 40 deletions README.md

Large diffs are not rendered by default.

181 changes: 70 additions & 111 deletions bin/oay/Cargo.lock

Large diffs are not rendered by default.

28 changes: 15 additions & 13 deletions bin/oay/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,35 +34,37 @@ default = ["frontends-webdav", "frontends-s3"]

frontends-s3 = []
frontends-webdav = [
"dep:dav-server",
"dep:dav-server-opendalfs",
"dep:bytes",
"dep:futures-util",
"dep:dav-server",
"dep:dav-server-opendalfs",
"dep:bytes",
"dep:futures-util",
]

[dependencies]
anyhow = "1"
axum = "0.6"
axum = "0.7"
bytes = { version = "1.5.0", optional = true }
chrono = "0.4.31"
clap = { version = "4", features = ["cargo", "string"] }
dav-server = { version = "0.5.8", optional = true }
dav-server = { version = "0.6", optional = true }
dav-server-opendalfs = { version = "0.0.2", path = "../../integrations/dav-server", optional = true }
dirs = "5.0.1"
futures = "0.3"
futures-util = { version = "0.3.29", optional = true }
opendal = { version = "0.47.0", path = "../../core" }
quick-xml = { version = "0.31", features = ["serialize", "overlapped-lists"] }
opendal = { version = "0.47.0", path = "../../core", features = [
"services-fs",
] }
quick-xml = { version = "0.32", features = ["serialize", "overlapped-lists"] }
serde = { version = "1", features = ["derive"] }
tokio = { version = "1.34", features = [
"fs",
"macros",
"rt-multi-thread",
"io-std",
"fs",
"macros",
"rt-multi-thread",
"io-std",
] }
toml = "0.8.12"
tower = "0.4"
tower-http = { version = "0.4", features = ["trace"] }
tower-http = { version = "0.5", features = ["trace"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
url = "2.5.1"
Expand Down
8 changes: 2 additions & 6 deletions bin/oay/src/services/s3/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ use opendal::Metakey;
use opendal::Operator;
use serde::Deserialize;
use serde::Serialize;
use tower::ServiceBuilder;
use tower_http::trace::TraceLayer;
use tracing::debug;

use crate::Config;
Expand All @@ -51,14 +49,12 @@ impl S3Service {

let app = Router::new()
.route("/", get(handle_list_objects))
.layer(ServiceBuilder::new().layer(TraceLayer::new_for_http()))
.with_state(S3State {
op: self.op.clone(),
});

axum::Server::bind(&s3_cfg.addr.parse().unwrap())
.serve(app.into_make_service())
.await?;
let listener = tokio::net::TcpListener::bind(&s3_cfg.addr).await.unwrap();
axum::serve(listener, app.into_make_service()).await?;

Ok(())
}
Expand Down
7 changes: 4 additions & 3 deletions bin/oay/src/services/webdav/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ impl WebdavService {

let app = Router::new().route("/*path", any_service(webdav_service));

axum::Server::bind(&webdav_cfg.addr.parse().unwrap())
.serve(app.into_make_service())
.await?;
let listener = tokio::net::TcpListener::bind(&webdav_cfg.addr)
.await
.unwrap();
axum::serve(listener, app.into_make_service()).await?;

Ok(())
}
Expand Down
2 changes: 2 additions & 0 deletions bin/ofs/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ static RUNTIME: OnceLock<Runtime> = OnceLock::new();

pub struct OfsTestContext {
pub mount_point: TempDir,
// This is a false positive, the field is used in the test.
#[allow(dead_code)]
pub capability: Capability,
mount_handle: MountHandle,
}
Expand Down
4 changes: 2 additions & 2 deletions bindings/python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ futures = "0.3.28"
opendal = { version = "0.47.0", path = "../../core", features = [
"layers-blocking",
] }
pyo3 = { version = "0.20.1", features = ["abi3", "abi3-py311"] }
pyo3-asyncio = { version = "0.20", features = ["tokio-runtime"] }
pyo3 = { version = "0.21.2", features = ["abi3", "abi3-py311"] }
pyo3-asyncio = { package = "pyo3-asyncio-0-21", version = "0.21", features = ["tokio-runtime"]}
tokio = "1"

[target.'cfg(unix)'.dependencies.opendal]
Expand Down
44 changes: 21 additions & 23 deletions bindings/python/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl File {
impl File {
/// Read and return at most size bytes, or if size is not given, until EOF.
#[pyo3(signature = (size=None,))]
pub fn read<'p>(&'p mut self, py: Python<'p>, size: Option<usize>) -> PyResult<&'p PyAny> {
pub fn read<'p>(&'p mut self, py: Python<'p>, size: Option<usize>) -> PyResult<Bound<PyAny>> {
let reader = match &mut self.0 {
FileState::Reader(r) => r,
FileState::Writer(_) => {
Expand Down Expand Up @@ -243,15 +243,13 @@ impl File {
pub fn flush(&mut self) -> PyResult<()> {
if matches!(self.0, FileState::Reader(_)) {
Ok(())
} else {
if let FileState::Writer(w) = &mut self.0 {
match w.flush() {
Ok(_) => Ok(()),
Err(e) => Err(e.into()),
}
} else {
Ok(())
} else if let FileState::Writer(w) = &mut self.0 {
match w.flush() {
Ok(_) => Ok(()),
Err(e) => Err(e.into()),
}
} else {
Ok(())
}
}

Expand Down Expand Up @@ -312,7 +310,7 @@ impl AsyncFile {
#[pymethods]
impl AsyncFile {
/// Read and return at most size bytes, or if size is not given, until EOF.
pub fn read<'p>(&'p self, py: Python<'p>, size: Option<usize>) -> PyResult<&'p PyAny> {
pub fn read<'p>(&'p self, py: Python<'p>, size: Option<usize>) -> PyResult<Bound<PyAny>> {
let state = self.0.clone();

future_into_py(py, async move {
Expand Down Expand Up @@ -357,7 +355,7 @@ impl AsyncFile {
}

/// Write bytes into the file.
pub fn write<'p>(&'p mut self, py: Python<'p>, bs: &'p [u8]) -> PyResult<&'p PyAny> {
pub fn write<'p>(&'p mut self, py: Python<'p>, bs: &'p [u8]) -> PyResult<Bound<PyAny>> {
let state = self.0.clone();

// FIXME: can we avoid this clone?
Expand Down Expand Up @@ -398,7 +396,7 @@ impl AsyncFile {
///
/// Return the new absolute position.
#[pyo3(signature = (pos, whence = 0))]
pub fn seek<'p>(&'p mut self, py: Python<'p>, pos: i64, whence: u8) -> PyResult<&'p PyAny> {
pub fn seek<'p>(&'p mut self, py: Python<'p>, pos: i64, whence: u8) -> PyResult<Bound<PyAny>> {
let state = self.0.clone();

let whence = match whence {
Expand Down Expand Up @@ -433,7 +431,7 @@ impl AsyncFile {
}

/// Return the current stream position.
pub fn tell<'p>(&'p mut self, py: Python<'p>) -> PyResult<&'p PyAny> {
pub fn tell<'p>(&'p mut self, py: Python<'p>) -> PyResult<Bound<PyAny>> {
let state = self.0.clone();

future_into_py(py, async move {
Expand All @@ -460,7 +458,7 @@ impl AsyncFile {
})
}

fn close<'p>(&'p mut self, py: Python<'p>) -> PyResult<&'p PyAny> {
fn close<'p>(&'p mut self, py: Python<'p>) -> PyResult<Bound<PyAny>> {
let state = self.0.clone();
future_into_py(py, async move {
let mut state = state.lock().await;
Expand All @@ -474,23 +472,23 @@ impl AsyncFile {
})
}

fn __aenter__<'a>(slf: PyRef<'a, Self>, py: Python<'a>) -> PyResult<&'a PyAny> {
fn __aenter__<'a>(slf: PyRef<'a, Self>, py: Python<'a>) -> PyResult<Bound<'a, PyAny>> {
let slf = slf.into_py(py);
future_into_py(py, async move { Ok(slf) })
}

fn __aexit__<'a>(
&'a mut self,
py: Python<'a>,
_exc_type: &'a PyAny,
_exc_value: &'a PyAny,
_traceback: &'a PyAny,
) -> PyResult<&'a PyAny> {
_exc_type: &Bound<'a, PyAny>,
_exc_value: &Bound<'a, PyAny>,
_traceback: &Bound<'a, PyAny>,
) -> PyResult<Bound<'a, PyAny>> {
self.close(py)
}

/// Check if the stream may be read from.
pub fn readable<'p>(&'p self, py: Python<'p>) -> PyResult<&'p PyAny> {
pub fn readable<'p>(&'p self, py: Python<'p>) -> PyResult<Bound<PyAny>> {
let state = self.0.clone();
future_into_py(py, async move {
let state = state.lock().await;
Expand All @@ -499,7 +497,7 @@ impl AsyncFile {
}

/// Check if the stream may be written to.
pub fn writable<'p>(&'p self, py: Python<'p>) -> PyResult<&'p PyAny> {
pub fn writable<'p>(&'p self, py: Python<'p>) -> PyResult<Bound<PyAny>> {
let state = self.0.clone();
future_into_py(py, async move {
let state = state.lock().await;
Expand All @@ -508,7 +506,7 @@ impl AsyncFile {
}

/// Check if the stream reader may be re-located.
pub fn seekable<'p>(&'p self, py: Python<'p>) -> PyResult<&'p PyAny> {
pub fn seekable<'p>(&'p self, py: Python<'p>) -> PyResult<Bound<PyAny>> {
if true {
self.readable(py)
} else {
Expand All @@ -518,7 +516,7 @@ impl AsyncFile {

/// Check if the stream is closed.
#[getter]
pub fn closed<'p>(&'p self, py: Python<'p>) -> PyResult<&'p PyAny> {
pub fn closed<'p>(&'p self, py: Python<'p>) -> PyResult<Bound<PyAny>> {
let state = self.0.clone();
future_into_py(py, async move {
let state = state.lock().await;
Expand Down
42 changes: 24 additions & 18 deletions bindings/python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub use errors::*;
/// asyncio.run(main())
/// ```
#[pymodule]
fn _opendal(py: Python, m: &PyModule) -> PyResult<()> {
fn _opendal(py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<Operator>()?;
m.add_class::<AsyncOperator>()?;

Expand All @@ -83,28 +83,34 @@ fn _opendal(py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<Capability>()?;

// Layer module
let layers_module = PyModule::new(py, "layers")?;
let layers_module = PyModule::new_bound(py, "layers")?;
layers_module.add_class::<Layer>()?;
layers_module.add_class::<RetryLayer>()?;
m.add_submodule(layers_module)?;
py.import("sys")?
m.add_submodule(&layers_module)?;
py.import_bound("sys")?
.getattr("modules")?
.set_item("opendal.layers", layers_module)?;

let exception_module = PyModule::new(py, "exceptions")?;
exception_module.add("Error", py.get_type::<Error>())?;
exception_module.add("Unexpected", py.get_type::<UnexpectedError>())?;
exception_module.add("Unsupported", py.get_type::<UnsupportedError>())?;
exception_module.add("ConfigInvalid", py.get_type::<ConfigInvalidError>())?;
exception_module.add("NotFound", py.get_type::<NotFoundError>())?;
exception_module.add("PermissionDenied", py.get_type::<PermissionDeniedError>())?;
exception_module.add("IsADirectory", py.get_type::<IsADirectoryError>())?;
exception_module.add("NotADirectory", py.get_type::<NotADirectoryError>())?;
exception_module.add("AlreadyExists", py.get_type::<AlreadyExistsError>())?;
exception_module.add("IsSameFile", py.get_type::<IsSameFileError>())?;
exception_module.add("ConditionNotMatch", py.get_type::<ConditionNotMatchError>())?;
m.add_submodule(exception_module)?;
py.import("sys")?
let exception_module = PyModule::new_bound(py, "exceptions")?;
exception_module.add("Error", py.get_type_bound::<Error>())?;
exception_module.add("Unexpected", py.get_type_bound::<UnexpectedError>())?;
exception_module.add("Unsupported", py.get_type_bound::<UnsupportedError>())?;
exception_module.add("ConfigInvalid", py.get_type_bound::<ConfigInvalidError>())?;
exception_module.add("NotFound", py.get_type_bound::<NotFoundError>())?;
exception_module.add(
"PermissionDenied",
py.get_type_bound::<PermissionDeniedError>(),
)?;
exception_module.add("IsADirectory", py.get_type_bound::<IsADirectoryError>())?;
exception_module.add("NotADirectory", py.get_type_bound::<NotADirectoryError>())?;
exception_module.add("AlreadyExists", py.get_type_bound::<AlreadyExistsError>())?;
exception_module.add("IsSameFile", py.get_type_bound::<IsSameFileError>())?;
exception_module.add(
"ConditionNotMatch",
py.get_type_bound::<ConditionNotMatchError>(),
)?;
m.add_submodule(&exception_module)?;
py.import_bound("sys")?
.getattr("modules")?
.set_item("opendal.exceptions", exception_module)?;
Ok(())
Expand Down
Loading

0 comments on commit f4bc0e8

Please sign in to comment.