Skip to content

Commit 79634c0

Browse files
authored
Document parquet writer memory limiting (#5450) (#5457)
* Document parquet writer memory limiting (#5450) * Review feedback * Review feedback
1 parent c6ba0f7 commit 79634c0

File tree

2 files changed

+57
-14
lines changed

2 files changed

+57
-14
lines changed

parquet/src/arrow/arrow_writer/mod.rs

+26
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,32 @@ mod levels;
8080
///
8181
/// assert_eq!(to_write, read);
8282
/// ```
83+
///
84+
/// ## Memory Limiting
85+
///
86+
/// The nature of parquet forces buffering of an entire row group before it can be flushed
87+
/// to the underlying writer. Data is buffered in its encoded form, to reduce memory usage,
88+
/// but if writing rows containing large strings or very nested data, this may still result in
89+
/// non-trivial memory usage.
90+
///
91+
/// [`ArrowWriter::in_progress_size`] can be used to track the size of the buffered row group,
92+
/// and potentially trigger an early flush of a row group based on a memory threshold and/or
93+
/// global memory pressure. However, users should be aware that smaller row groups will result
94+
/// in higher metadata overheads, and may worsen compression ratios and query performance.
95+
///
96+
/// ```no_run
97+
/// # use std::io::Write;
98+
/// # use arrow_array::RecordBatch;
99+
/// # use parquet::arrow::ArrowWriter;
100+
/// # let mut writer: ArrowWriter<Vec<u8>> = todo!();
101+
/// # let batch: RecordBatch = todo!();
102+
/// writer.write(&batch).unwrap();
103+
/// // Trigger an early flush if buffered size exceeds 1_000_000
104+
/// if writer.in_progress_size() > 1_000_000 {
105+
/// writer.flush().unwrap();
106+
/// }
107+
/// ```
108+
///
83109
pub struct ArrowWriter<W: Write> {
84110
/// Underlying Parquet writer
85111
writer: SerializedFileWriter<W>,

parquet/src/arrow/async_writer/mod.rs

+31-14
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,29 @@ use tokio::io::{AsyncWrite, AsyncWriteExt};
6969
/// It is implemented based on the sync writer [`ArrowWriter`] with an inner buffer.
7070
/// The buffered data will be flushed to the writer provided by caller when the
7171
/// buffer's threshold is exceeded.
72+
///
73+
/// ## Memory Limiting
74+
///
75+
/// The nature of parquet forces buffering of an entire row group before it can be flushed
76+
/// to the underlying writer. This buffering may exceed the configured buffer size
77+
/// of [`AsyncArrowWriter`]. Memory usage can be limited by prematurely flushing the row group,
78+
/// although this will have implications for file size and query performance. See [`ArrowWriter`]
79+
/// for more information.
80+
///
81+
/// ```no_run
82+
/// # use tokio::fs::File;
83+
/// # use arrow_array::RecordBatch;
84+
/// # use parquet::arrow::AsyncArrowWriter;
85+
/// # async fn test() {
86+
/// let mut writer: AsyncArrowWriter<File> = todo!();
87+
/// let batch: RecordBatch = todo!();
88+
/// writer.write(&batch).await.unwrap();
89+
/// // Trigger an early flush if buffered size exceeds 1_000_000
90+
/// if writer.in_progress_size() > 1_000_000 {
91+
/// writer.flush().await.unwrap();
92+
/// }
93+
/// # }
94+
/// ```
7295
pub struct AsyncArrowWriter<W> {
7396
/// Underlying sync writer
7497
sync_writer: ArrowWriter<SharedBuffer>,
@@ -86,13 +109,10 @@ pub struct AsyncArrowWriter<W> {
86109
impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
87110
/// Try to create a new Async Arrow Writer.
88111
///
89-
/// `buffer_size` determines the number of bytes to buffer before flushing
90-
/// to the underlying [`AsyncWrite`]
91-
///
92-
/// The intermediate buffer will automatically be resized if necessary
93-
///
94-
/// [`Self::write`] will flush this intermediate buffer if it is at least
95-
/// half full
112+
/// `buffer_size` determines the minimum number of bytes to buffer before flushing
113+
/// to the underlying [`AsyncWrite`]. However, the nature of writing parquet may
114+
/// force buffering of data in excess of this within the underlying [`ArrowWriter`].
115+
/// See the documentation on [`ArrowWriter`] for more details.
96116
pub fn try_new(
97117
writer: W,
98118
arrow_schema: SchemaRef,
@@ -105,13 +125,10 @@ impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
105125

106126
/// Try to create a new Async Arrow Writer with [`ArrowWriterOptions`].
107127
///
108-
/// `buffer_size` determines the number of bytes to buffer before flushing
109-
/// to the underlying [`AsyncWrite`]
110-
///
111-
/// The intermediate buffer will automatically be resized if necessary
112-
///
113-
/// [`Self::write`] will flush this intermediate buffer if it is at least
114-
/// half full
128+
/// `buffer_size` determines the minimum number of bytes to buffer before flushing
129+
/// to the underlying [`AsyncWrite`]. However, the nature of writing parquet may
130+
/// force buffering of data in excess of this within the underlying [`ArrowWriter`].
131+
/// See the documentation on [`ArrowWriter`] for more details.
115132
pub fn try_new_with_options(
116133
writer: W,
117134
arrow_schema: SchemaRef,

0 commit comments

Comments
 (0)