Skip to content

Commit f23759a

Browse files
committed
Mutate the right row group metadata
When using BloomFilterPosition::AfterRowGroup, this was only writing the Bloom Filter offset to a temporary clone of the metadata, causing the Bloom Filter to never be seen by readers.
1 parent 6effa7f commit f23759a

File tree

3 files changed

+84
-64
lines changed

3 files changed

+84
-64
lines changed

parquet/src/arrow/arrow_writer/mod.rs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ use crate::column::writer::{
4343
};
4444
use crate::data_type::{ByteArray, FixedLenByteArray};
4545
use crate::errors::{ParquetError, Result};
46-
use crate::file::metadata::{ColumnChunkMetaData, KeyValue, RowGroupMetaDataPtr};
47-
use crate::file::properties::{BloomFilterPosition, WriterProperties, WriterPropertiesPtr};
46+
use crate::file::metadata::{ColumnChunkMetaData, KeyValue, RowGroupMetaData};
47+
use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
4848
use crate::file::reader::{ChunkReader, Length};
4949
use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
5050
use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
@@ -185,7 +185,7 @@ impl<W: Write + Send> ArrowWriter<W> {
185185
}
186186

187187
/// Returns metadata for any flushed row groups
188-
pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] {
188+
pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
189189
self.writer.flushed_row_groups()
190190
}
191191

@@ -263,12 +263,7 @@ impl<W: Write + Send> ArrowWriter<W> {
263263
for chunk in in_progress.close()? {
264264
chunk.append_to_row_group(&mut row_group_writer)?;
265265
}
266-
let row_group_metadata = row_group_writer.close()?;
267-
match self.writer.properties().bloom_filter_position() {
268-
BloomFilterPosition::AfterRowGroup =>
269-
self.writer.write_bloom_filters(&mut [row_group_metadata.to_thrift()])?,
270-
BloomFilterPosition::End => (),
271-
}
266+
row_group_writer.close()?;
272267
Ok(())
273268
}
274269

