diff --git a/README.md b/README.md index 503b114..79d1108 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,7 @@ You can link against this library in your program at the following ways: 1.1.3 ``` +or _2.12 for Scala 2.12 ### SBT Dependency ``` diff --git a/build.sbt b/build.sbt index 9dba6e0..9b14930 100644 --- a/build.sbt +++ b/build.sbt @@ -8,12 +8,13 @@ scalaVersion := "2.12.10" crossScalaVersions := Seq("2.11.12", "2.12.10") +resolvers += Resolver.mavenLocal resolvers += "sonatype-snapshots" at "https://oss.sonatype.org/content/repositories/snapshots/" libraryDependencies ++= Seq( - "com.force.api" % "force-wsc" % "40.0.0", - "com.force.api" % "force-partner-api" % "40.0.0", - "com.springml" % "salesforce-wave-api" % "1.0.10", + "com.force.api" % "force-wsc" % "52.2.0", + "com.force.api" % "force-partner-api" % "52.2.0", + "com.springml" % "salesforce-wave-api" % "1.0.8-uber-2", "org.mockito" % "mockito-core" % "2.0.31-beta" ) @@ -27,7 +28,6 @@ resolvers += "sonatype-releases" at "https://oss.sonatype.org/content/repositori resolvers += "sonatype-snapshots" at "https://oss.sonatype.org/content/repositories/snapshots/" -resolvers += "Spark Package Main Repo" at "https://dl.bintray.com/spark-packages/maven" libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.3" % "test" libraryDependencies += "com.fasterxml.jackson.dataformat" % "jackson-dataformat-xml" % "2.4.4" @@ -53,7 +53,7 @@ spDescription := """Spark Salesforce Wave Connector | - Constructs Salesforce Wave dataset's metadata using schema present in dataframe | - Can use custom metadata for constructing Salesforce Wave dataset's metadata""".stripMargin -// licenses += "Apache-2.0" -> url("http://opensource.org/licenses/Apache-2.0") +licenses += "Apache-2.0" -> url("http://opensource.org/licenses/Apache-2.0") credentials += Credentials(Path.userHome / ".ivy2" / ".credentials") @@ -86,5 +86,3 @@ pomExtra := ( http://www.springml.com ) - - diff --git a/project/plugins.sbt b/project/plugins.sbt index f996987..2b32f36 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,4 +1,5 @@ -resolvers += "bintray-spark-packages" at "https://dl.bintray.com/spark-packages/maven/" +// resolvers += "bintray-spark-packages" at "https://dl.bintray.com/spark-packages/maven/" +resolvers += "bintray-spark-packages" at "https://repos.spark-packages.org" addSbtPlugin("org.spark-packages" % "sbt-spark-package" % "0.2.6") addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3") diff --git a/src/main/scala/com/springml/spark/salesforce/BulkRelation.scala b/src/main/scala/com/springml/spark/salesforce/BulkRelation.scala index 49196d1..a325854 100644 --- a/src/main/scala/com/springml/spark/salesforce/BulkRelation.scala +++ b/src/main/scala/com/springml/spark/salesforce/BulkRelation.scala @@ -38,6 +38,9 @@ case class BulkRelation( maxCharsPerColumn: Int) extends BaseRelation with TableScan { import sqlContext.sparkSession.implicits._ + import scala.collection.JavaConversions._ + + @transient lazy val logger: Logger = Logger.getLogger(classOf[BulkRelation]) def buildScan() = records.rdd @@ -51,57 +54,165 @@ case class BulkRelation( } lazy val records: DataFrame = { - val inputJobInfo = new JobInfo("CSV", sfObject, "query") + val inputJobInfo = new JobInfo("CSV", sfObject, "queryAll") val jobInfo = bulkAPI.createJob(inputJobInfo, customHeaders.asJava) val jobId = jobInfo.getId + logger.error(">>> Obtained jobId: " + jobId) val batchInfo = bulkAPI.addBatch(jobId, query) + logger.error(">>> Obtained batchInfo: " + batchInfo) if 
(awaitJobCompleted(jobId)) { bulkAPI.closeJob(jobId) val batchInfoList = bulkAPI.getBatchInfoList(jobId) val batchInfos = batchInfoList.getBatchInfo().asScala.toList + + logger.error(">>> Obtained batchInfos: " + batchInfos) + logger.error(">>>>>> Obtained batchInfos.size: " + batchInfos.size) + val completedBatchInfos = batchInfos.filter(batchInfo => batchInfo.getState().equals("Completed")) val completedBatchInfoIds = completedBatchInfos.map(batchInfo => batchInfo.getId) - val fetchBatchInfo = (batchInfoId: String) => { - val resultIds = bulkAPI.getBatchResultIds(jobId, batchInfoId) + logger.error(">>> Obtained completedBatchInfoIds: " + completedBatchInfoIds) + logger.error(">>> Obtained completedBatchInfoIds.size: " + completedBatchInfoIds.size) - val result = bulkAPI.getBatchResult(jobId, batchInfoId, resultIds.get(resultIds.size() - 1)) + def splitCsvByRows(csvString: String): Seq[String] = { + // The CsvParser interface only interacts with IO, so StringReader and StringWriter + val inputReader = new StringReader(csvString) - // Use Csv parser to split CSV by rows to cover edge cases (ex. escaped characters, new line within string, etc) - def splitCsvByRows(csvString: String): Seq[String] = { - // The CsvParser interface only interacts with IO, so StringReader and StringWriter - val inputReader = new StringReader(csvString) + val parserSettings = new CsvParserSettings() + parserSettings.setLineSeparatorDetectionEnabled(true) + parserSettings.getFormat.setNormalizedNewline(' ') + parserSettings.setMaxCharsPerColumn(maxCharsPerColumn) + + val readerParser = new CsvParser(parserSettings) + val parsedInput = readerParser.parseAll(inputReader).asScala + + val outputWriter = new StringWriter() + + val writerSettings = new CsvWriterSettings() + writerSettings.setQuoteAllFields(true) + writerSettings.setQuoteEscapingEnabled(true) + + val writer = new CsvWriter(outputWriter, writerSettings) + parsedInput.foreach { + writer.writeRow(_) + } + + outputWriter.toString.lines.toList + } + + val fetchAllResults = (resultId: String, batchInfoId: String) => { + logger.error("Getting Result for ResultId: " + resultId) + val result = bulkAPI.getBatchResult(jobId, batchInfoId, resultId) + + val splitRows = splitCsvByRows(result) - val parserSettings = new CsvParserSettings() - parserSettings.setLineSeparatorDetectionEnabled(true) - parserSettings.getFormat.setNormalizedNewline(' ') - parserSettings.setMaxCharsPerColumn(maxCharsPerColumn) + logger.error("Result Rows size: " + splitRows.size) + logger.error("Result Row - first: " + (if (splitRows.size > 0) splitRows.head else "not found")) - val readerParser = new CsvParser(parserSettings) - val parsedInput = readerParser.parseAll(inputReader).asScala + splitRows + } + + val fetchBatchInfo = (batchInfoId: String) => { + logger.error(">>> About to fetch Results in batchInfoId: " + batchInfoId) - val outputWriter = new StringWriter() + val resultIds = bulkAPI.getBatchResultIds(jobId, batchInfoId) + logger.error(">>> Got ResultsIds in batchInfoId: " + resultIds) + logger.error(">>> Got ResultsIds in batchInfoId.size: " + resultIds.size) + logger.error(">>> Got ResultsIds in Last Result Id: " + resultIds.get(resultIds.size() - 1)) - val writerSettings = new CsvWriterSettings() - writerSettings.setQuoteAllFields(true) - writerSettings.setQuoteEscapingEnabled(true) +// val result = bulkAPI.getBatchResult(jobId, batchInfoId, resultIds.get(resultIds.size() - 1)) - val writer = new CsvWriter(outputWriter, writerSettings) - parsedInput.foreach { writer.writeRow(_) } 
+// logger.error(">>> Got Results - Results (string) length: " + result.length) - outputWriter.toString.lines.toList + // Use Csv parser to split CSV by rows to cover edge cases (ex. escaped characters, new line within string, etc) +// def splitCsvByRows(csvString: String): Seq[String] = { +// // The CsvParser interface only interacts with IO, so StringReader and StringWriter +// val inputReader = new StringReader(csvString) +// +// val parserSettings = new CsvParserSettings() +// parserSettings.setLineSeparatorDetectionEnabled(true) +// parserSettings.getFormat.setNormalizedNewline(' ') +// parserSettings.setMaxCharsPerColumn(maxCharsPerColumn) +// +// val readerParser = new CsvParser(parserSettings) +// val parsedInput = readerParser.parseAll(inputReader).asScala +// +// val outputWriter = new StringWriter() +// +// val writerSettings = new CsvWriterSettings() +// writerSettings.setQuoteAllFields(true) +// writerSettings.setQuoteEscapingEnabled(true) +// +// val writer = new CsvWriter(outputWriter, writerSettings) +// parsedInput.foreach { writer.writeRow(_) } +// +// outputWriter.toString.lines.toList +// } + + val resultIdsBatchInfoIdPairs: List[(String, String)] = resultIds.toList.map { resultId: String => { + (resultId, batchInfoId) + }} + + // AS addition - START +// val allRows: Seq[String] = resultIds.toList.flatMap { resultId: String => { +// logger.error("Getting Result for ResultId: " + resultId) +// val result = bulkAPI.getBatchResult(jobId, batchInfoId, resultId) +// +// val splitRows = splitCsvByRows(result) +// +// logger.error("Result Rows size: " + splitRows.size) +// logger.error("Result Row - first: " + (if (splitRows.size > 0) splitRows.head else "not found")) +// +// splitRows +// }} + + val allRows: Seq[String] = resultIdsBatchInfoIdPairs.flatMap { case(resultId, batchInfoId) => + fetchAllResults(resultId, batchInfoId) } - splitCsvByRows(result) + allRows + // AS Addition - END + +// val splitRows = splitCsvByRows(result) +// logger.error("Result Rows size: " + splitRows.size) +// logger.error("Result Row - first: " + (if (splitRows.size > 0) splitRows.head else "not found")) +// splitRows + + } + + // AS addition - START + val csvData: Dataset[String] = if (completedBatchInfoIds.size == 1) { + val resultIds = bulkAPI.getBatchResultIds(jobId, completedBatchInfoIds.head) + + val resultIdsCompletedBatchInfoIdPairs: List[(String, String)] = resultIds.toList.map { resultId: String => { + (resultId, completedBatchInfoIds.head) + }} + + logger.error(">>>> Will Parallelize Result IDs, CBatchInfoId: " + resultIdsCompletedBatchInfoIdPairs) + + sqlContext + .sparkContext + .parallelize(resultIdsCompletedBatchInfoIdPairs) + .flatMap { case (resultId, batchInfoId) => + fetchAllResults(resultId, batchInfoId) + }.toDS() + } else { + logger.error(">>>> Will Parallelize CompletedBatchInfoIds: " + completedBatchInfoIds) + + sqlContext + .sparkContext + .parallelize(completedBatchInfoIds) + .flatMap(fetchBatchInfo).toDS() } + // AS addition - END - val csvData = sqlContext - .sparkContext - .parallelize(completedBatchInfoIds) - .flatMap(fetchBatchInfo).toDS() +// val csvData = sqlContext +// .sparkContext +// .parallelize(completedBatchInfoIds) +// .flatMap(fetchBatchInfo).toDS() sqlContext .sparkSession diff --git a/src/main/scala/com/springml/spark/salesforce/DefaultSource.scala b/src/main/scala/com/springml/spark/salesforce/DefaultSource.scala index f019814..a8905b5 100644 --- a/src/main/scala/com/springml/spark/salesforce/DefaultSource.scala +++ 
b/src/main/scala/com/springml/spark/salesforce/DefaultSource.scala @@ -189,6 +189,7 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr if (sfObject.isEmpty) { throw new Exception("sfObject must not be empty when performing bulk query") } + logger.info("createBulkRelation :: sfObject: " + sfObject) val maxCharsPerColumnStr = parameters.getOrElse("maxCharsPerColumn", "4096") val maxCharsPerColumn = try { @@ -196,6 +197,7 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr } catch { case e: Exception => throw new Exception("maxCharsPerColumn must be a valid integer") } + logger.info("createBulkRelation :: maxCharsPerColumn: " + maxCharsPerColumn) val timeoutStr = parameters.getOrElse("timeout", "600000") val timeout = try { @@ -203,13 +205,16 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr } catch { case e: Exception => throw new Exception("timeout must be a valid integer") } + logger.info("createBulkRelation :: timeout: " + timeout) var customHeaders = ListBuffer[Header]() val pkChunkingStr = parameters.getOrElse("pkChunking", "false") val pkChunking = flag(pkChunkingStr, "pkChunkingStr") + logger.info("createBulkRelation :: pkChunking: " + pkChunking) if (pkChunking) { val chunkSize = parameters.get("chunkSize") + logger.info("createBulkRelation :: chunkSize: " + chunkSize) if (!chunkSize.isEmpty) { try { @@ -219,6 +224,7 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr case e: Exception => throw new Exception("chunkSize must be a valid integer") } customHeaders += new BasicHeader("Sforce-Enable-PKChunking", s"chunkSize=${chunkSize.get}") +// customHeaders += new BasicHeader("Sforce-Enable-PKChunking", s"chunkSize=${chunkSize.get}; parent=Account") } else { customHeaders += new BasicHeader("Sforce-Enable-PKChunking", "true") }
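
---

The functional heart of this patch is in `BulkRelation.records`: when exactly one batch completes, result downloads are distributed across executors per result file (one `(resultId, batchInfoId)` pair per task) instead of per batch; otherwise the previous per-batch parallelization is kept. Below is a minimal, self-contained sketch of that pattern. It is not the patch code itself: the `flatMap` body stands in for `bulkAPI.getBatchResult(jobId, batchInfoId, resultId)` followed by `splitCsvByRows`, and the pair list stands in for the IDs returned by `bulkAPI.getBatchResultIds`.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

object ParallelResultFetchSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    import spark.implicits._

    // Stand-in for the (resultId, batchInfoId) pairs built from getBatchResultIds
    // when a single batch completed (e.g. several result files for one batch).
    val pairs: List[(String, String)] = List(("result-1", "batch-1"), ("result-2", "batch-1"))

    val csvData: Dataset[String] = spark.sparkContext
      .parallelize(pairs)
      .flatMap { case (resultId, batchInfoId) =>
        // Hypothetical stand-in for downloading one result file and splitting it
        // into CSV rows; the real relation calls the Bulk API here.
        Seq(s"$batchInfoId,$resultId")
      }
      .toDS()

    csvData.show()
    spark.stop()
  }
}
```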
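For reference, here is a hedged sketch of how a caller might exercise the bulk-query path whose options `DefaultSource.createBulkRelation` parses above. The `sfObject`, `maxCharsPerColumn`, `timeout`, `pkChunking`, and `chunkSize` options come straight from this diff; the format name is inferred from the `DefaultSource` package, and `username`, `password`, `soql`, and `bulk` are the connector's usual options rather than part of this change, so treat them as assumptions.

```scala
import org.apache.spark.sql.SparkSession

object BulkQueryUsageSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("sf-bulk-query-sketch").getOrCreate()

    val accounts = spark.read
      .format("com.springml.spark.salesforce")          // package containing DefaultSource
      .option("username", sys.env("SF_USERNAME"))       // assumed option name
      .option("password", sys.env("SF_PASSWORD"))       // assumed option name
      .option("bulk", "true")                           // assumed option name
      .option("soql", "SELECT Id, Name FROM Account")   // assumed option name
      .option("sfObject", "Account")                    // required for bulk queries (see createBulkRelation)
      .option("pkChunking", "true")                     // adds the Sforce-Enable-PKChunking header
      .option("chunkSize", "250000")                    // sent as chunkSize=<n> in that header
      .option("maxCharsPerColumn", "4096")              // default used by createBulkRelation
      .option("timeout", "600000")                      // default used by createBulkRelation
      .load()

    accounts.printSchema()
    spark.stop()
  }
}
```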