Skip to content

Commit 3ee9b3d

Browse files
authored
Performance: enable array allocation reuse (ScalarFunctionArgs gets owned ColumnReference) (#13637)
* Improve documentation * Pass owned args to ScalarFunctionArgs * Update advanced_udf with example of reusing arrays * clarify rationale for cloning * clarify comments * fix expected output
1 parent f2de2c4 commit 3ee9b3d

27 files changed

+288
-212
lines changed

datafusion-examples/examples/advanced_udf.rs

Lines changed: 100 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,11 @@ use arrow::record_batch::RecordBatch;
2727
use datafusion::error::Result;
2828
use datafusion::logical_expr::Volatility;
2929
use datafusion::prelude::*;
30-
use datafusion_common::{internal_err, ScalarValue};
30+
use datafusion_common::{exec_err, internal_err, ScalarValue};
3131
use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
32-
use datafusion_expr::{ColumnarValue, ScalarUDF, ScalarUDFImpl, Signature};
32+
use datafusion_expr::{
33+
ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature,
34+
};
3335

3436
/// This example shows how to use the full ScalarUDFImpl API to implement a user
3537
/// defined function. As in the `simple_udf.rs` example, this struct implements
@@ -83,23 +85,27 @@ impl ScalarUDFImpl for PowUdf {
8385
Ok(DataType::Float64)
8486
}
8587

86-
/// This is the function that actually calculates the results.
88+
/// This function actually calculates the results of the scalar function.
89+
///
90+
/// This is the same way that functions provided with DataFusion are invoked,
91+
/// which permits important special cases:
8792
///
88-
/// This is the same way that functions built into DataFusion are invoked,
89-
/// which permits important special cases when one or both of the arguments
90-
/// are single values (constants). For example `pow(a, 2)`
93+
/// 1. When one or both of the arguments are single values (constants).
94+
/// For example `pow(a, 2)`
95+
/// 2. When the input arrays can be reused (avoid allocating a new output array)
9196
///
9297
/// However, it also means the implementation is more complex than when
9398
/// using `create_udf`.
94-
fn invoke_batch(
95-
&self,
96-
args: &[ColumnarValue],
97-
_number_rows: usize,
98-
) -> Result<ColumnarValue> {
99+
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
100+
// The other fields of the `args` struct are used for more specialized
101+
// uses, and are not needed in this example
102+
let ScalarFunctionArgs { mut args, .. } = args;
99103
// DataFusion has arranged for the correct inputs to be passed to this
100104
// function, but we check again to make sure
101105
assert_eq!(args.len(), 2);
102-
let (base, exp) = (&args[0], &args[1]);
106+
// take ownership of arguments by popping in reverse order
107+
let exp = args.pop().unwrap();
108+
let base = args.pop().unwrap();
103109
assert_eq!(base.data_type(), DataType::Float64);
104110
assert_eq!(exp.data_type(), DataType::Float64);
105111

@@ -118,7 +124,7 @@ impl ScalarUDFImpl for PowUdf {
118124
) => {
119125
// compute the output. Note DataFusion treats `None` as NULL.
120126
let res = match (base, exp) {
121-
(Some(base), Some(exp)) => Some(base.powf(*exp)),
127+
(Some(base), Some(exp)) => Some(base.powf(exp)),
122128
// one or both arguments were NULL
123129
_ => None,
124130
};
@@ -140,31 +146,33 @@ impl ScalarUDFImpl for PowUdf {
140146
// kernel creates very fast "vectorized" code and
141147
// handles things like null values for us.
142148
let res: Float64Array =
143-
compute::unary(base_array, |base| base.powf(*exp));
149+
compute::unary(base_array, |base| base.powf(exp));
144150
Arc::new(res)
145151
}
146152
};
147153
Ok(ColumnarValue::Array(result_array))
148154
}
149155

150-
// special case if the base is a constant (note this code is quite
151-
// similar to the previous case, so we omit comments)
156+
// special case if the base is a constant.
157+
//
158+
// Note this case is very similar to the previous case, so we could
159+
// use the same pattern. However, for this case we demonstrate an
160+
// even more advanced pattern to potentially avoid allocating a new array
152161
(
153162
ColumnarValue::Scalar(ScalarValue::Float64(base)),
154163
ColumnarValue::Array(exp_array),
155164
) => {
156165
let res = match base {
157166
None => new_null_array(exp_array.data_type(), exp_array.len()),
158-
Some(base) => {
159-
let exp_array = exp_array.as_primitive::<Float64Type>();
160-
let res: Float64Array =
161-
compute::unary(exp_array, |exp| base.powf(exp));
162-
Arc::new(res)
163-
}
167+
Some(base) => maybe_pow_in_place(base, exp_array)?,
164168
};
165169
Ok(ColumnarValue::Array(res))
166170
}
167-
// Both arguments are arrays so we have to perform the calculation for every row
171+
// Both arguments are arrays so we have to perform the calculation
172+
// for every row
173+
//
174+
// Note this could also be done in place using `binary_mut` as
175+
// is done in `maybe_pow_in_place` but here we use binary for simplicity
168176
(ColumnarValue::Array(base_array), ColumnarValue::Array(exp_array)) => {
169177
let res: Float64Array = compute::binary(
170178
base_array.as_primitive::<Float64Type>(),
@@ -191,6 +199,52 @@ impl ScalarUDFImpl for PowUdf {
191199
}
192200
}
193201

202+
/// Evaluate `base ^ exp` *without* allocating a new array, if possible
203+
fn maybe_pow_in_place(base: f64, exp_array: ArrayRef) -> Result<ArrayRef> {
204+
// Calling `unary` creates a new array for the results. Avoiding
205+
// allocations is a common optimization in performance critical code.
206+
// arrow-rs allows this optimization via the `unary_mut`
207+
// and `binary_mut` kernels in certain cases
208+
//
209+
// These kernels can only be used if there are no other references to
210+
// the arrays (exp_array has to be the last remaining reference).
211+
let owned_array = exp_array
212+
// as in the previous example, we first downcast to &Float64Array
213+
.as_primitive::<Float64Type>()
214+
// non-obviously, we call clone here to get an owned `Float64Array`.
215+
// Calling clone() is relatively inexpensive as it increments
216+
// some ref counts but doesn't clone the data.
217+
//
218+
// Once we have the owned Float64Array we can drop the original
219+
// exp_array (untyped) reference
220+
.clone();
221+
222+
// We *MUST* drop the reference to `exp_array` explicitly so that
223+
// owned_array is the only reference remaining in this function.
224+
//
225+
// Note that depending on the query there may still be other references
226+
// to the underlying buffers, which would prevent reuse. The only way to
227+
// know for sure is the result of `compute::unary_mut`
228+
drop(exp_array);
229+
230+
// If we have the only reference, compute the result directly into the same
231+
// allocation as was used for the input array
232+
match compute::unary_mut(owned_array, |exp| base.powf(exp)) {
233+
Err(_orig_array) => {
234+
// unary_mut will return the original array if there are other
235+
// references into the underlying buffer (and thus reuse is
236+
// impossible)
237+
//
238+
// In a real implementation, this case should fall back to
239+
// calling `unary` and allocate a new array; In this example
240+
// we will return an error for demonstration purposes
241+
exec_err!("Could not reuse array for maybe_pow_in_place")
242+
}
243+
// a result of `Ok` means the operation was run successfully
244+
Ok(res) => Ok(Arc::new(res)),
245+
}
246+
}
247+
194248
/// In this example we register `PowUdf` as a user defined function
195249
/// and invoke it via the DataFrame API and SQL
196250
#[tokio::main]
@@ -215,9 +269,29 @@ async fn main() -> Result<()> {
215269
// print the results
216270
df.show().await?;
217271

218-
// You can also invoke both pow(2, 10) and its alias my_pow(a, b) using SQL
219-
let sql_df = ctx.sql("SELECT pow(2, 10), my_pow(a, b) FROM t").await?;
220-
sql_df.show().await?;
272+
// You can also invoke both pow(2, 10) and its alias my_pow(a, b) using SQL
273+
ctx.sql("SELECT pow(2, 10), my_pow(a, b) FROM t")
274+
.await?
275+
.show()
276+
.await?;
277+
278+
// You can also invoke pow_in_place by passing a constant base and a
279+
// column `a` as the exponent. If there is only a single
280+
// reference to `a` the code works well
281+
ctx.sql("SELECT pow(2, a) FROM t").await?.show().await?;
282+
283+
// However, if there are multiple references to `a` in the evaluation
284+
// the array storage can not be reused
285+
let err = ctx
286+
.sql("SELECT pow(2, a), pow(3, a) FROM t")
287+
.await?
288+
.show()
289+
.await
290+
.unwrap_err();
291+
assert_eq!(
292+
err.to_string(),
293+
"Execution error: Could not reuse array for maybe_pow_in_place"
294+
);
221295

222296
Ok(())
223297
}

datafusion/expr/src/udf.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -326,13 +326,15 @@ where
326326
}
327327
}
328328

