Skip to content

Commit 301f27d

Browse files
committed
Add ParquetAccessPlan that describes which part of the parquet files to read
1 parent a0773cd commit 301f27d

File tree

5 files changed

+351
-213
lines changed

5 files changed

+351
-213
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
19+
20+
/// Specifies a selection of rows and row groups within a ParquetFile to decode.
21+
///
22+
/// A `ParquetAccessPlan` is used to limits the row groups and data pages a `ParquetExec`
23+
/// will read and decode and this improve performance.
24+
///
25+
/// Note that page level pruning based on ArrowPredicate is applied after all of
26+
/// these selections
27+
///
28+
/// This looks like:
29+
/// (TODO diagram)
30+
#[derive(Debug, Clone, PartialEq)]
31+
pub struct ParquetAccessPlan {
32+
/// How to access the i-th row group
33+
row_groups: Vec<RowGroupAccess>,
34+
}
35+
36+
#[derive(Debug, Clone, PartialEq)]
37+
pub enum RowGroupAccess {
38+
/// The row group should not be read at all
39+
Skip,
40+
/// The row group should be scanned fully
41+
Scan,
42+
/// Only the specified rows within the row group should be scanned
43+
Selection(RowSelection),
44+
}
45+
46+
impl RowGroupAccess {
47+
/// return true if this row group should be scanned
48+
pub fn should_scan(&self) -> bool {
49+
match self {
50+
RowGroupAccess::Skip => false,
51+
RowGroupAccess::Scan | RowGroupAccess::Selection(_) => true,
52+
}
53+
}
54+
}
55+
56+
impl ParquetAccessPlan {
57+
/// Create a new `ParquetAccessPlan` to scan all row groups
58+
pub fn new_all(row_group_count: usize) -> Self {
59+
Self {
60+
row_groups: vec![RowGroupAccess::Scan; row_group_count],
61+
}
62+
}
63+
64+
/// Set the i-th row group to false (should not be scanned)
65+
pub fn do_not_scan(&mut self, idx: usize) {
66+
self.row_groups[idx] = RowGroupAccess::Skip;
67+
}
68+
69+
/// Return true if the i-th row group should be scanned
70+
pub fn should_scan(&self, idx: usize) -> bool {
71+
self.row_groups[idx].should_scan()
72+
}
73+
74+
/// Set to scan only the [`RowSelection`] in the specified row group.
75+
///
76+
/// Based on the existing row groups plan:
77+
/// * Skip: does nothing
78+
/// * Scan: Updates to scan only the rows in the `RowSelection`
79+
/// * Selection: Updates to scan only the specified in the exising selection and the new selection
80+
pub fn scan_selection(&mut self, idx: usize, selection: RowSelection) {
81+
self.row_groups[idx] = match &self.row_groups[idx] {
82+
// already skipping the entire row group
83+
RowGroupAccess::Skip => RowGroupAccess::Skip,
84+
RowGroupAccess::Scan => RowGroupAccess::Selection(selection),
85+
RowGroupAccess::Selection(existing_selection) => {
86+
RowGroupAccess::Selection(existing_selection.intersection(&selection))
87+
}
88+
}
89+
}
90+
91+
/// Return the overall RowSelection for all scanned row groups, if
92+
/// there are any RowGroupAccess::Selection;
93+
///
94+
///
95+
/// TODO better doc / explanation
96+
pub fn overall_row_selection(&self) -> Option<RowSelection> {
97+
if !self
98+
.row_groups
99+
.iter()
100+
.any(|rg| matches!(rg, RowGroupAccess::Selection(_)))
101+
{
102+
return None;
103+
}
104+
105+
let total_selection: RowSelection = self
106+
.row_groups
107+
.iter()
108+
.flat_map(|rg| {
109+
match rg {
110+
RowGroupAccess::Skip => vec![],
111+
RowGroupAccess::Scan => {
112+
// need a row group access to scan the entire row group (need row group counts)
113+
// This is clearly not tested TODO
114+
todo!();
115+
}
116+
RowGroupAccess::Selection(selection) => {
117+
// todo avoid these clones
118+
let selection: Vec<RowSelector> = selection.clone().into();
119+
selection
120+
}
121+
}
122+
})
123+
.collect();
124+
125+
Some(total_selection)
126+
}
127+
128+
/// Return an iterator over the row group indexes that should be scanned
129+
pub fn row_group_index_iter(&self) -> impl Iterator<Item = usize> + '_ {
130+
self.row_groups.iter().enumerate().filter_map(|(idx, b)| {
131+
if b.should_scan() {
132+
Some(idx)
133+
} else {
134+
None
135+
}
136+
})
137+
}
138+
139+
/// Return a vec of all row group indexes to scan
140+
pub fn row_group_indexes(&self) -> Vec<usize> {
141+
self.row_group_index_iter().collect()
142+
}
143+
144+
/// Return the total number of row groups (not the total number to be scanned)
145+
pub fn len(&self) -> usize {
146+
self.row_groups.len()
147+
}
148+
149+
/// Return true if there are no row groups
150+
pub fn is_empty(&self) -> bool {
151+
self.row_groups.is_empty()
152+
}
153+
}

