Skip to content

Commit

Permalink
refactor(core/raw): Migrate oio::Write to async in trait (#4358)
Browse files Browse the repository at this point in the history
  • Loading branch information
Xuanwo authored Mar 14, 2024
1 parent 7533627 commit 6ebd75c
Show file tree
Hide file tree
Showing 65 changed files with 945 additions and 1,640 deletions.
15 changes: 6 additions & 9 deletions core/benches/oio/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@
// specific language governing permissions and limitations
// under the License.

use std::task::Context;
use std::task::Poll;

use bytes::Bytes;
use opendal::raw::oio;
use rand::prelude::ThreadRng;
Expand All @@ -27,16 +24,16 @@ use rand::RngCore;
pub struct BlackHoleWriter;

impl oio::Write for BlackHoleWriter {
fn poll_write(&mut self, _: &mut Context<'_>, bs: Bytes) -> Poll<opendal::Result<usize>> {
Poll::Ready(Ok(bs.len()))
async fn write(&mut self, bs: Bytes) -> opendal::Result<usize> {
Ok(bs.len())
}

fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<opendal::Result<()>> {
Poll::Ready(Ok(()))
async fn abort(&mut self) -> opendal::Result<()> {
Ok(())
}

fn poll_close(&mut self, _: &mut Context<'_>) -> Poll<opendal::Result<()>> {
Poll::Ready(Ok(()))
async fn close(&mut self) -> opendal::Result<()> {
Ok(())
}
}

Expand Down
2 changes: 1 addition & 1 deletion core/benches/oio/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use bytes::Buf;
use criterion::Criterion;
use once_cell::sync::Lazy;
use opendal::raw::oio::ExactBufWriter;
use opendal::raw::oio::WriteExt;
use opendal::raw::oio::Write;
use rand::thread_rng;
use size::Size;

Expand Down
8 changes: 3 additions & 5 deletions core/src/layers/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use async_trait::async_trait;
use bytes;
use bytes::Bytes;
use futures::future::poll_fn;

use tokio::runtime::Handle;

use crate::raw::*;
Expand Down Expand Up @@ -299,13 +299,11 @@ impl<I: oio::Read + 'static> oio::BlockingRead for BlockingWrapper<I> {

impl<I: oio::Write + 'static> oio::BlockingWrite for BlockingWrapper<I> {
fn write(&mut self, bs: Bytes) -> Result<usize> {
self.handle
.block_on(poll_fn(|cx| self.inner.poll_write(cx, bs.clone())))
self.handle.block_on(self.inner.write(bs))
}

fn close(&mut self) -> Result<()> {
self.handle
.block_on(poll_fn(|cx| self.inner.poll_close(cx)))
self.handle.block_on(self.inner.close())
}
}

Expand Down
23 changes: 10 additions & 13 deletions core/src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@
use std::cmp;
use std::fmt::Debug;
use std::fmt::Formatter;

use std::sync::Arc;
use std::task::ready;
use std::task::Context;
use std::task::Poll;

use async_trait::async_trait;
use bytes::Bytes;
Expand Down Expand Up @@ -158,7 +156,7 @@ impl<A: Accessor> CompleteAccessor<A> {
}
if capability.write_can_empty && capability.list {
let (_, mut w) = self.inner.write(path, OpWrite::default()).await?;
oio::WriteExt::close(&mut w).await?;
oio::Write::close(&mut w).await?;
return Ok(RpCreateDir::default());
}

Expand Down Expand Up @@ -712,35 +710,34 @@ impl<W> oio::Write for CompleteWriter<W>
where
W: oio::Write,
{
fn poll_write(&mut self, cx: &mut Context<'_>, bs: Bytes) -> Poll<Result<usize>> {
async fn write(&mut self, bs: Bytes) -> Result<usize> {
let w = self.inner.as_mut().ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
})?;
let n = ready!(w.poll_write(cx, bs))?;

Poll::Ready(Ok(n))
w.write(bs).await
}

fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
async fn close(&mut self) -> Result<()> {
let w = self.inner.as_mut().ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
})?;

ready!(w.poll_close(cx))?;
w.close().await?;
self.inner = None;

Poll::Ready(Ok(()))
Ok(())
}

fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
async fn abort(&mut self) -> Result<()> {
let w = self.inner.as_mut().ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
})?;

