Skip to content

Commit

Permalink
Support ClickHouse insert settings for table writes (ClickHouse#369)
Browse files Browse the repository at this point in the history
- Enable custom ClickHouse insert settings when writing to tables.
- Add support for `spark.clickhouse.write.settings` configuration.
- Update documentation to describe usage of write settings.

Closes ClickHouse#369
  • Loading branch information
Mahdi Malverdi authored and mahdimalverdi committed Dec 21, 2024
1 parent 803e156 commit a184b20
Show file tree
Hide file tree
Showing 10 changed files with 70 additions and 3 deletions.
1 change: 1 addition & 0 deletions docs/configurations/02_sql_configurations.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,5 @@ spark.clickhouse.write.repartitionNum|0|Repartition data to meet the distributio
spark.clickhouse.write.repartitionStrictly|false|If `true`, Spark will strictly distribute incoming records across partitions to satisfy the required distribution before passing the records to the data source table on write. Otherwise, Spark may apply certain optimizations to speed up the query but break the distribution requirement. Note, this configuration requires SPARK-37523(available in Spark 3.4), w/o this patch, it always acts as `true`.|0.3.0
spark.clickhouse.write.retryInterval|10s|The interval in seconds between write retry.|0.1.0
spark.clickhouse.write.retryableErrorCodes|241|The retryable error codes returned by ClickHouse server when write failing.|0.1.0
spark.clickhouse.write.settings|<undefined>|Settings when write into ClickHouse. e.g. `final=1, max_execution_time=5`|0.9.0
<!--end-include-->
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,21 @@ abstract class ClickHouseWriter(writeJob: WriteJobDescription)
val client = nodeClient(shardNum)
val data = serialize()
var writeTime = 0L

val settings = writeJob.writeOptions.settings
.getOrElse("")
.split(",")
.map(_.trim.split("=", 2))
.collect { case Array(key, value) => key -> value }
.toMap

Utils.retry[Unit, RetryableCHException](
writeJob.writeOptions.maxRetry,
writeJob.writeOptions.retryInterval
) {
var startWriteTime = System.currentTimeMillis
client.syncInsertOutputJSONEachRow(database, table, format, codec, new ByteArrayInputStream(data)) match {
client.syncInsertOutputJSONEachRow(database, table, format, codec, new ByteArrayInputStream(data), settings)
match {
case Right(_) =>
writeTime = System.currentTimeMillis - startWriteTime
_totalWriteTime.add(writeTime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,4 +218,12 @@ object ClickHouseSQLConf {
.transform(_.toLowerCase)
.createOptional

val WRITE_SETTINGS: OptionalConfigEntry[String] =
buildConf("spark.clickhouse.write.settings")
.doc("Settings when write into ClickHouse. e.g. `final=1, max_execution_time=5`")
.version("0.9.0")
.stringConf
.transform(_.toLowerCase)
.createOptional

}
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,7 @@ class WriteOptions(_options: JMap[String, String]) extends SparkOptions {

def format: String =
eval(WRITE_FORMAT.key, WRITE_FORMAT)

def settings: Option[String] =
eval(WRITE_SETTINGS.key, WRITE_SETTINGS)
}
Original file line number Diff line number Diff line change
Expand Up @@ -246,12 +246,21 @@ abstract class ClickHouseWriter(writeJob: WriteJobDescription)
val client = nodeClient(shardNum)
val data = serialize()
var writeTime = 0L

val settings = writeJob.writeOptions.settings
.getOrElse("")
.split(",")
.map(_.trim.split("=", 2))
.collect { case Array(key, value) => key -> value }
.toMap

Utils.retry[Unit, RetryableCHException](
writeJob.writeOptions.maxRetry,
writeJob.writeOptions.retryInterval
) {
var startWriteTime = System.currentTimeMillis
client.syncInsertOutputJSONEachRow(database, table, format, codec, new ByteArrayInputStream(data)) match {
client.syncInsertOutputJSONEachRow(database, table, format, codec, new ByteArrayInputStream(data), settings)
match {
case Right(_) =>
writeTime = System.currentTimeMillis - startWriteTime
_totalWriteTime.add(writeTime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,4 +218,12 @@ object ClickHouseSQLConf {
.transform(_.toLowerCase)
.createOptional

val WRITE_SETTINGS: OptionalConfigEntry[String] =
buildConf("spark.clickhouse.write.settings")
.doc("Settings when write into ClickHouse. e.g. `final=1, max_execution_time=5`")
.version("0.9.0")
.stringConf
.transform(_.toLowerCase)
.createOptional

}
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,7 @@ class WriteOptions(_options: JMap[String, String]) extends SparkOptions {

def format: String =
eval(WRITE_FORMAT.key, WRITE_FORMAT)

def settings: Option[String] =
eval(WRITE_SETTINGS.key, WRITE_SETTINGS)
}
Original file line number Diff line number Diff line change
Expand Up @@ -248,12 +248,27 @@ abstract class ClickHouseWriter(writeJob: WriteJobDescription)
val client = nodeClient(shardNum)
val data = serialize()
var writeTime = 0L

val settings = writeJob.writeOptions.settings
.getOrElse("")
.split(",")
.map(_.trim.split("=", 2))
.collect { case Array(key, value) => key -> value }
.toMap

Utils.retry[Unit, RetryableCHException](
writeJob.writeOptions.maxRetry,
writeJob.writeOptions.retryInterval
) {
var startWriteTime = System.currentTimeMillis
client.syncInsertOutputJSONEachRow(database, table, format, codec, new ByteArrayInputStream(data)) match {
client.syncInsertOutputJSONEachRow(
database,
table,
format,
codec,
new ByteArrayInputStream(data),
settings
) match {
case Right(_) =>
writeTime = System.currentTimeMillis - startWriteTime
_totalWriteTime.add(writeTime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,4 +218,12 @@ object ClickHouseSQLConf {
.transform(_.toLowerCase)
.createOptional

val WRITE_SETTINGS: OptionalConfigEntry[String] =
buildConf("spark.clickhouse.write.settings")
.doc("Settings when write into ClickHouse. e.g. `final=1, max_execution_time=5`")
.version("0.9.0")
.stringConf
.transform(_.toLowerCase)
.createOptional

}
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,7 @@ class WriteOptions(_options: JMap[String, String]) extends SparkOptions {

def format: String =
eval(WRITE_FORMAT.key, WRITE_FORMAT)

def settings: Option[String] =
eval(WRITE_SETTINGS.key, WRITE_SETTINGS)
}

0 comments on commit a184b20

Please sign in to comment.