Skip to content

Commit

Permalink
Don't write empty commits delta-io#1933
Browse files (browse the repository at this point in the history)
Add unit tests for change delta-io#1933

Fixup scalastyle complaints

Remove extraneous curly braces

Address PR comments

Signed-off-by: Christopher Watford <[email protected]>
  • Loading branch information
watfordkcf committed Aug 14, 2023
1 parent a35c559 commit 79ac482
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,7 @@ case class WriteIntoDelta(
mode, Option(partitionColumns),
options.replaceWhere, options.userMetadata
)
txn.commit(actions, operation)

txn.commitIfNeeded(actions, operation)
}
Seq.empty
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.FileNames
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal}
Expand All @@ -35,6 +36,8 @@ class OptimisticTransactionSuite
extends OptimisticTransactionLegacyTests
with OptimisticTransactionSuiteBase {

import testImplicits._

// scalastyle:off: removeFile
private val addA = createTestAddFile(path = "a")
private val addB = createTestAddFile(path = "b")
Expand Down Expand Up @@ -642,4 +645,58 @@ class OptimisticTransactionSuite
checkLastCommitTags(expectedTags = Some(tags2))
}
}

test("empty commits are elided on write by default") {
  // Checks that, without setting any conf explicitly, appending a DataFrame with no rows
  // leaves both the table version and the table contents unchanged.
  withTempDir { tableDir =>
    val tablePath = tableDir.getCanonicalPath

    // Seed the table with two rows so there is a committed version to compare against.
    Seq((1, 0), (2, 1)).toDF("key", "value")
      .write.format("delta").mode("append").save(tablePath)

    val deltaLog = DeltaLog.forTable(spark, tableDir)
    val versionBeforeEmptyWrite = deltaLog.update().version

    // Append a DataFrame that has the same column names but zero rows.
    Seq.empty[(Integer, Integer)].toDF("key", "value")
      .write.format("delta").mode("append").save(tablePath)

    val versionAfterEmptyWrite = deltaLog.update().version

    // The data must still be exactly the two seeded rows ...
    checkAnswer(
      spark.read.format("delta").load(tablePath),
      Seq(Row(1, 0), Row(2, 1)))

    // ... and the empty append must not have advanced the table version.
    assert(versionBeforeEmptyWrite === versionAfterEmptyWrite)
  }
}

// Registers one test per value of the skip-empty-commits flag: with the flag on, an empty
// append is expected to leave the table version untouched; with it off, the version is
// expected to advance by exactly one.
for (skip <- Seq(true, false)) {
  test(s"Elide empty commits when requested - skipRecordingEmptyCommits=$skip") {
    withSQLConf(DeltaSQLConf.DELTA_SKIP_RECORDING_EMPTY_COMMITS.key -> skip.toString) {
      withTempDir { tableDir =>
        val tablePath = tableDir.getCanonicalPath

        // Seed the table with two rows so there is a baseline version.
        Seq((1, 0), (2, 1)).toDF("key", "value")
          .write.format("delta").mode("append").save(tablePath)

        val deltaLog = DeltaLog.forTable(spark, tableDir)

        // Derive the expected post-write version from the baseline and the flag.
        val baselineVersion = deltaLog.update().version
        val expectedDeltaVersion = if (skip) baselineVersion else baselineVersion + 1

        // Append a DataFrame that has the same column names but zero rows.
        Seq.empty[(Integer, Integer)].toDF("key", "value")
          .write.format("delta").mode("append").save(tablePath)

        val actualDeltaVersion = deltaLog.update().version

        // Regardless of the flag, the visible data is still just the seeded rows.
        checkAnswer(
          spark.read.format("delta").load(tablePath),
          Seq(Row(1, 0), Row(2, 1)))

        assert(expectedDeltaVersion === actualDeltaVersion)
      }
    }
  }
}
}

0 comments on commit 79ac482

Please sign in to comment.