Skip to content

Commit 42c663d

Browse files
committed
Revert "Revert "Write Bloom filters between row groups instead of the end (#…"
This reverts commit 22e0b44.
1 parent 756b1fb commit 42c663d

File tree

7 files changed

+277
-52
lines changed

7 files changed

+277
-52
lines changed

parquet/Cargo.toml

+8
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ hashbrown = { version = "0.14", default-features = false }
6767
twox-hash = { version = "1.6", default-features = false }
6868
paste = { version = "1.0" }
6969
half = { version = "2.1", default-features = false, features = ["num-traits"] }
70+
sysinfo = { version = "0.30.12", optional = true, default-features = false }
7071

7172
[dev-dependencies]
7273
base64 = { version = "0.22", default-features = false, features = ["std"] }
@@ -114,12 +115,19 @@ async = ["futures", "tokio"]
114115
object_store = ["dep:object_store", "async"]
115116
# Group Zstd dependencies
116117
zstd = ["dep:zstd", "zstd-sys"]
118+
# Display memory usage in examples/write_parquet.rs
119+
sysinfo = ["dep:sysinfo"]
117120

118121
[[example]]
119122
name = "read_parquet"
120123
required-features = ["arrow"]
121124
path = "./examples/read_parquet.rs"
122125

126+
[[example]]
127+
name = "write_parquet"
128+
required-features = ["cli", "sysinfo"]
129+
path = "./examples/write_parquet.rs"
130+
123131
[[example]]
124132
name = "async_read_parquet"
125133
required-features = ["arrow", "async"]

parquet/examples/write_parquet.rs

+131
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::fs::File;
19+
use std::path::PathBuf;
20+
use std::sync::Arc;
21+
use std::time::{Duration, Instant};
22+
23+
use arrow::array::{StructArray, UInt64Builder};
24+
use arrow::datatypes::DataType::UInt64;
25+
use arrow::datatypes::{Field, Schema};
26+
use clap::{Parser, ValueEnum};
27+
use parquet::arrow::ArrowWriter as ParquetWriter;
28+
use parquet::basic::Encoding;
29+
use parquet::errors::Result;
30+
use parquet::file::properties::{BloomFilterPosition, WriterProperties};
31+
use sysinfo::{MemoryRefreshKind, Pid, ProcessRefreshKind, RefreshKind, System};
32+
33+
#[derive(ValueEnum, Clone)]
34+
enum BloomFilterPositionArg {
35+
End,
36+
AfterRowGroup,
37+
}
38+
39+
#[derive(Parser)]
40+
#[command(version)]
41+
/// Writes sequences of integers, with a Bloom Filter, while logging timing and memory usage.
42+
struct Args {
43+
#[arg(long, default_value_t = 1000)]
44+
/// Number of batches to write
45+
iterations: u64,
46+
47+
#[arg(long, default_value_t = 1000000)]
48+
/// Number of rows in each batch
49+
batch: u64,
50+
51+
#[arg(long, value_enum, default_value_t=BloomFilterPositionArg::AfterRowGroup)]
52+
/// Where to write Bloom Filters
53+
bloom_filter_position: BloomFilterPositionArg,
54+
55+
/// Path to the file to write
56+
path: PathBuf,
57+
}
58+
59+
fn now() -> String {
60+
chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string()
61+
}
62+
63+
/// Resident memory (RSS) of the current process formatted like "123MB",
/// or "N/A" if sysinfo cannot find the process.
fn mem(system: &mut System) -> String {
    let pid = Pid::from(std::process::id() as usize);
    // Refresh only this process's memory statistics to keep the probe cheap.
    system.refresh_process_specifics(pid, ProcessRefreshKind::new().with_memory());
    match system.process(pid) {
        Some(proc) => format!("{}MB", proc.memory() / 1_000_000),
        None => "N/A".to_string(),
    }
}
71+
72+
fn main() -> Result<()> {
73+
let args = Args::parse();
74+
75+
let bloom_filter_position = match args.bloom_filter_position {
76+
BloomFilterPositionArg::End => BloomFilterPosition::End,
77+
BloomFilterPositionArg::AfterRowGroup => BloomFilterPosition::AfterRowGroup,
78+
};
79+
80+
let properties = WriterProperties::builder()
81+
.set_column_bloom_filter_enabled("id".into(), true)
82+
.set_column_encoding("id".into(), Encoding::DELTA_BINARY_PACKED)
83+
.set_bloom_filter_position(bloom_filter_position)
84+
.build();
85+
let schema = Arc::new(Schema::new(vec![Field::new("id", UInt64, false)]));
86+
// Create parquet file that will be read.
87+
let file = File::create(args.path).unwrap();
88+
let mut writer = ParquetWriter::try_new(file, schema.clone(), Some(properties))?;
89+
90+
let mut system =
91+
System::new_with_specifics(RefreshKind::new().with_memory(MemoryRefreshKind::everything()));
92+
eprintln!(
93+
"{} Writing {} batches of {} rows. RSS = {}",
94+
now(),
95+
args.iterations,
96+
args.batch,
97+
mem(&mut system)
98+
);
99+
100+
let mut array_builder = UInt64Builder::new();
101+
let mut last_log = Instant::now();
102+
for i in 0..args.iterations {
103+
if Instant::now() - last_log > Duration::new(10, 0) {
104+
last_log = Instant::now();
105+
eprintln!(
106+
"{} Iteration {}/{}. RSS = {}",
107+
now(),
108+
i + 1,
109+
args.iterations,
110+
mem(&mut system)
111+
);
112+
}
113+
for j in 0..args.batch {
114+
array_builder.append_value(i + j);
115+
}
116+
writer.write(
117+
&StructArray::new(
118+
schema.fields().clone(),
119+
vec![Arc::new(array_builder.finish())],
120+
None,
121+
)
122+
.into(),
123+
)?;
124+
}
125+
writer.flush()?;
126+
writer.close()?;
127+
128+
eprintln!("{} Done. RSS = {}", now(), mem(&mut system));
129+
130+
Ok(())
131+
}

parquet/src/arrow/arrow_writer/mod.rs

+25-3
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ use crate::column::writer::{
4343
};
4444
use crate::data_type::{ByteArray, FixedLenByteArray};
4545
use crate::errors::{ParquetError, Result};
46-
use crate::file::metadata::{ColumnChunkMetaData, KeyValue, RowGroupMetaDataPtr};
46+
use crate::file::metadata::{ColumnChunkMetaData, KeyValue, RowGroupMetaData};
4747
use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
4848
use crate::file::reader::{ChunkReader, Length};
4949
use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
@@ -204,7 +204,7 @@ impl<W: Write + Send> ArrowWriter<W> {
204204
}
205205

206206
/// Returns metadata for any flushed row groups
207-
pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] {
207+
pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
208208
self.writer.flushed_row_groups()
209209
}
210210

