Skip to content

Commit

Permalink
Merge pull request #643 from jluhrs/feature/SEQNG-715
Browse files Browse the repository at this point in the history
SEQNG-715 Fixed code that updates and reloads sequences
  • Loading branch information
jluhrs authored Sep 10, 2018
2 parents bf1e0b7 + a7d0a35 commit 9830261
Show file tree
Hide file tree
Showing 8 changed files with 141 additions and 71 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ lazy val seqexec_engine = project
.in(file("modules/seqexec/engine"))
.enablePlugins(AutomateHeaderPlugin)
.enablePlugins(GitBranchPrompt)
.dependsOn(seqexec_model.jvm)
.dependsOn(seqexec_model.jvm % "compile->compile;test->test")
.settings(commonSettings: _*)
.settings(
addCompilerPlugin(Plugins.kindProjectorPlugin),
Expand Down
24 changes: 16 additions & 8 deletions modules/seqexec/engine/src/main/scala/seqexec/engine/Engine.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@ import seqexec.engine.Result.{PartialVal, PauseContext, RetVal}
import seqexec.model.{ClientID, SequenceState}
import fs2.Stream
import gem.Observation
import monocle.Lens
import monocle.{Lens, Optional}
import monocle.macros.Lenses
import monocle.function.At.at
import monocle.function.At.atMap
import monocle.function.Index.index
import mouse.boolean._
import org.log4s.getLogger

Expand Down Expand Up @@ -135,7 +134,7 @@ class Engine[D, U](stateL: Lens[D, Engine.State]) {
*/
def load(id: Observation.Id, seq: Sequence): Endo[D] =
stateL.modify(st =>
st.copy(sequences = st.sequences.get(id).map(t => st.sequences.updated(id, t.update(seq))
st.copy(sequences = st.sequences.get(id).map(t => st.sequences.updated(id, Sequence.State.reload(seq.steps, t))
).getOrElse(st.sequences.updated(id, Sequence.State.init(seq))))
)

Expand All @@ -149,6 +148,15 @@ class Engine[D, U](stateL: Lens[D, Engine.State]) {
)
)

/**
* Refresh the steps executions of an existing sequence
* @param id sequence identifier
* @param steps List of new steps definitions
* @return
*/
def update(id: Observation.Id, steps: List[Step]): Endo[D] =
(stateL ^|-? Engine.State.sequenceState(id)).modify(_.update(steps.map(_.executions)))

/**
* Adds the current `Execution` to the completed `Queue`, makes the next
* pending `Execution` the current one, and initiates the actual execution.
Expand Down Expand Up @@ -388,10 +396,10 @@ class Engine[D, U](stateL: Lens[D, Engine.State]) {
inspect(stateL.get(_).sequences.get(id).map(f))

private def modifyS(id: Observation.Id)(f: Sequence.State => Sequence.State): HandleP[Unit] =
modify((stateL ^|-> Engine.State.sequenceState(id)).modify(s => s.map(f)))
modify((stateL ^|-? Engine.State.sequenceState(id)).modify(f))

private def putS(id: Observation.Id)(s: Sequence.State): HandleP[Unit] =
modify((stateL ^|-> Engine.State.sequenceState(id)).set(s.some))
modify((stateL ^|-? Engine.State.sequenceState(id)).set(s))

// For debugging
def printSequenceState(id: Observation.Id): HandleP[Unit] =
Expand All @@ -407,8 +415,8 @@ object Engine {

object State {
def empty: State = State(Map.empty)
def atSequence(id: Observation.Id): Lens[Map[Observation.Id, Sequence.State], Option[Sequence.State]] = at(id)
def sequenceState[D](id: Observation.Id): Lens[State, Option[Sequence.State]] = State.sequences ^|-> atSequence(id)
private def atSequence(id: Observation.Id): Optional[Map[Observation.Id, Sequence.State], Sequence.State] = index(id)
def sequenceState[D](id: Observation.Id): Optional[State, Sequence.State] = State.sequences ^|-? atSequence(id)
}

abstract class Types {
Expand Down
56 changes: 36 additions & 20 deletions modules/seqexec/engine/src/main/scala/seqexec/engine/Sequence.scala
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,13 @@ object Sequence {

def start(i: Int): State

def update(sequence: Sequence): State
/**
* Updates the steps executions.
* It preserves the number of steps.
* @param stepDefs New executions.
* @return Updated state
*/
def update(stepDefs: List[List[Actions]]): State

/**
* Unzip `State`. This creates a single `Sequence` with either completed `Step`s
Expand Down Expand Up @@ -231,12 +237,27 @@ object Sequence {
}

/**
* Initialize a `State` passing a `Queue` of pending `Sequence`s.
* Initialize a `State` passing a `Sequence` of pending `Step`s.
*/
// TODO: Make this function `apply`?
def init(q: Sequence): State =
Sequence.Zipper.zipper(q).map(Zipper(_, SequenceState.Idle))
.getOrElse(Final(Sequence.empty(q.id), SequenceState.Idle))
.getOrElse(Final(q, SequenceState.Idle))

/**
* Rebuilds the state of a sequence with a new steps definition, but preserving breakpoints and skip marks
* The sequence must not be running.
* @param q New sequence definition
* @param st Old sequence state
* @return The new sequence state
*/
def reload(steps: List[Step], st:State): State =
if(st.status.isRunning) st
else {
val oldSeq = st.toSequence
val updSteps = oldSeq.steps.zip(steps).map{ case (o, n) => n.copy(breakpoint = o.breakpoint, skipMark = o.skipMark)} ++ steps.drop(oldSeq.steps.length)
init(oldSeq.copy(steps = updSteps))
}

/**
* This is the `State` in Zipper mode, which means is under execution.
Expand Down Expand Up @@ -313,24 +334,19 @@ object Sequence {

// Some rules:
// 1. Done steps cannot change.
// 2. Pending steps cannot turn to not pending.
// 3. Running step cannot change done or focus executions
// 4. Must preserve breakpoints and skip marks
override def update(sequence: Sequence): State = {
val updatedPending = sequence.steps.drop(zipper.done.length)

require(updatedPending.forall(Step.status(_) === StepState.Pending))

updatedPending match {
case t::ts => zipperL.modify(zp => zp.copy(focus = zp.focus.update(t), pending = pending.zip(ts).map{
case (o, n) => n.copy(breakpoint = o.breakpoint, skipMark = o.skipMark)
} ++ ts.drop(pending.length)))(this)
case _ => if(status.isRunning) this
else Final(Sequence(zipper.id, zipper.done), status)
// 2. Running step cannot change `done` or `focus` executions
// 3. Must preserve breakpoints and skip marks
override def update(stepDefs: List[List[Actions]]): State =
stepDefs.drop(zipper.done.length) match {
case t :: ts => zipperL.modify(zp =>
zp.copy(
focus = zp.focus.update(t),
pending = pending.zip(ts).map{case (step, exes) => step.copy(executions = exes)} ++ pending.drop(ts.length)
)
)(this)
case _ => this
}

}

override val toSequence: Sequence = zipper.toSequence

}
Expand Down Expand Up @@ -364,7 +380,7 @@ object Sequence {

override def start(i: Int): State = self

override def update(sequence: Sequence): State = self
override def update(stepDefs: List[List[Actions]]): State = self

override val toSequence: Sequence = seq

Expand Down
54 changes: 25 additions & 29 deletions modules/seqexec/engine/src/main/scala/seqexec/engine/Step.scala
Original file line number Diff line number Diff line change
Expand Up @@ -116,46 +116,42 @@ object Step {

val skip: Step = toStep.copy(skipped = Skipped(true))

def update(step: Step): Zipper = {
val currentified = Zipper.currentify(step)

//If running, only change the pending executions and the rollback definition.
(if (Step.status(toStep) === StepState.Running)
// Step updates should not change the number of Executions. If it does, the update will not apply unless
// the Step is paused and restarted.
if (step.executions.length === done.length + pending.length + 1)
currentified.map(c => this.copy(pending = c.pending.takeRight(pending.length), rolledback = c.rolledback))
else
currentified.map(c => this.copy(rolledback = c.rolledback))
else currentified.map(_.copy(breakpoint = this.breakpoint, skipMark = this.skipMark))
).getOrElse(this)
}
def update(executions: List[Actions]): Zipper =
Zipper.calcRolledback(executions).map{ case r@(_, exes) =>
// Changing `pending` allows to propagate changes to non executed `executions`, even if the step is running
// Don't do it if the number of executions changes. In that case the update will only have an effect if
// the step is (re)started.
if (exes.length === done.length + pending.length) this.copy(pending = exes.takeRight(pending.length), rolledback = r)
else this.copy(rolledback = r)
}.getOrElse(this)

}

object Zipper {

private def calcRolledback(executions: List[Actions]): Option[(Execution, List[Actions])] = executions match {
case Nil => None
case exe :: exes =>
Execution.currentify(exe).map((_, exes))
}

/**
* Make a `Zipper` from a `Step` only if all the `Execution`s in the `Step` are
* pending. This is a special way of *zipping* a `Step`.
*
*/
def currentify(step: Step): Option[Zipper] =
step.executions match {
case Nil => None
case exe :: exes =>
Execution.currentify(exe).map(x =>
Zipper(
step.id,
step.fileId,
step.breakpoint,
step.skipMark,
exes,
x,
Nil,
(x, exes)
)
)
calcRolledback(step.executions).map{ case (x, exes) =>
Zipper(
step.id,
step.fileId,
step.breakpoint,
step.skipMark,
exes,
x,
Nil,
(x, exes)
)
}

val current: Lens[Zipper, Execution] =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright (c) 2016-2018 Association of Universities for Research in Astronomy, Inc. (AURA)
// For license information see LICENSE or https://opensource.org/licenses/BSD-3-Clause

package seqexec.engine

import cats.Eq
import cats.tests.CatsSuite
import gem.Observation
import gem.arb.ArbObservation
import monocle.law.discipline.OptionalTests
import org.scalacheck.{Arbitrary, Cogen}
import org.scalacheck.Arbitrary._
import seqexec.engine
import seqexec.model.SeqexecModelArbitraries._
import seqexec.model.SequenceState

final class EngineSpec extends CatsSuite with ArbObservation {
implicit val seqstateEq: Eq[engine.Sequence.State] = Eq.fromUniversalEquals
implicit val execstateEq: Eq[engine.Engine.State] = Eq.by(x => x.sequences)

implicit val sequenceArb: Arbitrary[Sequence] = Arbitrary{
for{
id <- arbitrary[Observation.Id](ArbObservation.arbObservationId)
} yield Sequence(id, List())
}

implicit val sequenceStateArb: Arbitrary[Sequence.State] = Arbitrary{
for{
seq <- arbitrary[Sequence]
st <- arbitrary[SequenceState]
} yield Sequence.State.Final(seq, st)
}

implicit val sequenceStateCogen: Cogen[Sequence.State] = Cogen[Observation.Id].contramap(_.toSequence.id)

implicit val engineStateArb: Arbitrary[Engine.State] = Arbitrary {
for {
q <- arbitrary[Map[Observation.Id, Sequence.State]]
} yield Engine.State(q)
}

checkAll("sequence optional",
OptionalTests[Engine.State, Sequence.State, Observation.Id](Engine.State.sequenceState))

}
Original file line number Diff line number Diff line change
Expand Up @@ -607,18 +607,23 @@ object SeqexecEngine extends SeqexecConfiguration {

}

private def toEngineSequence(id: Observation.Id, seq: SequenceGen, d: HeaderExtraData): Sequence = Sequence(id, seq.steps.map(_.generator(d)))
private def toStepList(seq: SequenceGen, d: HeaderExtraData): List[engine.Step] = seq.steps.map(_.generator(d))

private def toEngineSequence(id: Observation.Id, seq: SequenceGen, d: HeaderExtraData): Sequence = Sequence(id, toStepList(seq, d))

private[server] def loadSequenceEndo(seqId: Observation.Id, seqg: SequenceGen): Endo[EngineState] =
EngineState.sequences.modify(ss => ss + (seqId -> ObserverSequence(ss.get(seqId).flatMap(_.observer), seqg))) >>>
(st => executeEngine.load(seqId, toEngineSequence(seqId, seqg, HeaderExtraData(st.conditions, st.operator, EngineState.sequences.get(st).get(seqId).flatMap(_.observer))))(st))

private[server] def updateSequenceEndo(seqId: Observation.Id, seqg: SequenceGen): Endo[EngineState] =
(st => executeEngine.update(seqId, toStepList(seqg, HeaderExtraData(st.conditions, st.operator, EngineState.sequences.get(st).get(seqId).flatMap(_.observer))))(st))

private def refreshSequence(id: Observation.Id): Endo[EngineState] = (st:EngineState) => {
st.sequences.get(id).map(obsseq => loadSequenceEndo(id, obsseq.seq)).foldLeft(st){case (s, f) => f(s)}
st.sequences.get(id).map(obsseq => updateSequenceEndo(id, obsseq.seq)).foldLeft(st){case (s, f) => f(s)}
}

private val refreshSequences: Endo[EngineState] = (st:EngineState) => {
st.sequences.map{ case (id, obsseq) => loadSequenceEndo(id, obsseq.seq) }.foldLeft(st){case (s, f) => f(s)}
st.sequences.map{ case (id, obsseq) => updateSequenceEndo(id, obsseq.seq) }.foldLeft(st){case (s, f) => f(s)}
}

private def modifyStateEvent(v: SeqEvent, svs: => SequencesQueue[SequenceView]): SeqexecEvent = v match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,17 @@ class SeqTranslateSpec extends FlatSpec {
(EngineState.executionState ^|-> Engine.State.sequences ^|-? index[Map[Observation.Id,Sequence.State], Observation.Id, Sequence.State](seqId) ^|-> Sequence.State.status).set(SequenceState.Running.init))(EngineState.default)

// Observe started
private val s0: EngineState = (EngineState.executionState ^|-> Engine.State.sequences ^|-? index(seqId)).modify(_.start(0))(baseState)
private val s0: EngineState = (EngineState.executionState ^|-? Engine.State.sequenceState(seqId)).modify(_.start(0))(baseState)
// Observe pending
private val s1: EngineState = baseState
// Observe completed
private val s2: EngineState = (EngineState.executionState ^|-> Engine.State.sequences ^|-? index(seqId)).modify(_.mark(0)(Result.OK(Result.Observed(fileId))))(baseState)
private val s2: EngineState = (EngineState.executionState ^|-? Engine.State.sequenceState(seqId)).modify(_.mark(0)(Result.OK(Result.Observed(fileId))))(baseState)
// Observe started, but with file Id already allocated
private val s3: EngineState = (EngineState.executionState ^|-> Engine.State.sequences ^|-? index(seqId)).modify(_.start(0).mark(0)(Result.Partial(Result.FileIdAllocated(fileId), IO(Result.OK(Result.Observed(fileId))))))(baseState)
private val s3: EngineState = (EngineState.executionState ^|-? Engine.State.sequenceState(seqId)).modify(_.start(0).mark(0)(Result.Partial(Result.FileIdAllocated(fileId), IO(Result.OK(Result.Observed(fileId))))))(baseState)
// Observe paused
private val s4: EngineState = (EngineState.executionState ^|-> Engine.State.sequences ^|-? index(seqId)).modify(_.mark(0)(Result.Paused(ObserveContext(_ => SeqAction(Result.OK(Result.Observed(fileId))), Seconds(1)))))(baseState)
private val s4: EngineState = (EngineState.executionState ^|-? Engine.State.sequenceState(seqId)).modify(_.mark(0)(Result.Paused(ObserveContext(_ => SeqAction(Result.OK(Result.Observed(fileId))), Seconds(1)))))(baseState)
// Observe failed
private val s5: EngineState = (EngineState.executionState ^|-> Engine.State.sequences ^|-? index(seqId)).modify(_.mark(0)(Result.Error("error")))(baseState)
private val s5: EngineState = (EngineState.executionState ^|-? Engine.State.sequenceState(seqId)).modify(_.mark(0)(Result.Error("error")))(baseState)

private val systems = SeqTranslate.Systems(
new ODBProxy(new Peer("localhost", 8443, null), ODBProxy.DummyOdbCommands),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ class SeqexecEngineSpec extends FlatSpec with Matchers {
val s0 = (SeqexecEngine.loadSequenceEndo(seqObsId1, sequence(seqObsId1)) >>>
SeqexecEngine.loadSequenceEndo(seqObsId2, sequence(seqObsId2)) >>>
(EngineState.queues ^|-? index(CalibrationQueueName)).modify(_ ++ List(seqObsId1, seqObsId2)) >>>
(EngineState.executionState ^|-> Engine.State.sequences ^|-? index(seqObsId1) ^|-> Sequence.State.status).set(SequenceState.Running.init))(EngineState.default)
(EngineState.executionState ^|-? Engine.State.sequenceState(seqObsId1) ^|-> Sequence.State.status).set(SequenceState.Running.init))(EngineState.default)

(for {
q <- async.boundedQueue[IO, executeEngine.EventType](10)
Expand Down Expand Up @@ -412,13 +412,13 @@ class SeqexecEngineSpec extends FlatSpec with Matchers {
"SeqexecEngine" should "not run 2nd sequence because it's using the same resource" in {
val s0 = (SeqexecEngine.loadSequenceEndo(seqObsId1, sequenceWithResources(seqObsId1, Set(Instrument.F2, TCS))) >>>
SeqexecEngine.loadSequenceEndo(seqObsId2, sequenceWithResources(seqObsId2, Set(Instrument.F2))) >>>
(EngineState.executionState ^|-> Engine.State.sequences ^|-? index(seqObsId1) ^|-> Sequence.State.status).set(SequenceState.Running.init))(EngineState.default)
(EngineState.executionState ^|-? Engine.State.sequenceState(seqObsId1) ^|-> Sequence.State.status).set(SequenceState.Running.init))(EngineState.default)

(for {
q <- async.boundedQueue[IO, executeEngine.EventType](10)
sf <- advanceOne(q, s0, seqexecEngine.start(q, seqObsId2, UserDetails("", ""), UUID.randomUUID()))
} yield {
inside(sf.flatMap((EngineState.executionState ^|-> Engine.State.sequences ^|-? index(seqObsId2)).getOption).map(_.status)) {
inside(sf.flatMap((EngineState.executionState ^|-? Engine.State.sequenceState(seqObsId2)).getOption).map(_.status)) {
case Some(status) => assert(status.isIdle)
}
}).unsafeRunSync
Expand All @@ -428,13 +428,13 @@ class SeqexecEngineSpec extends FlatSpec with Matchers {
it should "run 2nd sequence when there are no shared resources" in {
val s0 = (SeqexecEngine.loadSequenceEndo(seqObsId1, sequenceWithResources(seqObsId1, Set(Instrument.F2, TCS))) >>>
SeqexecEngine.loadSequenceEndo(seqObsId2, sequenceWithResources(seqObsId2, Set(Instrument.GmosS))) >>>
(EngineState.executionState ^|-> Engine.State.sequences ^|-? index(seqObsId1) ^|-> Sequence.State.status).set(SequenceState.Running.init))(EngineState.default)
(EngineState.executionState ^|-? Engine.State.sequenceState(seqObsId1) ^|-> Sequence.State.status).set(SequenceState.Running.init))(EngineState.default)

(for {
q <- async.boundedQueue[IO, executeEngine.EventType](10)
sf <- advanceOne(q, s0, seqexecEngine.start(q, seqObsId2, UserDetails("", ""), UUID.randomUUID()))
} yield {
inside(sf.flatMap((EngineState.executionState ^|-> Engine.State.sequences ^|-? index(seqObsId2)).getOption).map(_.status)) {
inside(sf.flatMap((EngineState.executionState ^|-? Engine.State.sequenceState(seqObsId2)).getOption).map(_.status)) {
case Some(status) => assert(status.isRunning)
}
}).unsafeRunSync
Expand Down

0 comments on commit 9830261

Please sign in to comment.