Skip to content

Commit cc325e6

Browse files
committed
Add datafusion-functions-array crate
1 parent 4257c16 commit cc325e6

File tree

26 files changed

+654
-306
lines changed

26 files changed

+654
-306
lines changed

.github/workflows/rust.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,9 @@ jobs:
8282
- name: Check function packages (encoding_expressions)
8383
run: cargo check --no-default-features --features=encoding_expressions -p datafusion
8484

85+
- name: Check function packages (array_expressions)
86+
run: cargo check --no-default-features --features=array_expressions -p datafusion
87+
8588
- name: Check Cargo.lock for datafusion-cli
8689
run: |
8790
# If this test fails, try running `cargo update` in the `datafusion-cli` directory

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
[workspace]
1919
exclude = ["datafusion-cli"]
20-
members = ["datafusion/common", "datafusion/core", "datafusion/expr", "datafusion/execution", "datafusion/functions", "datafusion/optimizer", "datafusion/physical-expr", "datafusion/physical-plan", "datafusion/proto", "datafusion/proto/gen", "datafusion/sql", "datafusion/sqllogictest", "datafusion/substrait", "datafusion/wasmtest", "datafusion-examples", "docs", "test-utils", "benchmarks",
20+
members = ["datafusion/common", "datafusion/core", "datafusion/expr", "datafusion/execution", "datafusion/functions", "datafusion/functions-array", "datafusion/optimizer", "datafusion/physical-expr", "datafusion/physical-plan", "datafusion/proto", "datafusion/proto/gen", "datafusion/sql", "datafusion/sqllogictest", "datafusion/substrait", "datafusion/wasmtest", "datafusion-examples", "docs", "test-utils", "benchmarks",
2121
]
2222
resolver = "2"
2323

@@ -50,6 +50,7 @@ datafusion-common = { path = "datafusion/common", version = "35.0.0" }
5050
datafusion-execution = { path = "datafusion/execution", version = "35.0.0" }
5151
datafusion-expr = { path = "datafusion/expr", version = "35.0.0" }
5252
datafusion-functions = { path = "datafusion/functions", version = "35.0.0" }
53+
datafusion-functions-array = { path = "datafusion/functions-array", version = "35.0.0" }
5354
datafusion-optimizer = { path = "datafusion/optimizer", version = "35.0.0" }
5455
datafusion-physical-expr = { path = "datafusion/physical-expr", version = "35.0.0" }
5556
datafusion-physical-plan = { path = "datafusion/physical-plan", version = "35.0.0" }

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ This crate has several [features] which can be specified in your `Cargo.toml`.
7575

7676
Default features:
7777

78+
- `array_expressions`: functions for working with arrays such as `array_to_string`
7879
- `compression`: reading files compressed with `xz2`, `bzip2`, `flate2`, and `zstd`
7980
- `crypto_expressions`: cryptographic functions such as `md5` and `sha256`
8081
- `encoding_expressions`: `encode` and `decode` functions

datafusion-examples/examples/simple_udf.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ use datafusion::error::Result;
2828
use datafusion::prelude::*;
2929
use datafusion_common::cast::as_float64_array;
3030
use datafusion_expr::ColumnarValue;
31-
use datafusion_physical_expr::functions::columnar_values_to_array;
3231
use std::sync::Arc;
3332

3433
/// create local execution context with an in-memory table:
@@ -71,7 +70,7 @@ async fn main() -> Result<()> {
7170
// this is guaranteed by DataFusion based on the function's signature.
7271
assert_eq!(args.len(), 2);
7372

74-
let args = columnar_values_to_array(args)?;
73+
let args = ColumnarValue::values_to_arrays(args)?;
7574

7675
// 1. cast both arguments to f64. These casts MUST be aligned with the signature or this function panics!
7776
let base = as_float64_array(&args[0]).expect("cast failed");

datafusion/core/Cargo.toml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,12 @@ path = "src/lib.rs"
3535

3636
[features]
3737
# Used to enable the avro format
38+
array_expressions = ["datafusion-functions-array"]
3839
avro = ["apache-avro", "num-traits", "datafusion-common/avro"]
3940
backtrace = ["datafusion-common/backtrace"]
4041
compression = ["xz2", "bzip2", "flate2", "zstd", "async-compression", "tokio-util"]
4142
crypto_expressions = ["datafusion-physical-expr/crypto_expressions", "datafusion-optimizer/crypto_expressions"]
42-
default = ["crypto_expressions", "encoding_expressions", "regex_expressions", "unicode_expressions", "compression", "parquet"]
43+
default = ["array_expressions", "crypto_expressions", "encoding_expressions", "regex_expressions", "unicode_expressions", "compression", "parquet"]
4344
encoding_expressions = ["datafusion-functions/encoding_expressions"]
4445
# Used for testing ONLY: causes all values to hash to the same value (test for collisions)
4546
force_hash_collisions = []
@@ -65,7 +66,8 @@ dashmap = { workspace = true }
6566
datafusion-common = { path = "../common", version = "35.0.0", features = ["object_store"], default-features = false }
6667
datafusion-execution = { workspace = true }
6768
datafusion-expr = { workspace = true }
68-
datafusion-functions = { path = "../functions", version = "35.0.0" }
69+
datafusion-functions = {workspace = true }
70+
datafusion-functions-array = { workspace=true, optional = true }
6971
datafusion-optimizer = { path = "../optimizer", version = "35.0.0", default-features = false }
7072
datafusion-physical-expr = { path = "../physical-expr", version = "35.0.0", default-features = false }
7173
datafusion-physical-plan = { workspace = true }

datafusion/core/src/execution/context/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1348,6 +1348,11 @@ impl SessionState {
13481348
datafusion_functions::register_all(&mut new_self)
13491349
.expect("can not register built in functions");
13501350

1351+
// register crate of array expressions (if enabled)
1352+
#[cfg(feature = "array_expressions")]
1353+
datafusion_functions_array::register_all(&mut new_self)
1354+
.expect("can not register array expressions");
1355+
13511356
new_self
13521357
}
13531358
/// Returns new [`SessionState`] using the provided

datafusion/core/src/lib.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -521,6 +521,12 @@ pub mod functions {
521521
pub use datafusion_functions::*;
522522
}
523523

524+
/// re-export of [`datafusion_functions_array`] crate, if "array_expressions" feature is enabled
525+
pub mod functions_array {
526+
#[cfg(feature = "array_expressions")]
527+
pub use datafusion_functions::*;
528+
}
529+
524530
#[cfg(test)]
525531
pub mod test;
526532
pub mod test_util;

datafusion/core/src/prelude.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ pub use datafusion_expr::{
3939
Expr,
4040
};
4141
pub use datafusion_functions::expr_fn::*;
42+
#[cfg(feature = "array_expressions")]
43+
pub use datafusion_functions_array::expr_fn::*;
4244

4345
pub use std::ops::Not;
4446
pub use std::ops::{Add, Div, Mul, Neg, Rem, Sub};

datafusion/core/tests/dataframe/dataframe_functions.rs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ use arrow::{
2020
array::{Int32Array, StringArray},
2121
record_batch::RecordBatch,
2222
};
23+
use arrow_array::types::Int32Type;
24+
use arrow_array::ListArray;
2325
use arrow_schema::SchemaRef;
2426
use std::sync::Arc;
2527

@@ -40,6 +42,7 @@ fn test_schema() -> SchemaRef {
4042
Arc::new(Schema::new(vec![
4143
Field::new("a", DataType::Utf8, false),
4244
Field::new("b", DataType::Int32, false),
45+
Field::new("l", DataType::new_list(DataType::Int32, true), true),
4346
]))
4447
}
4548

@@ -57,6 +60,12 @@ async fn create_test_table() -> Result<DataFrame> {
5760
"123AbcDef",
5861
])),
5962
Arc::new(Int32Array::from(vec![1, 10, 10, 100])),
63+
Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
64+
Some(vec![Some(0), Some(1), Some(2)]),
65+
None,
66+
Some(vec![Some(3), None, Some(5)]),
67+
Some(vec![Some(6), Some(7)]),
68+
])),
6069
],
6170
)?;
6271

