Skip to content

Commit

Permalink
Remove BuiltInWindowFunction (LogicalPlans) (#13393)
Browse files Browse the repository at this point in the history
* Remove BuiltInWindowFunction

* fix docs

* Fix typo
  • Loading branch information
alamb authored Nov 15, 2024
1 parent e25f5e7 commit 75a27a8
Show file tree
Hide file tree
Showing 12 changed files with 27 additions and 363 deletions.
131 changes: 0 additions & 131 deletions datafusion/expr/src/built_in_window_function.rs

This file was deleted.

45 changes: 10 additions & 35 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@ use crate::expr_fn::binary_expr;
use crate::logical_plan::Subquery;
use crate::utils::expr_to_columns;
use crate::Volatility;
use crate::{
udaf, BuiltInWindowFunction, ExprSchemable, Operator, Signature, WindowFrame,
WindowUDF,
};
use crate::{udaf, ExprSchemable, Operator, Signature, WindowFrame, WindowUDF};

use arrow::datatypes::{DataType, FieldRef};
use datafusion_common::cse::HashNode;
Expand Down Expand Up @@ -697,13 +694,13 @@ impl AggregateFunction {
}
}

/// WindowFunction
/// A function used as a SQL window function
///
/// In SQL, you can use:
/// - Actual window functions ([`WindowUDF`])
/// - Normal aggregate functions ([`AggregateUDF`])
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
/// Defines which implementation of an aggregate function DataFusion should call.
pub enum WindowFunctionDefinition {
/// A built in aggregate function that leverages an aggregate function
/// A a built-in window function
BuiltInWindowFunction(BuiltInWindowFunction),
/// A user defined aggregate function
AggregateUDF(Arc<crate::AggregateUDF>),
/// A user defined aggregate function
Expand All @@ -719,9 +716,6 @@ impl WindowFunctionDefinition {
display_name: &str,
) -> Result<DataType> {
match self {
WindowFunctionDefinition::BuiltInWindowFunction(fun) => {
fun.return_type(input_expr_types)
}
WindowFunctionDefinition::AggregateUDF(fun) => {
fun.return_type(input_expr_types)
}
Expand All @@ -734,7 +728,6 @@ impl WindowFunctionDefinition {
/// The signatures supported by the function `fun`.
pub fn signature(&self) -> Signature {
match self {
WindowFunctionDefinition::BuiltInWindowFunction(fun) => fun.signature(),
WindowFunctionDefinition::AggregateUDF(fun) => fun.signature().clone(),
WindowFunctionDefinition::WindowUDF(fun) => fun.signature().clone(),
}
Expand All @@ -743,7 +736,6 @@ impl WindowFunctionDefinition {
/// Function's name for display
pub fn name(&self) -> &str {
match self {
WindowFunctionDefinition::BuiltInWindowFunction(fun) => fun.name(),
WindowFunctionDefinition::WindowUDF(fun) => fun.name(),
WindowFunctionDefinition::AggregateUDF(fun) => fun.name(),
}
Expand All @@ -753,19 +745,12 @@ impl WindowFunctionDefinition {
impl Display for WindowFunctionDefinition {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
match self {
WindowFunctionDefinition::BuiltInWindowFunction(fun) => Display::fmt(fun, f),
WindowFunctionDefinition::AggregateUDF(fun) => Display::fmt(fun, f),
WindowFunctionDefinition::WindowUDF(fun) => Display::fmt(fun, f),
}
}
}

impl From<BuiltInWindowFunction> for WindowFunctionDefinition {
fn from(value: BuiltInWindowFunction) -> Self {
Self::BuiltInWindowFunction(value)
}
}

impl From<Arc<crate::AggregateUDF>> for WindowFunctionDefinition {
fn from(value: Arc<crate::AggregateUDF>) -> Self {
Self::AggregateUDF(value)
Expand All @@ -780,26 +765,16 @@ impl From<Arc<WindowUDF>> for WindowFunctionDefinition {

/// Window function
///
/// Holds the actual actual function to call [`WindowFunction`] as well as its
/// Holds the actual function to call [`WindowFunction`] as well as its
/// arguments (`args`) and the contents of the `OVER` clause:
///
/// 1. `PARTITION BY`
/// 2. `ORDER BY`
/// 3. Window frame (e.g. `ROWS 1 PRECEDING AND 1 FOLLOWING`)
///
/// # Example
/// ```
/// # use datafusion_expr::{Expr, BuiltInWindowFunction, col, ExprFunctionExt};
/// # use datafusion_expr::expr::WindowFunction;
/// // Create FIRST_VALUE(a) OVER (PARTITION BY b ORDER BY c)
/// let expr = Expr::WindowFunction(
/// WindowFunction::new(BuiltInWindowFunction::FirstValue, vec![col("a")])
/// )
/// .partition_by(vec![col("b")])
/// .order_by(vec![col("b").sort(true, true)])
/// .build()
/// .unwrap();
/// ```
/// See [`ExprFunctionExt`] for examples of how to create a `WindowFunction`.
///
/// [`ExprFunctionExt`]: crate::ExprFunctionExt
#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
pub struct WindowFunction {
/// Name of the function
Expand Down
6 changes: 0 additions & 6 deletions datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -481,12 +481,6 @@ impl Expr {
.map(|e| e.get_type(schema))
.collect::<Result<Vec<_>>>()?;
match fun {
WindowFunctionDefinition::BuiltInWindowFunction(window_fun) => {
let return_type = window_fun.return_type(&data_types)?;
let nullable =
!["RANK", "NTILE", "CUME_DIST"].contains(&window_fun.name());
Ok((return_type, nullable))
}
WindowFunctionDefinition::AggregateUDF(udaf) => {
let new_types = data_types_with_aggregate_udf(&data_types, udaf)
.map_err(|err| {
Expand Down
2 changes: 0 additions & 2 deletions datafusion/expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
//!
//! The [expr_fn] module contains functions for creating expressions.

mod built_in_window_function;
mod literal;
mod operation;
mod partition_evaluator;
Expand Down Expand Up @@ -67,7 +66,6 @@ pub mod var_provider;
pub mod window_frame;
pub mod window_state;

pub use built_in_window_function::BuiltInWindowFunction;
pub use datafusion_expr_common::accumulator::Accumulator;
pub use datafusion_expr_common::columnar_value::ColumnarValue;
pub use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator};
Expand Down
12 changes: 10 additions & 2 deletions datafusion/expr/src/udwf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,16 @@ use datafusion_functions_window_common::field::WindowUDFFieldArgs;
use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;

/// Logical representation of a user-defined window function (UDWF)
/// A UDWF is different from a UDF in that it is stateful across batches.
/// Logical representation of a user-defined window function (UDWF).
///
/// A Window Function is called via the SQL `OVER` clause:
///
/// ```sql
/// SELECT first_value(col) OVER (PARTITION BY a, b ORDER BY c) FROM foo;
/// ```
///
/// A UDWF is different from a user defined function (UDF) in that it is
/// stateful across batches.
///
/// See the documentation on [`PartitionEvaluator`] for more details
///
Expand Down
4 changes: 0 additions & 4 deletions datafusion/physical-plan/src/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,6 @@ pub fn create_window_expr(
ignore_nulls: bool,
) -> Result<Arc<dyn WindowExpr>> {
Ok(match fun {
WindowFunctionDefinition::BuiltInWindowFunction(_fun) => {
unreachable!()
}
WindowFunctionDefinition::AggregateUDF(fun) => {
let aggregate = AggregateExprBuilder::new(Arc::clone(fun), args.to_vec())
.schema(Arc::new(input_schema.clone()))
Expand All @@ -120,7 +117,6 @@ pub fn create_window_expr(
aggregate,
)
}
// TODO: Ordering not supported for Window UDFs yet
WindowFunctionDefinition::WindowUDF(fun) => Arc::new(BuiltInWindowExpr::new(
create_udwf_window_expr(fun, args, input_schema, name, ignore_nulls)?,
partition_by,
Expand Down
19 changes: 2 additions & 17 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -507,24 +507,9 @@ message ScalarUDFExprNode {
optional bytes fun_definition = 3;
}

enum BuiltInWindowFunction {
UNSPECIFIED = 0; // https://protobuf.dev/programming-guides/dos-donts/#unspecified-enum
// ROW_NUMBER = 0;
// RANK = 1;
// DENSE_RANK = 2;
// PERCENT_RANK = 3;
// CUME_DIST = 4;
// NTILE = 5;
// LAG = 6;
// LEAD = 7;
// FIRST_VALUE = 8;
// LAST_VALUE = 9;
// NTH_VALUE = 10;
}

message WindowExprNode {
oneof window_function {
BuiltInWindowFunction built_in_function = 2;
// BuiltInWindowFunction built_in_function = 2;
string udaf = 3;
string udwf = 9;
}
Expand Down Expand Up @@ -866,7 +851,7 @@ message PhysicalAggregateExprNode {

message PhysicalWindowExprNode {
oneof window_function {
BuiltInWindowFunction built_in_function = 2;
// BuiltInWindowFunction built_in_function = 2;
string user_defined_aggr_function = 3;
}
repeated PhysicalExprNode args = 4;
Expand Down
Loading

0 comments on commit 75a27a8

Please sign in to comment.