@@ -1097,7 +1097,9 @@ mod tests {
10971097
use crate::file::metadata::ParquetMetaData;
10981098
use crate::file::page_index::index::Index;
10991099
use crate::file::page_index::index_reader::read_pages_locations;
1100-
use crate::file::properties::{EnabledStatistics, ReaderProperties, WriterVersion};
1100+
use crate::file::properties::{
1101+
BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
1102+
};
11011103
use crate::file::serialized_reader::ReadOptionsBuilder;
11021104
use crate::file::{
11031105
reader::{FileReader, SerializedFileReader},
@@ -1745,6 +1747,7 @@ mod tests {
17451747
values: ArrayRef,
17461748
schema: SchemaRef,
17471749
bloom_filter: bool,
1750+
bloom_filter_position: BloomFilterPosition,
17481751
}
17491752

17501753
impl RoundTripOptions {
@@ -1755,6 +1758,7 @@ mod tests {
17551758
values,
17561759
schema: Arc::new(schema),
17571760
bloom_filter: false,
1761+
bloom_filter_position: BloomFilterPosition::AfterRowGroup,
17581762
}
17591763
}
17601764
}
@@ -1774,6 +1778,7 @@ mod tests {
17741778
values,
17751779
schema,
17761780
bloom_filter,
1781+
bloom_filter_position,
17771782
} = options;
17781783

17791784
let encodings = match values.data_type() {
@@ -1814,6 +1819,7 @@ mod tests {
18141819
.set_dictionary_page_size_limit(dictionary_size.max(1))
18151820
.set_encoding(*encoding)
18161821
.set_bloom_filter_enabled(bloom_filter)
1822+
.set_bloom_filter_position(bloom_filter_position)
18171823
.build();
18181824

18191825
files.push(roundtrip_opts(&expected_batch, props))
@@ -2171,6 +2177,22 @@ mod tests {
21712177
values_required::<BinaryViewArray, _>(many_vecs_iter);
21722178
}
21732179

2180+
#[test]
2181+
fn i32_column_bloom_filter_at_end() {
2182+
let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2183+
let mut options = RoundTripOptions::new(array, false);
2184+
options.bloom_filter = true;
2185+
options.bloom_filter_position = BloomFilterPosition::End;
2186+
2187+
let files = one_column_roundtrip_with_options(options);
2188+
check_bloom_filter(
2189+
files,
2190+
"col".to_string(),
2191+
(0..SMALL_SIZE as i32).collect(),
2192+
(SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2193+
);
2194+
}
2195+
21742196
#[test]
21752197
fn i32_column_bloom_filter() {
21762198
let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));

parquet/src/arrow/async_writer/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ use crate::{
5454
arrow::arrow_writer::ArrowWriterOptions,
5555
arrow::ArrowWriter,
5656
errors::{ParquetError, Result},
57-
file::{metadata::RowGroupMetaDataPtr, properties::WriterProperties},
57+
file::{metadata::RowGroupMetaData, properties::WriterProperties},
5858
format::{FileMetaData, KeyValue},
5959
};
6060
use arrow_array::RecordBatch;
@@ -172,7 +172,7 @@ impl<W: AsyncFileWriter> AsyncArrowWriter<W> {
172172
}
173173

174174
/// Returns metadata for any flushed row groups
175-
pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] {
175+
pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
176176
self.sync_writer.flushed_row_groups()
177177
}
178178

parquet/src/file/metadata/mod.rs

+5
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,11 @@ impl RowGroupMetaData {
358358
&self.columns
359359
}
360360

361+
/// Returns mutable slice of column chunk metadata.
362+
pub fn columns_mut(&mut self) -> &mut [ColumnChunkMetaData] {
363+
&mut self.columns
364+
}
365+
361366
/// Number of rows in this row group.
362367
pub fn num_rows(&self) -> i64 {
363368
self.num_rows

parquet/src/file/properties.rs

+36
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ pub const DEFAULT_STATISTICS_ENABLED: EnabledStatistics = EnabledStatistics::Pag
4545
pub const DEFAULT_MAX_STATISTICS_SIZE: usize = 4096;
4646
/// Default value for [`WriterProperties::max_row_group_size`]
4747
pub const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024;
48+
/// Default value for [`WriterProperties::bloom_filter_position`]
49+
pub const DEFAULT_BLOOM_FILTER_POSITION: BloomFilterPosition = BloomFilterPosition::AfterRowGroup;
4850
/// Default value for [`WriterProperties::created_by`]
4951
pub const DEFAULT_CREATED_BY: &str = concat!("parquet-rs version ", env!("CARGO_PKG_VERSION"));
5052
/// Default value for [`WriterProperties::column_index_truncate_length`]
@@ -88,6 +90,24 @@ impl FromStr for WriterVersion {
8890
}
8991
}
9092

93+
/// Where in the file [`ArrowWriter`](crate::arrow::arrow_writer::ArrowWriter) should
/// write Bloom filters
///
/// This is a parquet-rs writer setting; it is not part of the Thrift
/// (on-disk) Parquet specification.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BloomFilterPosition {
    /// Write Bloom Filters of each row group right after the row group
    ///
    /// This saves memory by writing it as soon as it is computed, at the cost
    /// of data locality for readers
    AfterRowGroup,
    /// Write Bloom Filters at the end of the file
    ///
    /// This allows better data locality for readers, at the cost of memory usage
    /// for writers.
    End,
}
110+
91111
/// Reference counted writer properties.
92112
pub type WriterPropertiesPtr = Arc<WriterProperties>;
93113

@@ -132,6 +152,7 @@ pub struct WriterProperties {
132152
data_page_row_count_limit: usize,
133153
write_batch_size: usize,
134154
max_row_group_size: usize,
155+
bloom_filter_position: BloomFilterPosition,
135156
writer_version: WriterVersion,
136157
created_by: String,
137158
pub(crate) key_value_metadata: Option<Vec<KeyValue>>,
@@ -219,6 +240,11 @@ impl WriterProperties {
219240
self.max_row_group_size
220241
}
221242

243+
/// Returns maximum number of rows in a row group.
244+
pub fn bloom_filter_position(&self) -> BloomFilterPosition {
245+
self.bloom_filter_position
246+
}
247+
222248
/// Returns configured writer version.
223249
pub fn writer_version(&self) -> WriterVersion {
224250
self.writer_version
@@ -340,6 +366,7 @@ pub struct WriterPropertiesBuilder {
340366
data_page_row_count_limit: usize,
341367
write_batch_size: usize,
342368
max_row_group_size: usize,
369+
bloom_filter_position: BloomFilterPosition,
343370
writer_version: WriterVersion,
344371
created_by: String,
345372
key_value_metadata: Option<Vec<KeyValue>>,
@@ -359,6 +386,7 @@ impl WriterPropertiesBuilder {
359386
data_page_row_count_limit: DEFAULT_DATA_PAGE_ROW_COUNT_LIMIT,
360387
write_batch_size: DEFAULT_WRITE_BATCH_SIZE,
361388
max_row_group_size: DEFAULT_MAX_ROW_GROUP_SIZE,
389+
bloom_filter_position: DEFAULT_BLOOM_FILTER_POSITION,
362390
writer_version: DEFAULT_WRITER_VERSION,
363391
created_by: DEFAULT_CREATED_BY.to_string(),
364392
key_value_metadata: None,
@@ -378,6 +406,7 @@ impl WriterPropertiesBuilder {
378406
data_page_row_count_limit: self.data_page_row_count_limit,
379407
write_batch_size: self.write_batch_size,
380408
max_row_group_size: self.max_row_group_size,
409+
bloom_filter_position: self.bloom_filter_position,
381410
writer_version: self.writer_version,
382411
created_by: self.created_by,
383412
key_value_metadata: self.key_value_metadata,
@@ -489,6 +518,12 @@ impl WriterPropertiesBuilder {
489518
self
490519
}
491520

521+
/// Sets where in the final file Bloom Filters are written (default `AfterRowGroup`)
522+
pub fn set_bloom_filter_position(mut self, value: BloomFilterPosition) -> Self {
523+
self.bloom_filter_position = value;
524+
self
525+
}
526+
492527
/// Sets "created by" property (defaults to `parquet-rs version <VERSION>`).
493528
pub fn set_created_by(mut self, value: String) -> Self {
494529
self.created_by = value;
@@ -1054,6 +1089,7 @@ mod tests {
10541089
);
10551090
assert_eq!(props.write_batch_size(), DEFAULT_WRITE_BATCH_SIZE);
10561091
assert_eq!(props.max_row_group_size(), DEFAULT_MAX_ROW_GROUP_SIZE);
1092+
assert_eq!(props.bloom_filter_position(), DEFAULT_BLOOM_FILTER_POSITION);
10571093
assert_eq!(props.writer_version(), DEFAULT_WRITER_VERSION);
10581094
assert_eq!(props.created_by(), DEFAULT_CREATED_BY);
10591095
assert_eq!(props.key_value_metadata(), None);

0 commit comments

Comments
 (0)