From 1e73b7cb9ca005d5171896149123c54948e68bed Mon Sep 17 00:00:00 2001
From: Mahdi Malverdi
Date: Sat, 21 Dec 2024 15:18:49 +0330
Subject: [PATCH 1/2] Support ClickHouse insert settings for table writes (#369)

- Enable custom ClickHouse insert settings when writing to tables.
- Add support for the `spark.clickhouse.write.settings` configuration.
- Ensure settings are applied to the `INSERT INTO ... SETTINGS ...` SQL command.
- Update documentation to describe usage of write settings.

Closes #369
---
 docs/configurations/02_sql_configurations.md |  1 +
 .../spark/write/ClickHouseWriter.scala       | 11 ++++++++++-
 .../sql/clickhouse/ClickHouseSQLConf.scala   |  8 ++++++++
 .../spark/sql/clickhouse/SparkOptions.scala  |  3 +++
 .../spark/write/ClickHouseWriter.scala       | 11 ++++++++++-
 .../sql/clickhouse/ClickHouseSQLConf.scala   |  8 ++++++++
 .../spark/sql/clickhouse/SparkOptions.scala  |  3 +++
 .../spark/write/ClickHouseWriter.scala       | 17 ++++++++++++++++-
 .../sql/clickhouse/ClickHouseSQLConf.scala   |  8 ++++++++
 .../spark/sql/clickhouse/SparkOptions.scala  |  3 +++
 10 files changed, 70 insertions(+), 3 deletions(-)
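The resulting statement shape, sketched in isolation (the real SQL is assembled by the ClickHouse client behind `syncInsertOutputJSONEachRow`; the table name and the clause-building code below are illustrative only, not part of the patch):

```scala
// Illustrative only: render a settings map into an INSERT ... SETTINGS clause.
val settings = Map("final" -> "1", "max_execution_time" -> "5")
val settingsClause =
  if (settings.isEmpty) ""
  else settings.map { case (k, v) => s"$k=$v" }.mkString(" SETTINGS ", ", ", "")
val insertSql = s"INSERT INTO db.tbl$settingsClause FORMAT JSONEachRow"
// => INSERT INTO db.tbl SETTINGS final=1, max_execution_time=5 FORMAT JSONEachRow
```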
diff --git a/docs/configurations/02_sql_configurations.md b/docs/configurations/02_sql_configurations.md
index bbd3d4ef..85b26718 100644
--- a/docs/configurations/02_sql_configurations.md
+++ b/docs/configurations/02_sql_configurations.md
@@ -38,4 +38,5 @@ spark.clickhouse.write.repartitionNum|0|Repartition data to meet the distributio
 spark.clickhouse.write.repartitionStrictly|false|If `true`, Spark will strictly distribute incoming records across partitions to satisfy the required distribution before passing the records to the data source table on write. Otherwise, Spark may apply certain optimizations to speed up the query but break the distribution requirement. Note, this configuration requires SPARK-37523(available in Spark 3.4), w/o this patch, it always acts as `true`.|0.3.0
 spark.clickhouse.write.retryInterval|10s|The interval in seconds between write retry.|0.1.0
 spark.clickhouse.write.retryableErrorCodes|241|The retryable error codes returned by ClickHouse server when write failing.|0.1.0
+spark.clickhouse.write.settings||Settings to apply when writing into ClickHouse, e.g. `final=1, max_execution_time=5`|0.9.0
diff --git a/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala b/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala
index 13953a2a..675d552d 100644
--- a/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala
+++ b/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala
@@ -220,12 +220,21 @@ abstract class ClickHouseWriter(writeJob: WriteJobDescription)
     val client = nodeClient(shardNum)
     val data = serialize()
     var writeTime = 0L
+
+    val settings = writeJob.writeOptions.settings
+      .getOrElse("")
+      .split(",")
+      .map(_.trim.split("=", 2))
+      .collect { case Array(key, value) => key.trim -> value.trim }
+      .toMap
+
     Utils.retry[Unit, RetryableCHException](
       writeJob.writeOptions.maxRetry,
       writeJob.writeOptions.retryInterval
     ) {
       var startWriteTime = System.currentTimeMillis
-      client.syncInsertOutputJSONEachRow(database, table, format, codec, new ByteArrayInputStream(data)) match {
+      client.syncInsertOutputJSONEachRow(database, table, format, codec, new ByteArrayInputStream(data), settings)
+      match {
         case Right(_) =>
           writeTime = System.currentTimeMillis - startWriteTime
           _totalWriteTime.add(writeTime)
diff --git a/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala b/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
index 39e2bc4a..a794d56f 100644
--- a/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
+++ b/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
@@ -218,4 +218,12 @@ object ClickHouseSQLConf {
       .transform(_.toLowerCase)
       .createOptional
 
+  val WRITE_SETTINGS: OptionalConfigEntry[String] =
+    buildConf("spark.clickhouse.write.settings")
+      .doc("Settings to apply when writing into ClickHouse, e.g. `final=1, max_execution_time=5`")
+      .version("0.9.0")
+      .stringConf
+      .transform(_.toLowerCase)
+      .createOptional
+
 }
diff --git a/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala b/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala
index b473d7db..9ceff1eb 100644
--- a/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala
+++ b/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala
@@ -91,4 +91,7 @@ class WriteOptions(_options: JMap[String, String]) extends SparkOptions {
 
   def format: String =
     eval(WRITE_FORMAT.key, WRITE_FORMAT)
+
+  def settings: Option[String] =
+    eval(WRITE_SETTINGS.key, WRITE_SETTINGS)
 }
diff --git a/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala b/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala
index bedd827c..2abb4508 100644
--- a/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala
+++ b/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala
@@ -246,12 +246,21 @@ abstract class ClickHouseWriter(writeJob: WriteJobDescription)
     val client = nodeClient(shardNum)
     val data = serialize()
     var writeTime = 0L
+
+    val settings = writeJob.writeOptions.settings
+      .getOrElse("")
+      .split(",")
+      .map(_.trim.split("=", 2))
+      .collect { case Array(key, value) => key.trim -> value.trim }
+      .toMap
+
     Utils.retry[Unit, RetryableCHException](
       writeJob.writeOptions.maxRetry,
       writeJob.writeOptions.retryInterval
     ) {
       var startWriteTime = System.currentTimeMillis
-      client.syncInsertOutputJSONEachRow(database, table, format, codec, new ByteArrayInputStream(data)) match {
+      client.syncInsertOutputJSONEachRow(database, table, format, codec, new ByteArrayInputStream(data), settings)
+      match {
         case Right(_) =>
           writeTime = System.currentTimeMillis - startWriteTime
           _totalWriteTime.add(writeTime)
diff --git a/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala b/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
index 39e2bc4a..a794d56f 100644
--- a/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
+++ b/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
@@ -218,4 +218,12 @@ object ClickHouseSQLConf {
       .transform(_.toLowerCase)
       .createOptional
 
+  val WRITE_SETTINGS: OptionalConfigEntry[String] =
+    buildConf("spark.clickhouse.write.settings")
+      .doc("Settings to apply when writing into ClickHouse, e.g. `final=1, max_execution_time=5`")
+      .version("0.9.0")
+      .stringConf
+      .transform(_.toLowerCase)
+      .createOptional
+
 }
diff --git a/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala b/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala
index b473d7db..9ceff1eb 100644
--- a/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala
+++ b/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala
@@ -91,4 +91,7 @@ class WriteOptions(_options: JMap[String, String]) extends SparkOptions {
 
   def format: String =
     eval(WRITE_FORMAT.key, WRITE_FORMAT)
+
+  def settings: Option[String] =
+    eval(WRITE_SETTINGS.key, WRITE_SETTINGS)
 }
diff --git a/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala b/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala
index 6f9b267b..e0c8a622 100644
--- a/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala
+++ b/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala
@@ -248,12 +248,27 @@ abstract class ClickHouseWriter(writeJob: WriteJobDescription)
     val client = nodeClient(shardNum)
     val data = serialize()
     var writeTime = 0L
+
+    val settings = writeJob.writeOptions.settings
+      .getOrElse("")
+      .split(",")
+      .map(_.trim.split("=", 2))
+      .collect { case Array(key, value) => key.trim -> value.trim }
+      .toMap
+
     Utils.retry[Unit, RetryableCHException](
       writeJob.writeOptions.maxRetry,
       writeJob.writeOptions.retryInterval
     ) {
       var startWriteTime = System.currentTimeMillis
-      client.syncInsertOutputJSONEachRow(database, table, format, codec, new ByteArrayInputStream(data)) match {
+      client.syncInsertOutputJSONEachRow(
+        database,
+        table,
+        format,
+        codec,
+        new ByteArrayInputStream(data),
+        settings
+      ) match {
         case Right(_) =>
           writeTime = System.currentTimeMillis - startWriteTime
           _totalWriteTime.add(writeTime)
diff --git a/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala b/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
index 39e2bc4a..a794d56f 100644
--- a/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
+++ b/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
@@ -218,4 +218,12 @@ object ClickHouseSQLConf {
       .transform(_.toLowerCase)
       .createOptional
 
+  val WRITE_SETTINGS: OptionalConfigEntry[String] =
+    buildConf("spark.clickhouse.write.settings")
+      .doc("Settings to apply when writing into ClickHouse, e.g. `final=1, max_execution_time=5`")
+      .version("0.9.0")
+      .stringConf
+      .transform(_.toLowerCase)
+      .createOptional
+
 }
diff --git a/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala b/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala
index b473d7db..9ceff1eb 100644
--- a/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala
+++ b/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala
@@ -91,4 +91,7 @@ class WriteOptions(_options: JMap[String, String]) extends SparkOptions {
 
   def format: String =
     eval(WRITE_FORMAT.key, WRITE_FORMAT)
+
+  def settings: Option[String] =
+    eval(WRITE_SETTINGS.key, WRITE_SETTINGS)
 }
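For reference, the settings string accepted by the new option is a comma-separated list of `key=value` pairs, parsed into a `Map[String, String]` before the insert. A standalone, runnable sketch of that parsing (the `parseSettings` wrapper is illustrative, not part of the patch):

```scala
object SettingsParsing {
  // Turn "final=1, max_execution_time=5" into
  // Map("final" -> "1", "max_execution_time" -> "5").
  // Entries without '=' are silently dropped, mirroring the
  // collect { case Array(key, value) => ... } in the writer above.
  // Note: values containing ',' are not representable in this format.
  def parseSettings(raw: Option[String]): Map[String, String] =
    raw.getOrElse("")
      .split(",")
      .map(_.trim.split("=", 2))
      .collect { case Array(key, value) => key.trim -> value.trim }
      .toMap

  def main(args: Array[String]): Unit = {
    assert(parseSettings(Some("final=1, max_execution_time=5")) ==
      Map("final" -> "1", "max_execution_time" -> "5"))
    assert(parseSettings(None).isEmpty) // unset option yields no SETTINGS clause
    println("settings parsing ok")
  }
}
```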
From f1c469598571c4a575a510d87a6579dcba2db4d3 Mon Sep 17 00:00:00 2001
From: Mahdi Malverdi
Date: Sat, 21 Dec 2024 16:04:46 +0330
Subject: [PATCH 2/2] Reformat code

Closes #369
---
 .../com/clickhouse/spark/write/ClickHouseWriter.scala | 10 ++++++++--
 .../com/clickhouse/spark/write/ClickHouseWriter.scala | 10 ++++++++--
 2 files changed, 16 insertions(+), 4 deletions(-)

diff --git a/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala b/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala
index 675d552d..6383c1f1 100644
--- a/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala
+++ b/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala
@@ -233,8 +233,14 @@ abstract class ClickHouseWriter(writeJob: WriteJobDescription)
       writeJob.writeOptions.retryInterval
     ) {
       var startWriteTime = System.currentTimeMillis
-      client.syncInsertOutputJSONEachRow(database, table, format, codec, new ByteArrayInputStream(data), settings)
-      match {
+      client.syncInsertOutputJSONEachRow(
+        database,
+        table,
+        format,
+        codec,
+        new ByteArrayInputStream(data),
+        settings
+      ) match {
         case Right(_) =>
           writeTime = System.currentTimeMillis - startWriteTime
           _totalWriteTime.add(writeTime)
diff --git a/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala b/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala
index 2abb4508..c3d7d106 100644
--- a/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala
+++ b/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala
@@ -259,8 +259,14 @@ abstract class ClickHouseWriter(writeJob: WriteJobDescription)
       writeJob.writeOptions.retryInterval
     ) {
       var startWriteTime = System.currentTimeMillis
-      client.syncInsertOutputJSONEachRow(database, table, format, codec, new ByteArrayInputStream(data), settings)
-      match {
+      client.syncInsertOutputJSONEachRow(
+        database,
+        table,
+        format,
+        codec,
+        new ByteArrayInputStream(data),
+        settings
+      ) match {
         case Right(_) =>
           writeTime = System.currentTimeMillis - startWriteTime
           _totalWriteTime.add(writeTime)
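For completeness, a minimal usage sketch of the new configuration from the Spark side. The session setup and the `clickhouse.db.tbl` identifier are hypothetical, and it assumes a catalog for this connector is already registered:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical session; assumes a ClickHouse catalog named "clickhouse" has
// been configured for this connector (e.g. via spark.sql.catalog.clickhouse).
val spark = SparkSession.builder()
  .appName("write-settings-demo")
  .getOrCreate()

// Attach insert settings to subsequent writes through this session.
spark.conf.set("spark.clickhouse.write.settings", "final=1, max_execution_time=5")

// `clickhouse.db.tbl` is an illustrative catalog.database.table identifier.
spark.range(10).toDF("id")
  .writeTo("clickhouse.db.tbl")
  .append()
```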