Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Delta] Add Test Suites for Coordinated Commits Properties #3729

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -857,141 +857,6 @@ trait CloneTableSuiteBase extends QueryTest
assert(sourceSnapshot.protocol === targetSnapshot.protocol)
}
}

// Verifies that CLONE adopts the commit coordinator configured in the Spark session
// rather than inheriting whatever coordinator (if any) the source table uses.
testAllClones("Clone should pick commit coordinator from session settings") {
  (source, target, isShallow) =>
    // Register a coordinator under a distinct name; its batchSize (1000L) is the
    // fingerprint we later assert on to prove the session coordinator was picked.
    val someOtherCommitCoordinatorName = "some-other-commit-coordinator"
    case class SomeValidCommitCoordinatorBuilder() extends CommitCoordinatorBuilder {
      val commitCoordinator = new InMemoryCommitCoordinator(batchSize = 1000L)
      override def getName: String = someOtherCommitCoordinatorName
      override def build(spark: SparkSession, conf: Map[String, String]): CommitCoordinatorClient =
        commitCoordinator
    }
    CommitCoordinatorProvider.registerBuilder(SomeValidCommitCoordinatorBuilder())

    // Create a source table with a few commits.
    spark.range(5).write.format("delta").save(source)
    spark.range(5, 10).write.format("delta").mode("append").save(source)
    spark.range(10, 15).write.format("delta").mode("append").save(source)

    // CLONE should pick the commit coordinator from the session settings over the
    // commit coordinator from the source table.
    withCustomCoordinatedCommitsTableProperties(someOtherCommitCoordinatorName) {
      cloneTable(
        source,
        target,
        isReplace = true)
      val targetLog = DeltaLog.forTable(spark, target)
      val targetCommitCoordinator =
        targetLog.update().tableCommitCoordinatorClientOpt.get.commitCoordinatorClient
      // batchSize == 1000L identifies the session-registered coordinator built above.
      assert(targetCommitCoordinator.asInstanceOf[InMemoryCommitCoordinator].batchSize == 1000L)
    }
    // The clone must still contain exactly the source table's data.
    checkAnswer(
      spark.read.format("delta").load(source),
      spark.read.format("delta").load(target))
}

// Verifies that CLONE leaves the target as a plain (file-system managed) table when
// no commit coordinator is configured in the Spark session.
testAllClones("Clone should ignore commit coordinator if it is not set in session settings") {
  (source, target, isShallow) =>
    // Create a source table with a few commits.
    spark.range(5).write.format("delta").save(source)
    spark.range(5, 10).write.format("delta").mode("append").save(source)
    spark.range(10, 15).write.format("delta").mode("append").save(source)

    // Commit-Coordinator for target should not be set because it is unset in session.
    withoutCoordinatedCommitsDefaultTableProperties {
      cloneTable(
        source,
        target,
        isReplace = true)
      val targetLog = DeltaLog.forTable(spark, target)
      assert(targetLog.update().tableCommitCoordinatorClientOpt.isEmpty)
    }
    // The clone must still contain exactly the source table's data.
    checkAnswer(
      spark.read.format("delta").load(source),
      spark.read.format("delta").load(target))
}

// Verifies the precedence of coordinated-commits configuration during CLONE:
// properties passed explicitly to the CLONE command win over session defaults.
// Also checks the validation errors for incomplete or forbidden command overrides.
testAllClones("Clone should give highest priority to commit coordinator specified directly in " +
  "clone command") { (source, target, isShallow) =>
  // Two coordinators distinguishable by batchSize: the session default uses 1000L,
  // the coordinator passed explicitly to CLONE uses 2000L.
  val someOtherCommitCoordinatorName1 = "some-other-commit-coordinator-1"
  case class SomeValidCommitCoordinatorBuilder1() extends CommitCoordinatorBuilder {
    val commitCoordinator = new InMemoryCommitCoordinator(batchSize = 1000L)
    override def getName: String = someOtherCommitCoordinatorName1
    override def build(spark: SparkSession, conf: Map[String, String]): CommitCoordinatorClient =
      commitCoordinator
  }
  CommitCoordinatorProvider.registerBuilder(SomeValidCommitCoordinatorBuilder1())
  val someOtherCommitCoordinatorName2 = "some-other-commit-coordinator-2"
  case class SomeValidCommitCoordinatorBuilder2() extends CommitCoordinatorBuilder {
    val commitCoordinator = new InMemoryCommitCoordinator(batchSize = 2000L)
    override def getName: String = someOtherCommitCoordinatorName2
    override def build(spark: SparkSession, conf: Map[String, String]): CommitCoordinatorClient =
      commitCoordinator
  }
  CommitCoordinatorProvider.registerBuilder(SomeValidCommitCoordinatorBuilder2())

  // Create a source table with a few commits.
  spark.range(5).write.format("delta").save(source)
  spark.range(5, 10).write.format("delta").mode("append").save(source)
  spark.range(10, 15).write.format("delta").mode("append").save(source)

  // When a commit coordinator is specified both in the Spark session (batchSize=1000 here)
  // and in the CLONE command overrides (batchSize=2000 here), CLONE should give priority
  // to the properties explicitly overridden with the CLONE command.
  withCustomCoordinatedCommitsTableProperties(someOtherCommitCoordinatorName1) {
    // Specifying only the coordinator name without its conf must be rejected.
    val e1 = intercept[IllegalArgumentException] {
      val properties = Map(
        DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.key ->
          someOtherCommitCoordinatorName2)
      cloneTable(
        source,
        target,
        isReplace = true,
        tableProperties = properties)
    }
    assert(e1.getMessage.contains("During CREATE with CLONE, either both coordinated commits " +
      "configurations (\"delta.coordinatedCommits.commitCoordinator-preview\", \"delta." +
      "coordinatedCommits.commitCoordinatorConf-preview\") are set in the command or neither " +
      "of them. Missing: \"delta.coordinatedCommits.commitCoordinatorConf-preview\"."))

    // The internal table conf is coordinator-managed and must not be settable by the user.
    val e2 = intercept[IllegalArgumentException] {
      val properties = Map(
        DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.key ->
          someOtherCommitCoordinatorName2,
        DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_CONF.key -> "{}",
        DeltaConfigs.COORDINATED_COMMITS_TABLE_CONF.key -> "{}")
      cloneTable(
        source,
        target,
        isReplace = true,
        tableProperties = properties)
    }
    assert(e2.getMessage.contains("During CREATE with CLONE, configuration \"delta." +
      "coordinatedCommits.tableConf-preview\" cannot be set from the command."))

    // A complete, valid override: coordinator name plus its conf.
    val properties = Map(
      DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.key ->
        someOtherCommitCoordinatorName2,
      DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_CONF.key -> "{}")
    cloneTable(
      source,
      target,
      isReplace = true,
      tableProperties = properties)

    val targetLog = DeltaLog.forTable(spark, target)
    val targetCommitCoordinator =
      targetLog.update().tableCommitCoordinatorClientOpt.get.commitCoordinatorClient
    // batchSize == 2000L proves the command-level coordinator won over the session one.
    assert(targetCommitCoordinator.asInstanceOf[InMemoryCommitCoordinator].batchSize == 2000L)
  }
  // The clone must still contain exactly the source table's data.
  checkAnswer(
    spark.read.format("delta").load(source),
    spark.read.format("delta").load(target))
}
}


Expand Down
Loading
Loading