From 326dbb4478732eb9b7a683511e69206f2b21bd37 Mon Sep 17 00:00:00 2001
From: Vladimir Golubev
Date: Tue, 7 May 2024 20:28:50 +0800
Subject: [PATCH] [SPARK-48143][SQL] Use lightweight exceptions for
 control-flow between UnivocityParser and FailureSafeParser

### What changes were proposed in this pull request?

A new lightweight exception for control flow between `UnivocityParser` and `FailureSafeParser`, to speed up malformed CSV parsing.

### Why are the changes needed?

Parsing in `PermissiveMode` is slow due to heavy exception construction (stacktrace filling + string template substitution in `SparkRuntimeException`).

### Does this PR introduce _any_ user-facing change?

No, since `FailureSafeParser` unwraps `BadRecordException` and correctly rethrows user-facing exceptions in `FailFastMode`.

### How was this patch tested?

- `testOnly org.apache.spark.sql.catalyst.csv.UnivocityParserSuite`
- Manually ran the CSV benchmark on a DB benchmark workspace
- Manually checked correct and malformed CSV in spark-shell (`org.apache.spark.SparkException` is thrown with the stacktrace)

### Was this patch authored or co-authored using generative AI tooling?

No
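For context on the technique: most of the cost of constructing a JVM exception is `fillInStackTrace()`, which walks the native stack, so an exception used purely for control flow can override it to a no-op and hide the expensive user-facing cause behind a thunk that is only forced on the fail-fast path. A minimal self-contained sketch of the pattern (`ControlFlowException`, `LightweightDemo`, and `parse` are illustrative names, not identifiers from this patch):

```scala
// Sketch of the lightweight-exception pattern: construction is cheap
// because there is no stack capture and no message formatting.
final case class ControlFlowException(cause: () => Throwable) extends Exception {
  // Skip the native stack walk; this exception never reaches user code.
  override def fillInStackTrace(): Throwable = this
}

object LightweightDemo extends App {
  def parse(record: String): Int =
    try record.trim.toInt
    catch {
      case e: NumberFormatException =>
        // Defer building the heavy, user-facing exception behind a thunk.
        throw ControlFlowException(() =>
          new IllegalArgumentException(s"Bad record: $record", e))
    }

  try println(parse("oops"))
  catch {
    // Only a fail-fast consumer pays for materializing the real cause.
    case ControlFlowException(cause) => println(cause().getMessage)
  }
}
```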
Closes #46400 from vladimirg-db/vladimirg-db/speed-up-csv-parser.

Authored-by: Vladimir Golubev
Signed-off-by: Wenchen Fan
---
 .../sql/catalyst/csv/UnivocityParser.scala    | 10 ++++----
 .../sql/catalyst/json/JacksonParser.scala     |  5 ++--
 .../catalyst/util/BadRecordException.scala    | 23 +++++++++++++++----
 .../sql/catalyst/util/FailureSafeParser.scala |  2 +-
 .../sql/catalyst/xml/StaxXmlParser.scala      |  5 ++--
 5 files changed, 29 insertions(+), 16 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index a5158d8a22c6b..37d9143e5b5a6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -316,17 +316,17 @@ class UnivocityParser(
       throw BadRecordException(
         () => getCurrentInput,
         () => Array.empty,
-        QueryExecutionErrors.malformedCSVRecordError(""))
+        () => QueryExecutionErrors.malformedCSVRecordError(""))
     }
 
     val currentInput = getCurrentInput
 
-    var badRecordException: Option[Throwable] = if (tokens.length != parsedSchema.length) {
+    var badRecordException: Option[() => Throwable] = if (tokens.length != parsedSchema.length) {
       // If the number of tokens doesn't match the schema, we should treat it as a malformed record.
       // However, we still have chance to parse some of the tokens. It continues to parses the
       // tokens normally and sets null when `ArrayIndexOutOfBoundsException` occurs for missing
       // tokens.
-      Some(QueryExecutionErrors.malformedCSVRecordError(currentInput.toString))
+      Some(() => QueryExecutionErrors.malformedCSVRecordError(currentInput.toString))
     } else None
     // When the length of the returned tokens is identical to the length of the parsed schema,
     // we just need to:
@@ -348,7 +348,7 @@ class UnivocityParser(
       } catch {
         case e: SparkUpgradeException => throw e
         case NonFatal(e) =>
-          badRecordException = badRecordException.orElse(Some(e))
+          badRecordException = badRecordException.orElse(Some(() => e))
           // Use the corresponding DEFAULT value associated with the column, if any.
           row.update(i, ResolveDefaultColumns.existenceDefaultValues(requiredSchema)(i))
       }
@@ -359,7 +359,7 @@ class UnivocityParser(
     } else {
       if (badRecordException.isDefined) {
         throw BadRecordException(
-          () => currentInput, () => Array(requiredRow.get), badRecordException.get)
+          () => currentInput, () => Array[InternalRow](requiredRow.get), badRecordException.get)
       } else {
         requiredRow
       }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index eadd0a4f8ab9e..d1093a3b1be17 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -613,7 +613,7 @@ class JacksonParser(
       // JSON parser currently doesn't support partial results for corrupted records.
       // For such records, all fields other than the field configured by
       // `columnNameOfCorruptRecord` are set to `null`.
-      throw BadRecordException(() => recordLiteral(record), () => Array.empty, e)
+      throw BadRecordException(() => recordLiteral(record), cause = e)
     case e: CharConversionException if options.encoding.isEmpty =>
       val msg =
         """JSON parser cannot handle a character in its input.
@@ -621,8 +621,7 @@ class JacksonParser(
           |""".stripMargin + e.getMessage
       val wrappedCharException = new CharConversionException(msg)
       wrappedCharException.initCause(e)
-      throw BadRecordException(() => recordLiteral(record), () => Array.empty,
-        wrappedCharException)
+      throw BadRecordException(() => recordLiteral(record), cause = wrappedCharException)
     case PartialResultException(row, cause) =>
       throw BadRecordException(
         record = () => recordLiteral(record),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
index 65a56c1064e45..84f183af77199 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
@@ -67,16 +67,31 @@ case class PartialResultArrayException(
   extends Exception(cause)
 
 /**
- * Exception thrown when the underlying parser meet a bad record and can't parse it.
+ * Exception thrown when the underlying parser meets a bad record and can't parse it. Used for
+ * control flow between wrapper and underlying parser without overhead of creating a full exception.
  * @param record a function to return the record that cause the parser to fail
  * @param partialResults a function that returns an row array, which is the partial results of
  *                       parsing this bad record.
- * @param cause the actual exception about why the record is bad and can't be parsed.
+ * @param cause a function to return the actual exception about why the record is bad and can't be
+ *              parsed.
 */
 case class BadRecordException(
     @transient record: () => UTF8String,
-    @transient partialResults: () => Array[InternalRow] = () => Array.empty[InternalRow],
-    cause: Throwable) extends Exception(cause)
+    @transient partialResults: () => Array[InternalRow],
+    @transient cause: () => Throwable)
+  extends Exception() {
+
+  override def getStackTrace(): Array[StackTraceElement] = new Array[StackTraceElement](0)
+  override def fillInStackTrace(): Throwable = this
+}
+
+object BadRecordException {
+  def apply(
+      record: () => UTF8String,
+      partialResults: () => Array[InternalRow] = () => Array.empty[InternalRow],
+      cause: Throwable): BadRecordException =
+    new BadRecordException(record, partialResults, () => cause)
+}
 
 /**
  * Exception thrown when the underlying parser parses a JSON array as a struct.
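A note on the companion `apply` just above: it is what keeps eager call sites (such as the `JacksonParser` and `StaxXmlParser` hunks in this patch) source-compatible, by adapting a plain `Throwable` into the `() => Throwable` the primary constructor now expects. A toy sketch of the same adaptation trick, with hypothetical names (`Lazy`, `CompanionDemo`):

```scala
// Sketch: the primary constructor takes a thunk, while an extra companion
// apply adapts eager arguments, so pre-existing call sites keep compiling.
final case class Lazy(thunk: () => String)

object Lazy {
  // Overload resolution picks this apply for eager call sites: Lazy("x").
  def apply(value: String): Lazy = new Lazy(() => value)
}

object CompanionDemo extends App {
  println(Lazy("eager").thunk())      // adapted by the companion apply
  println(Lazy(() => "lazy").thunk()) // synthetic case-class apply
}
```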
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
index 10cd159c769b2..b005563aa824f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
@@ -70,7 +70,7 @@ class FailureSafeParser[IN](
         case DropMalformedMode =>
           Iterator.empty
         case FailFastMode =>
-          e.getCause match {
+          e.cause() match {
             case _: JsonArraysAsStructsException =>
               // SPARK-42298 we recreate the exception here to make sure the error message
               // have the record content.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
index ab671e56a21e5..2b30fe2bfab19 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
@@ -148,7 +148,7 @@ class StaxXmlParser(
       // XML parser currently doesn't support partial results for corrupted records.
       // For such records, all fields other than the field configured by
      // `columnNameOfCorruptRecord` are set to `null`.
-      throw BadRecordException(() => xmlRecord, () => Array.empty, e)
+      throw BadRecordException(() => xmlRecord, cause = e)
     case e: CharConversionException if options.charset.isEmpty =>
       val msg =
         """XML parser cannot handle a character in its input.
@@ -156,8 +156,7 @@ class StaxXmlParser(
           |""".stripMargin + e.getMessage
       val wrappedCharException = new CharConversionException(msg)
       wrappedCharException.initCause(e)
-      throw BadRecordException(() => xmlRecord, () => Array.empty,
-        wrappedCharException)
+      throw BadRecordException(() => xmlRecord, cause = wrappedCharException)
     case PartialResultException(row, cause) =>
       throw BadRecordException(
         record = () => xmlRecord,