329+
/// Arguments passed to [`ScalarUDFImpl::invoke_with_args`] when invoking a
330+
/// scalar function.
329331
pub struct ScalarFunctionArgs<'a> {
330-
// The evaluated arguments to the function
331-
pub args: &'a [ColumnarValue],
332-
// The number of rows in record batch being evaluated
332+
/// The evaluated arguments to the function
333+
pub args: Vec<ColumnarValue>,
334+
/// The number of rows in record batch being evaluated
333335
pub number_rows: usize,
334-
// The return type of the scalar function returned (from `return_type` or `return_type_from_exprs`)
335-
// when creating the physical expression from the logical expression
336+
/// The return type of the scalar function returned (from `return_type` or `return_type_from_exprs`)
337+
/// when creating the physical expression from the logical expression
336338
pub return_type: &'a DataType,
337339
}
338340

@@ -539,7 +541,7 @@ pub trait ScalarUDFImpl: Debug + Send + Sync {
539541
/// [`ColumnarValue::values_to_arrays`] can be used to convert the arguments
540542
/// to arrays, which will likely be simpler code, but be slower.
541543
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
542-
self.invoke_batch(args.args, args.number_rows)
544+
self.invoke_batch(&args.args, args.number_rows)
543545
}
544546

545547
/// Invoke the function without `args`, instead the number of rows are provided,

