Skip to content

Commit

Permalink
chore(core/layers): adjust await point to simplify combinator code (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
koushiro authored Oct 30, 2024
1 parent 3b0e054 commit e5076ac
Show file tree
Hide file tree
Showing 9 changed files with 119 additions and 168 deletions.
8 changes: 3 additions & 5 deletions core/src/layers/async_backtrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
// specific language governing permissions and limitations
// under the License.

use futures::FutureExt;

use crate::raw::*;
use crate::*;

Expand Down Expand Up @@ -76,16 +74,16 @@ impl<A: Access> LayeredAccess for AsyncBacktraceAccessor<A> {
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
self.inner
.read(path, args)
.map(|v| v.map(|(rp, r)| (rp, AsyncBacktraceWrapper::new(r))))
.await
.map(|(rp, r)| (rp, AsyncBacktraceWrapper::new(r)))
}

#[async_backtrace::framed]
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
self.inner
.write(path, args)
.map(|v| v.map(|(rp, r)| (rp, AsyncBacktraceWrapper::new(r))))
.await
.map(|(rp, r)| (rp, AsyncBacktraceWrapper::new(r)))
}

#[async_backtrace::framed]
Expand All @@ -112,8 +110,8 @@ impl<A: Access> LayeredAccess for AsyncBacktraceAccessor<A> {
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
self.inner
.list(path, args)
.map(|v| v.map(|(rp, r)| (rp, AsyncBacktraceWrapper::new(r))))
.await
.map(|(rp, r)| (rp, AsyncBacktraceWrapper::new(r)))
}

#[async_backtrace::framed]
Expand Down
7 changes: 3 additions & 4 deletions core/src/layers/await_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

use await_tree::InstrumentAwait;
use futures::Future;
use futures::FutureExt;

use crate::raw::*;
use crate::*;
Expand Down Expand Up @@ -86,16 +85,16 @@ impl<A: Access> LayeredAccess for AwaitTreeAccessor<A> {
self.inner
.read(path, args)
.instrument_await(format!("opendal::{}", Operation::Read))
.map(|v| v.map(|(rp, r)| (rp, AwaitTreeWrapper::new(r))))
.await
.map(|(rp, r)| (rp, AwaitTreeWrapper::new(r)))
}

async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
self.inner
.write(path, args)
.instrument_await(format!("opendal::{}", Operation::Write))
.map(|v| v.map(|(rp, r)| (rp, AwaitTreeWrapper::new(r))))
.await
.map(|(rp, r)| (rp, AwaitTreeWrapper::new(r)))
}

async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
Expand Down Expand Up @@ -130,8 +129,8 @@ impl<A: Access> LayeredAccess for AwaitTreeAccessor<A> {
self.inner
.list(path, args)
.instrument_await(format!("opendal::{}", Operation::List))
.map(|v| v.map(|(rp, r)| (rp, AwaitTreeWrapper::new(r))))
.await
.map(|(rp, r)| (rp, AwaitTreeWrapper::new(r)))
}

async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
Expand Down
3 changes: 1 addition & 2 deletions core/src/layers/chaos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
use std::sync::Arc;
use std::sync::Mutex;

use futures::FutureExt;
use rand::prelude::*;
use rand::rngs::StdRng;

Expand Down Expand Up @@ -114,8 +113,8 @@ impl<A: Access> LayeredAccess for ChaosAccessor<A> {
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
self.inner
.read(path, args)
.map(|v| v.map(|(rp, r)| (rp, ChaosReader::new(r, self.rng.clone(), self.error_ratio))))
.await
.map(|(rp, r)| (rp, ChaosReader::new(r, self.rng.clone(), self.error_ratio)))
}

fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
Expand Down
87 changes: 35 additions & 52 deletions core/src/layers/error_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;

use futures::TryFutureExt;

use crate::raw::*;
use crate::*;

Expand Down Expand Up @@ -82,21 +80,19 @@ impl<A: Access> LayeredAccess for ErrorContextAccessor<A> {
}

async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
self.inner
.create_dir(path, args)
.map_err(|err| {
err.with_operation(Operation::CreateDir)
.with_context("service", self.meta.scheme())
.with_context("path", path)
})
.await
self.inner.create_dir(path, args).await.map_err(|err| {
err.with_operation(Operation::CreateDir)
.with_context("service", self.meta.scheme())
.with_context("path", path)
})
}

