Skip to content

Commit 68f8476

Browse files
authored
Add support for AggregateExpr, WindowExpr rewrite. (#10742)
* Initial commit * Minor changes * Minor changes * Update comments
1 parent 7fd286b commit 68f8476

File tree

5 files changed

+116
-1
lines changed

5 files changed

+116
-1
lines changed

datafusion/physical-expr-common/src/aggregate/mod.rs

+34
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,40 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn Any> {
185185
fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
186186
not_impl_err!("Retractable Accumulator hasn't been implemented for {self:?} yet")
187187
}
188+
189+
/// Returns all expressions used in the [`AggregateExpr`].
190+
/// These expressions are (1)function arguments, (2) order by expressions.
191+
fn all_expressions(&self) -> AggregatePhysicalExpressions {
192+
let args = self.expressions();
193+
let order_bys = self.order_bys().unwrap_or(&[]);
194+
let order_by_exprs = order_bys
195+
.iter()
196+
.map(|sort_expr| sort_expr.expr.clone())
197+
.collect::<Vec<_>>();
198+
AggregatePhysicalExpressions {
199+
args,
200+
order_by_exprs,
201+
}
202+
}
203+
204+
/// Rewrites [`AggregateExpr`], with new expressions given. The argument should be consistent
205+
/// with the return value of the [`AggregateExpr::all_expressions`] method.
206+
/// Returns `Some(Arc<dyn AggregateExpr>)` if re-write is supported, otherwise returns `None`.
207+
fn with_new_expressions(
208+
&self,
209+
_args: Vec<Arc<dyn PhysicalExpr>>,
210+
_order_by_exprs: Vec<Arc<dyn PhysicalExpr>>,
211+
) -> Option<Arc<dyn AggregateExpr>> {
212+
None
213+
}
214+
}
215+
216+
/// Stores the physical expressions used inside the `AggregateExpr`.
217+
pub struct AggregatePhysicalExpressions {
218+
/// Aggregate function arguments
219+
pub args: Vec<Arc<dyn PhysicalExpr>>,
220+
/// Order by expressions
221+
pub order_by_exprs: Vec<Arc<dyn PhysicalExpr>>,
188222
}
189223

190224
/// Physical aggregate expression of a UDAF.

datafusion/physical-expr/src/aggregate/count.rs

+15
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,21 @@ impl AggregateExpr for Count {
260260
// instantiate specialized accumulator
261261
Ok(Box::new(CountGroupsAccumulator::new()))
262262
}
263+
264+
fn with_new_expressions(
265+
&self,
266+
args: Vec<Arc<dyn PhysicalExpr>>,
267+
order_by_exprs: Vec<Arc<dyn PhysicalExpr>>,
268+
) -> Option<Arc<dyn AggregateExpr>> {
269+
debug_assert_eq!(self.exprs.len(), args.len());
270+
debug_assert!(order_by_exprs.is_empty());
271+
Some(Arc::new(Count {
272+
name: self.name.clone(),
273+
data_type: self.data_type.clone(),
274+
nullable: self.nullable,
275+
exprs: args,
276+
}))
277+
}
263278
}
264279

265280
impl PartialEq<dyn Any> for Count {

datafusion/physical-expr/src/lib.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@ pub mod execution_props {
4141

4242
pub use aggregate::groups_accumulator::{GroupsAccumulatorAdapter, NullState};
4343
pub use analysis::{analyze, AnalysisContext, ExprBoundaries};
44-
pub use datafusion_physical_expr_common::aggregate::AggregateExpr;
44+
pub use datafusion_physical_expr_common::aggregate::{
45+
AggregateExpr, AggregatePhysicalExpressions,
46+
};
4547
pub use equivalence::EquivalenceProperties;
4648
pub use partitioning::{Distribution, Partitioning};
4749
pub use physical_expr::{

datafusion/physical-expr/src/window/sliding_aggregate.rs

+25
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,31 @@ impl WindowExpr for SlidingAggregateWindowExpr {
141141
fn uses_bounded_memory(&self) -> bool {
142142
!self.window_frame.end_bound.is_unbounded()
143143
}
144+
145+
fn with_new_expressions(
146+
&self,
147+
args: Vec<Arc<dyn PhysicalExpr>>,
148+
partition_bys: Vec<Arc<dyn PhysicalExpr>>,
149+
order_by_exprs: Vec<Arc<dyn PhysicalExpr>>,
150+
) -> Option<Arc<dyn WindowExpr>> {
151+
debug_assert_eq!(self.order_by.len(), order_by_exprs.len());
152+
153+
let new_order_by = self
154+
.order_by
155+
.iter()
156+
.zip(order_by_exprs)
157+
.map(|(req, new_expr)| PhysicalSortExpr {
158+
expr: new_expr,
159+
options: req.options,
160+
})
161+
.collect::<Vec<_>>();
162+
Some(Arc::new(SlidingAggregateWindowExpr {
163+
aggregate: self.aggregate.with_new_expressions(args, vec![])?,
164+
partition_by: partition_bys,
165+
order_by: new_order_by,
166+
window_frame: self.window_frame.clone(),
167+
}))
168+
}
144169
}
145170

146171
impl AggregateWindowExpr for SlidingAggregateWindowExpr {

datafusion/physical-expr/src/window/window_expr.rs

+39
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,45 @@ pub trait WindowExpr: Send + Sync + Debug {
128128

129129
/// Get the reverse expression of this [WindowExpr].
130130
fn get_reverse_expr(&self) -> Option<Arc<dyn WindowExpr>>;
131+
132+
/// Returns all expressions used in the [`WindowExpr`].
133+
/// These expressions are (1) function arguments, (2) partition by expressions, (3) order by expressions.
134+
fn all_expressions(&self) -> WindowPhysicalExpressions {
135+
let args = self.expressions();
136+
let partition_by_exprs = self.partition_by().to_vec();
137+
let order_by_exprs = self
138+
.order_by()
139+
.iter()
140+
.map(|sort_expr| sort_expr.expr.clone())
141+
.collect::<Vec<_>>();
142+
WindowPhysicalExpressions {
143+
args,
144+
partition_by_exprs,
145+
order_by_exprs,
146+
}
147+
}
148+
149+
/// Rewrites [`WindowExpr`], with new expressions given. The argument should be consistent
150+
/// with the return value of the [`WindowExpr::all_expressions`] method.
151+
/// Returns `Some(Arc<dyn WindowExpr>)` if re-write is supported, otherwise returns `None`.
152+
fn with_new_expressions(
153+
&self,
154+
_args: Vec<Arc<dyn PhysicalExpr>>,
155+
_partition_bys: Vec<Arc<dyn PhysicalExpr>>,
156+
_order_by_exprs: Vec<Arc<dyn PhysicalExpr>>,
157+
) -> Option<Arc<dyn WindowExpr>> {
158+
None
159+
}
160+
}
161+
162+
/// Stores the physical expressions used inside the `WindowExpr`.
163+
pub struct WindowPhysicalExpressions {
164+
/// Window function arguments
165+
pub args: Vec<Arc<dyn PhysicalExpr>>,
166+
/// PARTITION BY expressions
167+
pub partition_by_exprs: Vec<Arc<dyn PhysicalExpr>>,
168+
/// ORDER BY expressions
169+
pub order_by_exprs: Vec<Arc<dyn PhysicalExpr>>,
131170
}
132171

133172
/// Extension trait that adds common functionality to [`AggregateWindowExpr`]s

0 commit comments

Comments
 (0)