Skip to content

Commit

Permalink
refactor: typed_kv::adapter should use Buffer (#4497)
Browse files Browse the repository at this point in the history
* refactor: typed_kv::adapter should use Buffer

Signed-off-by: tison <[email protected]>

* impl fn collect(mut self) -> Buffer

Signed-off-by: tison <[email protected]>

* Revert "impl fn collect(mut self) -> Buffer"

This reverts commit 482ff97.

* Enforce user to call clone outside

Signed-off-by: Xuanwo <[email protected]>

---------

Signed-off-by: tison <[email protected]>
Signed-off-by: Xuanwo <[email protected]>
Co-authored-by: Xuanwo <[email protected]>
  • Loading branch information
tisonkun and Xuanwo authored Apr 16, 2024
1 parent 655a487 commit 2e9e89b
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 26 deletions.
7 changes: 3 additions & 4 deletions core/src/raw/adapters/typed_kv/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@ use std::fmt::Debug;
use std::mem::size_of;

use async_trait::async_trait;
use bytes::Bytes;
use chrono::Utc;

use crate::EntryMode;
use crate::Error;
use crate::ErrorKind;
use crate::Metadata;
use crate::Result;
use crate::Scheme;
use crate::{Buffer, EntryMode};

/// Adapter is the typed adapter to underlying kv services.
///
Expand Down Expand Up @@ -97,7 +96,7 @@ pub struct Value {
/// Metadata of this value.
pub metadata: Metadata,
/// The corresponding content of this value.
pub value: Bytes,
pub value: Buffer,
}

impl Value {
Expand All @@ -107,7 +106,7 @@ impl Value {
metadata: Metadata::new(EntryMode::DIR)
.with_content_length(0)
.with_last_modified(Utc::now()),
value: Bytes::new(),
value: Buffer::new(),
}
}

Expand Down
26 changes: 10 additions & 16 deletions core/src/raw/adapters/typed_kv/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@ use std::sync::Arc;
use std::vec::IntoIter;

use async_trait::async_trait;
use bytes::{Buf, Bytes};

use super::Adapter;
use super::Value;
use crate::raw::oio::HierarchyLister;
use crate::raw::oio::{HierarchyLister, QueueBuf};
use crate::raw::*;
use crate::*;

Expand Down Expand Up @@ -56,8 +55,8 @@ where
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl<S: Adapter> Accessor for Backend<S> {
type Reader = Bytes;
type BlockingReader = Bytes;
type Reader = Buffer;
type BlockingReader = Buffer;
type Writer = KvWriter<S>;
type BlockingWriter = KvWriter<S>;
type Lister = HierarchyLister<KvLister>;
Expand Down Expand Up @@ -238,7 +237,7 @@ pub struct KvWriter<S> {
path: String,

op: OpWrite,
buf: Option<Vec<u8>>,
buf: Option<QueueBuf>,
value: Option<Value>,
}

Expand All @@ -259,7 +258,7 @@ impl<S> KvWriter<S> {
}

fn build(&mut self) -> Value {
let value = self.buf.take().map(Bytes::from).unwrap_or_default();
let value = self.buf.take().map(QueueBuf::collect).unwrap_or_default();

let mut metadata = Metadata::new(EntryMode::FILE);
metadata.set_content_length(value.len() as u64);
Expand All @@ -280,11 +279,9 @@ impl<S> KvWriter<S> {

impl<S: Adapter> oio::Write for KvWriter<S> {
async fn write(&mut self, bs: Buffer) -> Result<usize> {
let size = bs.chunk().len();

let mut buf = self.buf.take().unwrap_or_else(|| Vec::with_capacity(size));
buf.extend_from_slice(bs.chunk());

let size = bs.len();
let mut buf = self.buf.take().unwrap_or_default();
buf.push(bs);
self.buf = Some(buf);
Ok(size)
}
Expand All @@ -311,12 +308,9 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
impl<S: Adapter> oio::BlockingWrite for KvWriter<S> {
fn write(&mut self, bs: Buffer) -> Result<usize> {
let size = bs.len();

let mut buf = self.buf.take().unwrap_or_else(|| Vec::with_capacity(size));
buf.extend_from_slice(bs.chunk());

let mut buf = self.buf.take().unwrap_or_default();
buf.push(bs);
self.buf = Some(buf);

Ok(size)
}

Expand Down
8 changes: 4 additions & 4 deletions core/src/raw/oio/buf/queue_buf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use std::collections::VecDeque;
/// - `push`: Push a new buffer in the queue.
/// - `collect`: Collect all buffer in the queue as a new [`Buffer`]
/// - `advance`: Advance the queue by `cnt` bytes.
#[derive(Default)]
#[derive(Clone, Default)]
pub struct QueueBuf(VecDeque<Buffer>);

impl QueueBuf {
Expand Down Expand Up @@ -68,13 +68,13 @@ impl QueueBuf {
/// most of them should be acceptable since we can expect the item length of buffers are slower
/// than 4k.
#[inline]
pub fn collect(&self) -> Buffer {
pub fn collect(mut self) -> Buffer {
if self.0.is_empty() {
Buffer::new()
} else if self.0.len() == 1 {
self.0.clone().pop_front().unwrap()
self.0.pop_front().unwrap()
} else {
self.0.clone().into_iter().flatten().collect()
self.0.into_iter().flatten().collect()
}
}

Expand Down
4 changes: 2 additions & 2 deletions core/src/raw/oio/write/exact_buf_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl<W: oio::Write> ExactBufWriter<W> {
impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
async fn write(&mut self, mut bs: Buffer) -> Result<usize> {
if self.buffer.len() >= self.buffer_size {
let written = self.inner.write(self.buffer.collect()).await?;
let written = self.inner.write(self.buffer.clone().collect()).await?;
self.buffer.advance(written);
}

Expand All @@ -64,7 +64,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
break;
}

let written = self.inner.write(self.buffer.collect()).await?;
let written = self.inner.write(self.buffer.clone().collect()).await?;
self.buffer.advance(written);
}

Expand Down

0 comments on commit 2e9e89b

Please sign in to comment.