datafusion/core/src/datasource/physical_plan/parquet/mod.rs

+5-2
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ use log::debug;
4747
use parquet::basic::{ConvertedType, LogicalType};
4848
use parquet::schema::types::ColumnDescriptor;
4949

50+
mod access_plan;
5051
mod metrics;
5152
mod opener;
5253
mod page_filter;
@@ -59,6 +60,7 @@ mod writer;
5960
use crate::datasource::schema_adapter::{
6061
DefaultSchemaAdapterFactory, SchemaAdapterFactory,
6162
};
63+
pub use access_plan::ParquetAccessPlan;
6264
pub use metrics::ParquetFileMetrics;
6365
use opener::ParquetOpener;
6466
pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory};
@@ -152,8 +154,9 @@ pub use writer::plan_to_parquet;
152154
/// the file.
153155
///
154156
/// * Step 3: The `ParquetOpener` gets the [`ParquetMetaData`] (file metadata)
155-
/// via [`ParquetFileReaderFactory`] and applies any predicates and projections
156-
/// to determine what pages must be read.
157+
/// via [`ParquetFileReaderFactory`], creating a [`ParquetAccessPlan`] by
158+
/// applying predicates. The plan and projections are used to determine what
159+
/// pages must be read.
157160
///
158161
/// * Step 4: The stream begins reading data, fetching the required pages
159162
/// and incrementally decoding them.

datafusion/core/src/datasource/physical_plan/parquet/opener.rs

+13-10
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
//! [`ParquetOpener`] for opening Parquet files
1919
2020
use crate::datasource::physical_plan::parquet::page_filter::PagePruningPredicate;
21-
use crate::datasource::physical_plan::parquet::row_groups::RowGroupSet;
21+
use crate::datasource::physical_plan::parquet::row_groups::RowGroupPlanBuilder;
2222
use crate::datasource::physical_plan::parquet::{row_filter, should_enable_page_index};
2323
use crate::datasource::physical_plan::{
2424
FileMeta, FileOpenFuture, FileOpener, ParquetFileMetrics, ParquetFileReaderFactory,
@@ -137,7 +137,7 @@ impl FileOpener for ParquetOpener {
137137
let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
138138
let rg_metadata = file_metadata.row_groups();
139139
// track which row groups to actually read
140-
let mut row_groups = RowGroupSet::new(rg_metadata.len());
140+
let mut row_groups = RowGroupPlanBuilder::new(rg_metadata.len());
141141
// if there is a range restricting what parts of the file to read
142142
if let Some(range) = file_range.as_ref() {
143143
row_groups.prune_by_range(rg_metadata, range);
@@ -164,32 +164,35 @@ impl FileOpener for ParquetOpener {
164164
}
165165
}
166166

167+
let mut access_plan = row_groups.build();
168+
167169
// page index pruning: if all data on individual pages can
168170
// be ruled out using page metadata, rows from other columns
169171
// with that range can be skipped as well
170-
if enable_page_index && !row_groups.is_empty() {
172+
if enable_page_index && !access_plan.is_empty() {
171173
if let Some(p) = page_pruning_predicate {
172-
let pruned = p.prune(
174+
access_plan = p.prune(
173175
&file_schema,
174176
builder.parquet_schema(),
175-
&row_groups,
177+
access_plan,
176178
file_metadata.as_ref(),
177179
&file_metrics,
178-
)?;
179-
if let Some(row_selection) = pruned {
180-
builder = builder.with_row_selection(row_selection);
181-
}
180+
);
182181
}
183182
}
184183

184+
if let Some(row_selection) = access_plan.overall_row_selection() {
185+
builder = builder.with_row_selection(row_selection);
186+
}
187+
185188
if let Some(limit) = limit {
186189
builder = builder.with_limit(limit)
187190
}
188191

189192
let stream = builder
190193
.with_projection(mask)
191194
.with_batch_size(batch_size)
192-
.with_row_groups(row_groups.indexes())
195+
.with_row_groups(access_plan.row_group_indexes())
193196
.build()?;
194197

195198
let adapted = stream

0 commit comments

Comments
 (0)