Skip to content

Commit f301a73

Browse files
committed
Rewrite page filter in terms of ParquetAccessPlan
1 parent 66a5936 commit f301a73

File tree

3 files changed

+181
-141
lines changed

3 files changed

+181
-141
lines changed

datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs

+55-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use parquet::arrow::arrow_reader::RowSelection;
18+
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
1919

2020
/// Specifies a selection of rows and row groups within a ParquetFile to decode.
2121
///
@@ -71,6 +71,60 @@ impl ParquetAccessPlan {
7171
self.row_groups[idx].should_scan()
7272
}
7373

74+
/// Set to scan only the [`RowSelection`] in the specified row group.
75+
///
76+
/// Based on the existing row groups plan:
77+
/// * Skip: does nothing
78+
/// * Scan: Updates to scan only the rows in the `RowSelection`
79+
/// * Selection: Updates to scan only the rows selected by both the existing selection and the new selection (their intersection)
80+
pub fn scan_selection(&mut self, idx: usize, selection: RowSelection) {
81+
self.row_groups[idx] = match &self.row_groups[idx] {
82+
// already skipping the entire row group
83+
RowGroupAccess::Skip => RowGroupAccess::Skip,
84+
RowGroupAccess::Scan => RowGroupAccess::Selection(selection),
85+
RowGroupAccess::Selection(existing_selection) => {
86+
RowGroupAccess::Selection(existing_selection.intersection(&selection))
87+
}
88+
}
89+
}
90+
91+
/// Return the overall RowSelection for all scanned row groups, if
92+
/// there are any [`RowGroupAccess::Selection`]. Returns `None` otherwise.
93+
///
94+
///
95+
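/// TODO: better doc / explanation. Note: currently panics (`todo!()`) if any row group is `RowGroupAccess::Scan` while another row group has a `RowGroupAccess::Selection`.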
96+
pub fn overall_row_selection(&self) -> Option<RowSelection> {
97+
if !self
98+
.row_groups
99+
.iter()
100+
.any(|rg| matches!(rg, RowGroupAccess::Selection(_)))
101+
{
102+
return None;
103+
}
104+
105+
let total_selection: RowSelection = self
106+
.row_groups
107+
.iter()
108+
.flat_map(|rg| {
109+
match rg {
110+
RowGroupAccess::Skip => vec![],
111+
RowGroupAccess::Scan => {
112+
// need a selection that scans the entire row group, which requires the row group's row count
113+
// This is clearly not tested TODO
114+
todo!();
115+
}
116+
RowGroupAccess::Selection(selection) => {
117+
// todo avoid these clones
118+
let selection: Vec<RowSelector> = selection.clone().into();
119+
selection
120+
}
121+
}
122+
})
123+
.collect();
124+
125+
Some(total_selection)
126+
}
127+
74128
/// Return an iterator over the row group indexes that should be scanned
75129
pub fn row_group_index_iter(&self) -> impl Iterator<Item = usize> + '_ {
76130
self.row_groups.iter().enumerate().filter_map(|(idx, b)| {

datafusion/core/src/datasource/physical_plan/parquet/opener.rs

+8-7
Original file line numberDiff line numberDiff line change
@@ -164,26 +164,27 @@ impl FileOpener for ParquetOpener {
164164
}
165165
}
166166

167-
let access_plan = row_groups.build();
167+
let mut access_plan = row_groups.build();
168168

169169
// page index pruning: if all data on individual pages can
170170
// be ruled out using page metadata, rows from other columns
171171
// with that range can be skipped as well
172172
if enable_page_index && !access_plan.is_empty() {
173173
if let Some(p) = page_pruning_predicate {
174-
let pruned = p.prune(
174+
access_plan = p.prune(
175175
&file_schema,
176176
builder.parquet_schema(),
177-
&access_plan,
177+
access_plan,
178178
file_metadata.as_ref(),
179179
&file_metrics,
180-
)?;
181-
if let Some(row_selection) = pruned {
182-
builder = builder.with_row_selection(row_selection);
183-
}
180+
);
184181
}
185182
}
186183

184+
if let Some(row_selection) = access_plan.overall_row_selection() {
185+
builder = builder.with_row_selection(row_selection);
186+
}
187+
187188
if let Some(limit) = limit {
188189
builder = builder.with_limit(limit)
189190
}

0 commit comments

Comments
 (0)