
Commit 28ca6d1

Minor: name some constant values in arrow writer, parquet writer (#8642)
* Minor: name some constant values in arrow writer
* Add constants to parquet.rs, update doc comments
* fix
1 parent bb99d2a commit 28ca6d1

5 files changed: +28 -10 lines changed


datafusion/core/src/datasource/file_format/arrow.rs

Lines changed: 10 additions & 3 deletions
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Apache Arrow format abstractions
+//! [`ArrowFormat`]: Apache Arrow [`FileFormat`] abstractions
 //!
 //! Works with files following the [Arrow IPC format](https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format)
@@ -58,6 +58,13 @@ use super::file_compression_type::FileCompressionType;
 use super::write::demux::start_demuxer_task;
 use super::write::{create_writer, SharedBuffer};
 
+/// Initial writing buffer size. Note this is just a size hint for efficiency. It
+/// will grow beyond the set value if needed.
+const INITIAL_BUFFER_BYTES: usize = 1048576;
+
+/// If the buffered Arrow data exceeds this size, it is flushed to object store
+const BUFFER_FLUSH_BYTES: usize = 1024000;
+
 /// Arrow `FileFormat` implementation.
 #[derive(Default, Debug)]
 pub struct ArrowFormat;
@@ -239,7 +246,7 @@ impl DataSink for ArrowFileSink {
             IpcWriteOptions::try_new(64, false, arrow_ipc::MetadataVersion::V5)?
                 .try_with_compression(Some(CompressionType::LZ4_FRAME))?;
         while let Some((path, mut rx)) = file_stream_rx.recv().await {
-            let shared_buffer = SharedBuffer::new(1048576);
+            let shared_buffer = SharedBuffer::new(INITIAL_BUFFER_BYTES);
             let mut arrow_writer = arrow_ipc::writer::FileWriter::try_new_with_options(
                 shared_buffer.clone(),
                 &self.get_writer_schema(),
@@ -257,7 +264,7 @@ impl DataSink for ArrowFileSink {
                     row_count += batch.num_rows();
                     arrow_writer.write(&batch)?;
                     let mut buff_to_flush = shared_buffer.buffer.try_lock().unwrap();
-                    if buff_to_flush.len() > 1024000 {
+                    if buff_to_flush.len() > BUFFER_FLUSH_BYTES {
                         object_store_writer
                             .write_all(buff_to_flush.as_slice())
                             .await?;
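
The two constants introduced above guard the IPC write loop: record batches are encoded into an in-memory SharedBuffer, and once the buffered bytes exceed BUFFER_FLUSH_BYTES the buffer is drained to the object store writer. The following is a minimal, self-contained sketch of that pattern, not DataFusion's actual code: the SharedBuffer stand-in and the Vec<u8> sink are simplifications of the real types (the real sink is an AbortableWrite over a multipart upload).

use std::io::Write;
use std::sync::{Arc, Mutex};

/// 1 MiB size hint for the initial allocation, mirroring INITIAL_BUFFER_BYTES.
const INITIAL_BUFFER_BYTES: usize = 1048576;

/// Flush threshold, mirroring BUFFER_FLUSH_BYTES.
const BUFFER_FLUSH_BYTES: usize = 1024000;

/// Simplified stand-in for DataFusion's SharedBuffer: a clonable,
/// lock-protected byte buffer that a writer can encode into.
#[derive(Clone)]
struct SharedBuffer {
    buffer: Arc<Mutex<Vec<u8>>>,
}

impl SharedBuffer {
    fn new(capacity: usize) -> Self {
        Self {
            buffer: Arc::new(Mutex::new(Vec::with_capacity(capacity))),
        }
    }
}

fn main() -> std::io::Result<()> {
    let shared_buffer = SharedBuffer::new(INITIAL_BUFFER_BYTES);
    // Stand-in for the object store writer.
    let mut object_store_writer: Vec<u8> = Vec::new();

    for _ in 0..100 {
        // Stand-in for arrow_writer.write(&batch): append some encoded bytes.
        shared_buffer
            .buffer
            .try_lock()
            .unwrap()
            .extend_from_slice(&[0u8; 64 * 1024]);

        // Same check as the commit: once the buffered data exceeds the named
        // threshold, spill it to the sink and reuse the allocation.
        let mut buff_to_flush = shared_buffer.buffer.try_lock().unwrap();
        if buff_to_flush.len() > BUFFER_FLUSH_BYTES {
            object_store_writer.write_all(buff_to_flush.as_slice())?;
            buff_to_flush.clear();
        }
    }
    Ok(())
}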

datafusion/core/src/datasource/file_format/avro.rs

Lines changed: 1 addition & 1 deletion
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Apache Avro format abstractions
+//! [`AvroFormat`] Apache Avro [`FileFormat`] abstractions
 
 use std::any::Any;
 use std::sync::Arc;

datafusion/core/src/datasource/file_format/csv.rs

Lines changed: 1 addition & 1 deletion
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! CSV format abstractions
+//! [`CsvFormat`], Comma Separated Value (CSV) [`FileFormat`] abstractions
 
 use std::any::Any;
 use std::collections::HashSet;

datafusion/core/src/datasource/file_format/json.rs

Lines changed: 1 addition & 1 deletion
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Line delimited JSON format abstractions
+//! [`JsonFormat`]: Line delimited JSON [`FileFormat`] abstractions
 
 use std::any::Any;
 use std::fmt;

datafusion/core/src/datasource/file_format/parquet.rs

Lines changed: 15 additions & 4 deletions
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Parquet format abstractions
+//! [`ParquetFormat`]: Parquet [`FileFormat`] abstractions
 
 use arrow_array::RecordBatch;
 use async_trait::async_trait;
@@ -75,6 +75,17 @@ use crate::physical_plan::{
     Statistics,
 };
 
+/// Size of the buffer for [`AsyncArrowWriter`].
+const PARQUET_WRITER_BUFFER_SIZE: usize = 10485760;
+
+/// Initial writing buffer size. Note this is just a size hint for efficiency. It
+/// will grow beyond the set value if needed.
+const INITIAL_BUFFER_BYTES: usize = 1048576;
+
+/// When writing parquet files in parallel, if the buffered Parquet data exceeds
+/// this size, it is flushed to object store
+const BUFFER_FLUSH_BYTES: usize = 1024000;
+
 /// The Apache Parquet `FileFormat` implementation
 ///
 /// Note it is recommended these are instead configured on the [`ConfigOptions`]
@@ -680,7 +691,7 @@ impl ParquetSink {
         let writer = AsyncArrowWriter::try_new(
             multipart_writer,
             self.get_writer_schema(),
-            10485760,
+            PARQUET_WRITER_BUFFER_SIZE,
             Some(parquet_props),
         )?;
         Ok(writer)
@@ -1004,7 +1015,7 @@ async fn concatenate_parallel_row_groups(
     writer_props: Arc<WriterProperties>,
     mut object_store_writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
 ) -> Result<usize> {
-    let merged_buff = SharedBuffer::new(1048576);
+    let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);
 
     let schema_desc = arrow_to_parquet_schema(schema.as_ref())?;
     let mut parquet_writer = SerializedFileWriter::new(
@@ -1025,7 +1036,7 @@ async fn concatenate_parallel_row_groups(
     for chunk in serialized_columns {
         chunk.append_to_row_group(&mut rg_out)?;
         let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap();
-        if buff_to_flush.len() > 1024000 {
+        if buff_to_flush.len() > BUFFER_FLUSH_BYTES {
             object_store_writer
                 .write_all(buff_to_flush.as_slice())
                 .await?;
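
One thing the named constants make easy to see is how the values relate: PARQUET_WRITER_BUFFER_SIZE is 10 MiB, INITIAL_BUFFER_BYTES is 1 MiB, and BUFFER_FLUSH_BYTES (1024000) sits just below the initial 1 MiB allocation, presumably so that in the common case the buffer is drained before it would need to grow. A sketch of the same constants written with explicit arithmetic (illustrative only; the commit keeps the literal values):

/// 10 MiB buffer for the async parquet writer (10485760 bytes).
const PARQUET_WRITER_BUFFER_SIZE: usize = 10 * 1024 * 1024;

/// 1 MiB initial allocation for the shared in-memory buffer (1048576 bytes).
const INITIAL_BUFFER_BYTES: usize = 1024 * 1024;

/// Flush threshold of 1024000 bytes, just under the initial allocation.
const BUFFER_FLUSH_BYTES: usize = 1000 * 1024;

fn main() {
    // Sanity-check the relationships the names now make explicit.
    assert_eq!(PARQUET_WRITER_BUFFER_SIZE, 10485760);
    assert_eq!(INITIAL_BUFFER_BYTES, 1048576);
    assert_eq!(BUFFER_FLUSH_BYTES, 1024000);
    assert!(BUFFER_FLUSH_BYTES < INITIAL_BUFFER_BYTES);
}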
