Skip to content

Commit 02b9693

Browse files
authored
Convert ntile builtIn function to UDWF (#13040)
* converting to ntile udwf * updated the window functions documentation file * wip: update the ntile udwf function * fix the roundtrip_logical_plan.rs * removed builtIn ntile function * fixed field name issue * fixing the return type of ntile udwf * error if UInt64 conversion fails * handling if null is found * handling if value is zero or less than zero * removed unused import * updated prost.rs file * removed dead code * fixed clippy error * added inner doc comment * minor fixes and added roundtrip logical plan test * removed parse_expr in ntile
1 parent 13a4225 commit 02b9693

File tree

19 files changed

+221
-276
lines changed

19 files changed

+221
-276
lines changed

datafusion/expr/src/built_in_window_function.rs

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,6 @@ impl fmt::Display for BuiltInWindowFunction {
4040
/// [Window Function]: https://en.wikipedia.org/wiki/Window_function_(SQL)
4141
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash, EnumIter)]
4242
pub enum BuiltInWindowFunction {
43-
/// Integer ranging from 1 to the argument value, dividing the partition as equally as possible
44-
Ntile,
4543
/// returns value evaluated at the row that is the first row of the window frame
4644
FirstValue,
4745
/// Returns value evaluated at the row that is the last row of the window frame
@@ -54,7 +52,6 @@ impl BuiltInWindowFunction {
5452
pub fn name(&self) -> &str {
5553
use BuiltInWindowFunction::*;
5654
match self {
57-
Ntile => "NTILE",
5855
FirstValue => "first_value",
5956
LastValue => "last_value",
6057
NthValue => "NTH_VALUE",
@@ -66,7 +63,6 @@ impl FromStr for BuiltInWindowFunction {
6663
type Err = DataFusionError;
6764
fn from_str(name: &str) -> Result<BuiltInWindowFunction> {
6865
Ok(match name.to_uppercase().as_str() {
69-
"NTILE" => BuiltInWindowFunction::Ntile,
7066
"FIRST_VALUE" => BuiltInWindowFunction::FirstValue,
7167
"LAST_VALUE" => BuiltInWindowFunction::LastValue,
7268
"NTH_VALUE" => BuiltInWindowFunction::NthValue,
@@ -97,7 +93,6 @@ impl BuiltInWindowFunction {
9793
})?;
9894

9995
match self {
100-
BuiltInWindowFunction::Ntile => Ok(DataType::UInt64),
10196
BuiltInWindowFunction::FirstValue
10297
| BuiltInWindowFunction::LastValue
10398
| BuiltInWindowFunction::NthValue => Ok(input_expr_types[0].clone()),
@@ -111,20 +106,6 @@ impl BuiltInWindowFunction {
111106
BuiltInWindowFunction::FirstValue | BuiltInWindowFunction::LastValue => {
112107
Signature::any(1, Volatility::Immutable)
113108
}
114-
BuiltInWindowFunction::Ntile => Signature::uniform(
115-
1,
116-
vec![
117-
DataType::UInt64,
118-
DataType::UInt32,
119-
DataType::UInt16,
120-
DataType::UInt8,
121-
DataType::Int64,
122-
DataType::Int32,
123-
DataType::Int16,
124-
DataType::Int8,
125-
],
126-
Volatility::Immutable,
127-
),
128109
BuiltInWindowFunction::NthValue => Signature::any(2, Volatility::Immutable),
129110
}
130111
}

datafusion/expr/src/expr.rs

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2567,18 +2567,9 @@ mod test {
25672567
Ok(())
25682568
}
25692569

2570-
#[test]
2571-
fn test_ntile_return_type() -> Result<()> {
2572-
let fun = find_df_window_func("ntile").unwrap();
2573-
let observed = fun.return_type(&[DataType::Int16], &[true], "")?;
2574-
assert_eq!(DataType::UInt64, observed);
2575-
2576-
Ok(())
2577-
}
2578-
25792570
#[test]
25802571
fn test_window_function_case_insensitive() -> Result<()> {
2581-
let names = vec!["ntile", "first_value", "last_value", "nth_value"];
2572+
let names = vec!["first_value", "last_value", "nth_value"];
25822573
for name in names {
25832574
let fun = find_df_window_func(name).unwrap();
25842575
let fun2 = find_df_window_func(name.to_uppercase().as_str()).unwrap();

datafusion/expr/src/window_function.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,6 @@
1717

1818
use crate::{expr::WindowFunction, BuiltInWindowFunction, Expr, Literal};
1919

20-
/// Create an expression to represent the `ntile` window function
21-
pub fn ntile(arg: Expr) -> Expr {
22-
Expr::WindowFunction(WindowFunction::new(BuiltInWindowFunction::Ntile, vec![arg]))
23-
}
24-
2520
/// Create an expression to represent the `nth_value` window function
2621
pub fn nth_value(arg: Expr, n: i64) -> Expr {
2722
Expr::WindowFunction(WindowFunction::new(

datafusion/functions-window/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ pub mod macros;
3434

3535
pub mod cume_dist;
3636
pub mod lead_lag;
37-
37+
pub mod ntile;
3838
pub mod rank;
3939
pub mod row_number;
4040
mod utils;
@@ -44,6 +44,7 @@ pub mod expr_fn {
4444
pub use super::cume_dist::cume_dist;
4545
pub use super::lead_lag::lag;
4646
pub use super::lead_lag::lead;
47+
pub use super::ntile::ntile;
4748
pub use super::rank::{dense_rank, percent_rank, rank};
4849
pub use super::row_number::row_number;
4950
}
@@ -58,6 +59,7 @@ pub fn all_default_window_functions() -> Vec<Arc<WindowUDF>> {
5859
rank::rank_udwf(),
5960
rank::dense_rank_udwf(),
6061
rank::percent_rank_udwf(),
62+
ntile::ntile_udwf(),
6163
]
6264
}
6365
/// Registers all enabled packages with a [`FunctionRegistry`]
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! `ntile` window function implementation
19+
20+
use std::any::Any;
21+
use std::fmt::Debug;
22+
use std::sync::{Arc, OnceLock};
23+
24+
use crate::utils::{
25+
get_scalar_value_from_args, get_signed_integer, get_unsigned_integer,
26+
};
27+
use datafusion_common::arrow::array::{ArrayRef, UInt64Array};
28+
use datafusion_common::arrow::datatypes::{DataType, Field};
29+
use datafusion_common::{exec_err, DataFusionError, Result};
30+
use datafusion_expr::window_doc_sections::DOC_SECTION_RANKING;
31+
use datafusion_expr::{
32+
Documentation, Expr, PartitionEvaluator, Signature, Volatility, WindowUDFImpl,
33+
};
34+
use datafusion_functions_window_common::field;
35+
use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
36+
use field::WindowUDFFieldArgs;
37+
38+
get_or_init_udwf!(
39+
Ntile,
40+
ntile,
41+
"integer ranging from 1 to the argument value, dividing the partition as equally as possible"
42+
);
43+
44+
pub fn ntile(arg: Expr) -> Expr {
45+
ntile_udwf().call(vec![arg])
46+
}
47+
48+
#[derive(Debug)]
49+
pub struct Ntile {
50+
signature: Signature,
51+
}
52+
53+
impl Ntile {
54+
/// Create a new `ntile` function
55+
pub fn new() -> Self {
56+
Self {
57+
signature: Signature::uniform(
58+
1,
59+
vec![
60+
DataType::UInt64,
61+
DataType::UInt32,
62+
DataType::UInt16,
63+
DataType::UInt8,
64+
DataType::Int64,
65+
DataType::Int32,
66+
DataType::Int16,
67+
DataType::Int8,
68+
],
69+
Volatility::Immutable,
70+
),
71+
}
72+
}
73+
}
74+
75+
impl Default for Ntile {
76+
fn default() -> Self {
77+
Self::new()
78+
}
79+
}
80+
81+
static DOCUMENTATION: OnceLock<Documentation> = OnceLock::new();
82+
83+
fn get_ntile_doc() -> &'static Documentation {
84+
DOCUMENTATION.get_or_init(|| {
85+
Documentation::builder()
86+
.with_doc_section(DOC_SECTION_RANKING)
87+
.with_description(
88+
"Integer ranging from 1 to the argument value, dividing the partition as equally as possible",
89+
)
90+
.with_syntax_example("ntile(expression)")
91+
.with_argument("expression","An integer describing the number groups the partition should be split into")
92+
.build()
93+
.unwrap()
94+
})
95+
}
96+
97+
impl WindowUDFImpl for Ntile {
98+
fn as_any(&self) -> &dyn Any {
99+
self
100+
}
101+
102+
fn name(&self) -> &str {
103+
"ntile"
104+
}
105+
106+
fn signature(&self) -> &Signature {
107+
&self.signature
108+
}
109+
110+
fn partition_evaluator(
111+
&self,
112+
partition_evaluator_args: PartitionEvaluatorArgs,
113+
) -> Result<Box<dyn PartitionEvaluator>> {
114+
let scalar_n =
115+
get_scalar_value_from_args(partition_evaluator_args.input_exprs(), 0)?
116+
.ok_or_else(|| {
117+
DataFusionError::Execution(
118+
"NTILE requires a positive integer".to_string(),
119+
)
120+
})?;
121+
122+
if scalar_n.is_null() {
123+
return exec_err!("NTILE requires a positive integer, but finds NULL");
124+
}
125+
126+
if scalar_n.is_unsigned() {
127+
let n = get_unsigned_integer(scalar_n)?;
128+
Ok(Box::new(NtileEvaluator { n }))
129+
} else {
130+
let n: i64 = get_signed_integer(scalar_n)?;
131+
if n <= 0 {
132+
return exec_err!("NTILE requires a positive integer");
133+
}
134+
Ok(Box::new(NtileEvaluator { n: n as u64 }))
135+
}
136+
}
137+
fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
138+
let nullable = false;
139+
140+
Ok(Field::new(field_args.name(), DataType::UInt64, nullable))
141+
}
142+
143+
fn documentation(&self) -> Option<&Documentation> {
144+
Some(get_ntile_doc())
145+
}
146+
}
147+
148+
#[derive(Debug)]
149+
struct NtileEvaluator {
150+
n: u64,
151+
}
152+
153+
impl PartitionEvaluator for NtileEvaluator {
154+
fn evaluate_all(
155+
&mut self,
156+
_values: &[ArrayRef],
157+
num_rows: usize,
158+
) -> Result<ArrayRef> {
159+
let num_rows = num_rows as u64;
160+
let mut vec: Vec<u64> = Vec::new();
161+
let n = u64::min(self.n, num_rows);
162+
for i in 0..num_rows {
163+
let res = i * n / num_rows;
164+
vec.push(res + 1)
165+
}
166+
Ok(Arc::new(UInt64Array::from(vec)))
167+
}
168+
}

datafusion/functions-window/src/utils.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,3 +51,15 @@ pub(crate) fn get_scalar_value_from_args(
5151
None
5252
})
5353
}
54+
55+
pub(crate) fn get_unsigned_integer(value: ScalarValue) -> Result<u64> {
56+
if value.is_null() {
57+
return Ok(0);
58+
}
59+
60+
if !value.data_type().is_integer() {
61+
return exec_err!("Expected an integer value");
62+
}
63+
64+
value.cast_to(&DataType::UInt64)?.try_into()
65+
}

datafusion/physical-expr/src/expressions/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ mod unknown_column;
3636
/// Module with some convenient methods used in expression building
3737
pub use crate::aggregate::stats::StatsType;
3838
pub use crate::window::nth_value::NthValue;
39-
pub use crate::window::ntile::Ntile;
4039
pub use crate::PhysicalSortExpr;
4140

4241
pub use binary::{binary, similar_to, BinaryExpr};

datafusion/physical-expr/src/window/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ mod aggregate;
1919
mod built_in;
2020
mod built_in_window_function_expr;
2121
pub(crate) mod nth_value;
22-
pub(crate) mod ntile;
2322
mod sliding_aggregate;
2423
mod window_expr;
2524

0 commit comments

Comments
 (0)