Commit 0a426d2

Update ParquetIOSuite.scala

AngersZhuuuu committed May 29, 2024
1 parent 7e49844
Showing 1 changed file with 38 additions and 47 deletions.
@@ -37,7 +37,7 @@ import org.apache.parquet.hadoop.example.ExampleParquetWriter
 import org.apache.parquet.io.api.Binary
 import org.apache.parquet.schema.{MessageType, MessageTypeParser}

-import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf, SparkException, TestUtils}
+import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, TestUtils}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}
@@ -1206,6 +1206,43 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
       errorMessage.contains("is not a valid DFS filename"))
   }

test("SPARK-7837 Do not close output writer twice when commitTask() fails") {
withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
// Using a output committer that always fail when committing a task, so that both
// `commitTask()` and `abortTask()` are invoked.
val extraOptions = Map[String, String](
SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key ->
classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName
)

// Before fixing SPARK-7837, the following code results in an NPE because both
// `commitTask()` and `abortTask()` try to close output writers.

withTempPath { dir =>
val m1 = intercept[SparkException] {
spark.range(1).coalesce(1).write.options(extraOptions).parquet(dir.getCanonicalPath)
}
assert(m1.getErrorClass == "TASK_WRITE_FAILED")
assert(m1.getCause.getMessage.contains("Intentional exception for testing purposes"))
}

withTempPath { dir =>
val m2 = intercept[SparkException] {
val df = spark.range(1).select($"id" as Symbol("a"), $"id" as Symbol("b"))
.coalesce(1)
df.write.partitionBy("a").options(extraOptions).parquet(dir.getCanonicalPath)
}
if (m2.getErrorClass != null) {
assert(m2.getErrorClass == "TASK_WRITE_FAILED")
assert(m2.getCause.getMessage.contains("Intentional exception for testing purposes"))
} else {
assert(m2.getMessage.contains("TASK_WRITE_FAILED"))
}
}
}
}

test("SPARK-11044 Parquet writer version fixed as version1 ") {
withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
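
Note: `TaskCommitFailureParquetOutputCommitter`, which the test above wires in through `PARQUET_OUTPUT_COMMITTER_CLASS`, is defined near the bottom of the same file but does not appear in this diff. A minimal sketch of such a committer, assuming it does nothing beyond throwing from `commitTask()` with the message the assertions match on (the exact body is inferred, not shown here):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.parquet.hadoop.ParquetOutputCommitter

// Sketch: always fail the task-level commit so that both commitTask()
// and abortTask() run, which is what exposes the double-close bug.
class TaskCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
  extends ParquetOutputCommitter(outputPath, context) {

  override def commitTask(context: TaskAttemptContext): Unit = {
    // This message is what the tests check via getCause.getMessage.contains(...).
    sys.error("Intentional exception for testing purposes")
  }
}

The deleted `ParquetIOWithoutOutputCommitCoordinationSuite` below also appears to have been the only remaining user of a `sparkConf` override, which would explain why the `SparkConf` import is dropped in the first hunk.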
@@ -1550,52 +1587,6 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
   }
 }

-// Parquet IO test suite with output commit coordination disabled.
-// This test suite is separated from ParquetIOSuite to avoid race conditions between failure
-// events from `OutputCommitCoordination` and `TaskSetManager`.
-class ParquetIOWithoutOutputCommitCoordinationSuite
-  extends QueryTest with ParquetTest with SharedSparkSession {
-  import testImplicits._
-
-  override protected def sparkConf: SparkConf = {
-    super.sparkConf
-      .set("spark.hadoop.outputCommitCoordination.enabled", "false")
-  }
-
-  test("SPARK-7837 Do not close output writer twice when commitTask() fails") {
-    withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
-      classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
-      // Using an output committer that always fails when committing a task, so that both
-      // `commitTask()` and `abortTask()` are invoked.
-      val extraOptions = Map[String, String](
-        SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key ->
-          classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName
-      )
-
-      // Before fixing SPARK-7837, the following code resulted in an NPE because both
-      // `commitTask()` and `abortTask()` tried to close output writers.
-
-      withTempPath { dir =>
-        val m1 = intercept[SparkException] {
-          spark.range(1).coalesce(1).write.options(extraOptions).parquet(dir.getCanonicalPath)
-        }
-        assert(m1.getErrorClass == "TASK_WRITE_FAILED")
-        assert(m1.getCause.getMessage.contains("Intentional exception for testing purposes"))
-      }
-
-      withTempPath { dir =>
-        val m2 = intercept[SparkException] {
-          val df = spark.range(1).select($"id" as Symbol("a"), $"id" as Symbol("b"))
-            .coalesce(1)
-          df.write.partitionBy("a").options(extraOptions).parquet(dir.getCanonicalPath)
-        }
-        assert(m2.getErrorClass == "TASK_WRITE_FAILED")
-        assert(m2.getCause.getMessage.contains("Intentional exception for testing purposes"))
-      }
-    }
-  }
-}
-
 class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
   extends ParquetOutputCommitter(outputPath, context) {

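The body of `JobCommitFailureParquetOutputCommitter` is collapsed in this view. Presumably it mirrors the task-level committer but fails at the job level; a hedged sketch of the override (the actual body is not visible in this diff, and `jobContext` here would be an `org.apache.hadoop.mapreduce.JobContext`):

  override def commitJob(jobContext: JobContext): Unit = {
    // Assumption: fail the job-level commit with the same intentional message.
    sys.error("Intentional exception for testing purposes")
  }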
