Skip to content

Cleanup WindowExpr and AggregateExpr trait #12069

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 0 additions & 34 deletions datafusion/functions-aggregate-common/src/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,32 +135,6 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn Any> {
not_impl_err!("Retractable Accumulator hasn't been implemented for {self:?} yet")
}

/// Returns all expressions used in the [`AggregateExpr`].
/// These expressions are (1)function arguments, (2) order by expressions.
fn all_expressions(&self) -> AggregatePhysicalExpressions {
let args = self.expressions();
let order_bys = self.order_bys().unwrap_or(&[]);
let order_by_exprs = order_bys
.iter()
.map(|sort_expr| Arc::clone(&sort_expr.expr))
.collect::<Vec<_>>();
AggregatePhysicalExpressions {
args,
order_by_exprs,
}
}

/// Rewrites [`AggregateExpr`], with new expressions given. The argument should be consistent
/// with the return value of the [`AggregateExpr::all_expressions`] method.
/// Returns `Some(Arc<dyn AggregateExpr>)` if re-write is supported, otherwise returns `None`.
fn with_new_expressions(
&self,
_args: Vec<Arc<dyn PhysicalExpr>>,
_order_by_exprs: Vec<Arc<dyn PhysicalExpr>>,
) -> Option<Arc<dyn AggregateExpr>> {
None
}

/// If this function is max, return (output_field, true)
/// if the function is min, return (output_field, false)
/// otherwise return None (the default)
Expand All @@ -172,11 +146,3 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn Any> {
None
}
}

/// Stores the physical expressions used inside the `AggregateExpr`.
pub struct AggregatePhysicalExpressions {
/// Aggregate function arguments
pub args: Vec<Arc<dyn PhysicalExpr>>,
/// Order by expressions
pub order_by_exprs: Vec<Arc<dyn PhysicalExpr>>,
}
4 changes: 1 addition & 3 deletions datafusion/physical-expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,7 @@ pub mod execution_props {

pub use aggregate::groups_accumulator::{GroupsAccumulatorAdapter, NullState};
pub use analysis::{analyze, AnalysisContext, ExprBoundaries};
pub use datafusion_functions_aggregate_common::aggregate::{
AggregateExpr, AggregatePhysicalExpressions,
};
pub use datafusion_functions_aggregate_common::aggregate::AggregateExpr;
pub use equivalence::{calculate_union, ConstExpr, EquivalenceProperties};
pub use partitioning::{Distribution, Partitioning};
pub use physical_expr::{
Expand Down
25 changes: 0 additions & 25 deletions datafusion/physical-expr/src/window/sliding_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,31 +141,6 @@ impl WindowExpr for SlidingAggregateWindowExpr {
fn uses_bounded_memory(&self) -> bool {
!self.window_frame.end_bound.is_unbounded()
}

fn with_new_expressions(
&self,
args: Vec<Arc<dyn PhysicalExpr>>,
partition_bys: Vec<Arc<dyn PhysicalExpr>>,
order_by_exprs: Vec<Arc<dyn PhysicalExpr>>,
) -> Option<Arc<dyn WindowExpr>> {
debug_assert_eq!(self.order_by.len(), order_by_exprs.len());

let new_order_by = self
.order_by
.iter()
.zip(order_by_exprs)
.map(|(req, new_expr)| PhysicalSortExpr {
expr: new_expr,
options: req.options,
})
.collect::<Vec<_>>();
Some(Arc::new(SlidingAggregateWindowExpr {
aggregate: self.aggregate.with_new_expressions(args, vec![])?,
partition_by: partition_bys,
order_by: new_order_by,
window_frame: Arc::clone(&self.window_frame),
}))
}
}

impl AggregateWindowExpr for SlidingAggregateWindowExpr {
Expand Down
39 changes: 0 additions & 39 deletions datafusion/physical-expr/src/window/window_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,45 +128,6 @@ pub trait WindowExpr: Send + Sync + Debug {

/// Get the reverse expression of this [WindowExpr].
fn get_reverse_expr(&self) -> Option<Arc<dyn WindowExpr>>;

/// Returns all expressions used in the [`WindowExpr`].
/// These expressions are (1) function arguments, (2) partition by expressions, (3) order by expressions.
fn all_expressions(&self) -> WindowPhysicalExpressions {
let args = self.expressions();
let partition_by_exprs = self.partition_by().to_vec();
let order_by_exprs = self
.order_by()
.iter()
.map(|sort_expr| Arc::clone(&sort_expr.expr))
.collect::<Vec<_>>();
WindowPhysicalExpressions {
args,
partition_by_exprs,
order_by_exprs,
}
}

/// Rewrites [`WindowExpr`], with new expressions given. The argument should be consistent
/// with the return value of the [`WindowExpr::all_expressions`] method.
/// Returns `Some(Arc<dyn WindowExpr>)` if re-write is supported, otherwise returns `None`.
fn with_new_expressions(
&self,
_args: Vec<Arc<dyn PhysicalExpr>>,
_partition_bys: Vec<Arc<dyn PhysicalExpr>>,
_order_by_exprs: Vec<Arc<dyn PhysicalExpr>>,
) -> Option<Arc<dyn WindowExpr>> {
None
}
}

/// Stores the physical expressions used inside the `WindowExpr`.
pub struct WindowPhysicalExpressions {
/// Window function arguments
pub args: Vec<Arc<dyn PhysicalExpr>>,
/// PARTITION BY expressions
pub partition_by_exprs: Vec<Arc<dyn PhysicalExpr>>,
/// ORDER BY expressions
pub order_by_exprs: Vec<Arc<dyn PhysicalExpr>>,
}

/// Extension trait that adds common functionality to [`AggregateWindowExpr`]s
Expand Down