Skip to content

Commit d17b206

Browse files
authored
feat: Make AsyncArrowWriter accepts AsyncFileWriter (#5753)
Signed-off-by: Xuanwo <[email protected]>
1 parent 178ef99 commit d17b206

File tree

1 file changed

+60
-14
lines changed
  • parquet/src/arrow/async_writer

1 file changed

+60
-14
lines changed

parquet/src/arrow/async_writer/mod.rs

+60-14
Original file line numberDiff line numberDiff line change
@@ -59,13 +59,62 @@ use crate::{
5959
};
6060
use arrow_array::RecordBatch;
6161
use arrow_schema::SchemaRef;
62+
use bytes::Bytes;
63+
use futures::future::BoxFuture;
64+
use futures::FutureExt;
65+
use std::mem;
6266
use tokio::io::{AsyncWrite, AsyncWriteExt};
6367

64-
/// Encodes [`RecordBatch`] to parquet, outputting to an [`AsyncWrite`]
68+
/// The asynchronous interface used by [`AsyncArrowWriter`] to write parquet files
69+
pub trait AsyncFileWriter: Send {
70+
/// Write the provided bytes to the underlying writer
71+
///
72+
/// The underlying writer MAY decide to buffer the data or write it immediately.
73+
/// This design allows the writer implementer to control the buffering and I/O scheduling.
74+
///
75+
/// The underlying writer MAY implement retry logic to avoid breaking the user's write process.
76+
fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>>;
77+
78+
/// Flush any buffered data to the underlying writer and finish the writing process.
79+
///
80+
/// After `complete` returns `Ok(())`, the caller SHOULD NOT call `write` again.
81+
fn complete(&mut self) -> BoxFuture<'_, Result<()>>;
82+
}
83+
84+
impl AsyncFileWriter for Box<dyn AsyncFileWriter> {
85+
fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> {
86+
self.as_mut().write(bs)
87+
}
88+
89+
fn complete(&mut self) -> BoxFuture<'_, Result<()>> {
90+
self.as_mut().complete()
91+
}
92+
}
93+
94+
impl<T: AsyncWrite + Unpin + Send> AsyncFileWriter for T {
95+
fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> {
96+
async move {
97+
self.write_all(&bs).await?;
98+
Ok(())
99+
}
100+
.boxed()
101+
}
102+
103+
fn complete(&mut self) -> BoxFuture<'_, Result<()>> {
104+
async move {
105+
self.flush().await?;
106+
self.shutdown().await?;
107+
Ok(())
108+
}
109+
.boxed()
110+
}
111+
}
112+
113+
/// Encodes [`RecordBatch`] to parquet, outputting to an [`AsyncFileWriter`]
65114
///
66115
/// ## Memory Usage
67116
///
68-
/// This writer eagerly writes data as soon as possible to the underlying [`AsyncWrite`],
117+
/// This writer eagerly writes data as soon as possible to the underlying [`AsyncFileWriter`],
69118
/// permitting fine-grained control over buffering and I/O scheduling. However, the columnar
70119
/// nature of parquet forces data for an entire row group to be buffered in memory, before
71120
/// it can be flushed. Depending on the data and the configured row group size, this buffering
@@ -97,7 +146,7 @@ pub struct AsyncArrowWriter<W> {
97146
async_writer: W,
98147
}
99148

100-
impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
149+
impl<W: AsyncFileWriter> AsyncArrowWriter<W> {
101150
/// Try to create a new Async Arrow Writer
102151
pub fn try_new(
103152
writer: W,
@@ -178,28 +227,25 @@ impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
178227

179228
// Force to flush the remaining data.
180229
self.do_write().await?;
181-
self.async_writer.shutdown().await?;
230+
self.async_writer.complete().await?;
182231

183232
Ok(metadata)
184233
}
185234

186235
/// Flush the data written by `sync_writer` into the `async_writer`
236+
///
237+
/// # Notes
238+
///
239+
/// This method will take the inner buffer from the `sync_writer` and write it into the
240+
/// async writer. After the write, the inner buffer will be empty.
187241
async fn do_write(&mut self) -> Result<()> {
188-
let buffer = self.sync_writer.inner_mut();
189-
190-
self.async_writer
191-
.write_all(buffer.as_slice())
192-
.await
193-
.map_err(|e| ParquetError::External(Box::new(e)))?;
242+
let buffer = mem::take(self.sync_writer.inner_mut());
194243

195244
self.async_writer
196-
.flush()
245+
.write(Bytes::from(buffer))
197246
.await
198247
.map_err(|e| ParquetError::External(Box::new(e)))?;
199248

200-
// reuse the buffer.
201-
buffer.clear();
202-
203249
Ok(())
204250
}
205251
}

0 commit comments

Comments
 (0)