Skip to content

Commit f625e9d

Browse files
committed
Using dataframe to write JSON
1 parent 3d38428 commit f625e9d

File tree

12 files changed

+342
-43
lines changed

12 files changed

+342
-43
lines changed

Diff for: build.sbt

+3-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@ lazy val core = (project in file(".")).
3030
mainClass in assembly := Some("edu.vanderbilt.accre.stackex.StackExApp"),
3131
libraryDependencies ++= Seq(
3232
"org.apache.spark" %% "spark-core" % "1.6.2" % "provided",
33-
"org.apache.spark" %% "spark-sql" % "1.6.2" % "provided"
33+
"org.apache.spark" %% "spark-sql" % "1.6.2" % "provided",
34+
"net.sourceforge.htmlcleaner" % "htmlcleaner" % "2.18",
35+
"com.databricks" %% "spark-xml" % "0.4.1"
3436
)
3537
).
3638
dependsOn(util)

Diff for: run_spark.sh

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
#!/bin/bash
22

33
if [ $# -ne 0 ]; then
4-
echo $0: "usage: ./run_spark.sh input"
4+
echo $0: "usage: ./run_spark.sh"
55
exit 1
66
fi
77

88
echo $SPARK_HOME
99

10-
input1=src/main/resources/Posts.xml
11-
output=output
10+
input1=src/test/resources/Posts.xml
11+
output=output/$(date "+%Y-%m-%d_%H.%M.%S")
1212

1313
echo Reading input from $input1
1414
echo Writing output to $output
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
package edu.vanderbilt.accre.stackex

import scala.xml.{NodeSeq, XML}
import scala.util.Try

/**
 * A single Stack Exchange post parsed from one XML `<row/>` element of a
 * Posts.xml data dump.
 *
 * Created by arnold-jr on 12/22/16.
 *
 * @param id         post id ("Id" attribute); Int.MinValue when missing or unparsable
 * @param postTypeId post type ("PostTypeId" attribute); Int.MinValue when unparsable
 * @param body       plain text extracted from the HTML "Body" attribute
 * @param score      post score ("Score" attribute); Int.MinValue when unparsable
 * @param tags       tag names extracted from the "Tags" attribute
 */
case class Post(id: Int,
                postTypeId: Int,
                body: String,
                score: Int,
                tags: List[String])

object Post {

  // Column names used when converting an RDD[Post] into a DataFrame
  // (order must match the constructor parameter order above).
  val fieldNames = List("Id", "PostTypeId", "Body", "Score", "Tags")

  /**
   * Parses a possibly malformed XML string and returns an XML element.
   *
   * Malformed input yields NodeSeq.Empty instead of throwing, so callers
   * never see a parse exception.
   *
   * @param line one line of Posts.xml, expected to hold a single <row/>
   * @return parsed XML node, or NodeSeq.Empty on parse failure
   */
  private def loadString(line: String): NodeSeq =
    Try(XML.loadString(line)) getOrElse NodeSeq.Empty

  /**
   * Gets attribute values from an XML NodeSeq.
   *
   * @param elem XML NodeSeq constructed from a "row" tag
   * @param attr attribute identifier (without the leading '@')
   * @return attribute text payload, or "" when the attribute is absent
   */
  def getAttribute(elem: NodeSeq)(attr: String): String = (elem \ ("@" + attr)).text

  /**
   * Builds a Post from one raw XML line.
   *
   * Was `apply(line: String)()`: the trailing empty parameter list served no
   * purpose and made `Post(line)` depend on (deprecated) auto-application;
   * it has been removed, which is backward compatible for `Post(line)` callers.
   *
   * Unparsable numeric attributes map to Int.MinValue (see getInt in the
   * package object), which downstream code uses to filter malformed rows.
   */
  def apply(line: String): Post = {
    val elem = loadString(line)
    val getAttr = getAttribute(elem)(_)

    Post(getInt(getAttr("Id")),
      getInt(getAttr("PostTypeId")),
      getTextFromHtml(getAttr("Body")),
      getInt(getAttr("Score")),
      getTags(getAttr("Tags"))
    )
  }

}

Diff for: src/main/scala-2.10/edu/vanderbilt/accre/stackex/StackExApp.scala

+45-13
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
1-
/** StackExApp.scala
2-
* Created by arnold-jr on 11/8/16.
3-
*/
41
package edu.vanderbilt.accre.stackex
52

63
import edu.vanderbilt.accre.xmltojson.XMLToJSONConverter
4+
import org.apache.spark.sql.types.{StringType, StructField, StructType}
75
import org.apache.spark.{SparkConf, SparkContext}
6+
import org.apache.spark.sql.SQLContext
87

98

9+
/** StackExApp.scala
10+
* Created by arnold-jr on 11/8/16.
11+
*/
12+
1013
object StackExApp {
1114

1215
def parseArgs(args: Array[String]) = {
@@ -25,24 +28,53 @@ object StackExApp {
2528

2629
val conf = new SparkConf()
2730
.setAppName("Stack-Ex Application")
31+
val sc = new SparkContext(conf)
2832

2933

30-
val sc = new SparkContext(conf)
34+
def xmlToCustomJson() = {
35+
// Creates a new RDD with one XML element per line
36+
val postsXML = sc.textFile(postsFile)
37+
38+
// Specifies how to convert the data
39+
val attributeMapper = Map(
40+
"Body" -> getTextFromHtml,
41+
"Tags" -> getTags
42+
)
3143

32-
val postsXML = sc.textFile(postsFile)
44+
val converter = XMLToJSONConverter(attributeMapper)
3345

34-
val fString = (s: String) => s
35-
val converter = XMLToJSONConverter(Map("Body" -> fString))
46+
val postsJSON = postsXML
47+
.map(line => converter.xmlToJson(line))
3648

37-
val postsJSON = postsXML
38-
.map(line => converter.xmlToJson(line))
49+
if (true) {
50+
(postsJSON take 10) foreach println
51+
}
3952

40-
if (true) {
41-
(postsJSON take 10) foreach println
53+
postsJSON.saveAsTextFile(outputFile)
54+
55+
}
56+
57+
58+
def writeXMLToJSON() = {
59+
val sqlContext = new SQLContext(sc)
60+
61+
import sqlContext.implicits._
62+
63+
// Creates a new DataFrame with one XML element per line
64+
val df = sc.textFile(postsFile)
65+
.map(line => Post(line))
66+
.filter(p => p.id != Int.MinValue)
67+
.toDF(Post.fieldNames: _*)
68+
69+
val postsJSON = df.toJSON
70+
71+
postsJSON take 5 foreach println
72+
73+
postsJSON.saveAsTextFile(outputFile)
4274
}
4375

44-
// Writes output to file
45-
postsJSON.saveAsTextFile(outputFile)
76+
writeXMLToJSON()
77+
4678

4779
sc.stop()
4880

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
package edu.vanderbilt.accre

import org.htmlcleaner.{HtmlCleaner, HtmlNode, TagNode, TagNodeVisitor}
import org.apache.commons.lang.StringEscapeUtils.escapeHtml

import scala.util.Try

/**
 * Shared helpers for the stackex application: HTML-to-text extraction,
 * lenient integer parsing, and tag-list extraction.
 *
 * Created by arnold-jr on 12/22/16.
 */
package object stackex {

  // Visitor that prunes every element whose tag is NOT in includedTags.
  // NOTE(review): removeFromTree() mutates the tree while traverse() walks it;
  // this relies on htmlcleaner tolerating removal during traversal — confirm
  // against the htmlcleaner docs before restructuring.
  val tagNodeVisitor = new TagNodeVisitor {

    // Formatting-level tags whose (text) content is kept; everything else
    // (e.g. <a>, <code>) is removed together with its content.
    val includedTags = List("b", "blockquote", "dl", "dt", "em", "h1", "h2",
      "h3", "i", "li", "ol", "p", "strong", "ul")
    override def visit(tagNode: TagNode, htmlNode: HtmlNode): Boolean = {
      htmlNode match {
        case t: TagNode =>
          if (!(includedTags contains t.getName)) {
            t.removeFromTree()
          }
        case _ => // text / comment nodes are left untouched
      }
      // returning true continues the traversal over remaining nodes
      true
    }
  }

  // Lenient integer parse: Int.MinValue is the sentinel for "unparsable",
  // which downstream code (e.g. StackExApp's filter) relies on.
  def getInt(s: String): Int = Try(s.toInt) getOrElse Int.MinValue


  // Extracts visible text from an HTML fragment, dropping tags outside
  // includedTags.
  val getTextFromHtml: String => String = (html: String) => {
    val cleaner = new HtmlCleaner
    // presumably clean() always wraps the fragment so a "body" element exists;
    // NOTE(review): the bare (0) index would throw if it ever did not — confirm.
    val rootNode = cleaner.clean(html).getElementsByName("body",false)(0)

    // Prunes the tree
    rootNode.traverse(tagNodeVisitor)

    rootNode.getText.toString
  }

  // Escapes HTML entities in raw text (commons-lang escapeHtml).
  val getFullText = (text: String) => escapeHtml(text)

  // Splits a Tags payload such as "<machine-learning><deep-learning>" into a
  // list of tag names: the text is HTML-escaped first so each tag appears as
  // &lt;name&gt;, then the lookaround pattern captures the name between them.
  // NOTE(review): if the input is ALREADY escaped ("&lt;...&gt;"), escapeHtml
  // double-escapes the '&' (-> "&amp;lt;") and the pattern matches nothing —
  // verify all callers pass unescaped text (Post.apply does, since XML
  // attribute values are decoded by the parser).
  val getTags: String => List[String] = (text: String) => {
    val tagPattern = "(?<=&lt;)\\S+?(?=&gt;)".r
    (tagPattern findAllIn escapeHtml(text)) toList
  }

}

Diff for: src/main/scala-2.10/scratch00.sc

-5
This file was deleted.

Diff for: src/test/scala-2.10/StringEscapeUtils.sc

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
import org.apache.commons.lang.StringEscapeUtils.{escapeHtml, unescapeHtml}

// Scratch worksheet: explore how commons-lang HTML (un)escaping interacts
// with the tag-extraction regex used by stackex.getTags.

// Raw angle-bracket tags become entity-encoded.
escapeHtml("<neural-networks><definitions>")

// Already-encoded input gets double-escaped ('&' -> "&amp;").
escapeHtml("&lt;neural&gt;")

val sample = "&lt;neural-networks&gt;&lt;definitions&gt;"

val escaped = escapeHtml(sample)
val unescaped = unescapeHtml(sample)

// Lookaround pattern: captures the token between &lt; and &gt;.
val pattern = "(?<=&lt;)\\S+?(?=&gt;)".r
pattern.findAllIn(sample).toList

pattern.findAllIn(escaped).toList

pattern.findAllIn(unescaped).toList

Diff for: src/test/scala-2.10/TestStackExApp.scala

+85
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
import edu.vanderbilt.accre.stackex._
import org.scalatest.WordSpec

/**
 * Unit tests for the stackex package-object helpers (getTags, getTextFromHtml).
 *
 * Created by arnold-jr on 12/21/16.
 */

class TestStackExApp extends WordSpec {

  // Sample Posts.xml row captured from a Stack Exchange dump.
  // NOTE(review): this fixture is not referenced by any test below — either
  // use it (e.g. to test Post.apply) or remove it.
  val line =
    """<row Id="2435" PostTypeId="1" CreationDate="2016-12-06T19:50:13.853"
      |Score="-1" ViewCount="12" Body="&lt;p&gt;If I have a dataset of images,
      |and I extract all cnn feature vectors from them.&#xA;After that I generate
      |the pca model of these features by doing:&lt;/p&gt; &#xA; &#xA; &lt;pre&gt;
      |&lt;code&gt;pca.fit(ALL_features)&#xA; &lt;/code&gt; &lt;/pre&gt; &#xA;
      |&#xA; &lt;p&gt;IF I have a new image and I need to check the similarity
      |between this image and the whole dataset, what I have to do?&lt;/p&gt;
      |&#xA; &#xA; &lt;ol&gt; &#xA; &lt;li&gt;Extract cnn features from this
      |image.&lt;/li&gt; &#xA; &lt;li&gt;How to use the previous pca
      |model?&lt;/li&gt; &#xA; &lt;li&gt;How to check the similarity between
      |the dataset features and the new image features?&lt;/li&gt; &#xA;
      |&lt;/ol&gt; &#xA; &#xA; &lt;p&gt;Is by doing this? or how?&lt;/p&gt;
      |&#xA; &#xA; &lt;pre&gt; &lt;code&gt;self.pca.transform(self.db_feats)&#xA;
      |&lt;/code&gt; &lt;/pre&gt; &#xA;" OwnerUserId="1644"
      |LastActivityDate="2016-12-06T19:50:13.853" Title="PCA pca.fit VS
      |pca.transform" Tags="&lt;machine-learning&gt; &lt;deep-learning&gt;
      |&lt;image-recognition&gt; &lt;conv-neural-network&gt;" AnswerCount="0"
      |CommentCount="0"/>""".stripMargin

  // NOTE(review): the quadruple quotes make this string BEGIN and END with a
  // literal '"' character, and the payload is already entity-encoded.
  // getTags escapes its input again (escapeHtml), turning "&lt;" into
  // "&amp;lt;", so the pattern is unlikely to match — this expectation looks
  // inconsistent with the getTags implementation; confirm which is intended.
  val tagText = """"&lt;machine-learning&gt;&lt;deep-learning&gt;
    |&lt;image-recognition&gt;&lt;conv-neural-network&gt;""""

  "getTags" when {
    "applied to Tag text" should {
      "return a list of tags as String" in {
        assert(getTags(tagText) ==
          List("machine-learning", "deep-learning", "image-recognition",
            "conv-neural-network")
        )
      }
    }
  }

  "getTextFromHtml" when {
    // Excluded tags (<a>) should be dropped together with their content.
    "passed some nested html" should {
      "return the body text in the correct order" in {
        assert(
          getTextFromHtml("<p><em>Emphatic</em><a>excluded </a> parallel</p>") ==
            "Emphatic parallel"
        )
      }
    }
    "passed a valid html snippet" should {
      "return the body text" in {
        assert(getTextFromHtml("<p>some text.</p>") == "some text.")
      }
    }
    // NOTE(review): clause description duplicates the first one above; the
    // full registered test names still differ, but consider renaming for
    // clearer reports (e.g. "passed identically nested html").
    "passed some nested html" should {
      "return the body text" in {
        assert(getTextFromHtml("<p><p>some text.</p></p>") == "some text.")
      }
    }
    "passed some heterogeneous nested html" should {
      "return the body text" in {
        assert(getTextFromHtml("<i><p>some text.</p></i>") == "some text.")
      }
    }
    "passed some heterogeneous 3-level-nested html" should {
      "return the body text" in {
        assert(getTextFromHtml("<p><em><i>some text.</i></em></p>") ==
          "some text.")
      }
    }
    "passed some excluded html" should {
      "return the body text" in {
        assert(
          getTextFromHtml("<a>excluded text</a><p>some text.</p>") ==
            "some text."
        )
      }
    }
  }


}

Diff for: src/test/scala-2.10/databricks.sc

+35
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Scratch worksheet: read an XML file through spark-xml.
// All fields are declared as nullable strings; spark-xml prefixes XML
// attributes with '_' by default, hence the leading underscores.
val customSchema = StructType(Array(
  StructField("_Id", StringType, nullable = true),
  StructField("_PostTypeId", StringType, nullable = true),
  // TODO(review): Stack Exchange dumps spell this attribute "ParentId"
  // ("_ParentID" would never match) — confirm against the dump schema.
  StructField("_ParentID", StringType, nullable = true),
  StructField("_AcceptedAnswerId", StringType, nullable = true),
  StructField("_CreationDate", StringType, nullable = true),
  StructField("_Score", StringType, nullable = true),
  StructField("_ViewCount", StringType, nullable = true),
  StructField("_Body", StringType, nullable = true),
  StructField("_OwnerUserId", StringType, nullable = true),
  StructField("_LastEditorUserId", StringType, nullable = true),
  StructField("_LastEditorDisplayName", StringType, nullable = true),
  StructField("_LastEditDate", StringType, nullable = true),
  StructField("_LastActivityDate", StringType, nullable = true),
  StructField("_CommunityOwnedDate", StringType, nullable = true),
  StructField("_ClosedDate", StringType, nullable = true),
  StructField("_Title", StringType, nullable = true),
  StructField("_Tags", StringType, nullable = true),
  StructField("_AnswerCount", StringType, nullable = true),
  StructField("_CommentCount", StringType, nullable = true),
  StructField("_FavoriteCount", StringType, nullable = true)
))

// FIX: was setMaster("local(1)") — not a valid Spark master URL; Spark
// requires the bracket form "local[N]" and fails at startup otherwise.
val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("foo"))
val sqlContext = new SQLContext(sc)
// NOTE(review): customSchema is defined but never applied; chain
// .schema(customSchema) here if the read should use it (and then rowTag
// should presumably be "row", not "book").
val df = sqlContext.read
  .format("com.databricks.spark.xml")
  .option("rowTag", "book")
  .load("/Users/joshuaarnold/books.xml")

sc.stop()

0 commit comments

Comments
 (0)