async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
let range = args.range();
self.inner
.read(path, args)
.map_ok(|(rp, r)| {
.await
.map(|(rp, r)| {
(
rp,
ErrorContextWrapper::new(self.meta.scheme(), path.to_string(), r)
Expand All @@ -109,13 +105,13 @@ impl<A: Access> LayeredAccess for ErrorContextAccessor<A> {
.with_context("path", path)
.with_context("range", range.to_string())
})
.await
}

async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
self.inner
.write(path, args)
.map_ok(|(rp, w)| {
.await
.map(|(rp, w)| {
(
rp,
ErrorContextWrapper::new(self.meta.scheme(), path.to_string(), w),
Expand All @@ -126,59 +122,47 @@ impl<A: Access> LayeredAccess for ErrorContextAccessor<A> {
.with_context("service", self.meta.scheme())
.with_context("path", path)
})
.await
}

async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
self.inner
.copy(from, to, args)
.map_err(|err| {
err.with_operation(Operation::Copy)
.with_context("service", self.meta.scheme())
.with_context("from", from)
.with_context("to", to)
})
.await
self.inner.copy(from, to, args).await.map_err(|err| {
err.with_operation(Operation::Copy)
.with_context("service", self.meta.scheme())
.with_context("from", from)
.with_context("to", to)
})
}

async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
self.inner
.rename(from, to, args)
.map_err(|err| {
err.with_operation(Operation::Rename)
.with_context("service", self.meta.scheme())
.with_context("from", from)
.with_context("to", to)
})
.await
self.inner.rename(from, to, args).await.map_err(|err| {
err.with_operation(Operation::Rename)
.with_context("service", self.meta.scheme())
.with_context("from", from)
.with_context("to", to)
})
}

async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
self.inner
.stat(path, args)
.map_err(|err| {
err.with_operation(Operation::Stat)
.with_context("service", self.meta.scheme())
.with_context("path", path)
})
.await
self.inner.stat(path, args).await.map_err(|err| {
err.with_operation(Operation::Stat)
.with_context("service", self.meta.scheme())
.with_context("path", path)
})
}

async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
self.inner
.delete(path, args)
.map_err(|err| {
err.with_operation(Operation::Delete)
.with_context("service", self.meta.scheme())
.with_context("path", path)
})
.await
self.inner.delete(path, args).await.map_err(|err| {
err.with_operation(Operation::Delete)
.with_context("service", self.meta.scheme())
.with_context("path", path)
})
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
self.inner
.list(path, args)
.map_ok(|(rp, p)| {
.await
.map(|(rp, p)| {
(
rp,
ErrorContextWrapper::new(self.meta.scheme(), path.to_string(), p),
Expand All @@ -189,13 +173,13 @@ impl<A: Access> LayeredAccess for ErrorContextAccessor<A> {
.with_context("service", self.meta.scheme())
.with_context("path", path)
})
.await
}

async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
self.inner
.batch(args)
.map_ok(|v| {
.await
.map(|v| {
let res = v
.into_results()
.into_iter()
Expand All @@ -215,7 +199,6 @@ impl<A: Access> LayeredAccess for ErrorContextAccessor<A> {
err.with_operation(Operation::Batch)
.with_context("service", self.meta.scheme())
})
.await
}

async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
Expand Down
52 changes: 18 additions & 34 deletions core/src/layers/fastrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use std::future::Future;
use std::sync::Arc;

use fastrace::prelude::*;
use futures::FutureExt;

use crate::raw::*;
use crate::*;
Expand Down Expand Up @@ -144,32 +143,22 @@ impl<A: Access> LayeredAccess for FastraceAccessor<A> {

#[trace(enter_on_poll = true)]
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
self.inner
.read(path, args)
.map(|v| {
v.map(|(rp, r)| {
(
rp,
FastraceWrapper::new(Span::enter_with_local_parent("ReadOperation"), r),
)
})
})
.await
self.inner.read(path, args).await.map(|(rp, r)| {
(
rp,
FastraceWrapper::new(Span::enter_with_local_parent("ReadOperation"), r),
)
})
}

#[trace(enter_on_poll = true)]
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
self.inner
.write(path, args)
.map(|v| {
v.map(|(rp, r)| {
(
rp,
FastraceWrapper::new(Span::enter_with_local_parent("WriteOperation"), r),
)
})
})
.await
self.inner.write(path, args).await.map(|(rp, r)| {
(
rp,
FastraceWrapper::new(Span::enter_with_local_parent("WriteOperation"), r),
)
})
}

#[trace(enter_on_poll = true)]
Expand All @@ -194,17 +183,12 @@ impl<A: Access> LayeredAccess for FastraceAccessor<A> {

#[trace(enter_on_poll = true)]
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
self.inner
.list(path, args)
.map(|v| {
v.map(|(rp, s)| {
(
rp,
FastraceWrapper::new(Span::enter_with_local_parent("ListOperation"), s),
)
})
})
.await
self.inner.list(path, args).await.map(|(rp, s)| {
(
rp,
FastraceWrapper::new(Span::enter_with_local_parent("ListOperation"), s),
)
})
}

#[trace(enter_on_poll = true)]
Expand Down
Loading

0 comments on commit e5076ac

Please sign in to comment.