diff --git a/README.md b/README.md
index 503b114..79d1108 100644
--- a/README.md
+++ b/README.md
@@ -20,6 +20,7 @@ You can link against this library in your program at the following ways:
1.1.3
```
+or use the `_2.12` artifact suffix for Scala 2.12.
### SBT Dependency
```
diff --git a/build.sbt b/build.sbt
index 9dba6e0..9b14930 100644
--- a/build.sbt
+++ b/build.sbt
@@ -8,12 +8,13 @@ scalaVersion := "2.12.10"
crossScalaVersions := Seq("2.11.12", "2.12.10")
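+// Resolver.mavenLocal lets sbt pick up locally published artifacts (e.g. a locally built salesforce-wave-api).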
+resolvers += Resolver.mavenLocal
resolvers += "sonatype-snapshots" at "https://oss.sonatype.org/content/repositories/snapshots/"
libraryDependencies ++= Seq(
- "com.force.api" % "force-wsc" % "40.0.0",
- "com.force.api" % "force-partner-api" % "40.0.0",
- "com.springml" % "salesforce-wave-api" % "1.0.10",
+ "com.force.api" % "force-wsc" % "52.2.0",
+ "com.force.api" % "force-partner-api" % "52.2.0",
+ "com.springml" % "salesforce-wave-api" % "1.0.8-uber-2",
"org.mockito" % "mockito-core" % "2.0.31-beta"
)
@@ -27,7 +28,6 @@ resolvers += "sonatype-releases" at "https://oss.sonatype.org/content/repositori
resolvers += "sonatype-snapshots" at "https://oss.sonatype.org/content/repositories/snapshots/"
-resolvers += "Spark Package Main Repo" at "https://dl.bintray.com/spark-packages/maven"
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.3" % "test"
libraryDependencies += "com.fasterxml.jackson.dataformat" % "jackson-dataformat-xml" % "2.4.4"
@@ -53,7 +53,7 @@ spDescription := """Spark Salesforce Wave Connector
| - Constructs Salesforce Wave dataset's metadata using schema present in dataframe
| - Can use custom metadata for constructing Salesforce Wave dataset's metadata""".stripMargin
-// licenses += "Apache-2.0" -> url("http://opensource.org/licenses/Apache-2.0")
+licenses += "Apache-2.0" -> url("http://opensource.org/licenses/Apache-2.0")
credentials += Credentials(Path.userHome / ".ivy2" / ".credentials")
@@ -86,5 +86,3 @@ pomExtra := (
http://www.springml.com
)
-
-
diff --git a/project/plugins.sbt b/project/plugins.sbt
index f996987..2b32f36 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -1,4 +1,5 @@
-resolvers += "bintray-spark-packages" at "https://dl.bintray.com/spark-packages/maven/"
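+// Bintray was shut down; Spark Packages artifacts are now served from repos.spark-packages.org.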
+resolvers += "bintray-spark-packages" at "https://repos.spark-packages.org"
addSbtPlugin("org.spark-packages" % "sbt-spark-package" % "0.2.6")
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")
diff --git a/src/main/scala/com/springml/spark/salesforce/BulkRelation.scala b/src/main/scala/com/springml/spark/salesforce/BulkRelation.scala
index 49196d1..a325854 100644
--- a/src/main/scala/com/springml/spark/salesforce/BulkRelation.scala
+++ b/src/main/scala/com/springml/spark/salesforce/BulkRelation.scala
@@ -38,6 +38,9 @@ case class BulkRelation(
maxCharsPerColumn: Int) extends BaseRelation with TableScan {
import sqlContext.sparkSession.implicits._
+
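+ // Logger instances are not serializable; @transient lazy re-creates the logger after deserialization instead of shipping it with the relation.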
+ @transient lazy val logger: Logger = Logger.getLogger(classOf[BulkRelation])
def buildScan() = records.rdd
@@ -51,57 +54,165 @@ case class BulkRelation(
}
lazy val records: DataFrame = {
- val inputJobInfo = new JobInfo("CSV", sfObject, "query")
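+ // "queryAll" also returns soft-deleted and archived records, whereas "query" skips them.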
+ val inputJobInfo = new JobInfo("CSV", sfObject, "queryAll")
val jobInfo = bulkAPI.createJob(inputJobInfo, customHeaders.asJava)
val jobId = jobInfo.getId
+ logger.error(">>> Obtained jobId: " + jobId)
val batchInfo = bulkAPI.addBatch(jobId, query)
+ logger.error(">>> Obtained batchInfo: " + batchInfo)
if (awaitJobCompleted(jobId)) {
bulkAPI.closeJob(jobId)
val batchInfoList = bulkAPI.getBatchInfoList(jobId)
val batchInfos = batchInfoList.getBatchInfo().asScala.toList
+
+ logger.error(">>> Obtained batchInfos: " + batchInfos)
+ logger.error(">>>>>> Obtained batchInfos.size: " + batchInfos.size)
+
val completedBatchInfos = batchInfos.filter(batchInfo => batchInfo.getState().equals("Completed"))
val completedBatchInfoIds = completedBatchInfos.map(batchInfo => batchInfo.getId)
- val fetchBatchInfo = (batchInfoId: String) => {
- val resultIds = bulkAPI.getBatchResultIds(jobId, batchInfoId)
+ logger.error(">>> Obtained completedBatchInfoIds: " + completedBatchInfoIds)
+ logger.error(">>> Obtained completedBatchInfoIds.size: " + completedBatchInfoIds.size)
- val result = bulkAPI.getBatchResult(jobId, batchInfoId, resultIds.get(resultIds.size() - 1))
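+ // Use a CSV parser to split the result into rows so edge cases (escaped characters, newlines inside quoted values, etc.) are handled correctly.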
+ def splitCsvByRows(csvString: String): Seq[String] = {
+ // The CsvParser interface only interacts with IO, so StringReader and StringWriter
+ val inputReader = new StringReader(csvString)
- // Use Csv parser to split CSV by rows to cover edge cases (ex. escaped characters, new line within string, etc)
- def splitCsvByRows(csvString: String): Seq[String] = {
- // The CsvParser interface only interacts with IO, so StringReader and StringWriter
- val inputReader = new StringReader(csvString)
+ val parserSettings = new CsvParserSettings()
+ parserSettings.setLineSeparatorDetectionEnabled(true)
+ parserSettings.getFormat.setNormalizedNewline(' ')
+ parserSettings.setMaxCharsPerColumn(maxCharsPerColumn)
+
+ val readerParser = new CsvParser(parserSettings)
+ val parsedInput = readerParser.parseAll(inputReader).asScala
+
+ val outputWriter = new StringWriter()
+
+ val writerSettings = new CsvWriterSettings()
+ writerSettings.setQuoteAllFields(true)
+ writerSettings.setQuoteEscapingEnabled(true)
+
+ val writer = new CsvWriter(outputWriter, writerSettings)
+ parsedInput.foreach { writer.writeRow(_) }
+
+ outputWriter.toString.lines.toList
+ }
+
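+ // Downloads a single result file and splits it into CSV rows.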
+ val fetchAllResults = (resultId: String, batchInfoId: String) => {
+ logger.error("Getting Result for ResultId: " + resultId)
+ val result = bulkAPI.getBatchResult(jobId, batchInfoId, resultId)
+
+ val splitRows = splitCsvByRows(result)
- val parserSettings = new CsvParserSettings()
- parserSettings.setLineSeparatorDetectionEnabled(true)
- parserSettings.getFormat.setNormalizedNewline(' ')
- parserSettings.setMaxCharsPerColumn(maxCharsPerColumn)
+ logger.error("Result Rows size: " + splitRows.size)
+ logger.error("Result Row - first: " + (if (splitRows.size > 0) splitRows.head else "not found"))
- val readerParser = new CsvParser(parserSettings)
- val parsedInput = readerParser.parseAll(inputReader).asScala
+ splitRows
+ }
+
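+ // Fetches and splits every result file belonging to one batch; used when several batches completed (e.g. when PK chunking is enabled).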
+ val fetchBatchInfo = (batchInfoId: String) => {
+ logger.error(">>> About to fetch Results in batchInfoId: " + batchInfoId)
- val outputWriter = new StringWriter()
+ val resultIds = bulkAPI.getBatchResultIds(jobId, batchInfoId)
+ logger.error(">>> Got ResultsIds in batchInfoId: " + resultIds)
+ logger.error(">>> Got ResultsIds in batchInfoId.size: " + resultIds.size)
+ logger.error(">>> Got ResultsIds in Last Result Id: " + resultIds.get(resultIds.size() - 1))
- val writerSettings = new CsvWriterSettings()
- writerSettings.setQuoteAllFields(true)
- writerSettings.setQuoteEscapingEnabled(true)
- val writer = new CsvWriter(outputWriter, writerSettings)
- parsedInput.foreach { writer.writeRow(_) }
- outputWriter.toString.lines.toList
+
+ val resultIdsBatchInfoIdPairs: List[(String, String)] = resultIds.asScala.toList.map(resultId => (resultId, batchInfoId))
+
+ val allRows: Seq[String] = resultIdsBatchInfoIdPairs.flatMap { case (resultId, batchInfoId) =>
+ fetchAllResults(resultId, batchInfoId)
}
- splitCsvByRows(result)
+ allRows
+ }
+
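+ // With a single completed batch, parallelize over its result ids so the (possibly many) result files are still fetched in parallel.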
+ val csvData: Dataset[String] = if (completedBatchInfoIds.size == 1) {
+ val resultIds = bulkAPI.getBatchResultIds(jobId, completedBatchInfoIds.head)
+
+ val resultIdsCompletedBatchInfoIdPairs: List[(String, String)] = resultIds.asScala.toList.map(resultId => (resultId, completedBatchInfoIds.head))
+
+ logger.error(">>>> Will Parallelize Result IDs, CBatchInfoId: " + resultIdsCompletedBatchInfoIdPairs)
+
+ sqlContext
+ .sparkContext
+ .parallelize(resultIdsCompletedBatchInfoIdPairs)
+ .flatMap { case (resultId, batchInfoId) =>
+ fetchAllResults(resultId, batchInfoId)
+ }.toDS()
+ } else {
+ logger.error(">>>> Will Parallelize CompletedBatchInfoIds: " + completedBatchInfoIds)
+
+ sqlContext
+ .sparkContext
+ .parallelize(completedBatchInfoIds)
+ .flatMap(fetchBatchInfo).toDS()
}
- val csvData = sqlContext
- .sparkContext
- .parallelize(completedBatchInfoIds)
- .flatMap(fetchBatchInfo).toDS()
sqlContext
.sparkSession
diff --git a/src/main/scala/com/springml/spark/salesforce/DefaultSource.scala b/src/main/scala/com/springml/spark/salesforce/DefaultSource.scala
index f019814..a8905b5 100644
--- a/src/main/scala/com/springml/spark/salesforce/DefaultSource.scala
+++ b/src/main/scala/com/springml/spark/salesforce/DefaultSource.scala
@@ -189,6 +189,7 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr
if (sfObject.isEmpty) {
throw new Exception("sfObject must not be empty when performing bulk query")
}
+ logger.info("createBulkRelation :: sfObject: " + sfObject)
val maxCharsPerColumnStr = parameters.getOrElse("maxCharsPerColumn", "4096")
val maxCharsPerColumn = try {
@@ -196,6 +197,7 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr
} catch {
case e: Exception => throw new Exception("maxCharsPerColumn must be a valid integer")
}
+ logger.info("createBulkRelation :: maxCharsPerColumn: " + maxCharsPerColumn)
val timeoutStr = parameters.getOrElse("timeout", "600000")
val timeout = try {
@@ -203,13 +205,16 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr
} catch {
case e: Exception => throw new Exception("timeout must be a valid integer")
}
+ logger.info("createBulkRelation :: timeout: " + timeout)
var customHeaders = ListBuffer[Header]()
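+ // PK chunking makes Salesforce split a large bulk query into multiple batches over record-id ranges.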
val pkChunkingStr = parameters.getOrElse("pkChunking", "false")
val pkChunking = flag(pkChunkingStr, "pkChunkingStr")
+ logger.info("createBulkRelation :: pkChunking: " + pkChunking)
if (pkChunking) {
val chunkSize = parameters.get("chunkSize")
+ logger.info("createBulkRelation :: chunkSize: " + chunkSize)
if (!chunkSize.isEmpty) {
try {
@@ -219,6 +224,7 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr
case e: Exception => throw new Exception("chunkSize must be a valid integer")
}
customHeaders += new BasicHeader("Sforce-Enable-PKChunking", s"chunkSize=${chunkSize.get}")
+ // The Sforce-Enable-PKChunking header also accepts an optional parent object, e.g. "chunkSize=<n>; parent=<ParentObject>".
} else {
customHeaders += new BasicHeader("Sforce-Enable-PKChunking", "true")
}