Skip to content

Commit

Permalink
only check should_skip_aggregation in partial aggr.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachelint committed Sep 28, 2024
1 parent 60abcee commit 8941b16
Showing 1 changed file with 18 additions and 14 deletions.
32 changes: 18 additions & 14 deletions datafusion/physical-plan/src/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,12 @@ impl Stream for GroupedHashAggregateStream {
(
if self.input_done {
ExecutionState::Done
} else if self.should_skip_aggregation() {
}
// In Partial aggregation, we also need to check
// if we should trigger partial skipping
else if self.mode == AggregateMode::Partial
&& self.should_skip_aggregation()
{
ExecutionState::SkippingAggregation
} else {
ExecutionState::ReadingInput
Expand Down Expand Up @@ -1038,24 +1043,21 @@ impl GroupedHashAggregateStream {
}

/// Updates skip aggregation probe state.
///
/// Notice: It should only be called in Partial aggregation
fn update_skip_aggregation_probe(&mut self, input_rows: usize) {
// Skip aggregation probe is only supported and called in Partial aggregation.
// And it is not supported if stream has any spills even in Partial aggregation.
// Although currently spilling is actually not supported in Partial aggregation,
// it is possible to be supported in future, so we also add an assertion for it.
assert!(self.spill_state.spills.is_empty());

// As mention above, skip aggregation probe will only be called in Partial aggregation.
// And naturally, in Partial aggregation, we should ensure `skip_aggregation_probe`
// is not `None`, so it is safe to unwrap here.
self.skip_aggregation_probe
.as_mut()
.expect("skip_aggregation_probe must be some in partial aggregation")
.update_state(input_rows, self.group_values.len());
if let Some(probe) = self.skip_aggregation_probe.as_mut() {
// Skip aggregation probe is not supported if stream has any spills,
// currently spilling is not supported for Partial aggregation
assert!(self.spill_state.spills.is_empty());
probe.update_state(input_rows, self.group_values.len());
};
}

/// In case the probe indicates that aggregation may be
/// skipped, forces stream to produce currently accumulated output.
///
/// Notice: It should only be called in Partial aggregation
fn switch_to_skip_aggregation(&mut self) -> Result<()> {
if let Some(probe) = self.skip_aggregation_probe.as_mut() {
if probe.should_skip() {
Expand All @@ -1069,6 +1071,8 @@ impl GroupedHashAggregateStream {

/// Returns true if the aggregation probe indicates that aggregation
/// should be skipped.
///
/// Notice: It should only be called in Partial aggregation
fn should_skip_aggregation(&self) -> bool {
self.skip_aggregation_probe
.as_ref()
Expand Down

0 comments on commit 8941b16

Please sign in to comment.