datafusion/functions/src/datetime/to_local_time.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -562,7 +562,7 @@ mod tests {
562562
fn test_to_local_time_helper(input: ScalarValue, expected: ScalarValue) {
563563
let res = ToLocalTimeFunc::new()
564564
.invoke_with_args(ScalarFunctionArgs {
565-
args: &[ColumnarValue::Scalar(input)],
565+
args: vec![ColumnarValue::Scalar(input)],
566566
number_rows: 1,
567567
return_type: &expected.data_type(),
568568
})

datafusion/functions/src/string/ascii.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ mod tests {
157157
($INPUT:expr, $EXPECTED:expr) => {
158158
test_function!(
159159
AsciiFunc::new(),
160-
&[ColumnarValue::Scalar(ScalarValue::Utf8($INPUT))],
160+
vec![ColumnarValue::Scalar(ScalarValue::Utf8($INPUT))],
161161
$EXPECTED,
162162
i32,
163163
Int32,
@@ -166,7 +166,7 @@ mod tests {
166166

167167
test_function!(
168168
AsciiFunc::new(),
169-
&[ColumnarValue::Scalar(ScalarValue::LargeUtf8($INPUT))],
169+
vec![ColumnarValue::Scalar(ScalarValue::LargeUtf8($INPUT))],
170170
$EXPECTED,
171171
i32,
172172
Int32,
@@ -175,7 +175,7 @@ mod tests {
175175

176176
test_function!(
177177
AsciiFunc::new(),
178-
&[ColumnarValue::Scalar(ScalarValue::Utf8View($INPUT))],
178+
vec![ColumnarValue::Scalar(ScalarValue::Utf8View($INPUT))],
179179
$EXPECTED,
180180
i32,
181181
Int32,

datafusion/functions/src/string/btrim.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -152,17 +152,17 @@ mod tests {
152152
// String view cases for checking normal logic
153153
test_function!(
154154
BTrimFunc::new(),
155-
&[ColumnarValue::Scalar(ScalarValue::Utf8View(Some(
155+
vec![ColumnarValue::Scalar(ScalarValue::Utf8View(Some(
156156
String::from("alphabet ")
157-
))),],
157+
)))],
158158
Ok(Some("alphabet")),
159159
&str,
160160
Utf8View,
161161
StringViewArray
162162
);
163163
test_function!(
164164
BTrimFunc::new(),
165-
&[ColumnarValue::Scalar(ScalarValue::Utf8View(Some(
165+
vec![ColumnarValue::Scalar(ScalarValue::Utf8View(Some(
166166
String::from(" alphabet ")
167167
))),],
168168
Ok(Some("alphabet")),
@@ -172,7 +172,7 @@ mod tests {
172172
);
173173
test_function!(
174174
BTrimFunc::new(),
175-
&[
175+
vec![
176176
ColumnarValue::Scalar(ScalarValue::Utf8View(Some(String::from(
177177
"alphabet"
178178
)))),
@@ -185,7 +185,7 @@ mod tests {
185185
);
186186
test_function!(
187187
BTrimFunc::new(),
188-
&[
188+
vec![
189189
ColumnarValue::Scalar(ScalarValue::Utf8View(Some(String::from(
190190
"alphabet"
191191
)))),
@@ -200,7 +200,7 @@ mod tests {
200200
);
201201
test_function!(
202202
BTrimFunc::new(),
203-
&[
203+
vec![
204204
ColumnarValue::Scalar(ScalarValue::Utf8View(Some(String::from(
205205
"alphabet"
206206
)))),
@@ -214,7 +214,7 @@ mod tests {
214214
// Special string view case for checking non-inlined output (len > 12)
215215
test_function!(
216216
BTrimFunc::new(),
217-
&[
217+
vec![
218218
ColumnarValue::Scalar(ScalarValue::Utf8View(Some(String::from(
219219
"xxxalphabetalphabetxxx"
220220
)))),
@@ -228,7 +228,7 @@ mod tests {
228228
// String cases
229229
test_function!(
230230
BTrimFunc::new(),
231-
&[ColumnarValue::Scalar(ScalarValue::Utf8(Some(
231+
vec![ColumnarValue::Scalar(ScalarValue::Utf8(Some(
232232
String::from("alphabet ")
233233
))),],
234234
Ok(Some("alphabet")),
@@ -238,7 +238,7 @@ mod tests {
238238
);
239239
test_function!(
240240
BTrimFunc::new(),
241-
&[ColumnarValue::Scalar(ScalarValue::Utf8(Some(
241+
vec![ColumnarValue::Scalar(ScalarValue::Utf8(Some(
242242
String::from("alphabet ")
243243
))),],
244244
Ok(Some("alphabet")),
@@ -248,7 +248,7 @@ mod tests {
248248
);
249249
test_function!(
250250
BTrimFunc::new(),
251-
&[
251+
vec![
252252
ColumnarValue::Scalar(ScalarValue::Utf8(Some(String::from("alphabet")))),
253253
ColumnarValue::Scalar(ScalarValue::Utf8(Some(String::from("t")))),
254254
],
@@ -259,7 +259,7 @@ mod tests {
259259
);
260260
test_function!(
261261
BTrimFunc::new(),
262-
&[
262+
vec![
263263
ColumnarValue::Scalar(ScalarValue::Utf8(Some(String::from("alphabet")))),
264264
ColumnarValue::Scalar(ScalarValue::Utf8(Some(String::from("alphabe")))),
265265
],
@@ -270,7 +270,7 @@ mod tests {
270270
);
271271
test_function!(
272272
BTrimFunc::new(),
273-
&[
273+
vec![
274274
ColumnarValue::Scalar(ScalarValue::Utf8(Some(String::from("alphabet")))),
275275
ColumnarValue::Scalar(ScalarValue::Utf8(None)),
276276
],

0 commit comments

Comments
 (0)