Skip to content

Commit

Permalink
Fix #5 Update Akka from 2.2.4 to 2.3.0
Browse files Browse the repository at this point in the history
  • Loading branch information
ngocdaothanh committed Mar 6, 2014
1 parent b966467 commit f5ad16f
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 76 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
1.6:
* #5 Update Akka from 2.2.4 to 2.3.0

1.5:
* #1 Use immutable for handover data
* #2 Update Akka from 2.2.3 to 2.2.4
Expand Down
11 changes: 6 additions & 5 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Glokka is a Scala library that provides registration of global "name to actor
mapping" for Akka cluster. See:

* `Erlang's "global" module <http://erlang.org/doc/man/global.html>`_
* `Akka's cluster feature <http://doc.akka.io/docs/akka/2.2.3/scala/cluster-usage.html>`_
* `Akka's cluster feature <http://doc.akka.io/docs/akka/2.3.0/scala/cluster-usage.html>`_

Glokka is used in `Xitrum <http://ngocdaothanh.github.io/xitrum/>`_ to implement
its distributed `SockJS <https://github.com/sockjs/sockjs-client>`_ feature.
Expand All @@ -18,7 +18,7 @@ In your SBT project's build.sbt:

::

libraryDependencies += "tv.cntt" %% "glokka" % "1.5"
libraryDependencies += "tv.cntt" %% "glokka" % "1.6"

In your Scala code:

Expand Down Expand Up @@ -140,7 +140,7 @@ Glokka can run in Akka non-cluster mode (local or remote). While developing, you
can run Akka in local mode, then later config Akka to run in cluster mode.

In cluster mode, Glokka uses
`Akka's Cluster Singleton Pattern <http://doc.akka.io/docs/akka/2.2.3/contrib/cluster-singleton.html>`_
`Akka's Cluster Singleton Pattern <http://doc.akka.io/docs/akka/2.3.0/contrib/cluster-singleton.html>`_
to maintain an actor that stores the name -> actorRef lookup table.

Akka config file for a node should look like this (note "ClusterSystem" in the
Expand All @@ -153,11 +153,12 @@ source code example above and the config below):
provider = "akka.cluster.ClusterActorRefProvider"
}

# This node
remote {
log-remote-lifecycle-events = off
netty.tcp {
hostname = "127.0.0.1"
port = 2551
port = 2551 # 0 means random port
}
}

Expand All @@ -166,6 +167,6 @@ source code example above and the config below):
"akka.tcp://[email protected]:2551",
"akka.tcp://[email protected]:2552"]

auto-down = on
auto-down-unreachable-after = 10s
}
}
8 changes: 4 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ organization := "tv.cntt"

name := "glokka"

version := "1.5-SNAPSHOT"
version := "1.6-SNAPSHOT"

scalaVersion := "2.10.3"

Expand All @@ -14,11 +14,11 @@ scalacOptions ++= Seq("-deprecation", "-feature", "-unchecked")
// java.lang.UnsupportedClassVersionError: Unsupported major.minor version 51.0
javacOptions ++= Seq("-source", "1.6", "-target", "1.6")

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.2.4"
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.3.0"

libraryDependencies += "com.typesafe.akka" %% "akka-cluster" % "2.2.4"
libraryDependencies += "com.typesafe.akka" %% "akka-cluster" % "2.3.0"

libraryDependencies += "com.typesafe.akka" %% "akka-contrib" % "2.2.4"
libraryDependencies += "com.typesafe.akka" %% "akka-contrib" % "2.3.0"

libraryDependencies += "org.specs2" %% "specs2" % "2.3.8" % "test"

Expand Down
22 changes: 5 additions & 17 deletions src/main/scala/glokka/ClusterSingletonProxy.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ private object ClusterSingletonProxy {
val SINGLETON_NAME = URLEncoder.encode("GlokkaActorRegistry", "UTF-8")
}

private object HandOver
private case class HandOverData(name2Ref: Map[String, ActorRef], ref2Names: Map[ActorRef, Set[String]])
private case object TerminateRegistry

