Skip to content

Commit

Permalink
Issue-9767 - Extract array_dims, array_ndims and flatten functions fr…
Browse files Browse the repository at this point in the history
…om functions-array subcrate' s kernels and udf containers (apache#9786)
  • Loading branch information
erenavsarogullari authored Mar 24, 2024
1 parent 1e4ddb6 commit 916d4db
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 175 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,22 @@
// specific language governing permissions and limitations
// under the License.

//! [`ScalarUDFImpl`] definitions for array functions.
//! [`ScalarUDFImpl`] definitions for array_dims and array_ndims functions.

use arrow::datatypes::DataType;
use arrow::datatypes::Field;
use datafusion_common::exec_err;
use datafusion_common::plan_err;
use datafusion_common::Result;
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::Expr;
use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
use arrow::array::{
Array, ArrayRef, GenericListArray, ListArray, OffsetSizeTrait, UInt64Array,
};
use arrow::datatypes::{DataType, UInt64Type};
use std::any::Any;

use datafusion_common::cast::{as_large_list_array, as_list_array};
use datafusion_common::{exec_err, plan_err, Result};

use crate::utils::{compute_array_dims, make_scalar_function};
use arrow_schema::DataType::{FixedSizeList, LargeList, List, UInt64};
use arrow_schema::Field;
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::{ColumnarValue, Expr, ScalarUDFImpl, Signature, Volatility};
use std::sync::Arc;

make_udf_function!(
Expand Down Expand Up @@ -64,7 +69,6 @@ impl ScalarUDFImpl for ArrayDims {
}

fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
use DataType::*;
Ok(match arg_types[0] {
List(_) | LargeList(_) | FixedSizeList(_, _) => {
List(Arc::new(Field::new("item", UInt64, true)))
Expand All @@ -76,8 +80,7 @@ impl ScalarUDFImpl for ArrayDims {
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
let args = ColumnarValue::values_to_arrays(args)?;
crate::kernels::array_dims(&args).map(ColumnarValue::Array)
make_scalar_function(array_dims_inner)(args)
}

fn aliases(&self) -> &[String] {
Expand Down Expand Up @@ -120,7 +123,6 @@ impl ScalarUDFImpl for ArrayNdims {
}

fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
use DataType::*;
Ok(match arg_types[0] {
List(_) | LargeList(_) | FixedSizeList(_, _) => UInt64,
_ => {
Expand All @@ -130,79 +132,76 @@ impl ScalarUDFImpl for ArrayNdims {
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
let args = ColumnarValue::values_to_arrays(args)?;
crate::kernels::array_ndims(&args).map(ColumnarValue::Array)
make_scalar_function(array_ndims_inner)(args)
}

fn aliases(&self) -> &[String] {
&self.aliases
}
}

make_udf_function!(
Flatten,
flatten,
array,
"flattens an array of arrays into a single array.",
flatten_udf
);
/// Array_dims SQL function
pub fn array_dims_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
if args.len() != 1 {
return exec_err!("array_dims needs one argument");
}

#[derive(Debug)]
pub(super) struct Flatten {
signature: Signature,
aliases: Vec<String>,
}
impl Flatten {
pub fn new() -> Self {
Self {
signature: Signature::array(Volatility::Immutable),
aliases: vec![String::from("flatten")],
let data = match args[0].data_type() {
List(_) => {
let array = as_list_array(&args[0])?;
array
.iter()
.map(compute_array_dims)
.collect::<Result<Vec<_>>>()?
}
}
LargeList(_) => {
let array = as_large_list_array(&args[0])?;
array
.iter()
.map(compute_array_dims)
.collect::<Result<Vec<_>>>()?
}
array_type => {
return exec_err!("array_dims does not support type '{array_type:?}'");
}
};

let result = ListArray::from_iter_primitive::<UInt64Type, _, _>(data);

Ok(Arc::new(result) as ArrayRef)
}

impl ScalarUDFImpl for Flatten {
fn as_any(&self) -> &dyn Any {
self
}
fn name(&self) -> &str {
"flatten"
/// Array_ndims SQL function
pub fn array_ndims_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
if args.len() != 1 {
return exec_err!("array_ndims needs one argument");
}

fn signature(&self) -> &Signature {
&self.signature
}
fn general_list_ndims<O: OffsetSizeTrait>(
array: &GenericListArray<O>,
) -> Result<ArrayRef> {
let mut data = Vec::new();
let ndims = datafusion_common::utils::list_ndims(array.data_type());

fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
use DataType::*;
fn get_base_type(data_type: &DataType) -> Result<DataType> {
match data_type {
List(field) | FixedSizeList(field, _)
if matches!(field.data_type(), List(_) | FixedSizeList(_, _)) =>
{
get_base_type(field.data_type())
}
LargeList(field) if matches!(field.data_type(), LargeList(_)) => {
get_base_type(field.data_type())
}
Null | List(_) | LargeList(_) => Ok(data_type.to_owned()),
FixedSizeList(field, _) => Ok(List(field.clone())),
_ => exec_err!(
"Not reachable, data_type should be List, LargeList or FixedSizeList"
),
for arr in array.iter() {
if arr.is_some() {
data.push(Some(ndims))
} else {
data.push(None)
}
}

let data_type = get_base_type(&arg_types[0])?;
Ok(data_type)
Ok(Arc::new(UInt64Array::from(data)) as ArrayRef)
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
let args = ColumnarValue::values_to_arrays(args)?;
crate::kernels::flatten(&args).map(ColumnarValue::Array)
}

fn aliases(&self) -> &[String] {
&self.aliases
match args[0].data_type() {
List(_) => {
let array = as_list_array(&args[0])?;
general_list_ndims::<i32>(array)
}
LargeList(_) => {
let array = as_large_list_array(&args[0])?;
general_list_ndims::<i64>(array)
}
array_type => exec_err!("array_ndims does not support type {array_type:?}"),
}
}
2 changes: 1 addition & 1 deletion datafusion/functions-array/src/except.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! implementation kernel for array_except function
//! [`ScalarUDFImpl`] definitions for array_except function.

use crate::utils::check_datatypes;
use arrow::row::{RowConverter, SortField};
Expand Down
Loading

0 comments on commit 916d4db

Please sign in to comment.