ready!(w.poll_abort(cx))?;
w.abort().await?;
self.inner = None;

Poll::Ready(Ok(()))
Ok(())
}
}

Expand Down
14 changes: 6 additions & 8 deletions core/src/layers/concurrent_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ use std::fmt::Debug;

use std::io::SeekFrom;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;

use async_trait::async_trait;
use bytes::Bytes;
Expand Down Expand Up @@ -278,16 +276,16 @@ impl<R: oio::BlockingRead> oio::BlockingRead for ConcurrentLimitWrapper<R> {
}

impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> {
fn poll_write(&mut self, cx: &mut Context<'_>, bs: Bytes) -> Poll<Result<usize>> {
self.inner.poll_write(cx, bs)
async fn write(&mut self, bs: Bytes) -> Result<usize> {
self.inner.write(bs).await
}

fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
self.inner.poll_close(cx)
async fn close(&mut self) -> Result<()> {
self.inner.close().await
}

fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
self.inner.poll_abort(cx)
async fn abort(&mut self) -> Result<()> {
self.inner.abort().await
}
}

Expand Down
24 changes: 13 additions & 11 deletions core/src/layers/dtrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
use std::ffi::CString;
use std::fmt::Debug;
use std::fmt::Formatter;

use std::io;
use std::task::Context;
use std::task::Poll;

use async_trait::async_trait;
use bytes::Bytes;
Expand Down Expand Up @@ -408,12 +407,13 @@ impl<R: oio::BlockingRead> oio::BlockingRead for DtraceLayerWrapper<R> {
}

impl<R: oio::Write> oio::Write for DtraceLayerWrapper<R> {
fn poll_write(&mut self, cx: &mut Context<'_>, bs: Bytes) -> Poll<Result<usize>> {
async fn write(&mut self, bs: Bytes) -> Result<usize> {
let c_path = CString::new(self.path.clone()).unwrap();
probe_lazy!(opendal, writer_write_start, c_path.as_ptr());
self.inner
.poll_write(cx, bs)
.map_ok(|n| {
.write(bs)
.await
.map(|n| {
probe_lazy!(opendal, writer_write_ok, c_path.as_ptr(), n);
n
})
Expand All @@ -423,12 +423,13 @@ impl<R: oio::Write> oio::Write for DtraceLayerWrapper<R> {
})
}

fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
async fn abort(&mut self) -> Result<()> {
let c_path = CString::new(self.path.clone()).unwrap();
probe_lazy!(opendal, writer_poll_abort_start, c_path.as_ptr());
self.inner
.poll_abort(cx)
.map_ok(|_| {
.abort()
.await
.map(|_| {
probe_lazy!(opendal, writer_poll_abort_ok, c_path.as_ptr());
})
.map_err(|err| {
Expand All @@ -437,12 +438,13 @@ impl<R: oio::Write> oio::Write for DtraceLayerWrapper<R> {
})
}

fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
async fn close(&mut self) -> Result<()> {
let c_path = CString::new(self.path.clone()).unwrap();
probe_lazy!(opendal, writer_close_start, c_path.as_ptr());
self.inner
.poll_close(cx)
.map_ok(|_| {
.close()
.await
.map(|_| {
probe_lazy!(opendal, writer_close_ok, c_path.as_ptr());
})
.map_err(|err| {
Expand Down
15 changes: 6 additions & 9 deletions core/src/layers/error_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ use std::fmt::Debug;
use std::fmt::Formatter;

use std::io::SeekFrom;
use std::task::Context;
use std::task::Poll;

use async_trait::async_trait;
use bytes::Bytes;
Expand Down Expand Up @@ -387,27 +385,26 @@ impl<T: oio::BlockingRead> oio::BlockingRead for ErrorContextWrapper<T> {
}
}

#[async_trait::async_trait]
impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
fn poll_write(&mut self, cx: &mut Context<'_>, bs: Bytes) -> Poll<Result<usize>> {
self.inner.poll_write(cx, bs.clone()).map_err(|err| {
async fn write(&mut self, bs: Bytes) -> Result<usize> {
self.inner.write(bs.clone()).await.map_err(|err| {
err.with_operation(WriteOperation::Write)
.with_context("service", self.scheme)
.with_context("path", &self.path)
.with_context("write_buf", bs.len().to_string())
})
}

fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
self.inner.poll_close(cx).map_err(|err| {
async fn close(&mut self) -> Result<()> {
self.inner.close().await.map_err(|err| {
err.with_operation(WriteOperation::Close)
.with_context("service", self.scheme)
.with_context("path", &self.path)
})
}

fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
self.inner.poll_abort(cx).map_err(|err| {
async fn abort(&mut self) -> Result<()> {
self.inner.abort().await.map_err(|err| {
err.with_operation(WriteOperation::Abort)
.with_context("service", self.scheme)
.with_context("path", &self.path)
Expand Down
27 changes: 12 additions & 15 deletions core/src/layers/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@
use std::fmt::Debug;

use std::io;
use std::task::ready;
use std::task::Context;
use std::task::Poll;

use async_trait::async_trait;
use bytes::Bytes;
Expand Down Expand Up @@ -1147,8 +1144,8 @@ impl<W> LoggingWriter<W> {
}

impl<W: oio::Write> oio::Write for LoggingWriter<W> {
fn poll_write(&mut self, cx: &mut Context<'_>, bs: Bytes) -> Poll<Result<usize>> {
match ready!(self.inner.poll_write(cx, bs.clone())) {
async fn write(&mut self, bs: Bytes) -> Result<usize> {
match self.inner.write(bs.clone()).await {
Ok(n) => {
self.written += n as u64;
trace!(
Expand All @@ -1161,7 +1158,7 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
bs.len(),
n,
);
Poll::Ready(Ok(n))
Ok(n)
}
Err(err) => {
if let Some(lvl) = self.ctx.error_level(&err) {
Expand All @@ -1176,13 +1173,13 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
self.ctx.error_print(&err),
)
}
Poll::Ready(Err(err))
Err(err)
}
}
}

fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
match ready!(self.inner.poll_abort(cx)) {
async fn abort(&mut self) -> Result<()> {
match self.inner.abort().await {
Ok(_) => {
trace!(
target: LOGGING_TARGET,
Expand All @@ -1192,7 +1189,7 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
self.path,
self.written,
);
Poll::Ready(Ok(()))
Ok(())
}
Err(err) => {
if let Some(lvl) = self.ctx.error_level(&err) {
Expand All @@ -1207,13 +1204,13 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
self.ctx.error_print(&err),
)
}
Poll::Ready(Err(err))
Err(err)
}
}
}

fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
match ready!(self.inner.poll_close(cx)) {
async fn close(&mut self) -> Result<()> {
match self.inner.close().await {
Ok(_) => {
debug!(
target: LOGGING_TARGET,
Expand All @@ -1223,7 +1220,7 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
self.path,
self.written
);
Poll::Ready(Ok(()))
Ok(())
}
Err(err) => {
if let Some(lvl) = self.ctx.error_level(&err) {
Expand All @@ -1238,7 +1235,7 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
self.ctx.error_print(&err),
)
}
Poll::Ready(Err(err))
Err(err)
}
}
}
Expand Down
12 changes: 6 additions & 6 deletions core/src/layers/madsim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ pub struct MadsimWriter {
}

impl oio::Write for MadsimWriter {
fn poll_write(&mut self, cx: &mut Context<'_>, bs: Bytes) -> Poll<crate::Result<usize>> {
async fn write(&mut self, bs: Bytes) -> crate::Result<usize> {
#[cfg(madsim)]
{
let req = Request::Write(self.path.to_string(), bs);
Expand All @@ -307,15 +307,15 @@ impl oio::Write for MadsimWriter {
}
}

fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
Poll::Ready(Err(Error::new(
async fn abort(&mut self) -> crate::Result<()> {
Err(Error::new(
ErrorKind::Unsupported,
"will be supported in the future",
)))
))
}

fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
Poll::Ready(Ok(()))
async fn close(&mut self) -> crate::Result<()> {
Ok(())
}
}

Expand Down
Loading

0 comments on commit 6ebd75c

Please sign in to comment.