feat(fixed mongo loader date issues) (#479)
Co-authored-by: Golan Kiviti <[email protected]>
golankiviti and Golan Kiviti authored Apr 20, 2022
1 parent 6636c9e commit cd0d99f
Showing 1 changed file with 70 additions and 3 deletions.
src/main/scala/com/yotpo/metorikku/input/readers/mongodb/MongoDBInput.scala
@@ -3,12 +3,14 @@ package com.yotpo.metorikku.input.readers.mongodb
 import com.mongodb.spark.MongoSpark
 import com.mongodb.spark.config.ReadConfig
 import com.yotpo.metorikku.input.Reader
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import com.yotpo.metorikku.input.readers.file.SchemaConverter
 import com.yotpo.metorikku.utils.FileUtils
 import com.mongodb.spark.sql._
 import com.yotpo.metorikku.input.readers.mongodb.MongoDBInput.buildDf
-import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}
+import com.yotpo.metorikku.input.readers.mongodb.MongoDBInput.sanitizeRow
+import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType, TimestampType}
 
+import scala.collection.mutable
 
 case class MongoDBInput(name: String,
@@ -39,11 +41,14 @@ case class MongoDBInput(name: String,
       case None => sparkSession.loadFromMongoDB(ReadConfig(mongoDBOptions)).schema
     }
 
-    buildDf(sparkSession, mongoDBOptions, schema)
+    val df = buildDf(sparkSession, mongoDBOptions, schema)
+    sparkSession.createDataFrame(df.rdd.map(row => sanitizeRow(row)), df.schema)
   }
 }
 
 object MongoDBInput {
+  val BSONRegex = """\{ ?\"\$.*\" ?: ?\"?(.*?)\"? ?\}""".r
+
   private def buildDf(sparkSession: SparkSession, options: Map[String, String], schema: StructType): DataFrame = {
     MongoSpark
       .builder()
@@ -82,4 +87,66 @@ object MongoDBInput {
   private def isSingleField(struct : StructType): Boolean = {
     struct.fields.size == 1
   }
+
+  private def sanitizeRow(row : Row): Row = {
+    val sanitizedRow = mutable.ArrayBuffer[Any]()
+
+    for (i <- 0 to row.size - 1) {
+      row.get(i) match {
+        case strValue: String => sanitizedRow += sanitizeObject(strValue)
+        case subRow: Row => {
+          row.schema(i).dataType match {
+            case struct : StructType => {
+              if (isSingleField(struct)) {
+                sanitizedRow += sanitizeObject(subRow.get(0))
+              } else {
+                sanitizedRow += sanitizeRow(subRow)
+              }
+            }
+            case _ => sanitizedRow += sanitizeObject(subRow.get(0))
+          }
+        }
+        case array : Seq[Any] => {
+          if (array.isEmpty) {
+            sanitizedRow += array
+          }
+          else {
+            array(0) match {
+              case _: Row => {
+                val sanitizedSubRowArray = mutable.ArrayBuffer[Any]()
+                array.asInstanceOf[Seq[Row]].foreach(innerRow => {
+                  sanitizedSubRowArray += sanitizeRow(innerRow)
+                })
+                sanitizedRow += sanitizedSubRowArray
+              }
+              case _ => {
+                val sanitizedStringsArray = mutable.ArrayBuffer[Any]()
+                array.foreach(el => {
+                  sanitizedStringsArray += sanitizeObject(el)
+                })
+                sanitizedRow += sanitizedStringsArray
+              }
+            }
+          }
+        }
+        case default =>
+          sanitizedRow += sanitizeObject(default)
+      }
+    }
+    Row.fromSeq(sanitizedRow)
+  }
+
+  private def sanitizeObject(obj : Any): String = {
+    if (obj == null) {
+      // scalastyle:off null
+      return null
+      // scalastyle:on null
+    }
+
+    val str = obj.toString
+    str match {
+      case BSONRegex(innerValue) => innerValue
+      case default => default
+    }
+  }
 }
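
For context, here is a small, self-contained sketch of what the new sanitization pass accomplishes. It is not part of the commit: the sample values, column names, and the RowSanitizeDemo object are illustrative assumptions. It applies the same BSONRegex pattern and the same rebuild-the-DataFrame idiom from the diff above to a toy DataFrame whose date column arrives as a BSON extended-JSON string:

import org.apache.spark.sql.{Row, SparkSession}

object RowSanitizeDemo extends App {
  // Same pattern as the commit: captures the inner value of a
  // single-field BSON extended-JSON wrapper such as {"$date": "..."}.
  val BSONRegex = """\{ ?\"\$.*\" ?: ?\"?(.*?)\"? ?\}""".r

  // Flat stand-in for sanitizeObject: unwrap strings that look like a
  // BSON wrapper, pass everything else through untouched. (The real
  // commit's sanitizeRow also recurses into nested structs and arrays.)
  def sanitize(value: Any): Any = value match {
    case s: String => s match {
      case BSONRegex(inner) => inner
      case other => other
    }
    case other => other
  }

  val spark = SparkSession.builder()
    .master("local[1]")
    .appName("row-sanitize-demo")
    .getOrCreate()

  // Toy input standing in for what the MongoDB connector returns.
  val df = spark.createDataFrame(Seq(
    ("""{ "$date" : "2022-04-20T10:00:00Z" }""", "plain value")
  )).toDF("created_at", "note")

  // The rebuild idiom from the commit: map every Row through the
  // sanitizer and reattach the original schema.
  val cleaned = spark.createDataFrame(
    df.rdd.map(row => Row.fromSeq(row.toSeq.map(sanitize))),
    df.schema)

  cleaned.show(false)
  // created_at now holds the bare string "2022-04-20T10:00:00Z";
  // note is unchanged.

  spark.stop()
}

Run under spark-shell or spark-submit, the date column prints without its { "$date" : ... } wrapper, which appears to be the loader behavior this commit is after.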
