Skip to content

Commit

Permalink
rename helper
Browse files Browse the repository at this point in the history
  • Loading branch information
Anonymous committed Jul 26, 2023
1 parent 0205923 commit c7c1e94
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ import org.apache.spark.util.{SerializableConfiguration, Utils => SparkUtils}


/**
* Contains utility classes and method to delete rows in a table using the Deletion Vectors.
* Contains utility classes and methods for performing DML operations with Deletion Vectors.
*/
object DeleteWithDeletionVectorsHelper extends DeltaCommand {
object DMLWithDeletionVectorsHelper extends DeltaCommand {
/**
* Creates a DataFrame that can be used to scan for rows matching DELETE condition in given
* Creates a DataFrame that can be used to scan for rows matching the condition in the given
* files. Generally the given file list is a pruned file list using the stats based pruning.
*/
def createTargetDfForScanningForMatches(
Expand Down Expand Up @@ -114,8 +114,13 @@ object DeleteWithDeletionVectorsHelper extends DeltaCommand {
deltaLog: DeltaLog,
targetDf: DataFrame,
fileIndex: TahoeFileIndex,
condition: Expression): Seq[TouchedFileWithDV] = {
recordDeltaOperation(deltaLog, opType = "DELETE.findTouchedFiles") {
condition: Expression,
opName: String): Seq[TouchedFileWithDV] = {
require(
Set("DELETE", "UPDATE").contains(opName),
s"Expecting 'DELETE' or 'UPDATE', but got '$opName'.")

recordDeltaOperation(deltaLog, opType = s"$opName.findTouchedFiles") {
val candidateFiles = fileIndex match {
case f: TahoeBatchFileIndex => f.addFiles
case _ => throw new IllegalArgumentException("Unexpected file index found!")
Expand Down Expand Up @@ -165,7 +170,7 @@ object DeleteWithDeletionVectorsHelper extends DeltaCommand {
spark: SparkSession,
touchedFiles: Seq[TouchedFileWithDV],
snapshot: Snapshot): (Seq[FileAction], Map[String, Long]) = {
val numDeletedRows: Long = touchedFiles.map(_.numberOfModifiedRows).sum
val numModifiedRows: Long = touchedFiles.map(_.numberOfModifiedRows).sum
val numRemovedFiles: Long = touchedFiles.count(_.isFullyReplaced())

val (fullyRemovedFiles, notFullyRemovedFiles) = touchedFiles.partition(_.isFullyReplaced())
Expand All @@ -182,7 +187,7 @@ object DeleteWithDeletionVectorsHelper extends DeltaCommand {
val dvAddFilesWithStats = getActionsWithStats(spark, dvAddFiles, snapshot)

// TODO: gather more metrics
val metricMap = Map("numDeletedRows" -> numDeletedRows, "numRemovedFiles" -> numRemovedFiles)
val metricMap = Map("numModifiedRows" -> numModifiedRows, "numRemovedFiles" -> numRemovedFiles)
(fullyRemoved ++ dvAddFilesWithStats ++ dvRemoveFiles, metricMap)
}

Expand Down Expand Up @@ -471,7 +476,7 @@ object DeletionVectorData {
}

/** Final output for each file containing the file path, DeletionVectorDescriptor and how many
* rows are marked as deleted in this file as part of this operation (doesn't include rows
* already marked as deleted)
*
*
* @param filePath Absolute path of the data file this DV result is generated for.
Expand Down Expand Up @@ -629,7 +634,7 @@ object DeletionVectorWriter extends DeltaLogging {
}

/**
* Prepares a mapper function that can be used by DELETE command to store the Deletion Vectors
* Prepares a mapper function that can be used by DML command to store the Deletion Vectors
* that are described in [[DeletionVectorData]] and return their descriptors
* [[DeletionVectorResult]].
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ case class DeleteCommand(
val fileIndex = new TahoeBatchFileIndex(
sparkSession, "delete", candidateFiles, deltaLog, deltaLog.dataPath, txn.snapshot)
if (shouldWriteDVs) {
val targetDf = DeleteWithDeletionVectorsHelper.createTargetDfForScanningForMatches(
val targetDf = DMLWithDeletionVectorsHelper.createTargetDfForScanningForMatches(
sparkSession,
target,
fileIndex)
Expand All @@ -252,21 +252,22 @@ case class DeleteCommand(
// with deletion vectors.
val mustReadDeletionVectors = DeletionVectorUtils.deletionVectorsReadable(txn.snapshot)

val touchedFiles = DeleteWithDeletionVectorsHelper.findTouchedFiles(
val touchedFiles = DMLWithDeletionVectorsHelper.findTouchedFiles(
sparkSession,
txn,
mustReadDeletionVectors,
deltaLog,
targetDf,
fileIndex,
cond)
cond,
opName = "DELETE")

if (touchedFiles.nonEmpty) {
val (actions, metricMap) = DeleteWithDeletionVectorsHelper.processUnmodifiedData(
val (actions, metricMap) = DMLWithDeletionVectorsHelper.processUnmodifiedData(
sparkSession,
touchedFiles,
txn.snapshot)
metrics("numDeletedRows").set(metricMap("numDeletedRows"))
metrics("numDeletedRows").set(metricMap("numModifiedRows"))
numRemovedFiles = metricMap("numRemovedFiles")
actions
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ case class UpdateCommand(

if (shouldWriteDeletionVectors) {
// Case 3.1: Update with persistent deletion vectors
val targetDf = DeleteWithDeletionVectorsHelper.createTargetDfForScanningForMatches(
val targetDf = DMLWithDeletionVectorsHelper.createTargetDfForScanningForMatches(
sparkSession,
target,
fileIndex)
Expand All @@ -179,22 +179,22 @@ case class UpdateCommand(
// with deletion vectors
val mustReadDeletionVectors = DeletionVectorUtils.deletionVectorsReadable(txn.snapshot)

val touchedFiles = DeleteWithDeletionVectorsHelper.findTouchedFiles(
val touchedFiles = DMLWithDeletionVectorsHelper.findTouchedFiles(
sparkSession,
txn,
mustReadDeletionVectors,
deltaLog,
targetDf,
fileIndex,
updateCondition)
updateCondition,
opName = "UPDATE")

if (touchedFiles.nonEmpty) {
val (dvActions, metricMap) = DeleteWithDeletionVectorsHelper.processUnmodifiedData(
val (dvActions, metricMap) = DMLWithDeletionVectorsHelper.processUnmodifiedData(
sparkSession,
touchedFiles,
txn.snapshot)
// Number of updated rows is equal to the number of rows deleted by DV.
metrics("numUpdatedRows").set(metricMap("numDeletedRows"))
metrics("numUpdatedRows").set(metricMap("numModifiedRows"))
numTouchedFiles = metricMap("numRemovedFiles")
val dvRewriteStartMs = System.nanoTime()
val newFiles = rewriteFiles(
Expand Down

0 comments on commit c7c1e94

Please sign in to comment.