Skip to content

Commit 524e56d

Browse files
lewiszlwalamb
andauthored
Simplify update_skip_aggregation_probe method (#12332)
* Simplify update_skip_aggregation_probe method * Add assert & doc --------- Co-authored-by: Andrew Lamb <[email protected]>
1 parent 26c8004 commit 524e56d

File tree

2 files changed

+5
-19
lines changed

2 files changed

+5
-19
lines changed

datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ impl GroupsAccumulatorAdapter {
258258
opt_filter.as_ref().map(|f| f.as_boolean()),
259259
offsets,
260260
)?;
261-
(f)(state.accumulator.as_mut(), &values_to_accumulate)?;
261+
f(state.accumulator.as_mut(), &values_to_accumulate)?;
262262

263263
// clear out the state so they are empty for next
264264
// iteration

datafusion/physical-plan/src/aggregates/row_hash.rs

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -187,13 +187,6 @@ impl SkipAggregationProbe {
187187
self.should_skip
188188
}
189189

190-
/// Provides an ability to externally set `should_skip` flag
191-
/// to `false` and prohibit further state updates
192-
fn forbid_skipping(&mut self) {
193-
self.should_skip = false;
194-
self.is_locked = true;
195-
}
196-
197190
/// Record the number of rows that were output directly without aggregation
198191
fn record_skipped(&mut self, batch: &RecordBatch) {
199192
self.skipped_aggregation_rows.add(batch.num_rows());
@@ -1009,19 +1002,12 @@ impl GroupedHashAggregateStream {
10091002
}
10101003

10111004
/// Updates skip aggregation probe state.
1012-
///
1013-
/// In case stream has any spills, the probe is forcefully set to
1014-
/// forbid aggregation skipping, and locked, since spilling resets
1015-
/// total number of unique groups.
1016-
///
1017-
/// Note: currently spilling is not supported for Partial aggregation
10181005
fn update_skip_aggregation_probe(&mut self, input_rows: usize) {
10191006
if let Some(probe) = self.skip_aggregation_probe.as_mut() {
1020-
if !self.spill_state.spills.is_empty() {
1021-
probe.forbid_skipping();
1022-
} else {
1023-
probe.update_state(input_rows, self.group_values.len());
1024-
}
1007+
// Skip aggregation probe is not supported if stream has any spills,
1008+
// currently spilling is not supported for Partial aggregation
1009+
assert!(self.spill_state.spills.is_empty());
1010+
probe.update_state(input_rows, self.group_values.len());
10251011
};
10261012
}
10271013

0 commit comments

Comments
 (0)