Skip to content

Commit

Permalink
[Spark] Fix stats with tightBounds check for AddFiles with deletionVe…
Browse files Browse the repository at this point in the history
…ctors (#3633)

## Description

Check `DeletionVectorFilesHaveWideBoundsCheck` has been disabled for
COMPUTE STATS because it reintroduces stats with tight bound to files
with Deletion Vectors. However, there are other operations that can then
copy over these AddFile actions with DVs and tight stats. These
operations resulted in
DELTA_ADDING_DELETION_VECTORS_WITH_TIGHT_BOUNDS_DISALLOWED error, which
was a false positive.

In this PR we also attempt to introduce and discuss a "framework" for
checks like that as a property of DeltaOperations, with DeltaOperations
declaring as a member method whether a certain property and check should
be performed. This is opposed to current practice, where many places in
the code feature special cases like matching against a certain
DeltaOperation and doing something special; this kind of code is very
decentralized, and it's easy to miss if any new place or new operation
needs such central handling. If this was centralized in DeltaOperations,
this could lead to better discoverability of special cases and edge
cases when implementing new operations.

## How was this patch tested?
Tests added.

Co-authored-by: Julek Sompolski <Juliusz Sompolski>
  • Loading branch information
juliuszsompolski authored Sep 3, 2024
1 parent a745000 commit 5d63094
Show file tree
Hide file tree
Showing 3 changed files with 221 additions and 17 deletions.
135 changes: 134 additions & 1 deletion spark/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,25 @@ object DeltaOperations {
transformer.transformToString(metric, allMetrics)
}
}

/**
* A transaction that commits AddFile actions with deletionVector should have column stats that
* are not tight bounds. An exception to this is ComputeStats operation, which recomputes stats
* on these files, and the new stats are tight bounds. Some other operations that merely take an
* existing AddFile action and commit a copy of it, not changing the deletionVector or stats,
* can then also recommit AddFile with deletionVector and tight bound stats that were recomputed
* before.
*
* An operation for which this can happen, and there is no way that it could be committing
* new deletion vectors, should set this to false to bypass this check.
* All other operations should set this to true, so that this is validated during commit.
*
* This is abstract to force the implementers of all operations to think about this setting.
* All operations should add a comment justifying this setting.
* Any operation that sets this to false should add a test in TightBoundsSuite.
*/
def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean

}

abstract class OperationWithPredicates(name: String, val predicates: Seq[Expression])
Expand Down Expand Up @@ -133,13 +152,20 @@ object DeltaOperations {
DeltaOperationMetrics.WRITE_REPLACE_WHERE
}
override def changesData: Boolean = true

// This operation shouldn't be introducing AddFile actions with DVs and non tight bounds stats.
// DVs can be introduced by the replaceWhere operation.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}

case class RemoveColumnMapping(
override val userMetadata: Option[String] = None) extends Operation("REMOVE COLUMN MAPPING") {
override def parameters: Map[String, Any] = Map()

override val operationMetrics: Set[String] = DeltaOperationMetrics.REMOVE_COLUMN_MAPPING

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}

/** Recorded during streaming inserts. */
Expand All @@ -154,6 +180,9 @@ object DeltaOperations {
)
override val operationMetrics: Set[String] = DeltaOperationMetrics.STREAMING_UPDATE
override def changesData: Boolean = true

// This operation shouldn't be introducing AddFile actions with DVs and non tight bounds stats.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}
/** Recorded while deleting certain partitions. */
case class Delete(predicate: Seq[Expression])
Expand All @@ -175,12 +204,18 @@ object DeltaOperations {
strMetrics ++ dvMetrics
}
override def changesData: Boolean = true

