Skip to content

Commit 3ff8b37

Browse files
committed
Addition of syntactic sugar in Scala for MapReduce classes, as well as Scala implementations of all Java MapReduce applications.
In addition to the implementations, local integration tests were written to verify the correctness of the original Java implementations running in local mode, as well as ensuring that the Scala implementations returned the same results. Squashed commit message history: Initial commit of MapReduce syntactic sugar and Bigram implementations Moved MapReduce-specific utilities into io.bespin.scala.mapreduce.util Small cleanup and comments Added more comments, renamed some traits for clarity Updated syntax for readability and conciseness Added compare script, changed WordCount to use new style Added ports of CooccurrenceMatrix stripes/pairs. Still need to add more comments and make things a bit more idiomatic (the window calculations are not very 'scala-y' right now) Added block comments and changed the sliding window code to be a bit more scala-idiomatic. Also added ported version of search code Changed signature of TypedReducer's reduce method to use scala iterables and remove the need for explicit conversion from java Added in implicit conversion coverage for pair datatypes Beginning of unit tests; Beginning work on PageRank scala MR implementation - Basic page rank results match Added in more integration tests; integration tests now pull required source files from the internet before running Major refactoring of many of the traits in MapReduceSugar This should make the code a bit more modular and simple. It also solves some of the problems with the previous implementation having trouble with things such as running a simple partition job with no mapper or reducer being set. Addition of more integration tests and some unit tests for Hadoop<->Scala conversions Removed nullMapper/nullReducer objects in favor of having Optional mappers and reducers Main BFS classes implemented and results match. Removed unused "compareJavaScala.py" file. Created proper maven target for integration tests. Added integration tests for BFS, refactored other integration tests Added integration tests for non-schimmy versions of PageRank. Fixed bug in scala PageRank related to strange iterator behavior. Added test fixture and integration tests for search/boolean retrieval Full application parity between scala and java versions. Fixed bug in RunPageRankSchimmy that caused failures in local mode of IMC due to mapper reuse. Added integration tests for PageRank and PageRankSchimmy.
1 parent d1b7b20 commit 3ff8b37

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+4249
-244
lines changed

pom.xml

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<?xml version="1.0" encoding="UTF-8"?>
22
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3-
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
3+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
44

55
<modelVersion>4.0.0</modelVersion>
66
<groupId>io.bespin</groupId>
@@ -75,6 +75,16 @@
7575
<goal>compile</goal>
7676
</goals>
7777
</execution>
78+
<execution>
79+
<id>scala-integration-test-add</id>
80+
<phase>process-test-resources</phase>
81+
<goals>
82+
<goal>add-source</goal>
83+
</goals>
84+
<configuration>
85+
<testSourceDir>src/it/scala</testSourceDir>
86+
</configuration>
87+
</execution>
7888
<execution>
7989
<id>scala-test-compile</id>
8090
<phase>process-test-resources</phase>
@@ -118,6 +128,46 @@
118128
</execution>
119129
</executions>
120130
</plugin>
131+
<!-- disable surefire tests-->
132+
<plugin>
133+
<groupId>org.apache.maven.plugins</groupId>
134+
<artifactId>maven-surefire-plugin</artifactId>
135+
<version>2.7</version>
136+
<configuration>
137+
<skipTests>true</skipTests>
138+
</configuration>
139+
</plugin>
140+
<!--enable scalatest -->
141+
<plugin>
142+
<groupId>org.scalatest</groupId>
143+
<artifactId>scalatest-maven-plugin</artifactId>
144+
<version>1.0</version>
145+
<configuration>
146+
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
147+
<junitxml>.</junitxml>
148+
</configuration>
149+
<executions>
150+
<execution>
151+
<id>test</id>
152+
<goals>
153+
<goal>test</goal>
154+
</goals>
155+
<configuration>
156+
<suffixes>(?&lt;!IT)</suffixes>
157+
</configuration>
158+
</execution>
159+
<execution>
160+
<id>integration-test</id>
161+
<phase>integration-test</phase>
162+
<goals>
163+
<goal>test</goal>
164+
</goals>
165+
<configuration>
166+
<suffixes>(?&lt;=IT)</suffixes>
167+
</configuration>
168+
</execution>
169+
</executions>
170+
</plugin>
121171
</plugins>
122172
</build>
123173

