Skip to content

Commit dfb6435

Browse files
alambviirya
andauthored
Add ColumnarValue::values_to_arrays, deprecate columnar_values_to_array (#9114)
* Add `ColumnarValue::values_to_array` * Apply suggestions from code review Co-authored-by: Liang-Chi Hsieh <[email protected]> --------- Co-authored-by: Liang-Chi Hsieh <[email protected]>
1 parent 33b52ba commit dfb6435

File tree

4 files changed

+170
-48
lines changed

4 files changed

+170
-48
lines changed

datafusion-examples/examples/simple_udf.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ use datafusion::error::Result;
2828
use datafusion::prelude::*;
2929
use datafusion_common::cast::as_float64_array;
3030
use datafusion_expr::ColumnarValue;
31-
use datafusion_physical_expr::functions::columnar_values_to_array;
3231
use std::sync::Arc;
3332

3433
/// create local execution context with an in-memory table:
@@ -71,13 +70,15 @@ async fn main() -> Result<()> {
7170
// this is guaranteed by DataFusion based on the function's signature.
7271
assert_eq!(args.len(), 2);
7372

74-
let args = columnar_values_to_array(args)?;
73+
// Expand the arguments to arrays (this is simple, but inefficient for
74+
// single constant values).
75+
let args = ColumnarValue::values_to_arrays(args)?;
7576

7677
// 1. cast both arguments to f64. These casts MUST be aligned with the signature or this function panics!
7778
let base = as_float64_array(&args[0]).expect("cast failed");
7879
let exponent = as_float64_array(&args[1]).expect("cast failed");
7980

80-
// this is guaranteed by DataFusion. We place it just to make it obvious.
81+
// The array lengths is guaranteed by DataFusion. We assert here to make it obvious.
8182
assert_eq!(exponent.len(), base.len());
8283

8384
// 2. perform the computation

datafusion/expr/src/columnar_value.rs

+163-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
use arrow::array::ArrayRef;
2121
use arrow::array::NullArray;
2222
use arrow::datatypes::DataType;
23-
use datafusion_common::{Result, ScalarValue};
23+
use datafusion_common::{internal_err, DataFusionError, Result, ScalarValue};
2424
use std::sync::Arc;
2525

2626
/// Represents the result of evaluating an expression: either a single
@@ -75,4 +75,166 @@ impl ColumnarValue {
7575
pub fn create_null_array(num_rows: usize) -> Self {
7676
ColumnarValue::Array(Arc::new(NullArray::new(num_rows)))
7777
}
78+
79+
/// Converts [`ColumnarValue`]s to [`ArrayRef`]s with the same length.
80+
///
81+
/// # Performance Note
82+
///
83+
/// This function expands any [`ScalarValue`] to an array. This expansion
84+
/// permits using a single function in terms of arrays, but it can be
85+
/// inefficient compared to handling the scalar value directly.
86+
///
87+
/// Thus, it is recommended to provide specialized implementations for
88+
/// scalar values if performance is a concern.
89+
///
90+
/// # Errors
91+
///
92+
/// If there are multiple array arguments that have different lengths
93+
pub fn values_to_arrays(args: &[ColumnarValue]) -> Result<Vec<ArrayRef>> {
94+
if args.is_empty() {
95+
return Ok(vec![]);
96+
}
97+
98+
let mut array_len = None;
99+
for arg in args {
100+
array_len = match (arg, array_len) {
101+
(ColumnarValue::Array(a), None) => Some(a.len()),
102+
(ColumnarValue::Array(a), Some(array_len)) => {
103+
if array_len == a.len() {
104+
Some(array_len)
105+
} else {
106+
return internal_err!(
107+
"Arguments has mixed length. Expected length: {array_len}, found length: {}", a.len()
108+
);
109+
}
110+
}
111+
(ColumnarValue::Scalar(_), array_len) => array_len,
112+
}
113+
}
114+
115+
// If array_len is none, it means there are only scalars, so make a 1 element array
116+
let inferred_length = array_len.unwrap_or(1);
117+
118+
let args = args
119+
.iter()
120+
.map(|arg| arg.clone().into_array(inferred_length))
121+
.collect::<Result<Vec<_>>>()?;
122+
123+
Ok(args)
124+
}
125+
}
126+
127+
#[cfg(test)]
128+
mod tests {
129+
use super::*;
130+
131+
#[test]
132+
fn values_to_arrays() {
133+
// (input, expected)
134+
let cases = vec![
135+
// empty
136+
TestCase {
137+
input: vec![],
138+
expected: vec![],
139+
},
140+
// one array of length 3
141+
TestCase {
142+
input: vec![ColumnarValue::Array(make_array(1, 3))],
143+
expected: vec![make_array(1, 3)],
144+
},
145+
// two arrays length 3
146+
TestCase {
147+
input: vec![
148+
ColumnarValue::Array(make_array(1, 3)),
149+
ColumnarValue::Array(make_array(2, 3)),
150+
],
151+
expected: vec![make_array(1, 3), make_array(2, 3)],
152+
},
153+
// array and scalar
154+
TestCase {
155+
input: vec![
156+
ColumnarValue::Array(make_array(1, 3)),
157+
ColumnarValue::Scalar(ScalarValue::Int32(Some(100))),
158+
],
159+
expected: vec![
160+
make_array(1, 3),
161+
make_array(100, 3), // scalar is expanded
162+
],
163+
},
164+
// scalar and array
165+
TestCase {
166+
input: vec![
167+
ColumnarValue::Scalar(ScalarValue::Int32(Some(100))),
168+
ColumnarValue::Array(make_array(1, 3)),
169+
],
170+
expected: vec![
171+
make_array(100, 3), // scalar is expanded
172+
make_array(1, 3),
173+
],
174+
},
175+
// multiple scalars and array
176+
TestCase {
177+
input: vec![
178+
ColumnarValue::Scalar(ScalarValue::Int32(Some(100))),
179+
ColumnarValue::Array(make_array(1, 3)),
180+
ColumnarValue::Scalar(ScalarValue::Int32(Some(200))),
181+
],
182+
expected: vec![
183+
make_array(100, 3), // scalar is expanded
184+
make_array(1, 3),
185+
make_array(200, 3), // scalar is expanded
186+
],
187+
},
188+
];
189+
for case in cases {
190+
case.run();
191+
}
192+
}
193+
194+
#[test]
195+
#[should_panic(
196+
expected = "Arguments has mixed length. Expected length: 3, found length: 4"
197+
)]
198+
fn values_to_arrays_mixed_length() {
199+
ColumnarValue::values_to_arrays(&[
200+
ColumnarValue::Array(make_array(1, 3)),
201+
ColumnarValue::Array(make_array(2, 4)),
202+
])
203+
.unwrap();
204+
}
205+
206+
#[test]
207+
#[should_panic(
208+
expected = "Arguments has mixed length. Expected length: 3, found length: 7"
209+
)]
210+
fn values_to_arrays_mixed_length_and_scalar() {
211+
ColumnarValue::values_to_arrays(&[
212+
ColumnarValue::Array(make_array(1, 3)),
213+
ColumnarValue::Scalar(ScalarValue::Int32(Some(100))),
214+
ColumnarValue::Array(make_array(2, 7)),
215+
])
216+
.unwrap();
217+
}
218+
219+
struct TestCase {
220+
input: Vec<ColumnarValue>,
221+
expected: Vec<ArrayRef>,
222+
}
223+
224+
impl TestCase {
225+
fn run(self) {
226+
let Self { input, expected } = self;
227+
228+
assert_eq!(
229+
ColumnarValue::values_to_arrays(&input).unwrap(),
230+
expected,
231+
"\ninput: {input:?}\nexpected: {expected:?}"
232+
);
233+
}
234+
}
235+
236+
/// Makes an array of length `len` with all elements set to `val`
237+
fn make_array(val: i32, len: usize) -> ArrayRef {
238+
Arc::new(arrow::array::Int32Array::from(vec![val; len]))
239+
}
78240
}

datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -1347,7 +1347,6 @@ mod tests {
13471347
use datafusion_physical_expr::execution_props::ExecutionProps;
13481348

13491349
use chrono::{DateTime, TimeZone, Utc};
1350-
use datafusion_physical_expr::functions::columnar_values_to_array;
13511350

13521351
// ------------------------------
13531352
// --- ExprSimplifier tests -----
@@ -1461,7 +1460,7 @@ mod tests {
14611460
let return_type = Arc::new(DataType::Int32);
14621461

14631462
let fun = Arc::new(|args: &[ColumnarValue]| {
1464-
let args = columnar_values_to_array(args)?;
1463+
let args = ColumnarValue::values_to_arrays(args)?;
14651464

14661465
let arg0 = as_int32_array(&args[0])?;
14671466
let arg1 = as_int32_array(&args[1])?;

datafusion/physical-expr/src/functions.rs

+2-42
Original file line numberDiff line numberDiff line change
@@ -173,49 +173,9 @@ pub(crate) enum Hint {
173173
AcceptsSingular,
174174
}
175175

176-
/// A helper function used to infer the length of arguments of Scalar functions and convert
177-
/// [`ColumnarValue`]s to [`ArrayRef`]s with the inferred length. Note that this function
178-
/// only works for functions that accept either that all arguments are scalars or all arguments
179-
/// are arrays with same length. Otherwise, it will return an error.
176+
#[deprecated(since = "36.0.0", note = "Use ColumarValue::values_to_arrays instead")]
180177
pub fn columnar_values_to_array(args: &[ColumnarValue]) -> Result<Vec<ArrayRef>> {
181-
if args.is_empty() {
182-
return Ok(vec![]);
183-
}
184-
185-
let len = args
186-
.iter()
187-
.fold(Option::<usize>::None, |acc, arg| match arg {
188-
ColumnarValue::Scalar(_) if acc.is_none() => Some(1),
189-
ColumnarValue::Scalar(_) => {
190-
if let Some(1) = acc {
191-
acc
192-
} else {
193-
None
194-
}
195-
}
196-
ColumnarValue::Array(a) => {
197-
if let Some(l) = acc {
198-
if l == a.len() {
199-
acc
200-
} else {
201-
None
202-
}
203-
} else {
204-
Some(a.len())
205-
}
206-
}
207-
});
208-
209-
let inferred_length = len.ok_or(DataFusionError::Internal(
210-
"Arguments has mixed length".to_string(),
211-
))?;
212-
213-
let args = args
214-
.iter()
215-
.map(|arg| arg.clone().into_array(inferred_length))
216-
.collect::<Result<Vec<_>>>()?;
217-
218-
Ok(args)
178+
ColumnarValue::values_to_arrays(args)
219179
}
220180

221181
/// Decorates a function to handle [`ScalarValue`]s by converting them to arrays before calling the function

0 commit comments

Comments
 (0)