@@ -1044,7 +1039,9 @@ mod tests {
10441039
use crate::file::metadata::ParquetMetaData;
10451040
use crate::file::page_index::index::Index;
10461041
use crate::file::page_index::index_reader::read_pages_locations;
1047-
use crate::file::properties::{EnabledStatistics, ReaderProperties, WriterVersion};
1042+
use crate::file::properties::{
1043+
BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
1044+
};
10481045
use crate::file::serialized_reader::ReadOptionsBuilder;
10491046
use crate::file::{
10501047
reader::{FileReader, SerializedFileReader},
@@ -1761,7 +1758,7 @@ mod tests {
17611758
.set_dictionary_page_size_limit(dictionary_size.max(1))
17621759
.set_encoding(*encoding)
17631760
.set_bloom_filter_enabled(bloom_filter)
1764-
.set_bloom_filter_position(BloomFilterPosition::End)
1761+
.set_bloom_filter_position(BloomFilterPosition::AfterRowGroup)
17651762
.build();
17661763

17671764
files.push(roundtrip_opts(&expected_batch, props))

parquet/src/file/metadata.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,11 @@ impl RowGroupMetaData {
324324
&self.columns
325325
}
326326

327+
/// Returns mutable slice of column chunk metadata.
328+
pub fn columns_mut(&mut self) -> &mut [ColumnChunkMetaData] {
329+
&mut self.columns
330+
}
331+
327332
/// Number of rows in this row group.
328333
pub fn num_rows(&self) -> i64 {
329334
self.num_rows

parquet/src/file/writer.rs

Lines changed: 71 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@ use crate::column::{
3434
};
3535
use crate::data_type::DataType;
3636
use crate::errors::{ParquetError, Result};
37+
use crate::file::properties::{BloomFilterPosition, WriterPropertiesPtr};
3738
use crate::file::reader::ChunkReader;
38-
use crate::file::{metadata::*, properties::WriterPropertiesPtr, PARQUET_MAGIC};
39+
use crate::file::{metadata::*, PARQUET_MAGIC};
3940
use crate::schema::types::{self, ColumnDescPtr, SchemaDescPtr, SchemaDescriptor, TypePtr};
4041

4142
/// A wrapper around a [`Write`] that keeps track of the number
@@ -115,9 +116,10 @@ pub type OnCloseColumnChunk<'a> = Box<dyn FnOnce(ColumnCloseResult) -> Result<()
115116
/// - the row group metadata
116117
/// - the column index for each column chunk
117118
/// - the offset index for each column chunk
118-
pub type OnCloseRowGroup<'a> = Box<
119+
pub type OnCloseRowGroup<'a, W> = Box<
119120
dyn FnOnce(
120-
RowGroupMetaDataPtr,
121+
&'a mut TrackedWrite<W>,
122+
RowGroupMetaData,
121123
Vec<Option<Sbbf>>,
122124
Vec<Option<ColumnIndex>>,
123125
Vec<Option<OffsetIndex>>,
@@ -143,7 +145,7 @@ pub struct SerializedFileWriter<W: Write> {
143145
schema: TypePtr,
144146
descr: SchemaDescPtr,
145147
props: WriterPropertiesPtr,
146-
row_groups: Vec<RowGroupMetaDataPtr>,
148+
row_groups: Vec<RowGroupMetaData>,
147149
bloom_filters: Vec<Vec<Option<Sbbf>>>,
148150
column_indexes: Vec<Vec<Option<ColumnIndex>>>,
149151
offset_indexes: Vec<Vec<Option<OffsetIndex>>>,
@@ -197,18 +199,28 @@ impl<W: Write + Send> SerializedFileWriter<W> {
197199

198200
self.row_group_index += 1;
199201

202+
let bloom_filter_position = self.properties().bloom_filter_position();
200203
let row_groups = &mut self.row_groups;
201204
let row_bloom_filters = &mut self.bloom_filters;
202205
let row_column_indexes = &mut self.column_indexes;
203206
let row_offset_indexes = &mut self.offset_indexes;
204-
let on_close =
205-
|metadata, row_group_bloom_filter, row_group_column_index, row_group_offset_index| {
206-
row_groups.push(metadata);
207-
row_bloom_filters.push(row_group_bloom_filter);
208-
row_column_indexes.push(row_group_column_index);
209-
row_offset_indexes.push(row_group_offset_index);
210-
Ok(())
207+
let on_close = move |buf,
208+
mut metadata,
209+
row_group_bloom_filter,
210+
row_group_column_index,
211+
row_group_offset_index| {
212+
row_bloom_filters.push(row_group_bloom_filter);
213+
row_column_indexes.push(row_group_column_index);
214+
row_offset_indexes.push(row_group_offset_index);
215+
match bloom_filter_position {
216+
BloomFilterPosition::AfterRowGroup => {
217+
write_bloom_filters(buf, row_bloom_filters, &mut metadata)?
218+
}
219+
BloomFilterPosition::End => (),
211220
};
221+
row_groups.push(metadata);
222+
Ok(())
223+
};
212224

213225
let row_group_writer = SerializedRowGroupWriter::new(
214226
self.descr.clone(),
@@ -221,7 +233,7 @@ impl<W: Write + Send> SerializedFileWriter<W> {
221233
}
222234

223235
/// Returns metadata for any flushed row groups
224-
pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] {
236+
pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
225237
&self.row_groups
226238
}
227239

@@ -273,40 +285,6 @@ impl<W: Write + Send> SerializedFileWriter<W> {
273285
Ok(())
274286
}
275287

276-
/// Serialize all the bloom filter to the file
277-
pub fn write_bloom_filters(&mut self, row_groups: &mut [RowGroup]) -> Result<()> {
278-
// iter row group
279-
// iter each column
280-
// write bloom filter to the file
281-
for row_group in row_groups.iter_mut() {
282-
let row_group_idx: u16 = row_group
283-
.ordinal
284-
.expect("Missing row group ordinal")
285-
.try_into()
286-
.expect("Negative row group ordinal");
287-
let row_group_idx = row_group_idx as usize;
288-
for (column_idx, column_chunk) in row_group.columns.iter_mut().enumerate() {
289-
match self.bloom_filters[row_group_idx][column_idx].take() {
290-
Some(bloom_filter) => {
291-
let start_offset = self.buf.bytes_written();
292-
bloom_filter.write(&mut self.buf)?;
293-
let end_offset = self.buf.bytes_written();
294-
// set offset and index for bloom filter
295-
let column_chunk_meta = column_chunk
296-
.meta_data
297-
.as_mut()
298-
.expect("can't have bloom filter without column metadata");
299-
column_chunk_meta.bloom_filter_offset = Some(start_offset as i64);
300-
column_chunk_meta.bloom_filter_length =
301-
Some((end_offset - start_offset) as i32);
302-
}
303-
None => {}
304-
}
305-
}
306-
}
307-
Ok(())
308-
}
309-
310288
/// Serialize all the column index to the file
311289
fn write_column_indexes(&mut self, row_groups: &mut [RowGroup]) -> Result<()> {
312290
// iter row group
@@ -337,14 +315,17 @@ impl<W: Write + Send> SerializedFileWriter<W> {
337315
self.finished = true;
338316
let num_rows = self.row_groups.iter().map(|x| x.num_rows()).sum();
339317

318+
for row_group in &mut self.row_groups {
319+
write_bloom_filters(&mut self.buf, &mut self.bloom_filters, row_group)?;
320+
}
321+
340322
let mut row_groups = self
341323
.row_groups
342324
.as_slice()
343325
.iter()
344326
.map(|v| v.to_thrift())
345327
.collect::<Vec<_>>();
346328

347-
self.write_bloom_filters(&mut row_groups)?;
348329
// Write column indexes and offset indexes
349330
self.write_column_indexes(&mut row_groups)?;
350331
self.write_offset_indexes(&mut row_groups)?;
@@ -449,6 +430,43 @@ impl<W: Write + Send> SerializedFileWriter<W> {
449430
}
450431
}
451432

433+
/// Serialize all the bloom filters of the given row group to the given buffer,
434+
/// updating the row group metadata in place with each filter's offset and length.
435+
fn write_bloom_filters<W: Write + Send>(
436+
buf: &mut TrackedWrite<W>,
437+
bloom_filters: &mut Vec<Vec<Option<Sbbf>>>,
438+
row_group: &mut RowGroupMetaData,
439+
) -> Result<()> {
440+
// iter row group
441+
// iter each column
442+
// write bloom filter to the file
443+
444+
let row_group_idx: u16 = row_group
445+
.ordinal()
446+
.expect("Missing row group ordinal")
447+
.try_into()
448+
.expect("Negative row group ordinal");
449+
let row_group_idx = row_group_idx as usize;
450+
for (column_idx, column_chunk) in row_group.columns_mut().iter_mut().enumerate() {
451+
match bloom_filters[row_group_idx][column_idx].take() {
452+
Some(bloom_filter) => {
453+
let start_offset = buf.bytes_written();
454+
bloom_filter.write(&mut *buf)?;
455+
let end_offset = buf.bytes_written();
456+
// set offset and index for bloom filter
457+
*column_chunk = column_chunk
458+
.clone()
459+
.into_builder()
460+
.set_bloom_filter_offset(Some(start_offset as i64))
461+
.set_bloom_filter_length(Some((end_offset - start_offset) as i32))
462+
.build()?;
463+
}
464+
None => {}
465+
}
466+
}
467+
Ok(())
468+
}
469+
452470
/// Parquet row group writer API.
453471
/// Provides methods to access column writers in an iterator-like fashion, order is
454472
/// guaranteed to match the order of schema leaves (column descriptors).
@@ -474,7 +492,7 @@ pub struct SerializedRowGroupWriter<'a, W: Write> {
474492
offset_indexes: Vec<Option<OffsetIndex>>,
475493
row_group_index: i16,
476494
file_offset: i64,
477-
on_close: Option<OnCloseRowGroup<'a>>,
495+
on_close: Option<OnCloseRowGroup<'a, W>>,
478496
}
479497

480498
impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
@@ -491,7 +509,7 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
491509
properties: WriterPropertiesPtr,
492510
buf: &'a mut TrackedWrite<W>,
493511
row_group_index: i16,
494-
on_close: Option<OnCloseRowGroup<'a>>,
512+
on_close: Option<OnCloseRowGroup<'a, W>>,
495513
) -> Self {
496514
let num_columns = schema_descr.num_columns();
497515
let file_offset = buf.bytes_written() as i64;
@@ -675,12 +693,12 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
675693
.set_file_offset(self.file_offset)
676694
.build()?;
677695

678-
let metadata = Arc::new(row_group_metadata);
679-
self.row_group_metadata = Some(metadata.clone());
696+
self.row_group_metadata = Some(Arc::new(row_group_metadata.clone()));
680697

681698
if let Some(on_close) = self.on_close.take() {
682699
on_close(
683-
metadata,
700+
self.buf,
701+
row_group_metadata,
684702
self.bloom_filters,
685703
self.column_indexes,
686704
self.offset_indexes,
@@ -1452,7 +1470,7 @@ mod tests {
14521470
assert_eq!(flushed.len(), idx + 1);
14531471
assert_eq!(Some(idx as i16), last_group.ordinal());
14541472
assert_eq!(Some(row_group_file_offset as i64), last_group.file_offset());
1455-
assert_eq!(flushed[idx].as_ref(), last_group.as_ref());
1473+
assert_eq!(flushed[idx], Arc::unwrap_or_clone(last_group));
14561474
}
14571475
let file_metadata = file_writer.close().unwrap();
14581476

0 commit comments

Comments
 (0)