[SPARK-51269][SQL] SQLConf should manage the default value for avro compression level #50021

Status: Open · wants to merge 1 commit into base: master
SQLConf.scala:

@@ -21,14 +21,14 @@ import java.util.{Locale, Properties, TimeZone}
 import java.util
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicReference
-import java.util.zip.Deflater
 
 import scala.collection.immutable
 import scala.jdk.CollectionConverters._
 import scala.util.Try
 import scala.util.control.NonFatal
 import scala.util.matching.Regex
 
+import org.apache.avro.file.CodecFactory
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.OutputCommitter

@@ -4196,8 +4196,8 @@ object SQLConf {
       "The default value is -1 which corresponds to 6 level in the current implementation.")
     .version("2.4.0")
     .intConf
-    .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
-    .createOptional
+    .checkValues((1 to 9).toSet + CodecFactory.DEFAULT_DEFLATE_LEVEL)
+    .createWithDefault(CodecFactory.DEFAULT_DEFLATE_LEVEL)
 
   val AVRO_XZ_LEVEL = buildConf("spark.sql.avro.xz.level")
     .doc("Compression level for the xz codec used in writing of AVRO files. " +
@@ -4206,14 +4206,14 @@ object SQLConf {
     .version("4.0.0")
     .intConf
     .checkValue(v => v > 0 && v <= 9, "The value must be in the range of from 1 to 9 inclusive.")
-    .createOptional
+    .createWithDefault(CodecFactory.DEFAULT_XZ_LEVEL)
 
   val AVRO_ZSTANDARD_LEVEL = buildConf("spark.sql.avro.zstandard.level")
     .doc("Compression level for the zstandard codec used in writing of AVRO files. " +
       "The default value is 3.")
     .version("4.0.0")
     .intConf
-    .createOptional
+    .createWithDefault(CodecFactory.DEFAULT_ZSTANDARD_LEVEL)
 
   val AVRO_ZSTANDARD_BUFFER_POOL_ENABLED = buildConf("spark.sql.avro.zstandard.bufferPool.enabled")
     .doc("If true, enable buffer pool of ZSTD JNI library when writing of AVRO files")
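With `createWithDefault`, each level conf now resolves to a concrete value even when the user sets nothing, so call sites no longer need to carry their own fallback. Below is a minimal sketch of the observable behavior, assuming a local Spark build that includes this patch with the avro data source on the classpath; the printed defaults come from Avro's `CodecFactory` constants and could differ across Avro versions.

```scala
import org.apache.spark.sql.SparkSession

object AvroLevelDefaults {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("avro-level-defaults")
      .getOrCreate()

    // With createWithDefault, an unset conf resolves to the Avro library
    // default instead of an absent optional value.
    println(spark.conf.get("spark.sql.avro.deflate.level"))   // "-1" per the doc string above
    println(spark.conf.get("spark.sql.avro.xz.level"))        // CodecFactory.DEFAULT_XZ_LEVEL
    println(spark.conf.get("spark.sql.avro.zstandard.level")) // "3" per the doc string above

    spark.stop()
  }
}
```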
AvroCompressionCodec.java:

@@ -22,29 +22,27 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import org.apache.avro.file.*;
+import org.apache.avro.file.DataFileConstants;
 
 /**
  * A mapper class from Spark supported avro compression codecs to avro compression codecs.
  */
 public enum AvroCompressionCodec {
-  UNCOMPRESSED(DataFileConstants.NULL_CODEC, false, -1),
-  DEFLATE(DataFileConstants.DEFLATE_CODEC, true, CodecFactory.DEFAULT_DEFLATE_LEVEL),
-  SNAPPY(DataFileConstants.SNAPPY_CODEC, false, -1),
-  BZIP2(DataFileConstants.BZIP2_CODEC, false, -1),
-  XZ(DataFileConstants.XZ_CODEC, true, CodecFactory.DEFAULT_XZ_LEVEL),
-  ZSTANDARD(DataFileConstants.ZSTANDARD_CODEC, true, CodecFactory.DEFAULT_ZSTANDARD_LEVEL);
+  UNCOMPRESSED(DataFileConstants.NULL_CODEC, false),
+  DEFLATE(DataFileConstants.DEFLATE_CODEC, true),
+  SNAPPY(DataFileConstants.SNAPPY_CODEC, false),
+  BZIP2(DataFileConstants.BZIP2_CODEC, false),
+  XZ(DataFileConstants.XZ_CODEC, true),
+  ZSTANDARD(DataFileConstants.ZSTANDARD_CODEC, true);
 
   private final String codecName;
   private final boolean supportCompressionLevel;
-  private final int defaultCompressionLevel;
 
   AvroCompressionCodec(
       String codecName,
-      boolean supportCompressionLevel, int defaultCompressionLevel) {
+      boolean supportCompressionLevel) {
     this.codecName = codecName;
     this.supportCompressionLevel = supportCompressionLevel;
-    this.defaultCompressionLevel = defaultCompressionLevel;
   }
 
   public String getCodecName() {
@@ -55,10 +53,6 @@ public boolean getSupportCompressionLevel() {
     return this.supportCompressionLevel;
   }
 
-  public int getDefaultCompressionLevel() {
-    return this.defaultCompressionLevel;
-  }
-
   private static final Map<String, String> codecNameMap =
       Arrays.stream(AvroCompressionCodec.values()).collect(
          Collectors.toMap(codec -> codec.name(), codec -> codec.name().toLowerCase(Locale.ROOT)));
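After this cleanup the enum answers only two questions: which Avro codec name a Spark codec maps to, and whether that codec takes a compression level; the default levels live entirely in SQLConf. A quick sketch of what the remaining API exposes (the `org.apache.spark.sql.avro` package and its availability to user code are assumptions from the Spark source tree):

```scala
import org.apache.spark.sql.avro.AvroCompressionCodec

object CodecTable {
  def main(args: Array[String]): Unit = {
    // Iterate the Spark-supported codecs and show how each maps onto an
    // Avro codec name, plus whether a compression level applies.
    AvroCompressionCodec.values().foreach { codec =>
      println(f"${codec.name()}%-12s -> '${codec.getCodecName}'" +
        s", supports level: ${codec.getSupportCompressionLevel}")
    }
  }
}
```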
sql/core/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala (13 additions, 11 deletions):

@@ -121,18 +121,20 @@ private[sql] object AvroUtils extends Logging {
       jobConf.setBoolean("mapreduce.output.fileoutputformat.compress", true)
       jobConf.set(AvroJob.CONF_OUTPUT_CODEC, compressed.getCodecName)
       if (compressed.getSupportCompressionLevel) {
-        val level = sqlConf.getConfString(s"spark.sql.avro.$codecName.level",
-          compressed.getDefaultCompressionLevel.toString)
-        logInfo(log"Compressing Avro output using the ${MDC(CODEC_NAME, codecName)} codec " +
-          log"at level ${MDC(CODEC_LEVEL, level)}")
-        val s = if (compressed == ZSTANDARD) {
-          val bufferPoolEnabled = sqlConf.getConf(SQLConf.AVRO_ZSTANDARD_BUFFER_POOL_ENABLED)
-          jobConf.setBoolean(AvroOutputFormat.ZSTD_BUFFERPOOL_KEY, bufferPoolEnabled)
-          "zstd"
-        } else {
-          codecName
+        val levelAndCodecName = compressed match {
+          case DEFLATE => Some(sqlConf.getConf(SQLConf.AVRO_DEFLATE_LEVEL), codecName)
+          case XZ => Some(sqlConf.getConf(SQLConf.AVRO_XZ_LEVEL), codecName)
+          case ZSTANDARD =>
+            jobConf.setBoolean(AvroOutputFormat.ZSTD_BUFFERPOOL_KEY,
+              sqlConf.getConf(SQLConf.AVRO_ZSTANDARD_BUFFER_POOL_ENABLED))
+            Some(sqlConf.getConf(SQLConf.AVRO_ZSTANDARD_LEVEL), "zstd")
+          case _ => None
         }
-        jobConf.setInt(s"avro.mapred.$s.level", level.toInt)
+        levelAndCodecName.foreach { case (level, mapredCodecName) =>
+          logInfo(log"Compressing Avro output using the ${MDC(CODEC_NAME, codecName)} " +
+            log"codec at level ${MDC(CODEC_LEVEL, level)}")
+          jobConf.setInt(s"avro.mapred.$mapredCodecName.level", level.toInt)
+        }
       } else {
         logInfo(log"Compressing Avro output using the ${MDC(CODEC_NAME, codecName)} codec")
       }
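End to end, the level that AvroUtils now reads from SQLConf is what lands in the Hadoop job conf as avro.mapred.<codec>.level. Here is a minimal write-path sketch, assuming a Spark build with the avro data source on the classpath; the output path and the level 9 override are illustrative only.

```scala
import org.apache.spark.sql.SparkSession

object WriteAvroZstd {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("write-avro-zstd")
      .getOrCreate()

    // Pick the codec; the matching level conf (default 3 for zstandard)
    // is what populates "avro.mapred.zstd.level" via the match above.
    spark.conf.set("spark.sql.avro.compression.codec", "zstandard")
    spark.conf.set("spark.sql.avro.zstandard.level", "9") // override the default

    spark.range(100000L).toDF("id")
      .write
      .format("avro")
      .mode("overwrite")
      .save("/tmp/avro-zstd-demo") // illustrative output path

    spark.stop()
  }
}
```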