Skip to content

Commit

Permalink
refactor: kv::adapter should use Buffer (write part)
Browse files Browse the repository at this point in the history
Signed-off-by: tison <[email protected]>
  • Loading branch information
tisonkun committed Apr 16, 2024
1 parent 5b1974b commit 616baba
Showing 1 changed file with 15 additions and 51 deletions.
66 changes: 15 additions & 51 deletions core/src/raw/adapters/kv/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@ use std::sync::Arc;
use std::vec::IntoIter;

use async_trait::async_trait;
use bytes::BytesMut;
use bytes::{Buf, Bytes};
use bytes::Buf;

use super::Adapter;
use crate::raw::oio::HierarchyLister;
use crate::raw::oio::{HierarchyLister, QueueBuf};
use crate::raw::*;
use crate::*;

Expand Down Expand Up @@ -228,87 +227,52 @@ impl oio::BlockingList for KvLister {
pub struct KvWriter<S> {
kv: Arc<S>,
path: String,

buffer: FlexBuffer,
buffer: QueueBuf,
}

impl<S> KvWriter<S> {
fn new(kv: Arc<S>, path: String) -> Self {
KvWriter {
kv,
path,
buffer: FlexBuffer::Active(BytesMut::new()),
buffer: QueueBuf::new(),
}
}
}

/// TODO: replace with oio::FlexBuf.
enum FlexBuffer {
Active(BytesMut),
Frozen(Bytes),
}

/// # Safety
///
/// We will only take `&mut Self` reference for KvWriter.
unsafe impl<S: Adapter> Sync for KvWriter<S> {}

impl<S: Adapter> oio::Write for KvWriter<S> {
async fn write(&mut self, bs: Buffer) -> Result<usize> {
match &mut self.buffer {
FlexBuffer::Active(buf) => {
buf.extend_from_slice(bs.chunk());
Ok(bs.chunk().len())
}
FlexBuffer::Frozen(_) => {
unreachable!("KvWriter should not be frozen during poll_write")
}
}
let ret = bs.len();
self.buffer.push(bs);
Ok(ret)
}

async fn close(&mut self) -> Result<()> {
let buf = match &mut self.buffer {
FlexBuffer::Active(buf) => {
let buf = buf.split().freeze();
self.buffer = FlexBuffer::Frozen(buf.clone());
buf
}
FlexBuffer::Frozen(buf) => buf.clone(),
};

self.kv.set(&self.path, &buf).await
let buf = self.buffer.collect();
self.kv.set(&self.path, buf.chunk()).await
}

async fn abort(&mut self) -> Result<()> {
self.buffer = FlexBuffer::Active(BytesMut::new());
self.buffer.clear();
Ok(())
}
}

impl<S: Adapter> oio::BlockingWrite for KvWriter<S> {
fn write(&mut self, bs: Buffer) -> Result<usize> {
match &mut self.buffer {
FlexBuffer::Active(buf) => {
buf.extend_from_slice(bs.chunk());
Ok(bs.chunk().len())
}
FlexBuffer::Frozen(_) => {
unreachable!("KvWriter should not be frozen during poll_write")
}
}
let ret = bs.len();
self.buffer.push(bs);
Ok(ret)
}

fn close(&mut self) -> Result<()> {
let buf = match &mut self.buffer {
FlexBuffer::Active(buf) => {
let buf = buf.split().freeze();
self.buffer = FlexBuffer::Frozen(buf.clone());
buf
}
FlexBuffer::Frozen(buf) => buf.clone(),
};

self.kv.blocking_set(&self.path, &buf)?;
let buf = self.buffer.collect();
self.kv.blocking_set(&self.path, buf.chunk())?;
Ok(())
}
}

0 comments on commit 616baba

Please sign in to comment.