From f5a4df4ef24c5be98b388b6d3601051b0fca6e05 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 6 Dec 2024 12:19:00 +0800 Subject: [PATCH 1/6] allow user-specified schema in read if it's consistent --- .../resources/error/delta-error-classes.json | 2 +- .../sql/delta/sources/DeltaDataSource.scala | 7 ++++--- .../spark/sql/delta/DeltaSourceSuite.scala | 18 ++++++++++++++++++ 3 files changed, 23 insertions(+), 4 deletions(-) diff --git a/spark/src/main/resources/error/delta-error-classes.json b/spark/src/main/resources/error/delta-error-classes.json index c028865d2e7..a556bf7eca8 100644 --- a/spark/src/main/resources/error/delta-error-classes.json +++ b/spark/src/main/resources/error/delta-error-classes.json @@ -2859,7 +2859,7 @@ }, "DELTA_UNSUPPORTED_SCHEMA_DURING_READ" : { "message" : [ - "Delta does not support specifying the schema at read time." + "Delta does not support specifying the schema at read time, unless it's the same as the actual table schema." ], "sqlState" : "0AKDC" }, diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala index 350ec6ede56..ca34a6bba6a 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala @@ -75,9 +75,6 @@ class DeltaDataSource schema: Option[StructType], providerName: String, parameters: Map[String, String]): (String, StructType) = { - if (schema.nonEmpty && schema.get.nonEmpty) { - throw DeltaErrors.specifySchemaAtReadTimeException - } val path = parameters.getOrElse("path", { throw DeltaErrors.pathNotSpecifiedException }) @@ -108,6 +105,10 @@ class DeltaDataSource .getOrElse(snapshot.schema) } + if (schema.nonEmpty && !DataType.equalsIgnoreCompatibleNullability(readSchema, schema.get)) { + throw DeltaErrors.specifySchemaAtReadTimeException + } + val schemaToUse = DeltaColumnMapping.dropColumnMappingMetadata( DeltaTableUtils.removeInternalWriterMetadata(sqlContext.sparkSession, readSchema) ) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala index 1750a7580d2..aeda425bf12 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala @@ -157,6 +157,24 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase } } + test("allow user specified schema if consistent: v1 source") { + withTempDir { inputDir => + val deltaLog = DeltaLog.forTable(spark, new Path(inputDir.toURI)) + withMetadata(deltaLog, StructType.fromDDL("value STRING")) + + import org.apache.spark.sql.execution.datasources.DataSource + // User-specified schema is allowed if it's consistent with the actual Delta table schema. + // Here we use Spark internal APIs to trigger v1 source code path. That being said, we + // are not fixing end-user behavior, but advanced Spark plugins. + val v1DataSource = DataSource( + spark, + userSpecifiedSchema = Some(StructType.fromDDL("value STRING")), + className = "delta", + options = Map("path" -> inputDir.getCanonicalPath)) + Dataset.ofRows(spark, StreamingRelation(v1DataSource)) + } + } + test("basic") { withTempDir { inputDir => val deltaLog = DeltaLog.forTable(spark, new Path(inputDir.toURI)) From c734a7bee84404737889910bf4527a1fa14122fe Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 4 Feb 2025 23:16:00 +0800 Subject: [PATCH 2/6] Update spark/src/main/resources/error/delta-error-classes.json --- spark/src/main/resources/error/delta-error-classes.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/main/resources/error/delta-error-classes.json b/spark/src/main/resources/error/delta-error-classes.json index a556bf7eca8..c028865d2e7 100644 --- a/spark/src/main/resources/error/delta-error-classes.json +++ b/spark/src/main/resources/error/delta-error-classes.json @@ -2859,7 +2859,7 @@ }, "DELTA_UNSUPPORTED_SCHEMA_DURING_READ" : { "message" : [ - "Delta does not support specifying the schema at read time, unless it's the same as the actual table schema." + "Delta does not support specifying the schema at read time." ], "sqlState" : "0AKDC" }, From 07b4e9a0facdd7a1acfba92e9cfc24402f6ad055 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 4 Feb 2025 23:17:07 +0800 Subject: [PATCH 3/6] Update spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala --- .../org/apache/spark/sql/delta/sources/DeltaDataSource.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala index ca34a6bba6a..f9a1e0d1e2c 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala @@ -105,7 +105,8 @@ class DeltaDataSource .getOrElse(snapshot.schema) } - if (schema.nonEmpty && !DataType.equalsIgnoreCompatibleNullability(readSchema, schema.get)) { + if (schema.nonEmpty && schema.get.nonEmpty && + !DataType.equalsIgnoreCompatibleNullability(readSchema, schema.get)) { throw DeltaErrors.specifySchemaAtReadTimeException } From 746df4731bc08e64129e4adc23258452800b29fc Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 4 Feb 2025 23:21:54 +0800 Subject: [PATCH 4/6] Update DeltaSourceSuite.scala --- .../apache/spark/sql/delta/DeltaSourceSuite.scala | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala index aeda425bf12..12a3e46825d 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala @@ -144,7 +144,9 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase test("disallow user specified schema") { withTempDir { inputDir => - new File(inputDir, "_delta_log").mkdir() + val deltaLog = DeltaLog.forTable(spark, new Path(inputDir.toURI)) + withMetadata(deltaLog, StructType.fromDDL("value STRING")) + val e = intercept[AnalysisException] { spark.readStream .schema(StructType.fromDDL("a INT, b STRING")) @@ -154,6 +156,14 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase for (msg <- Seq("Delta does not support specifying the schema at read time")) { assert(e.getMessage.contains(msg)) } + + val e = intercept[SparkThrowable] { + spark.readStream + .schema(StructType.fromDDL("value STRING")) + .format("delta") + .load(inputDir.getCanonicalPath) + } + assert(e.getMessage.contains("does not support user-specified schema")) } } From aa8f00abe6d9b28e2a1400f23add397b673490e8 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 5 Feb 2025 09:44:39 +0800 Subject: [PATCH 5/6] Update spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala --- .../scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala index 12a3e46825d..e377ec96d03 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala @@ -157,13 +157,13 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase assert(e.getMessage.contains(msg)) } - val e = intercept[SparkThrowable] { + val e2 = intercept[SparkThrowable] { spark.readStream .schema(StructType.fromDDL("value STRING")) .format("delta") .load(inputDir.getCanonicalPath) } - assert(e.getMessage.contains("does not support user-specified schema")) + assert(e2.getMessage.contains("does not support user-specified schema")) } } From b8eacccdcd49ce71de9d95b33861f736e385bf23 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 5 Feb 2025 11:19:05 +0800 Subject: [PATCH 6/6] Update spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala --- .../scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala index e377ec96d03..25096cf165f 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala @@ -157,7 +157,7 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase assert(e.getMessage.contains(msg)) } - val e2 = intercept[SparkThrowable] { + val e2 = intercept[Exception] { spark.readStream .schema(StructType.fromDDL("value STRING")) .format("delta")