Skip to content

Commit

Permalink
Add extended JDBC options in output configuration (#434)
Browse files Browse the repository at this point in the history
Co-authored-by: raphael.luta <[email protected]>
Co-authored-by: Amir Halatzi <[email protected]>
  • Loading branch information
3 people authored Jun 28, 2021
1 parent c5c8fcb commit aebd7e3
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 2 deletions.
7 changes: 6 additions & 1 deletion schemas/job_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,12 @@
"connectionUrl": { "type": "string" },
"user": { "type": "string" },
"password": { "type": "string" },
"driver": { "type": "string" }
"driver": { "type": "string" },
"sessionInitStatement": { "type": "string" },
"truncate": { "type": "string" },
"cascadeTruncate": { "type": "string" },
"createTableOptions": { "type": "string" },
"createTableColumnTypes": { "type": "string" }
},
"required": [
"connectionUrl",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@ package com.yotpo.metorikku.configuration.job.output
case class JDBC(connectionUrl: String,
user: String,
password: String,
driver: String
driver: String,
sessionInitStatement: Option[String],
truncate: Option[String],
cascadeTruncate: Option[String],
createTableOptions: Option[String],
createTableColumnTypes: Option[String]
) {
require(Option(connectionUrl).isDefined, "JDBC connection: connection url is mandatory")
require(Option(user).isDefined, "JDBC connection: user is mandatory")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,21 @@ class JDBCOutputWriter(props: Map[String, String], jdbcConf: Option[JDBC]) exten
connectionProperties.put("user", jdbcConf.user)
connectionProperties.put("password", jdbcConf.password)
connectionProperties.put("driver", jdbcConf.driver)
if (jdbcConf.truncate.isDefined) {
connectionProperties.put("truncate", jdbcConf.truncate.get)
}
if (jdbcConf.cascadeTruncate.isDefined) {
connectionProperties.put("cascadeTruncate", jdbcConf.cascadeTruncate.get)
}
if (jdbcConf.createTableColumnTypes.isDefined) {
connectionProperties.put("createTableColumnTypes", jdbcConf.createTableColumnTypes.get)
}
if (jdbcConf.createTableOptions.isDefined) {
connectionProperties.put("createTableOptions", jdbcConf.createTableOptions.get)
}
if (jdbcConf.sessionInitStatement.isDefined) {
connectionProperties.put("sessionInitStatement", jdbcConf.sessionInitStatement.get)
}
var df = dataFrame
val writer = df.write.format(jdbcConf.driver)
.mode(dbOptions.saveMode)
Expand Down

0 comments on commit aebd7e3

Please sign in to comment.