diff --git a/spark/src/main/resources/error/delta-error-classes.json b/spark/src/main/resources/error/delta-error-classes.json index 2fc9ddf763..c9b8893880 100644 --- a/spark/src/main/resources/error/delta-error-classes.json +++ b/spark/src/main/resources/error/delta-error-classes.json @@ -540,6 +540,12 @@ ], "sqlState" : "2D521" }, + "DELTA_CONCURRENT_DOMAIN_METADATA" : { + "message" : [ + "ConcurrentDomainMetadataException: This error occurs when multiple queries have a conflicting domain. Please try the operation again. \nRefer to for more details." + ], + "sqlState" : "2D521" + }, "DELTA_CONCURRENT_WRITE" : { "message" : [ "ConcurrentWriteException: A concurrent transaction has written new data since the current transaction read the table. Please try the operation again.\nRefer to for more details." diff --git a/spark/src/main/scala/io/delta/exceptions/DeltaConcurrentExceptions.scala b/spark/src/main/scala/io/delta/exceptions/DeltaConcurrentExceptions.scala index c22cb384c5..00893db2e6 100644 --- a/spark/src/main/scala/io/delta/exceptions/DeltaConcurrentExceptions.scala +++ b/spark/src/main/scala/io/delta/exceptions/DeltaConcurrentExceptions.scala @@ -69,6 +69,23 @@ class MetadataChangedException(message: String) override def getMessage: String = message } +/** + * :: Evolving :: + * + * Thrown when a conflicting metadata domain is added. + * + */ +@Evolving +class ConcurrentDomainMetadataException(message: String) + extends org.apache.spark.sql.delta.ConcurrentWriteException(message) + with DeltaThrowable { + def this(messageParameters: Array[String]) = { + this(DeltaThrowableHelper.getMessage("DELTA_CONCURRENT_DOMAIN_METADATA", messageParameters)) + } + override def getErrorClass: String = "DELTA_CONCURRENT_DOMAIN_METADATA" + override def getMessage: String = message +} + /** * :: Evolving :: * diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala b/spark/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala index 0e04431b1d..1690b60621 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala @@ -674,9 +674,8 @@ private[delta] class ConflictChecker( case (domain, _) if RowTrackingMetadataDomain.isSameDomain(domain) => domain case (_, Some(_)) => // Any conflict not specifically handled by a previous case must fail the transaction. - throw new io.delta.exceptions.ConcurrentTransactionException( - s"A conflicting metadata domain ${domainMetadataFromCurrentTransaction.domain} is " + - "added.") + throw new ConcurrentDomainMetadataException(winningCommitSummary.commitInfo, + domainMetadataFromCurrentTransaction.domain) } val mergedDomainMetadata = mutable.Buffer.empty[DomainMetadata] diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala index 58d7105bae..195274c1f9 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala @@ -3545,6 +3545,19 @@ class MetadataChangedException(message: String) conflictingCommit)) } +/** + * This class is kept for backward compatibility. + * Use [[io.delta.exceptions.ConcurrentDomainMetadataException]] instead. + */ +class ConcurrentDomainMetadataException(message: String) + extends io.delta.exceptions.DeltaConcurrentModificationException(message) { + def this(conflictingCommit: Option[CommitInfo], domain: String) = this( + DeltaErrors.concurrentModificationExceptionMsg( + SparkEnv.get.conf, + s"A conflicting metadata domain ${domain} is added.", + conflictingCommit)) +} + /** * This class is kept for backward compatibility. * Use [[io.delta.exceptions.ProtocolChangedException]] instead.