diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..874817c --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +target/ +.DS_Store +enwiki.xml \ No newline at end of file diff --git a/consumer_pulls/src/main/scala/com/paulbutcher/Accumulator.scala b/consumer_pulls/src/main/scala/com/paulbutcher/Accumulator.scala new file mode 100644 index 0000000..b90e231 --- /dev/null +++ b/consumer_pulls/src/main/scala/com/paulbutcher/Accumulator.scala @@ -0,0 +1,23 @@ +package com.paulbutcher + +import akka.actor._ +import collection.Map +import collection.mutable.HashMap + +case class Counts(counts: Map[String, Int]) + +class Accumulator extends Actor { + + val counts = new HashMap[String, Int].withDefaultValue(0) + + def receive = { + case Counts(partialCounts) => + for ((word, count) <- partialCounts) + counts(word) += count + } + + override def postStop() { + // for ((k, v) <- counts) + // println(s"$k=$v") + } +} \ No newline at end of file diff --git a/consumer_pulls/src/main/scala/com/paulbutcher/Counter.scala b/consumer_pulls/src/main/scala/com/paulbutcher/Counter.scala new file mode 100644 index 0000000..21d17e7 --- /dev/null +++ b/consumer_pulls/src/main/scala/com/paulbutcher/Counter.scala @@ -0,0 +1,24 @@ +package com.paulbutcher + +import akka.actor._ +import collection.mutable.HashMap + +class Counter(parser: ActorRef, accumulator: ActorRef) extends Actor { + + val counts = new HashMap[String, Int].withDefaultValue(0) + + override def preStart() { + parser ! RequestWork + } + + def receive = { + case Page(title, text) => + for (word <- Words(text)) + counts(word) += 1 + parser ! RequestWork + + case NoMoreWork => + accumulator ! Counts(counts) + context.stop(self) + } +} \ No newline at end of file diff --git a/consumer_pulls/src/main/scala/com/paulbutcher/Master.scala b/consumer_pulls/src/main/scala/com/paulbutcher/Master.scala new file mode 100644 index 0000000..141b1a9 --- /dev/null +++ b/consumer_pulls/src/main/scala/com/paulbutcher/Master.scala @@ -0,0 +1,25 @@ +package com.paulbutcher + +import akka.actor._ +import akka.routing.RoundRobinRouter + +class Master extends Actor { + + var counters = Set[ActorRef]() + + val accumulator = context.actorOf(Props[Accumulator], "accumulator") + val parser = context.actorOf(Props[Parser]) + + for (i <- 0 to 8) + counters += context.actorOf(Props(new Counter(parser, accumulator))) + counters foreach context.watch _ + context.watch(accumulator) + + def receive = { + case Terminated(`accumulator`) => context.system.shutdown + case Terminated(counter) => + counters -= counter + if (counters.isEmpty) + accumulator ! PoisonPill + } +} \ No newline at end of file diff --git a/consumer_pulls/src/main/scala/com/paulbutcher/Parser.scala b/consumer_pulls/src/main/scala/com/paulbutcher/Parser.scala new file mode 100644 index 0000000..e960be8 --- /dev/null +++ b/consumer_pulls/src/main/scala/com/paulbutcher/Parser.scala @@ -0,0 +1,19 @@ +package com.paulbutcher + +import akka.actor._ + +case class RequestWork() +case class NoMoreWork() + +class Parser extends Actor { + + val pages = Pages(100000, "enwiki.xml") + + def receive = { + case RequestWork => + if (pages.hasNext) + sender ! pages.next + else + sender ! NoMoreWork + } +} \ No newline at end of file diff --git a/consumer_pulls/src/main/scala/com/paulbutcher/WordCount.scala b/consumer_pulls/src/main/scala/com/paulbutcher/WordCount.scala new file mode 100644 index 0000000..12872b8 --- /dev/null +++ b/consumer_pulls/src/main/scala/com/paulbutcher/WordCount.scala @@ -0,0 +1,10 @@ +package com.paulbutcher + +import akka.actor._ + +object WordCount extends App { + + val system = ActorSystem("WordCount") + + system.actorOf(Props[Master]) +} \ No newline at end of file diff --git a/project/Build.scala b/project/Build.scala new file mode 100644 index 0000000..6a279f6 --- /dev/null +++ b/project/Build.scala @@ -0,0 +1,22 @@ +import sbt._ +import Keys._ + +object WordCountBuild extends Build { + val buildSettings = Defaults.defaultSettings ++ Seq( + organization := "com.paulbutcher", + version := "0.1", + scalaVersion := "2.10.0", + scalacOptions ++= Seq("-deprecation", "-unchecked", "-feature")) + + lazy val utils = Project( + "utils", + file("utils"), + settings = buildSettings ++ Seq( + name := "Utils", + libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.1.0")) + + lazy val consumer_pulls = Project( + "consumer_pulls", + file("consumer_pulls"), + settings = buildSettings ++ Seq(name := "Consumer Pulls")) dependsOn(utils) +} \ No newline at end of file diff --git a/utils/src/main/scala/com/paulbutcher/Pages.scala b/utils/src/main/scala/com/paulbutcher/Pages.scala new file mode 100644 index 0000000..dcf8d96 --- /dev/null +++ b/utils/src/main/scala/com/paulbutcher/Pages.scala @@ -0,0 +1,43 @@ +package com.paulbutcher + +import javax.xml.stream.XMLInputFactory +import java.io.FileInputStream + +case class Page(title: String, text: String) + +class Pages(maxPages: Int, fileName: String) extends Iterator[Page] { + var remainingPages = maxPages + val reader = XMLInputFactory.newInstance.createXMLEventReader(new FileInputStream(fileName)) + + def hasNext = remainingPages > 0 + + def next(): Page = { + while (true) { + val event = reader.nextEvent + if (event.isStartElement && + event.asStartElement.getName.getLocalPart == "page") { + var title: String = null + var text: String = null + while (true) { + val event = reader.nextEvent + if (event.isStartElement) { + event.asStartElement.getName.getLocalPart match { + case "title" => title = reader.getElementText + case "text" => text = reader.getElementText + case _ => + } + } else if (event.isEndElement && + event.asEndElement.getName.getLocalPart == "page") { + remainingPages -= 1 + return Page(title, text) + } + } + } + } + throw new NoSuchElementException + } +} + +object Pages { + def apply(maxPages: Int, fileName: String) = new Pages(maxPages, fileName) +} \ No newline at end of file diff --git a/utils/src/main/scala/com/paulbutcher/Words.scala b/utils/src/main/scala/com/paulbutcher/Words.scala new file mode 100644 index 0000000..667d525 --- /dev/null +++ b/utils/src/main/scala/com/paulbutcher/Words.scala @@ -0,0 +1,23 @@ +package com.paulbutcher + +import java.text.BreakIterator + +class Words(text: String) extends Iterator[String] { + val wordBoundary = BreakIterator.getWordInstance + wordBoundary.setText(text) + var start = wordBoundary.first + var end = wordBoundary.next + + def hasNext = end != BreakIterator.DONE + + def next() = { + val s = text.subSequence(start, end) + start = end + end = wordBoundary.next + s.toString + } +} + +object Words { + def apply(text:String) = new Words(text) +} \ No newline at end of file