// This operation shouldn't be introducing AddFile actions with DVs and non tight bounds stats.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}
/** Recorded when truncating the table. */
case class Truncate() extends Operation("TRUNCATE") {
override val parameters: Map[String, Any] = Map.empty
override val operationMetrics: Set[String] = DeltaOperationMetrics.TRUNCATE
override def changesData: Boolean = true

// This operation shouldn't be introducing AddFile actions with DVs and non-tight bounds stats.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}

/** Recorded when converting a table into a Delta table. */
Expand All @@ -198,6 +233,9 @@ object DeltaOperations {
sourceFormat.map("sourceFormat" -> _)
override val operationMetrics: Set[String] = DeltaOperationMetrics.CONVERT
override def changesData: Boolean = true

// This operation shouldn't be introducing AddFile actions with DVs and non-tight bounds stats.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}

/** Represents the predicates and action type (insert, update, delete) for a Merge clause */
Expand Down Expand Up @@ -265,6 +303,9 @@ object DeltaOperations {
}

override def changesData: Boolean = true

// This operation shouldn't be introducing AddFile actions with DVs and non-tight bounds stats.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}

object Merge {
Expand Down Expand Up @@ -296,6 +337,9 @@ object DeltaOperations {
val dvMetrics = transformDeletionVectorMetrics(metrics)
super.transformMetrics(metrics) ++ dvMetrics
}

// This operation shouldn't be introducing AddFile actions with DVs and non-tight bounds stats.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}
/** Recorded when the table is created. */
case class CreateTable(
Expand All @@ -317,6 +361,9 @@ object DeltaOperations {
DeltaOperationMetrics.WRITE
}
override def changesData: Boolean = asSelect

// This operation shouldn't be introducing AddFile actions with DVs and non-tight bounds stats.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}
/** Recorded when the table is replaced. */
case class ReplaceTable(
Expand All @@ -341,12 +388,18 @@ object DeltaOperations {
DeltaOperationMetrics.WRITE
}
override def changesData: Boolean = true

// This operation shouldn't be introducing AddFile actions with DVs and non-tight bounds stats.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}
/** Recorded when the table properties are set. */
val OP_SET_TBLPROPERTIES = "SET TBLPROPERTIES"
case class SetTableProperties(
properties: Map[String, String]) extends Operation(OP_SET_TBLPROPERTIES) {
override val parameters: Map[String, Any] = Map("properties" -> JsonUtils.toJson(properties))

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}
/** Recorded when the table properties are unset. */
case class UnsetTableProperties(
Expand All @@ -355,6 +408,9 @@ object DeltaOperations {
override val parameters: Map[String, Any] = Map(
"properties" -> JsonUtils.toJson(propKeys),
"ifExists" -> ifExists)

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}
/** Recorded when dropping a table feature. */
case class DropTableFeature(
Expand All @@ -363,6 +419,9 @@ object DeltaOperations {
override val parameters: Map[String, Any] = Map(
"featureName" -> featureName,
"truncateHistory" -> truncateHistory)

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}
/** Recorded when columns are added. */
case class AddColumns(
Expand All @@ -375,6 +434,9 @@ object DeltaOperations {
"column" -> structFieldToMap(columnPath, column)
) ++ colPosition.map("position" -> _.toString)
}))

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}

/** Recorded when columns are dropped. */
Expand All @@ -384,6 +446,9 @@ object DeltaOperations {

override val parameters: Map[String, Any] = Map(
"columns" -> JsonUtils.toJson(colsToDrop.map(UnresolvedAttribute(_).name)))

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}

/** Recorded when column is renamed */
Expand All @@ -394,6 +459,9 @@ object DeltaOperations {
"oldColumnPath" -> UnresolvedAttribute(oldColumnPath).name,
"newColumnPath" -> UnresolvedAttribute(newColumnPath).name
)

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}

/** Recorded when columns are changed. */
Expand All @@ -406,13 +474,19 @@ object DeltaOperations {
override val parameters: Map[String, Any] = Map(
"column" -> JsonUtils.toJson(structFieldToMap(columnPath, newColumn))
) ++ colPosition.map("position" -> _)

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}
/** Recorded when columns are replaced. */
case class ReplaceColumns(
columns: Seq[StructField]) extends Operation("REPLACE COLUMNS") {

override val parameters: Map[String, Any] = Map(
"columns" -> JsonUtils.toJson(columns.map(structFieldToMap(Seq.empty, _))))

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}

case class UpgradeProtocol(newProtocol: Protocol) extends Operation("UPGRADE PROTOCOL") {
Expand All @@ -422,15 +496,24 @@ object DeltaOperations {
"readerFeatures" -> newProtocol.readerFeatures,
"writerFeatures" -> newProtocol.writerFeatures
)))

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}

object ManualUpdate extends Operation("Manual Update") {
override val parameters: Map[String, Any] = Map.empty

// Unsafe manual update disables checks.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = false
}

/** A commit without any actions. Could be used to force creation of new checkpoints. */
object EmptyCommit extends Operation("Empty Commit") {
override val parameters: Map[String, Any] = Map.empty

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}

case class UpdateColumnMetadata(
Expand All @@ -442,18 +525,27 @@ object DeltaOperations {
case (path, field) => structFieldToMap(path, field)
}))
}

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}

