Skip to content

Commit ca1d85f

Browse files
authored
Add explicit column mask construction in parquet: ProjectionMask (#1701) (#1716)
* Add explicit column mask construction (#1701)
* Fix ParquetRecordBatchStream
* Fix docs
* Fix async_reader test
* Review feedback
1 parent 6fbc9a4 commit ca1d85f

File tree

8 files changed

+237
-265
lines changed

8 files changed

+237
-265
lines changed

parquet/src/arrow/array_reader/builder.rs

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use crate::arrow::converter::{
3232
IntervalYearMonthArrayConverter, IntervalYearMonthConverter,
3333
};
3434
use crate::arrow::schema::{convert_schema, ParquetField, ParquetFieldType};
35+
use crate::arrow::ProjectionMask;
3536
use crate::basic::Type as PhysicalType;
3637
use crate::data_type::{
3738
BoolType, DoubleType, FixedLenByteArrayType, FloatType, Int32Type, Int64Type,
@@ -40,21 +41,15 @@ use crate::data_type::{
4041
use crate::errors::Result;
4142
use crate::schema::types::{ColumnDescriptor, ColumnPath, SchemaDescPtr, Type};
4243

43-
/// Create array reader from parquet schema, column indices, and parquet file reader.
44-
pub fn build_array_reader<T>(
44+
/// Create array reader from parquet schema, projection mask, and parquet file reader.
45+
pub fn build_array_reader(
4546
parquet_schema: SchemaDescPtr,
4647
arrow_schema: SchemaRef,
47-
column_indices: T,
48+
mask: ProjectionMask,
4849
row_groups: Box<dyn RowGroupCollection>,
49-
) -> Result<Box<dyn ArrayReader>>
50-
where
51-
T: IntoIterator<Item = usize>,
52-
{
53-
let field = convert_schema(
54-
parquet_schema.as_ref(),
55-
column_indices,
56-
Some(arrow_schema.as_ref()),
57-
)?;
50+
) -> Result<Box<dyn ArrayReader>> {
51+
let field =
52+
convert_schema(parquet_schema.as_ref(), mask, Some(arrow_schema.as_ref()))?;
5853

5954
match &field {
6055
Some(field) => build_reader(field, row_groups.as_ref()),
@@ -346,6 +341,7 @@ mod tests {
346341
Arc::new(SerializedFileReader::new(file).unwrap());
347342

348343
let file_metadata = file_reader.metadata().file_metadata();
344+
let mask = ProjectionMask::leaves(file_metadata.schema_descr(), [0]);
349345
let arrow_schema = parquet_to_arrow_schema(
350346
file_metadata.schema_descr(),
351347
file_metadata.key_value_metadata(),
@@ -355,7 +351,7 @@ mod tests {
355351
let array_reader = build_array_reader(
356352
file_reader.metadata().file_metadata().schema_descr_ptr(),
357353
Arc::new(arrow_schema),
358-
vec![0usize].into_iter(),
354+
mask,
359355
Box::new(file_reader),
360356
)
361357
.unwrap();

parquet/src/arrow/array_reader/list_array.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ mod tests {
246246
use crate::arrow::array_reader::build_array_reader;
247247
use crate::arrow::array_reader::list_array::ListArrayReader;
248248
use crate::arrow::array_reader::test_util::InMemoryArrayReader;
249-
use crate::arrow::{parquet_to_arrow_schema, ArrowWriter};
249+
use crate::arrow::{parquet_to_arrow_schema, ArrowWriter, ProjectionMask};
250250
use crate::file::properties::WriterProperties;
251251
use crate::file::reader::{FileReader, SerializedFileReader};
252252
use crate::schema::parser::parse_message_type;
@@ -582,10 +582,13 @@ mod tests {
582582
)
583583
.unwrap();
584584

585+
let schema = file_metadata.schema_descr_ptr();
586+
let mask = ProjectionMask::leaves(&schema, vec![0]);
587+
585588
let mut array_reader = build_array_reader(
586-
file_reader.metadata().file_metadata().schema_descr_ptr(),
589+
schema,
587590
Arc::new(arrow_schema),
588-
vec![0usize].into_iter(),
591+
mask,
589592
Box::new(file_reader),
590593
)
591594
.unwrap();

parquet/src/arrow/arrow_reader.rs

Lines changed: 43 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,8 @@ use arrow::{array::StructArray, error::ArrowError};
2727

2828
use crate::arrow::array_reader::{build_array_reader, ArrayReader};
2929
use crate::arrow::schema::parquet_to_arrow_schema;
30-
use crate::arrow::schema::{
31-
parquet_to_arrow_schema_by_columns, parquet_to_arrow_schema_by_root_columns,
32-
};
30+
use crate::arrow::schema::parquet_to_arrow_schema_by_columns;
31+
use crate::arrow::ProjectionMask;
3332
use crate::errors::Result;
3433
use crate::file::metadata::{KeyValue, ParquetMetaData};
3534
use crate::file::reader::FileReader;
@@ -44,15 +43,8 @@ pub trait ArrowReader {
4443
fn get_schema(&mut self) -> Result<Schema>;
4544

4645
/// Read parquet schema and convert it into arrow schema.
47-
/// This schema only includes columns identified by `column_indices`.
48-
/// To select leaf columns (i.e. `a.b.c` instead of `a`), set `leaf_columns = true`
49-
fn get_schema_by_columns<T>(
50-
&mut self,
51-
column_indices: T,
52-
leaf_columns: bool,
53-
) -> Result<Schema>
54-
where
55-
T: IntoIterator<Item = usize>;
46+
/// This schema only includes columns identified by `mask`.
47+
fn get_schema_by_columns(&mut self, mask: ProjectionMask) -> Result<Schema>;
5648

5749
/// Returns record batch reader from whole parquet file.
5850
///
@@ -64,19 +56,17 @@ pub trait ArrowReader {
6456
fn get_record_reader(&mut self, batch_size: usize) -> Result<Self::RecordReader>;
6557

6658
/// Returns record batch reader whose record batch contains columns identified by
67-
/// `column_indices`.
59+
/// `mask`.
6860
///
6961
/// # Arguments
7062
///
71-
/// `column_indices`: The columns that should be included in record batches.
63+
/// `mask`: The columns that should be included in record batches.
7264
/// `batch_size`: Please refer to `get_record_reader`.
73-
fn get_record_reader_by_columns<T>(
65+
fn get_record_reader_by_columns(
7466
&mut self,
75-
column_indices: T,
67+
mask: ProjectionMask,
7668
batch_size: usize,
77-
) -> Result<Self::RecordReader>
78-
where
79-
T: IntoIterator<Item = usize>;
69+
) -> Result<Self::RecordReader>;
8070
}
8171

8272
#[derive(Debug, Clone, Default)]
@@ -118,59 +108,34 @@ impl ArrowReader for ParquetFileArrowReader {
118108
parquet_to_arrow_schema(file_metadata.schema_descr(), self.get_kv_metadata())
119109
}
120110

121-
fn get_schema_by_columns<T>(
122-
&mut self,
123-
column_indices: T,
124-
leaf_columns: bool,
125-
) -> Result<Schema>
126-
where
127-
T: IntoIterator<Item = usize>,
128-
{
111+
fn get_schema_by_columns(&mut self, mask: ProjectionMask) -> Result<Schema> {
129112
let file_metadata = self.file_reader.metadata().file_metadata();
130-
if leaf_columns {
131-
parquet_to_arrow_schema_by_columns(
132-
file_metadata.schema_descr(),
133-
column_indices,
134-
self.get_kv_metadata(),
135-
)
136-
} else {
137-
parquet_to_arrow_schema_by_root_columns(
138-
file_metadata.schema_descr(),
139-
column_indices,
140-
self.get_kv_metadata(),
141-
)
142-
}
113+
parquet_to_arrow_schema_by_columns(
114+
file_metadata.schema_descr(),
115+
mask,
116+
self.get_kv_metadata(),
117+
)
143118
}
144119

145120
fn get_record_reader(
146121
&mut self,
147122
batch_size: usize,
148123
) -> Result<ParquetRecordBatchReader> {
149-
let column_indices = 0..self
150-
.file_reader
151-
.metadata()
152-
.file_metadata()
153-
.schema_descr()
154-
.num_columns();
155-
156-
self.get_record_reader_by_columns(column_indices, batch_size)
124+
self.get_record_reader_by_columns(ProjectionMask::all(), batch_size)
157125
}
158126

159-
fn get_record_reader_by_columns<T>(
127+
fn get_record_reader_by_columns(
160128
&mut self,
161-
column_indices: T,
129+
mask: ProjectionMask,
162130
batch_size: usize,
163-
) -> Result<ParquetRecordBatchReader>
164-
where
165-
T: IntoIterator<Item = usize>,
166-
{
131+
) -> Result<ParquetRecordBatchReader> {
167132
let array_reader = build_array_reader(
168133
self.file_reader
169134
.metadata()
170135
.file_metadata()
171136
.schema_descr_ptr(),
172137
Arc::new(self.get_schema()?),
173-
column_indices,
138+
mask,
174139
Box::new(self.file_reader.clone()),
175140
)?;
176141

@@ -296,7 +261,7 @@ mod tests {
296261
IntervalDayTimeArrayConverter, LargeUtf8ArrayConverter, Utf8ArrayConverter,
297262
};
298263
use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
299-
use crate::arrow::ArrowWriter;
264+
use crate::arrow::{ArrowWriter, ProjectionMask};
300265
use crate::basic::{ConvertedType, Encoding, Repetition, Type as PhysicalType};
301266
use crate::column::writer::get_typed_column_writer_mut;
302267
use crate::data_type::{
@@ -351,12 +316,14 @@ mod tests {
351316
let parquet_file_reader =
352317
get_test_reader("parquet/generated_simple_numerics/blogs.parquet");
353318

354-
let max_len = parquet_file_reader.metadata().file_metadata().num_rows() as usize;
319+
let file_metadata = parquet_file_reader.metadata().file_metadata();
320+
let max_len = file_metadata.num_rows() as usize;
355321

322+
let mask = ProjectionMask::leaves(file_metadata.schema_descr(), [2]);
356323
let mut arrow_reader = ParquetFileArrowReader::new(parquet_file_reader);
357324

358325
let mut record_batch_reader = arrow_reader
359-
.get_record_reader_by_columns(vec![2], 60)
326+
.get_record_reader_by_columns(mask, 60)
360327
.expect("Failed to read into array!");
361328

362329
// Verify that the schema was correctly parsed
@@ -1040,8 +1007,11 @@ mod tests {
10401007
// (see: ARROW-11452)
10411008
let testdata = arrow::util::test_util::parquet_test_data();
10421009
let path = format!("{}/nested_structs.rust.parquet", testdata);
1043-
let parquet_file_reader =
1044-
SerializedFileReader::try_from(File::open(&path).unwrap()).unwrap();
1010+
let file = File::open(&path).unwrap();
1011+
let parquet_file_reader = SerializedFileReader::try_from(file).unwrap();
1012+
let file_metadata = parquet_file_reader.metadata().file_metadata();
1013+
let schema = file_metadata.schema_descr_ptr();
1014+
10451015
let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_file_reader));
10461016
let record_batch_reader = arrow_reader
10471017
.get_record_reader(60)
@@ -1051,12 +1021,11 @@ mod tests {
10511021
batch.unwrap();
10521022
}
10531023

1024+
let mask = ProjectionMask::leaves(&schema, [3, 8, 10]);
10541025
let projected_reader = arrow_reader
1055-
.get_record_reader_by_columns(vec![3, 8, 10], 60)
1056-
.unwrap();
1057-
let projected_schema = arrow_reader
1058-
.get_schema_by_columns(vec![3, 8, 10], true)
1026+
.get_record_reader_by_columns(mask.clone(), 60)
10591027
.unwrap();
1028+
let projected_schema = arrow_reader.get_schema_by_columns(mask).unwrap();
10601029

10611030
let expected_schema = Schema::new(vec![
10621031
Field::new(
@@ -1139,8 +1108,11 @@ mod tests {
11391108
}
11401109

11411110
let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1111+
let file_metadata = file_reader.metadata().file_metadata();
1112+
let mask = ProjectionMask::leaves(file_metadata.schema_descr(), [0]);
1113+
11421114
let mut batch = ParquetFileArrowReader::new(file_reader);
1143-
let reader = batch.get_record_reader_by_columns(vec![0], 1024).unwrap();
1115+
let reader = batch.get_record_reader_by_columns(mask, 1024).unwrap();
11441116

11451117
let expected_schema = arrow::datatypes::Schema::new(vec![Field::new(
11461118
"group",
@@ -1178,7 +1150,7 @@ mod tests {
11781150
let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
11791151

11801152
let mut record_batch_reader = arrow_reader
1181-
.get_record_reader_by_columns(vec![0], 10)
1153+
.get_record_reader_by_columns(ProjectionMask::all(), 10)
11821154
.unwrap();
11831155

11841156
let error = record_batch_reader.next().unwrap().unwrap_err();
@@ -1414,10 +1386,13 @@ mod tests {
14141386
let path = format!("{}/alltypes_plain.parquet", testdata);
14151387
let file = File::open(&path).unwrap();
14161388
let reader = SerializedFileReader::try_from(file).unwrap();
1417-
let expected_rows = reader.metadata().file_metadata().num_rows() as usize;
1389+
let file_metadata = reader.metadata().file_metadata();
1390+
let expected_rows = file_metadata.num_rows() as usize;
1391+
let schema = file_metadata.schema_descr_ptr();
14181392

14191393
let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader));
1420-
let batch_reader = arrow_reader.get_record_reader_by_columns([], 2).unwrap();
1394+
let mask = ProjectionMask::leaves(&schema, []);
1395+
let batch_reader = arrow_reader.get_record_reader_by_columns(mask, 2).unwrap();
14211396

14221397
let mut total_rows = 0;
14231398
for maybe_batch in batch_reader {

0 commit comments

Comments
 (0)