Skip to content

Commit

Permalink
refactor: Polish RangeWrite implementation to remove the extra buffer…
Browse files Browse the repository at this point in the history
… logic (#3038)

* Build pass

Signed-off-by: Xuanwo <[email protected]>

* Save code

Signed-off-by: Xuanwo <[email protected]>

* Fix panic

Signed-off-by: Xuanwo <[email protected]>

* Fix gcs

Signed-off-by: Xuanwo <[email protected]>

* Remove chunked cursor

Signed-off-by: Xuanwo <[email protected]>

---------

Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Sep 12, 2023
1 parent 2b83c71 commit 9535d02
Show file tree
Hide file tree
Showing 11 changed files with 126 additions and 481 deletions.
4 changes: 4 additions & 0 deletions core/src/raw/http_util/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ pub enum AsyncBody {
Empty,
/// Body with bytes.
Bytes(Bytes),
/// Body with chunked bytes.
///
/// This is nearly the same with stream, but we can save an extra box.
ChunkedBytes(oio::ChunkedBytes),
/// Body with stream.
Stream(oio::Streamer),
}
Expand Down
1 change: 1 addition & 0 deletions core/src/raw/http_util/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ impl HttpClient {
req_builder = match body {
AsyncBody::Empty => req_builder.body(reqwest::Body::from("")),
AsyncBody::Bytes(bs) => req_builder.body(reqwest::Body::from(bs)),
AsyncBody::ChunkedBytes(bs) => req_builder.body(reqwest::Body::wrap_stream(bs)),
AsyncBody::Stream(s) => req_builder.body(reqwest::Body::wrap_stream(s)),
};

Expand Down
1 change: 1 addition & 0 deletions core/src/raw/http_util/multipart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ impl MixedPart {
bs.len() as u64,
Some(Box::new(oio::Cursor::from(bs)) as Streamer),
),
AsyncBody::ChunkedBytes(bs) => (bs.len() as u64, Some(Box::new(bs) as Streamer)),
AsyncBody::Stream(stream) => {
let len = parts
.headers
Expand Down
22 changes: 21 additions & 1 deletion core/src/raw/oio/buf/chunked_bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
// under the License.

use bytes::{Bytes, BytesMut};
use futures::Stream;
use std::cmp::min;
use std::collections::VecDeque;
use std::io::IoSlice;
use std::pin::Pin;
use std::task::{Context, Poll};

use crate::raw::*;
Expand Down Expand Up @@ -74,9 +76,9 @@ impl ChunkedBytes {
/// Reference: <https://doc.rust-lang.org/stable/std/collections/struct.VecDeque.html#impl-From%3CVec%3CT,+A%3E%3E-for-VecDeque%3CT,+A%3E>
pub fn from_vec(bs: Vec<Bytes>) -> Self {
Self {
size: bs.iter().map(|v| v.len()).sum(),
frozen: bs.into(),
active: BytesMut::new(),
size: 0,

chunk_size: DEFAULT_CHUNK_SIZE,
}
Expand Down Expand Up @@ -332,6 +334,24 @@ impl oio::Stream for ChunkedBytes {
}
}

impl Stream for ChunkedBytes {
type Item = Result<Bytes>;

fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.frozen.pop_front() {
Some(bs) => {
self.size -= bs.len();
Poll::Ready(Some(Ok(bs)))
}
None if !self.active.is_empty() => {
self.size -= self.active.len();
Poll::Ready(Some(Ok(self.active.split().freeze())))
}
None => Poll::Ready(None),
}
}
}

#[cfg(test)]
mod tests {
use log::debug;
Expand Down
Loading

0 comments on commit 9535d02

Please sign in to comment.