Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow user-specified schema in read if it's consistent #3929

Merged
merged 6 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,6 @@ class DeltaDataSource
schema: Option[StructType],
providerName: String,
parameters: Map[String, String]): (String, StructType) = {
if (schema.nonEmpty && schema.get.nonEmpty) {
throw DeltaErrors.specifySchemaAtReadTimeException
}
val path = parameters.getOrElse("path", {
throw DeltaErrors.pathNotSpecifiedException
})
Expand Down Expand Up @@ -108,6 +105,11 @@ class DeltaDataSource
.getOrElse(snapshot.schema)
}

if (schema.nonEmpty && schema.get.nonEmpty &&
!DataType.equalsIgnoreCompatibleNullability(readSchema, schema.get)) {
throw DeltaErrors.specifySchemaAtReadTimeException
}

val schemaToUse = DeltaColumnMapping.dropColumnMappingMetadata(
DeltaTableUtils.removeInternalWriterMetadata(sqlContext.sparkSession, readSchema)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,9 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase

test("disallow user specified schema") {
withTempDir { inputDir =>
new File(inputDir, "_delta_log").mkdir()
val deltaLog = DeltaLog.forTable(spark, new Path(inputDir.toURI))
withMetadata(deltaLog, StructType.fromDDL("value STRING"))

val e = intercept[AnalysisException] {
spark.readStream
.schema(StructType.fromDDL("a INT, b STRING"))
Expand All @@ -154,6 +156,32 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase
for (msg <- Seq("Delta does not support specifying the schema at read time")) {
assert(e.getMessage.contains(msg))
}

val e2 = intercept[Exception] {
spark.readStream
.schema(StructType.fromDDL("value STRING"))
.format("delta")
.load(inputDir.getCanonicalPath)
}
assert(e2.getMessage.contains("does not support user-specified schema"))
}
}

test("allow user specified schema if consistent: v1 source") {
  withTempDir { inputDir =>
    import org.apache.spark.sql.execution.datasources.DataSource

    // Set up a Delta table whose committed schema matches the schema the
    // "user" will pass in below.
    val tablePath = new Path(inputDir.toURI)
    withMetadata(DeltaLog.forTable(spark, tablePath), StructType.fromDDL("value STRING"))

    // A user-specified schema that agrees with the table's actual schema
    // (ignoring compatible nullability) must be accepted rather than rejected
    // with specifySchemaAtReadTimeException. We go through Spark's internal
    // DataSource API to exercise the v1 source code path: this covers advanced
    // Spark plugins, not end-user read behavior.
    val v1Source = DataSource(
      spark,
      userSpecifiedSchema = Some(StructType.fromDDL("value STRING")),
      className = "delta",
      options = Map("path" -> inputDir.getCanonicalPath))

    // Must not throw: planning the streaming relation validates the schema.
    Dataset.ofRows(spark, StreamingRelation(v1Source))
  }
}

Expand Down
Loading