private class ClusterSingletonProxy(proxyName: String) extends Actor {
import ClusterSingletonProxy._
Expand All @@ -27,22 +26,10 @@ private class ClusterSingletonProxy(proxyName: String) extends Actor {
override def preStart() {
Cluster(context.system).subscribe(self, classOf[ClusterEvent.ClusterDomainEvent])

val singletonPropsFactory: Option[Any] => Props = handOverData => {
handOverData match {
case Some(HandOverData(name2Ref, ref2Names)) =>
Props(classOf[Registry], false, name2Ref, ref2Names)

case _ =>
val name2Ref = Map.empty[String, ActorRef]
val ref2Names = Map.empty[ActorRef, Set[String]]
Props(classOf[Registry], false, name2Ref, ref2Names)
}
}

val proxyProps = ClusterSingletonManager.props(
singletonProps = singletonPropsFactory,
singletonProps = Props(classOf[Registry], false),
singletonName = SINGLETON_NAME,
terminationMessage = HandOver,
terminationMessage = TerminateRegistry,
role = None
)

Expand Down Expand Up @@ -74,7 +61,8 @@ private class ClusterSingletonProxy(proxyName: String) extends Actor {
membersByAge -= m

case other =>
leader.foreach { _.tell(other, sender) }
val s = sender()
leader.foreach { _.tell(other, s) }
}

private def leader: Option[ActorSelection] =
Expand Down
77 changes: 30 additions & 47 deletions src/main/scala/glokka/Registry.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package glokka
import scala.collection.mutable.{ArrayBuffer, HashMap => MHashMap, MultiMap => MMultiMap, Set => MSet}
import scala.concurrent.duration.{FiniteDuration, SECONDS}

import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Cancellable, Props, Terminated}
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Cancellable, PoisonPill, Props, Terminated}
import com.typesafe.config.ConfigFactory

object Registry {
Expand Down Expand Up @@ -39,7 +39,7 @@ object Registry {

// Using mutable data structures internally in actor is OK (for speed).

private case class PendingMsg(sender: ActorRef, msg: Any)
private case class PendingMsg(poster: ActorRef, msg: Any)

// Key-value, key is actor name
private case class PendingCreateValue(creator: ActorRef, msgs: ArrayBuffer[PendingMsg], cancellable: Cancellable)
Expand All @@ -58,20 +58,6 @@ private class Registry(localMode: Boolean) extends Actor with ActorLogging {
// Key is actor name
private val pendingCreateReqs = new MHashMap[String, PendingCreateValue]

/** This constructor is for data handover in cluster mode. */
def this(
localMode: Boolean,
immutableName2Ref: Map[String, ActorRef], immutableRef2Names: Map[ActorRef, Set[String]]
) {
this(localMode)

// Convert immutable to mutable
name2Ref ++= immutableName2Ref
immutableRef2Names.foreach { case (k, set) =>
set.foreach { v => ref2Names.addBinding(k, v) }
}
}

override def preStart() {
if (localMode)
log.info("Glokka actor registry starts in local mode")
Expand All @@ -97,12 +83,12 @@ private class Registry(localMode: Boolean) extends Actor with ActorLogging {
doLookupOrCreate(name, timeout)

case Some(PendingCreateValue(_, msgs, _)) =>
msgs.append(PendingMsg(sender, msg))
msgs.append(PendingMsg(sender(), msg))
}

case CancelCreate(name) =>
// Only the one who sent LookupOrCreate can now cancel
doCancel(name, sender, false)
doCancel(name, sender(), false)

case TimeoutCreate(name, maybeTheCreator) =>
doCancel(name, maybeTheCreator, true)
Expand All @@ -113,15 +99,16 @@ private class Registry(localMode: Boolean) extends Actor with ActorLogging {
doRegister(name, ref)

case Some(PendingCreateValue(creator, msgs, cancellable)) =>
if (sender == creator) {
val s = sender()
if (s == creator) {
cancellable.cancel() // Do this as soon as possible

doRegister(name, ref)

pendingCreateReqs.remove(name)
msgs.foreach { msg => self.tell(msg.msg, msg.sender) }
msgs.foreach { msg => self.tell(msg.msg, msg.poster) }
} else {
msgs.append(PendingMsg(sender, msg))
msgs.append(PendingMsg(s, msg))
}
}

Expand All @@ -137,7 +124,7 @@ private class Registry(localMode: Boolean) extends Actor with ActorLogging {
doLookup(name)

case Some(PendingCreateValue(_, msgs, _)) =>
msgs.append(PendingMsg(sender, msg))
msgs.append(PendingMsg(sender(), msg))
}

case Terminated(ref) =>
Expand All @@ -146,14 +133,9 @@ private class Registry(localMode: Boolean) extends Actor with ActorLogging {
}

// Only used in cluster mode, see ClusterSingletonProxy
case HandOver =>
// Convert mutable to immutable
val immutableName2Ref = name2Ref.toMap
val immutableRef2Names = ref2Names.toMap.mapValues { mset => mset.toSet }

// Reply to ClusterSingletonManager with hand over data,
// which will be passed as parameter to new consumer singleton
context.parent ! (immutableName2Ref, immutableRef2Names)
case TerminateRegistry =>
// For consistency, tell all actors in this registry to stop
ref2Names.keys.foreach(_ ! PoisonPill)

context.stop(self)
}
Expand All @@ -166,32 +148,33 @@ private class Registry(localMode: Boolean) extends Actor with ActorLogging {
doRegister(msg.name, msg.system, msg.props)

case Some(PendingCreateValue(creator, msgs, cancellable)) =>
if (sender == creator) {
val s = sender()
if (s == creator) {
cancellable.cancel() // Do this as soon as possible

doRegister(msg.name, msg.system, msg.props)

pendingCreateReqs.remove(msg.name)
msgs.foreach { msg => self.tell(msg.msg, msg.sender) }
msgs.foreach { msg => self.tell(msg.msg, msg.poster) }
} else {
msgs.append(PendingMsg(sender, msg))
msgs.append(PendingMsg(s, msg))
}
}
}

private def doLookupOrCreate(name: String, timeoutInSeconds: Int) {
val s = sender()
name2Ref.get(name) match {
case Some(ref) =>
sender ! LookupResultOk(name, ref)
s ! LookupResultOk(name, ref)

case None =>
val delay = FiniteDuration(timeoutInSeconds, SECONDS)
val msg = TimeoutCreate(name, sender)
import context.dispatcher
val cancellable = context.system.scheduler.scheduleOnce(delay, self, msg)
val msg = TimeoutCreate(name, s)
val cancellable = context.system.scheduler.scheduleOnce(delay, self, msg)(context.dispatcher)

sender ! LookupResultNone(name)
pendingCreateReqs(name) = PendingCreateValue(sender, ArrayBuffer(), cancellable)
s ! LookupResultNone(name)
pendingCreateReqs(name) = PendingCreateValue(s, ArrayBuffer(), cancellable)
}
}

Expand All @@ -201,7 +184,7 @@ private class Registry(localMode: Boolean) extends Actor with ActorLogging {
if (!becauseOfTimeout) cancellable.cancel() // Do this as soon as possible

pendingCreateReqs.remove(name)
msgs.foreach { msg => self.tell(msg.msg, msg.sender) }
msgs.foreach { msg => self.tell(msg.msg, msg.poster) }
}
}
}
Expand All @@ -210,12 +193,12 @@ private class Registry(localMode: Boolean) extends Actor with ActorLogging {
name2Ref.get(name) match {
case Some(oldRef) =>
if (oldRef == ref)
sender ! RegisterResultOk(name, oldRef)
sender() ! RegisterResultOk(name, oldRef)
else
sender ! RegisterResultConflict(name, oldRef)
sender() ! RegisterResultConflict(name, oldRef)

case None =>
sender ! RegisterResultOk(name, ref)
sender() ! RegisterResultOk(name, ref)
context.watch(ref)
name2Ref(name) = ref
ref2Names.addBinding(ref, name)
Expand All @@ -225,11 +208,11 @@ private class Registry(localMode: Boolean) extends Actor with ActorLogging {
private def doRegister(name: String, system: ActorSystem, props: Props) {
name2Ref.get(name) match {
case Some(oldRef) =>
sender ! RegisterResultConflict(name, oldRef)
sender() ! RegisterResultConflict(name, oldRef)

case None =>
val ref = system.actorOf(props)
sender ! RegisterResultOk(name, ref)
sender() ! RegisterResultOk(name, ref)
context.watch(ref)
name2Ref(name) = ref
ref2Names.addBinding(ref, name)
Expand All @@ -239,10 +222,10 @@ private class Registry(localMode: Boolean) extends Actor with ActorLogging {
private def doLookup(name: String) {
name2Ref.get(name) match {
case Some(ref) =>
sender ! LookupResultOk(name, ref)
sender() ! LookupResultOk(name, ref)

case None =>
sender ! LookupResultNone(name)
sender() ! LookupResultNone(name)
}
}
}
7 changes: 4 additions & 3 deletions src/test/scala/glokka/LocalSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package glokka

import org.specs2.mutable._

import java.util.concurrent.TimeUnit
import scala.concurrent.Await
import scala.concurrent.duration._

Expand All @@ -18,7 +19,7 @@ class LocalSpec extends Specification {
implicit val system = ActorSystem("ClusterSystem")

// For "ask" timeout
implicit val timeout = Timeout(5000)
implicit val timeout = Timeout(5000, TimeUnit.SECONDS)

val registry = Registry.start(system, "test")

Expand Down Expand Up @@ -69,7 +70,7 @@ class LocalSpec extends Specification {
import akka.actor.ActorDSL._

val ref = actor(new Act {
become { case "hello" => sender ! "hi" }
become { case "hello" => sender() ! "hi" }
})

val future = registry ? Registry.RegisterByRef("name", ref)
Expand All @@ -86,7 +87,7 @@ class LocalSpec extends Specification {
import akka.actor.ActorDSL._

val ref = actor(new Act {
become { case "hello" => sender ! "hi" }
become { case "hello" => sender() ! "hi" }
})

val future = registry ? Registry.RegisterByRef("name2", ref)
Expand Down

0 comments on commit f5ad16f

Please sign in to comment.