From 2c94a2142b998cb9c86ddf9629582594340bb91e Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Mon, 4 Sep 2023 14:51:17 +0800 Subject: [PATCH] refactor(raw): Return written bytes in oio::Write (#3005) Signed-off-by: Xuanwo --- core/benches/oio/utils.rs | 8 ++-- core/src/layers/blocking.rs | 2 +- core/src/layers/complete.rs | 16 +++---- core/src/layers/concurrent_limit.rs | 6 +-- core/src/layers/error_context.rs | 6 +-- core/src/layers/logging.rs | 29 ++++++------ core/src/layers/madsim.rs | 4 +- core/src/layers/metrics.rs | 23 ++++++---- core/src/layers/minitrace.rs | 6 +-- core/src/layers/oteltrace.rs | 6 +-- core/src/layers/prometheus.rs | 23 +++++----- core/src/layers/retry.rs | 8 ++-- core/src/layers/throttle.rs | 6 +-- core/src/layers/timeout.rs | 4 +- core/src/layers/tracing.rs | 6 +-- core/src/raw/adapters/kv/backend.rs | 12 +++-- core/src/raw/adapters/typed_kv/backend.rs | 12 +++-- core/src/raw/oio/write/api.rs | 46 +++++++++---------- core/src/raw/oio/write/append_object_write.rs | 12 +++-- core/src/raw/oio/write/at_least_buf_write.rs | 23 +++++++--- core/src/raw/oio/write/compose_write.rs | 8 ++-- core/src/raw/oio/write/exact_buf_write.rs | 19 ++++---- .../raw/oio/write/multipart_upload_write.rs | 12 +++-- core/src/raw/oio/write/one_shot_write.rs | 15 +++--- core/src/services/azblob/writer.rs | 20 ++++---- core/src/services/azdfs/writer.rs | 8 ++-- core/src/services/dropbox/writer.rs | 10 ++-- core/src/services/fs/writer.rs | 16 ++++--- core/src/services/ftp/writer.rs | 8 ++-- core/src/services/gcs/writer.rs | 22 +++++---- core/src/services/gdrive/writer.rs | 11 +++-- core/src/services/ghac/writer.rs | 6 +-- core/src/services/hdfs/writer.rs | 14 ++++-- core/src/services/ipmfs/writer.rs | 7 +-- core/src/services/onedrive/writer.rs | 10 ++-- core/src/services/sftp/writer.rs | 7 +-- core/src/services/supabase/writer.rs | 10 ++-- core/src/services/vercel_artifacts/writer.rs | 8 ++-- core/src/services/wasabi/writer.rs | 10 ++-- core/src/services/webdav/writer.rs | 15 ++++-- core/src/services/webhdfs/writer.rs | 10 ++-- core/src/types/writer.rs | 28 ++++++++--- 42 files changed, 310 insertions(+), 222 deletions(-) diff --git a/core/benches/oio/utils.rs b/core/benches/oio/utils.rs index 0e70bcfc74a..9a14442c242 100644 --- a/core/benches/oio/utils.rs +++ b/core/benches/oio/utils.rs @@ -27,12 +27,12 @@ pub struct BlackHoleWriter; #[async_trait] impl oio::Write for BlackHoleWriter { - async fn write(&mut self, _: Bytes) -> opendal::Result<()> { - Ok(()) + async fn write(&mut self, bs: Bytes) -> opendal::Result { + Ok(bs.len() as u64) } - async fn sink(&mut self, _: u64, _: Streamer) -> opendal::Result<()> { - Ok(()) + async fn sink(&mut self, size: u64, _: Streamer) -> opendal::Result { + Ok(size) } async fn abort(&mut self) -> opendal::Result<()> { diff --git a/core/src/layers/blocking.rs b/core/src/layers/blocking.rs index 7b80b595642..6e530023c1b 100644 --- a/core/src/layers/blocking.rs +++ b/core/src/layers/blocking.rs @@ -196,7 +196,7 @@ impl oio::BlockingRead for BlockingWrapper { } impl oio::BlockingWrite for BlockingWrapper { - fn write(&mut self, bs: Bytes) -> Result<()> { + fn write(&mut self, bs: Bytes) -> Result { self.handle.block_on(self.inner.write(bs)) } diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index 4c58b6e308f..e986d1a4751 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -711,7 +711,7 @@ impl oio::Write for CompleteWriter where W: oio::Write, { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result { let n = bs.len(); if let Some(size) = self.size { @@ -731,10 +731,10 @@ where })?; w.write(bs).await?; self.written += n as u64; - Ok(()) + Ok(n as u64) } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result { if let Some(total_size) = self.size { if self.written + size > total_size { return Err(Error::new( @@ -750,9 +750,9 @@ where let w = self.inner.as_mut().ok_or_else(|| { Error::new(ErrorKind::Unexpected, "writer has been closed or aborted") })?; - w.sink(size, s).await?; - self.written += size; - Ok(()) + let n = w.sink(size, s).await?; + self.written += n; + Ok(n) } async fn abort(&mut self) -> Result<()> { @@ -794,7 +794,7 @@ impl oio::BlockingWrite for CompleteWriter where W: oio::BlockingWrite, { - fn write(&mut self, bs: Bytes) -> Result<()> { + fn write(&mut self, bs: Bytes) -> Result { let n = bs.len(); if let Some(size) = self.size { @@ -815,7 +815,7 @@ where w.write(bs)?; self.written += n as u64; - Ok(()) + Ok(n as u64) } fn close(&mut self) -> Result<()> { diff --git a/core/src/layers/concurrent_limit.rs b/core/src/layers/concurrent_limit.rs index 9cef0fb9b62..96a682d61c1 100644 --- a/core/src/layers/concurrent_limit.rs +++ b/core/src/layers/concurrent_limit.rs @@ -285,7 +285,7 @@ impl oio::BlockingRead for ConcurrentLimitWrapper { #[async_trait] impl oio::Write for ConcurrentLimitWrapper { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result { self.inner.write(bs).await } @@ -293,7 +293,7 @@ impl oio::Write for ConcurrentLimitWrapper { self.inner.abort().await } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result { self.inner.sink(size, s).await } @@ -303,7 +303,7 @@ impl oio::Write for ConcurrentLimitWrapper { } impl oio::BlockingWrite for ConcurrentLimitWrapper { - fn write(&mut self, bs: Bytes) -> Result<()> { + fn write(&mut self, bs: Bytes) -> Result { self.inner.write(bs) } diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs index bfe9be4dfef..2acd6dd7d43 100644 --- a/core/src/layers/error_context.rs +++ b/core/src/layers/error_context.rs @@ -403,7 +403,7 @@ impl oio::BlockingRead for ErrorContextWrapper { #[async_trait::async_trait] impl oio::Write for ErrorContextWrapper { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result { self.inner.write(bs).await.map_err(|err| { err.with_operation(WriteOperation::Write) .with_context("service", self.scheme) @@ -419,7 +419,7 @@ impl oio::Write for ErrorContextWrapper { }) } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result { self.inner.sink(size, s).await.map_err(|err| { err.with_operation(WriteOperation::Sink) .with_context("service", self.scheme) @@ -437,7 +437,7 @@ impl oio::Write for ErrorContextWrapper { } impl oio::BlockingWrite for ErrorContextWrapper { - fn write(&mut self, bs: Bytes) -> Result<()> { + fn write(&mut self, bs: Bytes) -> Result { self.inner.write(bs).map_err(|err| { err.with_operation(WriteOperation::BlockingWrite) .with_context("service", self.scheme) diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs index 07323cf8cde..6c63f466f5b 100644 --- a/core/src/layers/logging.rs +++ b/core/src/layers/logging.rs @@ -1252,11 +1252,10 @@ impl LoggingWriter { #[async_trait] impl oio::Write for LoggingWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { - let size = bs.len(); + async fn write(&mut self, bs: Bytes) -> Result { match self.inner.write(bs).await { - Ok(_) => { - self.written += size as u64; + Ok(n) => { + self.written += n; trace!( target: LOGGING_TARGET, "service={} operation={} path={} written={} -> data write {}B", @@ -1264,9 +1263,9 @@ impl oio::Write for LoggingWriter { WriteOperation::Write, self.path, self.written, - size + n ); - Ok(()) + Ok(n) } Err(err) => { if let Some(lvl) = self.ctx.error_level(&err) { @@ -1286,10 +1285,10 @@ impl oio::Write for LoggingWriter { } } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result { match self.inner.sink(size, s).await { - Ok(_) => { - self.written += size; + Ok(n) => { + self.written += n; trace!( target: LOGGING_TARGET, "service={} operation={} path={} written={} -> data sink {}B", @@ -1297,9 +1296,9 @@ impl oio::Write for LoggingWriter { WriteOperation::Sink, self.path, self.written, - size + n ); - Ok(()) + Ok(n) } Err(err) => { if let Some(lvl) = self.ctx.error_level(&err) { @@ -1383,11 +1382,11 @@ impl oio::Write for LoggingWriter { } impl oio::BlockingWrite for LoggingWriter { - fn write(&mut self, bs: Bytes) -> Result<()> { + fn write(&mut self, bs: Bytes) -> Result { let size = bs.len(); match self.inner.write(bs) { - Ok(_) => { - self.written += size as u64; + Ok(n) => { + self.written += n; trace!( target: LOGGING_TARGET, "service={} operation={} path={} written={} -> data write {}B", @@ -1397,7 +1396,7 @@ impl oio::BlockingWrite for LoggingWriter { self.written, size ); - Ok(()) + Ok(n) } Err(err) => { if let Some(lvl) = self.ctx.error_level(&err) { diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs index fdf0ec5deb9..17835e5bab0 100644 --- a/core/src/layers/madsim.rs +++ b/core/src/layers/madsim.rs @@ -302,7 +302,7 @@ pub struct MadsimWriter { #[async_trait] impl oio::Write for MadsimWriter { - async fn write(&mut self, bs: Bytes) -> crate::Result<()> { + async fn write(&mut self, bs: Bytes) -> crate::Result { #[cfg(madsim)] { let req = Request::Write(self.path.to_string(), bs); @@ -318,7 +318,7 @@ impl oio::Write for MadsimWriter { } } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> crate::Result<()> { + async fn sink(&mut self, size: u64, s: oio::Streamer) -> crate::Result { Err(Error::new( ErrorKind::Unsupported, "will be supported in the future", diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs index 1eade833bd3..181ebb3c031 100644 --- a/core/src/layers/metrics.rs +++ b/core/src/layers/metrics.rs @@ -847,23 +847,28 @@ impl oio::BlockingRead for MetricWrapper { #[async_trait] impl oio::Write for MetricWrapper { - async fn write(&mut self, bs: Bytes) -> Result<()> { - let size = bs.len(); + async fn write(&mut self, bs: Bytes) -> Result { self.inner .write(bs) .await - .map(|_| self.bytes += size as u64) + .map(|n| { + self.bytes += n; + n + }) .map_err(|err| { self.handle.increment_errors_total(self.op, err.kind()); err }) } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result { self.inner .sink(size, s) .await - .map(|_| self.bytes += size) + .map(|n| { + self.bytes += n; + n + }) .map_err(|err| { self.handle.increment_errors_total(self.op, err.kind()); err @@ -886,11 +891,13 @@ impl oio::Write for MetricWrapper { } impl oio::BlockingWrite for MetricWrapper { - fn write(&mut self, bs: Bytes) -> Result<()> { - let size = bs.len(); + fn write(&mut self, bs: Bytes) -> Result { self.inner .write(bs) - .map(|_| self.bytes += size as u64) + .map(|n| { + self.bytes += n; + n + }) .map_err(|err| { self.handle.increment_errors_total(self.op, err.kind()); err diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs index 1213d692ebf..75c852c3ac0 100644 --- a/core/src/layers/minitrace.rs +++ b/core/src/layers/minitrace.rs @@ -337,7 +337,7 @@ impl oio::BlockingRead for MinitraceWrapper { #[async_trait] impl oio::Write for MinitraceWrapper { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result { self.inner .write(bs) .in_span(Span::enter_with_parent( @@ -347,7 +347,7 @@ impl oio::Write for MinitraceWrapper { .await } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result { self.inner .sink(size, s) .in_span(Span::enter_with_parent( @@ -379,7 +379,7 @@ impl oio::Write for MinitraceWrapper { } impl oio::BlockingWrite for MinitraceWrapper { - fn write(&mut self, bs: Bytes) -> Result<()> { + fn write(&mut self, bs: Bytes) -> Result { let _g = self.span.set_local_parent(); let _span = LocalSpan::enter_with_local_parent(WriteOperation::BlockingWrite.into_static()); self.inner.write(bs) diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs index 2ae39b05c03..fde87e9bae3 100644 --- a/core/src/layers/oteltrace.rs +++ b/core/src/layers/oteltrace.rs @@ -313,11 +313,11 @@ impl oio::BlockingRead for OtelTraceWrapper { #[async_trait] impl oio::Write for OtelTraceWrapper { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result { self.inner.write(bs).await } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result { self.inner.sink(size, s).await } @@ -331,7 +331,7 @@ impl oio::Write for OtelTraceWrapper { } impl oio::BlockingWrite for OtelTraceWrapper { - fn write(&mut self, bs: Bytes) -> Result<()> { + fn write(&mut self, bs: Bytes) -> Result { self.inner.write(bs) } diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index 005d6aa974d..644532bf6a9 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -662,16 +662,16 @@ impl oio::BlockingRead for PrometheusMetricWrapper { #[async_trait] impl oio::Write for PrometheusMetricWrapper { - async fn write(&mut self, bs: Bytes) -> Result<()> { - let size = bs.len(); + async fn write(&mut self, bs: Bytes) -> Result { self.inner .write(bs) .await - .map(|_| { + .map(|n| { self.stats .bytes_total .with_label_values(&[&self.scheme, Operation::Write.into_static()]) - .observe(size as f64) + .observe(n as f64); + n }) .map_err(|err| { self.stats.increment_errors_total(self.op, err.kind()); @@ -679,15 +679,16 @@ impl oio::Write for PrometheusMetricWrapper { }) } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result { self.inner .sink(size, s) .await - .map(|_| { + .map(|n| { self.stats .bytes_total .with_label_values(&[&self.scheme, Operation::Write.into_static()]) - .observe(size as f64) + .observe(n as f64); + n }) .map_err(|err| { self.stats.increment_errors_total(self.op, err.kind()); @@ -711,15 +712,15 @@ impl oio::Write for PrometheusMetricWrapper { } impl oio::BlockingWrite for PrometheusMetricWrapper { - fn write(&mut self, bs: Bytes) -> Result<()> { - let size = bs.len(); + fn write(&mut self, bs: Bytes) -> Result { self.inner .write(bs) - .map(|_| { + .map(|n| { self.stats .bytes_total .with_label_values(&[&self.scheme, Operation::BlockingWrite.into_static()]) - .observe(size as f64) + .observe(n as f64); + n }) .map_err(|err| { self.stats.increment_errors_total(self.op, err.kind()); diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs index 7517c2c211a..07b92e24c41 100644 --- a/core/src/layers/retry.rs +++ b/core/src/layers/retry.rs @@ -873,7 +873,7 @@ impl oio::BlockingRead for RetryWrapp #[async_trait] impl oio::Write for RetryWrapper { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result { let mut backoff = self.builder.build(); loop { @@ -919,14 +919,14 @@ impl oio::Write for RetryWrapper { /// The overhead is constant, which means the overhead will not increase with the size of /// stream. For example, if every `next` call cost 1ms, then the overhead will only take 0.005% /// which is acceptable. - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result { let s = Arc::new(Mutex::new(s)); let mut backoff = self.builder.build(); loop { match self.inner.sink(size, Box::new(s.clone())).await { - Ok(_) => return Ok(()), + Ok(n) => return Ok(n), Err(e) if !e.is_temporary() => return Err(e), Err(e) => match backoff.next() { None => return Err(e), @@ -1013,7 +1013,7 @@ impl oio::Write for RetryWrapper { } impl oio::BlockingWrite for RetryWrapper { - fn write(&mut self, bs: Bytes) -> Result<()> { + fn write(&mut self, bs: Bytes) -> Result { { || self.inner.write(bs.clone()) } .retry(&self.builder) .when(|e| e.is_temporary()) diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs index d929226df81..a88d1c701b8 100644 --- a/core/src/layers/throttle.rs +++ b/core/src/layers/throttle.rs @@ -217,7 +217,7 @@ impl oio::BlockingRead for ThrottleWrapper { #[async_trait] impl oio::Write for ThrottleWrapper { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result { let buf_length = NonZeroU32::new(bs.len() as u32).unwrap(); loop { @@ -242,7 +242,7 @@ impl oio::Write for ThrottleWrapper { } } - async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> { + async fn sink(&mut self, size: u64, s: Streamer) -> Result { self.inner.sink(size, s).await } @@ -256,7 +256,7 @@ impl oio::Write for ThrottleWrapper { } impl oio::BlockingWrite for ThrottleWrapper { - fn write(&mut self, bs: Bytes) -> Result<()> { + fn write(&mut self, bs: Bytes) -> Result { let buf_length = NonZeroU32::new(bs.len() as u32).unwrap(); loop { diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs index c0fb739aa13..be2289d0475 100644 --- a/core/src/layers/timeout.rs +++ b/core/src/layers/timeout.rs @@ -322,7 +322,7 @@ impl oio::Read for TimeoutWrapper { #[async_trait] impl oio::Write for TimeoutWrapper { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result { let timeout = self.io_timeout(bs.len() as u64); tokio::time::timeout(timeout, self.inner.write(bs)) @@ -335,7 +335,7 @@ impl oio::Write for TimeoutWrapper { })? } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result { let timeout = self.io_timeout(size); tokio::time::timeout(timeout, self.inner.sink(size, s)) diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs index 9042e6e35f2..33dcbdebc12 100644 --- a/core/src/layers/tracing.rs +++ b/core/src/layers/tracing.rs @@ -324,7 +324,7 @@ impl oio::Write for TracingWrapper { parent = &self.span, level = "trace", skip_all)] - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result { self.inner.write(bs).await } @@ -332,7 +332,7 @@ impl oio::Write for TracingWrapper { parent = &self.span, level = "trace", skip_all)] - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result { self.inner.sink(size, s).await } @@ -358,7 +358,7 @@ impl oio::BlockingWrite for TracingWrapper { parent = &self.span, level = "trace", skip_all)] - fn write(&mut self, bs: Bytes) -> Result<()> { + fn write(&mut self, bs: Bytes) -> Result { self.inner.write(bs) } diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs index d1097110407..be4913ff9e5 100644 --- a/core/src/raw/adapters/kv/backend.rs +++ b/core/src/raw/adapters/kv/backend.rs @@ -390,13 +390,14 @@ impl KvWriter { #[async_trait] impl oio::Write for KvWriter { // TODO: we need to support append in the future. - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result { + let size = bs.len(); self.buf = Some(bs.into()); - Ok(()) + Ok(size as u64) } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", @@ -420,10 +421,11 @@ impl oio::Write for KvWriter { } impl oio::BlockingWrite for KvWriter { - fn write(&mut self, bs: Bytes) -> Result<()> { + fn write(&mut self, bs: Bytes) -> Result { + let size = bs.len(); self.buf = Some(bs.into()); - Ok(()) + Ok(size as u64) } fn close(&mut self) -> Result<()> { diff --git a/core/src/raw/adapters/typed_kv/backend.rs b/core/src/raw/adapters/typed_kv/backend.rs index 9f6186a38ad..48232c1fc8d 100644 --- a/core/src/raw/adapters/typed_kv/backend.rs +++ b/core/src/raw/adapters/typed_kv/backend.rs @@ -403,13 +403,14 @@ impl KvWriter { #[async_trait] impl oio::Write for KvWriter { // TODO: we need to support append in the future. - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result { + let size = bs.len(); self.buf.push(bs); - Ok(()) + Ok(size as u64) } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", @@ -429,10 +430,11 @@ impl oio::Write for KvWriter { } impl oio::BlockingWrite for KvWriter { - fn write(&mut self, bs: Bytes) -> Result<()> { + fn write(&mut self, bs: Bytes) -> Result { + let size = bs.len(); self.buf.push(bs); - Ok(()) + Ok(size as u64) } fn close(&mut self) -> Result<()> { diff --git a/core/src/raw/oio/write/api.rs b/core/src/raw/oio/write/api.rs index f2bb025afc8..8ced843dafa 100644 --- a/core/src/raw/oio/write/api.rs +++ b/core/src/raw/oio/write/api.rs @@ -74,31 +74,29 @@ impl From for &'static str { pub type Writer = Box; /// Write is the trait that OpenDAL returns to callers. -/// -/// # Notes -/// -/// There are two possible two cases: -/// -/// - Sized: The total size of the object is known in advance. -/// - Unsized: The total size of the object is unknown in advance. -/// -/// And it's possible that the given bs length is less than the total -/// content length. Users will call write multiple times to write -/// the whole data. #[async_trait] pub trait Write: Unpin + Send + Sync { /// Write given bytes into writer. /// - /// # Notes + /// # Behavior /// - /// It's possible that the given bs length is less than the total - /// content length. And users will call write multiple times. + /// - `Ok(n)` means `n` bytes has been written successfully. + /// - `Err(err)` means error happens and no bytes has been written. /// - /// Please make sure `write` is safe to re-enter. - async fn write(&mut self, bs: Bytes) -> Result<()>; + /// It's possible that `n < bs.len()`, caller should pass the remaining bytes + /// repeatedly until all bytes has been written. + async fn write(&mut self, bs: Bytes) -> Result; /// Sink given stream into writer. - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()>; + /// + /// # Behavior + /// + /// - `Ok(n)` means `n` bytes has been written successfully. + /// - `Err(err)` means error happens and no bytes has been written. + /// + /// It's possible that `n < size`, caller should pass the remaining bytes + /// repeatedly until all bytes has been written. + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result; /// Abort the pending writer. async fn abort(&mut self) -> Result<()>; @@ -109,13 +107,13 @@ pub trait Write: Unpin + Send + Sync { #[async_trait] impl Write for () { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result { let _ = bs; unimplemented!("write is required to be implemented for oio::Write") } - async fn sink(&mut self, _: u64, _: oio::Streamer) -> Result<()> { + async fn sink(&mut self, _: u64, _: oio::Streamer) -> Result { Err(Error::new( ErrorKind::Unsupported, "output writer doesn't support sink", @@ -142,11 +140,11 @@ impl Write for () { /// To make Writer work as expected, we must add this impl. #[async_trait] impl Write for Box { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result { (**self).write(bs).await } - async fn sink(&mut self, n: u64, s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, n: u64, s: oio::Streamer) -> Result { (**self).sink(n, s).await } @@ -165,14 +163,14 @@ pub type BlockingWriter = Box; /// BlockingWrite is the trait that OpenDAL returns to callers. pub trait BlockingWrite: Send + Sync + 'static { /// Write whole content at once. - fn write(&mut self, bs: Bytes) -> Result<()>; + fn write(&mut self, bs: Bytes) -> Result; /// Close the writer and make sure all data has been flushed. fn close(&mut self) -> Result<()>; } impl BlockingWrite for () { - fn write(&mut self, bs: Bytes) -> Result<()> { + fn write(&mut self, bs: Bytes) -> Result { let _ = bs; unimplemented!("write is required to be implemented for oio::BlockingWrite") @@ -190,7 +188,7 @@ impl BlockingWrite for () { /// /// To make BlockingWriter work as expected, we must add this impl. impl BlockingWrite for Box { - fn write(&mut self, bs: Bytes) -> Result<()> { + fn write(&mut self, bs: Bytes) -> Result { (**self).write(bs) } diff --git a/core/src/raw/oio/write/append_object_write.rs b/core/src/raw/oio/write/append_object_write.rs index 07fa546cc6f..b047ef43dcb 100644 --- a/core/src/raw/oio/write/append_object_write.rs +++ b/core/src/raw/oio/write/append_object_write.rs @@ -79,7 +79,7 @@ impl oio::Write for AppendObjectWriter where W: AppendObjectWrite, { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result { let offset = self.offset().await?; let size = bs.len() as u64; @@ -87,16 +87,20 @@ where self.inner .append(offset, size, AsyncBody::Bytes(bs)) .await - .map(|_| self.offset = Some(offset + size)) + .map(|_| self.offset = Some(offset + size))?; + + Ok(size) } - async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> { + async fn sink(&mut self, size: u64, s: Streamer) -> Result { let offset = self.offset().await?; self.inner .append(offset, size, AsyncBody::Stream(s)) .await - .map(|_| self.offset = Some(offset + size)) + .map(|_| self.offset = Some(offset + size))?; + + Ok(size) } async fn close(&mut self) -> Result<()> { diff --git a/core/src/raw/oio/write/at_least_buf_write.rs b/core/src/raw/oio/write/at_least_buf_write.rs index 91adddd3061..51f5d264512 100644 --- a/core/src/raw/oio/write/at_least_buf_write.rs +++ b/core/src/raw/oio/write/at_least_buf_write.rs @@ -64,7 +64,7 @@ impl AtLeastBufWriter { #[async_trait] impl oio::Write for AtLeastBufWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result { // If total size is known and equals to given bytes, we can write it directly. if let Some(total_size) = self.total_size { if total_size == bs.len() as u64 { @@ -74,8 +74,9 @@ impl oio::Write for AtLeastBufWriter { // Push the bytes into the buffer if the buffer is not full. if self.buffer.len() + bs.len() < self.buffer_size { + let size = bs.len(); self.buffer.push(bs); - return Ok(()); + return Ok(size as u64); } let mut buf = self.buffer.clone(); @@ -85,10 +86,13 @@ impl oio::Write for AtLeastBufWriter { .sink(buf.len() as u64, Box::new(buf)) .await // Clear buffer if the write is successful. - .map(|_| self.buffer.clear()) + .map(|v| { + self.buffer.clear(); + v + }) } - async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> { + async fn sink(&mut self, size: u64, s: Streamer) -> Result { // If total size is known and equals to given stream, we can write it directly. if let Some(total_size) = self.total_size { if total_size == size { @@ -98,8 +102,10 @@ impl oio::Write for AtLeastBufWriter { // Push the bytes into the buffer if the buffer is not full. if self.buffer.len() as u64 + size < self.buffer_size as u64 { - self.buffer.push(s.collect().await?); - return Ok(()); + let bs = s.collect().await?; + let size = bs.len() as u64; + self.buffer.push(bs); + return Ok(size); } let buf = self.buffer.clone(); @@ -110,7 +116,10 @@ impl oio::Write for AtLeastBufWriter { .sink(buffer_size + size, Box::new(stream)) .await // Clear buffer if the write is successful. - .map(|_| self.buffer.clear()) + .map(|v| { + self.buffer.clear(); + v + }) } async fn abort(&mut self) -> Result<()> { diff --git a/core/src/raw/oio/write/compose_write.rs b/core/src/raw/oio/write/compose_write.rs index 043df2978c3..79ddfc5ed77 100644 --- a/core/src/raw/oio/write/compose_write.rs +++ b/core/src/raw/oio/write/compose_write.rs @@ -57,14 +57,14 @@ pub enum TwoWaysWriter { #[async_trait] impl oio::Write for TwoWaysWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result { match self { Self::One(one) => one.write(bs).await, Self::Two(two) => two.write(bs).await, } } - async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> { + async fn sink(&mut self, size: u64, s: Streamer) -> Result { match self { Self::One(one) => one.sink(size, s).await, Self::Two(two) => two.sink(size, s).await, @@ -102,7 +102,7 @@ pub enum ThreeWaysWriter { impl oio::Write for ThreeWaysWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result { match self { Self::One(one) => one.write(bs).await, Self::Two(two) => two.write(bs).await, @@ -110,7 +110,7 @@ impl oio::Write } } - async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> { + async fn sink(&mut self, size: u64, s: Streamer) -> Result { match self { Self::One(one) => one.sink(size, s).await, Self::Two(two) => two.sink(size, s).await, diff --git a/core/src/raw/oio/write/exact_buf_write.rs b/core/src/raw/oio/write/exact_buf_write.rs index 8561c1a4ad0..8e2d8a922b2 100644 --- a/core/src/raw/oio/write/exact_buf_write.rs +++ b/core/src/raw/oio/write/exact_buf_write.rs @@ -84,7 +84,7 @@ impl ExactBufWriter { #[async_trait] impl oio::Write for ExactBufWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result { self.sink(bs.len() as u64, Box::new(oio::Cursor::from(bs))) .await } @@ -92,7 +92,7 @@ impl oio::Write for ExactBufWriter { /// # TODO /// /// We know every stream size, we can collect them into a buffer without chain them every time. - async fn sink(&mut self, _: u64, mut s: Streamer) -> Result<()> { + async fn sink(&mut self, _: u64, mut s: Streamer) -> Result { if self.buffer.len() >= self.buffer_size { let mut buf = self.buffer.clone(); let to_write = buf.split_to(self.buffer_size); @@ -101,9 +101,10 @@ impl oio::Write for ExactBufWriter { .sink(to_write.len() as u64, Box::new(to_write)) .await // Replace buffer with remaining if the write is successful. - .map(|_| { + .map(|v| { self.buffer = buf; self.chain_stream(s); + v }); } @@ -120,8 +121,9 @@ impl oio::Write for ExactBufWriter { // // We don't need to chain stream here because it must be consumed. if buf.len() < self.buffer_size { + let size = buf.len() as u64; self.buffer = buf; - return Ok(()); + return Ok(size); } let to_write = buf.split_to(self.buffer_size); @@ -129,9 +131,10 @@ impl oio::Write for ExactBufWriter { .sink(to_write.len() as u64, Box::new(to_write)) .await // Replace buffer with remaining if the write is successful. - .map(|_| { + .map(|v| { self.buffer = buf; self.chain_stream(s); + v }) } @@ -202,14 +205,14 @@ mod tests { #[async_trait] impl Write for MockWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result { debug!("test_fuzz_exact_buf_writer: flush size: {}", bs.len()); self.buf.extend_from_slice(&bs); - Ok(()) + Ok(bs.len() as u64) } - async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> { + async fn sink(&mut self, size: u64, s: Streamer) -> Result { let bs = s.collect().await?; assert_eq!(bs.len() as u64, size); self.write(bs).await diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_upload_write.rs index c013b24c3a8..7bfd0342cf6 100644 --- a/core/src/raw/oio/write/multipart_upload_write.rs +++ b/core/src/raw/oio/write/multipart_upload_write.rs @@ -120,7 +120,7 @@ impl oio::Write for MultipartUploadWriter where W: MultipartUploadWrite, { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result { let upload_id = self.upload_id().await?; let size = bs.len(); @@ -133,16 +133,20 @@ where AsyncBody::Bytes(bs), ) .await - .map(|v| self.parts.push(v)) + .map(|v| self.parts.push(v))?; + + Ok(size as u64) } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result { let upload_id = self.upload_id().await?; self.inner .write_part(&upload_id, self.parts.len(), size, AsyncBody::Stream(s)) .await - .map(|v| self.parts.push(v)) + .map(|v| self.parts.push(v))?; + + Ok(size) } async fn close(&mut self) -> Result<()> { diff --git a/core/src/raw/oio/write/one_shot_write.rs b/core/src/raw/oio/write/one_shot_write.rs index 35b1883bf2c..e6fe4761601 100644 --- a/core/src/raw/oio/write/one_shot_write.rs +++ b/core/src/raw/oio/write/one_shot_write.rs @@ -49,15 +49,18 @@ impl OneShotWriter { #[async_trait] impl oio::Write for OneShotWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result { let cursor = oio::Cursor::from(bs); - self.inner - .write_once(cursor.len() as u64, Box::new(cursor)) - .await + + let size = cursor.len() as u64; + self.inner.write_once(size, Box::new(cursor)).await?; + + Ok(size) } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { - self.inner.write_once(size, s).await + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result { + self.inner.write_once(size, s).await?; + Ok(size) } async fn abort(&mut self) -> Result<()> { diff --git a/core/src/services/azblob/writer.rs b/core/src/services/azblob/writer.rs index 31a56f27a92..a3b8abe30a3 100644 --- a/core/src/services/azblob/writer.rs +++ b/core/src/services/azblob/writer.rs @@ -161,10 +161,11 @@ impl AzblobWriter { #[async_trait] impl oio::Write for AzblobWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result { + let size = bs.len() as u64; + if self.op.append() { - self.append_oneshot(bs.len() as u64, AsyncBody::Bytes(bs)) - .await + self.append_oneshot(size, AsyncBody::Bytes(bs)).await?; } else { if self.op.content_length().is_none() { return Err(Error::new( @@ -173,14 +174,15 @@ impl oio::Write for AzblobWriter { )); } - self.write_oneshot(bs.len() as u64, AsyncBody::Bytes(bs)) - .await + self.write_oneshot(size, AsyncBody::Bytes(bs)).await?; } + + Ok(size) } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result { if self.op.append() { - self.append_oneshot(size, AsyncBody::Stream(s)).await + self.append_oneshot(size, AsyncBody::Stream(s)).await?; } else { if self.op.content_length().is_none() { return Err(Error::new( @@ -189,8 +191,10 @@ impl oio::Write for AzblobWriter { )); } - self.write_oneshot(size, AsyncBody::Stream(s)).await + self.write_oneshot(size, AsyncBody::Stream(s)).await?; } + + Ok(size) } async fn abort(&mut self) -> Result<()> { diff --git a/core/src/services/azdfs/writer.rs b/core/src/services/azdfs/writer.rs index 3c8db1ac1df..ff1125bfa4c 100644 --- a/core/src/services/azdfs/writer.rs +++ b/core/src/services/azdfs/writer.rs @@ -41,7 +41,9 @@ impl AzdfsWriter { #[async_trait] impl oio::Write for AzdfsWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result { + let size = bs.len() as u64; + let mut req = self.core.azdfs_create_request( &self.path, "file", @@ -78,7 +80,7 @@ impl oio::Write for AzdfsWriter { match status { StatusCode::OK | StatusCode::ACCEPTED => { resp.into_body().consume().await?; - Ok(()) + Ok(size) } _ => Err(parse_error(resp) .await? @@ -86,7 +88,7 @@ impl oio::Write for AzdfsWriter { } } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/dropbox/writer.rs b/core/src/services/dropbox/writer.rs index 2f5e9755800..1b5b6b17d1f 100644 --- a/core/src/services/dropbox/writer.rs +++ b/core/src/services/dropbox/writer.rs @@ -40,12 +40,14 @@ impl DropboxWriter { #[async_trait] impl oio::Write for DropboxWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result { + let size = bs.len(); + let resp = self .core .dropbox_update( &self.path, - Some(bs.len()), + Some(size), self.op.content_type(), AsyncBody::Bytes(bs), ) @@ -54,13 +56,13 @@ impl oio::Write for DropboxWriter { match status { StatusCode::OK => { resp.into_body().consume().await?; - Ok(()) + Ok(size as u64) } _ => Err(parse_error(resp).await?), } } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs index 4d31444af61..9ca571077e4 100644 --- a/core/src/services/fs/writer.rs +++ b/core/src/services/fs/writer.rs @@ -54,18 +54,20 @@ impl oio::Write for FsWriter { /// /// File could be partial written, so we will seek to start to make sure /// we write the same content. - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result { + let size = bs.len() as u64; + self.f .seek(SeekFrom::Start(self.pos)) .await .map_err(parse_io_error)?; self.f.write_all(&bs).await.map_err(parse_io_error)?; - self.pos += bs.len() as u64; + self.pos += size; - Ok(()) + Ok(size) } - async fn sink(&mut self, _size: u64, mut s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, size: u64, mut s: oio::Streamer) -> Result { while let Some(bs) = s.next().await { let bs = bs?; self.f @@ -76,7 +78,7 @@ impl oio::Write for FsWriter { self.pos += bs.len() as u64; } - Ok(()) + Ok(size) } async fn abort(&mut self) -> Result<()> { @@ -104,14 +106,14 @@ impl oio::BlockingWrite for FsWriter { /// /// File could be partial written, so we will seek to start to make sure /// we write the same content. - fn write(&mut self, bs: Bytes) -> Result<()> { + fn write(&mut self, bs: Bytes) -> Result { self.f .seek(SeekFrom::Start(self.pos)) .map_err(parse_io_error)?; self.f.write_all(&bs).map_err(parse_io_error)?; self.pos += bs.len() as u64; - Ok(()) + Ok(bs.len() as u64) } fn close(&mut self) -> Result<()> { diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs index cd4ba0f6a49..18dd6fed97b 100644 --- a/core/src/services/ftp/writer.rs +++ b/core/src/services/ftp/writer.rs @@ -41,7 +41,9 @@ impl FtpWriter { #[async_trait] impl oio::Write for FtpWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result { + let size = bs.len(); + let mut ftp_stream = self.backend.ftp_connect(Operation::Write).await?; let mut data_stream = ftp_stream.append_with_stream(&self.path).await?; data_stream.write_all(&bs).await.map_err(|err| { @@ -50,10 +52,10 @@ impl oio::Write for FtpWriter { ftp_stream.finalize_put_stream(data_stream).await?; - Ok(()) + Ok(size as u64) } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs index e6cd8703c4c..8c4431302fc 100644 --- a/core/src/services/gcs/writer.rs +++ b/core/src/services/gcs/writer.rs @@ -118,16 +118,18 @@ impl GcsWriter { #[async_trait] impl oio::Write for GcsWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result { + let size = bs.len() as u64; + let location = match &self.location { Some(location) => location, None => { if self.op.content_length().unwrap_or_default() == bs.len() as u64 && self.written == 0 { - return self - .write_oneshot(bs.len() as u64, AsyncBody::Bytes(bs)) - .await; + self.write_oneshot(size, AsyncBody::Bytes(bs)).await?; + + return Ok(size); } else { let location = self.initiate_upload().await?; self.location = Some(location); @@ -138,22 +140,23 @@ impl oio::Write for GcsWriter { // Ignore empty bytes if bs.is_empty() { - return Ok(()); + return Ok(0); } self.buffer.push(bs); // Return directly if the buffer is not full if self.buffer.len() <= self.write_fixed_size { - return Ok(()); + return Ok(size); } let bs = self.buffer.peak_exact(self.write_fixed_size); + let size = bs.len() as u64; match self.write_part(location, bs).await { Ok(_) => { self.buffer.take(self.write_fixed_size); self.written += self.write_fixed_size as u64; - Ok(()) + Ok(size) } Err(e) => { // If the upload fails, we should pop the given bs to make sure @@ -164,8 +167,9 @@ impl oio::Write for GcsWriter { } } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { - self.write_oneshot(size, AsyncBody::Stream(s)).await + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result { + self.write_oneshot(size, AsyncBody::Stream(s)).await?; + Ok(size) } async fn abort(&mut self) -> Result<()> { diff --git a/core/src/services/gdrive/writer.rs b/core/src/services/gdrive/writer.rs index 48d88f3ec04..b3313713730 100644 --- a/core/src/services/gdrive/writer.rs +++ b/core/src/services/gdrive/writer.rs @@ -95,15 +95,18 @@ impl GdriveWriter { #[async_trait] impl oio::Write for GdriveWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result { + let size = bs.len() as u64; if self.file_id.is_none() { - self.write_create(bs.len() as u64, bs).await + self.write_create(size, bs).await?; } else { - self.write_overwrite(bs.len() as u64, bs).await + self.write_overwrite(size, bs).await?; } + + Ok(size) } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result { Err(Error::new(ErrorKind::Unsupported, "sink is not supported")) } diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs index b2f9599476e..6bd4bf05751 100644 --- a/core/src/services/ghac/writer.rs +++ b/core/src/services/ghac/writer.rs @@ -42,7 +42,7 @@ impl GhacWriter { #[async_trait] impl oio::Write for GhacWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result { let size = bs.len() as u64; let req = self .backend @@ -54,7 +54,7 @@ impl oio::Write for GhacWriter { if resp.status().is_success() { resp.into_body().consume().await?; self.size += size; - Ok(()) + Ok(size) } else { Err(parse_error(resp) .await @@ -62,7 +62,7 @@ impl oio::Write for GhacWriter { } } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs index 23c5f1d6827..011c8352e93 100644 --- a/core/src/services/hdfs/writer.rs +++ b/core/src/services/hdfs/writer.rs @@ -41,7 +41,9 @@ impl HdfsWriter { #[async_trait] impl oio::Write for HdfsWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result { + let size = bs.len(); + while self.pos < bs.len() { let n = self .f @@ -53,10 +55,10 @@ impl oio::Write for HdfsWriter { // Reset pos to 0 for next write. self.pos = 0; - Ok(()) + Ok(size as u64) } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", @@ -78,7 +80,9 @@ impl oio::Write for HdfsWriter { } impl oio::BlockingWrite for HdfsWriter { - fn write(&mut self, bs: Bytes) -> Result<()> { + fn write(&mut self, bs: Bytes) -> Result { + let size = bs.len(); + while self.pos < bs.len() { let n = self.f.write(&bs[self.pos..]).map_err(parse_io_error)?; self.pos += n; @@ -86,7 +90,7 @@ impl oio::BlockingWrite for HdfsWriter { // Reset pos to 0 for next write. self.pos = 0; - Ok(()) + Ok(size as u64) } fn close(&mut self) -> Result<()> { diff --git a/core/src/services/ipmfs/writer.rs b/core/src/services/ipmfs/writer.rs index 52847814269..43a46e500f0 100644 --- a/core/src/services/ipmfs/writer.rs +++ b/core/src/services/ipmfs/writer.rs @@ -38,7 +38,8 @@ impl IpmfsWriter { #[async_trait] impl oio::Write for IpmfsWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result { + let size = bs.len() as u64; let resp = self.backend.ipmfs_write(&self.path, bs).await?; let status = resp.status(); @@ -46,13 +47,13 @@ impl oio::Write for IpmfsWriter { match status { StatusCode::CREATED | StatusCode::OK => { resp.into_body().consume().await?; - Ok(()) + Ok(size) } _ => Err(parse_error(resp).await?), } } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/onedrive/writer.rs b/core/src/services/onedrive/writer.rs index 6201308474f..1086f3fde3d 100644 --- a/core/src/services/onedrive/writer.rs +++ b/core/src/services/onedrive/writer.rs @@ -46,17 +46,19 @@ impl OneDriveWriter { #[async_trait] impl oio::Write for OneDriveWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result { let size = bs.len(); if size <= Self::MAX_SIMPLE_SIZE { - self.write_simple(bs).await + self.write_simple(bs).await?; } else { - self.write_chunked(bs).await + self.write_chunked(bs).await?; } + + Ok(size as u64) } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs index 76da70da3f8..71ac41d7c7a 100644 --- a/core/src/services/sftp/writer.rs +++ b/core/src/services/sftp/writer.rs @@ -36,13 +36,14 @@ impl SftpWriter { #[async_trait] impl oio::Write for SftpWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result { + let size = bs.len() as u64; self.file.write_all(&bs).await?; - Ok(()) + Ok(size) } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/supabase/writer.rs b/core/src/services/supabase/writer.rs index f4c27131313..b786896c23c 100644 --- a/core/src/services/supabase/writer.rs +++ b/core/src/services/supabase/writer.rs @@ -67,15 +67,17 @@ impl SupabaseWriter { #[async_trait] impl oio::Write for SupabaseWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result { if bs.is_empty() { - return Ok(()); + return Ok(9); } - self.upload(bs).await + let size = bs.len(); + self.upload(bs).await?; + Ok(size as u64) } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/vercel_artifacts/writer.rs b/core/src/services/vercel_artifacts/writer.rs index 6f32d67ba50..1db2d230f14 100644 --- a/core/src/services/vercel_artifacts/writer.rs +++ b/core/src/services/vercel_artifacts/writer.rs @@ -39,7 +39,9 @@ impl VercelArtifactsWriter { #[async_trait] impl oio::Write for VercelArtifactsWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result { + let size = bs.len(); + let resp = self .backend .vercel_artifacts_put( @@ -54,13 +56,13 @@ impl oio::Write for VercelArtifactsWriter { match status { StatusCode::OK | StatusCode::ACCEPTED => { resp.into_body().consume().await?; - Ok(()) + Ok(size as u64) } _ => Err(parse_error(resp).await?), } } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/wasabi/writer.rs b/core/src/services/wasabi/writer.rs index 689c334dcc8..130e8e9119b 100644 --- a/core/src/services/wasabi/writer.rs +++ b/core/src/services/wasabi/writer.rs @@ -41,12 +41,14 @@ impl WasabiWriter { #[async_trait] impl oio::Write for WasabiWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result { + let size = bs.len(); + let resp = self .core .put_object( &self.path, - Some(bs.len()), + Some(size), self.op.content_type(), self.op.content_disposition(), self.op.cache_control(), @@ -57,13 +59,13 @@ impl oio::Write for WasabiWriter { match resp.status() { StatusCode::CREATED | StatusCode::OK => { resp.into_body().consume().await?; - Ok(()) + Ok(size as u64) } _ => Err(parse_error(resp).await?), } } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/webdav/writer.rs b/core/src/services/webdav/writer.rs index a3c17bafa7d..8dc093e65c6 100644 --- a/core/src/services/webdav/writer.rs +++ b/core/src/services/webdav/writer.rs @@ -62,13 +62,18 @@ impl WebdavWriter { #[async_trait] impl oio::Write for WebdavWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { - self.write_oneshot(bs.len() as u64, AsyncBody::Bytes(bs)) - .await + async fn write(&mut self, bs: Bytes) -> Result { + let size = bs.len() as u64; + + self.write_oneshot(size, AsyncBody::Bytes(bs)).await?; + + Ok(size) } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { - self.write_oneshot(size, AsyncBody::Stream(s)).await + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result { + self.write_oneshot(size, AsyncBody::Stream(s)).await?; + + Ok(size) } async fn abort(&mut self) -> Result<()> { diff --git a/core/src/services/webhdfs/writer.rs b/core/src/services/webhdfs/writer.rs index 97eef2e3d25..1b055f122c8 100644 --- a/core/src/services/webhdfs/writer.rs +++ b/core/src/services/webhdfs/writer.rs @@ -39,12 +39,14 @@ impl WebhdfsWriter { #[async_trait] impl oio::Write for WebhdfsWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result { + let size = bs.len(); + let req = self .backend .webhdfs_create_object_request( &self.path, - Some(bs.len()), + Some(size), self.op.content_type(), AsyncBody::Bytes(bs), ) @@ -56,13 +58,13 @@ impl oio::Write for WebhdfsWriter { match status { StatusCode::CREATED | StatusCode::OK => { resp.into_body().consume().await?; - Ok(()) + Ok(size as u64) } _ => Err(parse_error(resp).await?), } } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs index 37a9fe72ca9..f4b93dfa6ab 100644 --- a/core/src/types/writer.rs +++ b/core/src/types/writer.rs @@ -22,7 +22,7 @@ use std::task::ready; use std::task::Context; use std::task::Poll; -use bytes::Bytes; +use bytes::{Buf, Bytes}; use futures::future::BoxFuture; use futures::AsyncWrite; use futures::FutureExt; @@ -81,14 +81,23 @@ impl Writer { /// Write into inner writer. pub async fn write(&mut self, bs: impl Into) -> Result<()> { - if let State::Idle(Some(w)) = &mut self.state { - w.write(bs.into()).await + let w = if let State::Idle(Some(w)) = &mut self.state { + w } else { unreachable!( "writer state invalid while write, expect Idle, actual {}", self.state ); + }; + + let mut bs = bs.into(); + + while !bs.is_empty() { + let n = w.write(bs.clone()).await?; + bs.advance(n as usize); } + + Ok(()) } /// Sink into writer. @@ -123,7 +132,7 @@ impl Writer { /// Ok(()) /// } /// ``` - pub async fn sink(&mut self, size: u64, sink_from: S) -> Result<()> + pub async fn sink(&mut self, size: u64, sink_from: S) -> Result where S: futures::Stream> + Send + Sync + Unpin + 'static, T: Into, @@ -169,7 +178,7 @@ impl Writer { /// Ok(()) /// } /// ``` - pub async fn copy(&mut self, size: u64, read_from: R) -> Result<()> + pub async fn copy(&mut self, size: u64, read_from: R) -> Result where R: futures::AsyncRead + Send + Sync + Unpin + 'static, { @@ -390,7 +399,14 @@ impl BlockingWriter { /// Write into inner writer. pub fn write(&mut self, bs: impl Into) -> Result<()> { - self.inner.write(bs.into()) + let mut bs = bs.into(); + + while !bs.is_empty() { + let n = self.inner.write(bs.clone())?; + bs.advance(n as usize); + } + + Ok(()) } /// Close the writer and make sure all data have been stored.