@@ -179,6 +229,12 @@
179229
<artifactId>spark-core_2.10</artifactId>
180230
<version>${spark.version}</version>
181231
</dependency>
232+
<dependency>
233+
<groupId>org.scalatest</groupId>
234+
<artifactId>scalatest_2.10</artifactId>
235+
<version>2.2.6</version>
236+
<scope>test</scope>
237+
</dependency>
182238
</dependencies>
183239

184240
</project>

src/it/resources/log4j.properties

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# Root logger option
2+
log4j.rootLogger=WARN, stdout
3+
4+
# Redirect log messages to console
5+
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
6+
log4j.appender.stdout.Target=System.out
7+
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
8+
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package io.bespin.scala.mapreduce
2+
3+
import io.bespin.scala.util._
4+
import org.apache.hadoop.util.ToolRunner
5+
import org.scalatest.{FlatSpec, Matchers}
6+
7+
sealed abstract class BfsLocalIT(override val url: String)
8+
extends FlatSpec with Matchers with TestLogging with IterableKVTest[Int, (Int, Int)] with WithExternalFile{
9+
10+
private val tupleRegex = "\\{(\\d+) (\\d+) \\[(.*)\\]\\}".r
11+
override protected def tupleConv(key: String, value: String): (Int, (Int, Int)) = {
12+
val s = value match {
13+
case tupleRegex(selfNode, dist, connected) =>
14+
(selfNode.toInt, dist.toInt)
15+
}
16+
(key.toInt, s)
17+
}
18+
19+
override protected val iterations: Int = 15
20+
override protected def resultDir: String = outputDir + s"/reachable-iter000$iterations"
21+
22+
s"BFS:$suiteName" should "find all reachable nodes after 15 iterations" in programOutput { map =>
23+
map.size shouldBe 6028
24+
}
25+
26+
it should "get correct distance for 367" in programOutput { map =>
27+
map(367)._2 shouldBe 0
28+
}
29+
30+
it should "find correct distance for node 101" in programOutput { map =>
31+
map(101)._2 shouldBe 6
32+
}
33+
34+
it should "find correct distance for node 201" in programOutput { map =>
35+
map(201)._2 shouldBe 8
36+
}
37+
38+
it should "find correct distance for node 6276" in programOutput { map =>
39+
map(6276)._2 shouldBe 13
40+
}
41+
42+
it should "find correct distance for node 6258" in programOutput { map =>
43+
map(6258)._2 shouldBe 11
44+
}
45+
46+
it should "find correct distance for node 909" in programOutput { map =>
47+
map(909)._2 shouldBe 4
48+
}
49+
50+
it should "find correct distance for node 5664" in programOutput { map =>
51+
map(5664)._2 shouldBe 7
52+
}
53+
}
54+
55+
class BfsJavaIT extends BfsLocalIT(TestConstants.Graph_Url) {
56+
override protected def initialJob: Any =
57+
ToolRunner.run(new io.bespin.java.mapreduce.bfs.EncodeBfsGraph, Array(
58+
"-input", filePath,
59+
"-output", outputDir + "/" + "iter0000",
60+
"-src", "367"
61+
))
62+
63+
override protected def iterJob(itr: Int): Any = {
64+
ToolRunner.run(new io.bespin.java.mapreduce.bfs.IterateBfs, Array(
65+
"-input", outputDir + "/" + s"iter000$itr",
66+
"-output", outputDir + "/" + s"iter000${itr + 1}",
67+
"-partitions", "5"
68+
))
69+
ToolRunner.run(new io.bespin.java.mapreduce.bfs.FindReachableNodes, Array(
70+
"-input", outputDir + "/" + s"iter000${itr + 1}",
71+
"-output", outputDir + "/" + s"reachable-iter000${itr + 1}"
72+
))
73+
}
74+
}
75+
76+
class BfsScalaIT extends BfsLocalIT(TestConstants.Graph_Url) {
77+
override protected def initialJob: Any =
78+
ToolRunner.run(io.bespin.scala.mapreduce.bfs.EncodeBfsGraph, Array(
79+
"--input", filePath,
80+
"--output", outputDir + "/" + "iter0000",
81+
"--src", "367"
82+
))
83+
84+
override protected def iterJob(itr: Int): Any = {
85+
ToolRunner.run(io.bespin.scala.mapreduce.bfs.IterateBfs, Array(
86+
"--input", outputDir + "/" + s"iter000$itr",
87+
"--output", outputDir + "/" + s"iter000${itr + 1}",
88+
"--partitions", "5"
89+
))
90+
ToolRunner.run(io.bespin.scala.mapreduce.bfs.FindReachableNodes, Array(
91+
"--input", outputDir + "/" + s"iter000${itr + 1}",
92+
"--output", outputDir + "/" + s"reachable-iter000${itr + 1}"
93+
))
94+
}
95+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package io.bespin.scala.mapreduce
2+
3+
import io.bespin.scala.util._
4+
import org.apache.hadoop.util.ToolRunner
5+
import org.scalatest.{FlatSpec, Matchers}
6+
7+
sealed abstract class BigramCountLocalIT(override val url: String)
8+
extends FlatSpec with Matchers with TestLogging with SingleKVTest[String, Long] with WithExternalFile {
9+
10+
override protected def tupleConv(key: String, value: String): (String, Long) = (key, value.toLong)
11+
12+
s"BigramCount:$suiteName" should "produce expected count for 'a baboon'" in programOutput { map =>
13+
map("a baboon") shouldBe 1
14+
}
15+
16+
it should "produce expected count for 'poor yorick'" in programOutput { map =>
17+
map("poor yorick") shouldBe 1
18+
}
19+
20+
it should "produce expected count for 'dream again'" in programOutput { map =>
21+
map("dream again") shouldBe 2
22+
}
23+
24+
it should "produce expected count for 'dream away'" in programOutput { map =>
25+
map("dream away") shouldBe 2
26+
}
27+
28+
it should "produce expected count for 'dream as'" in programOutput { map =>
29+
map("dream as") shouldBe 1
30+
}
31+
32+
}
33+
34+
class BigramCountScalaIT extends BigramCountLocalIT(TestConstants.Shakespeare_Url) {
35+
override protected def initialJob: Any =
36+
ToolRunner.run(io.bespin.scala.mapreduce.bigram.BigramCount, Array(
37+
"--input", filePath,
38+
"--output", outputDir,
39+
"--reducers", "1"
40+
))
41+
}
42+
43+
class BigramCountJavaIT extends BigramCountLocalIT(TestConstants.Shakespeare_Url) {
44+
override protected def initialJob: Any =
45+
ToolRunner.run(new io.bespin.java.mapreduce.bigram.BigramCount, Array(
46+
"-input", filePath,
47+
"-output", outputDir,
48+
"-reducers", "1"
49+
))
50+
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
package io.bespin.scala.mapreduce
2+
3+
import java.io.{ByteArrayInputStream, PrintStream}
4+
5+
import io.bespin.scala.util.{TestConstants, TestLogging, WithExternalFile, WithTempOutputDir}
6+
import org.apache.hadoop.util.ToolRunner
7+
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
8+
9+
import scala.collection.immutable.TreeMap
10+
11+
abstract sealed class BooleanRetrievalIT(override val url: String)
12+
extends FlatSpec with Matchers with TestLogging
13+
with BeforeAndAfterAll with WithTempOutputDir with WithExternalFile {
14+
15+
protected final case class RetrievalResult(occurrences: TreeMap[Int, String])
16+
17+
protected def runQuery(queryString: String): Unit
18+
19+
private val resultRegex = "(\\d+)\t(.*)".r
20+
21+
private def retrievalOutput(queryString: String)(f: RetrievalResult => Any): Unit = {
22+
val stream = new java.io.ByteArrayOutputStream()
23+
val originalOutStream = System.out
24+
val interceptionStream = new PrintStream(stream)
25+
// Temporarily redirect stdout to a stream we can capture
26+
System.setOut(interceptionStream)
27+
Console.withOut(stream) {
28+
runQuery(queryString)
29+
}
30+
System.setOut(originalOutStream)
31+
val input = new ByteArrayInputStream(stream.toByteArray)
32+
33+
val builder = TreeMap.newBuilder[Int, String]
34+
scala.io.Source.fromInputStream(input).getLines().foreach {
35+
case resultRegex(num, str) => builder += ((num.toInt, str.trim))
36+
case _ =>
37+
}
38+
39+
f(RetrievalResult(builder.result()))
40+
}
41+
42+
43+
s"BooleanRetrieval:$suiteName" should "produce expected count for 'poor'" in retrievalOutput("poor") { results =>
44+
results.occurrences.size shouldBe 623
45+
}
46+
47+
it should "produce expected count for 'white red OR rose AND pluck AND'" in retrievalOutput("white red OR rose AND pluck AND") { results =>
48+
results.occurrences.size shouldBe 5
49+
}
50+
51+
it should "produce expected lines for 'white red OR rose AND pluck AND'" in retrievalOutput("white red OR rose AND pluck AND") { results =>
52+
results.occurrences.values.head shouldBe "From off this brier pluck a white rose with me."
53+
}
54+
55+
it should "produce expected lines for 'poor yorick AND'" in retrievalOutput("poor yorick AND") { results =>
56+
results.occurrences.size shouldBe 1
57+
results.occurrences.values.head should include ("Alas, poor Yorick!")
58+
}
59+
60+
it should "produce nothing for query with no result" in retrievalOutput("romeo hamlet AND") { results =>
61+
results.occurrences shouldBe empty
62+
}
63+
64+
it should "produce expected lines for 'romeo juliet AND'" in retrievalOutput("romeo juliet AND") { results =>
65+
results.occurrences.values.head should startWith ("THE TRAGEDY")
66+
}
67+
68+
}
69+
70+
class BooleanRetrievalScalaIT extends BooleanRetrievalIT(TestConstants.Shakespeare_Url) {
71+
override def beforeAll = {
72+
super.beforeAll
73+
ToolRunner.run(io.bespin.scala.mapreduce.search.BuildInvertedIndex, Array(
74+
"--input", filePath,
75+
"--output", outputDir + "/index"
76+
))
77+
}
78+
79+
override protected def runQuery(queryString: String): Unit =
80+
ToolRunner.run(io.bespin.scala.mapreduce.search.BooleanRetrieval, Array(
81+
"--index", outputDir + "/index",
82+
"--collection", filePath,
83+
"--query", queryString
84+
))
85+
}
86+
87+
class BooleanRetrievalJavaIT extends BooleanRetrievalIT(TestConstants.Shakespeare_Url) {
88+
override def beforeAll = {
89+
super.beforeAll
90+
ToolRunner.run(new io.bespin.java.mapreduce.search.BuildInvertedIndex, Array(
91+
"-input", filePath,
92+
"-output", outputDir + "/index"
93+
))
94+
}
95+
96+
override protected def runQuery(queryString: String): Unit =
97+
ToolRunner.run(new io.bespin.java.mapreduce.search.BooleanRetrieval, Array(
98+
"-index", outputDir + "/index",
99+
"-collection", filePath,
100+
"-query", queryString
101+
))
102+
}

0 commit comments

Comments
 (0)