Skip to content

Commit a31ece9

Browse files
Chore: simplify array-related functions implementation (#1490)
## Which issue does this PR close?

Related to issue #1459.

## Rationale for this change

Defined under issue #1459.

## What changes are included in this PR?

In functions related to arrays, `scalarExprToProtoWithReturnType` or `scalarExprToProto` is used instead of creating a separate proto message for each function.

## How are these changes tested?

Regression testing with the available unit tests.
1 parent 99c47d0 commit a31ece9

File tree

5 files changed

+186
-267
lines changed

5 files changed

+186
-267
lines changed

native/core/src/execution/planner.rs

Lines changed: 1 addition & 131 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,6 @@ use datafusion::functions_aggregate::bit_and_or_xor::{bit_and_udaf, bit_or_udaf,
3838
use datafusion::functions_aggregate::min_max::max_udaf;
3939
use datafusion::functions_aggregate::min_max::min_udaf;
4040
use datafusion::functions_aggregate::sum::sum_udaf;
41-
use datafusion::functions_nested::array_has::array_has_any_udf;
42-
use datafusion::functions_nested::concat::ArrayAppend;
43-
use datafusion::functions_nested::remove::array_remove_all_udf;
44-
use datafusion::functions_nested::set_ops::array_intersect_udf;
45-
use datafusion::functions_nested::string::array_to_string_udf;
4641
use datafusion::physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
4742
use datafusion::physical_plan::windows::BoundedWindowAggExec;
4843
use datafusion::physical_plan::InputOrderMode;
@@ -83,10 +78,9 @@ use datafusion::common::{
8378
JoinType as DFJoinType, ScalarValue,
8479
};
8580
use datafusion::datasource::listing::PartitionedFile;
86-
use datafusion::functions_nested::array_has::ArrayHas;
8781
use datafusion::logical_expr::type_coercion::other::get_coerce_type_for_case_expression;
8882
use datafusion::logical_expr::{
89-
AggregateUDF, ReturnTypeArgs, ScalarUDF, WindowFrame, WindowFrameBound, WindowFrameUnits,
83+
AggregateUDF, ReturnTypeArgs, WindowFrame, WindowFrameBound, WindowFrameUnits,
9084
WindowFunctionDefinition,
9185
};
9286
use datafusion::physical_expr::expressions::{Literal, StatsType};
@@ -759,32 +753,6 @@ impl PhysicalPlanner {
759753
expr.ordinal as usize,
760754
)))
761755
}
762-
ExprStruct::ArrayAppend(expr) => {
763-
let left =
764-
self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?;
765-
let right =
766-
self.create_expr(expr.right.as_ref().unwrap(), Arc::clone(&input_schema))?;
767-
let return_type = left.data_type(&input_schema)?;
768-
let args = vec![Arc::clone(&left), right];
769-
let datafusion_array_append =
770-
Arc::new(ScalarUDF::new_from_impl(ArrayAppend::new()));
771-
let array_append_expr: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
772-
"array_append",
773-
datafusion_array_append,
774-
args,
775-
return_type,
776-
));
777-
778-
let is_null_expr: Arc<dyn PhysicalExpr> = Arc::new(IsNullExpr::new(left));
779-
let null_literal_expr: Arc<dyn PhysicalExpr> =
780-
Arc::new(Literal::new(ScalarValue::Null));
781-
782-
create_case_expr(
783-
vec![(is_null_expr, null_literal_expr)],
784-
Some(array_append_expr),
785-
&input_schema,
786-
)
787-
}
788756
ExprStruct::ArrayInsert(expr) => {
789757
let src_array_expr = self.create_expr(
790758
expr.src_array_expr.as_ref().unwrap(),
@@ -801,104 +769,6 @@ impl PhysicalPlanner {
801769
expr.legacy_negative_index,
802770
)))
803771
}
804-
ExprStruct::ArrayContains(expr) => {
805-
let src_array_expr =
806-
self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?;
807-
let key_expr =
808-
self.create_expr(expr.right.as_ref().unwrap(), Arc::clone(&input_schema))?;
809-
let args = vec![Arc::clone(&src_array_expr), key_expr];
810-
let array_has_expr = Arc::new(ScalarFunctionExpr::new(
811-
"array_has",
812-
Arc::new(ScalarUDF::new_from_impl(ArrayHas::new())),
813-
args,
814-
DataType::Boolean,
815-
));
816-
Ok(array_has_expr)
817-
}
818-
ExprStruct::ArrayRemove(expr) => {
819-
let src_array_expr =
820-
self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?;
821-
let key_expr =
822-
self.create_expr(expr.right.as_ref().unwrap(), Arc::clone(&input_schema))?;
823-
let args = vec![Arc::clone(&src_array_expr), Arc::clone(&key_expr)];
824-
let return_type = src_array_expr.data_type(&input_schema)?;
825-
826-
let datafusion_array_remove = array_remove_all_udf();
827-
828-
let array_remove_expr: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
829-
"array_remove",
830-
datafusion_array_remove,
831-
args,
832-
return_type,
833-
));
834-
let is_null_expr: Arc<dyn PhysicalExpr> = Arc::new(IsNullExpr::new(key_expr));
835-
836-
let null_literal_expr: Arc<dyn PhysicalExpr> =
837-
Arc::new(Literal::new(ScalarValue::Null));
838-
839-
create_case_expr(
840-
vec![(is_null_expr, null_literal_expr)],
841-
Some(array_remove_expr),
842-
&input_schema,
843-
)
844-
}
845-
ExprStruct::ArrayIntersect(expr) => {
846-
let left_expr =
847-
self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?;
848-
let right_expr =
849-
self.create_expr(expr.right.as_ref().unwrap(), Arc::clone(&input_schema))?;
850-
let args = vec![Arc::clone(&left_expr), right_expr];
851-
let datafusion_array_intersect = array_intersect_udf();
852-
let return_type = left_expr.data_type(&input_schema)?;
853-
let array_intersect_expr = Arc::new(ScalarFunctionExpr::new(
854-
"array_intersect",
855-
datafusion_array_intersect,
856-
args,
857-
return_type,
858-
));
859-
Ok(array_intersect_expr)
860-
}
861-
ExprStruct::ArrayJoin(expr) => {
862-
let array_expr =
863-
self.create_expr(expr.array_expr.as_ref().unwrap(), Arc::clone(&input_schema))?;
864-
let delimiter_expr = self.create_expr(
865-
expr.delimiter_expr.as_ref().unwrap(),
866-
Arc::clone(&input_schema),
867-
)?;
868-
869-
let mut args = vec![Arc::clone(&array_expr), delimiter_expr];
870-
if expr.null_replacement_expr.is_some() {
871-
let null_replacement_expr = self.create_expr(
872-
expr.null_replacement_expr.as_ref().unwrap(),
873-
Arc::clone(&input_schema),
874-
)?;
875-
args.push(null_replacement_expr)
876-
}
877-
878-
let datafusion_array_to_string = array_to_string_udf();
879-
let array_join_expr = Arc::new(ScalarFunctionExpr::new(
880-
"array_join",
881-
datafusion_array_to_string,
882-
args,
883-
DataType::Utf8,
884-
));
885-
Ok(array_join_expr)
886-
}
887-
ExprStruct::ArraysOverlap(expr) => {
888-
let left_array_expr =
889-
self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?;
890-
let right_array_expr =
891-
self.create_expr(expr.right.as_ref().unwrap(), Arc::clone(&input_schema))?;
892-
let args = vec![Arc::clone(&left_array_expr), right_array_expr];
893-
let datafusion_array_has_any = array_has_any_udf();
894-
let array_has_any_expr = Arc::new(ScalarFunctionExpr::new(
895-
"array_has_any",
896-
datafusion_array_has_any,
897-
args,
898-
DataType::Boolean,
899-
));
900-
Ok(array_has_any_expr)
901-
}
902772
expr => Err(ExecutionError::GeneralError(format!(
903773
"Not implemented: {:?}",
904774
expr

native/proto/src/proto/expr.proto

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -82,14 +82,8 @@ message Expr {
8282
ToJson to_json = 55;
8383
ListExtract list_extract = 56;
8484
GetArrayStructFields get_array_struct_fields = 57;
85-
BinaryExpr array_append = 58;
86-
ArrayInsert array_insert = 59;
87-
BinaryExpr array_contains = 60;
88-
BinaryExpr array_remove = 61;
89-
BinaryExpr array_intersect = 62;
90-
ArrayJoin array_join = 63;
91-
BinaryExpr arrays_overlap = 64;
92-
MathExpr integral_divide = 65;
85+
ArrayInsert array_insert = 58;
86+
MathExpr integral_divide = 59;
9387
}
9488
}
9589

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 2 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1925,34 +1925,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
19251925
None
19261926
}
19271927

1928-
case expr if expr.prettyName == "array_insert" =>
1929-
val srcExprProto = exprToProtoInternal(expr.children(0), inputs, binding)
1930-
val posExprProto = exprToProtoInternal(expr.children(1), inputs, binding)
1931-
val itemExprProto = exprToProtoInternal(expr.children(2), inputs, binding)
1932-
val legacyNegativeIndex =
1933-
SQLConf.get.getConfString("spark.sql.legacy.negativeIndexInArrayInsert").toBoolean
1934-
if (srcExprProto.isDefined && posExprProto.isDefined && itemExprProto.isDefined) {
1935-
val arrayInsertBuilder = ExprOuterClass.ArrayInsert
1936-
.newBuilder()
1937-
.setSrcArrayExpr(srcExprProto.get)
1938-
.setPosExpr(posExprProto.get)
1939-
.setItemExpr(itemExprProto.get)
1940-
.setLegacyNegativeIndex(legacyNegativeIndex)
1941-
1942-
Some(
1943-
ExprOuterClass.Expr
1944-
.newBuilder()
1945-
.setArrayInsert(arrayInsertBuilder)
1946-
.build())
1947-
} else {
1948-
withInfo(
1949-
expr,
1950-
"unsupported arguments for ArrayInsert",
1951-
expr.children(0),
1952-
expr.children(1),
1953-
expr.children(2))
1954-
None
1955-
}
1928+
case expr if expr.prettyName == "array_insert" => convert(CometArrayInsert)
19561929

19571930
case ElementAt(child, ordinal, defaultValue, failOnError)
19581931
if child.dataType.isInstanceOf[ArrayType] =>
@@ -2899,7 +2872,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
28992872
}
29002873

29012874
// Utility method. Adds explain info if the result of calling exprToProto is None
2902-
private def optExprWithInfo(
2875+
def optExprWithInfo(
29032876
optExpr: Option[Expr],
29042877
expr: Expression,
29052878
childExpr: Expression*): Option[Expr] = {

0 commit comments

Comments
 (0)