[BUG] SparkSession settings for autoCompact and optimizeWrite do not take effect #3967

Open
anilporiya opened this issue Dec 12, 2024 · 0 comments
Labels: bug
anilporiya commented Dec 12, 2024

Bug

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Describe the problem

When creating a Delta table using Spark 3.5.3 and Delta 3.2.1, the Spark session settings mentioned in https://docs.delta.io/latest/optimizations-oss.html#auto-compaction for enabling auto compaction and optimized writes do not take effect.

More specifically, the properties tried were spark.databricks.delta.autoCompact.enabled and spark.databricks.delta.optimizeWrite.enabled; neither takes effect.

Other properties were not tried/tested.

However, the spark.databricks.delta.properties.defaults.autoOptimize.autoCompact and spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite properties do work.

Just curious: should these properties be exposed with a spark.delta prefix instead of spark.databricks.delta?
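
For reference, the same session configs can also be set programmatically on an existing SparkSession instead of via --conf at startup. A minimal sketch (assuming the usual spark object from the pyspark shell; I have not re-tested this path separately):

# Session-level configs set at runtime instead of at shell startup.
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")    # reported here as not taking effect
spark.conf.set("spark.databricks.delta.autoCompact.minNumFiles", "10")
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")  # reported here as not taking effect
# The defaults.* variants that did result in the table properties being set:
spark.conf.set("spark.databricks.delta.properties.defaults.autoOptimize.autoCompact", "true")
spark.conf.set("spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite", "true")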

Steps to reproduce

  1. Download Spark 3.5.3 and place the Delta 3.2.1 jars under the jars folder:
(py39) I502458@C44304F57Y spark-3.5.3-bin-hadoop3 % ls -l jars | grep delta
-rw-r--r--@ 1 I502458  staff   6122814 Oct 10 12:51 delta-spark_2.12-3.2.1.jar
-rw-r--r--@ 1 I502458  staff     24948 Oct 10 12:52 delta-storage-3.2.1.jar
  2. Run a pyspark shell with the necessary Spark configurations (including enabling auto compaction and optimized write).
  3. Create a Delta table.
  4. Show the table properties. (A condensed sketch of steps 2 to 4 follows this list.)
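
For convenience, steps 2 to 4 condensed into one sketch (same code as in the full transcripts under Observed results, run inside a pyspark shell started with the configs described below):

# Build a tiny DataFrame, write it as a Delta table, and inspect the table properties.
columns = ["Name", "Age"]
data = [("ABC", 25), ("xyz", 27), ("zaq", 31)]
df = spark.sparkContext.parallelize(data).toDF(columns)
df.write.format("delta").saveAsTable("MyFirstDeltaTable")
spark.sql("SHOW TBLPROPERTIES myfirstdeltatable").show(truncate=False)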

Observed results

  1. Using spark.databricks.delta.autoCompact.enabled=true and spark.databricks.delta.optimizeWrite.enabled=true when invoking the pyspark shell

./bin/pyspark --verbose --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" --conf spark.databricks.delta.autoCompact.enabled="true" --conf spark.databricks.delta.autoCompact.minNumFiles="10" --conf spark.databricks.delta.optimizeWrite.enabled="true"

(py39) I502458@C44304F57Y spark-3.5.3-bin-hadoop3 % ./bin/pyspark --verbose --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" --conf spark.databricks.delta.autoCompact.enabled="true" --conf spark.databricks.delta.autoCompact.minNumFiles="10" --conf spark.databricks.delta.optimizeWrite.enabled="true"
Python 3.9.16 (main, Jan 11 2023, 10:02:19)
[Clang 14.0.6 ] :: Anaconda, Inc. on darwin
Type "help", "copyright", "credits" or "license" for more information.
Using properties file: null
Parsed arguments:
  master                  local[*]
  remote                  null
  deployMode              null
  executorMemory          null
  executorCores           null
  totalExecutorCores      null
  propertiesFile          null
  driverMemory            null
  driverCores             null
  driverExtraClassPath    null
  driverExtraLibraryPath  null
  driverExtraJavaOptions  null
  supervise               false
  queue                   null
  numExecutors            null
  files                   null
  pyFiles                 null
  archives                null
  mainClass               null
  primaryResource         pyspark-shell
  name                    PySparkShell
  childArgs               []
  jars                    null
  packages                null
  packagesExclusions      null
  repositories            null
  verbose                 true

Spark properties used, including those specified through
 --conf and those from the properties file null:
  (spark.databricks.delta.autoCompact.enabled,true)
  (spark.databricks.delta.autoCompact.minNumFiles,10)
  (spark.databricks.delta.optimizeWrite.enabled,true)
  (spark.sql.catalog.spark_catalog,org.apache.spark.sql.delta.catalog.DeltaCatalog)
  (spark.sql.extensions,io.delta.sql.DeltaSparkSessionExtension)


Main class:
org.apache.spark.api.python.PythonGatewayServer
Arguments:

Spark config:
(spark.app.name,PySparkShell)
(spark.app.submitTime,1733991708902)
(spark.databricks.delta.autoCompact.enabled,true)
(spark.databricks.delta.autoCompact.minNumFiles,10)
(spark.databricks.delta.optimizeWrite.enabled,true)
(spark.master,local[*])
(spark.sql.catalog.spark_catalog,org.apache.spark.sql.delta.catalog.DeltaCatalog)
(spark.sql.extensions,io.delta.sql.DeltaSparkSessionExtension)
(spark.submit.deployMode,client)
(spark.submit.pyFiles,)
(spark.ui.showConsoleProgress,true)
Classpath elements:



Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/12 00:21:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.5.3
      /_/

Using Python version 3.9.16 (main, Jan 11 2023 10:02:19)
Spark context Web UI available at http://192.168.50.204:4040
Spark context available as 'sc' (master = local[*], app id = local-1733991711012).
SparkSession available as 'spark'.
>>> columns = ["Name", "Age"]
>>> data = [("ABC",25),("xyz",27),("zaq",31)]
>>> rdd=spark.sparkContext.parallelize(data)
>>> df=rdd.toDF(columns)
>>> df.write.format("delta").saveAsTable("MyFirstDeltaTable")
24/12/12 00:24:06 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
24/12/12 00:24:06 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
24/12/12 00:24:11 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
24/12/12 00:24:11 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore [email protected]
24/12/12 00:24:26 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
24/12/12 00:24:36 WARN HiveExternalCatalog: Couldn't find corresponding Hive SerDe for data source provider delta. Persisting data source table `spark_catalog`.`default`.`myfirstdeltatable` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.
24/12/12 00:24:36 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
24/12/12 00:24:36 WARN HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist
24/12/12 00:24:36 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
24/12/12 00:24:36 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
>>> from delta import *
>>> DeltaTable.isDeltaTable(spark,"spark-warehouse/myfirstdeltatable/")
True
>>> spark.sql("SHOW TBLPROPERTIES myfirstdeltatable").show(truncate=False)
+----------------------+-----+
|key                   |value|
+----------------------+-----+
|delta.minReaderVersion|1    |
|delta.minWriterVersion|2    |
+----------------------+-----+
>>> exit()
  2. Using spark.databricks.delta.properties.defaults.autoOptimize.autoCompact=true and spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite=true when invoking the pyspark shell

./bin/pyspark --verbose --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" --conf spark.databricks.delta.properties.defaults.autoOptimize.autoCompact="true" --conf spark.databricks.delta.autoCompact.minNumFiles="10" --conf spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite="true"

(py39) I502458@C44304F57Y spark-3.5.3-bin-hadoop3 % ./bin/pyspark --verbose --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" --conf spark.databricks.delta.properties.defaults.autoOptimize.autoCompact="true" --conf spark.databricks.delta.autoCompact.minNumFiles="10" --conf spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite="true"
Python 3.9.16 (main, Jan 11 2023, 10:02:19)
[Clang 14.0.6 ] :: Anaconda, Inc. on darwin
Type "help", "copyright", "credits" or "license" for more information.
Using properties file: null
Parsed arguments:
  master                  local[*]
  remote                  null
  deployMode              null
  executorMemory          null
  executorCores           null
  totalExecutorCores      null
  propertiesFile          null
  driverMemory            null
  driverCores             null
  driverExtraClassPath    null
  driverExtraLibraryPath  null
  driverExtraJavaOptions  null
  supervise               false
  queue                   null
  numExecutors            null
  files                   null
  pyFiles                 null
  archives                null
  mainClass               null
  primaryResource         pyspark-shell
  name                    PySparkShell
  childArgs               []
  jars                    null
  packages                null
  packagesExclusions      null
  repositories            null
  verbose                 true

Spark properties used, including those specified through
 --conf and those from the properties file null:
  (spark.databricks.delta.autoCompact.minNumFiles,10)
  (spark.databricks.delta.properties.defaults.autoOptimize.autoCompact,true)
  (spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite,true)
  (spark.sql.catalog.spark_catalog,org.apache.spark.sql.delta.catalog.DeltaCatalog)
  (spark.sql.extensions,io.delta.sql.DeltaSparkSessionExtension)


Main class:
org.apache.spark.api.python.PythonGatewayServer
Arguments:

Spark config:
(spark.app.name,PySparkShell)
(spark.app.submitTime,1733992215035)
(spark.databricks.delta.autoCompact.minNumFiles,10)
(spark.databricks.delta.properties.defaults.autoOptimize.autoCompact,true)
(spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite,true)
(spark.master,local[*])
(spark.sql.catalog.spark_catalog,org.apache.spark.sql.delta.catalog.DeltaCatalog)
(spark.sql.extensions,io.delta.sql.DeltaSparkSessionExtension)
(spark.submit.deployMode,client)
(spark.submit.pyFiles,)
(spark.ui.showConsoleProgress,true)
Classpath elements:



Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/12 00:30:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.5.3
      /_/

Using Python version 3.9.16 (main, Jan 11 2023 10:02:19)
Spark context Web UI available at http://192.168.50.204:4040
Spark context available as 'sc' (master = local[*], app id = local-1733992217224).
SparkSession available as 'spark'.
>>> columns = ["Name", "Age"]
>>> data = [("ABC",25),("xyz",27),("zaq",31)]
>>> rdd=spark.sparkContext.parallelize(data)
>>> df=rdd.toDF(columns)
>>> df.write.format("delta").saveAsTable("MySecondDeltaTable")
24/12/12 00:31:50 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
24/12/12 00:31:50 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
24/12/12 00:31:54 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
24/12/12 00:31:54 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore [email protected]
24/12/12 00:32:08 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
24/12/12 00:32:17 WARN HiveExternalCatalog: Couldn't find corresponding Hive SerDe for data source provider delta. Persisting data source table `spark_catalog`.`default`.`myseconddeltatable` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.
24/12/12 00:32:18 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
24/12/12 00:32:18 WARN HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist
24/12/12 00:32:18 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
24/12/12 00:32:18 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
>>> from delta import *
>>> DeltaTable.isDeltaTable(spark,"spark-warehouse/myseconddeltatable/")
True
>>> spark.sql("SHOW TBLPROPERTIES myseconddeltatable").show(truncate=False)
+--------------------------------+-----+
|key                             |value|
+--------------------------------+-----+
|delta.autoOptimize.autoCompact  |true |
|delta.autoOptimize.optimizeWrite|true |
|delta.minReaderVersion          |1    |
|delta.minWriterVersion          |2    |
+--------------------------------+-----+
>>> exit()
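
In both runs the session configs themselves are clearly present (see the Spark config dumps above); they can also be double-checked from the shell with spark.conf.get. A small sketch using the keys from the first run (this check is not part of the transcripts above):

# Sanity check that the --conf values reached the session; values come back as strings.
spark.conf.get("spark.databricks.delta.autoCompact.enabled")    # expected 'true' in the first run
spark.conf.get("spark.databricks.delta.optimizeWrite.enabled")  # expected 'true' in the first run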

Expected results

As per the documentation at https://docs.delta.io/latest/optimizations-oss.html#auto-compaction, using "spark.databricks.delta.autoCompact.enabled=true" and "spark.databricks.delta.optimizeWrite.enabled=true" should have enabled these properties on the Delta table.
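
Concretely, given how the defaults.* properties behaved in the second run, the expectation would be for the first table's SHOW TBLPROPERTIES output to include the delta.autoOptimize entries as well. A sketch of the expected (but not observed) output:

spark.sql("SHOW TBLPROPERTIES myfirstdeltatable").show(truncate=False)
# Expected to additionally include:
# |delta.autoOptimize.autoCompact  |true |
# |delta.autoOptimize.optimizeWrite|true |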

Further details

Environment information

  • Delta Lake version: 3.2.1
  • Spark version: 3.5.3
  • Scala version: 2.12 (per the delta-spark_2.12-3.2.1.jar listed above)

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

  • Yes. I can contribute a fix for this bug independently.
  • Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
  • No. I cannot contribute a bug fix at this time.
anilporiya added the bug label on Dec 12, 2024