From 46e4f577870a2d1a52ad1f2a0105caf327ba0c53 Mon Sep 17 00:00:00 2001 From: mweldon Date: Sat, 27 Jun 2020 23:44:43 -0700 Subject: [PATCH 01/31] add batchSize parameter for controlling bulk upload batch size. defaults to 5000 --- .../springml/spark/salesforce/DefaultSource.scala | 15 +++++++++++++-- .../spark/salesforce/SFObjectWriter.scala | 8 ++++++-- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/src/main/scala/com/springml/spark/salesforce/DefaultSource.scala b/src/main/scala/com/springml/spark/salesforce/DefaultSource.scala index fc077a6..b65f034 100644 --- a/src/main/scala/com/springml/spark/salesforce/DefaultSource.scala +++ b/src/main/scala/com/springml/spark/salesforce/DefaultSource.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode} import scala.collection.mutable.ListBuffer +import scala.util.{Failure, Success, Try} /** * Default source for Salesforce wave data source. @@ -123,6 +124,15 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr val encodeFields = parameters.get("encodeFields") val monitorJob = parameters.getOrElse("monitorJob", "false") val externalIdFieldName = parameters.getOrElse("externalIdFieldName", "Id") + val batchSizeStr = parameters.getOrElse("batchSize", "5000") + val batchSize = Try(batchSizeStr.toInt) match { + case Success(v)=> v + case Failure(e)=> { + val errorMsg = "batchSize parameter not an integer." + logger.error(errorMsg) + throw new Exception(errorMsg) + } + } validateMutualExclusive(datasetName, sfObject, "datasetName", "sfObject") @@ -141,7 +151,7 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr } else { logger.info("Updating Salesforce Object") updateSalesforceObject(username, password, login, version, sfObject.get, mode, - flag(upsert, "upsert"), externalIdFieldName, data) + flag(upsert, "upsert"), externalIdFieldName, batchSize, data) } return createReturnRelation(data) @@ -156,6 +166,7 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr mode: SaveMode, upsert: Boolean, externalIdFieldName: String, + batchSize: Integer, data: DataFrame) { val csvHeader = Utils.csvHeadder(data.schema) @@ -164,7 +175,7 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr val repartitionedRDD = Utils.repartition(data.rdd) logger.info("no of partitions after repartitioning is " + repartitionedRDD.partitions.length) - val writer = new SFObjectWriter(username, password, login, version, sfObject, mode, upsert, externalIdFieldName, csvHeader) + val writer = new SFObjectWriter(username, password, login, version, sfObject, mode, upsert, externalIdFieldName, csvHeader, batchSize) logger.info("Writing data") val successfulWrite = writer.writeData(repartitionedRDD) logger.info(s"Writing data was successful was $successfulWrite") diff --git a/src/main/scala/com/springml/spark/salesforce/SFObjectWriter.scala b/src/main/scala/com/springml/spark/salesforce/SFObjectWriter.scala index 1856bbb..4114f20 100644 --- a/src/main/scala/com/springml/spark/salesforce/SFObjectWriter.scala +++ b/src/main/scala/com/springml/spark/salesforce/SFObjectWriter.scala @@ -23,7 +23,8 @@ class SFObjectWriter ( val mode: SaveMode, val upsert: Boolean, val externalIdFieldName: String, - val csvHeader: String + val csvHeader: String, + val batchSize: Integer ) extends Serializable { @transient val logger = Logger.getLogger(classOf[SFObjectWriter]) @@ -31,12 +32,15 @@ class 
SFObjectWriter ( def writeData(rdd: RDD[Row]): Boolean = { val csvRDD = rdd.map(row => row.toSeq.map(value => Utils.rowValue(value)).mkString(",")) + val partitionCnt = (1 + csvRDD.count() / batchSize).toInt + val partitionedRDD = csvRDD.repartition(partitionCnt) + val jobInfo = new JobInfo(WaveAPIConstants.STR_CSV, sfObject, operation(mode, upsert)) jobInfo.setExternalIdFieldName(externalIdFieldName) val jobId = bulkAPI.createJob(jobInfo).getId - csvRDD.mapPartitionsWithIndex { + partitionedRDD.mapPartitionsWithIndex { case (index, iterator) => { val records = iterator.toArray.mkString("\n") var batchInfoId : String = null From 144f9f9a696513551d4472720fc3fa3bd1ea2dfb Mon Sep 17 00:00:00 2001 From: mweldon Date: Sat, 27 Jun 2020 23:47:27 -0700 Subject: [PATCH 02/31] update readme --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 9d24698..bf65d19 100644 --- a/README.md +++ b/README.md @@ -62,6 +62,8 @@ $ bin/spark-shell --packages com.springml:spark-salesforce_2.11:1.1.3 * `timeout`: (Optional) The maximum time spent polling for the completion of bulk query job. This option can only be used when `bulk` is `true`. * `externalIdFieldName`: (Optional) The name of the field used as the external ID for Salesforce Object. This value is only used when doing an update or upsert. Default "Id". * `queryAll`: (Optional) Toggle to retrieve deleted and archived records for SOQL queries. Default value is `false`. +### Options only supported for fetching Salesforce Objects. +* `batchSize`: (Optional) maximum number of records per batch when performing updates. Defaults to 5000 (note that batches greater than 10000 will result in a error) ### Scala API From bd21e6a4f91fdd3fb5acc88cb27b6453a0b5ee26 Mon Sep 17 00:00:00 2001 From: mweldon Date: Sat, 11 Jul 2020 17:33:47 -0700 Subject: [PATCH 03/31] use updated wave-api dependency --- build.sbt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index e6f5f62..25c15c6 100644 --- a/build.sbt +++ b/build.sbt @@ -7,11 +7,12 @@ organization := "com.springml" scalaVersion := "2.11.8" resolvers += "sonatype-snapshots" at "https://oss.sonatype.org/content/repositories/snapshots/" +resolvers += "jitpack" at "https://jitpack.io" libraryDependencies ++= Seq( "com.force.api" % "force-wsc" % "40.0.0", "com.force.api" % "force-partner-api" % "40.0.0", - "com.springml" % "salesforce-wave-api" % "1.0.10", + "com.github.loanpal-engineering" % "salesforce-wave-api" % "c9da246", "org.mockito" % "mockito-core" % "2.0.31-beta" ) From 707921d92e1cc8e5dd9e1192794eb957a466a8ee Mon Sep 17 00:00:00 2001 From: mweldon Date: Mon, 13 Jul 2020 20:55:49 -0700 Subject: [PATCH 04/31] null instead of empty for null objects --- build.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index 25c15c6..6bef940 100644 --- a/build.sbt +++ b/build.sbt @@ -12,7 +12,7 @@ resolvers += "jitpack" at "https://jitpack.io" libraryDependencies ++= Seq( "com.force.api" % "force-wsc" % "40.0.0", "com.force.api" % "force-partner-api" % "40.0.0", - "com.github.loanpal-engineering" % "salesforce-wave-api" % "c9da246", + "com.github.loanpal-engineering" % "salesforce-wave-api" % "329ceb4", "org.mockito" % "mockito-core" % "2.0.31-beta" ) From af32243567d2228bedace72a7bf0f657b3ddabf3 Mon Sep 17 00:00:00 2001 From: mweldon Date: Mon, 13 Jul 2020 21:10:45 -0700 Subject: [PATCH 05/31] cast null as null --- .../scala/com/springml/spark/salesforce/DatasetRelation.scala | 4 +++- 1 file changed, 3 
insertions(+), 1 deletion(-) diff --git a/src/main/scala/com/springml/spark/salesforce/DatasetRelation.scala b/src/main/scala/com/springml/spark/salesforce/DatasetRelation.scala index 4dc46d0..5e71f8d 100644 --- a/src/main/scala/com/springml/spark/salesforce/DatasetRelation.scala +++ b/src/main/scala/com/springml/spark/salesforce/DatasetRelation.scala @@ -116,7 +116,9 @@ case class DatasetRelation( private def cast(fieldValue: String, toType: DataType, nullable: Boolean = true, fieldName: String): Any = { - if (fieldValue == "" && nullable && !toType.isInstanceOf[StringType]) { + if (fieldValue == null) + null + else if (fieldValue == "" && nullable && !toType.isInstanceOf[StringType]) { null } else { toType match { From 6b4fb6d94c35162ef8a34362e384e484be17358f Mon Sep 17 00:00:00 2001 From: mweldon Date: Mon, 13 Jul 2020 21:53:27 -0700 Subject: [PATCH 06/31] use master --- build.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index 6bef940..275ff7a 100644 --- a/build.sbt +++ b/build.sbt @@ -12,7 +12,7 @@ resolvers += "jitpack" at "https://jitpack.io" libraryDependencies ++= Seq( "com.force.api" % "force-wsc" % "40.0.0", "com.force.api" % "force-partner-api" % "40.0.0", - "com.github.loanpal-engineering" % "salesforce-wave-api" % "329ceb4", + "com.github.loanpal-engineering" % "salesforce-wave-api" % "eb71436", "org.mockito" % "mockito-core" % "2.0.31-beta" ) From 131c4f3d4332f61370757ebd238dd28603cb86bf Mon Sep 17 00:00:00 2001 From: Stephen Kinser Date: Thu, 1 Oct 2020 14:22:15 -0400 Subject: [PATCH 07/31] spark 3.0/scala 2.12 compatibility --- pom.xml | 148 ++++++++++++++++++ .../spark/salesforce/DataWriter.scala | 29 ++-- .../com/springml/spark/salesforce/Utils.scala | 62 ++++---- 3 files changed, 194 insertions(+), 45 deletions(-) create mode 100644 pom.xml diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..5789921 --- /dev/null +++ b/pom.xml @@ -0,0 +1,148 @@ + + + 4.0.0 + com.springml + spark-salesforce_2.12 + jar + spark-salesforce + 1.1.3 + spark-salesforce + + com.springml + + https://github.com/springml/spark-salesforce + + + Apache License, Verision 2.0 + http://www.apache.org/licenses/LICENSE-2.0.html + repo + + + + scm:git:github.com/springml/spark-salesforce + scm:git:git@github.com:springml/spark-salesforce + github.com/springml/spark-salesforce + + + 1.8 + 1.8 + UTF-8 + 3.0.0 + 2.12.11 + 2.12 + + + + springml + Springml + http://www.springml.com + + + + + org.scala-lang + scala-library + ${scala.version} + + + org.apache.spark + spark-sql_${scala.compat.version} + ${spark.version} + provided + + + com.force.api + force-wsc + 40.0.0 + + + com.force.api + force-partner-api + 40.0.0 + + + com.springml + salesforce-wave-api + 1.0.8-loanpal + + + org.mockito + mockito-core + 2.0.31-beta + + + org.scalatest + scalatest_2.12 + 3.0.1 + test + + + com.madhukaraphatak + java-sizeof_2.12 + 0.1 + + + + + + + + org.codehaus.woodstox + woodstox-core-asl + 4.4.0 + + + + src/main/scala + + + + net.alchim31.maven + scala-maven-plugin + 3.4.6 + + + + compile + testCompile + + + + -Xss16m + -Xms1028m + -Xmx4096m + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.1 + + + package + + shade + + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + uber-${project.artifactId}-${project.version} + + + + + \ No newline at end of file diff --git a/src/main/scala/com/springml/spark/salesforce/DataWriter.scala b/src/main/scala/com/springml/spark/salesforce/DataWriter.scala index 2565dad..33354ec 100644 --- 
a/src/main/scala/com/springml/spark/salesforce/DataWriter.scala +++ b/src/main/scala/com/springml/spark/salesforce/DataWriter.scala @@ -28,18 +28,18 @@ import org.apache.spark.sql.SaveMode * It uses Partner External Metadata SOAP API to write the dataset */ class DataWriter ( - val userName: String, - val password: String, - val login: String, - val version: String, - val datasetName: String, - val appName: String - ) extends Serializable { + val userName: String, + val password: String, + val login: String, + val version: String, + val datasetName: String, + val appName: String + ) extends Serializable { @transient val logger = Logger.getLogger(classOf[DataWriter]) def writeMetadata(metaDataJson: String, - mode: SaveMode, - upsert: Boolean): Option[String] = { + mode: SaveMode, + upsert: Boolean): Option[String] = { val partnerConnection = createConnection(userName, password, login, version) val oper = operation(mode, upsert) @@ -62,12 +62,12 @@ class DataWriter ( Some(saveResult.getId) } else { logger.error("failed to write metadata") - println("******************************************************************") - println("failed to write metadata") + logger.error("******************************************************************") + logger.error("failed to write metadata") logSaveResultError(saveResult) - println("******************************************************************") - println(metaDataJson) - println("******************************************************************") + logger.error("******************************************************************") + logger.error(metaDataJson) + logger.error("******************************************************************") None } }).head @@ -128,7 +128,6 @@ class DataWriter ( saved } - private def operation(mode: SaveMode, upsert: Boolean): String = { if (upsert) { logger.warn("Ignoring SaveMode as upsert set to true") diff --git a/src/main/scala/com/springml/spark/salesforce/Utils.scala b/src/main/scala/com/springml/spark/salesforce/Utils.scala index e3f0363..a944054 100644 --- a/src/main/scala/com/springml/spark/salesforce/Utils.scala +++ b/src/main/scala/com/springml/spark/salesforce/Utils.scala @@ -18,13 +18,13 @@ package com.springml.spark.salesforce import scala.io.Source import scala.util.parsing.json._ -import com.sforce.soap.partner.{Connector, PartnerConnection, SaveResult} +import com.sforce.soap.partner.{ Connector, PartnerConnection, SaveResult } import com.sforce.ws.ConnectorConfig import com.madhukaraphatak.sizeof.SizeEstimator import org.apache.log4j.Logger import org.apache.spark.rdd.RDD import org.apache.spark.sql.Row -import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType} +import org.apache.spark.sql.types.{ DoubleType, IntegerType, StructType } import scala.collection.immutable.HashMap import com.springml.spark.salesforce.metadata.MetadataConstructor @@ -42,7 +42,7 @@ object Utils extends Serializable { @transient val logger = Logger.getLogger("Utils") def createConnection(username: String, password: String, - login: String, version: String):PartnerConnection = { + login: String, version: String): PartnerConnection = { val config = new ConnectorConfig() config.setUsername(username) config.setPassword(password) @@ -58,7 +58,9 @@ object Utils extends Serializable { logger.error(error.getMessage) println(error.getMessage) error.getFields.map(logger.error(_)) - error.getFields.map { println } + error.getFields.map { + println + } }) } @@ -92,22 +94,22 @@ object Utils extends Serializable { 
totalSize } - def rddSize(rdd: RDD[Row]) : Long = { + def rddSize(rdd: RDD[Row]): Long = { rowSize(rdd.collect()) } - def rowSize(rows: Array[Row]) : Long = { - var sizeOfRows = 0l - for (row <- rows) { - // Converting to bytes - val rowSize = SizeEstimator.estimate(row.toSeq.map { value => rowValue(value) }.mkString(",")) - sizeOfRows += rowSize - } + def rowSize(rows: Array[Row]): Long = { + var sizeOfRows = 0l + for (row <- rows) { + // Converting to bytes + val rowSize = SizeEstimator.estimate(row.toSeq.map { value => rowValue(value) }.mkString(",")) + sizeOfRows += rowSize + } - sizeOfRows + sizeOfRows } - def rowValue(rowVal: Any) : String = { + def rowValue(rowVal: Any): String = { if (rowVal == null) { "" } else { @@ -132,15 +134,15 @@ object Utils extends Serializable { systemMetadataConfig } - def csvHeadder(schema: StructType) : String = { + def csvHeadder(schema: StructType): String = { schema.fields.map(field => field.name).mkString(",") } def metadata( - metadataFile: Option[String], - usersMetadataConfig: Option[String], - schema: StructType, - datasetName: String) : String = { + metadataFile: Option[String], + usersMetadataConfig: Option[String], + schema: StructType, + datasetName: String): String = { if (metadataFile != null && metadataFile.isDefined) { logger.info("Using provided Metadata Configuration") @@ -155,8 +157,8 @@ object Utils extends Serializable { } def monitorJob(objId: String, username: String, password: - String, login: String, version: String) : Boolean = { - var partnerConnection = Utils.createConnection(username, password, login, version) + String, login: String, version: String): Boolean = { + val partnerConnection = Utils.createConnection(username, password, login, version) try { monitorJob(partnerConnection, objId, 500) } catch { @@ -165,7 +167,7 @@ object Utils extends Serializable { logger.info("Error Message from Salesforce Wave " + exMsg) if (exMsg contains "Invalid Session") { logger.info("Session expired. 
Monitoring Job using new connection") - return monitorJob(objId, username, password, login, version) + monitorJob(objId, username, password, login, version) } else { throw uefault } @@ -180,10 +182,10 @@ object Utils extends Serializable { } def retryWithExponentialBackoff( - func:() => Boolean, - timeoutDuration: FiniteDuration, - initSleepInterval: FiniteDuration, - maxSleepInterval: FiniteDuration): Boolean = { + func: () => Boolean, + timeoutDuration: FiniteDuration, + initSleepInterval: FiniteDuration, + maxSleepInterval: FiniteDuration): Boolean = { val timeout = timeoutDuration.toMillis var waited = 0L @@ -208,7 +210,7 @@ object Utils extends Serializable { } private def monitorJob(connection: PartnerConnection, - objId: String, waitDuration: Long) : Boolean = { + objId: String, waitDuration: Long): Boolean = { val sobjects = connection.retrieve("Status", "InsightsExternalData", Array(objId)) if (sobjects != null && sobjects.length > 0) { val status = sobjects(0).getField("Status") @@ -255,20 +257,20 @@ object Utils extends Serializable { } } - private def maxWaitSeconds(waitDuration: Long) : Long = { + private def maxWaitSeconds(waitDuration: Long): Long = { // 2 Minutes val maxWaitDuration = 120000 if (waitDuration >= maxWaitDuration) maxWaitDuration else waitDuration * 2 } - private def readMetadataConfig() : Map[String, Map[String, String]] = { + private def readMetadataConfig(): Map[String, Map[String, String]] = { val source = Source.fromURL(getClass.getResource("/metadata_config.json")) val jsonContent = try source.mkString finally source.close() readJSON(jsonContent) } - private def readJSON(jsonContent : String) : Map[String, Map[String, String]]= { + private def readJSON(jsonContent: String): Map[String, Map[String, String]] = { val result = JSON.parseFull(jsonContent) val resMap: Map[String, Map[String, String]] = result.get.asInstanceOf[Map[String, Map[String, String]]] resMap From 67ec60a1b1dca638942baa9c707b2922f6c3777d Mon Sep 17 00:00:00 2001 From: Stephen Kinser Date: Thu, 8 Oct 2020 13:11:19 -0400 Subject: [PATCH 08/31] change to jitpack --- pom.xml | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/pom.xml b/pom.xml index 5789921..9bb7e22 100644 --- a/pom.xml +++ b/pom.xml @@ -24,6 +24,12 @@ scm:git:git@github.com:springml/spark-salesforce github.com/springml/spark-salesforce + + + jitpack.io + https://jitpack.io + + 1.8 1.8 @@ -62,9 +68,9 @@ 40.0.0 - com.springml + com.github.loanpal-engineering salesforce-wave-api - 1.0.8-loanpal + 1d662ac org.mockito @@ -82,11 +88,6 @@ java-sizeof_2.12 0.1 - - - - - org.codehaus.woodstox woodstox-core-asl From 32effa7d6ac77cb5ab794a297e902cbc89ee80b4 Mon Sep 17 00:00:00 2001 From: Stephen Kinser Date: Thu, 8 Oct 2020 16:20:43 -0400 Subject: [PATCH 09/31] swap out spark version --- pom.xml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pom.xml b/pom.xml index 9bb7e22..619d735 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"> 4.0.0 com.springml - spark-salesforce_2.12 + spark-salesforce jar spark-salesforce 1.1.3 @@ -84,9 +84,9 @@ test - com.madhukaraphatak - java-sizeof_2.12 - 0.1 + com.github.loanpal-engineering + java-sizeof + 6d786c8 org.codehaus.woodstox From 8f7264305273d021980a634213d3ffa783dbd06d Mon Sep 17 00:00:00 2001 From: Stephen Kinser Date: Mon, 1 Feb 2021 18:04:03 -0500 Subject: [PATCH 10/31] use salesforce magic null string value --- 
.../spark/salesforce/DatasetRelation.scala | 68 +++++++++---------- 1 file changed, 33 insertions(+), 35 deletions(-) diff --git a/src/main/scala/com/springml/spark/salesforce/DatasetRelation.scala b/src/main/scala/com/springml/spark/salesforce/DatasetRelation.scala index 5e71f8d..4e19c63 100644 --- a/src/main/scala/com/springml/spark/salesforce/DatasetRelation.scala +++ b/src/main/scala/com/springml/spark/salesforce/DatasetRelation.scala @@ -21,26 +21,26 @@ import scala.collection.JavaConversions.{asScalaBuffer, mapAsScalaMap} * Relation class for reading data from Salesforce and construct RDD */ case class DatasetRelation( - waveAPI: WaveAPI, - forceAPI: ForceAPI, - query: String, - userSchema: StructType, - sqlContext: SQLContext, - resultVariable: Option[String], - pageSize: Int, - sampleSize: Int, - encodeFields: Option[String], - inferSchema: Boolean, - replaceDatasetNameWithId: Boolean, - sdf: SimpleDateFormat, - queryAll: Boolean) extends BaseRelation with TableScan { + waveAPI: WaveAPI, + forceAPI: ForceAPI, + query: String, + userSchema: StructType, + sqlContext: SQLContext, + resultVariable: Option[String], + pageSize: Int, + sampleSize: Int, + encodeFields: Option[String], + inferSchema: Boolean, + replaceDatasetNameWithId: Boolean, + sdf: SimpleDateFormat, + queryAll: Boolean) extends BaseRelation with TableScan { private val logger = Logger.getLogger(classOf[DatasetRelation]) val records = read() def read(): java.util.List[java.util.Map[String, String]] = { - var records: java.util.List[java.util.Map[String, String]]= null + var records: java.util.List[java.util.Map[String, String]] = null // Query getting executed here if (waveAPI != null) { records = queryWave() @@ -52,7 +52,7 @@ case class DatasetRelation( } private def queryWave(): java.util.List[java.util.Map[String, String]] = { - var records: java.util.List[java.util.Map[String, String]]= null + var records: java.util.List[java.util.Map[String, String]] = null var saql = query if (replaceDatasetNameWithId) { @@ -77,7 +77,7 @@ case class DatasetRelation( records } - def replaceDatasetNameWithId(query : String, startIndex : Integer) : String = { + def replaceDatasetNameWithId(query: String, startIndex: Integer): String = { var modQuery = query logger.debug("start Index : " + startIndex) @@ -101,25 +101,23 @@ case class DatasetRelation( } private def querySF(): java.util.List[java.util.Map[String, String]] = { - var records: java.util.List[java.util.Map[String, String]]= null + var records: java.util.List[java.util.Map[String, String]] = null - var resultSet = forceAPI.query(query, queryAll) - records = resultSet.filterRecords() + var resultSet = forceAPI.query(query, queryAll) + records = resultSet.filterRecords() - while (!resultSet.isDone()) { - resultSet = forceAPI.queryMore(resultSet) - records.addAll(resultSet.filterRecords()) - } + while (!resultSet.isDone()) { + resultSet = forceAPI.queryMore(resultSet) + records.addAll(resultSet.filterRecords()) + } - return records + return records } private def cast(fieldValue: String, toType: DataType, - nullable: Boolean = true, fieldName: String): Any = { - if (fieldValue == null) - null - else if (fieldValue == "" && nullable && !toType.isInstanceOf[StringType]) { - null + nullable: Boolean = true, fieldName: String): Any = { + if (fieldValue == null || fieldValue == "") { + "#N/A" } else { toType match { case _: ByteType => fieldValue.toByte @@ -152,7 +150,7 @@ case class DatasetRelation( } } - private def shouldEncode(fieldName: String) : Boolean = { + private def 
shouldEncode(fieldName: String): Boolean = { if (encodeFields != null && encodeFields.isDefined) { val toBeEncodedField = encodeFields.get.split(",") return toBeEncodedField.contains(fieldName) @@ -183,14 +181,14 @@ case class DatasetRelation( sqlContext.sparkContext.parallelize(sampleRowArray) } - private def getSampleSize : Integer = { + private def getSampleSize: Integer = { // If the record is less than sampleSize, then the whole data is used as sample val totalRecordsSize = records.size() logger.debug("Total Record Size: " + totalRecordsSize) if (totalRecordsSize < sampleSize) { logger.debug("Total Record Size " + totalRecordsSize - + " is Smaller than Sample Size " - + sampleSize + ". So total records are used for sampling") + + " is Smaller than Sample Size " + + sampleSize + ". So total records are used for sampling") totalRecordsSize } else { sampleSize @@ -200,7 +198,7 @@ case class DatasetRelation( private def header: Array[String] = { val sampleList = sample - var header : Array[String] = null + var header: Array[String] = null for (currentRecord <- sampleList) { logger.debug("record size " + currentRecord.size()) val recordHeader = new Array[String](currentRecord.size()) @@ -275,7 +273,7 @@ case class DatasetRelation( sqlContext.sparkContext.parallelize(rowArray) } - private def fieldValue(row: java.util.Map[String, String], name: String) : String = { + private def fieldValue(row: java.util.Map[String, String], name: String): String = { if (row.contains(name)) { row(name) } else { From e9b423c22d7b54f127efd53434df809d8481700c Mon Sep 17 00:00:00 2001 From: Stephen Kinser Date: Tue, 2 Feb 2021 12:20:35 -0500 Subject: [PATCH 11/31] maybe empty string is dumb --- .../scala/com/springml/spark/salesforce/DatasetRelation.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/com/springml/spark/salesforce/DatasetRelation.scala b/src/main/scala/com/springml/spark/salesforce/DatasetRelation.scala index 4e19c63..a9ea795 100644 --- a/src/main/scala/com/springml/spark/salesforce/DatasetRelation.scala +++ b/src/main/scala/com/springml/spark/salesforce/DatasetRelation.scala @@ -116,7 +116,7 @@ case class DatasetRelation( private def cast(fieldValue: String, toType: DataType, nullable: Boolean = true, fieldName: String): Any = { - if (fieldValue == null || fieldValue == "") { + if (fieldValue == null || fieldValue.isEmpty) { "#N/A" } else { toType match { From d934530c57e499f829e7390c5a339b13924ee737 Mon Sep 17 00:00:00 2001 From: Stephen Kinser Date: Tue, 2 Feb 2021 13:07:52 -0500 Subject: [PATCH 12/31] make note of reading/writing --- .../spark/salesforce/DatasetRelation.scala | 5 ++++- .../com/springml/spark/salesforce/Utils.scala | 20 ++++++++----------- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/src/main/scala/com/springml/spark/salesforce/DatasetRelation.scala b/src/main/scala/com/springml/spark/salesforce/DatasetRelation.scala index a9ea795..bf01854 100644 --- a/src/main/scala/com/springml/spark/salesforce/DatasetRelation.scala +++ b/src/main/scala/com/springml/spark/salesforce/DatasetRelation.scala @@ -114,10 +114,13 @@ case class DatasetRelation( return records } + // NOTE: this function is only called when reading the data NOT when writing the data private def cast(fieldValue: String, toType: DataType, nullable: Boolean = true, fieldName: String): Any = { if (fieldValue == null || fieldValue.isEmpty) { - "#N/A" + null + } else if (fieldValue == "" && nullable && !toType.isInstanceOf[StringType]) { + null } else { toType match { 
case _: ByteType => fieldValue.toByte diff --git a/src/main/scala/com/springml/spark/salesforce/Utils.scala b/src/main/scala/com/springml/spark/salesforce/Utils.scala index a944054..3d09eda 100644 --- a/src/main/scala/com/springml/spark/salesforce/Utils.scala +++ b/src/main/scala/com/springml/spark/salesforce/Utils.scala @@ -16,24 +16,20 @@ package com.springml.spark.salesforce -import scala.io.Source -import scala.util.parsing.json._ -import com.sforce.soap.partner.{ Connector, PartnerConnection, SaveResult } -import com.sforce.ws.ConnectorConfig import com.madhukaraphatak.sizeof.SizeEstimator +import com.sforce.soap.partner.fault.UnexpectedErrorFault +import com.sforce.soap.partner.{Connector, PartnerConnection, SaveResult} +import com.sforce.ws.ConnectorConfig +import com.springml.spark.salesforce.metadata.MetadataConstructor import org.apache.log4j.Logger import org.apache.spark.rdd.RDD import org.apache.spark.sql.Row -import org.apache.spark.sql.types.{ DoubleType, IntegerType, StructType } - -import scala.collection.immutable.HashMap -import com.springml.spark.salesforce.metadata.MetadataConstructor -import com.sforce.soap.partner.sobject.SObject -import scala.concurrent.duration._ -import com.sforce.soap.partner.fault.UnexpectedErrorFault +import org.apache.spark.sql.types.StructType import scala.concurrent.duration.FiniteDuration +import scala.io.Source import scala.util.Try +import scala.util.parsing.json._ /** * Utility to construct metadata and repartition RDD @@ -111,7 +107,7 @@ object Utils extends Serializable { def rowValue(rowVal: Any): String = { if (rowVal == null) { - "" + "#NA" } else { var value = rowVal.toString() if (value.contains("\"")) { From dedec34ae19623d45aa09d8bfb09110eb81c1730 Mon Sep 17 00:00:00 2001 From: Stephen Kinser Date: Tue, 2 Feb 2021 14:09:03 -0500 Subject: [PATCH 13/31] switch to cast --- .../spark/salesforce/SFObjectWriter.scala | 36 ++++++++------- .../com/springml/spark/salesforce/Utils.scala | 45 ++++++++++++++++++- 2 files changed, 64 insertions(+), 17 deletions(-) diff --git a/src/main/scala/com/springml/spark/salesforce/SFObjectWriter.scala b/src/main/scala/com/springml/spark/salesforce/SFObjectWriter.scala index 4114f20..877217e 100644 --- a/src/main/scala/com/springml/spark/salesforce/SFObjectWriter.scala +++ b/src/main/scala/com/springml/spark/salesforce/SFObjectWriter.scala @@ -14,23 +14,29 @@ import com.springml.salesforce.wave.model.JobInfo * First column of dataframe contains Salesforce Object * Next subsequent columns are fields to be updated */ -class SFObjectWriter ( - val username: String, - val password: String, - val login: String, - val version: String, - val sfObject: String, - val mode: SaveMode, - val upsert: Boolean, - val externalIdFieldName: String, - val csvHeader: String, - val batchSize: Integer - ) extends Serializable { +class SFObjectWriter( + val username: String, + val password: String, + val login: String, + val version: String, + val sfObject: String, + val mode: SaveMode, + val upsert: Boolean, + val externalIdFieldName: String, + val csvHeader: String, + val batchSize: Integer + ) extends Serializable { @transient val logger = Logger.getLogger(classOf[SFObjectWriter]) def writeData(rdd: RDD[Row]): Boolean = { - val csvRDD = rdd.map(row => row.toSeq.map(value => Utils.rowValue(value)).mkString(",")) + + val csvRDD = rdd.map { row => + val schema = row.schema + row.toSeq.indices.map( + index => Utils.cast(row, schema, index) + ).mkString(",") + } val partitionCnt = (1 + csvRDD.count() / batchSize).toInt val 
partitionedRDD = csvRDD.repartition(partitionCnt) @@ -43,7 +49,7 @@ class SFObjectWriter ( partitionedRDD.mapPartitionsWithIndex { case (index, iterator) => { val records = iterator.toArray.mkString("\n") - var batchInfoId : String = null + var batchInfoId: String = null if (records != null && !records.isEmpty()) { val data = csvHeader + "\n" + records val batchInfo = bulkAPI.addBatch(jobId, data) @@ -70,7 +76,7 @@ class SFObjectWriter ( } print("Returning false...") - logger.info("Job not completed. Timeout..." ) + logger.info("Job not completed. Timeout...") false } diff --git a/src/main/scala/com/springml/spark/salesforce/Utils.scala b/src/main/scala/com/springml/spark/salesforce/Utils.scala index 3d09eda..58fcd8f 100644 --- a/src/main/scala/com/springml/spark/salesforce/Utils.scala +++ b/src/main/scala/com/springml/spark/salesforce/Utils.scala @@ -16,6 +16,8 @@ package com.springml.spark.salesforce +import java.text.SimpleDateFormat + import com.madhukaraphatak.sizeof.SizeEstimator import com.sforce.soap.partner.fault.UnexpectedErrorFault import com.sforce.soap.partner.{Connector, PartnerConnection, SaveResult} @@ -24,7 +26,7 @@ import com.springml.spark.salesforce.metadata.MetadataConstructor import org.apache.log4j.Logger import org.apache.spark.rdd.RDD import org.apache.spark.sql.Row -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{DataType, DateType, StringType, StructType, TimestampType} import scala.concurrent.duration.FiniteDuration import scala.io.Source @@ -48,6 +50,8 @@ object Utils extends Serializable { Connector.newConnection(config) } + @transient val formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS") + def logSaveResultError(result: SaveResult): Unit = { result.getErrors.map(error => { @@ -107,7 +111,7 @@ object Utils extends Serializable { def rowValue(rowVal: Any): String = { if (rowVal == null) { - "#NA" + "" } else { var value = rowVal.toString() if (value.contains("\"")) { @@ -120,6 +124,43 @@ object Utils extends Serializable { } } + def cast(row: Row, toType: DataType, index: Int): String = { + toType match { + case _: StringType => { + val fieldValue = row.getAs[String](index) + if (fieldValue == null) { + "#NA" + } else { + var value = fieldValue + if (value.contains("\"")) { + value = value.replaceAll("\"", "\"\"") + } + if (value.contains("\"") || value.contains("\n") || value.contains(",")) { + value = "\"" + value + "\"" + } + value + } + } + case _: DateType => { + val fieldValue = row.getAs[java.sql.Date](index) + if (fieldValue == null) { + "" + } else { + formatter.format(fieldValue) + } + } + case _: TimestampType => { + val fieldValue = row.getAs[java.sql.Timestamp](index) + if (fieldValue == null) { + "" + } else { + formatter.format(fieldValue) + } + } + case _ => rowValue(row.get(index)) + } + } + def metadataConfig(usersMetadataConfig: Option[String]) = { var systemMetadataConfig = readMetadataConfig() if (usersMetadataConfig != null && usersMetadataConfig.isDefined) { From 5f4437f63c90560bf8f67ff0f48fc470de45e1f6 Mon Sep 17 00:00:00 2001 From: Stephen Kinser Date: Tue, 2 Feb 2021 14:23:34 -0500 Subject: [PATCH 14/31] fix --- .../scala/com/springml/spark/salesforce/SFObjectWriter.scala | 4 ++-- src/main/scala/com/springml/spark/salesforce/Utils.scala | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/main/scala/com/springml/spark/salesforce/SFObjectWriter.scala b/src/main/scala/com/springml/spark/salesforce/SFObjectWriter.scala index 877217e..e024f2b 100644 --- 
a/src/main/scala/com/springml/spark/salesforce/SFObjectWriter.scala +++ b/src/main/scala/com/springml/spark/salesforce/SFObjectWriter.scala @@ -32,9 +32,9 @@ class SFObjectWriter( def writeData(rdd: RDD[Row]): Boolean = { val csvRDD = rdd.map { row => - val schema = row.schema + val schema = row.schema.fields row.toSeq.indices.map( - index => Utils.cast(row, schema, index) + index => Utils.cast(row, schema(index).dataType, index) ).mkString(",") } diff --git a/src/main/scala/com/springml/spark/salesforce/Utils.scala b/src/main/scala/com/springml/spark/salesforce/Utils.scala index 58fcd8f..2bcbd40 100644 --- a/src/main/scala/com/springml/spark/salesforce/Utils.scala +++ b/src/main/scala/com/springml/spark/salesforce/Utils.scala @@ -125,6 +125,7 @@ object Utils extends Serializable { } def cast(row: Row, toType: DataType, index: Int): String = { + toType toType match { case _: StringType => { val fieldValue = row.getAs[String](index) From 6770197ea343d0d691651a52a79481f033f78548 Mon Sep 17 00:00:00 2001 From: Stephen Kinser Date: Tue, 2 Feb 2021 14:38:56 -0500 Subject: [PATCH 15/31] a --- src/main/scala/com/springml/spark/salesforce/Utils.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/scala/com/springml/spark/salesforce/Utils.scala b/src/main/scala/com/springml/spark/salesforce/Utils.scala index 2bcbd40..58fcd8f 100644 --- a/src/main/scala/com/springml/spark/salesforce/Utils.scala +++ b/src/main/scala/com/springml/spark/salesforce/Utils.scala @@ -125,7 +125,6 @@ object Utils extends Serializable { } def cast(row: Row, toType: DataType, index: Int): String = { - toType toType match { case _: StringType => { val fieldValue = row.getAs[String](index) From c1d8e0bc7ef77ce5a9028579091816a8e1352d79 Mon Sep 17 00:00:00 2001 From: Stephen Kinser Date: Tue, 2 Feb 2021 14:53:32 -0500 Subject: [PATCH 16/31] add empty string na --- src/main/scala/com/springml/spark/salesforce/Utils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/com/springml/spark/salesforce/Utils.scala b/src/main/scala/com/springml/spark/salesforce/Utils.scala index 58fcd8f..dfbb4f9 100644 --- a/src/main/scala/com/springml/spark/salesforce/Utils.scala +++ b/src/main/scala/com/springml/spark/salesforce/Utils.scala @@ -128,7 +128,7 @@ object Utils extends Serializable { toType match { case _: StringType => { val fieldValue = row.getAs[String](index) - if (fieldValue == null) { + if (fieldValue == null || fieldValue == "") { "#NA" } else { var value = fieldValue From 2451cb6f7a11e1f3c2b235ead564e00491c4de1c Mon Sep 17 00:00:00 2001 From: Stephen Kinser Date: Tue, 2 Feb 2021 16:52:53 -0500 Subject: [PATCH 17/31] make all null values #N/A aside from dates update gitignore a --- .gitignore | 1 + .../com/springml/spark/salesforce/Utils.scala | 15 ++++----------- 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/.gitignore b/.gitignore index 07a31f7..6058ad4 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ target/ project/target dependency-reduced-pom.xml /bin/ +.DS_Store \ No newline at end of file diff --git a/src/main/scala/com/springml/spark/salesforce/Utils.scala b/src/main/scala/com/springml/spark/salesforce/Utils.scala index dfbb4f9..4295ecb 100644 --- a/src/main/scala/com/springml/spark/salesforce/Utils.scala +++ b/src/main/scala/com/springml/spark/salesforce/Utils.scala @@ -111,7 +111,7 @@ object Utils extends Serializable { def rowValue(rowVal: Any): String = { if (rowVal == null) { - "" + "#NA" } else { var value = rowVal.toString() if 
(value.contains("\"")) { @@ -128,17 +128,10 @@ object Utils extends Serializable { toType match { case _: StringType => { val fieldValue = row.getAs[String](index) - if (fieldValue == null || fieldValue == "") { - "#NA" + if (fieldValue == "") { + rowValue(null) } else { - var value = fieldValue - if (value.contains("\"")) { - value = value.replaceAll("\"", "\"\"") - } - if (value.contains("\"") || value.contains("\n") || value.contains(",")) { - value = "\"" + value + "\"" - } - value + rowValue(fieldValue) } } case _: DateType => { From 7e068e5c86383594a9f58a543fdb763e85df7790 Mon Sep 17 00:00:00 2001 From: Stephen Kinser Date: Tue, 2 Feb 2021 17:13:43 -0500 Subject: [PATCH 18/31] fix dumb mistake --- src/main/scala/com/springml/spark/salesforce/Utils.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/scala/com/springml/spark/salesforce/Utils.scala b/src/main/scala/com/springml/spark/salesforce/Utils.scala index 4295ecb..349d1a4 100644 --- a/src/main/scala/com/springml/spark/salesforce/Utils.scala +++ b/src/main/scala/com/springml/spark/salesforce/Utils.scala @@ -111,7 +111,7 @@ object Utils extends Serializable { def rowValue(rowVal: Any): String = { if (rowVal == null) { - "#NA" + "#N/A" } else { var value = rowVal.toString() if (value.contains("\"")) { @@ -137,7 +137,7 @@ object Utils extends Serializable { case _: DateType => { val fieldValue = row.getAs[java.sql.Date](index) if (fieldValue == null) { - "" + rowValue(null) } else { formatter.format(fieldValue) } @@ -145,7 +145,7 @@ object Utils extends Serializable { case _: TimestampType => { val fieldValue = row.getAs[java.sql.Timestamp](index) if (fieldValue == null) { - "" + rowValue(null) } else { formatter.format(fieldValue) } From 03491b12ecd28fa8699a48e87fc7fc994491630d Mon Sep 17 00:00:00 2001 From: Stephen Kinser Date: Tue, 2 Feb 2021 17:57:23 -0500 Subject: [PATCH 19/31] add special case for booleans --- .../spark/salesforce/SFObjectWriter.scala | 2 +- .../com/springml/spark/salesforce/Utils.scala | 20 ++++--------------- 2 files changed, 5 insertions(+), 17 deletions(-) diff --git a/src/main/scala/com/springml/spark/salesforce/SFObjectWriter.scala b/src/main/scala/com/springml/spark/salesforce/SFObjectWriter.scala index e024f2b..b5be4b3 100644 --- a/src/main/scala/com/springml/spark/salesforce/SFObjectWriter.scala +++ b/src/main/scala/com/springml/spark/salesforce/SFObjectWriter.scala @@ -34,7 +34,7 @@ class SFObjectWriter( val csvRDD = rdd.map { row => val schema = row.schema.fields row.toSeq.indices.map( - index => Utils.cast(row, schema(index).dataType, index) + index => Utils.rowValue(row, schema(index).dataType, index) ).mkString(",") } diff --git a/src/main/scala/com/springml/spark/salesforce/Utils.scala b/src/main/scala/com/springml/spark/salesforce/Utils.scala index 349d1a4..e6847a9 100644 --- a/src/main/scala/com/springml/spark/salesforce/Utils.scala +++ b/src/main/scala/com/springml/spark/salesforce/Utils.scala @@ -26,7 +26,7 @@ import com.springml.spark.salesforce.metadata.MetadataConstructor import org.apache.log4j.Logger import org.apache.spark.rdd.RDD import org.apache.spark.sql.Row -import org.apache.spark.sql.types.{DataType, DateType, StringType, StructType, TimestampType} +import org.apache.spark.sql.types.{BooleanType, DataType, StringType, StructType} import scala.concurrent.duration.FiniteDuration import scala.io.Source @@ -134,21 +134,9 @@ object Utils extends Serializable { rowValue(fieldValue) } } - case _: DateType => { - val fieldValue = 
row.getAs[java.sql.Date](index) - if (fieldValue == null) { - rowValue(null) - } else { - formatter.format(fieldValue) - } - } - case _: TimestampType => { - val fieldValue = row.getAs[java.sql.Timestamp](index) - if (fieldValue == null) { - rowValue(null) - } else { - formatter.format(fieldValue) - } + case _: BooleanType => { + // salesforce doesn't allow null booleans + rowValue(row.getAs[Boolean](index)) } case _ => rowValue(row.get(index)) } From 3f9358dc8dc689f81aa985a31e9b0049b751053f Mon Sep 17 00:00:00 2001 From: Stephen Kinser Date: Tue, 2 Feb 2021 18:19:45 -0500 Subject: [PATCH 20/31] dumb mistake --- .../com/springml/spark/salesforce/SFObjectWriter.scala | 2 +- src/main/scala/com/springml/spark/salesforce/Utils.scala | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/main/scala/com/springml/spark/salesforce/SFObjectWriter.scala b/src/main/scala/com/springml/spark/salesforce/SFObjectWriter.scala index b5be4b3..e024f2b 100644 --- a/src/main/scala/com/springml/spark/salesforce/SFObjectWriter.scala +++ b/src/main/scala/com/springml/spark/salesforce/SFObjectWriter.scala @@ -34,7 +34,7 @@ class SFObjectWriter( val csvRDD = rdd.map { row => val schema = row.schema.fields row.toSeq.indices.map( - index => Utils.rowValue(row, schema(index).dataType, index) + index => Utils.cast(row, schema(index).dataType, index) ).mkString(",") } diff --git a/src/main/scala/com/springml/spark/salesforce/Utils.scala b/src/main/scala/com/springml/spark/salesforce/Utils.scala index e6847a9..a2c444c 100644 --- a/src/main/scala/com/springml/spark/salesforce/Utils.scala +++ b/src/main/scala/com/springml/spark/salesforce/Utils.scala @@ -126,6 +126,10 @@ object Utils extends Serializable { def cast(row: Row, toType: DataType, index: Int): String = { toType match { + case _: BooleanType => { + // salesforce doesn't allow null booleans + rowValue(row.getAs[Boolean](index)) + } case _: StringType => { val fieldValue = row.getAs[String](index) if (fieldValue == "") { @@ -134,10 +138,6 @@ object Utils extends Serializable { rowValue(fieldValue) } } - case _: BooleanType => { - // salesforce doesn't allow null booleans - rowValue(row.getAs[Boolean](index)) - } case _ => rowValue(row.get(index)) } } From f7ed7f2ca67a77ec75d2de4ae9813717695f9461 Mon Sep 17 00:00:00 2001 From: Stephen Kinser Date: Tue, 2 Feb 2021 18:49:00 -0500 Subject: [PATCH 21/31] fix data writer --- .../springml/spark/salesforce/DataWriter.scala | 7 ++++++- .../com/springml/spark/salesforce/Utils.scala | 15 ++++++++++++++- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/src/main/scala/com/springml/spark/salesforce/DataWriter.scala b/src/main/scala/com/springml/spark/salesforce/DataWriter.scala index 33354ec..487456d 100644 --- a/src/main/scala/com/springml/spark/salesforce/DataWriter.scala +++ b/src/main/scala/com/springml/spark/salesforce/DataWriter.scala @@ -74,7 +74,12 @@ class DataWriter ( } def writeData(rdd: RDD[Row], metadataId: String): Boolean = { - val csvRDD = rdd.map(row => row.toSeq.map(value => Utils.rowValue(value)).mkString(",")) + val csvRDD = rdd.map{row => + val schema = row.schema.fields + row.toSeq.indices.map( + index => Utils.cast(row, schema(index).dataType, index) + ).mkString(",") + } csvRDD.mapPartitionsWithIndex { case (index, iterator) => { @transient val logger = Logger.getLogger(classOf[DataWriter]) diff --git a/src/main/scala/com/springml/spark/salesforce/Utils.scala b/src/main/scala/com/springml/spark/salesforce/Utils.scala index a2c444c..f8709ad 100644 --- 
a/src/main/scala/com/springml/spark/salesforce/Utils.scala +++ b/src/main/scala/com/springml/spark/salesforce/Utils.scala @@ -128,7 +128,7 @@ object Utils extends Serializable { toType match { case _: BooleanType => { // salesforce doesn't allow null booleans - rowValue(row.getAs[Boolean](index)) + Option(row.getAs[Boolean](index)).getOrElse(false).toString } case _: StringType => { val fieldValue = row.getAs[String](index) @@ -142,6 +142,19 @@ object Utils extends Serializable { } } +// def cast(row: Row, toType: DataType, index: Int): String = { +// toType match { +// case _: BooleanType => { +// // salesforce doesn't allow null booleans +// row.getAs[Boolean](index).toString +// } +// case _: StringType => { +// "b" +// } +// case _ => "c" +// } +// } + def metadataConfig(usersMetadataConfig: Option[String]) = { var systemMetadataConfig = readMetadataConfig() if (usersMetadataConfig != null && usersMetadataConfig.isDefined) { From 99c125d8e1fddd9619f58ee78ba6993bb1541eec Mon Sep 17 00:00:00 2001 From: Stephen Kinser Date: Tue, 2 Feb 2021 18:51:02 -0500 Subject: [PATCH 22/31] fix --- src/main/scala/com/springml/spark/salesforce/DataWriter.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/scala/com/springml/spark/salesforce/DataWriter.scala b/src/main/scala/com/springml/spark/salesforce/DataWriter.scala index 487456d..c6ce2d5 100644 --- a/src/main/scala/com/springml/spark/salesforce/DataWriter.scala +++ b/src/main/scala/com/springml/spark/salesforce/DataWriter.scala @@ -80,6 +80,7 @@ class DataWriter ( index => Utils.cast(row, schema(index).dataType, index) ).mkString(",") } + csvRDD.mapPartitionsWithIndex { case (index, iterator) => { @transient val logger = Logger.getLogger(classOf[DataWriter]) From 4661c15819205914e72aff716276fedaeb17863d Mon Sep 17 00:00:00 2001 From: Stephen Kinser Date: Wed, 3 Feb 2021 12:23:44 -0500 Subject: [PATCH 23/31] delete commented code --- .../scala/com/springml/spark/salesforce/Utils.scala | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/src/main/scala/com/springml/spark/salesforce/Utils.scala b/src/main/scala/com/springml/spark/salesforce/Utils.scala index f8709ad..d46589a 100644 --- a/src/main/scala/com/springml/spark/salesforce/Utils.scala +++ b/src/main/scala/com/springml/spark/salesforce/Utils.scala @@ -142,19 +142,6 @@ object Utils extends Serializable { } } -// def cast(row: Row, toType: DataType, index: Int): String = { -// toType match { -// case _: BooleanType => { -// // salesforce doesn't allow null booleans -// row.getAs[Boolean](index).toString -// } -// case _: StringType => { -// "b" -// } -// case _ => "c" -// } -// } - def metadataConfig(usersMetadataConfig: Option[String]) = { var systemMetadataConfig = readMetadataConfig() if (usersMetadataConfig != null && usersMetadataConfig.isDefined) { From b6fe136f17409ddae5c9ac4ace960949c9d9c09c Mon Sep 17 00:00:00 2001 From: Michael Weldon Date: Wed, 27 Apr 2022 14:09:28 -0700 Subject: [PATCH 24/31] add support for max column width to csv parser (#6) * add support for max column width to csv parser * pom changes --- pom.xml | 4 ++-- .../com/springml/spark/salesforce/BulkRelation.scala | 4 +++- .../com/springml/spark/salesforce/DefaultSource.scala | 10 +++++++++- 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/pom.xml b/pom.xml index 619d735..3a996d9 100644 --- a/pom.xml +++ b/pom.xml @@ -2,11 +2,11 @@ 4.0.0 - com.springml + com.goodleap spark-salesforce jar spark-salesforce - 1.1.3 + 1.1.4 spark-salesforce com.springml diff --git 
a/src/main/scala/com/springml/spark/salesforce/BulkRelation.scala b/src/main/scala/com/springml/spark/salesforce/BulkRelation.scala index e7c5333..49196d1 100644 --- a/src/main/scala/com/springml/spark/salesforce/BulkRelation.scala +++ b/src/main/scala/com/springml/spark/salesforce/BulkRelation.scala @@ -34,7 +34,8 @@ case class BulkRelation( userSchema: StructType, sqlContext: SQLContext, inferSchema: Boolean, - timeout: Long) extends BaseRelation with TableScan { + timeout: Long, + maxCharsPerColumn: Int) extends BaseRelation with TableScan { import sqlContext.sparkSession.implicits._ @@ -77,6 +78,7 @@ case class BulkRelation( val parserSettings = new CsvParserSettings() parserSettings.setLineSeparatorDetectionEnabled(true) parserSettings.getFormat.setNormalizedNewline(' ') + parserSettings.setMaxCharsPerColumn(maxCharsPerColumn) val readerParser = new CsvParser(parserSettings) val parsedInput = readerParser.parseAll(inputReader).asScala diff --git a/src/main/scala/com/springml/spark/salesforce/DefaultSource.scala b/src/main/scala/com/springml/spark/salesforce/DefaultSource.scala index b65f034..7637327 100644 --- a/src/main/scala/com/springml/spark/salesforce/DefaultSource.scala +++ b/src/main/scala/com/springml/spark/salesforce/DefaultSource.scala @@ -201,6 +201,13 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr throw new Exception("sfObject must not be empty when performing bulk query") } + val maxCharsPerColumnStr = parameters.getOrElse("maxCharsPerColumn", "4096") + val maxCharsPerColumn = try { + maxCharsPerColumnStr.toInt + } catch { + case e: Exception => throw new Exception("maxCharsPerColumn must be a valid integer") + } + val timeoutStr = parameters.getOrElse("timeout", "600000") val timeout = try { timeoutStr.toLong @@ -239,7 +246,8 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr schema, sqlContext, inferSchemaFlag, - timeout + timeout, + maxCharsPerColumn ) } From 7581bf87183b4581eb967193afc73239f3313e7f Mon Sep 17 00:00:00 2001 From: SKinserLoanpal Date: Tue, 15 Nov 2022 13:57:18 -0600 Subject: [PATCH 25/31] shade dependency --- pom.xml | 51 +++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 47 insertions(+), 4 deletions(-) diff --git a/pom.xml b/pom.xml index 3a996d9..2253464 100644 --- a/pom.xml +++ b/pom.xml @@ -34,7 +34,7 @@ 1.8 1.8 UTF-8 - 3.0.0 + 3.3.0 2.12.11 2.12 @@ -68,10 +68,21 @@ 40.0.0 - com.github.loanpal-engineering + com.springml salesforce-wave-api - 1d662ac + 1.0.8-loanpal + + + + + + + + + + + org.mockito mockito-core @@ -91,8 +102,19 @@ org.codehaus.woodstox woodstox-core-asl - 4.4.0 + 4.4.1 + + + org.codehaus.woodstox + stax2-api + + + + + + + src/main/scala @@ -118,6 +140,27 @@ + + org.apache.maven.plugins + maven-shade-plugin + 3.4.1 + + + package + + shade + + + + + com.fasterxml.jackson.dataformat + com.shaded.fasterxml.jackson.dataformat + + + + + + org.apache.maven.plugins maven-shade-plugin From b19abd06f548ea0df532a61f786cefd393f91427 Mon Sep 17 00:00:00 2001 From: unintellisense Date: Thu, 16 Mar 2023 18:37:16 -0700 Subject: [PATCH 26/31] support pkChunks with filtering (handling empty batches) --- pom.xml | 8 ++++---- .../com/springml/spark/salesforce/BulkRelation.scala | 1 + 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/pom.xml b/pom.xml index 2253464..3c5d58f 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ spark-salesforce jar spark-salesforce - 1.1.4 + 1.1.5 spark-salesforce com.springml @@ -68,9 +68,9 @@ 40.0.0 - com.springml + 
com.github.loanpal-engineering salesforce-wave-api - 1.0.8-loanpal + v1.1.0 @@ -97,7 +97,7 @@ com.github.loanpal-engineering java-sizeof - 6d786c8 + 0.1 org.codehaus.woodstox diff --git a/src/main/scala/com/springml/spark/salesforce/BulkRelation.scala b/src/main/scala/com/springml/spark/salesforce/BulkRelation.scala index 49196d1..dbe593a 100644 --- a/src/main/scala/com/springml/spark/salesforce/BulkRelation.scala +++ b/src/main/scala/com/springml/spark/salesforce/BulkRelation.scala @@ -72,6 +72,7 @@ case class BulkRelation( // Use Csv parser to split CSV by rows to cover edge cases (ex. escaped characters, new line within string, etc) def splitCsvByRows(csvString: String): Seq[String] = { + if (csvString == "Records not found for this query") Seq.empty // The CsvParser interface only interacts with IO, so StringReader and StringWriter val inputReader = new StringReader(csvString) From 6a658f14c3dfba026e84535b16ccfed225bb42fd Mon Sep 17 00:00:00 2001 From: gwadley-goodleap <94405592+gwadley-goodleap@users.noreply.github.com> Date: Wed, 24 May 2023 16:25:41 -0500 Subject: [PATCH 27/31] Create codeowners --- .github/codeowners | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 .github/codeowners diff --git a/.github/codeowners b/.github/codeowners new file mode 100644 index 0000000..20a7ed5 --- /dev/null +++ b/.github/codeowners @@ -0,0 +1,2 @@ +* @loanpal-engineering/business-intelligence-bi +.github/codeowners @loanpal-engineering/security @loanpal-engineering/DevOps From 850f035ffc265ae4940ffdf0db2e9d813c431161 Mon Sep 17 00:00:00 2001 From: SKinserLoanpal Date: Wed, 29 Nov 2023 13:26:55 -0800 Subject: [PATCH 28/31] bulk api 2.0 --- build.sbt | 90 ---------- pom.xml | 47 ++--- project/plugins.sbt | 7 - .../spark/salesforce/DefaultSource.scala | 41 ++++- .../spark/salesforce/SFObjectWriter.scala | 7 +- .../spark/salesforce/SFObjectWriter2.scala | 160 ++++++++++++++++++ .../com/springml/spark/salesforce/Utils.scala | 3 +- 7 files changed, 234 insertions(+), 121 deletions(-) delete mode 100644 build.sbt delete mode 100644 project/plugins.sbt create mode 100644 src/main/scala/com/springml/spark/salesforce/SFObjectWriter2.scala diff --git a/build.sbt b/build.sbt deleted file mode 100644 index 275ff7a..0000000 --- a/build.sbt +++ /dev/null @@ -1,90 +0,0 @@ -name := "spark-salesforce" - -version := "1.1.3" - -organization := "com.springml" - -scalaVersion := "2.11.8" - -resolvers += "sonatype-snapshots" at "https://oss.sonatype.org/content/repositories/snapshots/" -resolvers += "jitpack" at "https://jitpack.io" - -libraryDependencies ++= Seq( - "com.force.api" % "force-wsc" % "40.0.0", - "com.force.api" % "force-partner-api" % "40.0.0", - "com.github.loanpal-engineering" % "salesforce-wave-api" % "eb71436", - "org.mockito" % "mockito-core" % "2.0.31-beta" -) - -parallelExecution in Test := false - -resolvers += Resolver.url("artifactory", url("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns) - -resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/" - -resolvers += "sonatype-releases" at "https://oss.sonatype.org/content/repositories/releases/" - -resolvers += "sonatype-snapshots" at "https://oss.sonatype.org/content/repositories/snapshots/" - -resolvers += "Spark Package Main Repo" at "https://dl.bintray.com/spark-packages/maven" - -libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.1" % "test" -libraryDependencies += "com.madhukaraphatak" %% "java-sizeof" % "0.1" -libraryDependencies += 
"com.fasterxml.jackson.dataformat" % "jackson-dataformat-xml" % "2.4.4" -libraryDependencies += "org.codehaus.woodstox" % "woodstox-core-asl" % "4.4.0" - -// Spark Package Details (sbt-spark-package) -spName := "springml/spark-salesforce" - -spAppendScalaVersion := true - -sparkVersion := "2.2.0" - -sparkComponents += "sql" - -publishMavenStyle := true - -spIncludeMaven := true - -spShortDescription := "Spark Salesforce Wave Connector" - -spDescription := """Spark Salesforce Wave Connector - | - Creates Salesforce Wave Datasets using dataframe - | - Constructs Salesforce Wave dataset's metadata using schema present in dataframe - | - Can use custom metadata for constructing Salesforce Wave dataset's metadata""".stripMargin - -// licenses += "Apache-2.0" -> url("http://opensource.org/licenses/Apache-2.0") - -credentials += Credentials(Path.userHome / ".ivy2" / ".credentials") - -publishTo := { - val nexus = "https://oss.sonatype.org/" - if (version.value.endsWith("SNAPSHOT")) - Some("snapshots" at nexus + "content/repositories/snapshots") - else - Some("releases" at nexus + "service/local/staging/deploy/maven2") -} - -pomExtra := ( - https://github.com/springml/spark-salesforce - - - Apache License, Verision 2.0 - http://www.apache.org/licenses/LICENSE-2.0.html - repo - - - - scm:git:github.com/springml/spark-salesforce - scm:git:git@github.com:springml/spark-salesforce - github.com/springml/spark-salesforce - - - - springml - Springml - http://www.springml.com - - ) - - diff --git a/pom.xml b/pom.xml index 3c5d58f..636708b 100644 --- a/pom.xml +++ b/pom.xml @@ -68,21 +68,21 @@ 40.0.0 - com.github.loanpal-engineering + com.springml salesforce-wave-api - v1.1.0 + 1.0.8-loanpal - - - - - - - - - - - + + + + + + + + + + + org.mockito mockito-core @@ -95,9 +95,14 @@ test - com.github.loanpal-engineering - java-sizeof - 0.1 + endolabs.salesforce + bulkv2 + 1.0.0 + + + com.frejo + force-rest-api + 0.0.42 org.codehaus.woodstox @@ -110,11 +115,11 @@ - - - - - + + + + + src/main/scala diff --git a/project/plugins.sbt b/project/plugins.sbt deleted file mode 100644 index f996987..0000000 --- a/project/plugins.sbt +++ /dev/null @@ -1,7 +0,0 @@ -resolvers += "bintray-spark-packages" at "https://dl.bintray.com/spark-packages/maven/" - -addSbtPlugin("org.spark-packages" % "sbt-spark-package" % "0.2.6") -addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3") -addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "4.0.0") -addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "0.5.0") -addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.0") diff --git a/src/main/scala/com/springml/spark/salesforce/DefaultSource.scala b/src/main/scala/com/springml/spark/salesforce/DefaultSource.scala index 7637327..8af1d1b 100644 --- a/src/main/scala/com/springml/spark/salesforce/DefaultSource.scala +++ b/src/main/scala/com/springml/spark/salesforce/DefaultSource.scala @@ -125,6 +125,7 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr val monitorJob = parameters.getOrElse("monitorJob", "false") val externalIdFieldName = parameters.getOrElse("externalIdFieldName", "Id") val batchSizeStr = parameters.getOrElse("batchSize", "5000") + val bulkApiV2Str = parameters.getOrElse("bulkApiV2", "false") val batchSize = Try(batchSizeStr.toInt) match { case Success(v)=> v case Failure(e)=> { @@ -134,6 +135,14 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr } } + val bulkAPIV2 = Try(bulkApiV2Str.toBoolean) match { + case Success(v) => v + case Failure(e) => { + val 
errorMsg = "bulkAPIV2 parameter not a boolean." + logger.error(errorMsg) + throw new Exception(errorMsg) + } + } validateMutualExclusive(datasetName, sfObject, "datasetName", "sfObject") if (datasetName.isDefined) { @@ -150,8 +159,14 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr flag(upsert, "upsert"), flag(monitorJob, "monitorJob"), data, metadataFile) } else { logger.info("Updating Salesforce Object") - updateSalesforceObject(username, password, login, version, sfObject.get, mode, + if(bulkAPIV2) { + updateSalesforceObjectV2(username, password, login, version, sfObject.get, mode, + flag(upsert, "upsert"), externalIdFieldName, batchSize, data) + } else { + updateSalesforceObject(username, password, login, version, sfObject.get, mode, flag(upsert, "upsert"), externalIdFieldName, batchSize, data) + } + } return createReturnRelation(data) @@ -185,6 +200,30 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr } + private def updateSalesforceObjectV2( + username: String, + password: String, + login: String, + version: String, + sfObject: String, + mode: SaveMode, + upsert: Boolean, + externalIdFieldName: String, + batchSize: Integer, + data: DataFrame) { + + val csvHeader = Utils.csvHeadder(data.schema) + + val writer = new SFObjectWriter2(username, password, login, version, sfObject, mode, upsert, externalIdFieldName, csvHeader, batchSize) + logger.info("Writing data") + val successfulWrite = writer.writeData(data.rdd) + logger.info(s"Writing data was successful was $successfulWrite") + if (!successfulWrite) { + sys.error("Unable to update salesforce object") + } + + } + private def createBulkRelation( sqlContext: SQLContext, username: String, diff --git a/src/main/scala/com/springml/spark/salesforce/SFObjectWriter.scala b/src/main/scala/com/springml/spark/salesforce/SFObjectWriter.scala index e024f2b..68313e1 100644 --- a/src/main/scala/com/springml/spark/salesforce/SFObjectWriter.scala +++ b/src/main/scala/com/springml/spark/salesforce/SFObjectWriter.scala @@ -8,6 +8,9 @@ import com.springml.salesforce.wave.api.BulkAPI import com.springml.salesforce.wave.util.WaveAPIConstants import com.springml.salesforce.wave.model.JobInfo +import scala.collection.JavaConverters.asScalaBufferConverter +import scala.util.Try + /** * Write class responsible for update Salesforce object using data provided in dataframe @@ -43,6 +46,8 @@ class SFObjectWriter( val jobInfo = new JobInfo(WaveAPIConstants.STR_CSV, sfObject, operation(mode, upsert)) jobInfo.setExternalIdFieldName(externalIdFieldName) +// jobInfo.setConcurrencyMode("Serial") +// jobInfo.setNumberRetries("10") val jobId = bulkAPI.createJob(jobInfo).getId @@ -65,11 +70,11 @@ class SFObjectWriter( bulkAPI.closeJob(jobId) var i = 1 while (i < 999999) { + val isComplete = bulkAPI.isCompleted(jobId) if (bulkAPI.isCompleted(jobId)) { logger.info("Job completed") return true } - logger.info("Job not completed, waiting...") Thread.sleep(200) i = i + 1 diff --git a/src/main/scala/com/springml/spark/salesforce/SFObjectWriter2.scala b/src/main/scala/com/springml/spark/salesforce/SFObjectWriter2.scala new file mode 100644 index 0000000..10ee52e --- /dev/null +++ b/src/main/scala/com/springml/spark/salesforce/SFObjectWriter2.scala @@ -0,0 +1,160 @@ +package com.springml.spark.salesforce + +import java.io.BufferedReader + +import com.springml.salesforce.wave.api.{APIFactory, BulkAPI} +import com.springml.salesforce.wave.model.JobInfo +import com.springml.salesforce.wave.util.WaveAPIConstants 
+import org.apache.log4j.Logger +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SaveMode} +import endolabs.salesforce.bulkv2.{AccessToken, Bulk2Client, Bulk2ClientBuilder} +import com.force.api._ +import endolabs.salesforce.bulkv2.`type`.OperationEnum + +import scala.collection.JavaConverters.{asScalaBufferConverter, asScalaIteratorConverter} + +class SFObjectWriter2(val username: String, + val password: String, + val login: String, + val version: String, + val sfObject: String, + val mode: SaveMode, + val upsert: Boolean, + val externalIdFieldName: String, + val csvHeader: String, + val batchSize: Integer) extends Serializable { + + @transient val logger = Logger.getLogger(classOf[SFObjectWriter]) + + def writeData(rdd: RDD[Row]): Boolean = { + + val csvRDD = rdd.map { row => + val schema = row.schema.fields + row.toSeq.indices.map( + index => Utils.cast(row, schema(index).dataType, index) + ).mkString(",") + } + +// val partitionCnt = (1 + csvRDD.count() / batchSize).toInt +// val partitionedRDD = csvRDD.repartition(partitionCnt) + +// val jobInfo = new JobInfo(WaveAPIConstants.STR_CSV, sfObject, operation(mode, upsert)) +// jobInfo.setExternalIdFieldName(externalIdFieldName) + // jobInfo.setConcurrencyMode("Serial") + + // jobInfo.setNumberRetries("10") + val OperationEnum = operation(mode, upsert) +// val jobId = bulkAPI.createJob(jobInfo).getId + var bulkJobIDs = csvRDD.mapPartitionsWithIndex { + case (index, iterator) => { + + val records = iterator.toArray.mkString("\n") + var batchInfoId: String = null + var id = "" + if (records != null && !records.isEmpty()) { + val createResponse = client.createJob(sfObject, OperationEnum) + id = createResponse.getId + val data = csvHeader + "\n" + records +// val batchInfo = bulkAPI.addBatch(jobId, data) +// batchInfoId = batchInfo.getId + client.uploadJobData(id, data) + client.closeJob(createResponse.getId) +// id = createResponse.getId + } + +// val success = (batchInfoId != null) + // Job status will be checked after completing all batches + List(id).iterator + } + }.collect().filter(_ != null) + + + var i = 1 + var failedRecords = 0 + val TIMEOUT_MAX = 7000 + val BREAK_LOOP = 9000 + while (i < TIMEOUT_MAX) { + val data = bulkJobIDs.map{ + id => client.getJobInfo(id) + } + failedRecords += data.map(x => x.getNumberRecordsFailed.toInt).sum + +// val printFailedResults = new BufferedReader(client.getJobFailedRecordResults(data.head)) + data.foreach{ + x => if (x.getNumberRecordsFailed != 0) { + logger.info(s"${x.getRetries} number of retries") + val results = new BufferedReader(client.getJobFailedRecordResults(x.getId)) + results.lines().iterator().asScala.foreach{ + line => + println(line) + logger.info(line) + } + } + } + println(s"${data.count(x=> x.isFinished)} finished jobs ${data.length} remanining jobs") + println(s"${data.take(10).map(x => (x.getId, x.getState.toJsonValue)).mkString(",")} sample states") + if (data.count(x=> x.isFinished) == data.length) { + i = BREAK_LOOP + } +// if(client().getAllJobs.getRecords.asScala.filter(!_.isFinished).isEmpty) { +// i = BREAK_LOOP +// } + bulkJobIDs = data.filter(x => !x.isFinished).map(x => x.getId) + logger.info("Job not completed, waiting...") + Thread.sleep(2000) + i = i + 1 + } + + if (i == BREAK_LOOP && failedRecords != 0){ + return true + } else if(failedRecords != 0) { + logger.info("Job failed. Timeout...") + return true + } + print("Returning false...") + logger.info("Job not completed. 
Timeout...") + false + + } + +// // Create new instance of BulkAPI every time because Spark workers cannot serialize the object +// private def bulkAPI(): BulkAPI = { +// APIFactory.getInstance().bulkAPI(username, password, login, version) +// } +// val bulkAPIClient = new ForceApi(new ApiConfig() +// .setUsername(username) +// .setPassword(password) +// .setLoginEndpoint(login) +// .setApiVersion(ApiVersion.V48) +//) + private def bulkAPIClient(): ForceApi = { + new ForceApi(new ApiConfig() + .setUsername(username) + .setPassword(password) + .setLoginEndpoint(login) + .setApiVersion(ApiVersion.V48)) +} + private def client() = { + + new Bulk2ClientBuilder().withSessionId(bulkAPIClient.getSession.getAccessToken, bulkAPIClient.getSession.getApiEndpoint) + .build() + } + + private def operation(mode: SaveMode, upsert: Boolean): OperationEnum = { + if (upsert) { + OperationEnum.UPSERT + } else if (mode != null && SaveMode.Overwrite.name().equalsIgnoreCase(mode.name())) { +// WaveAPIConstants.STR_UPDATE + OperationEnum.UPDATE + } else if (mode != null && SaveMode.Append.name().equalsIgnoreCase(mode.name())) { +// WaveAPIConstants.STR_INSERT + OperationEnum.INSERT + } else { + logger.warn("SaveMode " + mode + " Not supported. Using 'insert' operation") + OperationEnum.INSERT + } + } + + +} diff --git a/src/main/scala/com/springml/spark/salesforce/Utils.scala b/src/main/scala/com/springml/spark/salesforce/Utils.scala index d46589a..a337ebc 100644 --- a/src/main/scala/com/springml/spark/salesforce/Utils.scala +++ b/src/main/scala/com/springml/spark/salesforce/Utils.scala @@ -18,7 +18,8 @@ package com.springml.spark.salesforce import java.text.SimpleDateFormat -import com.madhukaraphatak.sizeof.SizeEstimator +//import com.madhukaraphatak.sizeof.SizeEstimator +import org.apache.spark.util.SizeEstimator import com.sforce.soap.partner.fault.UnexpectedErrorFault import com.sforce.soap.partner.{Connector, PartnerConnection, SaveResult} import com.sforce.ws.ConnectorConfig From e1d7e59da2238182449941f3d18d79b0a3e20062 Mon Sep 17 00:00:00 2001 From: SKinserLoanpal Date: Thu, 11 Jan 2024 10:38:08 -0800 Subject: [PATCH 29/31] fix --- .../spark/salesforce/SFObjectWriter2.scala | 34 +++++++++++++------ 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/src/main/scala/com/springml/spark/salesforce/SFObjectWriter2.scala b/src/main/scala/com/springml/spark/salesforce/SFObjectWriter2.scala index 10ee52e..3eaca3b 100644 --- a/src/main/scala/com/springml/spark/salesforce/SFObjectWriter2.scala +++ b/src/main/scala/com/springml/spark/salesforce/SFObjectWriter2.scala @@ -11,7 +11,9 @@ import org.apache.spark.sql.{Row, SaveMode} import endolabs.salesforce.bulkv2.{AccessToken, Bulk2Client, Bulk2ClientBuilder} import com.force.api._ import endolabs.salesforce.bulkv2.`type`.OperationEnum +import endolabs.salesforce.bulkv2.response.GetJobInfoResponse +import scala.util.Try import scala.collection.JavaConverters.{asScalaBufferConverter, asScalaIteratorConverter} class SFObjectWriter2(val username: String, @@ -68,15 +70,19 @@ class SFObjectWriter2(val username: String, List(id).iterator } }.collect().filter(_ != null) - + val allIds = bulkJobIDs var i = 1 var failedRecords = 0 val TIMEOUT_MAX = 7000 val BREAK_LOOP = 9000 + var isEmpty = false while (i < TIMEOUT_MAX) { - val data = bulkJobIDs.map{ - id => client.getJobInfo(id) + var data: Array[GetJobInfoResponse] = Array() + try { + data = bulkJobIDs.map{ + id => + client.getJobInfo(id) } failedRecords += data.map(x => x.getNumberRecordsFailed.toInt).sum @@ 
-94,19 +100,25 @@ class SFObjectWriter2(val username: String, } println(s"${data.count(x=> x.isFinished)} finished jobs ${data.length} remanining jobs") println(s"${data.take(10).map(x => (x.getId, x.getState.toJsonValue)).mkString(",")} sample states") + } catch { + case e: Exception => println(e) + bulkJobIDs = allIds + } if (data.count(x=> x.isFinished) == data.length) { i = BREAK_LOOP + isEmpty = data.isEmpty + } else { + bulkJobIDs = data.filter(x => !x.isFinished).map(x => x.getId) + logger.info("Job not completed, waiting...") + Thread.sleep(10000) + i = i + 1 } -// if(client().getAllJobs.getRecords.asScala.filter(!_.isFinished).isEmpty) { -// i = BREAK_LOOP -// } - bulkJobIDs = data.filter(x => !x.isFinished).map(x => x.getId) - logger.info("Job not completed, waiting...") - Thread.sleep(2000) - i = i + 1 + } + if(isEmpty) { + return true } - if (i == BREAK_LOOP && failedRecords != 0){ + if (i == BREAK_LOOP && failedRecords == 0){ return true } else if(failedRecords != 0) { logger.info("Job failed. Timeout...") From 7316f09cb5b1d9e68c7af0a5b02e770380c0102f Mon Sep 17 00:00:00 2001 From: SKinserLoanpal Date: Tue, 16 Jan 2024 09:53:35 -0800 Subject: [PATCH 30/31] change force api to 53 --- pom.xml | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/pom.xml b/pom.xml index 636708b..e5221dc 100644 --- a/pom.xml +++ b/pom.xml @@ -24,12 +24,6 @@ scm:git:git@github.com:springml/spark-salesforce github.com/springml/spark-salesforce - - - jitpack.io - https://jitpack.io - - 1.8 1.8 @@ -57,15 +51,16 @@ ${spark.version} provided + com.force.api force-wsc - 40.0.0 + 53.0.0 com.force.api force-partner-api - 40.0.0 + 53.0.0 com.springml From 8ccd4323721d2e6c016518ab31b8ff116125eea1 Mon Sep 17 00:00:00 2001 From: SKinserLoanpal Date: Thu, 23 May 2024 10:18:26 -0700 Subject: [PATCH 31/31] fix max columns --- .../com/springml/spark/salesforce/BulkRelation.scala | 5 ++++- .../com/springml/spark/salesforce/DefaultSource.scala | 9 ++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/main/scala/com/springml/spark/salesforce/BulkRelation.scala b/src/main/scala/com/springml/spark/salesforce/BulkRelation.scala index dbe593a..3842ea4 100644 --- a/src/main/scala/com/springml/spark/salesforce/BulkRelation.scala +++ b/src/main/scala/com/springml/spark/salesforce/BulkRelation.scala @@ -35,7 +35,8 @@ case class BulkRelation( sqlContext: SQLContext, inferSchema: Boolean, timeout: Long, - maxCharsPerColumn: Int) extends BaseRelation with TableScan { + maxCharsPerColumn: Int, + maxColumns: Int) extends BaseRelation with TableScan { import sqlContext.sparkSession.implicits._ @@ -80,6 +81,7 @@ case class BulkRelation( parserSettings.setLineSeparatorDetectionEnabled(true) parserSettings.getFormat.setNormalizedNewline(' ') parserSettings.setMaxCharsPerColumn(maxCharsPerColumn) + parserSettings.setMaxColumns(maxColumns) val readerParser = new CsvParser(parserSettings) val parsedInput = readerParser.parseAll(inputReader).asScala @@ -112,6 +114,7 @@ case class BulkRelation( .option("quote", "\"") .option("escape", "\"") .option("multiLine", true) + .option("maxColumns", maxColumns) .csv(csvData) } else { bulkAPI.closeJob(jobId) diff --git a/src/main/scala/com/springml/spark/salesforce/DefaultSource.scala b/src/main/scala/com/springml/spark/salesforce/DefaultSource.scala index 8af1d1b..060cc08 100644 --- a/src/main/scala/com/springml/spark/salesforce/DefaultSource.scala +++ b/src/main/scala/com/springml/spark/salesforce/DefaultSource.scala @@ -76,6 +76,7 @@ class 
DefaultSource extends RelationProvider with SchemaRelationProvider with Cr val bulkFlag = flag(bulkStr, "bulk") val queryAllStr = parameters.getOrElse("queryAll", "false") + // val maxColumns = parameters.getOrElse("maxColumns", "512") val queryAllFlag = flag(queryAllStr, "queryAll") validateMutualExclusive(saql, soql, "saql", "soql") @@ -273,6 +274,11 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr customHeaders += new BasicHeader("Sforce-Enable-PKChunking", "true") } } + val maxColumns = try { + parameters.getOrElse("maxColumns", "512").toInt + } catch { + case e: Exception => throw new Exception("maxColumns parameter not a valid integer.") + } BulkRelation( username, @@ -286,7 +292,8 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr sqlContext, inferSchemaFlag, timeout, - maxCharsPerColumn + maxCharsPerColumn, + maxColumns ) }
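
For reference, a minimal usage sketch of the options wired in across patches 28-31 (bulkApiV2, batchSize, maxColumns). It is illustrative only and not part of the patches: the DataFrame `df`, the credentials, and the `Contact` object are placeholder values, and it assumes a spark-shell session with this package on the classpath.

    // Write path: routing through the Bulk API 2.0 writer (SFObjectWriter2).
    // All credential and object names below are placeholders.
    df.write
      .format("com.springml.spark.salesforce")
      .option("username", "user@example.com")
      .option("password", "password+securityToken")
      .option("sfObject", "Contact")
      .option("upsert", "true")
      .option("externalIdFieldName", "External_Id__c") // defaults to "Id"
      .option("bulkApiV2", "true")                     // defaults to "false" (original SFObjectWriter path)
      .option("batchSize", "5000")                     // still parsed; per-batch repartitioning is commented out in the V2 writer
      .save()

    // Read path: maxColumns is forwarded to the CSV parser used by BulkRelation (default 512).
    val result = spark.read
      .format("com.springml.spark.salesforce")
      .option("username", "user@example.com")
      .option("password", "password+securityToken")
      .option("sfObject", "Contact")
      .option("soql", "SELECT Id, Name FROM Contact")
      .option("bulk", "true")
      .option("maxColumns", "1024")
      .load()

With bulkApiV2 left at its default, writes continue through the original SFObjectWriter/Bulk API 1.0 code path, so existing callers are unaffected.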