Skip to content

Commit

Permalink
Merge pull request #32 from dmitri-carpov/master
Browse files Browse the repository at this point in the history
close #21 basic supervision strategy implementation
  • Loading branch information
dmitri-carpov committed May 18, 2016
2 parents 4077393 + ae6abe3 commit 43c4b29
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 18 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Jvm.`1.8`.required
// Dependencies
resolvers += Resolver.bintrayRepo("krasserm", "maven")

val akkaVersion = "2.4.1"
val akkaVersion = "2.4.4"
val kafkaVersion = "0.9.0.0"

libraryDependencies ++= Seq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ object TracedProcessManager {
class TracedProcessManager(process: StagedProcess,
date: Option[Date],
receiver: ActorRef,
id: Option[String] = None) extends ProcessManager(process, date, _ => ()) {
id: Option[String] = None) extends ProcessManager(process, date) {

override def persistenceId: String = id.getOrElse(super.persistenceId)

Expand Down
16 changes: 8 additions & 8 deletions src/main/scala/com.mediative.eigenflow/EigenflowBootstrap.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,15 @@

package com.mediative.eigenflow

import akka.actor.{ Props, ActorSystem }
import akka.event.LoggingAdapter
import akka.actor.ActorSystem
import com.mediative.eigenflow.environment.ConfigurationLoader
import com.mediative.eigenflow.helpers.DateHelper._
import com.mediative.eigenflow.process.ProcessManager
import com.mediative.eigenflow.process.ProcessManager.Continue
import com.mediative.eigenflow.process.ProcessSupervisor
import com.mediative.eigenflow.publisher.MessagingSystem

import scala.util.{ Failure, Success, Try }
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{ Failure, Success }

/**
* Creates the actor system and start processing.
Expand All @@ -38,7 +37,8 @@ trait EigenflowBootstrap {
def process: StagedProcess

// bootstrap the system: initialize akka, message publisher ...
implicit val messagingSystem = Class.forName(ConfigurationLoader.config.getString("eigenflow.messaging")).getConstructor().newInstance().asInstanceOf[MessagingSystem]
implicit val messagingSystem = Class.forName(ConfigurationLoader.config.getString("eigenflow.messaging")).
getConstructor().newInstance().asInstanceOf[MessagingSystem]

// load environment variables
private val startDate = Option(System.getenv("start")).flatMap(parse)
Expand All @@ -47,13 +47,13 @@ trait EigenflowBootstrap {
implicit val system = ActorSystem("DataFlow", ConfigurationLoader.config)

// create main actor and tell to proceed.
system.actorOf(Props(new ProcessManager(process, startDate, stopSystem _))) ! Continue
system.actorOf(ProcessSupervisor.props(process, startDate, stopSystem(system))) ! Continue

/**
* System shutdown logic.
*/
private def stopSystem(code: Int): Unit = {
messagingSystem.stop()
private def stopSystem(system: ActorSystem)(code: Int): Unit = {
Try(messagingSystem.stop())

system.terminate().onComplete {
case Success(_) => System.exit(code)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ package com.mediative.eigenflow.process

import java.util.Date

import akka.actor.{ ActorLogging, Props }
import akka.actor.SupervisorStrategy.Escalate
import akka.actor.{ ActorLogging, OneForOneStrategy, Props, SupervisorStrategy }
import akka.persistence.PersistentActor
import com.mediative.eigenflow.StagedProcess
import com.mediative.eigenflow.environment.ProcessConfiguration
Expand All @@ -45,12 +46,22 @@ private[eigenflow] object ProcessManager {
* The process parent actor which creates FSM actors which actually run processes based on processingDate.
*
*/
private[eigenflow] class ProcessManager(process: StagedProcess, startDate: Option[Date], onTermination: Int => Unit)(implicit val messagingSystem: MessagingSystem) extends PersistentActor with ActorLogging {
private[eigenflow] class ProcessManager(process: StagedProcess, startDate: Option[Date])(implicit val messagingSystem: MessagingSystem) extends PersistentActor with ActorLogging {

import com.mediative.eigenflow.process.ProcessManager._

private val config = ProcessConfiguration.load

override def supervisorStrategy: SupervisorStrategy = {
OneForOneStrategy() {
case t => Escalate
}
}

override protected def onRecoveryFailure(cause: Throwable, event: Option[Any]): Unit = fail

override protected def onPersistFailure(cause: Throwable, event: Any, seqNr: Long): Unit = fail

override def persistenceId: String = s"${config.id}-manager"

override def receiveRecover: Receive = {
Expand Down Expand Up @@ -78,7 +89,7 @@ private[eigenflow] class ProcessManager(process: StagedProcess, startDate: Optio
}
} else {
log.info(s"The process is idle until: ${TimeFormat.format(processingDate)}")
terminateWith(SuccessCode)
done
}
}

Expand All @@ -92,11 +103,9 @@ private[eigenflow] class ProcessManager(process: StagedProcess, startDate: Optio
}

case ProcessFailed =>
terminateWith(FailureCode)
fail
}

private def terminateWith(code: Int) = {
context.stop(self)
onTermination(code)
}
private def fail() = context.parent ! ProcessSupervisor.Failed
private def done() = context.parent ! ProcessSupervisor.Done
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.mediative.eigenflow.process

import java.util.Date

import akka.actor.SupervisorStrategy.Stop
import akka.actor.{ Actor, ActorRef, OneForOneStrategy, Props, SupervisorStrategy, Terminated }
import com.mediative.eigenflow.StagedProcess
import com.mediative.eigenflow.publisher.MessagingSystem

object ProcessSupervisor {
def props(process: StagedProcess, date: Option[Date], stopSystem: Int => Unit)(implicit messagingSystem: MessagingSystem) =
Props(new ProcessSupervisor(process, date, stopSystem))

case object Done
case object Failed
}

class ProcessSupervisor(val process: StagedProcess, val startDate: Option[Date], stopSystem: Int => Unit)(implicit val messagingSystem: MessagingSystem) extends Actor {
private val processManager: ActorRef = context.actorOf(Props(new ProcessManager(process, startDate)))

context.watch(processManager)

override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
case _ => Stop
}

override def receive: Receive = {
case ProcessSupervisor.Done => stopSystem(0)
case ProcessSupervisor.Failed | Terminated(`processManager`) =>
stopSystem(1)
case message => processManager.forward(message)
}
}

0 comments on commit 43c4b29

Please sign in to comment.