From b9ac7d38ea7f4f7e57c4bbb7b5e5bc836b2b1c58 Mon Sep 17 00:00:00 2001
From: Jiaan Geng
Date: Fri, 20 Oct 2023 09:43:11 +0800
Subject: [PATCH] [SPARK-45484][SQL] Fix the bug that uses incorrect parquet
 compression codec lz4raw

### What changes were proposed in this pull request?
https://github.com/apache/spark/pull/41507 added support for the new parquet compression codec `lz4raw`. But `lz4raw` is not a correct parquet compression codec name, and this mistake causes errors. Please refer to https://github.com/apache/spark/pull/43310/files#r1352405312

The root cause is that parquet uses `lz4_raw` as the codec name and stores it into the metadata of the parquet file. Please refer to https://github.com/apache/spark/blob/6373f19f537f69c6460b2e4097f19903c01a608f/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala#L65

We should use `lz4_raw` as the codec name.

### Why are the changes needed?
Fix the bug that uses the incorrect parquet compression codec name `lz4raw`.

### Does this PR introduce _any_ user-facing change?
'Yes'.
Fix a bug.

### How was this patch tested?
New test cases.

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes #43310 from beliefer/SPARK-45484.

Authored-by: Jiaan Geng
Signed-off-by: Jiaan Geng
---
 docs/sql-migration-guide.md                       |  1 +
 .../org/apache/spark/sql/internal/SQLConf.scala   |  4 ++--
 .../datasources/parquet/ParquetOptions.scala      |  2 +-
 .../datasources/FileSourceCodecSuite.scala        |  2 +-
 .../ParquetCompressionCodecPrecedenceSuite.scala  | 13 +++++++++++--
 5 files changed, 16 insertions(+), 6 deletions(-)

diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index c5d09c19b245f..b0dc49ed47683 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -27,6 +27,7 @@ license: |
 - Since Spark 4.0, the default value of `spark.sql.maxSinglePartitionBytes` is changed from `Long.MaxValue` to `128m`. To restore the previous behavior, set `spark.sql.maxSinglePartitionBytes` to `9223372036854775807`(`Long.MaxValue`).
 - Since Spark 4.0, any read of SQL tables takes into consideration the SQL configs `spark.sql.files.ignoreCorruptFiles`/`spark.sql.files.ignoreMissingFiles` instead of the core config `spark.files.ignoreCorruptFiles`/`spark.files.ignoreMissingFiles`.
 - Since Spark 4.0, `spark.sql.hive.metastore` drops the support of Hive prior to 2.0.0 as they require JDK 8 that Spark does not support anymore. Users should migrate to higher versions.
+- Since Spark 4.0, `spark.sql.parquet.compression.codec` drops the support of codec name `lz4raw`, please use `lz4_raw` instead.

 ## Upgrading from Spark SQL 3.4 to 3.5

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index e66eadaa914ef..1e759b6266c61 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1015,12 +1015,12 @@ object SQLConf {
       "`parquet.compression` is specified in the table-specific options/properties, the " +
       "precedence would be `compression`, `parquet.compression`, " +
       "`spark.sql.parquet.compression.codec`. Acceptable values include: none, uncompressed, " +
-      "snappy, gzip, lzo, brotli, lz4, lz4raw, zstd.")
+      "snappy, gzip, lzo, brotli, lz4, lz4_raw, zstd.")
     .version("1.1.1")
     .stringConf
     .transform(_.toLowerCase(Locale.ROOT))
     .checkValues(
-      Set("none", "uncompressed", "snappy", "gzip", "lzo", "brotli", "lz4", "lz4raw", "zstd"))
+      Set("none", "uncompressed", "snappy", "gzip", "lzo", "brotli", "lz4", "lz4_raw", "zstd"))
     .createWithDefault("snappy")

   val PARQUET_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.parquet.filterPushdown")

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
index 023d2460959cd..559a994319d3e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -96,7 +96,7 @@ object ParquetOptions extends DataSourceOptions {
     "lzo" -> CompressionCodecName.LZO,
     "brotli" -> CompressionCodecName.BROTLI,
     "lz4" -> CompressionCodecName.LZ4,
-    "lz4raw" -> CompressionCodecName.LZ4_RAW,
+    "lz4_raw" -> CompressionCodecName.LZ4_RAW,
     "zstd" -> CompressionCodecName.ZSTD)

   def getParquetCompressionCodecName(name: String): String = {

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
index 09a348cd29451..11e9f4665a9cf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
@@ -59,7 +59,7 @@ class ParquetCodecSuite extends FileSourceCodecSuite {
   // Exclude "brotli" because the com.github.rdblue:brotli-codec dependency is not available
   // on Maven Central.
   override protected def availableCodecs: Seq[String] = {
-    Seq("none", "uncompressed", "snappy", "gzip", "zstd", "lz4", "lz4raw")
+    Seq("none", "uncompressed", "snappy", "gzip", "zstd", "lz4", "lz4_raw")
   }
 }

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala
index 7e1a9becd23f7..1a387b7d2de63 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala
@@ -29,7 +29,16 @@ import org.apache.spark.sql.test.SharedSparkSession
 class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSparkSession {
   test("Test `spark.sql.parquet.compression.codec` config") {
-    Seq("NONE", "UNCOMPRESSED", "SNAPPY", "GZIP", "LZO", "LZ4", "BROTLI", "ZSTD").foreach { c =>
+    Seq(
+      "NONE",
+      "UNCOMPRESSED",
+      "SNAPPY",
+      "GZIP",
+      "LZO",
+      "LZ4",
+      "BROTLI",
+      "ZSTD",
+      "LZ4_RAW").foreach { c =>
       withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> c) {
         val expected = if (c == "NONE") "UNCOMPRESSED" else c
         val option = new ParquetOptions(Map.empty[String, String], spark.sessionState.conf)
@@ -105,7 +114,7 @@ class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSpar
   test("Create parquet table with compression") {
     Seq(true, false).foreach { isPartitioned =>
-      val codecs = Seq("UNCOMPRESSED", "SNAPPY", "GZIP", "ZSTD", "LZ4")
+      val codecs = Seq("UNCOMPRESSED", "SNAPPY", "GZIP", "ZSTD", "LZ4", "LZ4_RAW")
       codecs.foreach { compressionCodec =>
         checkCompressionCodec(compressionCodec, isPartitioned)
       }
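
A minimal usage sketch (not part of the patch), assuming a spark-shell session built with this fix; the output path and row count are placeholders. It shows the corrected codec name in both the session conf and the per-write option, then reads the codec back from the Parquet footer, where Parquet records it as `LZ4_RAW` (the root cause described above):

```scala
// Session-wide conf: after this fix, only the Parquet-native name
// "lz4_raw" passes the checkValues validation; "lz4raw" is rejected.
spark.conf.set("spark.sql.parquet.compression.codec", "lz4_raw")
spark.range(100).write.mode("overwrite").parquet("/tmp/lz4_raw_demo")

// Per-write option: resolved through the same
// shortParquetCompressionCodecNames mapping in ParquetOptions.
spark.range(100).write
  .option("compression", "lz4_raw")
  .mode("overwrite")
  .parquet("/tmp/lz4_raw_demo")

// Inspect the codec stored in the file footer. Parquet writes the enum
// name LZ4_RAW, which is why footer-based tests expect "lz4_raw",
// never "lz4raw".
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

val conf = new Configuration()
val part = FileSystem.get(conf)
  .listStatus(new Path("/tmp/lz4_raw_demo"))
  .map(_.getPath)
  .filter(_.getName.endsWith(".parquet"))
  .head
val reader = ParquetFileReader.open(HadoopInputFile.fromPath(part, conf))
println(reader.getFooter.getBlocks.get(0).getColumns.get(0).getCodec) // LZ4_RAW
reader.close()
```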