Skip to content

Commit 6b3db8c

Browse files
committed
Move from invoke to invoke batch
1 parent ae73371 commit 6b3db8c

File tree

121 files changed

+921
-462
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

121 files changed

+921
-462
lines changed

datafusion-examples/examples/advanced_udf.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,11 @@ impl ScalarUDFImpl for PowUdf {
9191
///
9292
/// However, it also means the implementation is more complex than when
9393
/// using `create_udf`.
94-
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
94+
fn invoke_batch(
95+
&self,
96+
args: &[ColumnarValue],
97+
_number_rows: usize,
98+
) -> Result<ColumnarValue> {
9599
// DataFusion has arranged for the correct inputs to be passed to this
96100
// function, but we check again to make sure
97101
assert_eq!(args.len(), 2);

datafusion-examples/examples/optimizer_rule.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,11 @@ impl ScalarUDFImpl for MyEq {
205205
Ok(DataType::Boolean)
206206
}
207207

208-
fn invoke(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue> {
208+
fn invoke_batch(
209+
&self,
210+
_args: &[ColumnarValue],
211+
_number_rows: usize,
212+
) -> Result<ColumnarValue> {
209213
// this example simply returns "true" which is not what a real
210214
// implementation would do.
211215
Ok(ColumnarValue::Scalar(ScalarValue::from(true)))

datafusion/core/src/physical_optimizer/projection_pushdown.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -1382,7 +1382,11 @@ mod tests {
13821382
Ok(DataType::Int32)
13831383
}
13841384

1385-
fn invoke(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue> {
1385+
fn invoke_batch(
1386+
&self,
1387+
_args: &[ColumnarValue],
1388+
_number_rows: usize,
1389+
) -> Result<ColumnarValue> {
13861390
unimplemented!("DummyUDF::invoke")
13871391
}
13881392
}

datafusion/core/tests/fuzz_cases/equivalence/utils.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -581,7 +581,11 @@ impl ScalarUDFImpl for TestScalarUDF {
581581
Ok(input[0].sort_properties)
582582
}
583583

584-
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
584+
fn invoke_batch(
585+
&self,
586+
args: &[ColumnarValue],
587+
_number_rows: usize,
588+
) -> Result<ColumnarValue> {
585589
let args = ColumnarValue::values_to_arrays(args)?;
586590

587591
let arr: ArrayRef = match args[0].data_type() {

datafusion/core/tests/user_defined/user_defined_scalar_functions.rs

+20-8
Original file line numberDiff line numberDiff line change
@@ -520,10 +520,6 @@ impl ScalarUDFImpl for AddIndexToStringVolatileScalarUDF {
520520
Ok(self.return_type.clone())
521521
}
522522

523-
fn invoke(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue> {
524-
not_impl_err!("index_with_offset function does not accept arguments")
525-
}
526-
527523
fn invoke_batch(
528524
&self,
529525
args: &[ColumnarValue],
@@ -720,7 +716,11 @@ impl ScalarUDFImpl for CastToI64UDF {
720716
Ok(ExprSimplifyResult::Simplified(new_expr))
721717
}
722718

723-
fn invoke(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue> {
719+
fn invoke_batch(
720+
&self,
721+
_args: &[ColumnarValue],
722+
_number_rows: usize,
723+
) -> Result<ColumnarValue> {
724724
unimplemented!("Function should have been simplified prior to evaluation")
725725
}
726726
}
@@ -848,7 +848,11 @@ impl ScalarUDFImpl for TakeUDF {
848848
}
849849

850850
// The actual implementation
851-
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
851+
fn invoke_batch(
852+
&self,
853+
args: &[ColumnarValue],
854+
_number_rows: usize,
855+
) -> Result<ColumnarValue> {
852856
let take_idx = match &args[2] {
853857
ColumnarValue::Scalar(ScalarValue::Int64(Some(v))) if v < &2 => *v as usize,
854858
_ => unreachable!(),
@@ -956,7 +960,11 @@ impl ScalarUDFImpl for ScalarFunctionWrapper {
956960
Ok(self.return_type.clone())
957961
}
958962

959-
fn invoke(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue> {
963+
fn invoke_batch(
964+
&self,
965+
_args: &[ColumnarValue],
966+
_number_rows: usize,
967+
) -> Result<ColumnarValue> {
960968
internal_err!("This function should not get invoked!")
961969
}
962970

@@ -1240,7 +1248,11 @@ impl ScalarUDFImpl for MyRegexUdf {
12401248
}
12411249
}
12421250

1243-
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
1251+
fn invoke_batch(
1252+
&self,
1253+
args: &[ColumnarValue],
1254+
_number_rows: usize,
1255+
) -> Result<ColumnarValue> {
12441256
match args {
12451257
[ColumnarValue::Scalar(ScalarValue::Utf8(value))] => {
12461258
Ok(ColumnarValue::Scalar(ScalarValue::Boolean(

datafusion/expr/src/expr.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -2355,7 +2355,7 @@ mod test {
23552355
use crate::expr_fn::col;
23562356
use crate::{
23572357
case, lit, qualified_wildcard, wildcard, wildcard_with_options, ColumnarValue,
2358-
ScalarUDF, ScalarUDFImpl, Volatility,
2358+
ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Volatility,
23592359
};
23602360
use sqlparser::ast;
23612361
use sqlparser::ast::{Ident, IdentWithAlias};
@@ -2484,7 +2484,7 @@ mod test {
24842484
Ok(DataType::Utf8)
24852485
}
24862486

2487-
fn invoke(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue> {
2487+
fn invoke(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
24882488
Ok(ColumnarValue::Scalar(ScalarValue::from("a")))
24892489
}
24902490
}

datafusion/expr/src/expr_fn.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use crate::function::{
2727
};
2828
use crate::{
2929
conditional_expressions::CaseBuilder, expr::Sort, logical_plan::Subquery,
30-
AggregateUDF, Expr, LogicalPlan, Operator, PartitionEvaluator,
30+
AggregateUDF, Expr, LogicalPlan, Operator, PartitionEvaluator, ScalarFunctionArgs,
3131
ScalarFunctionImplementation, ScalarUDF, Signature, Volatility,
3232
};
3333
use crate::{
@@ -462,8 +462,8 @@ impl ScalarUDFImpl for SimpleScalarUDF {
462462
Ok(self.return_type.clone())
463463
}
464464

465-
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
466-
(self.fun)(args)
465+
fn invoke(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
466+
(self.fun)(args.args.as_slice())
467467
}
468468
}
469469

datafusion/expr/src/udf.rs

+17-87
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,7 @@
2020
use crate::expr::schema_name_from_exprs_comma_seperated_without_space;
2121
use crate::simplify::{ExprSimplifyResult, SimplifyInfo};
2222
use crate::sort_properties::{ExprProperties, SortProperties};
23-
use crate::{
24-
ColumnarValue, Documentation, Expr, ScalarFunctionImplementation, Signature,
25-
};
23+
use crate::{ColumnarValue, Documentation, Expr, Signature};
2624
use arrow::datatypes::DataType;
2725
use datafusion_common::{not_impl_err, ExprSchema, Result};
2826
use datafusion_expr_common::interval_arithmetic::Interval;
@@ -203,12 +201,6 @@ impl ScalarUDF {
203201
self.inner.simplify(args, info)
204202
}
205203

206-
#[deprecated(since = "42.1.0", note = "Use `invoke_batch` instead")]
207-
pub fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
208-
#[allow(deprecated)]
209-
self.inner.invoke(args)
210-
}
211-
212204
pub fn is_nullable(&self, args: &[Expr], schema: &dyn ExprSchema) -> bool {
213205
self.inner.is_nullable(args, schema)
214206
}
@@ -225,27 +217,9 @@ impl ScalarUDF {
225217

226218
/// Invoke the function on `args`, returning the appropriate result.
227219
///
228-
/// See [`ScalarUDFImpl::invoke_with_args`] for more details.
229-
pub fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
230-
self.inner.invoke_with_args(args)
231-
}
232-
233-
/// Invoke the function without `args` but number of rows, returning the appropriate result.
234-
///
235-
/// See [`ScalarUDFImpl::invoke_no_args`] for more details.
236-
#[deprecated(since = "42.1.0", note = "Use `invoke_batch` instead")]
237-
pub fn invoke_no_args(&self, number_rows: usize) -> Result<ColumnarValue> {
238-
#[allow(deprecated)]
239-
self.inner.invoke_no_args(number_rows)
240-
}
241-
242-
/// Returns a `ScalarFunctionImplementation` that can invoke the function
243-
/// during execution
244-
#[deprecated(since = "42.0.0", note = "Use `invoke_batch` instead")]
245-
pub fn fun(&self) -> ScalarFunctionImplementation {
246-
let captured = Arc::clone(&self.inner);
247-
#[allow(deprecated)]
248-
Arc::new(move |args| captured.invoke(args))
220+
/// See [`ScalarUDFImpl::invoke`] for more details.
221+
pub fn invoke(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
222+
self.inner.invoke(args)
249223
}
250224

251225
/// Get the circuits of inner implementation
@@ -329,7 +303,7 @@ where
329303

330304
pub struct ScalarFunctionArgs<'a> {
331305
// The evaluated arguments to the function
332-
pub args: &'a [ColumnarValue],
306+
pub args: Vec<ColumnarValue>,
333307
// The number of rows in record batch being evaluated
334308
pub number_rows: usize,
335309
// The return type of the scalar function returned (from `return_type` or `return_type_from_exprs`)
@@ -353,7 +327,7 @@ pub struct ScalarFunctionArgs<'a> {
353327
/// # use std::sync::OnceLock;
354328
/// # use arrow::datatypes::DataType;
355329
/// # use datafusion_common::{DataFusionError, plan_err, Result};
356-
/// # use datafusion_expr::{col, ColumnarValue, Documentation, Signature, Volatility};
330+
/// # use datafusion_expr::{col, ColumnarValue, Documentation, ScalarFunctionArgs, Signature, Volatility};
357331
/// # use datafusion_expr::{ScalarUDFImpl, ScalarUDF};
358332
/// # use datafusion_expr::scalar_doc_sections::DOC_SECTION_MATH;
359333
///
@@ -396,7 +370,7 @@ pub struct ScalarFunctionArgs<'a> {
396370
/// Ok(DataType::Int32)
397371
/// }
398372
/// // The actual implementation would add one to the argument
399-
/// fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> { unimplemented!() }
373+
/// fn invoke(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> { unimplemented!() }
400374
/// fn documentation(&self) -> Option<&Documentation> {
401375
/// Some(get_doc())
402376
/// }
@@ -490,33 +464,6 @@ pub trait ScalarUDFImpl: Debug + Send + Sync {
490464
true
491465
}
492466

493-
/// Invoke the function on `args`, returning the appropriate result
494-
///
495-
/// The function will be invoked passed with the slice of [`ColumnarValue`]
496-
/// (either scalar or array).
497-
///
498-
/// If the function does not take any arguments, please use [invoke_no_args]
499-
/// instead and return [not_impl_err] for this function.
500-
///
501-
///
502-
/// # Performance
503-
///
504-
/// For the best performance, the implementations of `invoke` should handle
505-
/// the common case when one or more of their arguments are constant values
506-
/// (aka [`ColumnarValue::Scalar`]).
507-
///
508-
/// [`ColumnarValue::values_to_arrays`] can be used to convert the arguments
509-
/// to arrays, which will likely be simpler code, but be slower.
510-
///
511-
/// [invoke_no_args]: ScalarUDFImpl::invoke_no_args
512-
#[deprecated(since = "42.1.0", note = "Use `invoke_batch` instead")]
513-
fn invoke(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue> {
514-
not_impl_err!(
515-
"Function {} does not implement invoke but called",
516-
self.name()
517-
)
518-
}
519-
520467
/// Invoke the function with `args` and the number of rows,
521468
/// returning the appropriate result.
522469
///
@@ -531,24 +478,15 @@ pub trait ScalarUDFImpl: Debug + Send + Sync {
531478
///
532479
/// [`ColumnarValue::values_to_arrays`] can be used to convert the arguments
533480
/// to arrays, which will likely be simpler code, but be slower.
534-
#[deprecated(since = "43.0.0", note = "Use `invoke_with_args` instead")]
481+
#[deprecated(since = "43.0.0", note = "Use `invoke` instead")]
535482
fn invoke_batch(
536483
&self,
537-
args: &[ColumnarValue],
538-
number_rows: usize,
484+
_args: &[ColumnarValue],
485+
_number_rows: usize,
539486
) -> Result<ColumnarValue> {
540-
match args.is_empty() {
541-
true =>
542-
{
543-
#[allow(deprecated)]
544-
self.invoke_no_args(number_rows)
545-
}
546-
false =>
547-
{
548-
#[allow(deprecated)]
549-
self.invoke(args)
550-
}
551-
}
487+
not_impl_err!(
488+
"invoke_batch, this method is deprecated implement `invoke` instead"
489+
)
552490
}
553491

554492
/// Invoke the function with `args: ScalarFunctionArgs` returning the appropriate result.
@@ -563,19 +501,11 @@ pub trait ScalarUDFImpl: Debug + Send + Sync {
563501
///
564502
/// [`ColumnarValue::values_to_arrays`] can be used to convert the arguments
565503
/// to arrays, which will likely be simpler code, but be slower.
566-
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
504+
/// Note that this invoke method replaces the original invoke function deprecated in
505+
/// version = 42.1.0.
506+
fn invoke(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
567507
#[allow(deprecated)]
568-
self.invoke_batch(args.args, args.number_rows)
569-
}
570-
571-
/// Invoke the function without `args`, instead the number of rows are provided,
572-
/// returning the appropriate result.
573-
#[deprecated(since = "42.1.0", note = "Use `invoke_batch` instead")]
574-
fn invoke_no_args(&self, _number_rows: usize) -> Result<ColumnarValue> {
575-
not_impl_err!(
576-
"Function {} does not implement invoke_no_args but called",
577-
self.name()
578-
)
508+
self.invoke_batch(args.args.as_slice(), args.number_rows)
579509
}
580510

581511
/// Returns any aliases (alternate names) for this function.

datafusion/functions-nested/src/array_has.rs

+15-3
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,11 @@ impl ScalarUDFImpl for ArrayHas {
9898
Ok(DataType::Boolean)
9999
}
100100

101-
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
101+
fn invoke_batch(
102+
&self,
103+
args: &[ColumnarValue],
104+
_number_rows: usize,
105+
) -> Result<ColumnarValue> {
102106
match &args[1] {
103107
ColumnarValue::Array(array_needle) => {
104108
// the needle is already an array, convert the haystack to an array of the same length
@@ -322,7 +326,11 @@ impl ScalarUDFImpl for ArrayHasAll {
322326
Ok(DataType::Boolean)
323327
}
324328

325-
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
329+
fn invoke_batch(
330+
&self,
331+
args: &[ColumnarValue],
332+
_number_rows: usize,
333+
) -> Result<ColumnarValue> {
326334
make_scalar_function(array_has_all_inner)(args)
327335
}
328336

@@ -403,7 +411,11 @@ impl ScalarUDFImpl for ArrayHasAny {
403411
Ok(DataType::Boolean)
404412
}
405413

406-
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
414+
fn invoke_batch(
415+
&self,
416+
args: &[ColumnarValue],
417+
_number_rows: usize,
418+
) -> Result<ColumnarValue> {
407419
make_scalar_function(array_has_any_inner)(args)
408420
}
409421

datafusion/functions-nested/src/cardinality.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,11 @@ impl ScalarUDFImpl for Cardinality {
8383
})
8484
}
8585

86-
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
86+
fn invoke_batch(
87+
&self,
88+
args: &[ColumnarValue],
89+
_number_rows: usize,
90+
) -> Result<ColumnarValue> {
8791
make_scalar_function(cardinality_inner)(args)
8892
}
8993

0 commit comments

Comments
 (0)