Skip to content

Commit

Permalink
refactor(raw): Return written bytes in oio::Write (#3005)
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Sep 4, 2023
1 parent 1dfc7e6 commit 2c94a21
Show file tree
Hide file tree
Showing 42 changed files with 310 additions and 222 deletions.
8 changes: 4 additions & 4 deletions core/benches/oio/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ pub struct BlackHoleWriter;

#[async_trait]
impl oio::Write for BlackHoleWriter {
async fn write(&mut self, _: Bytes) -> opendal::Result<()> {
Ok(())
async fn write(&mut self, bs: Bytes) -> opendal::Result<u64> {
Ok(bs.len() as u64)
}

async fn sink(&mut self, _: u64, _: Streamer) -> opendal::Result<()> {
Ok(())
async fn sink(&mut self, size: u64, _: Streamer) -> opendal::Result<u64> {
Ok(size)
}

async fn abort(&mut self) -> opendal::Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion core/src/layers/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ impl<I: oio::Read + 'static> oio::BlockingRead for BlockingWrapper<I> {
}

impl<I: oio::Write + 'static> oio::BlockingWrite for BlockingWrapper<I> {
fn write(&mut self, bs: Bytes) -> Result<()> {
fn write(&mut self, bs: Bytes) -> Result<u64> {
self.handle.block_on(self.inner.write(bs))
}

Expand Down
16 changes: 8 additions & 8 deletions core/src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,7 @@ impl<W> oio::Write for CompleteWriter<W>
where
W: oio::Write,
{
async fn write(&mut self, bs: Bytes) -> Result<()> {
async fn write(&mut self, bs: Bytes) -> Result<u64> {
let n = bs.len();

if let Some(size) = self.size {
Expand All @@ -731,10 +731,10 @@ where
})?;
w.write(bs).await?;
self.written += n as u64;
Ok(())
Ok(n as u64)
}

async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
if let Some(total_size) = self.size {
if self.written + size > total_size {
return Err(Error::new(
Expand All @@ -750,9 +750,9 @@ where
let w = self.inner.as_mut().ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
})?;
w.sink(size, s).await?;
self.written += size;
Ok(())
let n = w.sink(size, s).await?;
self.written += n;
Ok(n)
}

async fn abort(&mut self) -> Result<()> {
Expand Down Expand Up @@ -794,7 +794,7 @@ impl<W> oio::BlockingWrite for CompleteWriter<W>
where
W: oio::BlockingWrite,
{
fn write(&mut self, bs: Bytes) -> Result<()> {
fn write(&mut self, bs: Bytes) -> Result<u64> {
let n = bs.len();

if let Some(size) = self.size {
Expand All @@ -815,7 +815,7 @@ where

w.write(bs)?;
self.written += n as u64;
Ok(())
Ok(n as u64)
}

fn close(&mut self) -> Result<()> {
Expand Down
6 changes: 3 additions & 3 deletions core/src/layers/concurrent_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,15 +285,15 @@ impl<R: oio::BlockingRead> oio::BlockingRead for ConcurrentLimitWrapper<R> {

#[async_trait]
impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> {
async fn write(&mut self, bs: Bytes) -> Result<()> {
async fn write(&mut self, bs: Bytes) -> Result<u64> {
self.inner.write(bs).await
}

async fn abort(&mut self) -> Result<()> {
self.inner.abort().await
}

async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
self.inner.sink(size, s).await
}

Expand All @@ -303,7 +303,7 @@ impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> {
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for ConcurrentLimitWrapper<R> {
fn write(&mut self, bs: Bytes) -> Result<()> {
fn write(&mut self, bs: Bytes) -> Result<u64> {
self.inner.write(bs)
}

Expand Down
6 changes: 3 additions & 3 deletions core/src/layers/error_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ impl<T: oio::BlockingRead> oio::BlockingRead for ErrorContextWrapper<T> {

#[async_trait::async_trait]
impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
async fn write(&mut self, bs: Bytes) -> Result<()> {
async fn write(&mut self, bs: Bytes) -> Result<u64> {
self.inner.write(bs).await.map_err(|err| {
err.with_operation(WriteOperation::Write)
.with_context("service", self.scheme)
Expand All @@ -419,7 +419,7 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
})
}

async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
self.inner.sink(size, s).await.map_err(|err| {
err.with_operation(WriteOperation::Sink)
.with_context("service", self.scheme)
Expand All @@ -437,7 +437,7 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
}

impl<T: oio::BlockingWrite> oio::BlockingWrite for ErrorContextWrapper<T> {
fn write(&mut self, bs: Bytes) -> Result<()> {
fn write(&mut self, bs: Bytes) -> Result<u64> {
self.inner.write(bs).map_err(|err| {
err.with_operation(WriteOperation::BlockingWrite)
.with_context("service", self.scheme)
Expand Down
29 changes: 14 additions & 15 deletions core/src/layers/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1252,21 +1252,20 @@ impl<W> LoggingWriter<W> {

#[async_trait]
impl<W: oio::Write> oio::Write for LoggingWriter<W> {
async fn write(&mut self, bs: Bytes) -> Result<()> {
let size = bs.len();
async fn write(&mut self, bs: Bytes) -> Result<u64> {
match self.inner.write(bs).await {
Ok(_) => {
self.written += size as u64;
Ok(n) => {
self.written += n;
trace!(
target: LOGGING_TARGET,
"service={} operation={} path={} written={} -> data write {}B",
self.ctx.scheme,
WriteOperation::Write,
self.path,
self.written,
size
n
);
Ok(())
Ok(n)
}
Err(err) => {
if let Some(lvl) = self.ctx.error_level(&err) {
Expand All @@ -1286,20 +1285,20 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
}
}

async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
match self.inner.sink(size, s).await {
Ok(_) => {
self.written += size;
Ok(n) => {
self.written += n;
trace!(
target: LOGGING_TARGET,
"service={} operation={} path={} written={} -> data sink {}B",
self.ctx.scheme,
WriteOperation::Sink,
self.path,
self.written,
size
n
);
Ok(())
Ok(n)
}
Err(err) => {
if let Some(lvl) = self.ctx.error_level(&err) {
Expand Down Expand Up @@ -1383,11 +1382,11 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
}

impl<W: oio::BlockingWrite> oio::BlockingWrite for LoggingWriter<W> {
fn write(&mut self, bs: Bytes) -> Result<()> {
fn write(&mut self, bs: Bytes) -> Result<u64> {
let size = bs.len();
match self.inner.write(bs) {
Ok(_) => {
self.written += size as u64;
Ok(n) => {
self.written += n;
trace!(
target: LOGGING_TARGET,
"service={} operation={} path={} written={} -> data write {}B",
Expand All @@ -1397,7 +1396,7 @@ impl<W: oio::BlockingWrite> oio::BlockingWrite for LoggingWriter<W> {
self.written,
size
);
Ok(())
Ok(n)
}
Err(err) => {
if let Some(lvl) = self.ctx.error_level(&err) {
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/madsim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ pub struct MadsimWriter {

#[async_trait]
impl oio::Write for MadsimWriter {
async fn write(&mut self, bs: Bytes) -> crate::Result<()> {
async fn write(&mut self, bs: Bytes) -> crate::Result<u64> {
#[cfg(madsim)]
{
let req = Request::Write(self.path.to_string(), bs);
Expand All @@ -318,7 +318,7 @@ impl oio::Write for MadsimWriter {
}
}

async fn sink(&mut self, size: u64, s: oio::Streamer) -> crate::Result<()> {
async fn sink(&mut self, size: u64, s: oio::Streamer) -> crate::Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"will be supported in the future",
Expand Down
23 changes: 15 additions & 8 deletions core/src/layers/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -847,23 +847,28 @@ impl<R: oio::BlockingRead> oio::BlockingRead for MetricWrapper<R> {

#[async_trait]
impl<R: oio::Write> oio::Write for MetricWrapper<R> {
async fn write(&mut self, bs: Bytes) -> Result<()> {
let size = bs.len();
async fn write(&mut self, bs: Bytes) -> Result<u64> {
self.inner
.write(bs)
.await
.map(|_| self.bytes += size as u64)
.map(|n| {
self.bytes += n;
n
})
.map_err(|err| {
self.handle.increment_errors_total(self.op, err.kind());
err
})
}

async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
self.inner
.sink(size, s)
.await
.map(|_| self.bytes += size)
.map(|n| {
self.bytes += n;
n
})
.map_err(|err| {
self.handle.increment_errors_total(self.op, err.kind());
err
Expand All @@ -886,11 +891,13 @@ impl<R: oio::Write> oio::Write for MetricWrapper<R> {
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for MetricWrapper<R> {
fn write(&mut self, bs: Bytes) -> Result<()> {
let size = bs.len();
fn write(&mut self, bs: Bytes) -> Result<u64> {
self.inner
.write(bs)
.map(|_| self.bytes += size as u64)
.map(|n| {
self.bytes += n;
n
})
.map_err(|err| {
self.handle.increment_errors_total(self.op, err.kind());
err
Expand Down
6 changes: 3 additions & 3 deletions core/src/layers/minitrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for MinitraceWrapper<R> {

#[async_trait]
impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
async fn write(&mut self, bs: Bytes) -> Result<()> {
async fn write(&mut self, bs: Bytes) -> Result<u64> {
self.inner
.write(bs)
.in_span(Span::enter_with_parent(
Expand All @@ -347,7 +347,7 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
.await
}

async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
self.inner
.sink(size, s)
.in_span(Span::enter_with_parent(
Expand Down Expand Up @@ -379,7 +379,7 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for MinitraceWrapper<R> {
fn write(&mut self, bs: Bytes) -> Result<()> {
fn write(&mut self, bs: Bytes) -> Result<u64> {
let _g = self.span.set_local_parent();
let _span = LocalSpan::enter_with_local_parent(WriteOperation::BlockingWrite.into_static());
self.inner.write(bs)
Expand Down
6 changes: 3 additions & 3 deletions core/src/layers/oteltrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,11 +313,11 @@ impl<R: oio::BlockingRead> oio::BlockingRead for OtelTraceWrapper<R> {

#[async_trait]
impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
async fn write(&mut self, bs: Bytes) -> Result<()> {
async fn write(&mut self, bs: Bytes) -> Result<u64> {
self.inner.write(bs).await
}

async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
self.inner.sink(size, s).await
}

Expand All @@ -331,7 +331,7 @@ impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for OtelTraceWrapper<R> {
fn write(&mut self, bs: Bytes) -> Result<()> {
fn write(&mut self, bs: Bytes) -> Result<u64> {
self.inner.write(bs)
}

Expand Down
23 changes: 12 additions & 11 deletions core/src/layers/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -662,32 +662,33 @@ impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> {

#[async_trait]
impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
async fn write(&mut self, bs: Bytes) -> Result<()> {
let size = bs.len();
async fn write(&mut self, bs: Bytes) -> Result<u64> {
self.inner
.write(bs)
.await
.map(|_| {
.map(|n| {
self.stats
.bytes_total
.with_label_values(&[&self.scheme, Operation::Write.into_static()])
.observe(size as f64)
.observe(n as f64);
n
})
.map_err(|err| {
self.stats.increment_errors_total(self.op, err.kind());
err
})
}

async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
self.inner
.sink(size, s)
.await
.map(|_| {
.map(|n| {
self.stats
.bytes_total
.with_label_values(&[&self.scheme, Operation::Write.into_static()])
.observe(size as f64)
.observe(n as f64);
n
})
.map_err(|err| {
self.stats.increment_errors_total(self.op, err.kind());
Expand All @@ -711,15 +712,15 @@ impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> {
fn write(&mut self, bs: Bytes) -> Result<()> {
let size = bs.len();
fn write(&mut self, bs: Bytes) -> Result<u64> {
self.inner
.write(bs)
.map(|_| {
.map(|n| {
self.stats
.bytes_total
.with_label_values(&[&self.scheme, Operation::BlockingWrite.into_static()])
.observe(size as f64)
.observe(n as f64);
n
})
.map_err(|err| {
self.stats.increment_errors_total(self.op, err.kind());
Expand Down
Loading

0 comments on commit 2c94a21

Please sign in to comment.