diff --git a/consumer_pulls/src/main/scala/com/paulbutcher/Consumer.scala b/consumer_pulls/src/main/scala/com/paulbutcher/Consumer.scala deleted file mode 100644 index 0d292cf..0000000 --- a/consumer_pulls/src/main/scala/com/paulbutcher/Consumer.scala +++ /dev/null @@ -1,20 +0,0 @@ -package com.paulbutcher - -import akka.actor._ -import collection.mutable.HashMap - -class Consumer(producer: ActorRef) extends Actor { - - val counts = new HashMap[String, Int].withDefaultValue(0) - - override def preStart() { - producer ! RequestWork - } - - def receive = { - case Page(title, text) => - for (word <- Words(text)) - counts(word) += 1 - producer ! RequestWork - } -} \ No newline at end of file diff --git a/consumer_pulls/src/main/scala/com/paulbutcher/Master.scala b/consumer_pulls/src/main/scala/com/paulbutcher/Master.scala deleted file mode 100644 index 54f8ae3..0000000 --- a/consumer_pulls/src/main/scala/com/paulbutcher/Master.scala +++ /dev/null @@ -1,20 +0,0 @@ -package com.paulbutcher - -import akka.actor._ - -class Master extends Actor { - - var consumers = Set[ActorRef]() - val producer = context.actorOf(Props[Producer], "producer") - - for (i <- 0 until WordCount.numConsumers) - consumers += context.actorOf(Props(new Consumer(producer)), s"consumer$i") - consumers foreach context.watch _ - - def receive = { - case Terminated(consumer) => - consumers -= consumer - if (consumers.isEmpty) - context.system.shutdown - } -} \ No newline at end of file diff --git a/consumer_pulls/src/main/scala/com/paulbutcher/Producer.scala b/consumer_pulls/src/main/scala/com/paulbutcher/Producer.scala deleted file mode 100644 index 19fbfb4..0000000 --- a/consumer_pulls/src/main/scala/com/paulbutcher/Producer.scala +++ /dev/null @@ -1,18 +0,0 @@ -package com.paulbutcher - -import akka.actor._ - -case class RequestWork() - -class Producer extends Actor { - - val pages = Pages(100000, "enwiki.xml") - - def receive = { - case RequestWork => - if (pages.hasNext) - sender ! pages.next - else - sender ! PoisonPill - } -} \ No newline at end of file diff --git a/consumer_pulls/src/main/scala/com/paulbutcher/WordCount.scala b/consumer_pulls/src/main/scala/com/paulbutcher/WordCount.scala deleted file mode 100644 index f42a848..0000000 --- a/consumer_pulls/src/main/scala/com/paulbutcher/WordCount.scala +++ /dev/null @@ -1,14 +0,0 @@ -package com.paulbutcher - -import akka.actor._ - -object WordCount extends App { - - val numConsumers = args(0).toInt - - val system = ActorSystem("WordCount") - val startTime = System.currentTimeMillis - system.registerOnTermination(println(s"Elapsed time: ${System.currentTimeMillis - startTime}")) - - system.actorOf(Props[Master], "master") -} \ No newline at end of file diff --git a/consumer_pulls_batched/src/main/scala/com/paulbutcher/Master.scala b/consumer_pulls_batched/src/main/scala/com/paulbutcher/Master.scala deleted file mode 100644 index 54f8ae3..0000000 --- a/consumer_pulls_batched/src/main/scala/com/paulbutcher/Master.scala +++ /dev/null @@ -1,20 +0,0 @@ -package com.paulbutcher - -import akka.actor._ - -class Master extends Actor { - - var consumers = Set[ActorRef]() - val producer = context.actorOf(Props[Producer], "producer") - - for (i <- 0 until WordCount.numConsumers) - consumers += context.actorOf(Props(new Consumer(producer)), s"consumer$i") - consumers foreach context.watch _ - - def receive = { - case Terminated(consumer) => - consumers -= consumer - if (consumers.isEmpty) - context.system.shutdown - } -} \ No newline at end of file diff --git a/consumer_pulls_batched/src/main/scala/com/paulbutcher/WordCount.scala b/consumer_pulls_batched/src/main/scala/com/paulbutcher/WordCount.scala deleted file mode 100644 index e77611b..0000000 --- a/consumer_pulls_batched/src/main/scala/com/paulbutcher/WordCount.scala +++ /dev/null @@ -1,15 +0,0 @@ -package com.paulbutcher - -import akka.actor._ - -object WordCount extends App { - - val numConsumers = args(0).toInt - val batchSize = args(1).toInt - - val system = ActorSystem("WordCount") - val startTime = System.currentTimeMillis - system.registerOnTermination(println(s"Elapsed time: ${System.currentTimeMillis - startTime}")) - - system.actorOf(Props[Master], "master") -} \ No newline at end of file diff --git a/consumer_pulls_cached/src/main/scala/com/paulbutcher/Cache.scala b/consumer_pulls_cached/src/main/scala/com/paulbutcher/Cache.scala deleted file mode 100644 index 6fb5c9c..0000000 --- a/consumer_pulls_cached/src/main/scala/com/paulbutcher/Cache.scala +++ /dev/null @@ -1,31 +0,0 @@ -package com.paulbutcher - -import akka.actor._ -import collection.mutable.Queue - -class Cache(producer: ActorRef) extends Actor { - - val workQueue = Queue[Page]() - val requestQueue = Queue[ActorRef]() - - for (i <- 0 until 100) - producer ! RequestWork - - def deliverWorkTo(consumer: ActorRef) { - consumer ! workQueue.dequeue - producer ! RequestWork - } - - def receive = { - case page: Page => - workQueue.enqueue(page) - if (requestQueue.nonEmpty) - deliverWorkTo(requestQueue.dequeue) - - case RequestWork => - if (workQueue.nonEmpty) - deliverWorkTo(sender) - else - requestQueue.enqueue(sender) - } -} \ No newline at end of file diff --git a/consumer_pulls_cached/src/main/scala/com/paulbutcher/Consumer.scala b/consumer_pulls_cached/src/main/scala/com/paulbutcher/Consumer.scala deleted file mode 100644 index 0d292cf..0000000 --- a/consumer_pulls_cached/src/main/scala/com/paulbutcher/Consumer.scala +++ /dev/null @@ -1,20 +0,0 @@ -package com.paulbutcher - -import akka.actor._ -import collection.mutable.HashMap - -class Consumer(producer: ActorRef) extends Actor { - - val counts = new HashMap[String, Int].withDefaultValue(0) - - override def preStart() { - producer ! RequestWork - } - - def receive = { - case Page(title, text) => - for (word <- Words(text)) - counts(word) += 1 - producer ! RequestWork - } -} \ No newline at end of file diff --git a/consumer_pulls_cached/src/main/scala/com/paulbutcher/Master.scala b/consumer_pulls_cached/src/main/scala/com/paulbutcher/Master.scala deleted file mode 100644 index 4fbcd33..0000000 --- a/consumer_pulls_cached/src/main/scala/com/paulbutcher/Master.scala +++ /dev/null @@ -1,27 +0,0 @@ -package com.paulbutcher - -import akka.actor._ - -class Master extends Actor { - - val producer = context.actorOf(Props[Producer], "producer") - - val cache = context.actorOf(Props(new Cache(producer))) - context.watch(cache) - - var consumers = Set[ActorRef]() - for (i <- 0 until WordCount.numConsumers) - consumers += context.actorOf(Props(new Consumer(cache)), s"consumer$i") - consumers foreach context.watch _ - - def receive = { - case Terminated(`cache`) => - for (consumer <- consumers) - consumer ! PoisonPill - - case Terminated(consumer) => - consumers -= consumer - if (consumers.isEmpty) - context.system.shutdown - } -} \ No newline at end of file diff --git a/consumer_pulls_cached/src/main/scala/com/paulbutcher/Producer.scala b/consumer_pulls_cached/src/main/scala/com/paulbutcher/Producer.scala deleted file mode 100644 index 19fbfb4..0000000 --- a/consumer_pulls_cached/src/main/scala/com/paulbutcher/Producer.scala +++ /dev/null @@ -1,18 +0,0 @@ -package com.paulbutcher - -import akka.actor._ - -case class RequestWork() - -class Producer extends Actor { - - val pages = Pages(100000, "enwiki.xml") - - def receive = { - case RequestWork => - if (pages.hasNext) - sender ! pages.next - else - sender ! PoisonPill - } -} \ No newline at end of file diff --git a/consumer_pulls_cached/src/main/scala/com/paulbutcher/WordCount.scala b/consumer_pulls_cached/src/main/scala/com/paulbutcher/WordCount.scala deleted file mode 100644 index f42a848..0000000 --- a/consumer_pulls_cached/src/main/scala/com/paulbutcher/WordCount.scala +++ /dev/null @@ -1,14 +0,0 @@ -package com.paulbutcher - -import akka.actor._ - -object WordCount extends App { - - val numConsumers = args(0).toInt - - val system = ActorSystem("WordCount") - val startTime = System.currentTimeMillis - system.registerOnTermination(println(s"Elapsed time: ${System.currentTimeMillis - startTime}")) - - system.actorOf(Props[Master], "master") -} \ No newline at end of file diff --git a/producer_pushes_bounded_queue/src/main/resources/application.conf b/producer_pushes_bounded_queue/src/main/resources/application.conf deleted file mode 100644 index 9e33ce8..0000000 --- a/producer_pushes_bounded_queue/src/main/resources/application.conf +++ /dev/null @@ -1,4 +0,0 @@ -consumer-dispatcher { - type = BalancingDispatcher - mailbox-capacity = 100 -} diff --git a/producer_pushes_bounded_queue/src/main/scala/com/paulbutcher/Master.scala b/producer_pushes_bounded_queue/src/main/scala/com/paulbutcher/Master.scala deleted file mode 100644 index 7af02f1..0000000 --- a/producer_pushes_bounded_queue/src/main/scala/com/paulbutcher/Master.scala +++ /dev/null @@ -1,18 +0,0 @@ -package com.paulbutcher - -import akka.actor._ -import akka.routing.RoundRobinRouter - -class Master extends Actor { - - val consumers = context.actorOf(Props[Consumer]. - withRouter(RoundRobinRouter(WordCount.numConsumers)). - withDispatcher("consumer-dispatcher"), "consumers") - val producer = context.actorOf(Props(new Producer(consumers)), "producer") - - context.watch(consumers) - - def receive = { - case Terminated(`consumers`) => context.system.shutdown - } -} \ No newline at end of file diff --git a/producer_pushes_bounded_queue/src/main/scala/com/paulbutcher/Producer.scala b/producer_pushes_bounded_queue/src/main/scala/com/paulbutcher/Producer.scala deleted file mode 100644 index 8b6bb5d..0000000 --- a/producer_pushes_bounded_queue/src/main/scala/com/paulbutcher/Producer.scala +++ /dev/null @@ -1,18 +0,0 @@ -package com.paulbutcher - -import akka.actor._ - -case class Produce() - -class Producer(consumers: ActorRef) extends Actor { - - val pages = Pages(100000, "enwiki.xml") - self ! Produce - - def receive = { - case Produce => - for (page <- pages) - consumers ! page - consumers ! PoisonPill - } -} \ No newline at end of file diff --git a/producer_pushes_bounded_queue/src/main/scala/com/paulbutcher/WordCount.scala b/producer_pushes_bounded_queue/src/main/scala/com/paulbutcher/WordCount.scala deleted file mode 100644 index f42a848..0000000 --- a/producer_pushes_bounded_queue/src/main/scala/com/paulbutcher/WordCount.scala +++ /dev/null @@ -1,14 +0,0 @@ -package com.paulbutcher - -import akka.actor._ - -object WordCount extends App { - - val numConsumers = args(0).toInt - - val system = ActorSystem("WordCount") - val startTime = System.currentTimeMillis - system.registerOnTermination(println(s"Elapsed time: ${System.currentTimeMillis - startTime}")) - - system.actorOf(Props[Master], "master") -} \ No newline at end of file diff --git a/producer_pushes_unbounded_queue/src/main/resources/application.conf b/producer_pushes_unbounded_queue/src/main/resources/application.conf deleted file mode 100644 index 6b8bfa0..0000000 --- a/producer_pushes_unbounded_queue/src/main/resources/application.conf +++ /dev/null @@ -1,3 +0,0 @@ -consumer-dispatcher { - type = BalancingDispatcher -} \ No newline at end of file diff --git a/producer_pushes_unbounded_queue/src/main/scala/com/paulbutcher/Master.scala b/producer_pushes_unbounded_queue/src/main/scala/com/paulbutcher/Master.scala deleted file mode 100644 index 7af02f1..0000000 --- a/producer_pushes_unbounded_queue/src/main/scala/com/paulbutcher/Master.scala +++ /dev/null @@ -1,18 +0,0 @@ -package com.paulbutcher - -import akka.actor._ -import akka.routing.RoundRobinRouter - -class Master extends Actor { - - val consumers = context.actorOf(Props[Consumer]. - withRouter(RoundRobinRouter(WordCount.numConsumers)). - withDispatcher("consumer-dispatcher"), "consumers") - val producer = context.actorOf(Props(new Producer(consumers)), "producer") - - context.watch(consumers) - - def receive = { - case Terminated(`consumers`) => context.system.shutdown - } -} \ No newline at end of file diff --git a/producer_pushes_unbounded_queue/src/main/scala/com/paulbutcher/WordCount.scala b/producer_pushes_unbounded_queue/src/main/scala/com/paulbutcher/WordCount.scala deleted file mode 100644 index f42a848..0000000 --- a/producer_pushes_unbounded_queue/src/main/scala/com/paulbutcher/WordCount.scala +++ /dev/null @@ -1,14 +0,0 @@ -package com.paulbutcher - -import akka.actor._ - -object WordCount extends App { - - val numConsumers = args(0).toInt - - val system = ActorSystem("WordCount") - val startTime = System.currentTimeMillis - system.registerOnTermination(println(s"Elapsed time: ${System.currentTimeMillis - startTime}")) - - system.actorOf(Props[Master], "master") -} \ No newline at end of file diff --git a/project/Build.scala b/project/Build.scala index ef94286..56aede0 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -11,34 +11,9 @@ object WordCountBuild extends Build { scalacOptions ++= Seq("-deprecation", "-unchecked", "-feature")) lazy val core = Project( - "core", - file("core"), + "WordCount", + file("."), settings = buildSettings ++ Seq( - name := "core", + name := "WordCount", libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.1.0")) - - lazy val producer_pushes_bounded_queue = Project( - "producer_pushes_bounded_queue", - file("producer_pushes_bounded_queue"), - settings = buildSettings ++ Seq(name := "producer_pushes_bounded_queue")) dependsOn(core) - - lazy val producer_pushes_unbounded_queue = Project( - "producer_pushes_unbounded_queue", - file("producer_pushes_unbounded_queue"), - settings = buildSettings ++ Seq(name := "producer_pushes_unbounded_queue")) dependsOn(core) - - lazy val consumer_pulls = Project( - "consumer_pulls", - file("consumer_pulls"), - settings = buildSettings ++ Seq(name := "consumer_pulls")) dependsOn(core) - - lazy val consumer_pulls_batched = Project( - "consumer_pulls_batched", - file("consumer_pulls_batched"), - settings = buildSettings ++ Seq(name := "consumer_pulls_batched")) dependsOn(core) - - lazy val consumer_pulls_cached = Project( - "consumer_pulls_cached", - file("consumer_pulls_cached"), - settings = buildSettings ++ Seq(name := "consumer_pulls_cached")) dependsOn(core) } \ No newline at end of file diff --git a/src/main/scala/com/paulbutcher/Cache.scala b/src/main/scala/com/paulbutcher/Cache.scala new file mode 100644 index 0000000..1f9be0c --- /dev/null +++ b/src/main/scala/com/paulbutcher/Cache.scala @@ -0,0 +1,32 @@ +package com.paulbutcher + +import akka.actor._ +import collection.mutable.Queue + +class Cache(producer: ActorRef, batchSize: Int) extends Actor { + + val workQueue = Queue[Work]() + val workerQueue = Queue[ActorRef]() + + for (i <- 0 to 100) + producer ! RequestWork(batchSize) + + def sendToWorker(worker: ActorRef, work: Work) { + worker ! work + producer ! RequestWork(batchSize) + } + + def receive = { + case RequestWork(_) => + if (workQueue.nonEmpty) + sendToWorker(sender, workQueue.dequeue) + else + workerQueue.enqueue(sender) + + case work: Work => + if (workerQueue.nonEmpty) + sendToWorker(workerQueue.dequeue, work) + else + workQueue.enqueue(work) + } +} \ No newline at end of file diff --git a/producer_pushes_unbounded_queue/src/main/scala/com/paulbutcher/Consumer.scala b/src/main/scala/com/paulbutcher/ConsumerFlowControl.scala similarity index 86% rename from producer_pushes_unbounded_queue/src/main/scala/com/paulbutcher/Consumer.scala rename to src/main/scala/com/paulbutcher/ConsumerFlowControl.scala index 0ad3b60..f80a150 100644 --- a/producer_pushes_unbounded_queue/src/main/scala/com/paulbutcher/Consumer.scala +++ b/src/main/scala/com/paulbutcher/ConsumerFlowControl.scala @@ -3,7 +3,7 @@ package com.paulbutcher import akka.actor._ import collection.mutable.HashMap -class Consumer extends Actor { +class ConsumerFlowControl extends Actor { val counts = new HashMap[String, Int].withDefaultValue(0) diff --git a/consumer_pulls_batched/src/main/scala/com/paulbutcher/Consumer.scala b/src/main/scala/com/paulbutcher/ConsumerPull.scala similarity index 70% rename from consumer_pulls_batched/src/main/scala/com/paulbutcher/Consumer.scala rename to src/main/scala/com/paulbutcher/ConsumerPull.scala index 64ebf8e..f6bf047 100644 --- a/consumer_pulls_batched/src/main/scala/com/paulbutcher/Consumer.scala +++ b/src/main/scala/com/paulbutcher/ConsumerPull.scala @@ -5,12 +5,12 @@ import collection.mutable.HashMap case class Work(pages: Array[Page]) -class Consumer(producer: ActorRef) extends Actor { +class ConsumerPull(producer: ActorRef, batchSize: Int) extends Actor { val counts = new HashMap[String, Int].withDefaultValue(0) override def preStart() { - producer ! RequestWork(WordCount.batchSize) + producer ! RequestWork(batchSize) } def receive = { @@ -18,6 +18,6 @@ class Consumer(producer: ActorRef) extends Actor { for (Page(title, text) <- pages) for (word <- Words(text)) counts(word) += 1 - producer ! RequestWork(WordCount.batchSize) + producer ! RequestWork(batchSize) } } \ No newline at end of file diff --git a/producer_pushes_bounded_queue/src/main/scala/com/paulbutcher/Consumer.scala b/src/main/scala/com/paulbutcher/ConsumerPush.scala similarity index 87% rename from producer_pushes_bounded_queue/src/main/scala/com/paulbutcher/Consumer.scala rename to src/main/scala/com/paulbutcher/ConsumerPush.scala index f20df0e..3e51c83 100644 --- a/producer_pushes_bounded_queue/src/main/scala/com/paulbutcher/Consumer.scala +++ b/src/main/scala/com/paulbutcher/ConsumerPush.scala @@ -3,7 +3,7 @@ package com.paulbutcher import akka.actor._ import collection.mutable.HashMap -class Consumer extends Actor { +class ConsumerPush extends Actor { val counts = new HashMap[String, Int].withDefaultValue(0) diff --git a/src/main/scala/com/paulbutcher/Main.scala b/src/main/scala/com/paulbutcher/Main.scala new file mode 100644 index 0000000..95c27b7 --- /dev/null +++ b/src/main/scala/com/paulbutcher/Main.scala @@ -0,0 +1,49 @@ +package com.paulbutcher + +import akka.actor._ +import com.typesafe.config.ConfigFactory + +case class Run(name: String, actor: () => Actor, config: (Int) => String) + +object Benchmarks extends App { + + val runs = Seq( + Run("push_rr", WordCountPush.getActor _, WordCountPush.getConfig("round-robin", "Dispatcher") _), + Run("push_sm", WordCountPush.getActor _, WordCountPush.getConfig("smallest-mailbox", "Dispatcher") _), + Run("push_bal", WordCountPush.getActor _, WordCountPush.getConfig("round-robin", "BalancingDispatcher") _), + Run("flow_rr", WordCountFlowControl.getActor _, WordCountFlowControl.getConfig("round-robin", "Dispatcher") _), + Run("flow_sm", WordCountFlowControl.getActor _, WordCountFlowControl.getConfig("smallest-mailbox", "Dispatcher") _), + Run("flow_bal", WordCountFlowControl.getActor _, WordCountFlowControl.getConfig("round-robin", "BalancingDispatcher") _), + Run("pull_1", WordCountPull.getActor(1) _, WordCountPull.getConfig _), + Run("pull_10", WordCountPull.getActor(10) _, WordCountPull.getConfig _), + Run("pull_20", WordCountPull.getActor(20) _, WordCountPull.getConfig _), + Run("pull_50", WordCountPull.getActor(20) _, WordCountPull.getConfig _), + Run("pull_cached", WordCountCached.getActor(1) _, WordCountCached.getConfig _) + ) + + while (true) + runs foreach execute _ + + def execute(run: Run) { + val times = for (i <- 1 to 8) yield { + System.gc + Thread.sleep(30000) + execute(run, i) + } + + println(s"${run.name},${times.mkString(",")}") + } + + def execute(run: Run, numConsumers: Int) = { + + val config = ConfigFactory.parseString(run.config(numConsumers)) + + val system = ActorSystem("Benchmarks", ConfigFactory.load(config)) + val startTime = System.currentTimeMillis + + system.actorOf(Props(run.actor()), "WordCount") + system.awaitTermination + + System.currentTimeMillis - startTime + } +} \ No newline at end of file diff --git a/core/src/main/scala/com/paulbutcher/Pages.scala b/src/main/scala/com/paulbutcher/Pages.scala similarity index 100% rename from core/src/main/scala/com/paulbutcher/Pages.scala rename to src/main/scala/com/paulbutcher/Pages.scala diff --git a/producer_pushes_unbounded_queue/src/main/scala/com/paulbutcher/Producer.scala b/src/main/scala/com/paulbutcher/ProducerFlowControl.scala similarity index 75% rename from producer_pushes_unbounded_queue/src/main/scala/com/paulbutcher/Producer.scala rename to src/main/scala/com/paulbutcher/ProducerFlowControl.scala index ae0b1a4..ff6ce2b 100644 --- a/producer_pushes_unbounded_queue/src/main/scala/com/paulbutcher/Producer.scala +++ b/src/main/scala/com/paulbutcher/ProducerFlowControl.scala @@ -4,7 +4,7 @@ import akka.actor._ case class Processed() -class Producer(consumers: ActorRef) extends Actor { +class ProducerFlowControl(consumers: ActorRef) extends Actor { val pages = Pages(100000, "enwiki.xml") @@ -16,6 +16,6 @@ class Producer(consumers: ActorRef) extends Actor { if (pages.hasNext) consumers ! pages.next else - consumers ! PoisonPill + context.stop(self) } } \ No newline at end of file diff --git a/consumer_pulls_batched/src/main/scala/com/paulbutcher/Producer.scala b/src/main/scala/com/paulbutcher/ProducerPull.scala similarity index 81% rename from consumer_pulls_batched/src/main/scala/com/paulbutcher/Producer.scala rename to src/main/scala/com/paulbutcher/ProducerPull.scala index fba66f5..0350f05 100644 --- a/consumer_pulls_batched/src/main/scala/com/paulbutcher/Producer.scala +++ b/src/main/scala/com/paulbutcher/ProducerPull.scala @@ -4,7 +4,7 @@ import akka.actor._ case class RequestWork(batchSize: Int) -class Producer extends Actor { +class ProducerPull extends Actor { val pages = Pages(100000, "enwiki.xml") @@ -13,6 +13,6 @@ class Producer extends Actor { if (pages.hasNext) sender ! Work(pages.take(batchSize).toArray) else - sender ! PoisonPill + context.stop(self) } } \ No newline at end of file diff --git a/src/main/scala/com/paulbutcher/ProducerPush.scala b/src/main/scala/com/paulbutcher/ProducerPush.scala new file mode 100644 index 0000000..9ca75cb --- /dev/null +++ b/src/main/scala/com/paulbutcher/ProducerPush.scala @@ -0,0 +1,15 @@ +package com.paulbutcher + +import akka.actor._ + +class ProducerPush(consumers: ActorRef) extends Actor { + + val pages = Pages(100000, "enwiki.xml") + for (page <- pages) + consumers ! page + context.stop(self) + + def receive = { + case _ => ??? + } +} \ No newline at end of file diff --git a/src/main/scala/com/paulbutcher/WordCountCached.scala b/src/main/scala/com/paulbutcher/WordCountCached.scala new file mode 100644 index 0000000..3b90211 --- /dev/null +++ b/src/main/scala/com/paulbutcher/WordCountCached.scala @@ -0,0 +1,35 @@ +package com.paulbutcher + +import akka.actor._ +import akka.routing.{Broadcast, FromConfig} + +class WordCountCached(batchSize: Int) extends Actor { + + val producer = context.actorOf(Props[ProducerPull], "producer") + val cache = context.actorOf(Props(new Cache(producer, batchSize))) + val consumers = context.actorOf(Props(new ConsumerPull(cache, batchSize)). + withRouter(FromConfig()), "consumers") + + context.watch(consumers) + context.watch(cache) + context.watch(producer) + + def receive = { + case Terminated(`producer`) => cache ! PoisonPill + case Terminated(`cache`) => consumers ! Broadcast(PoisonPill) + case Terminated(`consumers`) => context.system.shutdown + } +} + +object WordCountCached { + + def getActor(batchSize: Int)() = new WordCountCached(batchSize) + + def getConfig(numConsumers: Int) = s""" + akka.actor.deployment { + /WordCount/consumers { + router = round-robin + nr-of-instances = $numConsumers + } + }""" +} \ No newline at end of file diff --git a/src/main/scala/com/paulbutcher/WordCountFlowControl.scala b/src/main/scala/com/paulbutcher/WordCountFlowControl.scala new file mode 100644 index 0000000..292bdb4 --- /dev/null +++ b/src/main/scala/com/paulbutcher/WordCountFlowControl.scala @@ -0,0 +1,37 @@ +package com.paulbutcher + +import akka.actor._ +import akka.routing.{Broadcast, FromConfig} + +class WordCountFlowControl extends Actor { + + val consumers = context.actorOf(Props[ConsumerFlowControl]. + withRouter(FromConfig()). + withDispatcher("consumer-dispatcher"), "consumers") + val producer = context.actorOf(Props(new ProducerFlowControl(consumers)), "producer") + + context.watch(consumers) + context.watch(producer) + + def receive = { + case Terminated(`producer`) => consumers ! Broadcast(PoisonPill) + case Terminated(`consumers`) => context.system.shutdown + } +} + +object WordCountFlowControl { + + def getActor() = new WordCountFlowControl + + def getConfig(routerType: String, dispatcherType: String)(numConsumers: Int) = s""" + akka.actor.deployment { + /WordCount/consumers { + router = $routerType + nr-of-instances = $numConsumers + } + } + + consumer-dispatcher { + type = $dispatcherType + }""" +} \ No newline at end of file diff --git a/src/main/scala/com/paulbutcher/WordCountPull.scala b/src/main/scala/com/paulbutcher/WordCountPull.scala new file mode 100644 index 0000000..292dc71 --- /dev/null +++ b/src/main/scala/com/paulbutcher/WordCountPull.scala @@ -0,0 +1,32 @@ +package com.paulbutcher + +import akka.actor._ +import akka.routing.{Broadcast, FromConfig} + +class WordCountPull(batchSize: Int) extends Actor { + + val producer = context.actorOf(Props[ProducerPull], "producer") + val consumers = context.actorOf(Props(new ConsumerPull(producer, batchSize)). + withRouter(FromConfig()), "consumers") + + context.watch(consumers) + context.watch(producer) + + def receive = { + case Terminated(`producer`) => consumers ! Broadcast(PoisonPill) + case Terminated(`consumers`) => context.system.shutdown + } +} + +object WordCountPull { + + def getActor(batchSize: Int)() = new WordCountPull(batchSize) + + def getConfig(numConsumers: Int) = s""" + akka.actor.deployment { + /WordCount/consumers { + router = round-robin + nr-of-instances = $numConsumers + } + }""" +} \ No newline at end of file diff --git a/src/main/scala/com/paulbutcher/WordCountPush.scala b/src/main/scala/com/paulbutcher/WordCountPush.scala new file mode 100644 index 0000000..5c86613 --- /dev/null +++ b/src/main/scala/com/paulbutcher/WordCountPush.scala @@ -0,0 +1,38 @@ +package com.paulbutcher + +import akka.actor._ +import akka.routing.{Broadcast, FromConfig} + +class WordCountPush extends Actor { + + val consumers = context.actorOf(Props[ConsumerPush]. + withRouter(FromConfig()). + withDispatcher("consumer-dispatcher"), "consumers") + val producer = context.actorOf(Props(new ProducerPush(consumers)), "producer") + + context.watch(consumers) + context.watch(producer) + + def receive = { + case Terminated(`producer`) => consumers ! Broadcast(PoisonPill) + case Terminated(`consumers`) => context.system.shutdown + } +} + +object WordCountPush { + + def getActor() = new WordCountPush + + def getConfig(routerType: String, dispatcherType: String)(numConsumers: Int) = s""" + akka.actor.deployment { + /WordCount/consumers { + router = $routerType + nr-of-instances = $numConsumers + } + } + + consumer-dispatcher { + type = $dispatcherType + mailbox-capacity = 100 + }""" +} \ No newline at end of file diff --git a/core/src/main/scala/com/paulbutcher/Words.scala b/src/main/scala/com/paulbutcher/Words.scala similarity index 100% rename from core/src/main/scala/com/paulbutcher/Words.scala rename to src/main/scala/com/paulbutcher/Words.scala