
Commit 443b5db

chore: Move all array_* serde to new framework, use correct INCOMPAT config (#1349)
1 parent e964947 commit 443b5db

File tree

8 files changed: +343 -245 lines changed

common/src/main/scala/org/apache/comet/CometConf.scala

+15 -5
@@ -45,6 +45,9 @@ import org.apache.comet.shims.ShimCometConf
  */
 object CometConf extends ShimCometConf {
 
+  val COMPAT_GUIDE: String = "For more information, refer to the Comet Compatibility " +
+    "Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html)"
+
   private val TUNING_GUIDE = "For more information, refer to the Comet Tuning " +
     "Guide (https://datafusion.apache.org/comet/user-guide/tuning.html)"
 
@@ -605,20 +608,27 @@ object CometConf extends ShimCometConf {
       .booleanConf
       .createWithDefault(false)
 
+  val COMET_EXPR_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
+    conf("spark.comet.expression.allowIncompatible")
+      .doc(
+        "Comet is not currently fully compatible with Spark for all expressions. " +
+          s"Set this config to true to allow them anyway. $COMPAT_GUIDE.")
+      .booleanConf
+      .createWithDefault(false)
+
   val COMET_CAST_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
     conf("spark.comet.cast.allowIncompatible")
       .doc(
         "Comet is not currently fully compatible with Spark for all cast operations. " +
-          "Set this config to true to allow them anyway. See compatibility guide " +
-          "for more information.")
+          s"Set this config to true to allow them anyway. $COMPAT_GUIDE.")
       .booleanConf
       .createWithDefault(false)
 
   val COMET_REGEXP_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
     conf("spark.comet.regexp.allowIncompatible")
-      .doc("Comet is not currently fully compatible with Spark for all regular expressions. " +
-        "Set this config to true to allow them anyway using Rust's regular expression engine. " +
-        "See compatibility guide for more information.")
+      .doc(
+        "Comet is not currently fully compatible with Spark for all regular expressions. " +
+          s"Set this config to true to allow them anyway. $COMPAT_GUIDE.")
       .booleanConf
       .createWithDefault(false)
 
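The new spark.comet.expression.allowIncompatible entry behaves like the existing cast and regexp flags: it defaults to false and must be opted into. A minimal sketch of toggling it at runtime, assuming a SparkSession named `spark` with the Comet extension already loaded (only the config key itself comes from this commit):

    // Assumption: `spark` is a Comet-enabled SparkSession; the key is
    // defined in CometConf.scala above.
    spark.conf.set("spark.comet.expression.allowIncompatible", "true")
    // Restore the safe default: incompatible expressions fall back to Spark.
    spark.conf.set("spark.comet.expression.allowIncompatible", "false")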

docs/source/user-guide/configs.md

+3 -2
@@ -25,7 +25,7 @@ Comet provides the following configuration settings.
 |--------|-------------|---------------|
 | spark.comet.batchSize | The columnar batch size, i.e., the maximum number of rows that a batch can contain. | 8192 |
 | spark.comet.caseConversion.enabled | Java uses locale-specific rules when converting strings to upper or lower case and Rust does not, so we disable upper and lower by default. | false |
-| spark.comet.cast.allowIncompatible | Comet is not currently fully compatible with Spark for all cast operations. Set this config to true to allow them anyway. See compatibility guide for more information. | false |
+| spark.comet.cast.allowIncompatible | Comet is not currently fully compatible with Spark for all cast operations. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
 | spark.comet.columnar.shuffle.async.enabled | Whether to enable asynchronous shuffle for Arrow-based shuffle. | false |
 | spark.comet.columnar.shuffle.async.max.thread.num | Maximum number of threads on an executor used for Comet async columnar shuffle. This is the upper bound of total number of shuffle threads per executor. In other words, if the number of cores * the number of shuffle threads per task `spark.comet.columnar.shuffle.async.thread.num` is larger than this config. Comet will use this config as the number of shuffle threads per executor instead. | 100 |
 | spark.comet.columnar.shuffle.async.thread.num | Number of threads used for Comet async columnar shuffle per shuffle task. Note that more threads means more memory requirement to buffer shuffle data before flushing to disk. Also, more threads may not always improve performance, and should be set based on the number of cores available. | 3 |
@@ -64,6 +64,7 @@ Comet provides the following configuration settings.
 | spark.comet.explain.native.enabled | When this setting is enabled, Comet will provide a tree representation of the native query plan before execution and again after execution, with metrics. | false |
 | spark.comet.explain.verbose.enabled | When this setting is enabled, Comet will provide a verbose tree representation of the extended information. | false |
 | spark.comet.explainFallback.enabled | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. Set this to false to reduce the amount of logging. | false |
+| spark.comet.expression.allowIncompatible | Comet is not currently fully compatible with Spark for all expressions. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
 | spark.comet.memory.overhead.factor | Fraction of executor memory to be allocated as additional non-heap memory per executor process for Comet. | 0.2 |
 | spark.comet.memory.overhead.min | Minimum amount of additional memory to be allocated per executor process for Comet, in MiB. | 402653184b |
 | spark.comet.nativeLoadRequired | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false |
@@ -73,7 +74,7 @@ Comet provides the following configuration settings.
 | spark.comet.parquet.read.io.mergeRanges.delta | The delta in bytes between consecutive read ranges below which the parallel reader will try to merge the ranges. The default is 8MB. | 8388608 |
 | spark.comet.parquet.read.parallel.io.enabled | Whether to enable Comet's parallel reader for Parquet files. The parallel reader reads ranges of consecutive data in a file in parallel. It is faster for large files and row groups but uses more resources. | true |
 | spark.comet.parquet.read.parallel.io.thread-pool.size | The maximum number of parallel threads the parallel reader will use in a single executor. For executors configured with a smaller number of cores, use a smaller number. | 16 |
-| spark.comet.regexp.allowIncompatible | Comet is not currently fully compatible with Spark for all regular expressions. Set this config to true to allow them anyway using Rust's regular expression engine. See compatibility guide for more information. | false |
+| spark.comet.regexp.allowIncompatible | Comet is not currently fully compatible with Spark for all regular expressions. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
 | spark.comet.scan.enabled | Whether to enable native scans. When this is turned on, Spark will use Comet to read supported data sources (currently only Parquet is supported natively). Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. | true |
 | spark.comet.scan.preFetch.enabled | Whether to enable pre-fetching feature of CometScan. | false |
 | spark.comet.scan.preFetch.threadNum | The number of threads running pre-fetching for CometScan. Effective if spark.comet.scan.preFetch.enabled is enabled. Note that more pre-fetching threads means more memory requirement to store pre-fetched row groups. | 2 |

docs/source/user-guide/expressions.md

+20 -9
@@ -182,15 +182,26 @@ The following Spark expressions are currently available. Any known compatibility
 | VariancePop | |
 | VarianceSamp | |
 
-## Complex Types
-
-| Expression        | Notes       |
-| ----------------- | ----------- |
-| CreateNamedStruct |             |
-| ElementAt         | Arrays only |
-| GetArrayItem      |             |
-| GetStructField    |             |
-| StructsToJson     |             |
+## Arrays
+
+| Expression        | Notes        |
+|-------------------|--------------|
+| ArrayAppend       | Experimental |
+| ArrayContains     | Experimental |
+| ArrayIntersect    | Experimental |
+| ArrayJoin         | Experimental |
+| ArrayRemove       | Experimental |
+| ArraysOverlap     | Experimental |
+| ElementAt         | Arrays only  |
+| GetArrayItem      |              |
+
+## Structs
+
+| Expression        | Notes        |
+|-------------------|--------------|
+| CreateNamedStruct |              |
+| GetStructField    |              |
+| StructsToJson     |              |
 
 ## Other

docs/templates/compatibility-template.md

+16 -6
@@ -32,12 +32,6 @@ be used in production.
 
 There is an [epic](https://github.com/apache/datafusion-comet/issues/313) where we are tracking the work to fully implement ANSI support.
 
-## Regular Expressions
-
-Comet uses the Rust regexp crate for evaluating regular expressions, and this has different behavior from Java's
-regular expression engine. Comet will fall back to Spark for patterns that are known to produce different results, but
-this can be overridden by setting `spark.comet.regexp.allowIncompatible=true`.
-
 ## Floating number comparison
 
 Spark normalizes NaN and zero for floating point numbers for several cases. See `NormalizeFloatingNumbers` optimization rule in Spark.
@@ -46,6 +40,22 @@ because they are handled well in Spark (e.g., `SQLOrderingUtil.compareFloats`).
 functions of arrow-rs used by DataFusion do not normalize NaN and zero (e.g., [arrow::compute::kernels::cmp::eq](https://docs.rs/arrow/latest/arrow/compute/kernels/cmp/fn.eq.html#)).
 So Comet will add additional normalization expression of NaN and zero for comparison.
 
+## Incompatible Expressions
+
+Some Comet native expressions are not 100% compatible with Spark and are disabled by default. These expressions
+will fall back to Spark but can be enabled by setting `spark.comet.expression.allowIncompatible=true`.
+
+## Array Expressions
+
+Comet has experimental support for a number of array expressions. These are experimental and currently marked
+as incompatible and can be enabled by setting `spark.comet.expression.allowIncompatible=true`.
+
+## Regular Expressions
+
+Comet uses the Rust regexp crate for evaluating regular expressions, and this has different behavior from Java's
+regular expression engine. Comet will fall back to Spark for patterns that are known to produce different results, but
+this can be overridden by setting `spark.comet.regexp.allowIncompatible=true`.
+
 ## Cast
 
 Cast operations in Comet fall into three levels of support:
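The template pairs each incompatibility class with its override flag. As a hedged illustration of the regexp override (assuming a Comet-enabled SparkSession `spark`; the query itself is illustrative only):

    // Opt into Rust's regexp engine for patterns Comet would otherwise
    // fall back to Spark for. The key is documented in the template above.
    spark.conf.set("spark.comet.regexp.allowIncompatible", "true")
    spark.sql("SELECT 'a1b2' RLIKE '[0-9]'").show()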

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

+25 -73
@@ -929,6 +929,19 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
       binding: Boolean): Option[Expr] = {
     SQLConf.get
 
+    def convert(handler: CometExpressionSerde): Option[Expr] = {
+      handler match {
+        case _: IncompatExpr if !CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get() =>
+          withInfo(
+            expr,
+            s"$expr is not fully compatible with Spark. To enable it anyway, set " +
+              s"${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true. ${CometConf.COMPAT_GUIDE}.")
+          None
+        case _ =>
+          handler.convert(expr, inputs, binding)
+      }
+    }
+
     expr match {
       case a @ Alias(_, _) =>
         val r = exprToProtoInternal(a.child, inputs, binding)
@@ -2345,83 +2358,19 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
           withInfo(expr, "unsupported arguments for GetArrayStructFields", child)
           None
         }
-      case expr: ArrayRemove => CometArrayRemove.convert(expr, inputs, binding)
-      case expr if expr.prettyName == "array_contains" =>
-        createBinaryExpr(
-          expr,
-          expr.children(0),
-          expr.children(1),
-          inputs,
-          binding,
-          (builder, binaryExpr) => builder.setArrayContains(binaryExpr))
-      case _ if expr.prettyName == "array_append" =>
-        createBinaryExpr(
-          expr,
-          expr.children(0),
-          expr.children(1),
-          inputs,
-          binding,
-          (builder, binaryExpr) => builder.setArrayAppend(binaryExpr))
-      case _ if expr.prettyName == "array_intersect" =>
-        createBinaryExpr(
-          expr,
-          expr.children(0),
-          expr.children(1),
-          inputs,
-          binding,
-          (builder, binaryExpr) => builder.setArrayIntersect(binaryExpr))
-      case ArrayJoin(arrayExpr, delimiterExpr, nullReplacementExpr) =>
-        val arrayExprProto = exprToProto(arrayExpr, inputs, binding)
-        val delimiterExprProto = exprToProto(delimiterExpr, inputs, binding)
-
-        if (arrayExprProto.isDefined && delimiterExprProto.isDefined) {
-          val arrayJoinBuilder = nullReplacementExpr match {
-            case Some(nrExpr) =>
-              val nullReplacementExprProto = exprToProto(nrExpr, inputs, binding)
-              ExprOuterClass.ArrayJoin
-                .newBuilder()
-                .setArrayExpr(arrayExprProto.get)
-                .setDelimiterExpr(delimiterExprProto.get)
-                .setNullReplacementExpr(nullReplacementExprProto.get)
-            case None =>
-              ExprOuterClass.ArrayJoin
-                .newBuilder()
-                .setArrayExpr(arrayExprProto.get)
-                .setDelimiterExpr(delimiterExprProto.get)
-          }
-          Some(
-            ExprOuterClass.Expr
-              .newBuilder()
-              .setArrayJoin(arrayJoinBuilder)
-              .build())
-        } else {
-          val exprs: List[Expression] = nullReplacementExpr match {
-            case Some(nrExpr) => List(arrayExpr, delimiterExpr, nrExpr)
-            case None => List(arrayExpr, delimiterExpr)
-          }
-          withInfo(expr, "unsupported arguments for ArrayJoin", exprs: _*)
-          None
-        }
-      case ArraysOverlap(leftArrayExpr, rightArrayExpr) =>
-        if (CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.get()) {
-          createBinaryExpr(
-            expr,
-            leftArrayExpr,
-            rightArrayExpr,
-            inputs,
-            binding,
-            (builder, binaryExpr) => builder.setArraysOverlap(binaryExpr))
-        } else {
-          withInfo(
-            expr,
-            s"$expr is not supported yet. To enable all incompatible casts, set " +
-              s"${CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key}=true")
-          None
-        }
+      case _: ArrayRemove => convert(CometArrayRemove)
+      case _: ArrayContains => convert(CometArrayContains)
+      // Function introduced in 3.4.0. Refer by name to provide compatibility
+      // with older Spark builds
+      case _ if expr.prettyName == "array_append" => convert(CometArrayAppend)
+      case _: ArrayIntersect => convert(CometArrayIntersect)
+      case _: ArrayJoin => convert(CometArrayJoin)
+      case _: ArraysOverlap => convert(CometArraysOverlap)
       case _ =>
         withInfo(expr, s"${expr.prettyName} is not supported", expr.children: _*)
         None
     }
+
   }
 
 /**
@@ -3464,3 +3413,6 @@ trait CometExpressionSerde {
       inputs: Seq[Attribute],
       binding: Boolean): Option[ExprOuterClass.Expr]
 }
+
+/** Marker trait for an expression that is not guaranteed to be 100% compatible with Spark */
+trait IncompatExpr {}
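The small convert helper above is the heart of the change: any handler that mixes in the IncompatExpr marker is refused before its own convert runs, unless the new config is set. A self-contained Scala model of that gating pattern, runnable in the Scala REPL (simplified names and types; the real ones are CometExpressionSerde, IncompatExpr, and CometConf in the diff above):

    // Simplified model of the gating pattern, not Comet's actual code.
    trait ExprHandler { def convert(expr: String): Option[String] }
    trait Incompat // mirrors the IncompatExpr marker trait

    object AllowIncompat { @volatile var enabled = false } // stands in for CometConf

    def gate(handler: ExprHandler, expr: String): Option[String] = handler match {
      // Incompatible handlers are rejected up front; None means "fall back to Spark".
      case _: Incompat if !AllowIncompat.enabled => None
      case _ => handler.convert(expr)
    }

    object ArrayAppendHandler extends ExprHandler with Incompat {
      def convert(expr: String): Option[String] = Some(s"native($expr)")
    }

    // gate(ArrayAppendHandler, "array_append(a, b)") returns None until
    // AllowIncompat.enabled = true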

spark/src/main/scala/org/apache/comet/serde/arrays.scala

+102 -2
@@ -19,11 +19,11 @@
 
 package org.apache.comet.serde
 
-import org.apache.spark.sql.catalyst.expressions.{ArrayRemove, Attribute, Expression}
+import org.apache.spark.sql.catalyst.expressions.{ArrayJoin, ArrayRemove, Attribute, Expression}
 import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, DecimalType, StructType}
 
 import org.apache.comet.CometSparkSessionExtensions.withInfo
-import org.apache.comet.serde.QueryPlanSerde.createBinaryExpr
+import org.apache.comet.serde.QueryPlanSerde.{createBinaryExpr, exprToProto}
 import org.apache.comet.shims.CometExprShim
 
 object CometArrayRemove extends CometExpressionSerde with CometExprShim {
@@ -65,3 +65,103 @@ object CometArrayRemove extends CometExpressionSerde with CometExprShim {
       (builder, binaryExpr) => builder.setArrayRemove(binaryExpr))
   }
 }
+
+object CometArrayAppend extends CometExpressionSerde with IncompatExpr {
+  override def convert(
+      expr: Expression,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[ExprOuterClass.Expr] = {
+    createBinaryExpr(
+      expr,
+      expr.children(0),
+      expr.children(1),
+      inputs,
+      binding,
+      (builder, binaryExpr) => builder.setArrayAppend(binaryExpr))
+  }
+}
+
+object CometArrayContains extends CometExpressionSerde with IncompatExpr {
+  override def convert(
+      expr: Expression,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[ExprOuterClass.Expr] = {
+    createBinaryExpr(
+      expr,
+      expr.children(0),
+      expr.children(1),
+      inputs,
+      binding,
+      (builder, binaryExpr) => builder.setArrayContains(binaryExpr))
+  }
+}
+
+object CometArrayIntersect extends CometExpressionSerde with IncompatExpr {
+  override def convert(
+      expr: Expression,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[ExprOuterClass.Expr] = {
+    createBinaryExpr(
+      expr,
+      expr.children(0),
+      expr.children(1),
+      inputs,
+      binding,
+      (builder, binaryExpr) => builder.setArrayIntersect(binaryExpr))
+  }
+}
+
+object CometArraysOverlap extends CometExpressionSerde with IncompatExpr {
+  override def convert(
+      expr: Expression,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[ExprOuterClass.Expr] = {
+    createBinaryExpr(
+      expr,
+      expr.children(0),
+      expr.children(1),
+      inputs,
+      binding,
+      (builder, binaryExpr) => builder.setArraysOverlap(binaryExpr))
+  }
+}
+
+object CometArrayJoin extends CometExpressionSerde with IncompatExpr {
+  override def convert(
+      expr: Expression,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[ExprOuterClass.Expr] = {
+    val arrayExpr = expr.asInstanceOf[ArrayJoin]
+    val arrayExprProto = exprToProto(arrayExpr.array, inputs, binding)
+    val delimiterExprProto = exprToProto(arrayExpr.delimiter, inputs, binding)
+
+    if (arrayExprProto.isDefined && delimiterExprProto.isDefined) {
+      val arrayJoinBuilder = arrayExpr.nullReplacement match {
+        case Some(nrExpr) =>
+          val nullReplacementExprProto = exprToProto(nrExpr, inputs, binding)
+          ExprOuterClass.ArrayJoin
+            .newBuilder()
+            .setArrayExpr(arrayExprProto.get)
+            .setDelimiterExpr(delimiterExprProto.get)
+            .setNullReplacementExpr(nullReplacementExprProto.get)
+        case None =>
+          ExprOuterClass.ArrayJoin
+            .newBuilder()
+            .setArrayExpr(arrayExprProto.get)
+            .setDelimiterExpr(delimiterExprProto.get)
+      }
+      Some(
+        ExprOuterClass.Expr
+          .newBuilder()
+          .setArrayJoin(arrayJoinBuilder)
+          .build())
+    } else {
+      val exprs: List[Expression] = arrayExpr.nullReplacement match {
+        case Some(nrExpr) => List(arrayExpr, arrayExpr.delimiter, nrExpr)
+        case None => List(arrayExpr, arrayExpr.delimiter)
+      }
+      withInfo(expr, "unsupported arguments for ArrayJoin", exprs: _*)
+      None
+    }
+  }
+}
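With the handlers above wired into QueryPlanSerde, these array functions reach the native engine only when the flag is set. A hedged usage sketch (assumes a SparkSession `spark` running Spark 3.4+ with Comet loaded; the config key and the expressions come from this commit, the query itself is illustrative):

    // Allow the experimental, incompat-marked array expressions to run natively.
    spark.conf.set("spark.comet.expression.allowIncompatible", "true")
    spark.sql("SELECT array_append(array(1, 2), 3) AS appended, " +
      "array_contains(array(1, 2), 2) AS contained").show()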
