From a184b200aa561a467b027c5a3d996fe77ebf0051 Mon Sep 17 00:00:00 2001
From: Mahdi Malverdi
Date: Sat, 21 Dec 2024 15:18:49 +0330
Subject: [PATCH] Support ClickHouse insert settings for table writes (#369)

- Enable custom ClickHouse insert settings when writing to tables.
- Add support for the `spark.clickhouse.write.settings` configuration.
- Update documentation to describe usage of write settings.

Closes #369
---
 docs/configurations/02_sql_configurations.md |  1 +
 .../spark/write/ClickHouseWriter.scala       | 11 ++++++++++-
 .../sql/clickhouse/ClickHouseSQLConf.scala   |  8 ++++++++
 .../spark/sql/clickhouse/SparkOptions.scala  |  3 +++
 .../spark/write/ClickHouseWriter.scala       | 11 ++++++++++-
 .../sql/clickhouse/ClickHouseSQLConf.scala   |  8 ++++++++
 .../spark/sql/clickhouse/SparkOptions.scala  |  3 +++
 .../spark/write/ClickHouseWriter.scala       | 17 ++++++++++++++++-
 .../sql/clickhouse/ClickHouseSQLConf.scala   |  8 ++++++++
 .../spark/sql/clickhouse/SparkOptions.scala  |  3 +++
 10 files changed, 70 insertions(+), 3 deletions(-)

diff --git a/docs/configurations/02_sql_configurations.md b/docs/configurations/02_sql_configurations.md
index bbd3d4ef..85b26718 100644
--- a/docs/configurations/02_sql_configurations.md
+++ b/docs/configurations/02_sql_configurations.md
@@ -38,4 +38,5 @@
 spark.clickhouse.write.repartitionNum|0|Repartition data to meet the distributions of ClickHouse table is required before writing, use this conf to specific the repartition number, value less than 1 mean no requirement.|0.1.0
 spark.clickhouse.write.repartitionStrictly|false|If `true`, Spark will strictly distribute incoming records across partitions to satisfy the required distribution before passing the records to the data source table on write. Otherwise, Spark may apply certain optimizations to speed up the query but break the distribution requirement. Note, this configuration requires SPARK-37523(available in Spark 3.4), w/o this patch, it always acts as `true`.|0.3.0
 spark.clickhouse.write.retryInterval|10s|The interval in seconds between write retry.|0.1.0
 spark.clickhouse.write.retryableErrorCodes|241|The retryable error codes returned by ClickHouse server when write failing.|0.1.0
+spark.clickhouse.write.settings||Settings to apply when writing into ClickHouse, e.g. `final=1, max_execution_time=5`.|0.9.0
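
For reviewers, a minimal usage sketch of the new conf (assumes a ClickHouse catalog named `clickhouse` is already configured and `df` is an existing DataFrame matching the target table's schema; all names are illustrative):

    // Session-level conf introduced by this patch; applies to subsequent writes.
    spark.conf.set("spark.clickhouse.write.settings", "final=1, max_execution_time=5")
    df.writeTo("clickhouse.db.tbl").append()
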
diff --git a/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala b/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala
index 13953a2a..675d552d 100644
--- a/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala
+++ b/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala
@@ -220,12 +220,21 @@ abstract class ClickHouseWriter(writeJob: WriteJobDescription)
     val client = nodeClient(shardNum)
     val data = serialize()
     var writeTime = 0L
+
+    val settings = writeJob.writeOptions.settings
+      .getOrElse("")
+      .split(",")
+      .map(_.trim.split("=", 2))
+      .collect { case Array(key, value) => key -> value }
+      .toMap
+
     Utils.retry[Unit, RetryableCHException](
       writeJob.writeOptions.maxRetry,
       writeJob.writeOptions.retryInterval
     ) {
       var startWriteTime = System.currentTimeMillis
-      client.syncInsertOutputJSONEachRow(database, table, format, codec, new ByteArrayInputStream(data)) match {
+      client.syncInsertOutputJSONEachRow(database, table, format, codec, new ByteArrayInputStream(data), settings)
+      match {
         case Right(_) =>
           writeTime = System.currentTimeMillis - startWriteTime
           _totalWriteTime.add(writeTime)
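
The settings string is parsed into a map before the retry loop, so retries reuse the same settings. A self-contained sketch of that parsing (logic copied from the hunk above; the object and method names are ours):

    object SettingsParsing {
      // Split pairs on ',', trim each pair, then split on the first '='.
      // Pairs without '=' are silently dropped; an unset conf yields Map.empty.
      def parse(raw: Option[String]): Map[String, String] =
        raw.getOrElse("")
          .split(",")
          .map(_.trim.split("=", 2))
          .collect { case Array(key, value) => key -> value }
          .toMap

      def main(args: Array[String]): Unit = {
        assert(parse(Some("final=1, max_execution_time=5")) ==
          Map("final" -> "1", "max_execution_time" -> "5"))
        assert(parse(None) == Map.empty)          // conf not set
        assert(parse(Some("bogus")) == Map.empty) // no '=', pair dropped
      }
    }
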
diff --git a/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala b/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
index 39e2bc4a..a794d56f 100644
--- a/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
+++ b/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
@@ -218,4 +218,12 @@ object ClickHouseSQLConf {
     .transform(_.toLowerCase)
     .createOptional
 
+  val WRITE_SETTINGS: OptionalConfigEntry[String] =
+    buildConf("spark.clickhouse.write.settings")
+      .doc("Settings to apply when writing into ClickHouse, e.g. `final=1, max_execution_time=5`.")
+      .version("0.9.0")
+      .stringConf
+      .transform(_.toLowerCase)
+      .createOptional
+
 }
diff --git a/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala b/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala
index b473d7db..9ceff1eb 100644
--- a/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala
+++ b/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala
@@ -91,4 +91,7 @@ class WriteOptions(_options: JMap[String, String]) extends SparkOptions {
 
   def format: String =
     eval(WRITE_FORMAT.key, WRITE_FORMAT)
+
+  def settings: Option[String] =
+    eval(WRITE_SETTINGS.key, WRITE_SETTINGS)
 }
diff --git a/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala b/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala
index bedd827c..2abb4508 100644
--- a/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala
+++ b/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala
@@ -246,12 +246,21 @@ abstract class ClickHouseWriter(writeJob: WriteJobDescription)
     val client = nodeClient(shardNum)
     val data = serialize()
     var writeTime = 0L
+
+    val settings = writeJob.writeOptions.settings
+      .getOrElse("")
+      .split(",")
+      .map(_.trim.split("=", 2))
+      .collect { case Array(key, value) => key -> value }
+      .toMap
+
     Utils.retry[Unit, RetryableCHException](
       writeJob.writeOptions.maxRetry,
       writeJob.writeOptions.retryInterval
     ) {
       var startWriteTime = System.currentTimeMillis
-      client.syncInsertOutputJSONEachRow(database, table, format, codec, new ByteArrayInputStream(data)) match {
+      client.syncInsertOutputJSONEachRow(database, table, format, codec, new ByteArrayInputStream(data), settings)
+      match {
         case Right(_) =>
           writeTime = System.currentTimeMillis - startWriteTime
           _totalWriteTime.add(writeTime)
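
Since `WRITE_SETTINGS` is a regular conf entry, it can also be set per session from SQL. Note that `.transform(_.toLowerCase)` lowercases the whole string, so case-sensitive setting values cannot be passed through this conf. A sketch (Spark's `SET` command treats everything after the first `=` as the value, so the embedded `=` and `,` are preserved; table and view names are illustrative):

    // Enable insert settings for the writes below, then remove the conf again.
    spark.sql("SET spark.clickhouse.write.settings=final=1, max_execution_time=5")
    spark.sql("INSERT INTO clickhouse.db.tbl SELECT * FROM src")
    spark.conf.unset("spark.clickhouse.write.settings")
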
diff --git a/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala b/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
index 39e2bc4a..a794d56f 100644
--- a/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
+++ b/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
@@ -218,4 +218,12 @@ object ClickHouseSQLConf {
     .transform(_.toLowerCase)
     .createOptional
 
+  val WRITE_SETTINGS: OptionalConfigEntry[String] =
+    buildConf("spark.clickhouse.write.settings")
+      .doc("Settings to apply when writing into ClickHouse, e.g. `final=1, max_execution_time=5`.")
+      .version("0.9.0")
+      .stringConf
+      .transform(_.toLowerCase)
+      .createOptional
+
 }
diff --git a/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala b/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala
index b473d7db..9ceff1eb 100644
--- a/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala
+++ b/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala
@@ -91,4 +91,7 @@ class WriteOptions(_options: JMap[String, String]) extends SparkOptions {
 
   def format: String =
     eval(WRITE_FORMAT.key, WRITE_FORMAT)
+
+  def settings: Option[String] =
+    eval(WRITE_SETTINGS.key, WRITE_SETTINGS)
 }
diff --git a/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala b/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala
index 6f9b267b..e0c8a622 100644
--- a/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala
+++ b/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala
@@ -248,12 +248,27 @@ abstract class ClickHouseWriter(writeJob: WriteJobDescription)
     val client = nodeClient(shardNum)
     val data = serialize()
     var writeTime = 0L
+
+    val settings = writeJob.writeOptions.settings
+      .getOrElse("")
+      .split(",")
+      .map(_.trim.split("=", 2))
+      .collect { case Array(key, value) => key -> value }
+      .toMap
+
     Utils.retry[Unit, RetryableCHException](
       writeJob.writeOptions.maxRetry,
       writeJob.writeOptions.retryInterval
     ) {
       var startWriteTime = System.currentTimeMillis
-      client.syncInsertOutputJSONEachRow(database, table, format, codec, new ByteArrayInputStream(data)) match {
+      client.syncInsertOutputJSONEachRow(
+        database,
+        table,
+        format,
+        codec,
+        new ByteArrayInputStream(data),
+        settings
+      ) match {
         case Right(_) =>
           writeTime = System.currentTimeMillis - startWriteTime
           _totalWriteTime.add(writeTime)
diff --git a/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala b/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
index 39e2bc4a..a794d56f 100644
--- a/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
+++ b/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
@@ -218,4 +218,12 @@ object ClickHouseSQLConf {
     .transform(_.toLowerCase)
     .createOptional
 
+  val WRITE_SETTINGS: OptionalConfigEntry[String] =
+    buildConf("spark.clickhouse.write.settings")
+      .doc("Settings to apply when writing into ClickHouse, e.g. `final=1, max_execution_time=5`.")
+      .version("0.9.0")
+      .stringConf
+      .transform(_.toLowerCase)
+      .createOptional
+
 }
diff --git a/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala b/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala
index b473d7db..9ceff1eb 100644
--- a/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala
+++ b/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala
@@ -91,4 +91,7 @@ class WriteOptions(_options: JMap[String, String]) extends SparkOptions {
 
   def format: String =
     eval(WRITE_FORMAT.key, WRITE_FORMAT)
+
+  def settings: Option[String] =
+    eval(WRITE_SETTINGS.key, WRITE_SETTINGS)
 }
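
One behavioral note on the parsing, demonstrated with a runnable sketch: only the whole `key=value` pair is trimmed, so spaces around `=` leak into the key or value, and a value containing a comma is split into a stray second pair that is dropped (the setting name below is a real ClickHouse setting; the object name is ours):

    object ParsingCaveats {
      def main(args: Array[String]): Unit = {
        // The pair is trimmed, but not the key/value after the '=' split.
        assert(" final = 1 ".trim.split("=", 2).sameElements(Array("final ", " 1")))
        // A comma inside a value produces a trailing token without '=', which is dropped.
        val parsed = "insert_deduplication_token=a,b"
          .split(",")
          .map(_.trim.split("=", 2))
          .collect { case Array(k, v) => k -> v }
          .toMap
        assert(parsed == Map("insert_deduplication_token" -> "a")) // "b" is lost
      }
    }
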