Skip to content

Commit

Permalink
[Spark] Table features protocols should handle legacy reader features…
Browse files Browse the repository at this point in the history
… as reader features (#3672)

<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a
faster review.
6. If applicable, include the corresponding issue number in the PR title
and link it in the body.
-->

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description
This PR fixes a bug where protocol (2, 7) is not considered to support
reader features. This issue could manifest in several places. For
example, it would result in hiding (2, x) features in the reader
features list when it was the only reader feature present. For example,
Protocol(2, 7, None, Set(ColumnMapping, RowTracking).
<!--
- Describe what this PR changes.
- Describe why we need the change.
 
If this PR resolves an issue be sure to include "Resolves #XXX" to
correctly link and close the issue upon merge.
-->

## How was this patch tested?
Added new test and adapted existing tests.
<!--
If tests were added, say they were added here. Please make sure to test
the changes thoroughly including negative and positive cases if
possible.
If the changes were tested in any way other than unit tests, please
clarify how you tested step by step (ideally copy and paste-able, so
that other reviewers can test and check, and descendants can verify in
the future).
If the changes were not tested, please explain why.
-->

## Does this PR introduce _any_ user-facing changes?
No.
<!--
If yes, please clarify the previous behavior and the change this PR
proposes - provide the console output, description and/or an example to
show the behavior difference if possible.
If possible, please also clarify if this is a user-facing change
compared to the released Delta Lake versions or within the unreleased
branches such as master.
If no, write 'No'.
-->
  • Loading branch information
zhipengmao-db authored Sep 12, 2024
1 parent fbf0f9b commit 920f185
Show file tree
Hide file tree
Showing 13 changed files with 138 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ class DeltaLog private(
def assertTableFeaturesMatchMetadata(
targetProtocol: Protocol,
targetMetadata: Metadata): Unit = {
if (!targetProtocol.supportsReaderFeatures && !targetProtocol.supportsWriterFeatures) return
if (!targetProtocol.supportsTableFeatures) return

val protocolEnabledFeatures = targetProtocol.writerFeatureNames
.flatMap(TableFeature.featureNameToFeature)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ class Snapshot(
}
base.put(Protocol.MIN_READER_VERSION_PROP, protocol.minReaderVersion.toString)
base.put(Protocol.MIN_WRITER_VERSION_PROP, protocol.minWriterVersion.toString)
if (protocol.supportsReaderFeatures || protocol.supportsWriterFeatures) {
if (protocol.supportsTableFeatures) {
val features = protocol.readerAndWriterFeatureNames.map(name =>
s"${TableFeatureProtocolUtils.FEATURE_PROP_PREFIX}$name" ->
TableFeatureProtocolUtils.FEATURE_PROP_SUPPORTED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,32 @@ import com.fasterxml.jackson.annotation.JsonIgnore
*/
trait TableFeatureSupport { this: Protocol =>

/** Check if this protocol is capable of adding features into its `readerFeatures` field. */
/**
* Check if this protocol can support arbitrary reader features. If this returns false,
* then the table may still be able to support the "columnMapping" feature.
* See [[canSupportColumnMappingFeature]] below.
*/
def supportsReaderFeatures: Boolean =
TableFeatureProtocolUtils.supportsReaderFeatures(minReaderVersion)

/**
* Check if this protocol is in table feature representation and can support column mapping.
* Column mapping is the only legacy reader feature and requires special handling in some
* cases.
*/
def canSupportColumnMappingFeature: Boolean =
TableFeatureProtocolUtils.canSupportColumnMappingFeature(minReaderVersion, minWriterVersion)

/** Check if this protocol is capable of adding features into its `writerFeatures` field. */
def supportsWriterFeatures: Boolean =
TableFeatureProtocolUtils.supportsWriterFeatures(minWriterVersion)

/**
* As soon as a protocol supports writer features it is considered a table features protocol.
* It is not possible to support reader features without supporting writer features.
*/
def supportsTableFeatures: Boolean = supportsWriterFeatures

/**
* Get a new Protocol object that has `feature` supported. Writer-only features will be added to
* `writerFeatures` field, and reader-writer features will be added to `readerFeatures` and
Expand All @@ -60,7 +78,7 @@ trait TableFeatureSupport { this: Protocol =>
*/
def withFeature(feature: TableFeature): Protocol = {
def shouldAddRead: Boolean = {
if (supportsReaderFeatures) return true
if (feature == ColumnMappingTableFeature && canSupportColumnMappingFeature) return true
if (feature.minReaderVersion <= minReaderVersion) return false

throw DeltaErrors.tableFeatureRequiresHigherReaderProtocolVersion(
Expand Down Expand Up @@ -111,25 +129,13 @@ trait TableFeatureSupport { this: Protocol =>
* `writerFeatures` field.
*
* The method does not require the feature to be recognized by the client, therefore will not
* try keeping the protocol's `readerFeatures` and `writerFeatures` in sync. Use with caution.
* try keeping the protocol's `readerFeatures` and `writerFeatures` in sync.
* Should never be used directly. Always use withFeature(feature: TableFeature): Protocol.
*/
private[actions] def withFeature(
name: String,
addToReaderFeatures: Boolean,
addToWriterFeatures: Boolean): Protocol = {
if (addToReaderFeatures && !supportsReaderFeatures) {
throw DeltaErrors.tableFeatureRequiresHigherReaderProtocolVersion(
name,
currentVersion = minReaderVersion,
requiredVersion = TableFeatureProtocolUtils.TABLE_FEATURES_MIN_READER_VERSION)
}
if (addToWriterFeatures && !supportsWriterFeatures) {
throw DeltaErrors.tableFeatureRequiresHigherWriterProtocolVersion(
name,
currentVersion = minWriterVersion,
requiredVersion = TableFeatureProtocolUtils.TABLE_FEATURES_MIN_WRITER_VERSION)
}

val addedReaderFeatureOpt = if (addToReaderFeatures) Some(name) else None
val addedWriterFeatureOpt = if (addToWriterFeatures) Some(name) else None

Expand All @@ -143,23 +149,23 @@ trait TableFeatureSupport { this: Protocol =>
* `readerFeatures` field.
*
* The method does not require the features to be recognized by the client, therefore will not
* try keeping the protocol's `readerFeatures` and `writerFeatures` in sync. Use with caution.
* try keeping the protocol's `readerFeatures` and `writerFeatures` in sync.
* Intended only for testing. Use with caution.
*/
private[delta] def withReaderFeatures(names: Iterable[String]): Protocol = {
names.foldLeft(this)(
_.withFeature(_, addToReaderFeatures = true, addToWriterFeatures = false))
names.foldLeft(this)(_.withFeature(_, addToReaderFeatures = true, addToWriterFeatures = false))
}

/**
* Get a new Protocol object with additional feature descriptors added to the protocol's
* `writerFeatures` field.
*
* The method does not require the features to be recognized by the client, therefore will not
* try keeping the protocol's `readerFeatures` and `writerFeatures` in sync. Use with caution.
* try keeping the protocol's `readerFeatures` and `writerFeatures` in sync.
* Intended only for testing. Use with caution.
*/
private[delta] def withWriterFeatures(names: Iterable[String]): Protocol = {
names.foldLeft(this)(
_.withFeature(_, addToReaderFeatures = false, addToWriterFeatures = true))
names.foldLeft(this)(_.withFeature(_, addToReaderFeatures = false, addToWriterFeatures = true))
}

/**
Expand Down Expand Up @@ -203,14 +209,16 @@ trait TableFeatureSupport { this: Protocol =>
*/
@JsonIgnore
lazy val implicitlySupportedFeatures: Set[TableFeature] = {
if (supportsReaderFeatures && supportsWriterFeatures) {
// this protocol uses both reader and writer features, no feature can be implicitly supported
if (supportsTableFeatures) {
// As soon as a protocol supports writer features, all features need to be explicitly defined.
// This includes legacy reader features (the only one is Column Mapping), even if the
// reader protocol is legacy and explicitly supports Column Mapping.
Set()
} else {
TableFeature.allSupportedFeaturesMap.values
.filter(_.isLegacyFeature)
.filterNot(supportsReaderFeatures || this.minReaderVersion < _.minReaderVersion)
.filterNot(supportsWriterFeatures || this.minWriterVersion < _.minWriterVersion)
.filter(_.minReaderVersion <= this.minReaderVersion)
.filter(_.minWriterVersion <= this.minWriterVersion)
.toSet
}
}
Expand Down Expand Up @@ -271,14 +279,11 @@ trait TableFeatureSupport { this: Protocol =>
val protocols = this +: others
val mergedReaderVersion = protocols.map(_.minReaderVersion).max
val mergedWriterVersion = protocols.map(_.minWriterVersion).max
val mergedReaderFeatures = protocols.flatMap(_.readerFeatureNames)
val mergedWriterFeatures = protocols.flatMap(_.writerFeatureNames)
val mergedFeatures = protocols.flatMap(_.readerAndWriterFeatures)
val mergedImplicitFeatures = protocols.flatMap(_.implicitlySupportedFeatures)

val mergedProtocol = Protocol(mergedReaderVersion, mergedWriterVersion)
.withReaderFeatures(mergedReaderFeatures)
.withWriterFeatures(mergedWriterFeatures)
.withFeatures(mergedImplicitFeatures)
.withFeatures(mergedFeatures ++ mergedImplicitFeatures)

// The merged protocol is always normalized in order to represent the protocol
// with the weakest possible form. This enables backward compatibility.
Expand Down Expand Up @@ -348,7 +353,7 @@ trait TableFeatureSupport { this: Protocol =>
*/
def normalized: Protocol = {
// Normalization can only be applied to table feature protocols.
if (!supportsWriterFeatures) return this
if (!supportsTableFeatures) return this

val (minReaderVersion, minWriterVersion) =
TableFeatureProtocolUtils.minimumRequiredVersions(readerAndWriterFeatures)
Expand All @@ -371,7 +376,7 @@ trait TableFeatureSupport { this: Protocol =>
*/
def denormalized: Protocol = {
// Denormalization can only be applied to legacy protocols.
if (supportsWriterFeatures) return this
if (supportsTableFeatures) return this

val (minReaderVersion, _) =
TableFeatureProtocolUtils.minimumRequiredVersions(implicitlySupportedFeatures.toSeq)
Expand Down Expand Up @@ -419,7 +424,7 @@ object TableFeatureProtocolUtils {
/** The string constant "supported" for uses in table properties. */
val FEATURE_PROP_SUPPORTED = "supported"

/** Min reader version that supports reader features. */
/** Min reader version that supports native reader features. */
val TABLE_FEATURES_MIN_READER_VERSION = 3

/** Min reader version that supports writer features. */
Expand All @@ -440,8 +445,20 @@ object TableFeatureProtocolUtils {
s"$DEFAULT_FEATURE_PROP_PREFIX$featureName"

/**
* Determine whether a [[Protocol]] with the given reader protocol version is capable of adding
* features into its `readerFeatures` field.
* Determine whether a [[Protocol]] with the given reader protocol version can support column
* mapping. All table feature protocols that can support column mapping are capable of adding
* the feature to the `readerFeatures` field. This includes legacy reader protocol version
* (2, 7).
*/
def canSupportColumnMappingFeature(readerVersion: Int, writerVersion: Int): Boolean = {
readerVersion >= ColumnMappingTableFeature.minReaderVersion &&
supportsWriterFeatures(writerVersion)
}

/**
* Determine whether a [[Protocol]] with the given reader protocol version supports
* native features. All protocols that can support native reader features are capable
* of adding the feature to the `readerFeatures` field.
*/
def supportsReaderFeatures(readerVersion: Int): Boolean = {
readerVersion >= TABLE_FEATURES_MIN_READER_VERSION
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,13 @@ case class Protocol private (
// Correctness check
// Reader and writer versions must match the status of reader and writer features
require(
supportsReaderFeatures == readerFeatures.isDefined,
(supportsReaderFeatures || canSupportColumnMappingFeature) == readerFeatures.isDefined,
"Mismatched minReaderVersion and readerFeatures.")
require(
supportsWriterFeatures == writerFeatures.isDefined,
"Mismatched minWriterVersion and writerFeatures.")

// When reader is on table features, writer must be on table features too
// When reader is on table features, writer must be on table features too.
if (supportsReaderFeatures && !supportsWriterFeatures) {
throw DeltaErrors.tableFeatureReadRequiresWriteException(
TableFeatureProtocolUtils.TABLE_FEATURES_MIN_WRITER_VERSION)
Expand All @@ -166,7 +166,7 @@ case class Protocol private (
*/
@JsonIgnore
lazy val simpleString: String = {
if (!supportsReaderFeatures && !supportsWriterFeatures) {
if (!supportsTableFeatures) {
s"$minReaderVersion,$minWriterVersion"
} else {
val readerFeaturesStr = readerFeatures
Expand Down Expand Up @@ -203,18 +203,20 @@ object Protocol {
def apply(
minReaderVersion: Int = Action.readerVersion,
minWriterVersion: Int = Action.writerVersion): Protocol = {
val shouldAddReaderFeatures = supportsReaderFeatures(minReaderVersion) ||
canSupportColumnMappingFeature(minReaderVersion, minWriterVersion)
new Protocol(
minReaderVersion = minReaderVersion,
minWriterVersion = minWriterVersion,
readerFeatures = if (supportsReaderFeatures(minReaderVersion)) Some(Set()) else None,
readerFeatures = if (shouldAddReaderFeatures) Some(Set()) else None,
writerFeatures = if (supportsWriterFeatures(minWriterVersion)) Some(Set()) else None)
}

/** Returns the required protocol for a given feature. Takes into account dependent features. */
def forTableFeature(tf: TableFeature): Protocol = {
// Every table feature is a writer feature.
val writerFeatures = tf.requiredFeatures + tf
val readerFeatures = writerFeatures.filter(f => f.isReaderWriterFeature && !f.isLegacyFeature)
val readerFeatures = writerFeatures.filter(_.isReaderWriterFeature)
val writerFeaturesNames = writerFeatures.map(_.name)
val readerFeaturesNames = readerFeatures.map(_.name)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ class ActionSerializerSuite extends QueryTest with SharedSparkSession with Delta
expectedJson =
s"""{"protocol":{"minReaderVersion":$TABLE_FEATURES_MIN_READER_VERSION,""" +
s""""minWriterVersion":$TABLE_FEATURES_MIN_WRITER_VERSION,""" +
""""readerFeatures":["testLegacyReaderWriter"],""" +
""""readerFeatures":[],""" +
""""writerFeatures":["testLegacyReaderWriter"]}}""")

testActionSerDe(
Expand All @@ -248,7 +248,7 @@ class ActionSerializerSuite extends QueryTest with SharedSparkSession with Delta
expectedJson =
s"""{"protocol":{"minReaderVersion":$TABLE_FEATURES_MIN_READER_VERSION,""" +
s""""minWriterVersion":$TABLE_FEATURES_MIN_WRITER_VERSION,""" +
""""readerFeatures":["testLegacyReaderWriter","testReaderWriter"],""" +
""""readerFeatures":["testReaderWriter"],""" +
""""writerFeatures":["testLegacyReaderWriter","testReaderWriter"]}}""")

testActionSerDe(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ trait DeltaCDCColumnMappingSuiteBase extends DeltaCDCSuiteBase
}
// upgrade to name mode
val protocol = deltaLog.snapshot.protocol
val (r, w) = if (protocol.supportsReaderFeatures || protocol.supportsWriterFeatures) {
val (r, w) = if (protocol.supportsTableFeatures) {
(TableFeatureProtocolUtils.TABLE_FEATURES_MIN_READER_VERSION,
TableFeatureProtocolUtils.TABLE_FEATURES_MIN_WRITER_VERSION)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ trait DeltaColumnMappingTestUtilsBase extends SharedSparkSession {
Protocol.forNewTable(spark, Some(metadata)).minReaderVersion.toString),
(Protocol.MIN_WRITER_VERSION_PROP,
Protocol.forNewTable(spark, Some(metadata)).minWriterVersion.toString))
if (snapshot.protocol.supportsReaderFeatures || snapshot.protocol.supportsWriterFeatures) {
if (snapshot.protocol.supportsTableFeatures) {
props ++=
Protocol.minProtocolComponentsFromAutomaticallyEnabledFeatures(
spark, metadata, snapshot.protocol)
Expand Down
Loading

0 comments on commit 920f185

Please sign in to comment.