diff --git a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
index 60a96306b7..f2be176910 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
@@ -1033,12 +1033,12 @@ case class DeltaSource(
     var commitProcessedInBatch = false

     /**
-     * This overloaded method checks if all the AddCDCFiles for a commit can be accommodated by
+     * This overloaded method checks if all the FileActions for a commit can be accommodated by
      * the rate limit.
      */
-    def admit(fileActions: Seq[AddCDCFile]): Boolean = {
-      def getSize(actions: Seq[AddCDCFile]): Long = {
-        actions.foldLeft(0L) { (l, r) => l + r.size }
+    def admit(fileActions: Seq[FileAction]): Boolean = {
+      def getSize(actions: Seq[FileAction]): Long = {
+        actions.foldLeft(0L) { (l, r) => l + r.getFileSize }
       }
       if (fileActions.isEmpty) {
         true
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceCDCSupport.scala b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceCDCSupport.scala
index f54457740c..358c1ccd20 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceCDCSupport.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceCDCSupport.scala
@@ -148,13 +148,31 @@ trait DeltaSourceCDCSupport { self: DeltaSource =>
         // We allow entries with no file actions and index as [[DeltaSourceOffset.BASE_INDEX]]
         // that are used primarily to update latest offset when no other
         // file action based entries are present.
-        fileActions.filter(indexedFile => hasAddsOrRemoves(indexedFile) ||
-          hasNoFileActionAndStartIndex(indexedFile))
-          .filter(
-            isValidIndexedFile(_, fromVersion, fromIndex, endOffset)
-          ).takeWhile { indexedFile =>
-            admissionControl.admit(Option(indexedFile.getFileAction))
-          }.toIterator
+        val filteredFiles = fileActions
+          .filter { indexedFile =>
+            hasAddsOrRemoves(indexedFile) || hasNoFileActionAndStartIndex(indexedFile)
+          }
+          .filter(isValidIndexedFile(_, fromVersion, fromIndex, endOffset))
+        val filteredFileActions = filteredFiles.flatMap(f => Option(f.getFileAction))
+        val hasDeletionVectors = filteredFileActions.exists {
+          case add: AddFile => add.deletionVector != null
+          case remove: RemoveFile => remove.deletionVector != null
+          case _ => false
+        }
+        if (hasDeletionVectors) {
+          // We cannot split up add/remove pairs with Deletion Vectors, because we will get the
+          // wrong result.
+          // So in this case we behave as above with CDC files and either admit all or none.
+          if (admissionControl.admit(filteredFileActions)) {
+            filteredFiles.toIterator
+          } else {
+            Iterator()
+          }
+        } else {
+          filteredFiles.takeWhile { indexedFile =>
+            admissionControl.admit(Option(indexedFile.getFileAction))
+          }.toIterator
+        }
       }
     }
   }
@@ -279,7 +297,6 @@ trait DeltaSourceCDCSupport { self: DeltaSource =>
       } else {
         filterAndIndexDeltaLogs(fromVersion)
       }
-
       // In this case, filterFiles will consume the available capacity. We use takeWhile
       // to stop the iteration when we reach the limit which will save us from reading
       // unnecessary log files.
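To make the two admission paths above concrete, here is a minimal, self-contained Scala sketch of the behavior, for illustration only. MiniFileAction, MiniAdmissionLimits, and filesForCommit are invented stand-ins, not the real Delta Lake FileAction/AdmissionLimits classes, and the soft-limit bookkeeping is an assumption modeled on the diff rather than the exact implementation.

object AdmissionSketch extends App {
  // A file action reduced to the two properties the rate limiter cares about.
  final case class MiniFileAction(size: Long, hasDeletionVector: Boolean)

  // Soft rate limit: a file (or a whole commit) is admitted if any capacity
  // remains, and its cost is deducted afterwards, so a batch may overshoot.
  final class MiniAdmissionLimits(var filesLeft: Long, var bytesLeft: Long) {
    // Per-file admission, mirroring admit(Option(indexedFile.getFileAction)).
    def admit(action: MiniFileAction): Boolean = {
      val hadCapacity = filesLeft > 0 && bytesLeft > 0
      filesLeft -= 1
      bytesLeft -= action.size
      hadCapacity
    }
    // Commit-level overload, mirroring admit(Seq[FileAction]): all or none.
    def admit(actions: Seq[MiniFileAction]): Boolean = {
      if (actions.isEmpty) true
      else {
        val hadCapacity = filesLeft > 0 && bytesLeft > 0
        filesLeft -= actions.size
        bytesLeft -= actions.map(_.size).sum
        hadCapacity
      }
    }
  }

  // Mirrors the branch added to filterFiles: commits touching deletion vectors
  // are admitted atomically; all other commits may be split file by file.
  def filesForCommit(
      actions: Seq[MiniFileAction],
      limits: MiniAdmissionLimits): Seq[MiniFileAction] = {
    if (actions.exists(_.hasDeletionVector)) {
      if (limits.admit(actions)) actions else Seq.empty
    } else {
      actions.takeWhile(a => limits.admit(a))
    }
  }

  val limits = new MiniAdmissionLimits(filesLeft = 1L, bytesLeft = 1000L)
  val plainCommit = Seq(MiniFileAction(100L, hasDeletionVector = false))
  val dvCommit = Seq(
    MiniFileAction(100L, hasDeletionVector = true),
    MiniFileAction(100L, hasDeletionVector = true))
  println(filesForCommit(plainCommit, limits)) // admitted; exhausts the file budget
  println(filesForCommit(dvCommit, limits))    // List(): deferred whole, never split
}

The key property the sketch demonstrates: once capacity is exhausted, a commit carrying deletion vectors is deferred in its entirety to a later batch instead of being split into a dangling add/remove pair.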
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCStreamSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCStreamSuite.scala
index d8a591437d..38982a4805 100644
--- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCStreamSuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCStreamSuite.scala
@@ -1006,6 +1006,40 @@ trait DeltaCDCStreamSuiteBase extends StreamTest with DeltaSQLCommandTest
       }
     }
   }
+
+  // LC-1281: Ensure that when we would split batches into one file at a time, we still produce
+  // correct CDF even in cases where the CDF may need to compare multiple file actions from the
+  // same commit to be correct, such as with persistent deletion vectors.
+  test("double delete-only on the same file") {
+    withTempDir { tableDir =>
+      val tablePath = tableDir.toString
+      spark.range(start = 0L, end = 10L, step = 1L, numPartitions = 1).toDF("id")
+        .write.format("delta").save(tablePath)
+
+      spark.sql(s"DELETE FROM delta.`$tablePath` WHERE id IN (1, 3, 6)")
+      spark.sql(s"DELETE FROM delta.`$tablePath` WHERE id IN (2, 4, 7)")
+
+      val stream = spark.readStream
+        .format("delta")
+        .option(DeltaOptions.CDC_READ_OPTION, true)
+        .option(DeltaOptions.MAX_FILES_PER_TRIGGER_OPTION, 1)
+        .option(DeltaOptions.STARTING_VERSION_OPTION, 1)
+        .load(tablePath)
+        .drop(CDCReader.CDC_COMMIT_TIMESTAMP)
+
+      testStream(stream)(
+        ProcessAllAvailable(),
+        CheckAnswer(
+          (1L, "delete", 1L),
+          (3L, "delete", 1L),
+          (6L, "delete", 1L),
+          (2L, "delete", 2L),
+          (4L, "delete", 2L),
+          (7L, "delete", 2L)
+        )
+      )
+    }
+  }
 }

 class DeltaCDCStreamDeletionVectorSuite extends DeltaCDCStreamSuite
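The expected answers in this test follow from how deletion vectors encode deletes. A toy model, for illustration only: a DV is represented here as a Set[Long] of deleted row ids, and deleteRows is an invented helper; real DVs are bitmaps and CDF generation is considerably more involved.

object CdfFromDvSketch extends App {
  type DeletionVector = Set[Long]

  // The "delete" CDF rows a commit contributes for one file are the rows newly
  // marked deleted: present in the re-added file's DV but absent from the
  // removed file's DV. Computing this needs BOTH halves of the add/remove
  // pair, which is why they cannot land in different micro-batches even with
  // maxFilesPerTrigger = 1.
  def deleteRows(oldDv: DeletionVector, newDv: DeletionVector): Set[Long] =
    newDv -- oldDv

  val afterCommit1: DeletionVector = Set(1L, 3L, 6L)                 // DELETE ... IN (1, 3, 6)
  val afterCommit2: DeletionVector = afterCommit1 ++ Set(2L, 4L, 7L) // DELETE ... IN (2, 4, 7)

  println(deleteRows(Set.empty, afterCommit1))    // Set(1, 3, 6) -> commit version 1
  println(deleteRows(afterCommit1, afterCommit2)) // Set(2, 4, 7) -> commit version 2
}

If the second commit's remove (DV = {1, 3, 6}) and add (DV = {1, 2, 3, 4, 6, 7}) were admitted in separate batches, the reader would lack one side of the set difference, which is exactly the failure mode the all-or-none admission path prevents.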