diff --git a/datafusion/core/src/datasource/file_format/write/mod.rs b/datafusion/core/src/datasource/file_format/write/mod.rs index 770c7a49c326..8dde42bd2d49 100644 --- a/datafusion/core/src/datasource/file_format/write/mod.rs +++ b/datafusion/core/src/datasource/file_format/write/mod.rs @@ -19,7 +19,6 @@ //! write support for the various file formats use std::io::Error; -use std::mem; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -37,7 +36,6 @@ use async_trait::async_trait; use bytes::Bytes; use futures::future::BoxFuture; -use futures::ready; use futures::FutureExt; use object_store::path::Path; use object_store::{MultipartId, ObjectMeta, ObjectStore}; @@ -58,7 +56,7 @@ pub struct AsyncPutWriter { /// A buffer that stores the bytes to be sent current_buffer: Vec, /// Used for async handling in flush method - inner_state: AsyncPutState, + put_future: Option>>, } impl AsyncPutWriter { @@ -68,8 +66,7 @@ impl AsyncPutWriter { object_meta, store, current_buffer: vec![], - // The writer starts out in buffering mode - inner_state: AsyncPutState::Buffer, + put_future: None, } } @@ -79,37 +76,21 @@ impl AsyncPutWriter { &mut self, cx: &mut Context<'_>, ) -> Poll> { - loop { - match &mut self.inner_state { - AsyncPutState::Buffer => { - // Convert the current buffer to bytes and take ownership of it - let bytes = Bytes::from(mem::take(&mut self.current_buffer)); - // Set the inner state to Put variant with the bytes - self.inner_state = AsyncPutState::Put { bytes } - } - AsyncPutState::Put { bytes } => { - // Send the bytes to the object store's put method - return Poll::Ready( - ready!(self - .store - .put(&self.object_meta.location, bytes.clone()) - .poll_unpin(cx)) - .map_err(Error::from), - ); - } + let fut = self.put_future.get_or_insert_with(|| { + let buffer = std::mem::take(&mut self.current_buffer); + let store = Arc::clone(&self.store); + let location = self.object_meta.location.clone(); + + async move { + store.put(&location, buffer.into()).await?; + Ok(()) } - } + .boxed() + }); + fut.poll_unpin(cx) } } -/// An enum that represents the inner state of AsyncPut -enum AsyncPutState { - /// Building Bytes struct in this state - Buffer, - /// Data in the buffer is being sent to the object store - Put { bytes: Bytes }, -} - impl AsyncWrite for AsyncPutWriter { // Define the implementation of the AsyncWrite trait for the `AsyncPutWriter` struct fn poll_write(