Skip to content
This repository has been archived by the owner on Jan 27, 2020. It is now read-only.

Commit

Permalink
Merge pull request #166 from diogoaurelio/DriverBootstrapEscalationSt…
Browse files Browse the repository at this point in the history
…rategy-plus-Jan

Driver bootstrap escalation strategy (already merged with jan PR)
  • Loading branch information
utzwestermann authored Jun 23, 2017
2 parents a3040e4 + 2b2d439 commit 592b7f4
Show file tree
Hide file tree
Showing 76 changed files with 1,952 additions and 1,480 deletions.
3 changes: 3 additions & 0 deletions schedoscope-core/src/main/resources/fmpp/templates/Parser.jj
Original file line number Diff line number Diff line change
Expand Up @@ -4546,6 +4546,8 @@ SqlBinaryOperator BinaryRowOperator() :
{ return SqlStdOperatorTable.DIVIDE; }
| <MODULO>
{ return SqlStdOperatorTable.DIVIDE; }
| <DIV>
{ return SqlStdOperatorTable.DIVIDE; }
| <CONCAT>
{ return SqlStdOperatorTable.CONCAT; }
| <AND>
Expand Down Expand Up @@ -4761,6 +4763,7 @@ SqlPostfixOperator PostfixRowOperator() :
| < DISPATCH: "DISPATCH" >
| < DISTINCT: "DISTINCT" >
| < DISTRIBUTE: "DISTRIBUTE" >
| < DIV: "DIV" >
| < DOMAIN: "DOMAIN" >
| < DOUBLE: "DOUBLE" >
| < DROP: "DROP" >
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,6 @@ case class ExternalView(view: View) extends View {

override val isExternal = true

//
// Pass env through to wrapped view
//
override def env_=(env: String): Unit = {
super.env_=(env)
view.env = env
}

//
// Give us a string representation clearly showing this is an external view
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ abstract class FieldLike[T: Manifest] extends Named {
}

override def toString: String = assignedStructure match {
case Some(view: View) => s"${view.tableName}.${view.nameOf(this).getOrElse("")}"
case _ => namingBase
case Some(view: View) => s"${view.tableName}.$n"
case _ => n
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ case class Parameter[T: Manifest](orderWeight: Long) extends FieldLike[T] {
override def hashCode(): Int = {
t.hashCode + 7 * v.hashCode()
}

override def toString() = if (v.isDefined) s"Parameter(${v.get})" else super.toString
}

/**
Expand Down
47 changes: 16 additions & 31 deletions schedoscope-core/src/main/scala/org/schedoscope/dsl/View.scala
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,7 @@ abstract class View extends Structure with ViewDsl with DelayedInit {
/**
* The view's environment.
*/
private var _env: String = "dev"

def env: String = _env

def env_=(env: String): Unit = {
_env = env
}
def env = Schedoscope.settings.env

var storageFormat: StorageFormat = TextFile()
var additionalStoragePathPrefix: Option[String] = None
Expand Down Expand Up @@ -238,7 +232,7 @@ abstract class View extends Structure with ViewDsl with DelayedInit {
* current view depends on. This function is returned so that it can be assigned to variables for further reference.
*/
override def dependsOn[V <: View : Manifest](df: () => V) = {
val dsf = () => List(View.register(this.env, df()))
val dsf = () => List(View.register(df()))

dependsOn(dsf)

Expand All @@ -250,9 +244,7 @@ abstract class View extends Structure with ViewDsl with DelayedInit {
* current view depends on.
*/
override def dependsOn[V <: View : Manifest](dsf: () => Seq[V]) {
val df = () => dsf().map {
View.register(this.env, _)
}
val df = () => dsf().map (View.register[V])

deferredDependencies += df
}
Expand Down Expand Up @@ -394,7 +386,7 @@ abstract class View extends Structure with ViewDsl with DelayedInit {
case (influencer, influencee) =>
val ownerView: View = influencee.assignedStructure.get.asInstanceOf[View]
if (ownerView.explicitLineage.isEmpty)
ownerView.explicitLineage = ownerView.fields.map(f => f -> mutable.Set[FieldLike[_]]()).toMap
ownerView.explicitLineage = ownerView.fieldsAndParameters.map(f => f -> mutable.Set[FieldLike[_]]()).toMap

ownerView.explicitLineage(influencee).add(influencer)
}
Expand Down Expand Up @@ -468,19 +460,19 @@ object View {
}

/**
* Instantiate views given an environment and view URL path. A parsed view augmentor can further modify the created views.
* Instantiate views given a view URL path. A parsed view augmentor can further modify the created views.
*/
def viewsFromUrl(env: String, viewUrlPath: String, parsedViewAugmentor: ParsedViewAugmentor = new ParsedViewAugmentor() {}): List[View] =
def viewsFromUrl(viewUrlPath: String, parsedViewAugmentor: ParsedViewAugmentor = new ParsedViewAugmentor() {}): List[View] =
try {
ViewUrlParser
.parse(env, viewUrlPath)
.parse(viewUrlPath)
.map {
parsedViewAugmentor.augment(_)
}
.filter {
_ != null
}
.map { case ParsedView(env, viewClass, parameters) => newView(viewClass, env, parameters: _*) }
.map { case ParsedView(viewClass, parameters) => newView(viewClass, parameters: _*) }
} catch {
case t: Throwable =>
if (t.isInstanceOf[java.lang.reflect.InvocationTargetException]) {
Expand All @@ -491,9 +483,9 @@ object View {
}

/**
* Instantiate a new view given its class name, an environment, and a list of parameter values.
* Instantiate a new view given its class name and a list of parameter values.
*/
def newView[V <: View : Manifest](viewClass: Class[V], env: String, parameterValues: TypedAny*): V = {
def newView[V <: View : Manifest](viewClass: Class[V], parameterValues: TypedAny*): V = {
val viewCompanionObjectClass = Class.forName(viewClass.getName() + "$")
val viewCompanionConstructor = viewCompanionObjectClass.getDeclaredConstructor()
viewCompanionConstructor.setAccessible(true)
Expand Down Expand Up @@ -535,21 +527,14 @@ object View {
parametersToPass += passedValueForParameter.v
}

register(env, viewConstructor.invoke(viewCompanionObject, parametersToPass.asInstanceOf[Seq[Object]]: _*).asInstanceOf[V])
register(viewConstructor.invoke(viewCompanionObject, parametersToPass.asInstanceOf[Seq[Object]]: _*).asInstanceOf[V])
}

private def register[V <: View : Manifest](env: String, v: V): V = this.synchronized {
val registeredView = knownViews.get(v.urlPath) match {
case Some(registeredView) => {
registeredView.asInstanceOf[V]
}
case None => {
knownViews.put(v.urlPath, v)
v
}
}
registeredView.env = env
registeredView
private def register[V <: View : Manifest](v: V): V = this.synchronized {
if (!knownViews.contains(v.urlPath))
knownViews.put(v.urlPath, v)

knownViews(v.urlPath).asInstanceOf[V]
}

private def recursiveDependenciesOf(view: View, soFar: mutable.Set[View] = mutable.Set[View]()): mutable.Set[View] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class NoAugmentation extends ViewUrlParser.ParsedViewAugmentor

object ViewUrlParser {

case class ParsedView(env: String, viewClass: Class[View], parameters: List[TypedAny])
case class ParsedView(viewClass: Class[View], parameters: List[TypedAny])

trait ParsedViewAugmentor {
def augment(pv: ParsedView): ParsedView = pv
Expand Down Expand Up @@ -132,7 +132,7 @@ object ViewUrlParser {
case cnf: ClassNotFoundException => throw new IllegalArgumentException("No class for package and view: " + cnf.getMessage())
}

def parse(env: String, viewUrlPath: String): List[ParsedView] = try {
def parse(viewUrlPath: String): List[ParsedView] = try {
val normalizedPathFront = if (viewUrlPath.startsWith("/"))
viewUrlPath.tail
else
Expand All @@ -154,7 +154,7 @@ object ViewUrlParser {
for {
viewClass <- parseViewClassnames(packageName, viewClassNames)
pl <- parseParameters(parameters)
} yield ParsedView(env, viewClass, pl)
} yield ParsedView(viewClass, pl)

} catch {

Expand Down Expand Up @@ -196,5 +196,5 @@ Quoting:
""")
}

def viewNames(viewUrlPath: String) = parse("dev", viewUrlPath).map(pv => pv.viewClass.getName)
def viewNames(viewUrlPath: String) = parse(viewUrlPath).map(pv => pv.viewClass.getName)
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ object DependencyAnalyzer {
val validated = try {
planner.validate(parsed)
} catch {
case _: ValidationException =>
log.debug("Trying again with a recursively-built schema...")
case ex: ValidationException =>
log.debug("Trying again with a recursively-built schema...", ex)
planner = new NonFlatteningPlannerImpl(SchedoscopeConfig(view, scanRecursive = true))
planner.validate(planner.parse(firstStmt))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ object DummyOperatorTable extends SqlOperatorTable {

case class FramingSqlRankFunction(name: String) extends SqlRankFunction(name) {
override def allowsFraming(): Boolean = true

override def getOperandCountRange: SqlOperandCountRange = SqlOperandCountRanges.any()
}

case class FramingSqlAggFunction(name: String) extends SqlAggFunction(name,
Expand All @@ -56,6 +58,10 @@ object DummyOperatorTable extends SqlOperatorTable {
"LAG", "LEAD"
).map(n =>
n -> FramingSqlAggFunction(n)
) ++ Seq(
"CONVERT", "TRANSLATE"
).map(n =>
n -> HiveQlFunction(n)
)

override def lookupOperatorOverloads(opName: SqlIdentifier, category: SqlFunctionCategory, syntax: SqlSyntax,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ package org.schedoscope.scheduler.actors

import java.security.PrivilegedAction

import akka.actor.{Actor, ActorRef, Props, actorRef2Scala}
import akka.actor.SupervisorStrategy.{Escalate, Restart, Stop}
import akka.actor.{Actor, ActorInitializationException, ActorRef, OneForOneStrategy, Props, actorRef2Scala}
import akka.event.{Logging, LoggingReceive}
import org.apache.commons.lang.exception.ExceptionUtils
import org.apache.hadoop.fs._
Expand Down Expand Up @@ -63,7 +64,8 @@ class DriverActor[T <: Transformation](transformationManagerActor: ActorRef,
try {
driver = driverConstructor(ds)
} catch {
case t: Throwable => throw RetryableDriverException("Driver actor could not initialize driver because driver constructor throws exception (HINT: if Driver Actor start failure behaviour persists, validate the respective transformation driver config in conf file). Restarting driver actor...", t)
// note: whatever is thrown here is packed as an ActorInitializationException to Supervisor
case i: Throwable => throw InvalidDriverClassException(s"Unable to instantiate new Driver for provided transformation: ${i.getMessage}", i)
}
logStateInfo("booted", "DRIVER ACTOR: booted")
}
Expand Down Expand Up @@ -301,6 +303,14 @@ class DriverActor[T <: Transformation](transformationManagerActor: ActorRef,
* Factory methods for driver actors.
*/
object DriverActor {

// used for determining BalancingDispatcher children' Supervision
lazy val driverRouterSupervisorStrategy = OneForOneStrategy(maxNrOfRetries = -1) {
case _: RetryableDriverException => Restart
case _: ActorInitializationException => Stop
case _ => Escalate
}

def props(settings: SchedoscopeSettings, transformationName: String, transformationManager: ActorRef, hdfs: FileSystem): Props =
Props(
classOf[DriverActor[_]],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,10 @@ class TransformationManagerActor(settings: SchedoscopeSettings,
override val supervisorStrategy =
OneForOneStrategy(maxNrOfRetries = -1) {
case _: RetryableDriverException => Restart
case _: ActorInitializationException => Restart
case _: ActorInitializationException => Stop
case _ => Escalate
}

// used for determining BalancingDispatcher children' Supervision
lazy val driverRouterSupervisorStrategy = OneForOneStrategy(maxNrOfRetries = -1) {
case _: RetryableDriverException => Restart
case _: ActorInitializationException => Restart
case _ => Escalate
}

val driverStates = HashMap[String, TransformationStatusResponse[_]]()
val driverActorsBackOffSupervision = new BackOffSupervision(
Expand Down Expand Up @@ -107,7 +101,7 @@ class TransformationManagerActor(settings: SchedoscopeSettings,
actorOf(
SmallestMailboxPool(
nrOfInstances = settings.getDriverSettings(transformation).concurrency,
supervisorStrategy = driverRouterSupervisorStrategy,
supervisorStrategy = DriverActor.driverRouterSupervisorStrategy,
routerDispatcher = "akka.actor.driver-router-dispatcher"
).props(routeeProps = DriverActor.props(settings, transformation, self)),
s"${transformation}-driver"
Expand Down Expand Up @@ -153,6 +147,7 @@ class TransformationManagerActor(settings: SchedoscopeSettings,
* Factory for the actions manager actor.
*/
object TransformationManagerActor {

def props(settings: SchedoscopeSettings,
bootstrapDriverActors: Boolean = true) =
Props(classOf[TransformationManagerActor],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,5 @@ case class DriverRunFailed[T <: Transformation](override val driver: Driver[T],
* to cause a driver actor restart.
*/
case class RetryableDriverException(message: String = null, cause: Throwable = null) extends RuntimeException(message, cause)

case class InvalidDriverClassException(message: String = null, cause: Throwable = null) extends RuntimeException(message, cause)
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ object SchedoscopeJsonDataFormat extends DefaultJsonProtocol {
implicit val actionStatusFormat = jsonFormat5(TransformationStatus)
implicit val actionStatusListFormat = jsonFormat2(TransformationStatusList)
implicit val viewTransformationStatusFormat: JsonFormat[ViewTransformationStatus] = lazyFormat(jsonFormat2(ViewTransformationStatus))
implicit val viewStatusFormat: JsonFormat[ViewStatus] = lazyFormat(jsonFormat14(ViewStatus))
implicit val viewStatusFormat: JsonFormat[ViewStatus] = lazyFormat(jsonFormat15(ViewStatus))
implicit val fieldStatusFormat: JsonFormat[FieldStatus] = lazyFormat(jsonFormat3(FieldStatus))
implicit val viewStatusListFormat = jsonFormat2(ViewStatusList)
implicit val queueStatusListFormat = jsonFormat2(QueueStatusList)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ case class ViewStatus(
storageFormat: Option[String],
materializeOnce: Option[Boolean],
comment: Option[Option[String]],
isTable: Option[Boolean])
isTable: Option[Boolean],
isExternal: Boolean)

case class FieldStatus(name: String, fieldtype: String, comment: Option[String])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import akka.event.Logging
import org.joda.time.format.DateTimeFormat
import org.schedoscope.AskPattern._
import org.schedoscope.conf.SchedoscopeSettings
import org.schedoscope.dsl.View
import org.schedoscope.dsl.{Field, View}
import org.schedoscope.dsl.transformations._
import org.schedoscope.scheduler.actors.ViewManagerActor
import org.schedoscope.scheduler.driver.{DriverRunFailed, DriverRunOngoing, DriverRunState, DriverRunSucceeded}
Expand Down Expand Up @@ -74,7 +74,7 @@ class SchedoscopeServiceImpl(actorSystem: ActorSystem, settings: SchedoscopeSett
}

private def viewsFromUrl(viewUrlPath: String) =
View.viewsFromUrl(settings.env, viewUrlPath, settings.viewAugmentor)
View.viewsFromUrl(viewUrlPath, settings.viewAugmentor)


private def parseQueueElements(q: List[AnyRef]): List[RunStatus] = q.map {
Expand Down Expand Up @@ -142,13 +142,14 @@ class SchedoscopeServiceImpl(actorSystem: ActorSystem, settings: SchedoscopeSett
Some(vsr.view.dependencies.map(d => (d.tableName, d.urlPath)).groupBy(_._1).mapValues(_.toList.map(_._2)))
else
None,
lineage = if (overview) None else Some(vsr.view.lineage.map { case (f, deps) => f.toString -> deps.map(_.toString).toList }),
lineage = if (overview) None else Some(vsr.view.lineage.filter(_._1.isInstanceOf[Field[_]]).map { case (f, deps) => f.toString -> deps.map(_.toString).toList }),
transformation = if (overview) None else Option(vsr.view.registeredTransformation().viewTransformationStatus),
export = if (overview) None else Option(viewExportStatus(vsr.view.registeredExports.map(e => e.apply()))),
storageFormat = if (overview) None else Option(vsr.view.storageFormat.getClass.getSimpleName),
materializeOnce = if (overview) None else Option(vsr.view.isMaterializeOnce),
comment = if (overview) None else Option(vsr.view.comment),
isTable = isTable
isTable = isTable,
isExternal = vsr.view.isExternal
)

private def viewStatusListFromStatusResponses(viewStatusResponses: List[ViewStatusResponse],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,8 +322,7 @@ class SchemaManager(val metastoreClient: IMetaStoreClient, val connection: Conne

private def partitionToView(tablePrototype: View, p: Partition) = {
val viewUrl = s"${tablePrototype.urlPathPrefix}/${p.getValues.mkString("/")}"
View.viewsFromUrl(settings.env,
viewUrl,
View.viewsFromUrl(viewUrl,
settings.viewAugmentor).head
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import java.io.OutputStreamWriter
import java.net.URI

import org.apache.hadoop.fs.Path
import org.schedoscope.Schedoscope
import org.schedoscope.dsl.{FieldLike, Named, View}
import org.schedoscope.test.resources.{LocalTestResources, TestResources}

Expand All @@ -30,8 +31,6 @@ import scala.collection.mutable.ListBuffer
*/
trait WritableView extends View {

env = "test"

val isStatic = false

var resources: TestResources = new LocalTestResources()
Expand Down
Loading

0 comments on commit 592b7f4

Please sign in to comment.