Commit 7d46ff2

chore: Add scanImpl attribute to CometScanExec (#1746)
1 parent 093a244 commit 7d46ff2

21 files changed: +125 -130 lines
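
The change in one sentence: the scan implementation ("native_comet", "native_datafusion", or "native_iceberg_compat") is now carried as a scanImpl attribute on each scan node at planning time, instead of being re-read from CometConf.COMET_NATIVE_SCAN_IMPL wherever it is needed. A minimal, self-contained sketch of that idea (illustrative names only, not the real Comet classes):

    // Illustrative sketch only, not the real Comet classes: the scan implementation
    // becomes a per-node attribute that downstream rules can match on directly.
    object ScanImpl {
      val NativeComet = "native_comet"
      val NativeDataFusion = "native_datafusion"
      val NativeIcebergCompat = "native_iceberg_compat"
    }

    final case class ScanNode(scanImpl: String)

    // Dispatch on the node itself rather than on a global config value.
    def usesDataFusionRuntime(scan: ScanNode): Boolean =
      scan.scanImpl == ScanImpl.NativeDataFusion ||
        scan.scanImpl == ScanImpl.NativeIcebergCompat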

.github/workflows/miri.yml

Lines changed: 1 addition & 1 deletion
@@ -52,4 +52,4 @@ jobs:
       - name: Test with Miri
         run: |
           cd native
-          MIRIFLAGS="-Zmiri-disable-isolation" cargo miri test
+          MIRIFLAGS="-Zmiri-disable-isolation" cargo miri test --lib --bins --tests --examples

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 0 additions & 5 deletions
@@ -104,11 +104,6 @@ object CometConf extends ShimCometConf {
         .getOrElse("COMET_PARQUET_SCAN_IMPL", SCAN_NATIVE_COMET)
         .toLowerCase(Locale.ROOT))

-  def isExperimentalNativeScan: Boolean = COMET_NATIVE_SCAN_IMPL.get() match {
-    case SCAN_NATIVE_DATAFUSION | SCAN_NATIVE_ICEBERG_COMPAT => true
-    case SCAN_NATIVE_COMET => false
-  }
-
   val COMET_PARQUET_PARALLEL_IO_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.parquet.read.parallel.io.enabled")
       .doc(
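
The deleted isExperimentalNativeScan helper answered "is an experimental scan selected?" from the global COMET_NATIVE_SCAN_IMPL setting. With the new per-node attribute, the equivalent check can be expressed against the scan itself; the helper below is a hypothetical sketch of that migration, not code from this commit:

    // Hypothetical replacement for the deleted helper, evaluated against a
    // CometScanExec's new scanImpl field instead of the global config.
    def isExperimentalScanImpl(scanImpl: String): Boolean = scanImpl match {
      case CometConf.SCAN_NATIVE_DATAFUSION | CometConf.SCAN_NATIVE_ICEBERG_COMPAT => true
      case _ => false
    }
    // e.g. isExperimentalScanImpl(scan.scanImpl) for a given CometScanExec `scan`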

dev/diffs/3.4.3.diff

Lines changed: 1 addition & 1 deletion
@@ -961,7 +961,7 @@ index 75eabcb96f2..36e3318ad7e 100644
          _.asInstanceOf[FileScanRDD].filePartitions.forall(
            _.files.forall(_.urlEncodedPath.contains("p=0"))))
 +    case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter(
-+      fs @ CometScanExec(_, _, _, partitionFilters, _, _, _, _, _, _)))) =>
++      fs @ CometScanExec(_, _, _, _, partitionFilters, _, _, _, _, _, _)))) =>
 +      partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
 +      fs.inputRDDs().forall(
 +        _.asInstanceOf[FileScanRDD].filePartitions.forall(

dev/diffs/3.5.4.diff

Lines changed: 1 addition & 1 deletion
@@ -1092,7 +1092,7 @@ index 260c992f1ae..b9d8e22337c 100644
          _.asInstanceOf[FileScanRDD].filePartitions.forall(
            _.files.forall(_.urlEncodedPath.contains("p=0"))))
 +    case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter(
-+      fs @ CometScanExec(_, _, _, partitionFilters, _, _, _, _, _, _)))) =>
++      fs @ CometScanExec(_, _, _, _, partitionFilters, _, _, _, _, _, _)))) =>
 +      partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
 +      fs.inputRDDs().forall(
 +        _.asInstanceOf[FileScanRDD].filePartitions.forall(

dev/diffs/3.5.5.diff

Lines changed: 1 addition & 1 deletion
@@ -963,7 +963,7 @@ index 04702201f82..6cc2b01b7f3 100644
          _.asInstanceOf[FileScanRDD].filePartitions.forall(
            _.files.forall(_.urlEncodedPath.contains("p=0"))))
 +    case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter(
-+      fs @ CometScanExec(_, _, _, partitionFilters, _, _, _, _, _, _)))) =>
++      fs @ CometScanExec(_, _, _, _, partitionFilters, _, _, _, _, _, _)))) =>
 +      partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
 +      fs.inputRDDs().forall(
 +        _.asInstanceOf[FileScanRDD].filePartitions.forall(

dev/diffs/4.0.0-preview1.diff

Lines changed: 1 addition & 1 deletion
@@ -1035,7 +1035,7 @@ index 68f14f13bbd..174636cefb5 100644
          _.asInstanceOf[FileScanRDD].filePartitions.forall(
            _.files.forall(_.urlEncodedPath.contains("p=0"))))
 +    case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter(
-+      fs @ CometScanExec(_, _, _, partitionFilters, _, _, _, _, _, _)))) =>
++      fs @ CometScanExec(_, _, _, _, partitionFilters, _, _, _, _, _, _)))) =>
 +      partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
 +      fs.inputRDDs().forall(
 +        _.asInstanceOf[FileScanRDD].filePartitions.forall(

spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala

Lines changed: 6 additions & 3 deletions
@@ -56,7 +56,10 @@ import org.apache.comet.vector.CometVector
  * in [[org.apache.comet.CometSparkSessionExtensions]]
  * - `buildReaderWithPartitionValues`, so Spark calls Comet's Parquet reader to read values.
  */
-class CometParquetFileFormat extends ParquetFileFormat with MetricsSupport with ShimSQLConf {
+class CometParquetFileFormat(scanImpl: String)
+    extends ParquetFileFormat
+    with MetricsSupport
+    with ShimSQLConf {
   override def shortName(): String = "parquet"
   override def toString: String = "CometParquet"
   override def hashCode(): Int = getClass.hashCode()

@@ -100,8 +103,8 @@ class CometParquetFileFormat extends ParquetFileFormat with MetricsSupport with

     // Comet specific configurations
     val capacity = CometConf.COMET_BATCH_SIZE.get(sqlConf)
-    val nativeIcebergCompat =
-      CometConf.COMET_NATIVE_SCAN_IMPL.get(sqlConf).equals(CometConf.SCAN_NATIVE_ICEBERG_COMPAT)
+
+    val nativeIcebergCompat = scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT

     (file: PartitionedFile) => {
       val sharedConf = broadcastedHadoopConf.value.value
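
CometParquetFileFormat now receives the scan implementation through its constructor, so nativeIcebergCompat becomes a plain comparison against that argument rather than a lookup of COMET_NATIVE_SCAN_IMPL in the session config. A short, hypothetical usage sketch (the real call site appears further down in this commit, where CometNativeScanExec passes CometConf.SCAN_NATIVE_DATAFUSION):

    // Hypothetical instantiation: the caller fixes the scan implementation once
    // when the format is created.
    val format = new CometParquetFileFormat(CometConf.SCAN_NATIVE_ICEBERG_COMPAT)
    // Inside buildReaderWithPartitionValues the flag then reduces to:
    //   val nativeIcebergCompat = scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT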

spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala

Lines changed: 1 addition & 11 deletions
@@ -46,6 +46,7 @@ import org.apache.comet.{CometConf, CometRuntimeException}
 import org.apache.comet.shims.ShimSQLConf

 case class CometParquetPartitionReaderFactory(
+    usingDataFusionReader: Boolean,
     @transient sqlConf: SQLConf,
     broadcastedConf: Broadcast[SerializableConfiguration],
     readDataSchema: StructType,

@@ -71,17 +72,6 @@ case class CometParquetPartitionReaderFactory(
   // Comet specific configurations
   private val batchSize = CometConf.COMET_BATCH_SIZE.get(sqlConf)

-  @transient private lazy val usingDataFusionReader: Boolean = {
-    val conf = broadcastedConf.value.value
-    conf.getBoolean(
-      CometConf.COMET_NATIVE_SCAN_ENABLED.key,
-      CometConf.COMET_NATIVE_SCAN_ENABLED.defaultValue.get) &&
-    conf
-      .get(
-        CometConf.COMET_NATIVE_SCAN_IMPL.key,
-        CometConf.COMET_NATIVE_SCAN_IMPL.defaultValueString)
-      .equalsIgnoreCase(CometConf.SCAN_NATIVE_ICEBERG_COMPAT)
-  }
   // This is only called at executor on a Broadcast variable, so we don't want it to be
   // materialized at driver.
   @transient private lazy val preFetchEnabled = {
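
The factory previously re-derived usingDataFusionReader on the executor from the broadcast Hadoop configuration; it is now an explicit first constructor argument supplied by the caller. A hedged sketch of how a driver-side caller could compute it (sqlConf and the remaining constructor arguments are assumed to be in scope; this is not code from the commit):

    // Hypothetical driver-side computation of the flag that used to live in the
    // deleted lazy val; it is then passed as the first argument to the factory.
    val usingDataFusionReader: Boolean =
      CometConf.COMET_NATIVE_SCAN_ENABLED.get(sqlConf) &&
        CometConf.COMET_NATIVE_SCAN_IMPL
          .get(sqlConf)
          .equalsIgnoreCase(CometConf.SCAN_NATIVE_ICEBERG_COMPAT)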

spark/src/main/scala/org/apache/comet/parquet/CometParquetScan.scala

Lines changed: 1 addition & 0 deletions
@@ -58,6 +58,7 @@ trait CometParquetScan extends FileScan with MetricsSupport {
     val broadcastedConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
     CometParquetPartitionReaderFactory(
+      usingDataFusionReader = false, // this value is not used since this is v2 scan
       sqlConf,
       broadcastedConf,
       readDataSchema,

spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala

Lines changed: 2 additions & 3 deletions
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.types.{DoubleType, FloatType}

 import org.apache.comet.{CometConf, ExtendedExplainInfo}
-import org.apache.comet.CometConf.{COMET_ANSI_MODE_ENABLED, COMET_NATIVE_SCAN_IMPL, COMET_SHUFFLE_FALLBACK_TO_COLUMNAR}
+import org.apache.comet.CometConf.{COMET_ANSI_MODE_ENABLED, COMET_SHUFFLE_FALLBACK_TO_COLUMNAR}
 import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometLoaded, isCometNativeShuffleMode, isCometScan, isCometShuffleEnabled, isSpark40Plus, shouldApplySparkToColumnar, withInfo}
 import org.apache.comet.serde.OperatorOuterClass.Operator
 import org.apache.comet.serde.QueryPlanSerde

@@ -154,8 +154,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {

     plan.transformUp {
       // Fully native scan for V1
-      case scan: CometScanExec
-          if COMET_NATIVE_SCAN_IMPL.get() == CometConf.SCAN_NATIVE_DATAFUSION =>
+      case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_DATAFUSION =>
        val nativeOp = QueryPlanSerde.operator2Proto(scan).get
        CometNativeScanExec(nativeOp, scan.wrapped, scan.session)

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 31 additions & 13 deletions
@@ -25,14 +25,15 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, PlanExpression}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.MetadataColumnHelper
-import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometScanExec}
+import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec}
 import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{ArrayType, ByteType, DataType, MapType, ShortType, StructType}

-import org.apache.comet.CometConf
+import org.apache.comet.{CometConf, DataTypeSupport}
 import org.apache.comet.CometConf._
 import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, isCometScanEnabled, withInfo, withInfos}
 import org.apache.comet.parquet.{CometParquetScan, SupportsComet}

@@ -106,16 +107,11 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
       return withInfos(scanExec, fallbackReasons.toSet)
     }

-    val (schemaSupported, partitionSchemaSupported) = scanImpl match {
-      case CometConf.SCAN_NATIVE_DATAFUSION =>
-        (
-          CometNativeScanExec.isSchemaSupported(scanExec.requiredSchema, fallbackReasons),
-          CometNativeScanExec.isSchemaSupported(r.partitionSchema, fallbackReasons))
-      case CometConf.SCAN_NATIVE_COMET | SCAN_NATIVE_ICEBERG_COMPAT =>
-        (
-          CometScanExec.isSchemaSupported(scanExec.requiredSchema, fallbackReasons),
-          CometScanExec.isSchemaSupported(r.partitionSchema, fallbackReasons))
-    }
+    val typeChecker = new CometScanTypeChecker(scanImpl)
+    val schemaSupported =
+      typeChecker.isSchemaSupported(scanExec.requiredSchema, fallbackReasons)
+    val partitionSchemaSupported =
+      typeChecker.isSchemaSupported(r.partitionSchema, fallbackReasons)

     if (!schemaSupported) {
       fallbackReasons += s"Unsupported schema ${scanExec.requiredSchema} for $scanImpl"

@@ -125,7 +121,9 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
     }

     if (schemaSupported && partitionSchemaSupported) {
-      CometScanExec(scanExec, session)
+      // this is confusing, but we always insert a CometScanExec here, which may replaced
+      // with a CometNativeExec when CometExecRule runs, depending on the scanImpl value.
+      CometScanExec(scanExec, session, scanImpl)
     } else {
       withInfos(scanExec, fallbackReasons.toSet)
     }

@@ -201,3 +199,23 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
   }

 }
+
+case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport {
+  override def isTypeSupported(
+      dt: DataType,
+      name: String,
+      fallbackReasons: ListBuffer[String]): Boolean = {
+    dt match {
+      case ByteType | ShortType
+          if scanImpl != CometConf.SCAN_NATIVE_COMET &&
+            !CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get() =>
+        fallbackReasons += s"$scanImpl scan cannot read $dt when " +
+          s"${CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key} is false. ${CometConf.COMPAT_GUIDE}."
+        false
+      case _: StructType | _: ArrayType | _: MapType if scanImpl == CometConf.SCAN_NATIVE_COMET =>
+        false
+      case _ =>
+        super.isTypeSupported(dt, name, fallbackReasons)
+    }
+  }
+}
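
The new CometScanTypeChecker consolidates the per-implementation schema checks that previously lived on CometNativeScanExec and CometScanExec: byte and short columns are rejected for non-native_comet scans unless COMET_SCAN_ALLOW_INCOMPATIBLE is enabled, and struct/array/map types are rejected for native_comet. A small usage sketch mirroring the call in CometScanRule (the schema and fallback buffer are illustrative):

    // Illustrative use of the new checker; the schema here is made up.
    import scala.collection.mutable.ListBuffer
    import org.apache.spark.sql.types.{ByteType, StructField, StructType}

    val fallbackReasons = ListBuffer.empty[String]
    val checker = CometScanTypeChecker(CometConf.SCAN_NATIVE_DATAFUSION)
    val supported =
      checker.isSchemaSupported(StructType(Seq(StructField("b", ByteType))), fallbackReasons)
    // With COMET_SCAN_ALLOW_INCOMPATIBLE left at false, this yields supported == false and a
    // fallback reason explaining that the native_datafusion scan cannot read ByteType.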

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 18 additions & 5 deletions
@@ -2282,8 +2282,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
     op match {

       // Fully native scan for V1
-      case scan: CometScanExec
-          if CometConf.COMET_NATIVE_SCAN_IMPL.get(conf) == CometConf.SCAN_NATIVE_DATAFUSION =>
+      case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_DATAFUSION =>
         val nativeScanBuilder = OperatorOuterClass.NativeScan.newBuilder()
         nativeScanBuilder.setSource(op.simpleStringWithNodeId())

@@ -2376,12 +2375,26 @@ object QueryPlanSerde extends Logging with CometExprShim {
         val cond = exprToProto(condition, child.output)

         if (cond.isDefined && childOp.nonEmpty) {
+          // We need to determine whether to use DataFusion's FilterExec or Comet's
+          // FilterExec. The difference is that DataFusion's implementation will sometimes pass
+          // batches through whereas the Comet implementation guarantees that a copy is always
+          // made, which is critical when using `native_comet` scans due to buffer re-use
+
+          // TODO this could be optimized more to stop walking the tree on hitting
+          // certain operators such as join or aggregate which will copy batches
+          def containsNativeCometScan(plan: SparkPlan): Boolean = {
+            plan match {
+              case w: CometScanWrapper => containsNativeCometScan(w.originalPlan)
+              case scan: CometScanExec => scan.scanImpl == CometConf.SCAN_NATIVE_COMET
+              case _: CometNativeScanExec => false
+              case _ => plan.children.exists(containsNativeCometScan)
+            }
+          }
+
           val filterBuilder = OperatorOuterClass.Filter
             .newBuilder()
             .setPredicate(cond.get)
-            .setUseDatafusionFilter(
-              CometConf.COMET_NATIVE_SCAN_IMPL.get() == CometConf.SCAN_NATIVE_DATAFUSION ||
-                CometConf.COMET_NATIVE_SCAN_IMPL.get() == CometConf.SCAN_NATIVE_ICEBERG_COMPAT)
+            .setUseDatafusionFilter(!containsNativeCometScan(op))
           Some(result.setFilter(filterBuilder).build())
         } else {
           withInfo(op, condition, child)
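
The filter serialization now inspects the child plan instead of the global config: DataFusion's FilterExec may pass input batches through unchanged, which is only unsafe when a buffer-reusing native_comet scan feeds the filter, so useDatafusionFilter is set exactly when no such scan is found below the operator. A self-contained sketch of that recursive check on made-up node types (not Spark's classes):

    // Self-contained sketch of the "does any input come from the buffer-reusing
    // native_comet scan?" walk; Node/Scan/Filter are illustrative stand-ins.
    sealed trait Node { def children: Seq[Node] }
    final case class Scan(scanImpl: String, children: Seq[Node] = Nil) extends Node
    final case class Filter(child: Node) extends Node { def children: Seq[Node] = Seq(child) }

    def containsNativeCometScan(node: Node): Boolean = node match {
      case Scan(impl, _) => impl == "native_comet"
      case other => other.children.exists(containsNativeCometScan)
    }

    // containsNativeCometScan(Filter(Scan("native_comet")))      == true  -> Comet's copying filter
    // containsNativeCometScan(Filter(Scan("native_datafusion"))) == false -> DataFusion's filter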

spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala

Lines changed: 5 additions & 19 deletions
@@ -19,7 +19,6 @@

 package org.apache.spark.sql.comet

-import scala.collection.mutable.ListBuffer
 import scala.reflect.ClassTag

 import org.apache.spark.rdd.RDD

@@ -36,12 +35,12 @@ import org.apache.spark.util.collection._

 import com.google.common.base.Objects

-import org.apache.comet.{CometConf, DataTypeSupport}
+import org.apache.comet.CometConf
 import org.apache.comet.parquet.CometParquetFileFormat
 import org.apache.comet.serde.OperatorOuterClass.Operator

 /**
- * Comet fully native scan node for DataSource V1.
+ * Comet fully native scan node for DataSource V1 that delegates to DataFusion's DataSourceExec.
  */
 case class CometNativeScanExec(
     override val nativeOp: Operator,

@@ -184,7 +183,7 @@ case class CometNativeScanExec(
   override def inputRDDs(): Seq[RDD[InternalRow]] = originalPlan.inputRDDs()
 }

-object CometNativeScanExec extends DataTypeSupport {
+object CometNativeScanExec {
   def apply(
       nativeOp: Operator,
       scanExec: FileSourceScanExec,

@@ -206,7 +205,8 @@ object CometNativeScanExec extends DataTypeSupport {
     // https://github.com/apache/arrow-datafusion-comet/issues/190
     def transform(arg: Any): AnyRef = arg match {
       case _: HadoopFsRelation =>
-        scanExec.relation.copy(fileFormat = new CometParquetFileFormat)(session)
+        scanExec.relation.copy(fileFormat =
+          new CometParquetFileFormat(CometConf.SCAN_NATIVE_DATAFUSION))(session)
       case other: AnyRef => other
       case null => null
     }

@@ -229,18 +229,4 @@ object CometNativeScanExec extends DataTypeSupport {
     scanExec.logicalLink.foreach(batchScanExec.setLogicalLink)
     batchScanExec
   }
-
-  override def isTypeSupported(
-      dt: DataType,
-      name: String,
-      fallbackReasons: ListBuffer[String]): Boolean = {
-    dt match {
-      case ByteType | ShortType if !CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get() =>
-        fallbackReasons += s"${CometConf.SCAN_NATIVE_DATAFUSION} scan cannot read $dt when " +
-          s"${CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key} is false. ${CometConf.COMPAT_GUIDE}."
-        false
-      case _ =>
-        super.isTypeSupported(dt, name, fallbackReasons)
-    }
-  }
 }
