diff --git a/core/src/raw/http_util/body.rs b/core/src/raw/http_util/body.rs index 7583f443744..474f7489780 100644 --- a/core/src/raw/http_util/body.rs +++ b/core/src/raw/http_util/body.rs @@ -40,6 +40,10 @@ pub enum AsyncBody { Empty, /// Body with bytes. Bytes(Bytes), + /// Body with chunked bytes. + /// + /// This is nearly the same with stream, but we can save an extra box. + ChunkedBytes(oio::ChunkedBytes), /// Body with stream. Stream(oio::Streamer), } diff --git a/core/src/raw/http_util/client.rs b/core/src/raw/http_util/client.rs index d0926d1cd1f..783eaf8fbd1 100644 --- a/core/src/raw/http_util/client.rs +++ b/core/src/raw/http_util/client.rs @@ -101,6 +101,7 @@ impl HttpClient { req_builder = match body { AsyncBody::Empty => req_builder.body(reqwest::Body::from("")), AsyncBody::Bytes(bs) => req_builder.body(reqwest::Body::from(bs)), + AsyncBody::ChunkedBytes(bs) => req_builder.body(reqwest::Body::wrap_stream(bs)), AsyncBody::Stream(s) => req_builder.body(reqwest::Body::wrap_stream(s)), }; diff --git a/core/src/raw/http_util/multipart.rs b/core/src/raw/http_util/multipart.rs index edc061c1f2d..a6c6f765278 100644 --- a/core/src/raw/http_util/multipart.rs +++ b/core/src/raw/http_util/multipart.rs @@ -410,6 +410,7 @@ impl MixedPart { bs.len() as u64, Some(Box::new(oio::Cursor::from(bs)) as Streamer), ), + AsyncBody::ChunkedBytes(bs) => (bs.len() as u64, Some(Box::new(bs) as Streamer)), AsyncBody::Stream(stream) => { let len = parts .headers diff --git a/core/src/raw/oio/buf/chunked_bytes.rs b/core/src/raw/oio/buf/chunked_bytes.rs index 82447dc62b4..810c0dfda39 100644 --- a/core/src/raw/oio/buf/chunked_bytes.rs +++ b/core/src/raw/oio/buf/chunked_bytes.rs @@ -16,9 +16,11 @@ // under the License. use bytes::{Bytes, BytesMut}; +use futures::Stream; use std::cmp::min; use std::collections::VecDeque; use std::io::IoSlice; +use std::pin::Pin; use std::task::{Context, Poll}; use crate::raw::*; @@ -74,9 +76,9 @@ impl ChunkedBytes { /// Reference: pub fn from_vec(bs: Vec) -> Self { Self { + size: bs.iter().map(|v| v.len()).sum(), frozen: bs.into(), active: BytesMut::new(), - size: 0, chunk_size: DEFAULT_CHUNK_SIZE, } @@ -332,6 +334,24 @@ impl oio::Stream for ChunkedBytes { } } +impl Stream for ChunkedBytes { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + match self.frozen.pop_front() { + Some(bs) => { + self.size -= bs.len(); + Poll::Ready(Some(Ok(bs))) + } + None if !self.active.is_empty() => { + self.size -= self.active.len(); + Poll::Ready(Some(Ok(self.active.split().freeze()))) + } + None => Poll::Ready(None), + } + } +} + #[cfg(test)] mod tests { use log::debug; diff --git a/core/src/raw/oio/cursor.rs b/core/src/raw/oio/cursor.rs index ddead52c9d5..0cc4e42a5df 100644 --- a/core/src/raw/oio/cursor.rs +++ b/core/src/raw/oio/cursor.rs @@ -15,8 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::cmp::Ordering; -use std::collections::VecDeque; use std::io::Read; use std::io::SeekFrom; use std::task::Context; @@ -174,379 +172,3 @@ impl oio::Stream for Cursor { Poll::Ready(Ok(())) } } - -/// ChunkedCursor is used represents a non-contiguous bytes in memory. -/// -/// This is useful when we buffer users' random writes without copy. ChunkedCursor implements -/// [`oio::Stream`] so it can be used in [`oio::Write::copy_from`] directly. -/// -/// # TODO -/// -/// we can do some compaction during runtime. For example, merge 4K data -/// into the same bytes instead. -#[derive(Clone)] -pub struct ChunkedCursor { - inner: VecDeque, - idx: usize, -} - -impl Default for ChunkedCursor { - fn default() -> Self { - Self::new() - } -} - -impl ChunkedCursor { - /// Create a new chunked cursor. - pub fn new() -> Self { - Self { - inner: VecDeque::new(), - idx: 0, - } - } - - /// Returns `true` if current cursor is empty. - pub fn is_empty(&self) -> bool { - self.inner.len() <= self.idx - } - - /// Return current bytes size of cursor. - pub fn len(&self) -> usize { - self.inner.iter().skip(self.idx).map(|v| v.len()).sum() - } - - /// Clear the entire cursor. - pub fn clear(&mut self) { - self.idx = 0; - self.inner.clear(); - } - - /// Push a new bytes into vector cursor. - pub fn push(&mut self, bs: Bytes) { - self.inner.push_back(bs); - } - - /// split_off will split the cursor into two cursors at given size. - /// - /// After split, `self` will contains the `0..at` part and the returned cursor contains - /// `at..` parts. - /// - /// # Panics - /// - /// - Panics if `at > len` - /// - Panics if `idx != 0`, the cursor must be reset before split. - pub fn split_off(&mut self, at: usize) -> Self { - assert!( - at <= self.len(), - "split_off at must smaller than current size" - ); - assert_eq!(self.idx, 0, "split_off must reset cursor first"); - - let mut chunks = VecDeque::new(); - let mut size = self.len() - at; - - while let Some(mut bs) = self.inner.pop_back() { - match size.cmp(&bs.len()) { - Ordering::Less => { - let remaining = bs.split_off(bs.len() - size); - chunks.push_front(remaining); - self.inner.push_back(bs); - break; - } - Ordering::Equal => { - chunks.push_front(bs); - break; - } - Ordering::Greater => { - size -= bs.len(); - chunks.push_front(bs); - } - } - } - - Self { - inner: chunks, - idx: 0, - } - } - - /// split_to will split the cursor into two cursors at given size. - /// - /// After split, `self` will contains the `at..` part and the returned cursor contains - /// `0..at` parts. - /// - /// # Panics - /// - /// - Panics if `at > len` - /// - Panics if `idx != 0`, the cursor must be reset before split. - pub fn split_to(&mut self, at: usize) -> Self { - assert!( - at <= self.len(), - "split_to at must smaller than current size" - ); - assert_eq!(self.idx, 0, "split_to must reset cursor first"); - - let mut chunks = VecDeque::new(); - let mut size = at; - - while let Some(mut bs) = self.inner.pop_front() { - match size.cmp(&bs.len()) { - Ordering::Less => { - let remaining = bs.split_off(size); - chunks.push_back(bs); - self.inner.push_front(remaining); - break; - } - Ordering::Equal => { - chunks.push_back(bs); - break; - } - Ordering::Greater => { - size -= bs.len(); - chunks.push_back(bs); - } - } - } - - Self { - inner: chunks, - idx: 0, - } - } - - #[cfg(test)] - fn concat(&self) -> Bytes { - let mut bs = bytes::BytesMut::new(); - for v in self.inner.iter().skip(self.idx) { - bs.extend_from_slice(v); - } - bs.freeze() - } -} - -impl oio::Stream for ChunkedCursor { - fn poll_next(&mut self, _: &mut Context<'_>) -> Poll>> { - if self.is_empty() { - return Poll::Ready(None); - } - - let bs = self.inner[self.idx].clone(); - self.idx += 1; - Poll::Ready(Some(Ok(bs))) - } - - fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll> { - self.idx = 0; - Poll::Ready(Ok(())) - } -} - -#[cfg(test)] -mod tests { - use pretty_assertions::assert_eq; - use rand::thread_rng; - use rand::Rng; - use rand::RngCore; - use sha2::Digest; - use sha2::Sha256; - - use super::*; - use crate::raw::oio::StreamExt; - - #[tokio::test] - async fn test_chunked_cursor() -> Result<()> { - let mut c = ChunkedCursor::new(); - - c.push(Bytes::from("hello")); - assert_eq!(c.len(), 5); - assert!(!c.is_empty()); - - c.push(Bytes::from("world")); - assert_eq!(c.len(), 10); - assert!(!c.is_empty()); - - let bs = c.next().await.unwrap().unwrap(); - assert_eq!(bs, Bytes::from("hello")); - assert_eq!(c.len(), 5); - assert!(!c.is_empty()); - - let bs = c.next().await.unwrap().unwrap(); - assert_eq!(bs, Bytes::from("world")); - assert_eq!(c.len(), 0); - assert!(c.is_empty()); - - c.reset().await?; - assert_eq!(c.len(), 10); - assert!(!c.is_empty()); - - c.clear(); - assert_eq!(c.len(), 0); - assert!(c.is_empty()); - - Ok(()) - } - - #[test] - fn test_chunked_cursor_split_to() { - let mut base = ChunkedCursor::new(); - base.push(Bytes::from("Hello")); - base.push(Bytes::from("Wor")); - base.push(Bytes::from("ld")); - - // Case 1: split less than first chunk - let mut c1 = base.clone(); - let c2 = c1.split_to(3); - - assert_eq!(c1.len(), 7); - assert_eq!( - &c1.inner, - &[Bytes::from("lo"), Bytes::from("Wor"), Bytes::from("ld")] - ); - - assert_eq!(c2.len(), 3); - assert_eq!(&c2.inner, &[Bytes::from("Hel")]); - - // Case 2: split larger than first chunk - let mut c1 = base.clone(); - let c2 = c1.split_to(6); - - assert_eq!(c1.len(), 4); - assert_eq!(&c1.inner, &[Bytes::from("or"), Bytes::from("ld")]); - - assert_eq!(c2.len(), 6); - assert_eq!(&c2.inner, &[Bytes::from("Hello"), Bytes::from("W")]); - - // Case 3: split at chunk edge - let mut c1 = base.clone(); - let c2 = c1.split_to(8); - - assert_eq!(c1.len(), 2); - assert_eq!(&c1.inner, &[Bytes::from("ld")]); - - assert_eq!(c2.len(), 8); - assert_eq!(&c2.inner, &[Bytes::from("Hello"), Bytes::from("Wor")]); - } - - #[test] - fn test_chunked_cursor_split_off() { - let mut base = ChunkedCursor::new(); - base.push(Bytes::from("Hello")); - base.push(Bytes::from("Wor")); - base.push(Bytes::from("ld")); - - // Case 1: split less than first chunk - let mut c1 = base.clone(); - let c2 = c1.split_off(3); - - assert_eq!(c1.len(), 3); - assert_eq!(&c1.inner, &[Bytes::from("Hel")]); - - assert_eq!(c2.len(), 7); - assert_eq!( - &c2.inner, - &[Bytes::from("lo"), Bytes::from("Wor"), Bytes::from("ld")] - ); - - // Case 2: split larger than first chunk - let mut c1 = base.clone(); - let c2 = c1.split_off(6); - - assert_eq!(c1.len(), 6); - assert_eq!(&c1.inner, &[Bytes::from("Hello"), Bytes::from("W")]); - - assert_eq!(c2.len(), 4); - assert_eq!(&c2.inner, &[Bytes::from("or"), Bytes::from("ld")]); - - // Case 3: split at chunk edge - let mut c1 = base.clone(); - let c2 = c1.split_off(8); - - assert_eq!(c1.len(), 8); - assert_eq!(&c1.inner, &[Bytes::from("Hello"), Bytes::from("Wor")]); - - assert_eq!(c2.len(), 2); - assert_eq!(&c2.inner, &[Bytes::from("ld")]); - } - - #[test] - fn test_fuzz_chunked_cursor_split_to() { - let mut rng = thread_rng(); - let mut expected = vec![]; - let mut total_size = 0; - - let mut cursor = ChunkedCursor::new(); - - // Build Cursor - let count = rng.gen_range(1..1000); - for _ in 0..count { - let size = rng.gen_range(1..100); - let mut content = vec![0; size]; - rng.fill_bytes(&mut content); - total_size += size; - - expected.extend_from_slice(&content); - cursor.push(Bytes::from(content)); - } - - // Test Cursor - for _ in 0..count { - let mut cursor = cursor.clone(); - - let at = rng.gen_range(0..total_size); - let to = cursor.split_to(at); - - assert_eq!(cursor.len(), total_size - at); - assert_eq!( - format!("{:x}", Sha256::digest(&cursor.concat())), - format!("{:x}", Sha256::digest(&expected[at..])), - ); - - assert_eq!(to.len(), at); - assert_eq!( - format!("{:x}", Sha256::digest(&to.concat())), - format!("{:x}", Sha256::digest(&expected[0..at])), - ); - } - } - - #[test] - fn test_fuzz_chunked_cursor_split_off() { - let mut rng = thread_rng(); - let mut expected = vec![]; - let mut total_size = 0; - - let mut cursor = ChunkedCursor::new(); - - // Build Cursor - let count = rng.gen_range(1..1000); - for _ in 0..count { - let size = rng.gen_range(1..100); - let mut content = vec![0; size]; - rng.fill_bytes(&mut content); - total_size += size; - - expected.extend_from_slice(&content); - cursor.push(Bytes::from(content)); - } - - // Test Cursor - for _ in 0..count { - let mut cursor = cursor.clone(); - - let at = rng.gen_range(0..total_size); - let off = cursor.split_off(at); - - assert_eq!(cursor.len(), at); - assert_eq!( - format!("{:x}", Sha256::digest(&cursor.concat())), - format!("{:x}", Sha256::digest(&expected[..at])), - ); - - assert_eq!(off.len(), total_size - at); - assert_eq!( - format!("{:x}", Sha256::digest(&off.concat())), - format!("{:x}", Sha256::digest(&expected[at..])), - ); - } - } -} diff --git a/core/src/raw/oio/mod.rs b/core/src/raw/oio/mod.rs index 29cf8e474b6..9e52acfd263 100644 --- a/core/src/raw/oio/mod.rs +++ b/core/src/raw/oio/mod.rs @@ -35,7 +35,6 @@ mod page; pub use page::*; mod cursor; -pub use cursor::ChunkedCursor; pub use cursor::Cursor; mod entry; diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_upload_write.rs index ec42275f048..6edeb96a88f 100644 --- a/core/src/raw/oio/write/multipart_upload_write.rs +++ b/core/src/raw/oio/write/multipart_upload_write.rs @@ -21,7 +21,6 @@ use std::task::Context; use std::task::Poll; use async_trait::async_trait; -use bytes::Bytes; use futures::future::BoxFuture; use crate::raw::*; @@ -109,7 +108,7 @@ pub struct MultipartUploadPart { pub struct MultipartUploadWriter { state: State, - cache: Option, + cache: Option, upload_id: Option>, parts: Vec, } @@ -163,7 +162,7 @@ where &upload_id, part_number, size as u64, - AsyncBody::Bytes(bs), + AsyncBody::ChunkedBytes(bs), ) .await; @@ -174,7 +173,8 @@ where // Fill cache with the first write. if self.cache.is_none() { let size = bs.remaining(); - self.cache = Some(bs.bytes(size)); + let cb = oio::ChunkedBytes::from_vec(bs.vectored_bytes(size)); + self.cache = Some(cb); return Poll::Ready(Ok(size)); } @@ -197,7 +197,8 @@ where self.parts.push(part?); // Replace the cache when last write succeeded let size = bs.remaining(); - self.cache = Some(bs.bytes(size)); + let cb = oio::ChunkedBytes::from_vec(bs.vectored_bytes(size)); + self.cache = Some(cb); return Poll::Ready(Ok(size)); } State::Close(_) => { @@ -232,7 +233,7 @@ where &upload_id, parts.len(), size as u64, - AsyncBody::Bytes(bs), + AsyncBody::ChunkedBytes(bs), ) .await; (w, part) @@ -250,7 +251,9 @@ where Some(bs) => { self.state = State::Close(Box::pin(async move { let size = bs.len(); - let res = w.write_once(size as u64, AsyncBody::Bytes(bs)).await; + let res = w + .write_once(size as u64, AsyncBody::ChunkedBytes(bs)) + .await; (w, res) })); } diff --git a/core/src/raw/oio/write/range_write.rs b/core/src/raw/oio/write/range_write.rs index 9dfe73197e1..9ae017a5e71 100644 --- a/core/src/raw/oio/write/range_write.rs +++ b/core/src/raw/oio/write/range_write.rs @@ -50,6 +50,13 @@ use crate::*; /// - Expose `RangeWriter` as `Accessor::Writer` #[async_trait] pub trait RangeWrite: Send + Sync + Unpin + 'static { + /// write_once is used to write the data to underlying storage at once. + /// + /// RangeWriter will call this API when: + /// + /// - All the data has been written to the buffer and we can perform the upload at once. + async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()>; + /// Initiate range the range write, the returning value is the location. async fn initiate_range(&self) -> Result; @@ -79,8 +86,7 @@ pub trait RangeWrite: Send + Sync + Unpin + 'static { pub struct RangeWriter { location: Option, written: u64, - align_size: usize, - align_buffer: oio::ChunkedCursor, + buffer: Option, state: State, } @@ -88,8 +94,7 @@ pub struct RangeWriter { enum State { Idle(Option), Init(BoxFuture<'static, (W, Result)>), - /// The returning value is (consume size, written size). - Write(BoxFuture<'static, (W, Result<(usize, u64)>)>), + Write(BoxFuture<'static, (W, Result)>), Complete(BoxFuture<'static, (W, Result<()>)>), Abort(BoxFuture<'static, (W, Result<()>)>), } @@ -105,25 +110,11 @@ impl RangeWriter { Self { state: State::Idle(Some(inner)), + buffer: None, location: None, written: 0, - align_size: 256 * 1024, - align_buffer: oio::ChunkedCursor::default(), } } - - /// Set the align size. - /// - /// The size is default to 256 KiB. - /// - /// # Note - /// - /// Please don't mix this with the buffer size. Align size is usually the hard - /// limit for the service to accept the chunk. - pub fn with_align_size(mut self, size: usize) -> Self { - self.align_size = size; - self - } } impl oio::Write for RangeWriter { @@ -133,51 +124,38 @@ impl oio::Write for RangeWriter { State::Idle(w) => { match self.location.clone() { Some(location) => { - let remaining = bs.remaining(); - let current_size = self.align_buffer.len(); - let mut total_size = current_size + remaining; - - if total_size <= self.align_size { - let bs = bs.bytes(remaining); - self.align_buffer.push(bs); - return Poll::Ready(Ok(remaining)); - } - // If total_size is aligned, we need to write one less chunk to make sure - // that the file has at least one chunk during complete stage. - if total_size % self.align_size == 0 { - total_size -= self.align_size; - } - - let consume = total_size - total_size % self.align_size - current_size; - let mut align_buffer = self.align_buffer.clone(); - let bs = bs.bytes(consume); - align_buffer.push(bs); - let written = self.written; - let w = w.take().unwrap(); - let fut = async move { - let size = align_buffer.len() as u64; + + let buffer = self.buffer.clone().expect("cache must be valid").clone(); + let w = w.take().expect("writer must be valid"); + self.state = State::Write(Box::pin(async move { + let size = buffer.len() as u64; let res = w .write_range( &location, written, size, - AsyncBody::Stream(Box::new(align_buffer)), + AsyncBody::ChunkedBytes(buffer), ) .await; - (w, res.map(|_| (consume, size))) - }; - self.state = State::Write(Box::pin(fut)); + (w, res.map(|_| size)) + })); } None => { - let w = w.take().unwrap(); - let fut = async move { - let res = w.initiate_range().await; + // Fill cache with the first write. + if self.buffer.is_none() { + let size = bs.remaining(); + let cb = oio::ChunkedBytes::from_vec(bs.vectored_bytes(size)); + self.buffer = Some(cb); + return Poll::Ready(Ok(size)); + } - (w, res) - }; - self.state = State::Init(Box::pin(fut)); + let w = w.take().expect("writer must be valid"); + self.state = State::Init(Box::pin(async move { + let location = w.initiate_range().await; + (w, location) + })); } } } @@ -187,15 +165,16 @@ impl oio::Write for RangeWriter { self.location = Some(res?); } State::Write(fut) => { - let (w, res) = ready!(fut.poll_unpin(cx)); + let (w, size) = ready!(fut.as_mut().poll(cx)); self.state = State::Idle(Some(w)); - let (consume, written) = res?; - self.written += written; - self.align_buffer.clear(); - // It's possible that the buffer is already aligned, no bytes has been consumed. - if consume != 0 { - return Poll::Ready(Ok(consume)); - } + // Update the written. + self.written += size?; + + // Replace the cache when last write succeeded + let size = bs.remaining(); + let cb = oio::ChunkedBytes::from_vec(bs.vectored_bytes(size)); + self.buffer = Some(cb); + return Poll::Ready(Ok(size)); } State::Complete(_) => { unreachable!("RangeWriter must not go into State::Complete during poll_write") @@ -211,28 +190,41 @@ impl oio::Write for RangeWriter { loop { match &mut self.state { State::Idle(w) => { - let w = w.take().unwrap(); + let w = w.take().expect("writer must be valid"); match self.location.clone() { Some(location) => { - let align_buffer = self.align_buffer.clone(); - let written = self.written; - let fut = async move { - let size = align_buffer.len() as u64; - let res = w - .complete_range( - &location, - written, - size, - AsyncBody::Stream(Box::new(align_buffer)), - ) - .await; - - (w, res) - }; - self.state = State::Complete(Box::pin(fut)); + match self.buffer.clone() { + Some(bs) => { + self.state = State::Complete(Box::pin(async move { + let res = w + .complete_range( + &location, + written, + bs.len() as u64, + AsyncBody::ChunkedBytes(bs), + ) + .await; + (w, res) + })); + } + None => { + unreachable!("It's must be bug that RangeWrite is in State::Idle with no cache but has location") + } + } } - None => return Poll::Ready(Ok(())), + None => match self.buffer.clone() { + Some(bs) => { + self.state = State::Complete(Box::pin(async move { + let size = bs.len(); + let res = w + .write_once(size as u64, AsyncBody::ChunkedBytes(bs)) + .await; + (w, res) + })); + } + None => return Poll::Ready(Ok(())), + }, } } State::Init(_) => { @@ -244,7 +236,6 @@ impl oio::Write for RangeWriter { State::Complete(fut) => { let (w, res) = ready!(fut.poll_unpin(cx)); self.state = State::Idle(Some(w)); - self.align_buffer.clear(); return Poll::Ready(res); } State::Abort(_) => { @@ -283,7 +274,7 @@ impl oio::Write for RangeWriter { State::Abort(fut) => { let (w, res) = ready!(fut.poll_unpin(cx)); self.state = State::Idle(Some(w)); - self.align_buffer.clear(); + self.buffer = None; return Poll::Ready(res); } } diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs index f91c792a078..9e26cdce184 100644 --- a/core/src/services/gcs/backend.rs +++ b/core/src/services/gcs/backend.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::cmp::max; use std::collections::HashMap; use std::fmt::Debug; use std::fmt::Formatter; @@ -337,7 +338,7 @@ pub struct GcsBackend { impl Accessor for GcsBackend { type Reader = IncomingAsyncBody; type BlockingReader = (); - type Writer = GcsWriters; + type Writer = oio::TwoWaysWriter>; type BlockingWriter = (); type Pager = GcsPager; type BlockingPager = (); @@ -420,10 +421,15 @@ impl Accessor for GcsBackend { async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { let w = GcsWriter::new(self.core.clone(), path, args.clone()); - let w = if args.content_length().is_some() { - GcsWriters::One(oio::OneShotWriter::new(w)) + let w = oio::RangeWriter::new(w); + + let w = if let Some(buffer_size) = args.buffer_size() { + // FIXME: we should align with 256KiB instead. + let buffer_size = max(DEFAULT_WRITE_FIXED_SIZE, buffer_size); + + oio::TwoWaysWriter::Two(oio::ExactBufWriter::new(w, buffer_size)) } else { - GcsWriters::Two(oio::RangeWriter::new(w)) + oio::TwoWaysWriter::One(w) }; Ok((RpWrite::default(), w)) diff --git a/core/src/services/gcs/core.rs b/core/src/services/gcs/core.rs index 06d772e0698..40acf42bcda 100644 --- a/core/src/services/gcs/core.rs +++ b/core/src/services/gcs/core.rs @@ -299,6 +299,9 @@ impl GcsCore { AsyncBody::Bytes(bytes) => { media_part = media_part.content(bytes); } + AsyncBody::ChunkedBytes(bs) => { + media_part = media_part.stream(bs.len() as u64, Box::new(bs)); + } AsyncBody::Stream(stream) => { media_part = media_part.stream(size.unwrap(), stream); } diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs index 1b22acd7bf3..d55d6d7a95b 100644 --- a/core/src/services/gcs/writer.rs +++ b/core/src/services/gcs/writer.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use async_trait::async_trait; -use bytes::Bytes; use http::StatusCode; use super::core::GcsCore; @@ -26,8 +25,7 @@ use super::error::parse_error; use crate::raw::*; use crate::*; -pub type GcsWriters = - oio::TwoWaysWriter, oio::RangeWriter>; +pub type GcsWriters = oio::RangeWriter; pub struct GcsWriter { core: Arc, @@ -46,13 +44,13 @@ impl GcsWriter { } #[async_trait] -impl oio::OneShotWrite for GcsWriter { - async fn write_once(&self, bs: Bytes) -> Result<()> { +impl oio::RangeWrite for GcsWriter { + async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> { let mut req = self.core.gcs_insert_object_request( &percent_encode_path(&self.path), - Some(bs.len() as u64), + Some(size), &self.op, - AsyncBody::Bytes(bs), + body, )?; self.core.sign(&mut req).await?; @@ -69,10 +67,7 @@ impl oio::OneShotWrite for GcsWriter { _ => Err(parse_error(resp).await?), } } -} -#[async_trait] -impl oio::RangeWrite for GcsWriter { async fn initiate_range(&self) -> Result { let resp = self.core.gcs_initiate_resumable_upload(&self.path).await?; let status = resp.status();