Skip to content

Commit

Permalink
Merge pull request mDialog#28 from damienlevin/master
Browse files Browse the repository at this point in the history
Listeners for state changes are now cleaned up
  • Loading branch information
chrisdinn committed Apr 21, 2014
2 parents 77407f2 + 5cb05d0 commit 111eaa2
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 18 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name := "brando"

organization := "com.digital-achiever"

version := "1.0.0"
version := "1.0.1"

scalaVersion := "2.10.3"

Expand Down
21 changes: 14 additions & 7 deletions src/main/scala/Brando.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package brando

import akka.actor.{ Actor, ActorContext, ActorRef, Props, Status, Stash }
import akka.actor.{ Actor, ActorContext, ActorRef, Props, Status, Stash, Terminated }
import akka.io.{ IO, Tcp }
import akka.pattern.ask
import akka.util.{ ByteString, Timeout }
Expand Down Expand Up @@ -119,7 +119,7 @@ class Brando(
port: Int,
database: Option[Int],
auth: Option[String],
listeners: Set[ActorRef]) extends Actor with Stash {
private[brando] var listeners: Set[ActorRef]) extends Actor with Stash {
import context.dispatcher

val config = context.system.settings.config
Expand All @@ -136,21 +136,23 @@ class Brando(
val connection = context.actorOf(Props(classOf[Connection],
self, address, connectionRetry, maxConnectionAttempts))

def receive = disconnected
listeners.map(context.watch(_))

def receive = disconnected orElse cleanListeners

def authenticated: Receive = {
case request: Request connection forward request
case x: Tcp.ConnectionClosed
notifyStateChange(Disconnected)
context.become(disconnected)
context.become(disconnected orElse cleanListeners)
}

def disconnected: Receive = {
case request: Request stash()

case x: Tcp.Connected

context.become(authenticating)
context.become(authenticating orElse cleanListeners)

(for {
auth if (auth.isDefined) connection ? Request(ByteString("AUTH"), ByteString(auth.get)) else Future.successful()
Expand All @@ -171,18 +173,23 @@ class Brando(

case x: Tcp.ConnectionClosed
notifyStateChange(Disconnected)
context.become(disconnected)
context.become(disconnected orElse cleanListeners)

case Connected
unstashAll()
notifyStateChange(Connected)
context.become(authenticated)
context.become(authenticated orElse cleanListeners)

case AuthenticationFailed
notifyStateChange(AuthenticationFailed)

}

def cleanListeners: Receive = {
case Terminated(l)
listeners = listeners - l
}

private def notifyStateChange(newState: BrandoStateChange) {
listeners foreach { _ ! newState }
}
Expand Down
8 changes: 6 additions & 2 deletions src/main/scala/ShardManager.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package brando

import akka.actor.{ Actor, ActorRef, Props }
import akka.actor.{ Actor, ActorRef, Props, Terminated }
import akka.util.ByteString
import collection.mutable
import java.util.zip.CRC32
Expand Down Expand Up @@ -34,12 +34,13 @@ object ShardManager {
class ShardManager(
shards: Seq[Shard],
hashFunction: (Array[Byte] Long),
val listeners: Set[ActorRef] = Set()) extends Actor {
private[brando] var listeners: Set[ActorRef] = Set()) extends Actor {

val pool = mutable.Map.empty[String, ActorRef]
val shardLookup = mutable.Map.empty[ActorRef, Shard]

shards.map(create(_))
listeners.map(context.watch(_))

def receive = {

Expand All @@ -64,6 +65,9 @@ class ShardManager(
case None println("Update received for unknown shard actorRef " + sender + "\r\n")
}

case Terminated(l)
listeners = listeners - l

case x println("ShardManager received unexpected " + x + "\r\n")
}

Expand Down
15 changes: 15 additions & 0 deletions src/test/scala/HealthMonitorTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,21 @@ class HealthMonitorTest extends TestKit(ActorSystem("HealthMonitorTest"))
manager ! PoisonPill
}

it("should cleaned up any dead listeners") {

val probe1 = TestProbe()
val probe2 = TestProbe()

val brando = TestActorRef(new Brando("localhost", 6379, None, None, listeners = Set(probe1.ref, probe2.ref))).underlyingActor
assertResult(2)(brando.listeners.size)

probe1.ref ! PoisonPill
probe2.expectMsg(Connected)

assertResult(1)(brando.listeners.size)

}

it("should restart the shard, and notify, when healthcheck fails") {
val probe = TestProbe()
val listener = TestProbe()
Expand Down
16 changes: 8 additions & 8 deletions src/test/scala/ResponseTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,51 +7,51 @@ class ResponseTest extends FunSpec {
describe("Utf8String") {
it("should extract an utf8 String from a ByteString") {
val ok = Response.AsString.unapply(Some(ByteString("ok")))
expectResult(Some("ok"))(ok)
assertResult(Some("ok"))(ok)
}
}

describe("Strings") {
it("should extract a list of string from a option bytestring list ") {
val resp = Some(List(Some(ByteString("l1")), Some(ByteString("l2")), Some(ByteString("l3"))))
val seq = Response.AsStrings.unapply(resp)
expectResult(Some(Seq("l1", "l2", "l3")))(seq)
assertResult(Some(Seq("l1", "l2", "l3")))(seq)
}

it("shouldn't extract if it is another type") {
val seq = Response.AsStrings.unapply(Some(List(Some(12L))))
expectResult(None)(seq)
assertResult(None)(seq)
}
}

describe("Bytes Sequences") {
it("should extract a list of string from a option bytestring list ") {
val resp = Some(List(Some(ByteString(0, 1)), Some(ByteString(2, 3))))
val seq = Response.AsByteSeqs.unapply(resp)
expectResult(Some(Seq(Seq(0, 1), Seq(2, 3))))(seq)
assertResult(Some(Seq(Seq(0, 1), Seq(2, 3))))(seq)
}

it("shouldn't extract if it is another type") {
val seq = Response.AsByteSeqs.unapply(Some(List(Some(12L))))
expectResult(None)(seq)
assertResult(None)(seq)
}
}

describe("Strings Hashes") {
it("should extract a map when the result list has an heaven size") {
val resp = Some(List(Some(ByteString("k1")), Some(ByteString("v1")), Some(ByteString("k2")), Some(ByteString("v2"))))
val map = Response.AsStringsHash.unapply(resp)
expectResult(Some(Map("k1" -> "v1", "k2" -> "v2")))(map)
assertResult(Some(Map("k1" -> "v1", "k2" -> "v2")))(map)
}

it("should extract an empty map when the result list is empty") {
val map = Response.AsStringsHash.unapply(Some(List.empty))
expectResult(Some(Map.empty))(map)
assertResult(Some(Map.empty))(map)
}

it("should fails when the result list has an odd size") {
val map = Response.AsStringsHash.unapply(Some(List(Some(ByteString("k1")))))
expectResult(None)(map)
assertResult(None)(map)
}
}

Expand Down
19 changes: 19 additions & 0 deletions src/test/scala/ShardManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,25 @@ class ShardManagerTest extends TestKit(ActorSystem("ShardManagerTest"))
probe.expectMsg(ShardStateChange(shards(1), ConnectionFailed))
}

it("should cleaned up any dead listeners") {

val shards = Seq(
Shard("server1", "localhost", 6379, Some(0)),
Shard("server2", "localhost", 13579, Some(1)))

val probe1 = TestProbe()
val probe2 = TestProbe()

val shardManager = TestActorRef(new ShardManager(shards, ShardManager.defaultHashFunction, Set(probe1.ref, probe2.ref))).underlyingActor
assertResult(2)(shardManager.listeners.size)

probe1.ref ! PoisonPill
probe2.expectMsg(ShardStateChange(shards(0), Connected))

assertResult(1)(shardManager.listeners.size)

}

it("should notify listeners when a shard fails to authenticate") {
val shards = Seq(
Shard("server1", "localhost", 6379, Some(0)),
Expand Down

0 comments on commit 111eaa2

Please sign in to comment.