From 655a487232fe8b1dc25cc0cb588fe54a75af77f5 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 16 Apr 2024 18:28:23 +0800 Subject: [PATCH 1/2] chore(core): Align Reader and Writer's API design (#4498) * chore(core): Align Reader and Writer's API design Signed-off-by: Xuanwo * Polish name Signed-off-by: Xuanwo * Fix cpp Signed-off-by: Xuanwo --------- Signed-off-by: Xuanwo --- bin/oli/src/commands/cat.rs | 2 +- bin/oli/src/commands/cp.rs | 11 ++++------- bindings/c/include/opendal.h | 4 ++-- bindings/c/src/reader.rs | 4 ++-- bindings/cpp/src/lib.rs | 4 +--- bindings/cpp/src/reader.rs | 2 +- bindings/nodejs/src/lib.rs | 8 ++++---- bindings/python/src/file.rs | 11 ++++------- core/benches/ops/read.rs | 2 +- core/src/types/blocking_reader.rs | 23 +++++++++++------------ core/src/types/mod.rs | 6 +++--- core/src/types/reader.rs | 23 +++++++++++------------ core/src/types/writer.rs | 14 +++++++------- core/tests/behavior/async_write.rs | 6 +++--- integrations/object_store/src/lib.rs | 12 ++++++------ 15 files changed, 61 insertions(+), 71 deletions(-) diff --git a/bin/oli/src/commands/cat.rs b/bin/oli/src/commands/cat.rs index 06f139fb339..db3b2ef8503 100644 --- a/bin/oli/src/commands/cat.rs +++ b/bin/oli/src/commands/cat.rs @@ -39,7 +39,7 @@ pub async fn main(args: &ArgMatches) -> Result<()> { let reader = op.reader(&path).await?; let meta = op.stat(&path).await?; - let mut buf_reader = reader.into_futures_io_async_read(0..meta.content_length()); + let mut buf_reader = reader.into_futures_async_read(0..meta.content_length()); let mut stdout = io::AllowStdIo::new(std::io::stdout()); io::copy_buf(&mut buf_reader, &mut stdout).await?; Ok(()) diff --git a/bin/oli/src/commands/cp.rs b/bin/oli/src/commands/cp.rs index 84859bf4a7a..7dd9c640f79 100644 --- a/bin/oli/src/commands/cp.rs +++ b/bin/oli/src/commands/cp.rs @@ -46,13 +46,10 @@ pub async fn main(args: &ArgMatches) -> Result<()> { let (dst_op, dst_path) = cfg.parse_location(dst)?; if !recursive { - let mut dst_w = dst_op - .writer(&dst_path) - .await? - .into_futures_io_async_write(); + let mut dst_w = dst_op.writer(&dst_path).await?.into_futures_async_write(); let src_meta = src_op.stat(&src_path).await?; let reader = src_op.reader_with(&src_path).chunk(8 * 1024 * 1024).await?; - let buf_reader = reader.into_futures_io_async_read(0..src_meta.content_length()); + let buf_reader = reader.into_futures_async_read(0..src_meta.content_length()); futures::io::copy_buf(buf_reader, &mut dst_w).await?; // flush data dst_w.close().await?; @@ -74,12 +71,12 @@ pub async fn main(args: &ArgMatches) -> Result<()> { .strip_prefix(prefix) .expect("invalid path"); let reader = src_op.reader_with(de.path()).chunk(8 * 1024 * 1024).await?; - let buf_reader = reader.into_futures_io_async_read(0..meta.content_length()); + let buf_reader = reader.into_futures_async_read(0..meta.content_length()); let mut writer = dst_op .writer(&dst_root.join(fp).to_string_lossy()) .await? - .into_futures_io_async_write(); + .into_futures_async_write(); println!("Copying {}", de.path()); futures::io::copy_buf(buf_reader, &mut writer).await?; diff --git a/bindings/c/include/opendal.h b/bindings/c/include/opendal.h index ca9de2042ae..1efc5c7bf06 100644 --- a/bindings/c/include/opendal.h +++ b/bindings/c/include/opendal.h @@ -212,7 +212,7 @@ typedef struct OperatorInfo OperatorInfo; * * StdReader also implements [`Send`] and [`Sync`]. */ -typedef struct StdIoReader StdIoReader; +typedef struct StdReader StdReader; /** * \brief opendal_bytes carries raw-bytes with its length @@ -408,7 +408,7 @@ typedef struct opendal_result_read { * a opendal::BlockingReader, which is inside the Rust core code. */ typedef struct opendal_reader { - struct StdIoReader *inner; + struct StdReader *inner; } opendal_reader; /** diff --git a/bindings/c/src/reader.rs b/bindings/c/src/reader.rs index 5d93f7d2896..7e4d9098dcb 100644 --- a/bindings/c/src/reader.rs +++ b/bindings/c/src/reader.rs @@ -26,13 +26,13 @@ use super::*; /// a opendal::BlockingReader, which is inside the Rust core code. #[repr(C)] pub struct opendal_reader { - inner: *mut core::StdIoReader, + inner: *mut core::StdReader, } impl opendal_reader { pub(crate) fn new(reader: core::BlockingReader, size: u64) -> Self { Self { - inner: Box::into_raw(Box::new(reader.into_std_io_read(0..size))), + inner: Box::into_raw(Box::new(reader.into_std_read(0..size))), } } diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs index 8cc88fddb01..720bf0bff0b 100644 --- a/bindings/cpp/src/lib.rs +++ b/bindings/cpp/src/lib.rs @@ -156,9 +156,7 @@ impl Operator { fn reader(&self, path: &str) -> Result> { let meta = self.0.stat(path)?; Ok(Box::new(Reader( - self.0 - .reader(path)? - .into_std_io_read(0..meta.content_length()), + self.0.reader(path)?.into_std_read(0..meta.content_length()), ))) } diff --git a/bindings/cpp/src/reader.rs b/bindings/cpp/src/reader.rs index 7c083501bca..0094d460004 100644 --- a/bindings/cpp/src/reader.rs +++ b/bindings/cpp/src/reader.rs @@ -21,7 +21,7 @@ use std::io::{Read, Seek}; use super::ffi; -pub struct Reader(pub od::StdIoReader); +pub struct Reader(pub od::StdReader); impl Reader { pub fn read(&mut self, buf: &mut [u8]) -> Result { diff --git a/bindings/nodejs/src/lib.rs b/bindings/nodejs/src/lib.rs index d47d5d51559..f8c44c7ebc4 100644 --- a/bindings/nodejs/src/lib.rs +++ b/bindings/nodejs/src/lib.rs @@ -191,7 +191,7 @@ impl Operator { let meta = self.0.stat(&path).await.map_err(format_napi_error)?; let r = self.0.reader(&path).await.map_err(format_napi_error)?; Ok(Reader { - inner: r.into_futures_io_async_read(0..meta.content_length()), + inner: r.into_futures_async_read(0..meta.content_length()), }) } @@ -220,7 +220,7 @@ impl Operator { let meta = self.0.blocking().stat(&path).map_err(format_napi_error)?; let r = self.0.blocking().reader(&path).map_err(format_napi_error)?; Ok(BlockingReader { - inner: r.into_std_io_read(0..meta.content_length()), + inner: r.into_std_read(0..meta.content_length()), }) } @@ -660,7 +660,7 @@ pub struct ListOptions { /// manner. #[napi] pub struct BlockingReader { - inner: opendal::StdIoReader, + inner: opendal::StdReader, } #[napi] @@ -677,7 +677,7 @@ impl BlockingReader { /// manner. #[napi] pub struct Reader { - inner: opendal::FuturesIoAsyncReader, + inner: opendal::FuturesAsyncReader, } #[napi] diff --git a/bindings/python/src/file.rs b/bindings/python/src/file.rs index a4a360a7bad..b2b240c3926 100644 --- a/bindings/python/src/file.rs +++ b/bindings/python/src/file.rs @@ -40,17 +40,14 @@ use crate::*; pub struct File(FileState, Capability); enum FileState { - Reader(ocore::StdIoReader), + Reader(ocore::StdReader), Writer(ocore::BlockingWriter), Closed, } impl File { pub fn new_reader(reader: ocore::BlockingReader, size: u64, capability: Capability) -> Self { - Self( - FileState::Reader(reader.into_std_io_read(0..size)), - capability, - ) + Self(FileState::Reader(reader.into_std_read(0..size)), capability) } pub fn new_writer(writer: ocore::BlockingWriter, capability: Capability) -> Self { @@ -289,7 +286,7 @@ impl File { pub struct AsyncFile(Arc>, Capability); enum AsyncFileState { - Reader(ocore::FuturesIoAsyncReader), + Reader(ocore::FuturesAsyncReader), Writer(ocore::Writer), Closed, } @@ -298,7 +295,7 @@ impl AsyncFile { pub fn new_reader(reader: ocore::Reader, size: u64, capability: Capability) -> Self { Self( Arc::new(Mutex::new(AsyncFileState::Reader( - reader.into_futures_io_async_read(0..size), + reader.into_futures_async_read(0..size), ))), capability, ) diff --git a/core/benches/ops/read.rs b/core/benches/ops/read.rs index 6df1f9f587b..b336c05efa2 100644 --- a/core/benches/ops/read.rs +++ b/core/benches/ops/read.rs @@ -52,7 +52,7 @@ fn bench_read_full(c: &mut Criterion, name: &str, op: Operator) { b.to_async(&*TEST_RUNTIME).iter(|| async { let r = op.reader_with(path).await.unwrap(); - let r = r.into_futures_io_async_read(0..size.bytes() as u64); + let r = r.into_futures_async_read(0..size.bytes() as u64); io::copy(r, &mut io::sink()).await.unwrap(); }) }); diff --git a/core/src/types/blocking_reader.rs b/core/src/types/blocking_reader.rs index a82c2caa9cf..38dcf0ecba9 100644 --- a/core/src/types/blocking_reader.rs +++ b/core/src/types/blocking_reader.rs @@ -142,18 +142,17 @@ impl BlockingReader { } } - /// Convert reader into [`FuturesIoAsyncReader`] which implements [`futures::AsyncRead`], + /// Convert reader into [`StdReader`] which implements [`futures::AsyncRead`], /// [`futures::AsyncSeek`] and [`futures::AsyncBufRead`]. #[inline] - pub fn into_std_io_read(self, range: Range) -> StdIoReader { + pub fn into_std_read(self, range: Range) -> StdReader { // TODO: the capacity should be decided by services. - StdIoReader::new(self.inner, range) + StdReader::new(self.inner, range) } - /// Convert reader into [`FuturesBytesStream`] which implements [`futures::Stream`], - /// [`futures::AsyncSeek`] and [`futures::AsyncBufRead`]. + /// Convert reader into [`StdBytesIterator`] which implements [`Iterator`]. #[inline] - pub fn into_std_bytes_iterator(self, range: Range) -> StdBytesIterator { + pub fn into_bytes_iterator(self, range: Range) -> StdBytesIterator { StdBytesIterator::new(self.inner, range) } } @@ -176,7 +175,7 @@ pub mod into_std_read { /// Users can use this adapter in cases where they need to use [`Read`] or [`BufRead`] trait. /// /// StdReader also implements [`Send`] and [`Sync`]. - pub struct StdIoReader { + pub struct StdReader { inner: oio::BlockingReader, offset: u64, size: u64, @@ -186,11 +185,11 @@ pub mod into_std_read { buf: Buffer, } - impl StdIoReader { + impl StdReader { /// NOTE: don't allow users to create StdReader directly. #[inline] pub(super) fn new(r: oio::BlockingReader, range: Range) -> Self { - StdIoReader { + StdReader { inner: r, offset: range.start, size: range.end - range.start, @@ -209,7 +208,7 @@ pub mod into_std_read { } } - impl BufRead for StdIoReader { + impl BufRead for StdReader { fn fill_buf(&mut self) -> io::Result<&[u8]> { if self.buf.has_remaining() { return Ok(self.buf.chunk()); @@ -235,7 +234,7 @@ pub mod into_std_read { } } - impl Read for StdIoReader { + impl Read for StdReader { #[inline] fn read(&mut self, buf: &mut [u8]) -> io::Result { let bs = self.fill_buf()?; @@ -246,7 +245,7 @@ pub mod into_std_read { } } - impl Seek for StdIoReader { + impl Seek for StdReader { #[inline] fn seek(&mut self, pos: SeekFrom) -> io::Result { let new_pos = match pos { diff --git a/core/src/types/mod.rs b/core/src/types/mod.rs index 7e5523bb90e..8718cb590f7 100644 --- a/core/src/types/mod.rs +++ b/core/src/types/mod.rs @@ -29,17 +29,17 @@ pub use metadata::Metadata; pub use metadata::Metakey; mod reader; -pub use reader::into_futures_async_read::FuturesIoAsyncReader; +pub use reader::into_futures_async_read::FuturesAsyncReader; pub use reader::into_futures_stream::FuturesBytesStream; pub use reader::Reader; mod blocking_reader; pub use blocking_reader::into_std_iterator::StdBytesIterator; -pub use blocking_reader::into_std_read::StdIoReader; +pub use blocking_reader::into_std_read::StdReader; pub use blocking_reader::BlockingReader; mod writer; -pub use writer::into_futures_async_write::FuturesIoAsyncWriter; +pub use writer::into_futures_async_write::FuturesAsyncWriter; pub use writer::BlockingWriter; pub use writer::Writer; diff --git a/core/src/types/reader.rs b/core/src/types/reader.rs index 52f820afa95..a67de651b60 100644 --- a/core/src/types/reader.rs +++ b/core/src/types/reader.rs @@ -171,17 +171,16 @@ impl Reader { stream::iter(futs).buffered(self.options.concurrent()) } - /// Convert reader into [`FuturesIoAsyncReader`] which implements [`futures::AsyncRead`], + /// Convert reader into [`FuturesAsyncReader`] which implements [`futures::AsyncRead`], /// [`futures::AsyncSeek`] and [`futures::AsyncBufRead`]. #[inline] - pub fn into_futures_io_async_read(self, range: Range) -> FuturesIoAsyncReader { - FuturesIoAsyncReader::new(self.inner, self.options.chunk(), range) + pub fn into_futures_async_read(self, range: Range) -> FuturesAsyncReader { + FuturesAsyncReader::new(self.inner, self.options.chunk(), range) } - /// Convert reader into [`FuturesBytesStream`] which implements [`futures::Stream`], - /// [`futures::AsyncSeek`] and [`futures::AsyncBufRead`]. + /// Convert reader into [`FuturesBytesStream`] which implements [`futures::Stream`]. #[inline] - pub fn into_futures_bytes_stream(self, range: Range) -> FuturesBytesStream { + pub fn into_bytes_stream(self, range: Range) -> FuturesBytesStream { FuturesBytesStream::new(self.inner, self.options.chunk(), range) } } @@ -288,7 +287,7 @@ pub mod into_futures_async_read { /// Users can use this adapter in cases where they need to use [`AsyncRead`] related trait. /// /// FuturesAsyncReader also implements [`Unpin`], [`Send`] and [`Sync`] - pub struct FuturesIoAsyncReader { + pub struct FuturesAsyncReader { state: State, offset: u64, size: u64, @@ -308,11 +307,11 @@ pub mod into_futures_async_read { /// FuturesReader only exposes `&mut self` to the outside world, so it's safe to be `Sync`. unsafe impl Sync for State {} - impl FuturesIoAsyncReader { + impl FuturesAsyncReader { /// NOTE: don't allow users to create FuturesAsyncReader directly. #[inline] pub(super) fn new(r: oio::Reader, chunk: Option, range: Range) -> Self { - FuturesIoAsyncReader { + FuturesAsyncReader { state: State::Idle(Some(r)), offset: range.start, size: range.end - range.start, @@ -324,7 +323,7 @@ pub mod into_futures_async_read { } } - impl AsyncBufRead for FuturesIoAsyncReader { + impl AsyncBufRead for FuturesAsyncReader { fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); loop { @@ -369,7 +368,7 @@ pub mod into_futures_async_read { } } - impl AsyncRead for FuturesIoAsyncReader { + impl AsyncRead for FuturesAsyncReader { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -383,7 +382,7 @@ pub mod into_futures_async_read { } } - impl AsyncSeek for FuturesIoAsyncReader { + impl AsyncSeek for FuturesAsyncReader { fn poll_seek( mut self: Pin<&mut Self>, _: &mut Context<'_>, diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs index 5ab820fe5bb..fdc697566e1 100644 --- a/core/src/types/writer.rs +++ b/core/src/types/writer.rs @@ -176,9 +176,9 @@ impl Writer { self.inner.close().await } - /// Convert writer into [`FuturesIoAsyncWriter`] which implements [`futures::AsyncWrite`], - pub fn into_futures_io_async_write(self) -> FuturesIoAsyncWriter { - FuturesIoAsyncWriter::new(self.inner) + /// Convert writer into [`FuturesAsyncWriter`] which implements [`futures::AsyncWrite`], + pub fn into_futures_async_write(self) -> FuturesAsyncWriter { + FuturesAsyncWriter::new(self.inner) } } @@ -204,7 +204,7 @@ pub mod into_futures_async_write { /// # TODO /// /// We should insert checks if the input slice changed after future created. - pub struct FuturesIoAsyncWriter { + pub struct FuturesAsyncWriter { state: State, buf: oio::FlexBuf, } @@ -220,18 +220,18 @@ pub mod into_futures_async_write { /// FuturesReader only exposes `&mut self` to the outside world, so it's safe to be `Sync`. unsafe impl Sync for State {} - impl FuturesIoAsyncWriter { + impl FuturesAsyncWriter { /// NOTE: don't allow users to create FuturesIoAsyncWriter directly. #[inline] pub fn new(r: oio::Writer) -> Self { - FuturesIoAsyncWriter { + FuturesAsyncWriter { state: State::Idle(Some(r)), buf: oio::FlexBuf::new(256 * 1024), } } } - impl AsyncWrite for FuturesIoAsyncWriter { + impl AsyncWrite for FuturesAsyncWriter { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/core/tests/behavior/async_write.rs b/core/tests/behavior/async_write.rs index 3f56d1cbc09..bb145ab2d5e 100644 --- a/core/tests/behavior/async_write.rs +++ b/core/tests/behavior/async_write.rs @@ -416,7 +416,7 @@ pub async fn test_writer_futures_copy(op: Operator) -> Result<()> { .writer_with(&path) .buffer(8 * 1024 * 1024) .await? - .into_futures_io_async_write(); + .into_futures_async_write(); // Wrap a buf reader here to make sure content is read in 1MiB chunks. let mut cursor = BufReader::with_capacity(1024 * 1024, Cursor::new(content.clone())); @@ -452,7 +452,7 @@ pub async fn test_writer_futures_copy_with_concurrent(op: Operator) -> Result<() .buffer(8 * 1024 * 1024) .concurrent(4) .await? - .into_futures_io_async_write(); + .into_futures_async_write(); // Wrap a buf reader here to make sure content is read in 1MiB chunks. let mut cursor = BufReader::with_capacity(1024 * 1024, Cursor::new(content.clone())); @@ -515,7 +515,7 @@ pub async fn test_writer_with_append(op: Operator) -> Result<()> { .writer_with(&path) .append(true) .await? - .into_futures_io_async_write(); + .into_futures_async_write(); // Wrap a buf reader here to make sure content is read in 1MiB chunks. let mut cursor = BufReader::with_capacity(1024 * 1024, Cursor::new(content.clone())); diff --git a/integrations/object_store/src/lib.rs b/integrations/object_store/src/lib.rs index 981482f19eb..121fc93c1ef 100644 --- a/integrations/object_store/src/lib.rs +++ b/integrations/object_store/src/lib.rs @@ -134,12 +134,12 @@ impl ObjectStore for OpendalStore { .await .map_err(|err| format_object_store_error(err, location.as_ref()))?; - let stream = r - .into_futures_bytes_stream(0..meta.size as u64) - .map_err(|err| object_store::Error::Generic { - store: "IoError", - source: Box::new(err), - }); + let stream = + r.into_bytes_stream(0..meta.size as u64) + .map_err(|err| object_store::Error::Generic { + store: "IoError", + source: Box::new(err), + }); Ok(GetResult { payload: GetResultPayload::Stream(Box::pin(stream)), From 2e9e89b9c959adb9c7de54e80ea3a1f6ca5a5b95 Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 16 Apr 2024 18:47:31 +0800 Subject: [PATCH 2/2] refactor: typed_kv::adapter should use Buffer (#4497) * refactor: typed_kv::adapter should use Buffer Signed-off-by: tison * impl fn collect(mut self) -> Buffer Signed-off-by: tison * Revert "impl fn collect(mut self) -> Buffer" This reverts commit 482ff9700eb918360f7d017d8d0b48de823e85a6. * Enforce user to call clone outside Signed-off-by: Xuanwo --------- Signed-off-by: tison Signed-off-by: Xuanwo Co-authored-by: Xuanwo --- core/src/raw/adapters/typed_kv/api.rs | 7 +++--- core/src/raw/adapters/typed_kv/backend.rs | 26 +++++++++-------------- core/src/raw/oio/buf/queue_buf.rs | 8 +++---- core/src/raw/oio/write/exact_buf_write.rs | 4 ++-- 4 files changed, 19 insertions(+), 26 deletions(-) diff --git a/core/src/raw/adapters/typed_kv/api.rs b/core/src/raw/adapters/typed_kv/api.rs index f1990d5cc70..9b44c67e61c 100644 --- a/core/src/raw/adapters/typed_kv/api.rs +++ b/core/src/raw/adapters/typed_kv/api.rs @@ -19,15 +19,14 @@ use std::fmt::Debug; use std::mem::size_of; use async_trait::async_trait; -use bytes::Bytes; use chrono::Utc; -use crate::EntryMode; use crate::Error; use crate::ErrorKind; use crate::Metadata; use crate::Result; use crate::Scheme; +use crate::{Buffer, EntryMode}; /// Adapter is the typed adapter to underlying kv services. /// @@ -97,7 +96,7 @@ pub struct Value { /// Metadata of this value. pub metadata: Metadata, /// The corresponding content of this value. - pub value: Bytes, + pub value: Buffer, } impl Value { @@ -107,7 +106,7 @@ impl Value { metadata: Metadata::new(EntryMode::DIR) .with_content_length(0) .with_last_modified(Utc::now()), - value: Bytes::new(), + value: Buffer::new(), } } diff --git a/core/src/raw/adapters/typed_kv/backend.rs b/core/src/raw/adapters/typed_kv/backend.rs index b9935e25278..500ec205ebe 100644 --- a/core/src/raw/adapters/typed_kv/backend.rs +++ b/core/src/raw/adapters/typed_kv/backend.rs @@ -19,11 +19,10 @@ use std::sync::Arc; use std::vec::IntoIter; use async_trait::async_trait; -use bytes::{Buf, Bytes}; use super::Adapter; use super::Value; -use crate::raw::oio::HierarchyLister; +use crate::raw::oio::{HierarchyLister, QueueBuf}; use crate::raw::*; use crate::*; @@ -56,8 +55,8 @@ where #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl Accessor for Backend { - type Reader = Bytes; - type BlockingReader = Bytes; + type Reader = Buffer; + type BlockingReader = Buffer; type Writer = KvWriter; type BlockingWriter = KvWriter; type Lister = HierarchyLister; @@ -238,7 +237,7 @@ pub struct KvWriter { path: String, op: OpWrite, - buf: Option>, + buf: Option, value: Option, } @@ -259,7 +258,7 @@ impl KvWriter { } fn build(&mut self) -> Value { - let value = self.buf.take().map(Bytes::from).unwrap_or_default(); + let value = self.buf.take().map(QueueBuf::collect).unwrap_or_default(); let mut metadata = Metadata::new(EntryMode::FILE); metadata.set_content_length(value.len() as u64); @@ -280,11 +279,9 @@ impl KvWriter { impl oio::Write for KvWriter { async fn write(&mut self, bs: Buffer) -> Result { - let size = bs.chunk().len(); - - let mut buf = self.buf.take().unwrap_or_else(|| Vec::with_capacity(size)); - buf.extend_from_slice(bs.chunk()); - + let size = bs.len(); + let mut buf = self.buf.take().unwrap_or_default(); + buf.push(bs); self.buf = Some(buf); Ok(size) } @@ -311,12 +308,9 @@ impl oio::Write for KvWriter { impl oio::BlockingWrite for KvWriter { fn write(&mut self, bs: Buffer) -> Result { let size = bs.len(); - - let mut buf = self.buf.take().unwrap_or_else(|| Vec::with_capacity(size)); - buf.extend_from_slice(bs.chunk()); - + let mut buf = self.buf.take().unwrap_or_default(); + buf.push(bs); self.buf = Some(buf); - Ok(size) } diff --git a/core/src/raw/oio/buf/queue_buf.rs b/core/src/raw/oio/buf/queue_buf.rs index 7e81ac4929b..102ee8c3d82 100644 --- a/core/src/raw/oio/buf/queue_buf.rs +++ b/core/src/raw/oio/buf/queue_buf.rs @@ -29,7 +29,7 @@ use std::collections::VecDeque; /// - `push`: Push a new buffer in the queue. /// - `collect`: Collect all buffer in the queue as a new [`Buffer`] /// - `advance`: Advance the queue by `cnt` bytes. -#[derive(Default)] +#[derive(Clone, Default)] pub struct QueueBuf(VecDeque); impl QueueBuf { @@ -68,13 +68,13 @@ impl QueueBuf { /// most of them should be acceptable since we can expect the item length of buffers are slower /// than 4k. #[inline] - pub fn collect(&self) -> Buffer { + pub fn collect(mut self) -> Buffer { if self.0.is_empty() { Buffer::new() } else if self.0.len() == 1 { - self.0.clone().pop_front().unwrap() + self.0.pop_front().unwrap() } else { - self.0.clone().into_iter().flatten().collect() + self.0.into_iter().flatten().collect() } } diff --git a/core/src/raw/oio/write/exact_buf_write.rs b/core/src/raw/oio/write/exact_buf_write.rs index 6e6f385975e..3934915cc79 100644 --- a/core/src/raw/oio/write/exact_buf_write.rs +++ b/core/src/raw/oio/write/exact_buf_write.rs @@ -47,7 +47,7 @@ impl ExactBufWriter { impl oio::Write for ExactBufWriter { async fn write(&mut self, mut bs: Buffer) -> Result { if self.buffer.len() >= self.buffer_size { - let written = self.inner.write(self.buffer.collect()).await?; + let written = self.inner.write(self.buffer.clone().collect()).await?; self.buffer.advance(written); } @@ -64,7 +64,7 @@ impl oio::Write for ExactBufWriter { break; } - let written = self.inner.write(self.buffer.collect()).await?; + let written = self.inner.write(self.buffer.clone().collect()).await?; self.buffer.advance(written); }