Skip to content

Commit

Permalink
Allow number of consumers and batch size to be configured on command …
Browse files Browse the repository at this point in the history
…line
  • Loading branch information
paulbutcher committed Jan 24, 2013
1 parent cc9b2f5 commit 86d302e
Show file tree
Hide file tree
Showing 11 changed files with 28 additions and 9 deletions.
2 changes: 1 addition & 1 deletion consumer_pulls/src/main/scala/com/paulbutcher/Master.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ class Master extends Actor {
var consumers = Set[ActorRef]()
val producer = context.actorOf(Props[Producer], "producer")

for (i <- 0 until 9)
for (i <- 0 until WordCount.numConsumers)
consumers += context.actorOf(Props(new Consumer(producer)), s"consumer$i")
consumers foreach context.watch _

Expand Down
4 changes: 4 additions & 0 deletions consumer_pulls/src/main/scala/com/paulbutcher/WordCount.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ import akka.actor._

object WordCount extends App {

val numConsumers = args(0).toInt

val system = ActorSystem("WordCount")
val startTime = System.currentTimeMillis
system.registerOnTermination(println(s"Elapsed time: ${System.currentTimeMillis - startTime}"))

system.actorOf(Props[Master], "master")
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,17 @@ case class Work(pages: Array[Page])

class Consumer(producer: ActorRef) extends Actor {

val BATCH_SIZE = 10

val counts = new HashMap[String, Int].withDefaultValue(0)

override def preStart() {
producer ! RequestWork(BATCH_SIZE)
producer ! RequestWork(WordCount.batchSize)
}

def receive = {
case Work(pages) =>
for (Page(title, text) <- pages)
for (word <- Words(text))
counts(word) += 1
producer ! RequestWork(BATCH_SIZE)
producer ! RequestWork(WordCount.batchSize)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ class Master extends Actor {
var consumers = Set[ActorRef]()
val producer = context.actorOf(Props[Producer], "producer")

for (i <- 0 until 9)
for (i <- 0 until WordCount.numConsumers)
consumers += context.actorOf(Props(new Consumer(producer)), s"consumer$i")
consumers foreach context.watch _

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@ import akka.actor._

object WordCount extends App {

val numConsumers = args(0).toInt
val batchSize = args(1).toInt

val system = ActorSystem("WordCount")
val startTime = System.currentTimeMillis
system.registerOnTermination(println(s"Elapsed time: ${System.currentTimeMillis - startTime}"))

system.actorOf(Props[Master], "master")
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class Master extends Actor {
context.watch(cache)

var consumers = Set[ActorRef]()
for (i <- 0 until 7)
for (i <- 0 until WordCount.numConsumers)
consumers += context.actorOf(Props(new Consumer(cache)), s"consumer$i")
consumers foreach context.watch _

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ import akka.actor._

object WordCount extends App {

val numConsumers = args(0).toInt

val system = ActorSystem("WordCount")
val startTime = System.currentTimeMillis
system.registerOnTermination(println(s"Elapsed time: ${System.currentTimeMillis - startTime}"))

system.actorOf(Props[Master], "master")
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import akka.routing.RoundRobinRouter
class Master extends Actor {

val consumers = context.actorOf(Props[Consumer].
withRouter(RoundRobinRouter(5)).
withRouter(RoundRobinRouter(WordCount.numConsumers)).
withDispatcher("consumer-dispatcher"), "consumers")
val producer = context.actorOf(Props(new Producer(consumers)), "producer")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ import akka.actor._

object WordCount extends App {

val numConsumers = args(0).toInt

val system = ActorSystem("WordCount")
val startTime = System.currentTimeMillis
system.registerOnTermination(println(s"Elapsed time: ${System.currentTimeMillis - startTime}"))

system.actorOf(Props[Master], "master")
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import akka.routing.RoundRobinRouter
class Master extends Actor {

val consumers = context.actorOf(Props[Consumer].
withRouter(RoundRobinRouter(5)).
withRouter(RoundRobinRouter(WordCount.numConsumers)).
withDispatcher("consumer-dispatcher"), "consumers")
val producer = context.actorOf(Props(new Producer(consumers)), "producer")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ import akka.actor._

object WordCount extends App {

val numConsumers = args(0).toInt

val system = ActorSystem("WordCount")
val startTime = System.currentTimeMillis
system.registerOnTermination(println(s"Elapsed time: ${System.currentTimeMillis - startTime}"))

system.actorOf(Props[Master], "master")
}

0 comments on commit 86d302e

Please sign in to comment.