Skip to content

Commit

Permalink
Merge branch 'main' into kv-write-buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
Xuanwo committed Apr 16, 2024
2 parents a4df302 + 2e9e89b commit 8fde30d
Show file tree
Hide file tree
Showing 19 changed files with 80 additions and 97 deletions.
2 changes: 1 addition & 1 deletion bin/oli/src/commands/cat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub async fn main(args: &ArgMatches) -> Result<()> {

let reader = op.reader(&path).await?;
let meta = op.stat(&path).await?;
let mut buf_reader = reader.into_futures_io_async_read(0..meta.content_length());
let mut buf_reader = reader.into_futures_async_read(0..meta.content_length());
let mut stdout = io::AllowStdIo::new(std::io::stdout());
io::copy_buf(&mut buf_reader, &mut stdout).await?;
Ok(())
Expand Down
11 changes: 4 additions & 7 deletions bin/oli/src/commands/cp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,10 @@ pub async fn main(args: &ArgMatches) -> Result<()> {
let (dst_op, dst_path) = cfg.parse_location(dst)?;

if !recursive {
let mut dst_w = dst_op
.writer(&dst_path)
.await?
.into_futures_io_async_write();
let mut dst_w = dst_op.writer(&dst_path).await?.into_futures_async_write();
let src_meta = src_op.stat(&src_path).await?;
let reader = src_op.reader_with(&src_path).chunk(8 * 1024 * 1024).await?;
let buf_reader = reader.into_futures_io_async_read(0..src_meta.content_length());
let buf_reader = reader.into_futures_async_read(0..src_meta.content_length());
futures::io::copy_buf(buf_reader, &mut dst_w).await?;
// flush data
dst_w.close().await?;
Expand All @@ -74,12 +71,12 @@ pub async fn main(args: &ArgMatches) -> Result<()> {
.strip_prefix(prefix)
.expect("invalid path");
let reader = src_op.reader_with(de.path()).chunk(8 * 1024 * 1024).await?;
let buf_reader = reader.into_futures_io_async_read(0..meta.content_length());
let buf_reader = reader.into_futures_async_read(0..meta.content_length());

let mut writer = dst_op
.writer(&dst_root.join(fp).to_string_lossy())
.await?
.into_futures_io_async_write();
.into_futures_async_write();

println!("Copying {}", de.path());
futures::io::copy_buf(buf_reader, &mut writer).await?;
Expand Down
4 changes: 2 additions & 2 deletions bindings/c/include/opendal.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ typedef struct OperatorInfo OperatorInfo;
*
* StdReader also implements [`Send`] and [`Sync`].
*/
typedef struct StdIoReader StdIoReader;
typedef struct StdReader StdReader;

/**
* \brief opendal_bytes carries raw-bytes with its length
Expand Down Expand Up @@ -408,7 +408,7 @@ typedef struct opendal_result_read {
* a opendal::BlockingReader, which is inside the Rust core code.
*/
typedef struct opendal_reader {
struct StdIoReader *inner;
struct StdReader *inner;
} opendal_reader;

/**
Expand Down
4 changes: 2 additions & 2 deletions bindings/c/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ use super::*;
/// a opendal::BlockingReader, which is inside the Rust core code.
#[repr(C)]
pub struct opendal_reader {
inner: *mut core::StdIoReader,
inner: *mut core::StdReader,
}

impl opendal_reader {
pub(crate) fn new(reader: core::BlockingReader, size: u64) -> Self {
Self {
inner: Box::into_raw(Box::new(reader.into_std_io_read(0..size))),
inner: Box::into_raw(Box::new(reader.into_std_read(0..size))),
}
}

Expand Down
4 changes: 1 addition & 3 deletions bindings/cpp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,7 @@ impl Operator {
fn reader(&self, path: &str) -> Result<Box<Reader>> {
let meta = self.0.stat(path)?;
Ok(Box::new(Reader(
self.0
.reader(path)?
.into_std_io_read(0..meta.content_length()),
self.0.reader(path)?.into_std_read(0..meta.content_length()),
)))
}

Expand Down
2 changes: 1 addition & 1 deletion bindings/cpp/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::io::{Read, Seek};

use super::ffi;

pub struct Reader(pub od::StdIoReader);
pub struct Reader(pub od::StdReader);

impl Reader {
pub fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
Expand Down
8 changes: 4 additions & 4 deletions bindings/nodejs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ impl Operator {
let meta = self.0.stat(&path).await.map_err(format_napi_error)?;
let r = self.0.reader(&path).await.map_err(format_napi_error)?;
Ok(Reader {
inner: r.into_futures_io_async_read(0..meta.content_length()),
inner: r.into_futures_async_read(0..meta.content_length()),
})
}

Expand Down Expand Up @@ -220,7 +220,7 @@ impl Operator {
let meta = self.0.blocking().stat(&path).map_err(format_napi_error)?;
let r = self.0.blocking().reader(&path).map_err(format_napi_error)?;
Ok(BlockingReader {
inner: r.into_std_io_read(0..meta.content_length()),
inner: r.into_std_read(0..meta.content_length()),
})
}

Expand Down Expand Up @@ -660,7 +660,7 @@ pub struct ListOptions {
/// manner.
#[napi]
pub struct BlockingReader {
inner: opendal::StdIoReader,
inner: opendal::StdReader,
}

#[napi]
Expand All @@ -677,7 +677,7 @@ impl BlockingReader {
/// manner.
#[napi]
pub struct Reader {
inner: opendal::FuturesIoAsyncReader,
inner: opendal::FuturesAsyncReader,
}

#[napi]
Expand Down
11 changes: 4 additions & 7 deletions bindings/python/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,14 @@ use crate::*;
pub struct File(FileState, Capability);

enum FileState {
Reader(ocore::StdIoReader),
Reader(ocore::StdReader),
Writer(ocore::BlockingWriter),
Closed,
}

impl File {
pub fn new_reader(reader: ocore::BlockingReader, size: u64, capability: Capability) -> Self {
Self(
FileState::Reader(reader.into_std_io_read(0..size)),
capability,
)
Self(FileState::Reader(reader.into_std_read(0..size)), capability)
}

pub fn new_writer(writer: ocore::BlockingWriter, capability: Capability) -> Self {
Expand Down Expand Up @@ -289,7 +286,7 @@ impl File {
pub struct AsyncFile(Arc<Mutex<AsyncFileState>>, Capability);

enum AsyncFileState {
Reader(ocore::FuturesIoAsyncReader),
Reader(ocore::FuturesAsyncReader),
Writer(ocore::Writer),
Closed,
}
Expand All @@ -298,7 +295,7 @@ impl AsyncFile {
pub fn new_reader(reader: ocore::Reader, size: u64, capability: Capability) -> Self {
Self(
Arc::new(Mutex::new(AsyncFileState::Reader(
reader.into_futures_io_async_read(0..size),
reader.into_futures_async_read(0..size),
))),
capability,
)
Expand Down
2 changes: 1 addition & 1 deletion core/benches/ops/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ fn bench_read_full(c: &mut Criterion, name: &str, op: Operator) {
b.to_async(&*TEST_RUNTIME).iter(|| async {
let r = op.reader_with(path).await.unwrap();

let r = r.into_futures_io_async_read(0..size.bytes() as u64);
let r = r.into_futures_async_read(0..size.bytes() as u64);
io::copy(r, &mut io::sink()).await.unwrap();
})
});
Expand Down
7 changes: 3 additions & 4 deletions core/src/raw/adapters/typed_kv/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@ use std::fmt::Debug;
use std::mem::size_of;

use async_trait::async_trait;
use bytes::Bytes;
use chrono::Utc;

use crate::EntryMode;
use crate::Error;
use crate::ErrorKind;
use crate::Metadata;
use crate::Result;
use crate::Scheme;
use crate::{Buffer, EntryMode};

/// Adapter is the typed adapter to underlying kv services.
///
Expand Down Expand Up @@ -97,7 +96,7 @@ pub struct Value {
/// Metadata of this value.
pub metadata: Metadata,
/// The corresponding content of this value.
pub value: Bytes,
pub value: Buffer,
}

impl Value {
Expand All @@ -107,7 +106,7 @@ impl Value {
metadata: Metadata::new(EntryMode::DIR)
.with_content_length(0)
.with_last_modified(Utc::now()),
value: Bytes::new(),
value: Buffer::new(),
}
}

Expand Down
26 changes: 10 additions & 16 deletions core/src/raw/adapters/typed_kv/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@ use std::sync::Arc;
use std::vec::IntoIter;

use async_trait::async_trait;
use bytes::{Buf, Bytes};

use super::Adapter;
use super::Value;
use crate::raw::oio::HierarchyLister;
use crate::raw::oio::{HierarchyLister, QueueBuf};
use crate::raw::*;
use crate::*;

Expand Down Expand Up @@ -56,8 +55,8 @@ where
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl<S: Adapter> Accessor for Backend<S> {
type Reader = Bytes;
type BlockingReader = Bytes;
type Reader = Buffer;
type BlockingReader = Buffer;
type Writer = KvWriter<S>;
type BlockingWriter = KvWriter<S>;
type Lister = HierarchyLister<KvLister>;
Expand Down Expand Up @@ -238,7 +237,7 @@ pub struct KvWriter<S> {
path: String,

op: OpWrite,
buf: Option<Vec<u8>>,
buf: Option<QueueBuf>,
value: Option<Value>,
}

Expand All @@ -259,7 +258,7 @@ impl<S> KvWriter<S> {
}

fn build(&mut self) -> Value {
let value = self.buf.take().map(Bytes::from).unwrap_or_default();
let value = self.buf.take().map(QueueBuf::collect).unwrap_or_default();

let mut metadata = Metadata::new(EntryMode::FILE);
metadata.set_content_length(value.len() as u64);
Expand All @@ -280,11 +279,9 @@ impl<S> KvWriter<S> {

impl<S: Adapter> oio::Write for KvWriter<S> {
async fn write(&mut self, bs: Buffer) -> Result<usize> {
let size = bs.chunk().len();

let mut buf = self.buf.take().unwrap_or_else(|| Vec::with_capacity(size));
buf.extend_from_slice(bs.chunk());

let size = bs.len();
let mut buf = self.buf.take().unwrap_or_default();
buf.push(bs);
self.buf = Some(buf);
Ok(size)
}
Expand All @@ -311,12 +308,9 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
impl<S: Adapter> oio::BlockingWrite for KvWriter<S> {
fn write(&mut self, bs: Buffer) -> Result<usize> {
let size = bs.len();

let mut buf = self.buf.take().unwrap_or_else(|| Vec::with_capacity(size));
buf.extend_from_slice(bs.chunk());

let mut buf = self.buf.take().unwrap_or_default();
buf.push(bs);
self.buf = Some(buf);

Ok(size)
}

Expand Down
8 changes: 4 additions & 4 deletions core/src/raw/oio/buf/queue_buf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use std::collections::VecDeque;
/// - `push`: Push a new buffer in the queue.
/// - `collect`: Collect all buffer in the queue as a new [`Buffer`]
/// - `advance`: Advance the queue by `cnt` bytes.
#[derive(Default)]
#[derive(Clone, Default)]
pub struct QueueBuf(VecDeque<Buffer>);

impl QueueBuf {
Expand Down Expand Up @@ -68,13 +68,13 @@ impl QueueBuf {
/// most of them should be acceptable since we can expect the item length of buffers are slower
/// than 4k.
#[inline]
pub fn collect(&self) -> Buffer {
pub fn collect(mut self) -> Buffer {
if self.0.is_empty() {
Buffer::new()
} else if self.0.len() == 1 {
self.0.clone().pop_front().unwrap()
self.0.pop_front().unwrap()
} else {
self.0.clone().into_iter().flatten().collect()
self.0.into_iter().flatten().collect()
}
}

Expand Down
4 changes: 2 additions & 2 deletions core/src/raw/oio/write/exact_buf_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl<W: oio::Write> ExactBufWriter<W> {
impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
async fn write(&mut self, mut bs: Buffer) -> Result<usize> {
if self.buffer.len() >= self.buffer_size {
let written = self.inner.write(self.buffer.collect()).await?;
let written = self.inner.write(self.buffer.clone().collect()).await?;
self.buffer.advance(written);
}

Expand All @@ -64,7 +64,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
break;
}

let written = self.inner.write(self.buffer.collect()).await?;
let written = self.inner.write(self.buffer.clone().collect()).await?;
self.buffer.advance(written);
}

Expand Down
Loading

0 comments on commit 8fde30d

Please sign in to comment.