Skip to content

Commit 6effa7f

Browse files
committed
Add WriterProperties::bloom_filter_position
1 parent 5daf96f commit 6effa7f

File tree

3 files changed

+44
-2
lines changed

3 files changed

+44
-2
lines changed

parquet/src/arrow/arrow_writer/mod.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ use crate::column::writer::{
4444
use crate::data_type::{ByteArray, FixedLenByteArray};
4545
use crate::errors::{ParquetError, Result};
4646
use crate::file::metadata::{ColumnChunkMetaData, KeyValue, RowGroupMetaDataPtr};
47-
use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
47+
use crate::file::properties::{BloomFilterPosition, WriterProperties, WriterPropertiesPtr};
4848
use crate::file::reader::{ChunkReader, Length};
4949
use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
5050
use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
@@ -264,7 +264,11 @@ impl<W: Write + Send> ArrowWriter<W> {
264264
chunk.append_to_row_group(&mut row_group_writer)?;
265265
}
266266
let row_group_metadata = row_group_writer.close()?;
267-
self.writer.write_bloom_filters(&mut [row_group_metadata.to_thrift()])?;
267+
match self.writer.properties().bloom_filter_position() {
268+
BloomFilterPosition::AfterRowGroup =>
269+
self.writer.write_bloom_filters(&mut [row_group_metadata.to_thrift()])?,
270+
BloomFilterPosition::End => (),
271+
}
268272
Ok(())
269273
}
270274

@@ -1757,6 +1761,7 @@ mod tests {
17571761
.set_dictionary_page_size_limit(dictionary_size.max(1))
17581762
.set_encoding(*encoding)
17591763
.set_bloom_filter_enabled(bloom_filter)
1764+
.set_bloom_filter_position(BloomFilterPosition::End)
17601765
.build();
17611766

17621767
files.push(roundtrip_opts(&expected_batch, props))

parquet/src/file/properties.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ pub const DEFAULT_STATISTICS_ENABLED: EnabledStatistics = EnabledStatistics::Pag
4343
pub const DEFAULT_MAX_STATISTICS_SIZE: usize = 4096;
4444
/// Default value for [`WriterProperties::max_row_group_size`]
4545
pub const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024;
46+
/// Default value for [`WriterProperties::bloom_filter_position`]
47+
pub const DEFAULT_BLOOM_FILTER_POSITION: BloomFilterPosition = BloomFilterPosition::AfterRowGroup;
4648
/// Default value for [`WriterProperties::created_by`]
4749
pub const DEFAULT_CREATED_BY: &str = concat!("parquet-rs version ", env!("CARGO_PKG_VERSION"));
4850
/// Default value for [`WriterProperties::column_index_truncate_length`]
@@ -86,6 +88,24 @@ impl FromStr for WriterVersion {
8688
}
8789
}
8890

91+
/// Where in the file [`ArrowWriter`](crate::arrow::arrow_writer::ArrowWriter) should
92+
/// write Bloom filters
93+
///
94+
/// Basic constant, which is not part of the Thrift definition.
95+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
96+
pub enum BloomFilterPosition {
97+
/// Write Bloom Filters of each row group right after the row group
98+
///
99+
/// This saves memory by writing them as soon as they are computed, at the cost
100+
/// of data locality for readers
101+
AfterRowGroup,
102+
/// Write Bloom Filters at the end of the file
103+
///
104+
/// This allows better data locality for readers, at the cost of memory usage
105+
/// for writers.
106+
End,
107+
}
108+
89109
/// Reference counted writer properties.
90110
pub type WriterPropertiesPtr = Arc<WriterProperties>;
91111

@@ -131,6 +151,7 @@ pub struct WriterProperties {
131151
data_page_row_count_limit: usize,
132152
write_batch_size: usize,
133153
max_row_group_size: usize,
154+
bloom_filter_position: BloomFilterPosition,
134155
writer_version: WriterVersion,
135156
created_by: String,
136157
pub(crate) key_value_metadata: Option<Vec<KeyValue>>,
@@ -217,6 +238,11 @@ impl WriterProperties {
217238
self.max_row_group_size
218239
}
219240

241+
/// Returns where in the file Bloom filters are written.
242+
pub fn bloom_filter_position(&self) -> BloomFilterPosition {
243+
self.bloom_filter_position
244+
}
245+
220246
/// Returns configured writer version.
221247
pub fn writer_version(&self) -> WriterVersion {
222248
self.writer_version
@@ -337,6 +363,7 @@ pub struct WriterPropertiesBuilder {
337363
data_page_row_count_limit: usize,
338364
write_batch_size: usize,
339365
max_row_group_size: usize,
366+
bloom_filter_position: BloomFilterPosition,
340367
writer_version: WriterVersion,
341368
created_by: String,
342369
key_value_metadata: Option<Vec<KeyValue>>,
@@ -356,6 +383,7 @@ impl WriterPropertiesBuilder {
356383
data_page_row_count_limit: usize::MAX,
357384
write_batch_size: DEFAULT_WRITE_BATCH_SIZE,
358385
max_row_group_size: DEFAULT_MAX_ROW_GROUP_SIZE,
386+
bloom_filter_position: DEFAULT_BLOOM_FILTER_POSITION,
359387
writer_version: DEFAULT_WRITER_VERSION,
360388
created_by: DEFAULT_CREATED_BY.to_string(),
361389
key_value_metadata: None,
@@ -375,6 +403,7 @@ impl WriterPropertiesBuilder {
375403
data_page_row_count_limit: self.data_page_row_count_limit,
376404
write_batch_size: self.write_batch_size,
377405
max_row_group_size: self.max_row_group_size,
406+
bloom_filter_position: self.bloom_filter_position,
378407
writer_version: self.writer_version,
379408
created_by: self.created_by,
380409
key_value_metadata: self.key_value_metadata,
@@ -479,6 +508,12 @@ impl WriterPropertiesBuilder {
479508
self
480509
}
481510

511+
/// Sets where in the final file Bloom Filters are written
512+
pub fn set_bloom_filter_position(mut self, value: BloomFilterPosition) -> Self {
513+
self.bloom_filter_position = value;
514+
self
515+
}
516+
482517
/// Sets "created by" property.
483518
pub fn set_created_by(mut self, value: String) -> Self {
484519
self.created_by = value;
@@ -991,6 +1026,7 @@ mod tests {
9911026
);
9921027
assert_eq!(props.write_batch_size(), DEFAULT_WRITE_BATCH_SIZE);
9931028
assert_eq!(props.max_row_group_size(), DEFAULT_MAX_ROW_GROUP_SIZE);
1029+
assert_eq!(props.bloom_filter_position(), DEFAULT_BLOOM_FILTER_POSITION);
9941030
assert_eq!(props.writer_version(), DEFAULT_WRITER_VERSION);
9951031
assert_eq!(props.created_by(), DEFAULT_CREATED_BY);
9961032
assert_eq!(props.key_value_metadata(), None);

parquet/src/file/writer.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,7 @@ impl<W: Write + Send> SerializedFileWriter<W> {
344344
.map(|v| v.to_thrift())
345345
.collect::<Vec<_>>();
346346

347+
self.write_bloom_filters(&mut row_groups)?;
347348
// Write column indexes and offset indexes
348349
self.write_column_indexes(&mut row_groups)?;
349350
self.write_offset_indexes(&mut row_groups)?;

0 commit comments

Comments
 (0)