diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs index 9053930f26c..783f43659b6 100644 --- a/core/src/raw/adapters/kv/backend.rs +++ b/core/src/raw/adapters/kv/backend.rs @@ -19,11 +19,10 @@ use std::sync::Arc; use std::vec::IntoIter; use async_trait::async_trait; -use bytes::BytesMut; -use bytes::{Buf, Bytes}; +use bytes::Buf; use super::Adapter; -use crate::raw::oio::HierarchyLister; +use crate::raw::oio::{HierarchyLister, QueueBuf}; use crate::raw::*; use crate::*; @@ -228,8 +227,7 @@ impl oio::BlockingList for KvLister { pub struct KvWriter { kv: Arc, path: String, - - buffer: FlexBuffer, + buffer: QueueBuf, } impl KvWriter { @@ -237,17 +235,11 @@ impl KvWriter { KvWriter { kv, path, - buffer: FlexBuffer::Active(BytesMut::new()), + buffer: QueueBuf::new(), } } } -/// TODO: replace with oio::FlexBuf. -enum FlexBuffer { - Active(BytesMut), - Frozen(Bytes), -} - /// # Safety /// /// We will only take `&mut Self` reference for KvWriter. @@ -255,60 +247,32 @@ unsafe impl Sync for KvWriter {} impl oio::Write for KvWriter { async fn write(&mut self, bs: Buffer) -> Result { - match &mut self.buffer { - FlexBuffer::Active(buf) => { - buf.extend_from_slice(bs.chunk()); - Ok(bs.chunk().len()) - } - FlexBuffer::Frozen(_) => { - unreachable!("KvWriter should not be frozen during poll_write") - } - } + let ret = bs.len(); + self.buffer.push(bs); + Ok(ret) } async fn close(&mut self) -> Result<()> { - let buf = match &mut self.buffer { - FlexBuffer::Active(buf) => { - let buf = buf.split().freeze(); - self.buffer = FlexBuffer::Frozen(buf.clone()); - buf - } - FlexBuffer::Frozen(buf) => buf.clone(), - }; - - self.kv.set(&self.path, &buf).await + let buf = self.buffer.collect(); + self.kv.set(&self.path, buf.chunk()).await } async fn abort(&mut self) -> Result<()> { - self.buffer = FlexBuffer::Active(BytesMut::new()); + self.buffer.clear(); Ok(()) } } impl oio::BlockingWrite for KvWriter { fn write(&mut self, bs: Buffer) -> Result { - match &mut self.buffer { - FlexBuffer::Active(buf) => { - buf.extend_from_slice(bs.chunk()); - Ok(bs.chunk().len()) - } - FlexBuffer::Frozen(_) => { - unreachable!("KvWriter should not be frozen during poll_write") - } - } + let ret = bs.len(); + self.buffer.push(bs); + Ok(ret) } fn close(&mut self) -> Result<()> { - let buf = match &mut self.buffer { - FlexBuffer::Active(buf) => { - let buf = buf.split().freeze(); - self.buffer = FlexBuffer::Frozen(buf.clone()); - buf - } - FlexBuffer::Frozen(buf) => buf.clone(), - }; - - self.kv.blocking_set(&self.path, &buf)?; + let buf = self.buffer.collect(); + self.kv.blocking_set(&self.path, buf.chunk())?; Ok(()) } }