@@ -67,7 +76,7 @@ async fn create_test_table() -> Result<DataFrame> {
6776
ctx.table("test").await
6877
}
6978

70-
/// Excutes an expression on the test dataframe as a select.
79+
/// Executes an expression on the test dataframe as a select.
7180
/// Compares formatted output of a record batch with an expected
7281
/// vector of strings, using the assert_batch_eq! macro
7382
macro_rules! assert_fn_batches {
@@ -841,3 +850,22 @@ async fn test_fn_decode() -> Result<()> {
841850

842851
Ok(())
843852
}
853+
854+
#[tokio::test]
855+
async fn test_fn_array_to_string() -> Result<()> {
856+
let expr = array_to_string(col("l"), lit("***"));
857+
858+
let expected = [
859+
"+-------------------------------------+",
860+
"| array_to_string(test.l,Utf8(\"***\")) |",
861+
"+-------------------------------------+",
862+
"| 0***1***2 |",
863+
"| |",
864+
"| 3***5 |",
865+
"| 6***7 |",
866+
"+-------------------------------------+",
867+
];
868+
assert_fn_batches!(expr, expected);
869+
870+
Ok(())
871+
}

datafusion/expr/src/built_in_function.rs

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -175,8 +175,6 @@ pub enum BuiltinScalarFunction {
175175
ArrayReverse,
176176
/// array_slice
177177
ArraySlice,
178-
/// array_to_string
179-
ArrayToString,
180178
/// array_intersect
181179
ArrayIntersect,
182180
/// array_union
@@ -432,7 +430,6 @@ impl BuiltinScalarFunction {
432430
BuiltinScalarFunction::ArrayReverse => Volatility::Immutable,
433431
BuiltinScalarFunction::Flatten => Volatility::Immutable,
434432
BuiltinScalarFunction::ArraySlice => Volatility::Immutable,
435-
BuiltinScalarFunction::ArrayToString => Volatility::Immutable,
436433
BuiltinScalarFunction::ArrayIntersect => Volatility::Immutable,
437434
BuiltinScalarFunction::ArrayUnion => Volatility::Immutable,
438435
BuiltinScalarFunction::ArrayResize => Volatility::Immutable,
@@ -628,7 +625,6 @@ impl BuiltinScalarFunction {
628625
BuiltinScalarFunction::ArrayReverse => Ok(input_expr_types[0].clone()),
629626
BuiltinScalarFunction::ArraySlice => Ok(input_expr_types[0].clone()),
630627
BuiltinScalarFunction::ArrayResize => Ok(input_expr_types[0].clone()),
631-
BuiltinScalarFunction::ArrayToString => Ok(Utf8),
632628
BuiltinScalarFunction::ArrayIntersect => {
633629
match (input_expr_types[0].clone(), input_expr_types[1].clone()) {
634630
(DataType::Null, DataType::Null) | (DataType::Null, _) => {
@@ -970,9 +966,6 @@ impl BuiltinScalarFunction {
970966
Signature::variadic_any(self.volatility())
971967
}
972968

973-
BuiltinScalarFunction::ArrayToString => {
974-
Signature::variadic_any(self.volatility())
975-
}
976969
BuiltinScalarFunction::ArrayIntersect => Signature::any(2, self.volatility()),
977970
BuiltinScalarFunction::ArrayUnion => Signature::any(2, self.volatility()),
978971
BuiltinScalarFunction::Cardinality => Signature::any(1, self.volatility()),
@@ -1574,12 +1567,6 @@ impl BuiltinScalarFunction {
15741567
}
15751568
BuiltinScalarFunction::ArrayReverse => &["array_reverse", "list_reverse"],
15761569
BuiltinScalarFunction::ArraySlice => &["array_slice", "list_slice"],
1577-
BuiltinScalarFunction::ArrayToString => &[
1578-
"array_to_string",
1579-
"list_to_string",
1580-
"array_join",
1581-
"list_join",
1582-
],
15831570
BuiltinScalarFunction::ArrayUnion => &["array_union", "list_union"],
15841571
BuiltinScalarFunction::Cardinality => &["cardinality"],
15851572
BuiltinScalarFunction::ArrayResize => &["array_resize", "list_resize"],

datafusion/expr/src/columnar_value.rs

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
use arrow::array::ArrayRef;
2121
use arrow::array::NullArray;
2222
use arrow::datatypes::DataType;
23-
use datafusion_common::{Result, ScalarValue};
23+
use datafusion_common::{internal_err, DataFusionError, Result, ScalarValue};
2424
use std::sync::Arc;
2525

2626
/// Represents the result of evaluating an expression: either a single
@@ -75,4 +75,51 @@ impl ColumnarValue {
7575
pub fn create_null_array(num_rows: usize) -> Self {
7676
ColumnarValue::Array(Arc::new(NullArray::new(num_rows)))
7777
}
78+
79+
/// A helper function used to infer the length of arguments of Scalar
80+
/// functions and convert [`ColumnarValue`]s to [`ArrayRef`]s with the
81+
/// inferred length.
82+
///
83+
/// # Note on Performance
84+
///
85+
/// This function expands any `ScalalarValues` to arrays of the same length.
86+
/// This allows a single function implementaiton in terms of arrays, but it
87+
/// can be inefficient. It is recommended to provide specialized
88+
/// implementations for scalar values if performance is a concern.
89+
///
90+
/// # Errors
91+
///
92+
/// If there are multiple array arguments that have different lengths
93+
pub fn values_to_arrays(args: &[ColumnarValue]) -> Result<Vec<ArrayRef>> {
94+
if args.is_empty() {
95+
return Ok(vec![]);
96+
}
97+
98+
let mut array_len = None;
99+
for arg in args {
100+
array_len = match (arg, array_len) {
101+
(ColumnarValue::Array(a), None) => Some(a.len()),
102+
(ColumnarValue::Array(a), Some(array_len)) => {
103+
if array_len == a.len() {
104+
Some(array_len)
105+
} else {
106+
return internal_err!(
107+
"Arguments has mixed length. Expected length: {array_len}, found length: {}",a.len()
108+
);
109+
}
110+
}
111+
(ColumnarValue::Scalar(_), array_len) => array_len,
112+
}
113+
}
114+
115+
// If array_len is none, it means there are only scalars, so make a 1 element array
116+
let inferred_length = array_len.unwrap_or(1);
117+
118+
let args = args
119+
.iter()
120+
.map(|arg| arg.clone().into_array(inferred_length))
121+
.collect::<Result<Vec<_>>>()?;
122+
123+
Ok(args)
124+
}
78125
}

datafusion/expr/src/expr_fn.rs

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -740,12 +740,6 @@ scalar_expr!(
740740
array begin end stride,
741741
"returns a slice of the array."
742742
);
743-
scalar_expr!(
744-
ArrayToString,
745-
array_to_string,
746-
array delimiter,
747-
"converts each element to its text representation."
748-
);
749743
scalar_expr!(ArrayUnion, array_union, array1 array2, "returns an array of the elements in the union of array1 and array2 without duplicates.");
750744

751745
scalar_expr!(
@@ -1447,7 +1441,6 @@ mod test {
14471441
test_scalar_expr!(ArrayReplace, array_replace, array, from, to);
14481442
test_scalar_expr!(ArrayReplaceN, array_replace_n, array, from, to, max);
14491443
test_scalar_expr!(ArrayReplaceAll, array_replace_all, array, from, to);
1450-
test_scalar_expr!(ArrayToString, array_to_string, array, delimiter);
14511444
test_unary_scalar_expr!(Cardinality, cardinality);
14521445
test_nary_scalar_expr!(MakeArray, array, input);
14531446

datafusion/functions-array/Cargo.toml

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
[package]
19+
name = "datafusion-functions-array"
20+
description = "Array Function packages for the DataFusion query engine"
21+
keywords = ["datafusion", "logical", "plan", "expressions"]
22+
readme = "README.md"
23+
version = { workspace = true }
24+
edition = { workspace = true }
25+
homepage = { workspace = true }
26+
repository = { workspace = true }
27+
license = { workspace = true }
28+
authors = { workspace = true }
29+
rust-version = { workspace = true }
30+
31+
[features]
32+
33+
[lib]
34+
name = "datafusion_functions_array"
35+
path = "src/lib.rs"
36+
37+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
38+
39+
[dependencies]
40+
arrow = { workspace = true }
41+
datafusion-common = { workspace = true }
42+
datafusion-execution = { workspace = true }
43+
datafusion-expr = { workspace = true }
44+
log = "0.4.20"
45+
paste = "1.0.14"

datafusion/functions-array/README.md

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
<!---
2+
Licensed to the Apache Software Foundation (ASF) under one
3+
or more contributor license agreements. See the NOTICE file
4+
distributed with this work for additional information
5+
regarding copyright ownership. The ASF licenses this file
6+
to you under the Apache License, Version 2.0 (the
7+
"License"); you may not use this file except in compliance
8+
with the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing,
13+
software distributed under the License is distributed on an
14+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
KIND, either express or implied. See the License for the
16+
specific language governing permissions and limitations
17+
under the License.
18+
-->
19+
20+
# DataFusion Function Library
21+
22+
[DataFusion][df] is an extensible query execution framework, written in Rust, that uses Apache Arrow as its in-memory format.
23+
24+
This crate contains functions for working with arrays, such as `array_append` that work with
25+
`ListArray`, `LargeListArray` and `FixedListArray` types from the `arrow` crate.
26+
27+
[df]: https://crates.io/crates/datafusion

0 commit comments

Comments
 (0)