Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
paulbutcher committed Jan 21, 2013
0 parents commit cfcdb94
Show file tree
Hide file tree
Showing 9 changed files with 192 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
target/
.DS_Store
enwiki.xml
23 changes: 23 additions & 0 deletions consumer_pulls/src/main/scala/com/paulbutcher/Accumulator.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.paulbutcher

import akka.actor._
import collection.Map
import collection.mutable.HashMap

case class Counts(counts: Map[String, Int])

class Accumulator extends Actor {

val counts = new HashMap[String, Int].withDefaultValue(0)

def receive = {
case Counts(partialCounts) =>
for ((word, count) <- partialCounts)
counts(word) += count
}

override def postStop() {
// for ((k, v) <- counts)
// println(s"$k=$v")
}
}
24 changes: 24 additions & 0 deletions consumer_pulls/src/main/scala/com/paulbutcher/Counter.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.paulbutcher

import akka.actor._
import collection.mutable.HashMap

class Counter(parser: ActorRef, accumulator: ActorRef) extends Actor {

val counts = new HashMap[String, Int].withDefaultValue(0)

override def preStart() {
parser ! RequestWork
}

def receive = {
case Page(title, text) =>
for (word <- Words(text))
counts(word) += 1
parser ! RequestWork

case NoMoreWork =>
accumulator ! Counts(counts)
context.stop(self)
}
}
25 changes: 25 additions & 0 deletions consumer_pulls/src/main/scala/com/paulbutcher/Master.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.paulbutcher

import akka.actor._
import akka.routing.RoundRobinRouter

class Master extends Actor {

var counters = Set[ActorRef]()

val accumulator = context.actorOf(Props[Accumulator], "accumulator")
val parser = context.actorOf(Props[Parser])

for (i <- 0 to 8)
counters += context.actorOf(Props(new Counter(parser, accumulator)))
counters foreach context.watch _
context.watch(accumulator)

def receive = {
case Terminated(`accumulator`) => context.system.shutdown
case Terminated(counter) =>
counters -= counter
if (counters.isEmpty)
accumulator ! PoisonPill
}
}
19 changes: 19 additions & 0 deletions consumer_pulls/src/main/scala/com/paulbutcher/Parser.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.paulbutcher

import akka.actor._

case class RequestWork()
case class NoMoreWork()

class Parser extends Actor {

val pages = Pages(100000, "enwiki.xml")

def receive = {
case RequestWork =>
if (pages.hasNext)
sender ! pages.next
else
sender ! NoMoreWork
}
}
10 changes: 10 additions & 0 deletions consumer_pulls/src/main/scala/com/paulbutcher/WordCount.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.paulbutcher

import akka.actor._

object WordCount extends App {

val system = ActorSystem("WordCount")

system.actorOf(Props[Master])
}
22 changes: 22 additions & 0 deletions project/Build.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import sbt._
import Keys._

object WordCountBuild extends Build {
val buildSettings = Defaults.defaultSettings ++ Seq(
organization := "com.paulbutcher",
version := "0.1",
scalaVersion := "2.10.0",
scalacOptions ++= Seq("-deprecation", "-unchecked", "-feature"))

lazy val utils = Project(
"utils",
file("utils"),
settings = buildSettings ++ Seq(
name := "Utils",
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.1.0"))

lazy val consumer_pulls = Project(
"consumer_pulls",
file("consumer_pulls"),
settings = buildSettings ++ Seq(name := "Consumer Pulls")) dependsOn(utils)
}
43 changes: 43 additions & 0 deletions utils/src/main/scala/com/paulbutcher/Pages.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.paulbutcher

import javax.xml.stream.XMLInputFactory
import java.io.FileInputStream

case class Page(title: String, text: String)

class Pages(maxPages: Int, fileName: String) extends Iterator[Page] {
var remainingPages = maxPages
val reader = XMLInputFactory.newInstance.createXMLEventReader(new FileInputStream(fileName))

def hasNext = remainingPages > 0

def next(): Page = {
while (true) {
val event = reader.nextEvent
if (event.isStartElement &&
event.asStartElement.getName.getLocalPart == "page") {
var title: String = null
var text: String = null
while (true) {
val event = reader.nextEvent
if (event.isStartElement) {
event.asStartElement.getName.getLocalPart match {
case "title" => title = reader.getElementText
case "text" => text = reader.getElementText
case _ =>
}
} else if (event.isEndElement &&
event.asEndElement.getName.getLocalPart == "page") {
remainingPages -= 1
return Page(title, text)
}
}
}
}
throw new NoSuchElementException
}
}

object Pages {
def apply(maxPages: Int, fileName: String) = new Pages(maxPages, fileName)
}
23 changes: 23 additions & 0 deletions utils/src/main/scala/com/paulbutcher/Words.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.paulbutcher

import java.text.BreakIterator

class Words(text: String) extends Iterator[String] {
val wordBoundary = BreakIterator.getWordInstance
wordBoundary.setText(text)
var start = wordBoundary.first
var end = wordBoundary.next

def hasNext = end != BreakIterator.DONE

def next() = {
val s = text.subSequence(start, end)
start = end
end = wordBoundary.next
s.toString
}
}

object Words {
def apply(text:String) = new Words(text)
}

0 comments on commit cfcdb94

Please sign in to comment.