Skip to content

Commit f087efe

Browse files
committed
fix empty accumulators mode.
1 parent 1960084 commit f087efe

File tree

1 file changed

+15
-11
lines changed

1 file changed

+15
-11
lines changed

datafusion/physical-plan/src/aggregates/row_hash.rs

+15-11
Original file line numberDiff line numberDiff line change
@@ -512,14 +512,18 @@ impl GroupedHashAggregateStream {
512512
};
513513

514514
// We don't support blocked emission in steaming aggregation mode
515-
let emit_tos_builder = if matches!(group_ordering, GroupOrdering::None) {
516-
let group_values_support_blocked_emission =
517-
group_values.supports_blocked_emission();
518-
let accumulators_support_blocked_emission =
515+
// TODO: I am not sure, if we should disable blocked mode if `accumulator`s are empty.
516+
let group_states_ctx = if matches!(group_ordering, GroupOrdering::None)
517+
&& !accumulators.is_empty()
518+
{
519+
let is_blocked_group_values = group_values.supports_blocked_emission();
520+
let is_blocked_accumulators =
519521
accumulators.iter().all(|a| a.supports_blocked_emission());
522+
523+
// TODO: if the batch size is too small, maybe we should fallback to single block mode.
520524
GroupStatesContext::new(
521-
group_values_support_blocked_emission,
522-
accumulators_support_blocked_emission,
525+
is_blocked_group_values,
526+
is_blocked_accumulators,
523527
batch_size,
524528
)
525529
} else {
@@ -546,7 +550,7 @@ impl GroupedHashAggregateStream {
546550
spill_state,
547551
group_values_soft_limit: agg.limit,
548552
skip_aggregation_probe,
549-
group_states_ctx: emit_tos_builder,
553+
group_states_ctx,
550554
})
551555
}
552556
}
@@ -1117,13 +1121,13 @@ pub struct GroupStatesContext {
11171121

11181122
impl GroupStatesContext {
11191123
pub fn new(
1120-
group_values_support_blocked_emission: bool,
1121-
accumulators_support_blocked_emission: bool,
1124+
is_blocked_group_values: bool,
1125+
is_blocked_accumulators: bool,
11221126
block_size: usize,
11231127
) -> Self {
11241128
Self {
1125-
is_blocked_group_values: group_values_support_blocked_emission,
1126-
is_blocked_accumulators: accumulators_support_blocked_emission,
1129+
is_blocked_group_values,
1130+
is_blocked_accumulators,
11271131
block_size,
11281132
}
11291133
}

0 commit comments

Comments
 (0)