Skip to content

Commit e02c35d

Browse files
committed
POC: Demonstrate new GroupHashAggregate stream approach
1 parent 90b38b0 commit e02c35d

File tree

8 files changed

+801
-4
lines changed

8 files changed

+801
-4
lines changed

datafusion/core/src/physical_plan/aggregates/mod.rs

+17 -1
Original file line number | Diff line number | Diff line change
@@ -49,6 +49,7 @@ use std::sync::Arc;
4949
mod bounded_aggregate_stream;
5050
mod no_grouping;
5151
mod row_hash;
52+
mod row_hash2;
5253
mod utils;
5354

5455
pub use datafusion_expr::AggregateFunction;
@@ -58,6 +59,8 @@ use datafusion_physical_expr::utils::{
5859
get_finer_ordering, ordering_satisfy_requirement_concrete,
5960
};
6061

62+
use self::row_hash2::GroupedHashAggregateStream2;
63+
6164
/// Hash aggregate modes
6265
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
6366
pub enum AggregateMode {
@@ -196,6 +199,7 @@ impl PartialEq for PhysicalGroupBy {
196199
enum StreamType {
197200
AggregateStream(AggregateStream),
198201
GroupedHashAggregateStream(GroupedHashAggregateStream),
202+
GroupedHashAggregateStream2(GroupedHashAggregateStream2),
199203
BoundedAggregate(BoundedAggregateStream),
200204
}
201205

@@ -204,6 +208,7 @@ impl From<StreamType> for SendableRecordBatchStream {
204208
match stream {
205209
StreamType::AggregateStream(stream) => Box::pin(stream),
206210
StreamType::GroupedHashAggregateStream(stream) => Box::pin(stream),
211+
StreamType::GroupedHashAggregateStream2(stream) => Box::pin(stream),
207212
StreamType::BoundedAggregate(stream) => Box::pin(stream),
208213
}
209214
}
@@ -711,12 +716,23 @@ impl AggregateExec {
711716
partition,
712717
aggregation_ordering,
713718
)?))
719+
} else if self.use_poc_group_by() {
720+
Ok(StreamType::GroupedHashAggregateStream2(
721+
GroupedHashAggregateStream2::new(self, context, partition)?,
722+
))
714723
} else {
715724
Ok(StreamType::GroupedHashAggregateStream(
716725
GroupedHashAggregateStream::new(self, context, partition)?,
717726
))
718727
}
719728
}
729+
730+
/// Returns true if we should use the POC group by stream
731+
/// TODO: check for actually supported aggregates, etc
732+
fn use_poc_group_by(&self) -> bool {
733+
//info!("AAL Checking POC group by: {self:#?}");
734+
true
735+
}
720736
}
721737

722738
impl ExecutionPlan for AggregateExec {
@@ -980,7 +996,7 @@ fn group_schema(schema: &Schema, group_count: usize) -> SchemaRef {
980996
Arc::new(Schema::new(group_fields))
981997
}
982998

983-
/// returns physical expressions to evaluate against a batch
999+
/// returns physical expressions for arguments to evaluate against a batch
9841000
/// The expressions are different depending on `mode`:
9851001
/// * Partial: AggregateExpr::expressions
9861002
/// * Final: columns of `AggregateExpr::state_fields()`

datafusion/core/src/physical_plan/aggregates/row_hash.rs

+2
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,7 @@
1717

1818
//! Hash aggregation through row format
1919
20+
use log::info;
2021
use std::cmp::min;
2122
use std::ops::Range;
2223
use std::sync::Arc;
@@ -119,6 +120,7 @@ impl GroupedHashAggregateStream {
119120
context: Arc<TaskContext>,
120121
partition: usize,
121122
) -> Result<Self> {
123+
info!("Creating GroupedHashAggregateStream");
122124
let agg_schema = Arc::clone(&agg.schema);
123125
let agg_group_by = agg.group_by.clone();
124126
let agg_filter_expr = agg.filter_expr.clone();

0 commit comments

Comments (0)