diff --git a/datafusion-examples/examples/simple_udf.rs b/datafusion-examples/examples/simple_udf.rs index dda6ba62e0af..64cf7857e223 100644 --- a/datafusion-examples/examples/simple_udf.rs +++ b/datafusion-examples/examples/simple_udf.rs @@ -28,7 +28,6 @@ use datafusion::error::Result; use datafusion::prelude::*; use datafusion_common::cast::as_float64_array; use datafusion_expr::ColumnarValue; -use datafusion_physical_expr::functions::columnar_values_to_array; use std::sync::Arc; /// create local execution context with an in-memory table: @@ -71,13 +70,15 @@ async fn main() -> Result<()> { // this is guaranteed by DataFusion based on the function's signature. assert_eq!(args.len(), 2); - let args = columnar_values_to_array(args)?; + // Expand the arguments to arrays (this is simple, but inefficient for + // single constant values). + let args = ColumnarValue::values_to_arrays(args)?; // 1. cast both arguments to f64. These casts MUST be aligned with the signature or this function panics! let base = as_float64_array(&args[0]).expect("cast failed"); let exponent = as_float64_array(&args[1]).expect("cast failed"); - // this is guaranteed by DataFusion. We place it just to make it obvious. + // The array lengths is guaranteed by DataFusion. We assert here to make it obvious. assert_eq!(exponent.len(), base.len()); // 2. perform the computation diff --git a/datafusion/expr/src/columnar_value.rs b/datafusion/expr/src/columnar_value.rs index 58c534b50aad..585bee3b9bfa 100644 --- a/datafusion/expr/src/columnar_value.rs +++ b/datafusion/expr/src/columnar_value.rs @@ -20,7 +20,7 @@ use arrow::array::ArrayRef; use arrow::array::NullArray; use arrow::datatypes::DataType; -use datafusion_common::{Result, ScalarValue}; +use datafusion_common::{internal_err, DataFusionError, Result, ScalarValue}; use std::sync::Arc; /// Represents the result of evaluating an expression: either a single @@ -75,4 +75,166 @@ impl ColumnarValue { pub fn create_null_array(num_rows: usize) -> Self { ColumnarValue::Array(Arc::new(NullArray::new(num_rows))) } + + /// Converts [`ColumnarValue`]s to [`ArrayRef`]s with the same length. + /// + /// # Performance Note + /// + /// This function expands any [`ScalarValue`] to an array. This expansion + /// permits using a single function in terms of arrays, but it can be + /// inefficient compared to handling the scalar value directly. + /// + /// Thus, it is recommended to provide specialized implementations for + /// scalar values if performance is a concern. + /// + /// # Errors + /// + /// If there are multiple array arguments that have different lengths + pub fn values_to_arrays(args: &[ColumnarValue]) -> Result> { + if args.is_empty() { + return Ok(vec![]); + } + + let mut array_len = None; + for arg in args { + array_len = match (arg, array_len) { + (ColumnarValue::Array(a), None) => Some(a.len()), + (ColumnarValue::Array(a), Some(array_len)) => { + if array_len == a.len() { + Some(array_len) + } else { + return internal_err!( + "Arguments has mixed length. Expected length: {array_len}, found length: {}", a.len() + ); + } + } + (ColumnarValue::Scalar(_), array_len) => array_len, + } + } + + // If array_len is none, it means there are only scalars, so make a 1 element array + let inferred_length = array_len.unwrap_or(1); + + let args = args + .iter() + .map(|arg| arg.clone().into_array(inferred_length)) + .collect::>>()?; + + Ok(args) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn values_to_arrays() { + // (input, expected) + let cases = vec![ + // empty + TestCase { + input: vec![], + expected: vec![], + }, + // one array of length 3 + TestCase { + input: vec![ColumnarValue::Array(make_array(1, 3))], + expected: vec![make_array(1, 3)], + }, + // two arrays length 3 + TestCase { + input: vec![ + ColumnarValue::Array(make_array(1, 3)), + ColumnarValue::Array(make_array(2, 3)), + ], + expected: vec![make_array(1, 3), make_array(2, 3)], + }, + // array and scalar + TestCase { + input: vec![ + ColumnarValue::Array(make_array(1, 3)), + ColumnarValue::Scalar(ScalarValue::Int32(Some(100))), + ], + expected: vec![ + make_array(1, 3), + make_array(100, 3), // scalar is expanded + ], + }, + // scalar and array + TestCase { + input: vec![ + ColumnarValue::Scalar(ScalarValue::Int32(Some(100))), + ColumnarValue::Array(make_array(1, 3)), + ], + expected: vec![ + make_array(100, 3), // scalar is expanded + make_array(1, 3), + ], + }, + // multiple scalars and array + TestCase { + input: vec![ + ColumnarValue::Scalar(ScalarValue::Int32(Some(100))), + ColumnarValue::Array(make_array(1, 3)), + ColumnarValue::Scalar(ScalarValue::Int32(Some(200))), + ], + expected: vec![ + make_array(100, 3), // scalar is expanded + make_array(1, 3), + make_array(200, 3), // scalar is expanded + ], + }, + ]; + for case in cases { + case.run(); + } + } + + #[test] + #[should_panic( + expected = "Arguments has mixed length. Expected length: 3, found length: 4" + )] + fn values_to_arrays_mixed_length() { + ColumnarValue::values_to_arrays(&[ + ColumnarValue::Array(make_array(1, 3)), + ColumnarValue::Array(make_array(2, 4)), + ]) + .unwrap(); + } + + #[test] + #[should_panic( + expected = "Arguments has mixed length. Expected length: 3, found length: 7" + )] + fn values_to_arrays_mixed_length_and_scalar() { + ColumnarValue::values_to_arrays(&[ + ColumnarValue::Array(make_array(1, 3)), + ColumnarValue::Scalar(ScalarValue::Int32(Some(100))), + ColumnarValue::Array(make_array(2, 7)), + ]) + .unwrap(); + } + + struct TestCase { + input: Vec, + expected: Vec, + } + + impl TestCase { + fn run(self) { + let Self { input, expected } = self; + + assert_eq!( + ColumnarValue::values_to_arrays(&input).unwrap(), + expected, + "\ninput: {input:?}\nexpected: {expected:?}" + ); + } + } + + /// Makes an array of length `len` with all elements set to `val` + fn make_array(val: i32, len: usize) -> ArrayRef { + Arc::new(arrow::array::Int32Array::from(vec![val; len])) + } } diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs index 87d81abc4b15..ab62cf8646e8 100644 --- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs +++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs @@ -1347,7 +1347,6 @@ mod tests { use datafusion_physical_expr::execution_props::ExecutionProps; use chrono::{DateTime, TimeZone, Utc}; - use datafusion_physical_expr::functions::columnar_values_to_array; // ------------------------------ // --- ExprSimplifier tests ----- @@ -1461,7 +1460,7 @@ mod tests { let return_type = Arc::new(DataType::Int32); let fun = Arc::new(|args: &[ColumnarValue]| { - let args = columnar_values_to_array(args)?; + let args = ColumnarValue::values_to_arrays(args)?; let arg0 = as_int32_array(&args[0])?; let arg1 = as_int32_array(&args[1])?; diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index 21eaeab7213a..cbd780a8fb32 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -173,49 +173,9 @@ pub(crate) enum Hint { AcceptsSingular, } -/// A helper function used to infer the length of arguments of Scalar functions and convert -/// [`ColumnarValue`]s to [`ArrayRef`]s with the inferred length. Note that this function -/// only works for functions that accept either that all arguments are scalars or all arguments -/// are arrays with same length. Otherwise, it will return an error. +#[deprecated(since = "36.0.0", note = "Use ColumarValue::values_to_arrays instead")] pub fn columnar_values_to_array(args: &[ColumnarValue]) -> Result> { - if args.is_empty() { - return Ok(vec![]); - } - - let len = args - .iter() - .fold(Option::::None, |acc, arg| match arg { - ColumnarValue::Scalar(_) if acc.is_none() => Some(1), - ColumnarValue::Scalar(_) => { - if let Some(1) = acc { - acc - } else { - None - } - } - ColumnarValue::Array(a) => { - if let Some(l) = acc { - if l == a.len() { - acc - } else { - None - } - } else { - Some(a.len()) - } - } - }); - - let inferred_length = len.ok_or(DataFusionError::Internal( - "Arguments has mixed length".to_string(), - ))?; - - let args = args - .iter() - .map(|arg| arg.clone().into_array(inferred_length)) - .collect::>>()?; - - Ok(args) + ColumnarValue::values_to_arrays(args) } /// Decorates a function to handle [`ScalarValue`]s by converting them to arrays before calling the function