diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala index 7f6f8e2782..10796a0a11 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala @@ -426,7 +426,7 @@ class DeltaLog private( def assertTableFeaturesMatchMetadata( targetProtocol: Protocol, targetMetadata: Metadata): Unit = { - if (!targetProtocol.supportsReaderFeatures && !targetProtocol.supportsWriterFeatures) return + if (!targetProtocol.supportsTableFeatures) return val protocolEnabledFeatures = targetProtocol.writerFeatureNames .flatMap(TableFeature.featureNameToFeature) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala b/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala index 1591a99f83..7ed6840454 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala @@ -542,7 +542,7 @@ class Snapshot( } base.put(Protocol.MIN_READER_VERSION_PROP, protocol.minReaderVersion.toString) base.put(Protocol.MIN_WRITER_VERSION_PROP, protocol.minWriterVersion.toString) - if (protocol.supportsReaderFeatures || protocol.supportsWriterFeatures) { + if (protocol.supportsTableFeatures) { val features = protocol.readerAndWriterFeatureNames.map(name => s"${TableFeatureProtocolUtils.FEATURE_PROP_PREFIX}$name" -> TableFeatureProtocolUtils.FEATURE_PROP_SUPPORTED) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala b/spark/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala index b906549908..2d41fae0f7 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala @@ -39,14 +39,32 @@ import com.fasterxml.jackson.annotation.JsonIgnore */ trait TableFeatureSupport { this: Protocol => - /** Check if this protocol is capable of adding features into its `readerFeatures` field. */ + /** + * Check if this protocol can support arbitrary reader features. If this returns false, + * then the table may still be able to support the "columnMapping" feature. + * See [[canSupportColumnMappingFeature]] below. + */ def supportsReaderFeatures: Boolean = TableFeatureProtocolUtils.supportsReaderFeatures(minReaderVersion) + /** + * Check if this protocol is in table feature representation and can support column mapping. + * Column mapping is the only legacy reader feature and requires special handling in some + * cases. + */ + def canSupportColumnMappingFeature: Boolean = + TableFeatureProtocolUtils.canSupportColumnMappingFeature(minReaderVersion, minWriterVersion) + /** Check if this protocol is capable of adding features into its `writerFeatures` field. */ def supportsWriterFeatures: Boolean = TableFeatureProtocolUtils.supportsWriterFeatures(minWriterVersion) + /** + * As soon as a protocol supports writer features it is considered a table features protocol. + * It is not possible to support reader features without supporting writer features. + */ + def supportsTableFeatures: Boolean = supportsWriterFeatures + /** * Get a new Protocol object that has `feature` supported. Writer-only features will be added to * `writerFeatures` field, and reader-writer features will be added to `readerFeatures` and @@ -60,7 +78,7 @@ trait TableFeatureSupport { this: Protocol => */ def withFeature(feature: TableFeature): Protocol = { def shouldAddRead: Boolean = { - if (supportsReaderFeatures) return true + if (feature == ColumnMappingTableFeature && canSupportColumnMappingFeature) return true if (feature.minReaderVersion <= minReaderVersion) return false throw DeltaErrors.tableFeatureRequiresHigherReaderProtocolVersion( @@ -111,25 +129,13 @@ trait TableFeatureSupport { this: Protocol => * `writerFeatures` field. * * The method does not require the feature to be recognized by the client, therefore will not - * try keeping the protocol's `readerFeatures` and `writerFeatures` in sync. Use with caution. + * try keeping the protocol's `readerFeatures` and `writerFeatures` in sync. + * Should never be used directly. Always use withFeature(feature: TableFeature): Protocol. */ private[actions] def withFeature( name: String, addToReaderFeatures: Boolean, addToWriterFeatures: Boolean): Protocol = { - if (addToReaderFeatures && !supportsReaderFeatures) { - throw DeltaErrors.tableFeatureRequiresHigherReaderProtocolVersion( - name, - currentVersion = minReaderVersion, - requiredVersion = TableFeatureProtocolUtils.TABLE_FEATURES_MIN_READER_VERSION) - } - if (addToWriterFeatures && !supportsWriterFeatures) { - throw DeltaErrors.tableFeatureRequiresHigherWriterProtocolVersion( - name, - currentVersion = minWriterVersion, - requiredVersion = TableFeatureProtocolUtils.TABLE_FEATURES_MIN_WRITER_VERSION) - } - val addedReaderFeatureOpt = if (addToReaderFeatures) Some(name) else None val addedWriterFeatureOpt = if (addToWriterFeatures) Some(name) else None @@ -143,11 +149,11 @@ trait TableFeatureSupport { this: Protocol => * `readerFeatures` field. * * The method does not require the features to be recognized by the client, therefore will not - * try keeping the protocol's `readerFeatures` and `writerFeatures` in sync. Use with caution. + * try keeping the protocol's `readerFeatures` and `writerFeatures` in sync. + * Intended only for testing. Use with caution. */ private[delta] def withReaderFeatures(names: Iterable[String]): Protocol = { - names.foldLeft(this)( - _.withFeature(_, addToReaderFeatures = true, addToWriterFeatures = false)) + names.foldLeft(this)(_.withFeature(_, addToReaderFeatures = true, addToWriterFeatures = false)) } /** @@ -155,11 +161,11 @@ trait TableFeatureSupport { this: Protocol => * `writerFeatures` field. * * The method does not require the features to be recognized by the client, therefore will not - * try keeping the protocol's `readerFeatures` and `writerFeatures` in sync. Use with caution. + * try keeping the protocol's `readerFeatures` and `writerFeatures` in sync. + * Intended only for testing. Use with caution. */ private[delta] def withWriterFeatures(names: Iterable[String]): Protocol = { - names.foldLeft(this)( - _.withFeature(_, addToReaderFeatures = false, addToWriterFeatures = true)) + names.foldLeft(this)(_.withFeature(_, addToReaderFeatures = false, addToWriterFeatures = true)) } /** @@ -203,14 +209,16 @@ trait TableFeatureSupport { this: Protocol => */ @JsonIgnore lazy val implicitlySupportedFeatures: Set[TableFeature] = { - if (supportsReaderFeatures && supportsWriterFeatures) { - // this protocol uses both reader and writer features, no feature can be implicitly supported + if (supportsTableFeatures) { + // As soon as a protocol supports writer features, all features need to be explicitly defined. + // This includes legacy reader features (the only one is Column Mapping), even if the + // reader protocol is legacy and explicitly supports Column Mapping. Set() } else { TableFeature.allSupportedFeaturesMap.values .filter(_.isLegacyFeature) - .filterNot(supportsReaderFeatures || this.minReaderVersion < _.minReaderVersion) - .filterNot(supportsWriterFeatures || this.minWriterVersion < _.minWriterVersion) + .filter(_.minReaderVersion <= this.minReaderVersion) + .filter(_.minWriterVersion <= this.minWriterVersion) .toSet } } @@ -271,14 +279,11 @@ trait TableFeatureSupport { this: Protocol => val protocols = this +: others val mergedReaderVersion = protocols.map(_.minReaderVersion).max val mergedWriterVersion = protocols.map(_.minWriterVersion).max - val mergedReaderFeatures = protocols.flatMap(_.readerFeatureNames) - val mergedWriterFeatures = protocols.flatMap(_.writerFeatureNames) + val mergedFeatures = protocols.flatMap(_.readerAndWriterFeatures) val mergedImplicitFeatures = protocols.flatMap(_.implicitlySupportedFeatures) val mergedProtocol = Protocol(mergedReaderVersion, mergedWriterVersion) - .withReaderFeatures(mergedReaderFeatures) - .withWriterFeatures(mergedWriterFeatures) - .withFeatures(mergedImplicitFeatures) + .withFeatures(mergedFeatures ++ mergedImplicitFeatures) // The merged protocol is always normalized in order to represent the protocol // with the weakest possible form. This enables backward compatibility. @@ -348,7 +353,7 @@ trait TableFeatureSupport { this: Protocol => */ def normalized: Protocol = { // Normalization can only be applied to table feature protocols. - if (!supportsWriterFeatures) return this + if (!supportsTableFeatures) return this val (minReaderVersion, minWriterVersion) = TableFeatureProtocolUtils.minimumRequiredVersions(readerAndWriterFeatures) @@ -371,7 +376,7 @@ trait TableFeatureSupport { this: Protocol => */ def denormalized: Protocol = { // Denormalization can only be applied to legacy protocols. - if (supportsWriterFeatures) return this + if (supportsTableFeatures) return this val (minReaderVersion, _) = TableFeatureProtocolUtils.minimumRequiredVersions(implicitlySupportedFeatures.toSeq) @@ -419,7 +424,7 @@ object TableFeatureProtocolUtils { /** The string constant "supported" for uses in table properties. */ val FEATURE_PROP_SUPPORTED = "supported" - /** Min reader version that supports reader features. */ + /** Min reader version that supports native reader features. */ val TABLE_FEATURES_MIN_READER_VERSION = 3 /** Min reader version that supports writer features. */ @@ -440,8 +445,20 @@ object TableFeatureProtocolUtils { s"$DEFAULT_FEATURE_PROP_PREFIX$featureName" /** - * Determine whether a [[Protocol]] with the given reader protocol version is capable of adding - * features into its `readerFeatures` field. + * Determine whether a [[Protocol]] with the given reader protocol version can support column + * mapping. All table feature protocols that can support column mapping are capable of adding + * the feature to the `readerFeatures` field. This includes legacy reader protocol version + * (2, 7). + */ + def canSupportColumnMappingFeature(readerVersion: Int, writerVersion: Int): Boolean = { + readerVersion >= ColumnMappingTableFeature.minReaderVersion && + supportsWriterFeatures(writerVersion) + } + + /** + * Determine whether a [[Protocol]] with the given reader protocol version supports + * native features. All protocols that can support native reader features are capable + * of adding the feature to the `readerFeatures` field. */ def supportsReaderFeatures(readerVersion: Int): Boolean = { readerVersion >= TABLE_FEATURES_MIN_READER_VERSION diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala b/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala index 08d5ed0d71..a692ce2c41 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala @@ -143,13 +143,13 @@ case class Protocol private ( // Correctness check // Reader and writer versions must match the status of reader and writer features require( - supportsReaderFeatures == readerFeatures.isDefined, + (supportsReaderFeatures || canSupportColumnMappingFeature) == readerFeatures.isDefined, "Mismatched minReaderVersion and readerFeatures.") require( supportsWriterFeatures == writerFeatures.isDefined, "Mismatched minWriterVersion and writerFeatures.") - // When reader is on table features, writer must be on table features too + // When reader is on table features, writer must be on table features too. if (supportsReaderFeatures && !supportsWriterFeatures) { throw DeltaErrors.tableFeatureReadRequiresWriteException( TableFeatureProtocolUtils.TABLE_FEATURES_MIN_WRITER_VERSION) @@ -166,7 +166,7 @@ case class Protocol private ( */ @JsonIgnore lazy val simpleString: String = { - if (!supportsReaderFeatures && !supportsWriterFeatures) { + if (!supportsTableFeatures) { s"$minReaderVersion,$minWriterVersion" } else { val readerFeaturesStr = readerFeatures @@ -203,10 +203,12 @@ object Protocol { def apply( minReaderVersion: Int = Action.readerVersion, minWriterVersion: Int = Action.writerVersion): Protocol = { + val shouldAddReaderFeatures = supportsReaderFeatures(minReaderVersion) || + canSupportColumnMappingFeature(minReaderVersion, minWriterVersion) new Protocol( minReaderVersion = minReaderVersion, minWriterVersion = minWriterVersion, - readerFeatures = if (supportsReaderFeatures(minReaderVersion)) Some(Set()) else None, + readerFeatures = if (shouldAddReaderFeatures) Some(Set()) else None, writerFeatures = if (supportsWriterFeatures(minWriterVersion)) Some(Set()) else None) } @@ -214,7 +216,7 @@ object Protocol { def forTableFeature(tf: TableFeature): Protocol = { // Every table feature is a writer feature. val writerFeatures = tf.requiredFeatures + tf - val readerFeatures = writerFeatures.filter(f => f.isReaderWriterFeature && !f.isLegacyFeature) + val readerFeatures = writerFeatures.filter(_.isReaderWriterFeature) val writerFeaturesNames = writerFeatures.map(_.name) val readerFeaturesNames = readerFeatures.map(_.name) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/ActionSerializerSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/ActionSerializerSuite.scala index e9121cd5ba..48aa427a53 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/ActionSerializerSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/ActionSerializerSuite.scala @@ -235,7 +235,7 @@ class ActionSerializerSuite extends QueryTest with SharedSparkSession with Delta expectedJson = s"""{"protocol":{"minReaderVersion":$TABLE_FEATURES_MIN_READER_VERSION,""" + s""""minWriterVersion":$TABLE_FEATURES_MIN_WRITER_VERSION,""" + - """"readerFeatures":["testLegacyReaderWriter"],""" + + """"readerFeatures":[],""" + """"writerFeatures":["testLegacyReaderWriter"]}}""") testActionSerDe( @@ -248,7 +248,7 @@ class ActionSerializerSuite extends QueryTest with SharedSparkSession with Delta expectedJson = s"""{"protocol":{"minReaderVersion":$TABLE_FEATURES_MIN_READER_VERSION,""" + s""""minWriterVersion":$TABLE_FEATURES_MIN_WRITER_VERSION,""" + - """"readerFeatures":["testLegacyReaderWriter","testReaderWriter"],""" + + """"readerFeatures":["testReaderWriter"],""" + """"writerFeatures":["testLegacyReaderWriter","testReaderWriter"]}}""") testActionSerDe( diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCColumnMappingSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCColumnMappingSuite.scala index 76eaba2121..465a748ba6 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCColumnMappingSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCColumnMappingSuite.scala @@ -619,7 +619,7 @@ trait DeltaCDCColumnMappingSuiteBase extends DeltaCDCSuiteBase } // upgrade to name mode val protocol = deltaLog.snapshot.protocol - val (r, w) = if (protocol.supportsReaderFeatures || protocol.supportsWriterFeatures) { + val (r, w) = if (protocol.supportsTableFeatures) { (TableFeatureProtocolUtils.TABLE_FEATURES_MIN_READER_VERSION, TableFeatureProtocolUtils.TABLE_FEATURES_MIN_WRITER_VERSION) } else { diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingTestUtils.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingTestUtils.scala index 41786b02ec..fbf60a8d8e 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingTestUtils.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingTestUtils.scala @@ -271,7 +271,7 @@ trait DeltaColumnMappingTestUtilsBase extends SharedSparkSession { Protocol.forNewTable(spark, Some(metadata)).minReaderVersion.toString), (Protocol.MIN_WRITER_VERSION_PROP, Protocol.forNewTable(spark, Some(metadata)).minWriterVersion.toString)) - if (snapshot.protocol.supportsReaderFeatures || snapshot.protocol.supportsWriterFeatures) { + if (snapshot.protocol.supportsTableFeatures) { props ++= Protocol.minProtocolComponentsFromAutomaticallyEnabledFeatures( spark, metadata, snapshot.protocol) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala index b0e34c49ad..e7d0eb6940 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala @@ -214,39 +214,38 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest } test("upgrade to support table features - many features") { - withTempDir { path => - val log = createTableWithProtocol(Protocol(2, 5), path) - assert(log.update().protocol === Protocol(2, 5)) - val table = io.delta.tables.DeltaTable.forPath(spark, path.getCanonicalPath) - table.upgradeTableProtocol(2, TABLE_FEATURES_MIN_WRITER_VERSION) - // Setting table feature versions to a protocol without table features is a noop. - assert(log.update().protocol === Protocol(2, 5)) - spark.sql( - s"ALTER TABLE delta.`${path.getPath}` SET TBLPROPERTIES (" + - s" delta.feature.${TestWriterFeature.name}='enabled'" + - s")") - table.upgradeTableProtocol( - TABLE_FEATURES_MIN_READER_VERSION, - TABLE_FEATURES_MIN_WRITER_VERSION) - assert( - log.snapshot.protocol === Protocol( - minReaderVersion = 2, - minWriterVersion = TABLE_FEATURES_MIN_WRITER_VERSION, - readerFeatures = None, - writerFeatures = Some( - Set( - AppendOnlyTableFeature, - ChangeDataFeedTableFeature, - CheckConstraintsTableFeature, - ColumnMappingTableFeature, - GeneratedColumnsTableFeature, - InvariantsTableFeature, - TestLegacyWriterFeature, - TestRemovableLegacyWriterFeature, - TestLegacyReaderWriterFeature, - TestRemovableLegacyReaderWriterFeature, - TestWriterFeature) - .map(_.name)))) + withSQLConf(DeltaSQLConf.TABLE_FEATURES_TEST_FEATURES_ENABLED.key -> false.toString) { + withTempDir { path => + val log = createTableWithProtocol(Protocol(2, 5), path) + assert(log.update().protocol === Protocol(2, 5)) + val table = io.delta.tables.DeltaTable.forPath(spark, path.getCanonicalPath) + table.upgradeTableProtocol(2, TABLE_FEATURES_MIN_WRITER_VERSION) + // Setting table feature versions to a protocol without table features is a noop. + assert(log.update().protocol === Protocol(2, 5)) + spark.sql( + s"ALTER TABLE delta.`${path.getPath}` SET TBLPROPERTIES (" + + s" delta.feature.${RowTrackingFeature.name}='enabled'" + + s")") + table.upgradeTableProtocol( + TABLE_FEATURES_MIN_READER_VERSION, + TABLE_FEATURES_MIN_WRITER_VERSION) + assert( + log.snapshot.protocol === Protocol( + minReaderVersion = 2, + minWriterVersion = TABLE_FEATURES_MIN_WRITER_VERSION, + readerFeatures = Some(Set(ColumnMappingTableFeature.name)), + writerFeatures = Some( + Set( + AppendOnlyTableFeature, + InvariantsTableFeature, + ChangeDataFeedTableFeature, + CheckConstraintsTableFeature, + ColumnMappingTableFeature, + GeneratedColumnsTableFeature, + DomainMetadataTableFeature, + RowTrackingFeature) + .map(_.name)))) + } } } @@ -1313,7 +1312,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest deltaLog.snapshot.protocol === Protocol( minReaderVersion = 2, minWriterVersion = TABLE_FEATURES_MIN_WRITER_VERSION, - readerFeatures = None, + readerFeatures = Some(Set.empty), writerFeatures = Some(Set(TestLegacyReaderWriterFeature.name)))) assertPropertiesAndShowTblProperties(deltaLog, tableHasFeatures = true) } @@ -2136,7 +2135,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest assert(log.snapshot.protocol === Protocol( 2, TABLE_FEATURES_MIN_WRITER_VERSION, - readerFeatures = None, + readerFeatures = Some(Set.empty), writerFeatures = Some(Set(TestLegacyReaderWriterFeature.name)))) } } @@ -2377,6 +2376,27 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest } } + test("Column mapping appears in reader features") { + withTempDir { dir => + val deltaLog = DeltaLog.forTable(spark, dir) + sql( + s"""CREATE TABLE delta.`${deltaLog.dataPath}` (id bigint) USING delta + |TBLPROPERTIES ( + |delta.feature.${ColumnMappingTableFeature.name} = 'supported', + |delta.feature.${TestWriterFeature.name} = 'supported' + |)""".stripMargin) + assert(deltaLog.update().protocol === Protocol( + minReaderVersion = 2, + minWriterVersion = 7, + readerFeatures = Some(Set(ColumnMappingTableFeature.name)), + writerFeatures = Some(Set( + InvariantsTableFeature.name, + AppendOnlyTableFeature.name, + ColumnMappingTableFeature.name, + TestWriterFeature.name)))) + } + } + def protocolWithFeatures( readerFeatures: Seq[TableFeature] = Seq.empty, writerFeatures: Seq[TableFeature] = Seq.empty): Protocol = { @@ -2434,8 +2454,17 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest val readerVersion = Math.max(feature.minReaderVersion, 1) val expectedWriterFeatures = Some(Set(feature.name, InvariantsTableFeature.name, AppendOnlyTableFeature.name)) + val supportsColumnMapping = + canSupportColumnMappingFeature(readerVersion, TABLE_FEATURES_MIN_WRITER_VERSION) val expectedReaderFeatures: Option[Set[String]] = - if (supportsReaderFeatures(readerVersion)) Some(Set(feature.name)) else None + if ((feature == ColumnMappingTableFeature && supportsColumnMapping) || + supportsReaderFeatures(readerVersion)) { + Some(Set(feature.name)) + } else if (supportsColumnMapping) { + Some(Set.empty) + } else { + None + } assert( deltaLog.update().protocol === Protocol( diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceColumnMappingSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceColumnMappingSuite.scala index cae6b29f4c..36c142ace2 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceColumnMappingSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceColumnMappingSuite.scala @@ -303,7 +303,7 @@ trait ColumnMappingStreamingBlockedWorkflowSuiteBase extends ColumnMappingStream // upgrade to name mode val protocol = deltaLog.snapshot.protocol - val (r, w) = if (protocol.supportsReaderFeatures || protocol.supportsWriterFeatures) { + val (r, w) = if (protocol.supportsTableFeatures) { (TableFeatureProtocolUtils.TABLE_FEATURES_MIN_READER_VERSION, TableFeatureProtocolUtils.TABLE_FEATURES_MIN_WRITER_VERSION) } else { diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala index dc013656bb..1c36c0375a 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala @@ -3000,7 +3000,7 @@ class DeltaNameColumnMappingSuite extends DeltaSuite .save(tempDir.getCanonicalPath) val protocol = DeltaLog.forTable(spark, tempDir).snapshot.protocol - val (r, w) = if (protocol.supportsReaderFeatures || protocol.supportsWriterFeatures) { + val (r, w) = if (protocol.supportsTableFeatures) { (TableFeatureProtocolUtils.TABLE_FEATURES_MIN_READER_VERSION, TableFeatureProtocolUtils.TABLE_FEATURES_MIN_WRITER_VERSION) } else { diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTableFeatureSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTableFeatureSuite.scala index 5dcb76e0d1..38f1e275ad 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTableFeatureSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTableFeatureSuite.scala @@ -197,9 +197,8 @@ class DeltaTableFeatureSuite val protocol = Protocol(2, TABLE_FEATURES_MIN_WRITER_VERSION).withFeature(TestLegacyReaderWriterFeature) - assert(!protocol.readerFeatures.isDefined) - assert( - protocol.writerFeatures.get === Set(TestLegacyReaderWriterFeature.name)) + assert(protocol.readerFeatures.get === Set.empty) + assert(protocol.writerFeatures.get === Set(TestLegacyReaderWriterFeature.name)) } test("merge protocols") { diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaHistorySuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaHistorySuite.scala index 917a04ef5f..20fdd92d18 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaHistorySuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaHistorySuite.scala @@ -424,7 +424,7 @@ trait DescribeDeltaHistorySuiteBase Seq("UPGRADE PROTOCOL", s"""{"minReaderVersion":$readerVersion,""" + s""""minWriterVersion":$writerVersion,""" + - s""""readerFeatures":["${TestLegacyReaderWriterFeature.name}"],""" + + s""""readerFeatures":[],""" + s""""writerFeatures":["${TestLegacyReaderWriterFeature.name}"]}"""), Seq($"operation", $"operationParameters.newProtocol")) // scalastyle:on line.size.limit diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/rowid/RowTrackingBackfillSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/rowid/RowTrackingBackfillSuite.scala index 8eaaeef98a..1a2a76e9cd 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/rowid/RowTrackingBackfillSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/rowid/RowTrackingBackfillSuite.scala @@ -460,7 +460,8 @@ class RowTrackingBackfillSuite assert( afterProtocol.minWriterVersion === TableFeatureProtocolUtils.TABLE_FEATURES_MIN_WRITER_VERSION) - assert(afterProtocol.readerFeatures === None) + assert(afterProtocol.readerFeatures === Some(Set( + ColumnMappingTableFeature.name))) assert( afterProtocol.writerFeatures === Some(( prevProtocol.implicitlyAndExplicitlySupportedFeatures ++