-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Automatic type widening in MERGE #2764
Automatic type widening in MERGE #2764
Conversation
spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaMergeInto.scala
Outdated
Show resolved
Hide resolved
spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
left some comments.
spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala
Outdated
Show resolved
Hide resolved
spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningAutomaticSuite.scala
Outdated
Show resolved
Hide resolved
spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala
Outdated
Show resolved
Hide resolved
spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala
Show resolved
Hide resolved
spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSchemaEvolutionSuite.scala
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice! This is going to be really powerful when combined with the already existing schema auto migration! I have two questions and a small nit, but otherwise it LGTM.
@@ -278,14 +278,22 @@ object ResolveDeltaMergeInto { | |||
}) | |||
|
|||
val migrationSchema = filterSchema(source.schema, Seq.empty) | |||
val allowTypeWidening = target.exists { | |||
case DeltaTable(fileIndex) => | |||
TypeWidening.isEnabled(fileIndex.protocol, fileIndex.metadata) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What will happen if type widening is disabled after this statement, but before the main transaction starts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The two calls to SchemaMergingUtils.mergeSchemas (here and in ImplicitMetadataOperation) will return a different schema: first one with the wider type, second one with the original type.
This works currently because the second call to mergeSchemas doesn't allow implicit casts and will fail but it's quite brittle. I added a check at the start of MERGE to properly fail when type widening is enabled/disabled concurrently
spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSchemaEvolutionSuite.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is awesome.
Which Delta project/connector is this regarding?
Description
This change is part of the type widening table feature.
Type widening feature request: #2622
Type Widening protocol RFC: #2624
It adds automatic type widening as part of schema evolution in MERGE INTO:
DeltaMergeInto
plan, when merging the target and source schema to compute the schema after evolution, we keep the wider source type when type widening is enabled on the table.How was this patch tested?
DeltaTypeWideningSchemaEvolutionSuite
is added to cover type evolution in MERGEThis PR introduces the following user-facing changes
The table feature is available in testing only, there are no user-facing changes as of now.
When automatic schema evolution is enabled in MERGE and the source schema contains a type that is wider than the target schema:
With type widening disabled: the type in the target schema is not changed. the ingestion behavior follows the
storeAssignmentPolicy
configuration:null
With type widening enabled: the type in the target schema is updated to the wider source type. The MERGE operation always succeeds:
After the MERGE operation, the target schema is
key int, value int
.