Skip to content

Commit a2818bf

Browse files
committed
Add ParquetMetaDataBuilder
1 parent 2cc0c16 commit a2818bf

File tree

3 files changed

+170
-41
lines changed

3 files changed

+170
-41
lines changed

parquet/src/arrow/async_reader/mod.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1554,13 +1554,16 @@ mod tests {
15541554
index_reader::read_offset_indexes(&data, metadata.row_group(0).columns())
15551555
.expect("reading offset index");
15561556

1557-
let row_group_meta = metadata.row_group(0).clone();
1558-
let metadata = ParquetMetaData::new_with_page_index(
1559-
metadata.file_metadata().clone(),
1560-
vec![row_group_meta],
1561-
None,
1562-
Some(vec![offset_index.clone()]),
1563-
);
1557+
let mut metadata_builder = metadata.into_builder();
1558+
let mut row_groups = metadata_builder.take_row_groups();
1559+
row_groups.truncate(1);
1560+
let row_group_meta = row_groups.pop().unwrap();
1561+
1562+
let metadata = metadata_builder
1563+
.add_row_group(row_group_meta)
1564+
.set_column_index(None)
1565+
.set_offset_index(Some(vec![offset_index.clone()]))
1566+
.build();
15641567

15651568
let metadata = Arc::new(metadata);
15661569

parquet/src/file/metadata/mod.rs

Lines changed: 143 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -156,13 +156,16 @@ pub type ParquetOffsetIndex = Vec<Vec<OffsetIndexMetaData>>;
156156
/// defined by [`parquet.thrift`].
157157
///
158158
/// # Overview
159+
/// The fields of this structure are:
159160
/// * [`FileMetaData`]: Information about the overall file (such as the schema) (See [`Self::file_metadata`])
160161
/// * [`RowGroupMetaData`]: Information about each Row Group (see [`Self::row_groups`])
161162
/// * [`ParquetColumnIndex`] and [`ParquetOffsetIndex`]: Optional "Page Index" structures (see [`Self::column_index`] and [`Self::offset_index`])
162163
///
163164
/// This structure is read by the various readers in this crate or can be read
164165
/// directly from a file using the [`ParquetMetaDataReader`] struct.
165166
///
167+
/// See the [`ParquetMetaDataBuilder`] to create and modify this structure.
168+
///
166169
/// [`parquet.thrift`]: https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift
167170
#[derive(Debug, Clone, PartialEq)]
168171
pub struct ParquetMetaData {
@@ -204,6 +207,11 @@ impl ParquetMetaData {
204207
}
205208
}
206209

210+
/// Convert this ParquetMetaData into a [`ParquetMetaDataBuilder`]
211+
pub fn into_builder(self) -> ParquetMetaDataBuilder {
212+
self.into()
213+
}
214+
207215
/// Returns file metadata as reference.
208216
pub fn file_metadata(&self) -> &FileMetaData {
209217
&self.file_metadata
@@ -290,6 +298,117 @@ impl ParquetMetaData {
290298
}
291299
}
292300

301+
/// A builder for creating / manipulating [`ParquetMetaData`]
302+
///
303+
/// # Example creating a new [`ParquetMetaData`]
304+
///
305+
/// ```no_run
306+
/// # use parquet::file::metadata::{FileMetaData, ParquetMetaData, ParquetMetaDataBuilder, RowGroupMetaData, RowGroupMetaDataBuilder};
307+
/// # fn get_file_metadata() -> FileMetaData { unimplemented!(); }
308+
/// // Create a new builder given the file metadata
309+
/// let file_metadata = get_file_metadata();
310+
/// // Create a row group
311+
/// let row_group = RowGroupMetaData::builder(file_metadata.schema_descr_ptr())
312+
/// .set_num_rows(100)
313+
/// // ... (A real row group needs more than just the number of rows)
314+
/// .build()
315+
/// .unwrap();
316+
/// // Create the final metadata
317+
/// let metadata: ParquetMetaData = ParquetMetaDataBuilder::new(file_metadata)
318+
/// .add_row_group(row_group)
319+
/// .build();
320+
/// ```
321+
///
322+
/// # Example modifying an existing [`ParquetMetaData`]
323+
/// ```no_run
324+
/// # use parquet::file::metadata::ParquetMetaData;
325+
/// # fn load_metadata() -> ParquetMetaData { unimplemented!(); }
326+
/// // Modify the metadata so only the last RowGroup remains
327+
/// let metadata: ParquetMetaData = load_metadata();
328+
/// let mut builder = metadata.into_builder();
329+
///
330+
/// // Take existing row groups to modify
331+
/// let mut row_groups = builder.take_row_groups();
332+
/// let last_row_group = row_groups.pop().unwrap();
333+
///
334+
/// let metadata = builder
335+
/// .add_row_group(last_row_group)
336+
/// .build();
337+
/// ```
338+
pub struct ParquetMetaDataBuilder(ParquetMetaData);
339+
340+
impl ParquetMetaDataBuilder {
341+
/// Create a new builder from a file metadata, with no row groups
342+
pub fn new(file_meta_data: FileMetaData) -> Self {
343+
Self(ParquetMetaData::new(file_meta_data, vec![]))
344+
}
345+
346+
/// Create a new builder from an existing ParquetMetaData
347+
pub fn new_from_metadata(metadata: ParquetMetaData) -> Self {
348+
Self(metadata)
349+
}
350+
351+
/// Adds a row group to the metadata
352+
pub fn add_row_group(mut self, row_group: RowGroupMetaData) -> Self {
353+
self.0.row_groups.push(row_group);
354+
self
355+
}
356+
357+
/// Sets all the row groups to the specified list
358+
pub fn set_row_groups(mut self, row_groups: Vec<RowGroupMetaData>) -> Self {
359+
self.0.row_groups = row_groups;
360+
self
361+
}
362+
363+
/// Takes ownership of the row groups in this builder, and clears the list
364+
/// of row groups.
365+
///
366+
/// This can be used for more efficient creation of a new ParquetMetaData
367+
/// from an existing one.
368+
pub fn take_row_groups(&mut self) -> Vec<RowGroupMetaData> {
369+
std::mem::take(&mut self.0.row_groups)
370+
}
371+
372+
/// Return a reference to the current row groups
373+
pub fn row_groups(&self) -> &[RowGroupMetaData] {
374+
&self.0.row_groups
375+
}
376+
377+
/// Sets the column index
378+
pub fn set_column_index(mut self, column_index: Option<ParquetColumnIndex>) -> Self {
379+
self.0.column_index = column_index;
380+
self
381+
}
382+
383+
/// Returns the current column index from the builder, replacing it with `None`
384+
pub fn take_column_index(&mut self) -> Option<ParquetColumnIndex> {
385+
std::mem::take(&mut self.0.column_index)
386+
}
387+
388+
/// Sets the offset index
389+
pub fn set_offset_index(mut self, offset_index: Option<ParquetOffsetIndex>) -> Self {
390+
self.0.offset_index = offset_index;
391+
self
392+
}
393+
394+
/// Returns the current offset index from the builder, replacing it with `None`
395+
pub fn take_offset_index(&mut self) -> Option<ParquetOffsetIndex> {
396+
std::mem::take(&mut self.0.offset_index)
397+
}
398+
399+
/// Creates a new ParquetMetaData from the builder
400+
pub fn build(self) -> ParquetMetaData {
401+
let Self(metadata) = self;
402+
metadata
403+
}
404+
}
405+
406+
impl From<ParquetMetaData> for ParquetMetaDataBuilder {
407+
fn from(meta_data: ParquetMetaData) -> Self {
408+
Self(meta_data)
409+
}
410+
}
411+
293412
pub type KeyValue = crate::format::KeyValue;
294413

295414
/// Reference counted pointer for [`FileMetaData`].
@@ -566,12 +685,27 @@ impl RowGroupMetaDataBuilder {
566685
self
567686
}
568687

688+
/// Takes ownership of the column metadata in this builder, and clears
689+
/// the list of columns.
690+
///
691+
/// This can be used for more efficient creation of a new RowGroupMetaData
692+
/// from an existing one.
693+
pub fn take_columns(&mut self) -> Vec<ColumnChunkMetaData> {
694+
std::mem::take(&mut self.0.columns)
695+
}
696+
569697
/// Sets column metadata for this row group.
570698
pub fn set_column_metadata(mut self, value: Vec<ColumnChunkMetaData>) -> Self {
571699
self.0.columns = value;
572700
self
573701
}
574702

703+
/// Adds a column metadata to this row group
704+
pub fn add_column_metadata(mut self, value: ColumnChunkMetaData) -> Self {
705+
self.0.columns.push(value);
706+
self
707+
}
708+
575709
/// Sets ordinal for this row group.
576710
pub fn set_ordinal(mut self, value: i16) -> Self {
577711
self.0.ordinal = Some(value);
@@ -1672,7 +1806,9 @@ mod tests {
16721806
.unwrap();
16731807
let row_group_meta_with_stats = vec![row_group_meta_with_stats];
16741808

1675-
let parquet_meta = ParquetMetaData::new(file_metadata.clone(), row_group_meta_with_stats);
1809+
let parquet_meta = ParquetMetaDataBuilder::new(file_metadata.clone())
1810+
.set_row_groups(row_group_meta_with_stats)
1811+
.build();
16761812
let base_expected_size = 2312;
16771813

16781814
assert_eq!(parquet_meta.memory_size(), base_expected_size);
@@ -1692,14 +1828,13 @@ mod tests {
16921828
offset_index.append_unencoded_byte_array_data_bytes(Some(10));
16931829
let offset_index = offset_index.build_to_thrift();
16941830

1695-
let parquet_meta = ParquetMetaData::new_with_page_index(
1696-
file_metadata,
1697-
row_group_meta,
1698-
Some(vec![vec![Index::BOOLEAN(native_index)]]),
1699-
Some(vec![vec![
1831+
let parquet_meta = ParquetMetaDataBuilder::new(file_metadata)
1832+
.set_row_groups(row_group_meta)
1833+
.set_column_index(Some(vec![vec![Index::BOOLEAN(native_index)]]))
1834+
.set_offset_index(Some(vec![vec![
17001835
OffsetIndexMetaData::try_new(offset_index).unwrap()
1701-
]]),
1702-
);
1836+
]]))
1837+
.build();
17031838

17041839
let bigger_expected_size = 2816;
17051840
// more set fields means more memory usage

parquet/src/file/serialized_reader.rs

Lines changed: 17 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -191,11 +191,13 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
191191
/// Creates file reader from a Parquet file with read options.
192192
/// Returns error if Parquet file does not exist or is corrupt.
193193
pub fn new_with_options(chunk_reader: R, options: ReadOptions) -> Result<Self> {
194-
let metadata = ParquetMetaDataReader::new().parse_and_finish(&chunk_reader)?;
194+
let mut metadata_builder = ParquetMetaDataReader::new()
195+
.parse_and_finish(&chunk_reader)?
196+
.into_builder();
195197
let mut predicates = options.predicates;
196-
let row_groups = metadata.row_groups().to_vec();
197-
let mut filtered_row_groups = Vec::<RowGroupMetaData>::new();
198-
for (i, rg_meta) in row_groups.into_iter().enumerate() {
198+
199+
// Filter row groups based on the predicates
200+
for (i, rg_meta) in metadata_builder.take_row_groups().into_iter().enumerate() {
199201
let mut keep = true;
200202
for predicate in &mut predicates {
201203
if !predicate(&rg_meta, i) {
@@ -204,41 +206,30 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
204206
}
205207
}
206208
if keep {
207-
filtered_row_groups.push(rg_meta);
209+
metadata_builder = metadata_builder.add_row_group(rg_meta);
208210
}
209211
}
210212

211213
if options.enable_page_index {
212214
let mut columns_indexes = vec![];
213215
let mut offset_indexes = vec![];
214216

215-
for rg in &mut filtered_row_groups {
217+
for rg in metadata_builder.row_groups().iter() {
216218
let column_index = index_reader::read_columns_indexes(&chunk_reader, rg.columns())?;
217219
let offset_index = index_reader::read_offset_indexes(&chunk_reader, rg.columns())?;
218220
columns_indexes.push(column_index);
219221
offset_indexes.push(offset_index);
220222
}
221-
222-
Ok(Self {
223-
chunk_reader: Arc::new(chunk_reader),
224-
metadata: Arc::new(ParquetMetaData::new_with_page_index(
225-
metadata.file_metadata().clone(),
226-
filtered_row_groups,
227-
Some(columns_indexes),
228-
Some(offset_indexes),
229-
)),
230-
props: Arc::new(options.props),
231-
})
232-
} else {
233-
Ok(Self {
234-
chunk_reader: Arc::new(chunk_reader),
235-
metadata: Arc::new(ParquetMetaData::new(
236-
metadata.file_metadata().clone(),
237-
filtered_row_groups,
238-
)),
239-
props: Arc::new(options.props),
240-
})
223+
metadata_builder = metadata_builder
224+
.set_column_index(Some(columns_indexes))
225+
.set_offset_index(Some(offset_indexes));
241226
}
227+
228+
Ok(Self {
229+
chunk_reader: Arc::new(chunk_reader),
230+
metadata: Arc::new(metadata_builder.build()),
231+
props: Arc::new(options.props),
232+
})
242233
}
243234
}
244235

0 commit comments

Comments
 (0)