Skip to content

Commit b36db64

Browse files
authored
fix: Bucketed scan fallback for native_datafusion Parquet scan (#1720)
1 parent c6d6157 commit b36db64

File tree

2 files changed

+18
-7
lines changed

2 files changed

+18
-7
lines changed

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
3333
import org.apache.spark.sql.internal.SQLConf
3434

3535
import org.apache.comet.CometConf
36-
import org.apache.comet.CometConf.{COMET_DPP_FALLBACK_ENABLED, COMET_EXEC_ENABLED, COMET_NATIVE_SCAN_IMPL, COMET_SCHEMA_EVOLUTION_ENABLED, SCAN_NATIVE_ICEBERG_COMPAT}
37-
import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, isCometScanEnabled, withInfo}
36+
import org.apache.comet.CometConf._
37+
import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, isCometScanEnabled, withInfo, withInfos}
3838
import org.apache.comet.parquet.{CometParquetScan, SupportsComet}
3939

4040
case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
@@ -86,14 +86,21 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
8686
val fallbackReasons = new ListBuffer[String]()
8787
if (!CometScanExec.isFileFormatSupported(r.fileFormat)) {
8888
fallbackReasons += s"Unsupported file format ${r.fileFormat}"
89-
return withInfo(scanExec, fallbackReasons.mkString(", "))
89+
return withInfos(scanExec, fallbackReasons.toSet)
9090
}
9191

9292
val scanImpl = COMET_NATIVE_SCAN_IMPL.get()
9393
if (scanImpl == CometConf.SCAN_NATIVE_DATAFUSION && !COMET_EXEC_ENABLED.get()) {
9494
fallbackReasons +=
9595
s"Full native scan disabled because ${COMET_EXEC_ENABLED.key} disabled"
96-
return withInfo(scanExec, fallbackReasons.mkString(", "))
96+
return withInfos(scanExec, fallbackReasons.toSet)
97+
}
98+
99+
if (scanImpl == CometConf.SCAN_NATIVE_DATAFUSION && scanExec.bucketedScan) {
100+
// https://github.com/apache/datafusion-comet/issues/1719
101+
fallbackReasons +=
102+
"Full native scan disabled because bucketed scan is not supported"
103+
return withInfos(scanExec, fallbackReasons.toSet)
97104
}
98105

99106
val (schemaSupported, partitionSchemaSupported) = scanImpl match {
@@ -117,7 +124,7 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
117124
if (schemaSupported && partitionSchemaSupported) {
118125
CometScanExec(scanExec, session)
119126
} else {
120-
withInfo(scanExec, fallbackReasons.mkString(", "))
127+
withInfos(scanExec, fallbackReasons.toSet)
121128
}
122129

123130
case _ =>
@@ -152,7 +159,7 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
152159
scanExec.copy(scan = cometScan),
153160
runtimeFilters = scanExec.runtimeFilters)
154161
} else {
155-
withInfo(scanExec, fallbackReasons.mkString(", "))
162+
withInfos(scanExec, fallbackReasons.toSet)
156163
}
157164

158165
// Iceberg scan
@@ -179,7 +186,7 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
179186
scanExec.clone().asInstanceOf[BatchScanExec],
180187
runtimeFilters = scanExec.runtimeFilters)
181188
} else {
182-
withInfo(scanExec, fallbackReasons.mkString(", "))
189+
withInfos(scanExec, fallbackReasons.toSet)
183190
}
184191

185192
case other =>

spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1470,6 +1470,10 @@ class CometExecSuite extends CometTestBase {
14701470
}
14711471

14721472
test("bucketed table") {
1473+
// native_datafusion actually passes this test, but in the case where buckets are pruned it fails, so we're
1474+
// falling back for bucketed scans entirely as a workaround.
1475+
// https://github.com/apache/datafusion-comet/issues/1719
1476+
assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
14731477
val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil))
14741478
val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = false)
14751479
val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpec, expectedShuffle = false)

0 commit comments

Comments
 (0)