Type Evolution in MERGE
johanl-db committed Mar 19, 2024
1 parent 72fad38 commit 370ed5d
Showing 10 changed files with 429 additions and 68 deletions.
@@ -234,7 +234,7 @@ object ResolveDeltaMergeInto {
// schema before merging it with the target schema. We don't consider NOT MATCHED BY SOURCE
// clauses since these can't by definition reference source columns and thus can't introduce
// new columns in the target schema.
val actions = (matchedClauses ++ notMatchedClauses).flatMap(_.actions)
val actions = (resolvedMatchedClauses ++ resolvedNotMatchedClauses).flatMap(_.actions)
val assignments = actions.collect { case a: DeltaMergeAction => a.targetColNameParts }
val containsStarAction = actions.exists {
case _: UnresolvedStar => true
@@ -278,14 +278,25 @@ object ResolveDeltaMergeInto {
})

val migrationSchema = filterSchema(source.schema, Seq.empty)
val allowTypeWidening = EliminateSubqueryAliases(target) match {
case DeltaFullTable(_, index) =>
TypeWidening.isEnabled(
index.snapshotAtAnalysis.protocol,
index.snapshotAtAnalysis.metadata
)
case o => throw DeltaErrors.notADeltaSourceException("MERGE", Some(o))
}

// The implicit conversions flag allows any type to be merged from source to target if Spark
// SQL considers the source type implicitly castable to the target. Normally, mergeSchemas
// enforces Parquet-level write compatibility, which would mean an INT source can't be merged
// into a LONG target.
SchemaMergingUtils.mergeSchemas(
target.schema,
migrationSchema,
allowImplicitConversions = true)
allowImplicitConversions = true,
allowTypeWidening = allowTypeWidening
)
} else {
target.schema
}
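Taken together, these resolution changes let MERGE schema evolution widen a target column when the incoming source column has a wider, automatically supported type, provided the target table has type widening enabled. A rough end-to-end sketch of the user-visible effect follows; the table property and conf names are assumptions based on the Delta type widening preview and MERGE schema evolution, and the table and column names are made up:

```scala
// Sketch only: assumes a SparkSession with Delta Lake configured.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

spark.sql("""
  CREATE TABLE target (id INT, v SMALLINT) USING delta
  TBLPROPERTIES ('delta.enableTypeWidening' = 'true')""")
spark.sql("CREATE TABLE source (id INT, v INT) USING delta")

spark.sql("""
  MERGE INTO target t
  USING source s
  ON t.id = s.id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *""")

// Expected outcome: target's column `v` is widened from SMALLINT to INT as part
// of schema evolution, instead of the merge being rejected for a type mismatch.
```

The DeltaFullTable match above is what ties this behavior to the target table's own protocol and metadata, so widening is never applied to tables that have not opted in.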
13 changes: 13 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala
@@ -62,6 +62,19 @@ object TypeWidening {
case _ => false
}

/**
* Returns whether the given type change is eligible for **automatic** widening. Only a subset of
* supported type changes are considered for automatic widening.
*/
def isAutomaticTypeChangeSupported(fromType: AtomicType, toType: AtomicType): Boolean =
(fromType, toType) match {
case (from, to) if !isTypeChangeSupported(from, to) => false
case (from, to) if from == to => true
case (ByteType, ShortType) => true
case (ByteType | ShortType, IntegerType) => true
case _ => false
}

/**
* Filter the given list of files to only keep files that were written before the latest type
* change, if any. These older files contain a column or field with a type that is different than
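Since the new method is a pure pattern match, its behavior can be pinned down with a few spot-checks derived directly from the cases above (sketch only; `isTypeChangeSupported` lives in the unchanged part of this file):

```scala
import org.apache.spark.sql.delta.TypeWidening
import org.apache.spark.sql.types._

TypeWidening.isAutomaticTypeChangeSupported(ByteType, ShortType)    // true
TypeWidening.isAutomaticTypeChangeSupported(ShortType, IntegerType) // true
TypeWidening.isAutomaticTypeChangeSupported(IntegerType, ShortType) // false: narrowing is never a supported change
TypeWidening.isAutomaticTypeChangeSupported(IntegerType, LongType)  // false: not part of the automatic subset in this commit
```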
@@ -122,7 +122,14 @@ trait ImplicitMetadataOperation extends DeltaLogging {
if (rearrangeOnly) {
throw DeltaErrors.unexpectedDataChangeException("Change the Delta table schema")
}
txn.updateMetadata(txn.metadata.copy(schemaString = mergedSchema.json

val schemaWithTypeWideningMetadata = TypeWideningMetadata.addTypeWideningMetadata(
txn,
schema = mergedSchema,
oldSchema = txn.metadata.schema
)

txn.updateMetadata(txn.metadata.copy(schemaString = schemaWithTypeWideningMetadata.json
))
} else if (isNewSchema || isNewPartitioning
) {
@@ -201,7 +208,8 @@ object ImplicitMetadataOperation {
SchemaMergingUtils.mergeSchemas(
txn.metadata.schema,
dataSchema,
fixedTypeColumns = fixedTypeColumns)
fixedTypeColumns = fixedTypeColumns,
allowTypeWidening = TypeWidening.isEnabled(txn.protocol, txn.metadata))
}
}

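The addTypeWideningMetadata call is what persists a record of each widened column in the table schema, so later operations can tell which files were written before a type change. A minimal, illustrative way to look at that record after a widening commit; the path and column name are hypothetical, and the exact layout of the per-field metadata is defined by the Delta type widening spec rather than by this sketch:

```scala
import org.apache.spark.sql.delta.DeltaLog

val snapshot = DeltaLog.forTable(spark, "/tmp/hypothetical/target").update()
val field = snapshot.schema("v")
println(field.dataType)  // e.g. IntegerType after a short -> int widening
println(field.metadata)  // expected to carry the recorded type change(s) for this field
```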
@@ -20,14 +20,13 @@ import java.util.Locale

import scala.util.control.NonFatal

import org.apache.spark.sql.delta.DeltaAnalysisException
import org.apache.spark.sql.delta.{DeltaAnalysisException, TypeWidening}

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{Resolver, TypeCoercion, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.DeltaMergeInto
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.types.{ArrayType, ByteType, DataType, DecimalType, IntegerType, MapType, NullType, ShortType, StructField, StructType}
import org.apache.spark.sql.types._

/**
* Utils to merge table schema with data schema.
@@ -168,6 +167,7 @@ object SchemaMergingUtils {
dataSchema: StructType,
allowImplicitConversions: Boolean = false,
keepExistingType: Boolean = false,
allowTypeWidening: Boolean = false,
fixedTypeColumns: Set[String] = Set.empty,
caseSensitive: Boolean = false): StructType = {
checkColumnNameDuplication(dataSchema, "in the data to save", caseSensitive)
@@ -232,6 +232,9 @@ object SchemaMergingUtils {
// Simply keeps the existing type for primitive types
case (current, update) if keepExistingType => current

case (current: AtomicType, update: AtomicType) if allowTypeWidening &&
TypeWidening.isAutomaticTypeChangeSupported(current, update) => update

// If implicit conversions are allowed, that means we can use any valid implicit cast to
// perform the merge.
case (current, update)
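The new case only applies to atomic types and only when the change passes isAutomaticTypeChangeSupported, so the flag's effect is narrow. A minimal sketch of the expected behavior (column names are made up):

```scala
import org.apache.spark.sql.types._

val tableSchema = new StructType().add("id", IntegerType).add("v", ShortType)
val dataSchema  = new StructType().add("id", IntegerType).add("v", IntegerType)

// With widening allowed, `v` takes the wider incoming type.
SchemaMergingUtils.mergeSchemas(tableSchema, dataSchema, allowTypeWidening = true)
// expected: struct<id: int, v: int>

// With allowTypeWidening = false (and no implicit conversions), the short -> int
// change is expected to be rejected as an incompatible type change instead.
```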
@@ -1432,7 +1432,10 @@ trait DeltaErrorsSuiteBase
val e = intercept[DeltaAnalysisException] {
val s1 = StructType(Seq(StructField("c0", IntegerType, true)))
val s2 = StructType(Seq(StructField("c0", StringType, false)))
SchemaMergingUtils.mergeSchemas(s1, s2, false, false, Set("c0"))
SchemaMergingUtils.mergeSchemas(s1, s2,
allowImplicitConversions = false,
keepExistingType = false,
allowTypeWidening = false, Set("c0"))
}
checkErrorMessage(e, Some("DELTA_GENERATED_COLUMNS_DATA_TYPE_MISMATCH"), Some("42K09"),
Some("Column c0 is a generated column or a column used by a generated " +
@@ -41,10 +41,11 @@ import org.apache.spark.scheduler.{JobFailed, SparkListener, SparkListenerJobEnd
import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.catalyst.util.{quietly, FailFastMode}
import org.apache.spark.sql.execution.{FileSourceScanExec, QueryExecution, RDDScanExec, SparkPlan, WholeStageCodegenExec}
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.QueryExecutionListener
import org.apache.spark.util.Utils

@@ -467,6 +468,8 @@ trait DeltaDMLTestUtils
with BeforeAndAfterEach {
self: SharedSparkSession =>

import testImplicits._

protected var tempDir: File = _

protected var deltaLog: DeltaLog = _
@@ -515,6 +518,23 @@
}
}

/**
* Parse the input JSON data into a dataframe, one row per input element.
* Throws an exception on malformed inputs or records that don't comply with the provided schema.
*/
protected def readFromJSON(data: Seq[String], schema: StructType = null): DataFrame = {
if (schema != null) {
spark.read
.schema(schema)
.option("mode", FailFastMode.name)
.json(data.toDS)
} else {
spark.read
.option("mode", FailFastMode.name)
.json(data.toDS)
}
}

protected def readDeltaTable(path: String): DataFrame = {
spark.read.format("delta").load(path)
}
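For context, a hedged usage sketch of the new helper from a suite that mixes in DeltaDMLTestUtils (column names and values are made up). FailFast mode turns malformed records and schema mismatches into immediate failures rather than null rows, which keeps schema-evolution tests deterministic:

```scala
import org.apache.spark.sql.types._

val df = readFromJSON(
  data = Seq("""{"key": "a", "value": 1}""", """{"key": "b", "value": 2}"""),
  schema = new StructType().add("key", StringType).add("value", IntegerType))
df.createOrReplaceTempView("source")  // e.g. to drive a MERGE statement under test
```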