From 86d302ef1f17da6faddf50f865702d53315a1495 Mon Sep 17 00:00:00 2001 From: Paul Butcher Date: Thu, 24 Jan 2013 12:01:33 +0000 Subject: [PATCH] Allow number of consumers and batch size to be configured on command line --- consumer_pulls/src/main/scala/com/paulbutcher/Master.scala | 2 +- .../src/main/scala/com/paulbutcher/WordCount.scala | 4 ++++ .../src/main/scala/com/paulbutcher/Consumer.scala | 6 ++---- .../src/main/scala/com/paulbutcher/Master.scala | 2 +- .../src/main/scala/com/paulbutcher/WordCount.scala | 5 +++++ .../src/main/scala/com/paulbutcher/Master.scala | 2 +- .../src/main/scala/com/paulbutcher/WordCount.scala | 4 ++++ .../src/main/scala/com/paulbutcher/Master.scala | 2 +- .../src/main/scala/com/paulbutcher/WordCount.scala | 4 ++++ .../src/main/scala/com/paulbutcher/Master.scala | 2 +- .../src/main/scala/com/paulbutcher/WordCount.scala | 4 ++++ 11 files changed, 28 insertions(+), 9 deletions(-) diff --git a/consumer_pulls/src/main/scala/com/paulbutcher/Master.scala b/consumer_pulls/src/main/scala/com/paulbutcher/Master.scala index 1133951..54f8ae3 100644 --- a/consumer_pulls/src/main/scala/com/paulbutcher/Master.scala +++ b/consumer_pulls/src/main/scala/com/paulbutcher/Master.scala @@ -7,7 +7,7 @@ class Master extends Actor { var consumers = Set[ActorRef]() val producer = context.actorOf(Props[Producer], "producer") - for (i <- 0 until 9) + for (i <- 0 until WordCount.numConsumers) consumers += context.actorOf(Props(new Consumer(producer)), s"consumer$i") consumers foreach context.watch _ diff --git a/consumer_pulls/src/main/scala/com/paulbutcher/WordCount.scala b/consumer_pulls/src/main/scala/com/paulbutcher/WordCount.scala index f45acd0..f42a848 100644 --- a/consumer_pulls/src/main/scala/com/paulbutcher/WordCount.scala +++ b/consumer_pulls/src/main/scala/com/paulbutcher/WordCount.scala @@ -4,7 +4,11 @@ import akka.actor._ object WordCount extends App { + val numConsumers = args(0).toInt + val system = ActorSystem("WordCount") + val startTime = System.currentTimeMillis + system.registerOnTermination(println(s"Elapsed time: ${System.currentTimeMillis - startTime}")) system.actorOf(Props[Master], "master") } \ No newline at end of file diff --git a/consumer_pulls_batched/src/main/scala/com/paulbutcher/Consumer.scala b/consumer_pulls_batched/src/main/scala/com/paulbutcher/Consumer.scala index 342745e..64ebf8e 100644 --- a/consumer_pulls_batched/src/main/scala/com/paulbutcher/Consumer.scala +++ b/consumer_pulls_batched/src/main/scala/com/paulbutcher/Consumer.scala @@ -7,12 +7,10 @@ case class Work(pages: Array[Page]) class Consumer(producer: ActorRef) extends Actor { - val BATCH_SIZE = 10 - val counts = new HashMap[String, Int].withDefaultValue(0) override def preStart() { - producer ! RequestWork(BATCH_SIZE) + producer ! RequestWork(WordCount.batchSize) } def receive = { @@ -20,6 +18,6 @@ class Consumer(producer: ActorRef) extends Actor { for (Page(title, text) <- pages) for (word <- Words(text)) counts(word) += 1 - producer ! RequestWork(BATCH_SIZE) + producer ! RequestWork(WordCount.batchSize) } } \ No newline at end of file diff --git a/consumer_pulls_batched/src/main/scala/com/paulbutcher/Master.scala b/consumer_pulls_batched/src/main/scala/com/paulbutcher/Master.scala index 1133951..54f8ae3 100644 --- a/consumer_pulls_batched/src/main/scala/com/paulbutcher/Master.scala +++ b/consumer_pulls_batched/src/main/scala/com/paulbutcher/Master.scala @@ -7,7 +7,7 @@ class Master extends Actor { var consumers = Set[ActorRef]() val producer = context.actorOf(Props[Producer], "producer") - for (i <- 0 until 9) + for (i <- 0 until WordCount.numConsumers) consumers += context.actorOf(Props(new Consumer(producer)), s"consumer$i") consumers foreach context.watch _ diff --git a/consumer_pulls_batched/src/main/scala/com/paulbutcher/WordCount.scala b/consumer_pulls_batched/src/main/scala/com/paulbutcher/WordCount.scala index f45acd0..e77611b 100644 --- a/consumer_pulls_batched/src/main/scala/com/paulbutcher/WordCount.scala +++ b/consumer_pulls_batched/src/main/scala/com/paulbutcher/WordCount.scala @@ -4,7 +4,12 @@ import akka.actor._ object WordCount extends App { + val numConsumers = args(0).toInt + val batchSize = args(1).toInt + val system = ActorSystem("WordCount") + val startTime = System.currentTimeMillis + system.registerOnTermination(println(s"Elapsed time: ${System.currentTimeMillis - startTime}")) system.actorOf(Props[Master], "master") } \ No newline at end of file diff --git a/consumer_pulls_cached/src/main/scala/com/paulbutcher/Master.scala b/consumer_pulls_cached/src/main/scala/com/paulbutcher/Master.scala index 1cc08be..4fbcd33 100644 --- a/consumer_pulls_cached/src/main/scala/com/paulbutcher/Master.scala +++ b/consumer_pulls_cached/src/main/scala/com/paulbutcher/Master.scala @@ -10,7 +10,7 @@ class Master extends Actor { context.watch(cache) var consumers = Set[ActorRef]() - for (i <- 0 until 7) + for (i <- 0 until WordCount.numConsumers) consumers += context.actorOf(Props(new Consumer(cache)), s"consumer$i") consumers foreach context.watch _ diff --git a/consumer_pulls_cached/src/main/scala/com/paulbutcher/WordCount.scala b/consumer_pulls_cached/src/main/scala/com/paulbutcher/WordCount.scala index f45acd0..f42a848 100644 --- a/consumer_pulls_cached/src/main/scala/com/paulbutcher/WordCount.scala +++ b/consumer_pulls_cached/src/main/scala/com/paulbutcher/WordCount.scala @@ -4,7 +4,11 @@ import akka.actor._ object WordCount extends App { + val numConsumers = args(0).toInt + val system = ActorSystem("WordCount") + val startTime = System.currentTimeMillis + system.registerOnTermination(println(s"Elapsed time: ${System.currentTimeMillis - startTime}")) system.actorOf(Props[Master], "master") } \ No newline at end of file diff --git a/producer_pushes_bounded_queue/src/main/scala/com/paulbutcher/Master.scala b/producer_pushes_bounded_queue/src/main/scala/com/paulbutcher/Master.scala index 841fb0e..7af02f1 100644 --- a/producer_pushes_bounded_queue/src/main/scala/com/paulbutcher/Master.scala +++ b/producer_pushes_bounded_queue/src/main/scala/com/paulbutcher/Master.scala @@ -6,7 +6,7 @@ import akka.routing.RoundRobinRouter class Master extends Actor { val consumers = context.actorOf(Props[Consumer]. - withRouter(RoundRobinRouter(5)). + withRouter(RoundRobinRouter(WordCount.numConsumers)). withDispatcher("consumer-dispatcher"), "consumers") val producer = context.actorOf(Props(new Producer(consumers)), "producer") diff --git a/producer_pushes_bounded_queue/src/main/scala/com/paulbutcher/WordCount.scala b/producer_pushes_bounded_queue/src/main/scala/com/paulbutcher/WordCount.scala index f45acd0..f42a848 100644 --- a/producer_pushes_bounded_queue/src/main/scala/com/paulbutcher/WordCount.scala +++ b/producer_pushes_bounded_queue/src/main/scala/com/paulbutcher/WordCount.scala @@ -4,7 +4,11 @@ import akka.actor._ object WordCount extends App { + val numConsumers = args(0).toInt + val system = ActorSystem("WordCount") + val startTime = System.currentTimeMillis + system.registerOnTermination(println(s"Elapsed time: ${System.currentTimeMillis - startTime}")) system.actorOf(Props[Master], "master") } \ No newline at end of file diff --git a/producer_pushes_unbounded_queue/src/main/scala/com/paulbutcher/Master.scala b/producer_pushes_unbounded_queue/src/main/scala/com/paulbutcher/Master.scala index 841fb0e..7af02f1 100644 --- a/producer_pushes_unbounded_queue/src/main/scala/com/paulbutcher/Master.scala +++ b/producer_pushes_unbounded_queue/src/main/scala/com/paulbutcher/Master.scala @@ -6,7 +6,7 @@ import akka.routing.RoundRobinRouter class Master extends Actor { val consumers = context.actorOf(Props[Consumer]. - withRouter(RoundRobinRouter(5)). + withRouter(RoundRobinRouter(WordCount.numConsumers)). withDispatcher("consumer-dispatcher"), "consumers") val producer = context.actorOf(Props(new Producer(consumers)), "producer") diff --git a/producer_pushes_unbounded_queue/src/main/scala/com/paulbutcher/WordCount.scala b/producer_pushes_unbounded_queue/src/main/scala/com/paulbutcher/WordCount.scala index f45acd0..f42a848 100644 --- a/producer_pushes_unbounded_queue/src/main/scala/com/paulbutcher/WordCount.scala +++ b/producer_pushes_unbounded_queue/src/main/scala/com/paulbutcher/WordCount.scala @@ -4,7 +4,11 @@ import akka.actor._ object WordCount extends App { + val numConsumers = args(0).toInt + val system = ActorSystem("WordCount") + val startTime = System.currentTimeMillis + system.registerOnTermination(println(s"Elapsed time: ${System.currentTimeMillis - startTime}")) system.actorOf(Props[Master], "master") } \ No newline at end of file