
Commit

Clean-up
nwrs committed Oct 19, 2018
1 parent ae6bec1 commit 61aac97
Showing 3 changed files with 98 additions and 27 deletions.
62 changes: 53 additions & 9 deletions pom.xml
@@ -4,15 +4,7 @@
<artifactId>hdfs-parquet-importer</artifactId>
<version>1.0-SNAPSHOT</version>
<name>${project.artifactId}</name>
<description>My wonderfull scala app</description>
<inceptionYear>2018</inceptionYear>
<licenses>
<license>
<name>My License</name>
<url>http://....</url>
<distribution>repo</distribution>
</license>
</licenses>

<properties>
<maven.compiler.source>1.8</maven.compiler.source>
@@ -49,7 +41,6 @@
<!--<scope>provided</scope>-->
</dependency>


<!-- Test -->
<dependency>
<groupId>junit</groupId>
@@ -130,6 +121,59 @@
</execution>
</executions>
</plugin>


<!--
TODO
If the following error is seen when running from a shaded jar:
Exception in thread "main" java.lang.SecurityException: Invalid signature file digest for Manifest main attributes
Then run the following to remove the files:
zip -d hdfs-parquet-importer-1.0-SNAPSHOT-packaged.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF
-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>1.5</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadedArtifactAttached>true</shadedArtifactAttached>
<shadedClassifierName>packaged</shadedClassifierName>
<artifactSet>
<includes>
<include>*:*</include>
</includes>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</artifactSet>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<!-- required to merge akka config files -->
<resource>reference.conf</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<manifestEntries>
<Main-Class>com.github.nwrs.parquet.importer.App</Main-Class>
</manifestEntries>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>

</plugins>
</build>
</project>
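
A note on the shade-plugin TODO above: the excludes inside <artifactSet> expect groupId:artifactId patterns, so the META-INF signature entries listed there are probably not being stripped, which would explain the SecurityException. A hedged, untested sketch of the usual alternative is a <filters> block inside the same <configuration> (supported by later versions of maven-shade-plugin), which would remove the need for the zip -d step after packaging:

<filters>
  <filter>
    <!-- apply to every artifact being shaded -->
    <artifact>*:*</artifact>
    <excludes>
      <exclude>META-INF/*.SF</exclude>
      <exclude>META-INF/*.DSA</exclude>
      <exclude>META-INF/*.RSA</exclude>
    </excludes>
  </filter>
</filters>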
25 changes: 15 additions & 10 deletions src/main/scala/com/github/nwrs/parquet/importer/App.scala
@@ -22,15 +22,15 @@ object App {
val schemaFile = opt[String]("schemaFile", descr = "Schema file", short = 'f', argName="/path/to/file.schema")
val dateEnrich = opt[String]("dateEnrich", descr = "Enrich string formatted date/time col to a date/year/month columns to allow smarter partitioning", short = 'e', argName="date_time src col")
val partitionCols = opt[String]("partitionCols", descr = "Partition columns", short = 'p', argName="column,column,...")
val sortCol = opt[String]("sortCol", descr = "Sort column", short = 'o', argName="column")
val sortCols = opt[String]("sortCols", descr = "Sort columns", short = 'o', argName="column")
val slashEscapes = opt[Boolean]("slashEscapes", descr = """Use '\"' as an escape character instead of '""' to denote quotes within a quote""", short = 'q')
val delimeter = opt[String]("delimeter", descr = "CSV delimiter character, default is ','", short = 'l', default = Some(","))
val sparkThreads = opt[String]("sparkThreads", descr = "Number of Spark threads, default is # processors", short = 't', default = Some("*"))
val twitterCleanse = opt[Boolean]("twitterCleanse", descr = "Remove corrupted rows in Twitter sourced CSV files", short = 'w')
verify()
}

// TODO Should more Spark settings be configurable here?
implicit val sc = SparkSession.builder.master(s"local[${opts.sparkThreads}]").getOrCreate()
implicit val sc = SparkSession.builder.master(s"local[${opts.sparkThreads()}]").getOrCreate()

// create csv reader
val reader = sc.read
@@ -43,19 +43,24 @@
// populate schema from config file if configured
if (opts.schemaFile.isDefined) reader.schema(createSchema(opts.schemaFile()))

//read file
// TODO remove suspect rows workaround !
val df = filterOutSuspectRows(reader.csv(opts.srcFile()))
// read file
val df = reader.csv(opts.srcFile())

// Cleanse any corrupted rows that can occur in some Twitter sourced datasets
val cleansed = if (opts.twitterCleanse.isDefined)
filterOutSuspectTwitterRows(df)
else
df

// enrich with expanded date fields if required
val enriched = if(opts.dateEnrich.isDefined)
dateEnrichFromDateTimeStr(opts.dateEnrich(), df)
dateEnrichFromDateTimeStr(opts.dateEnrich(), cleansed)
else
df
cleansed

// sort as required
val sorted = if (opts.sortCol.isDefined) {
enriched.sort(opts.sortCol().split(",").map(_.trim).map(enriched(_)) :_*)
val sorted = if (opts.sortCols.isDefined) {
enriched.sort(opts.sortCols().split(",").map(_.trim).map(enriched(_)) :_*)
} else
enriched

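
For context, a hedged sketch of how these options might be combined when running the shaded jar (paths and column names are hypothetical; the destination/output options are defined outside the hunks shown here, so they are omitted):

java -jar hdfs-parquet-importer-1.0-SNAPSHOT-packaged.jar \
  --srcFile /data/tweets.csv \
  --schemaFile /data/tweets.schema \
  --twitterCleanse \
  --dateEnrich tweet_time \
  --partitionCols year,month \
  --sortCols tweet_time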
38 changes: 30 additions & 8 deletions src/main/scala/com/github/nwrs/parquet/importer/package.scala
@@ -8,23 +8,38 @@ import scala.io.Source

package object importer {

/**
* Populate a Spark schema using a config file with one columnName=Type entry per line.
* The order of entries in the config file must strictly match the CSV column order.
* @param file Path to schema file
* @return StructType schema
*/
def createSchema(file:String):StructType = {
val fields = Source.fromFile(file)
.getLines()
.filter(!_.startsWith("#"))
.map(_.split("="))
.map( s => (s(0).trim,s(1).trim))
.map( e => e._2 match {
case "String" => new ColumnName(s"${e._1}").string
case "Long" => new ColumnName(s"${e._1}").long
case "Double" => new ColumnName(s"${e._1}").double
case "Boolean" => new ColumnName(s"${e._1}").boolean
// TODO more types as required...
case "String" => new ColumnName(e._1).string
case "Long" => new ColumnName(e._1).long
case "Int" => new ColumnName(e._1).int
case "Double" => new ColumnName(e._1).double
case "Float" => new ColumnName(e._1).float
case "Boolean" => new ColumnName(e._1).boolean
// Allow match error for unsupported types
// TODO more types as required ?
})
StructType(fields.toSeq)
}
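
For illustration, a minimal sketch of a schema file and its use (the path is hypothetical, and any column names beyond tweetid/tweet_time are made up; entry order must match the CSV columns):

# tweets.schema
tweetid=Long
tweet_time=String
follower_count=Int

// hypothetical usage, sc being the SparkSession built in App
val schema = createSchema("/path/to/tweets.schema")
val df = sc.read.schema(schema).csv("/path/to/tweets.csv")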

// TODO scala doc and comment me please and hide in a utility class :-)
/**
* Enriches a dataframe with date, year and month columns to allow for smarter Parquet partitioning
* @param dateTimeCol Name of the source column containing date/time strings of format "yyyy-mm-dd hh:mm"
* @param df Dataframe
* @param sc Implicit spark session
* @return enriched dataframe
*/
def dateEnrichFromDateTimeStr(dateTimeCol:String, df:DataFrame)(implicit sc:SparkSession):DataFrame= {
val dateTimeRegEx = """([0-9]{4})-([0-9]{2})-([0-9]{2}) ([0-9]{2}):([0-9]{2})""".r
sc.sqlContext.udf.register("extract_date", (dateTime: String) => if (dateTime !=null) dateTime.split(" ")(0) else "")
@@ -51,8 +66,15 @@ package object importer {
.withColumn("month", callUDF("extract_month", col(dateTimeCol)))
}

// filter out suspect rows - weird hack to get around rare problem in twitter datasets, only used manually
def filterOutSuspectRows(df: DataFrame)(implicit sc:SparkSession):DataFrame = {
/**
* Filter out suspect rows, a workaround for occasional (~1 in 100,000) corrupt rows in Twitter-sourced datasets that can break the Parquet export.
* Issues may be related to double-quote escaping at the start of non-Latin character-set tweets?
* N.B. Only usable for datasets using the specific Twitter schema!
* @param df Dataframe to cleanse
* @param sc Implicit Spark session
* @return Cleansed dataframe
*/
def filterOutSuspectTwitterRows(df: DataFrame)(implicit sc:SparkSession):DataFrame = {
val pattern = """([0-9]{4})-([0-9]{2})-([0-9]{2}) ([0-9]{2}):([0-9]{2})""".r
import sc.implicits._
val badTweetIds = df.select("tweetid", "tweet_time").map { r =>