case class UpdateSchema(oldSchema: StructType, newSchema: StructType)
extends Operation("UPDATE SCHEMA") {
override val parameters: Map[String, Any] = Map(
"oldSchema" -> JsonUtils.toJson(oldSchema),
"newSchema" -> JsonUtils.toJson(newSchema))

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}

case class AddConstraint(
constraintName: String, expr: String) extends Operation("ADD CONSTRAINT") {
override val parameters: Map[String, Any] = Map("name" -> constraintName, "expr" -> expr)

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}

case class DropConstraint(
Expand All @@ -465,11 +557,19 @@ object DeltaOperations {
Map("name" -> constraintName, "existed" -> "false")
}
}

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}

/** Recorded when recomputing stats on the table. */
case class ComputeStats(predicate: Seq[Expression])
extends OperationWithPredicates("COMPUTE STATS", predicate)
extends OperationWithPredicates("COMPUTE STATS", predicate) {

// ComputeStats operation commits AddFiles with recomputed stats which are always tight bounds,
// even when DVs are present. This check should be disabled.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = false
}

/** Recorded when restoring a Delta table to an older version. */
val OP_RESTORE = "RESTORE"
Expand All @@ -482,6 +582,10 @@ object DeltaOperations {
override def changesData: Boolean = true

override val operationMetrics: Set[String] = DeltaOperationMetrics.RESTORE

// Restore operation commits AddFiles with files, DVs and stats from the version it restores to.
// It can happen that tight bound stats were recomputed before by ComputeStats.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = false
}

sealed abstract class OptimizeOrReorg(override val name: String, predicates: Seq[Expression])
Expand Down Expand Up @@ -517,6 +621,9 @@ object DeltaOperations {
)

override val operationMetrics: Set[String] = DeltaOperationMetrics.OPTIMIZE

// This operation shouldn't be introducing AddFile actions with DVs and non tight bounds stats.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}

/** Recorded when cloning a Delta table into a new location. */
Expand All @@ -531,6 +638,10 @@ object DeltaOperations {
)
override def changesData: Boolean = true
override val operationMetrics: Set[String] = DeltaOperationMetrics.CLONE

// Clone operation commits AddFiles with files, DVs and stats copied over from the source table.
// It can happen that tight bound stats were recomputed before by ComputeStats.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = false
}

/**
Expand All @@ -548,6 +659,9 @@ object DeltaOperations {
) ++ specifiedRetentionMillis.map("specifiedRetentionMillis" -> _)

override val operationMetrics: Set[String] = DeltaOperationMetrics.VACUUM_START

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}

/**
Expand All @@ -559,6 +673,9 @@ object DeltaOperations {
)

override val operationMetrics: Set[String] = DeltaOperationMetrics.VACUUM_END

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}

/** Recorded when running REORG on the table. */
Expand All @@ -570,6 +687,9 @@ object DeltaOperations {
)

override val operationMetrics: Set[String] = DeltaOperationMetrics.OPTIMIZE

// This operation shouldn't be introducing AddFile actions with DVs and non tight bounds stats.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}

/** Recorded when clustering columns are changed on clustered tables. */
Expand All @@ -579,6 +699,9 @@ object DeltaOperations {
override val parameters: Map[String, Any] = Map(
"oldClusteringColumns" -> oldClusteringColumns,
"newClusteringColumns" -> newClusteringColumns)

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}

/** Recorded when we backfill a Delta table's existing AddFiles with row tracking data. */
Expand All @@ -587,6 +710,10 @@ object DeltaOperations {
override val parameters: Map[String, Any] = Map(
"batchId" -> JsonUtils.toJson(batchId)
)

// RowTrackingBackfill operation commits AddFiles with files, DVs and stats copied over.
// It can happen that tight bound stats were recomputed before by ComputeStats.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = false
}

private def structFieldToMap(colPath: Seq[String], field: StructField): Map[String, Any] = {
Expand All @@ -610,6 +737,9 @@ object DeltaOperations {
/** Dummy operation only for testing with arbitrary operation names */
case class TestOperation(operationName: String = "TEST") extends Operation(operationName) {
override val parameters: Map[String, Any] = Map.empty

// Perform the check for testing.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}

/**
Expand All @@ -630,6 +760,9 @@ object DeltaOperations {
case class UpgradeUniformProperties(properties: Map[String, String]) extends Operation(
OP_UPGRADE_UNIFORM_BY_REORG) {
override val parameters: Map[String, Any] = Map("properties" -> JsonUtils.toJson(properties))

// This operation shouldn't be introducing AddFile actions with DVs and non tight bounds stats.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}
}

Expand Down
Loading

0 comments on commit 5d63094

Please sign in to comment.