From f8216f9fff2f1d9b801b87626531f52b3c544887 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Javier=20L=C3=BChrs?= Date: Thu, 6 Sep 2018 15:09:59 -0300 Subject: [PATCH 1/2] SEQNG-715 Fixed code that updates and reloads sequences --- .../main/scala/seqexec/engine/Engine.scala | 24 +++++--- .../main/scala/seqexec/engine/Sequence.scala | 56 ++++++++++++------- .../src/main/scala/seqexec/engine/Step.scala | 54 +++++++++--------- .../scala/seqexec/server/SeqexecEngine.scala | 11 +++- 4 files changed, 85 insertions(+), 60 deletions(-) diff --git a/modules/seqexec/engine/src/main/scala/seqexec/engine/Engine.scala b/modules/seqexec/engine/src/main/scala/seqexec/engine/Engine.scala index 6cb204e507..2ce6b02c08 100644 --- a/modules/seqexec/engine/src/main/scala/seqexec/engine/Engine.scala +++ b/modules/seqexec/engine/src/main/scala/seqexec/engine/Engine.scala @@ -12,10 +12,9 @@ import seqexec.engine.Result.{PartialVal, PauseContext, RetVal} import seqexec.model.{ClientID, SequenceState} import fs2.Stream import gem.Observation -import monocle.Lens +import monocle.{Lens, Optional} import monocle.macros.Lenses -import monocle.function.At.at -import monocle.function.At.atMap +import monocle.function.Index.index import mouse.boolean._ import org.log4s.getLogger @@ -135,7 +134,7 @@ class Engine[D, U](stateL: Lens[D, Engine.State]) { */ def load(id: Observation.Id, seq: Sequence): Endo[D] = stateL.modify(st => - st.copy(sequences = st.sequences.get(id).map(t => st.sequences.updated(id, t.update(seq)) + st.copy(sequences = st.sequences.get(id).map(t => st.sequences.updated(id, Sequence.State.reload(seq.steps, t)) ).getOrElse(st.sequences.updated(id, Sequence.State.init(seq)))) ) @@ -149,6 +148,15 @@ class Engine[D, U](stateL: Lens[D, Engine.State]) { ) ) + /** + * Refresh the steps executions of an existing sequence + * @param id sequence identifier + * @param steps List of new steps definitions + * @return + */ + def update(id: Observation.Id, steps: List[Step]): Endo[D] = + (stateL ^|-? Engine.State.sequenceState(id)).modify(_.update(steps.map(_.executions))) + /** * Adds the current `Execution` to the completed `Queue`, makes the next * pending `Execution` the current one, and initiates the actual execution. @@ -388,10 +396,10 @@ class Engine[D, U](stateL: Lens[D, Engine.State]) { inspect(stateL.get(_).sequences.get(id).map(f)) private def modifyS(id: Observation.Id)(f: Sequence.State => Sequence.State): HandleP[Unit] = - modify((stateL ^|-> Engine.State.sequenceState(id)).modify(s => s.map(f))) + modify((stateL ^|-? Engine.State.sequenceState(id)).modify(f)) private def putS(id: Observation.Id)(s: Sequence.State): HandleP[Unit] = - modify((stateL ^|-> Engine.State.sequenceState(id)).set(s.some)) + modify((stateL ^|-? Engine.State.sequenceState(id)).set(s)) // For debugging def printSequenceState(id: Observation.Id): HandleP[Unit] = @@ -407,8 +415,8 @@ object Engine { object State { def empty: State = State(Map.empty) - def atSequence(id: Observation.Id): Lens[Map[Observation.Id, Sequence.State], Option[Sequence.State]] = at(id) - def sequenceState[D](id: Observation.Id): Lens[State, Option[Sequence.State]] = State.sequences ^|-> atSequence(id) + def atSequence(id: Observation.Id): Optional[Map[Observation.Id, Sequence.State], Sequence.State] = index(id) + def sequenceState[D](id: Observation.Id): Optional[State, Sequence.State] = State.sequences ^|-? atSequence(id) } abstract class Types { diff --git a/modules/seqexec/engine/src/main/scala/seqexec/engine/Sequence.scala b/modules/seqexec/engine/src/main/scala/seqexec/engine/Sequence.scala index 6c7f8381d5..173106b913 100644 --- a/modules/seqexec/engine/src/main/scala/seqexec/engine/Sequence.scala +++ b/modules/seqexec/engine/src/main/scala/seqexec/engine/Sequence.scala @@ -192,7 +192,13 @@ object Sequence { def start(i: Int): State - def update(sequence: Sequence): State + /** + * Updates the steps executions. + * It preserves the number of steps. + * @param stepDefs New executions. + * @return Updated state + */ + def update(stepDefs: List[List[Actions]]): State /** * Unzip `State`. This creates a single `Sequence` with either completed `Step`s @@ -231,12 +237,27 @@ object Sequence { } /** - * Initialize a `State` passing a `Queue` of pending `Sequence`s. + * Initialize a `State` passing a `Sequence` of pending `Step`s. */ // TODO: Make this function `apply`? def init(q: Sequence): State = Sequence.Zipper.zipper(q).map(Zipper(_, SequenceState.Idle)) - .getOrElse(Final(Sequence.empty(q.id), SequenceState.Idle)) + .getOrElse(Final(q, SequenceState.Idle)) + + /** + * Rebuilds the state of a sequence with a new steps definition, but preserving breakpoints and skip marks + * The sequence must not be running. + * @param q New sequence definition + * @param st Old sequence state + * @return The new sequence state + */ + def reload(steps: List[Step], st:State): State = + if(st.status.isRunning) st + else { + val oldSeq = st.toSequence + val updSteps = oldSeq.steps.zip(steps).map{ case (o, n) => n.copy(breakpoint = o.breakpoint, skipMark = o.skipMark)} ++ steps.drop(oldSeq.steps.length) + init(oldSeq.copy(steps = updSteps)) + } /** * This is the `State` in Zipper mode, which means is under execution. @@ -313,24 +334,19 @@ object Sequence { // Some rules: // 1. Done steps cannot change. - // 2. Pending steps cannot turn to not pending. - // 3. Running step cannot change done or focus executions - // 4. Must preserve breakpoints and skip marks - override def update(sequence: Sequence): State = { - val updatedPending = sequence.steps.drop(zipper.done.length) - - require(updatedPending.forall(Step.status(_) === StepState.Pending)) - - updatedPending match { - case t::ts => zipperL.modify(zp => zp.copy(focus = zp.focus.update(t), pending = pending.zip(ts).map{ - case (o, n) => n.copy(breakpoint = o.breakpoint, skipMark = o.skipMark) - } ++ ts.drop(pending.length)))(this) - case _ => if(status.isRunning) this - else Final(Sequence(zipper.id, zipper.done), status) + // 2. Running step cannot change `done` or `focus` executions + // 3. Must preserve breakpoints and skip marks + override def update(stepDefs: List[List[Actions]]): State = + stepDefs.drop(zipper.done.length) match { + case t :: ts => zipperL.modify(zp => + zp.copy( + focus = zp.focus.update(t), + pending = pending.zip(ts).map{case (step, exes) => step.copy(executions = exes)} ++ pending.drop(ts.length) + ) + )(this) + case _ => this } - } - override val toSequence: Sequence = zipper.toSequence } @@ -364,7 +380,7 @@ object Sequence { override def start(i: Int): State = self - override def update(sequence: Sequence): State = self + override def update(stepDefs: List[List[Actions]]): State = self override val toSequence: Sequence = seq diff --git a/modules/seqexec/engine/src/main/scala/seqexec/engine/Step.scala b/modules/seqexec/engine/src/main/scala/seqexec/engine/Step.scala index c3ff6cd337..0e0ba66609 100644 --- a/modules/seqexec/engine/src/main/scala/seqexec/engine/Step.scala +++ b/modules/seqexec/engine/src/main/scala/seqexec/engine/Step.scala @@ -116,46 +116,42 @@ object Step { val skip: Step = toStep.copy(skipped = Skipped(true)) - def update(step: Step): Zipper = { - val currentified = Zipper.currentify(step) - - //If running, only change the pending executions and the rollback definition. - (if (Step.status(toStep) === StepState.Running) - // Step updates should not change the number of Executions. If it does, the update will not apply unless - // the Step is paused and restarted. - if (step.executions.length === done.length + pending.length + 1) - currentified.map(c => this.copy(pending = c.pending.takeRight(pending.length), rolledback = c.rolledback)) - else - currentified.map(c => this.copy(rolledback = c.rolledback)) - else currentified.map(_.copy(breakpoint = this.breakpoint, skipMark = this.skipMark)) - ).getOrElse(this) - } + def update(executions: List[Actions]): Zipper = + Zipper.calcRolledback(executions).map{ case r@(_, exes) => + // Changing `pending` allows to propagate changes to non executed `executions`, even if the step is running + // Don't do it if the number of executions changes. In that case the update will only have an effect if + // the step is (re)started. + if (exes.length === done.length + pending.length) this.copy(pending = exes.takeRight(pending.length), rolledback = r) + else this.copy(rolledback = r) + }.getOrElse(this) } object Zipper { + private def calcRolledback(executions: List[Actions]): Option[(Execution, List[Actions])] = executions match { + case Nil => None + case exe :: exes => + Execution.currentify(exe).map((_, exes)) + } + /** * Make a `Zipper` from a `Step` only if all the `Execution`s in the `Step` are * pending. This is a special way of *zipping* a `Step`. * */ def currentify(step: Step): Option[Zipper] = - step.executions match { - case Nil => None - case exe :: exes => - Execution.currentify(exe).map(x => - Zipper( - step.id, - step.fileId, - step.breakpoint, - step.skipMark, - exes, - x, - Nil, - (x, exes) - ) - ) + calcRolledback(step.executions).map{ case (x, exes) => + Zipper( + step.id, + step.fileId, + step.breakpoint, + step.skipMark, + exes, + x, + Nil, + (x, exes) + ) } val current: Lens[Zipper, Execution] = diff --git a/modules/seqexec/server/src/main/scala/seqexec/server/SeqexecEngine.scala b/modules/seqexec/server/src/main/scala/seqexec/server/SeqexecEngine.scala index 62a6af387d..28280660ab 100644 --- a/modules/seqexec/server/src/main/scala/seqexec/server/SeqexecEngine.scala +++ b/modules/seqexec/server/src/main/scala/seqexec/server/SeqexecEngine.scala @@ -607,18 +607,23 @@ object SeqexecEngine extends SeqexecConfiguration { } - private def toEngineSequence(id: Observation.Id, seq: SequenceGen, d: HeaderExtraData): Sequence = Sequence(id, seq.steps.map(_.generator(d))) + private def toStepList(seq: SequenceGen, d: HeaderExtraData): List[engine.Step] = seq.steps.map(_.generator(d)) + + private def toEngineSequence(id: Observation.Id, seq: SequenceGen, d: HeaderExtraData): Sequence = Sequence(id, toStepList(seq, d)) private[server] def loadSequenceEndo(seqId: Observation.Id, seqg: SequenceGen): Endo[EngineState] = EngineState.sequences.modify(ss => ss + (seqId -> ObserverSequence(ss.get(seqId).flatMap(_.observer), seqg))) >>> (st => executeEngine.load(seqId, toEngineSequence(seqId, seqg, HeaderExtraData(st.conditions, st.operator, EngineState.sequences.get(st).get(seqId).flatMap(_.observer))))(st)) + private[server] def updateSequenceEndo(seqId: Observation.Id, seqg: SequenceGen): Endo[EngineState] = + (st => executeEngine.update(seqId, toStepList(seqg, HeaderExtraData(st.conditions, st.operator, EngineState.sequences.get(st).get(seqId).flatMap(_.observer))))(st)) + private def refreshSequence(id: Observation.Id): Endo[EngineState] = (st:EngineState) => { - st.sequences.get(id).map(obsseq => loadSequenceEndo(id, obsseq.seq)).foldLeft(st){case (s, f) => f(s)} + st.sequences.get(id).map(obsseq => updateSequenceEndo(id, obsseq.seq)).foldLeft(st){case (s, f) => f(s)} } private val refreshSequences: Endo[EngineState] = (st:EngineState) => { - st.sequences.map{ case (id, obsseq) => loadSequenceEndo(id, obsseq.seq) }.foldLeft(st){case (s, f) => f(s)} + st.sequences.map{ case (id, obsseq) => updateSequenceEndo(id, obsseq.seq) }.foldLeft(st){case (s, f) => f(s)} } private def modifyStateEvent(v: SeqEvent, svs: => SequencesQueue[SequenceView]): SeqexecEvent = v match { From a7d0a3544e59073526c274d2ef4d2154c947e12c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Javier=20L=C3=BChrs?= Date: Mon, 10 Sep 2018 15:09:30 -0300 Subject: [PATCH 2/2] SEQNG-715 Added unit test for Engine lens. --- build.sbt | 2 +- .../main/scala/seqexec/engine/Engine.scala | 2 +- .../scala/seqexec/engine/EngineSpec.scala | 45 +++++++++++++++++++ .../seqexec/server/SeqTranslateSpec.scala | 10 ++--- .../seqexec/server/SeqexecEngineSpec.scala | 10 ++--- 5 files changed, 57 insertions(+), 12 deletions(-) create mode 100644 modules/seqexec/engine/src/test/scala/seqexec/engine/EngineSpec.scala diff --git a/build.sbt b/build.sbt index 401d190947..e0bda7a466 100644 --- a/build.sbt +++ b/build.sbt @@ -562,7 +562,7 @@ lazy val seqexec_engine = project .in(file("modules/seqexec/engine")) .enablePlugins(AutomateHeaderPlugin) .enablePlugins(GitBranchPrompt) - .dependsOn(seqexec_model.jvm) + .dependsOn(seqexec_model.jvm % "compile->compile;test->test") .settings(commonSettings: _*) .settings( addCompilerPlugin(Plugins.kindProjectorPlugin), diff --git a/modules/seqexec/engine/src/main/scala/seqexec/engine/Engine.scala b/modules/seqexec/engine/src/main/scala/seqexec/engine/Engine.scala index 2ce6b02c08..7366627ae7 100644 --- a/modules/seqexec/engine/src/main/scala/seqexec/engine/Engine.scala +++ b/modules/seqexec/engine/src/main/scala/seqexec/engine/Engine.scala @@ -415,7 +415,7 @@ object Engine { object State { def empty: State = State(Map.empty) - def atSequence(id: Observation.Id): Optional[Map[Observation.Id, Sequence.State], Sequence.State] = index(id) + private def atSequence(id: Observation.Id): Optional[Map[Observation.Id, Sequence.State], Sequence.State] = index(id) def sequenceState[D](id: Observation.Id): Optional[State, Sequence.State] = State.sequences ^|-? atSequence(id) } diff --git a/modules/seqexec/engine/src/test/scala/seqexec/engine/EngineSpec.scala b/modules/seqexec/engine/src/test/scala/seqexec/engine/EngineSpec.scala new file mode 100644 index 0000000000..2556022111 --- /dev/null +++ b/modules/seqexec/engine/src/test/scala/seqexec/engine/EngineSpec.scala @@ -0,0 +1,45 @@ +// Copyright (c) 2016-2018 Association of Universities for Research in Astronomy, Inc. (AURA) +// For license information see LICENSE or https://opensource.org/licenses/BSD-3-Clause + +package seqexec.engine + +import cats.Eq +import cats.tests.CatsSuite +import gem.Observation +import gem.arb.ArbObservation +import monocle.law.discipline.OptionalTests +import org.scalacheck.{Arbitrary, Cogen} +import org.scalacheck.Arbitrary._ +import seqexec.engine +import seqexec.model.SeqexecModelArbitraries._ +import seqexec.model.SequenceState + +final class EngineSpec extends CatsSuite with ArbObservation { + implicit val seqstateEq: Eq[engine.Sequence.State] = Eq.fromUniversalEquals + implicit val execstateEq: Eq[engine.Engine.State] = Eq.by(x => x.sequences) + + implicit val sequenceArb: Arbitrary[Sequence] = Arbitrary{ + for{ + id <- arbitrary[Observation.Id](ArbObservation.arbObservationId) + } yield Sequence(id, List()) + } + + implicit val sequenceStateArb: Arbitrary[Sequence.State] = Arbitrary{ + for{ + seq <- arbitrary[Sequence] + st <- arbitrary[SequenceState] + } yield Sequence.State.Final(seq, st) + } + + implicit val sequenceStateCogen: Cogen[Sequence.State] = Cogen[Observation.Id].contramap(_.toSequence.id) + + implicit val engineStateArb: Arbitrary[Engine.State] = Arbitrary { + for { + q <- arbitrary[Map[Observation.Id, Sequence.State]] + } yield Engine.State(q) + } + + checkAll("sequence optional", + OptionalTests[Engine.State, Sequence.State, Observation.Id](Engine.State.sequenceState)) + +} diff --git a/modules/seqexec/server/src/test/scala/seqexec/server/SeqTranslateSpec.scala b/modules/seqexec/server/src/test/scala/seqexec/server/SeqTranslateSpec.scala index ba51390588..4f57b15871 100644 --- a/modules/seqexec/server/src/test/scala/seqexec/server/SeqTranslateSpec.scala +++ b/modules/seqexec/server/src/test/scala/seqexec/server/SeqTranslateSpec.scala @@ -57,17 +57,17 @@ class SeqTranslateSpec extends FlatSpec { (EngineState.executionState ^|-> Engine.State.sequences ^|-? index[Map[Observation.Id,Sequence.State], Observation.Id, Sequence.State](seqId) ^|-> Sequence.State.status).set(SequenceState.Running.init))(EngineState.default) // Observe started - private val s0: EngineState = (EngineState.executionState ^|-> Engine.State.sequences ^|-? index(seqId)).modify(_.start(0))(baseState) + private val s0: EngineState = (EngineState.executionState ^|-? Engine.State.sequenceState(seqId)).modify(_.start(0))(baseState) // Observe pending private val s1: EngineState = baseState // Observe completed - private val s2: EngineState = (EngineState.executionState ^|-> Engine.State.sequences ^|-? index(seqId)).modify(_.mark(0)(Result.OK(Result.Observed(fileId))))(baseState) + private val s2: EngineState = (EngineState.executionState ^|-? Engine.State.sequenceState(seqId)).modify(_.mark(0)(Result.OK(Result.Observed(fileId))))(baseState) // Observe started, but with file Id already allocated - private val s3: EngineState = (EngineState.executionState ^|-> Engine.State.sequences ^|-? index(seqId)).modify(_.start(0).mark(0)(Result.Partial(Result.FileIdAllocated(fileId), IO(Result.OK(Result.Observed(fileId))))))(baseState) + private val s3: EngineState = (EngineState.executionState ^|-? Engine.State.sequenceState(seqId)).modify(_.start(0).mark(0)(Result.Partial(Result.FileIdAllocated(fileId), IO(Result.OK(Result.Observed(fileId))))))(baseState) // Observe paused - private val s4: EngineState = (EngineState.executionState ^|-> Engine.State.sequences ^|-? index(seqId)).modify(_.mark(0)(Result.Paused(ObserveContext(_ => SeqAction(Result.OK(Result.Observed(fileId))), Seconds(1)))))(baseState) + private val s4: EngineState = (EngineState.executionState ^|-? Engine.State.sequenceState(seqId)).modify(_.mark(0)(Result.Paused(ObserveContext(_ => SeqAction(Result.OK(Result.Observed(fileId))), Seconds(1)))))(baseState) // Observe failed - private val s5: EngineState = (EngineState.executionState ^|-> Engine.State.sequences ^|-? index(seqId)).modify(_.mark(0)(Result.Error("error")))(baseState) + private val s5: EngineState = (EngineState.executionState ^|-? Engine.State.sequenceState(seqId)).modify(_.mark(0)(Result.Error("error")))(baseState) private val systems = SeqTranslate.Systems( new ODBProxy(new Peer("localhost", 8443, null), ODBProxy.DummyOdbCommands), diff --git a/modules/seqexec/server/src/test/scala/seqexec/server/SeqexecEngineSpec.scala b/modules/seqexec/server/src/test/scala/seqexec/server/SeqexecEngineSpec.scala index b8f3591ce2..952622b2ad 100644 --- a/modules/seqexec/server/src/test/scala/seqexec/server/SeqexecEngineSpec.scala +++ b/modules/seqexec/server/src/test/scala/seqexec/server/SeqexecEngineSpec.scala @@ -273,7 +273,7 @@ class SeqexecEngineSpec extends FlatSpec with Matchers { val s0 = (SeqexecEngine.loadSequenceEndo(seqObsId1, sequence(seqObsId1)) >>> SeqexecEngine.loadSequenceEndo(seqObsId2, sequence(seqObsId2)) >>> (EngineState.queues ^|-? index(CalibrationQueueName)).modify(_ ++ List(seqObsId1, seqObsId2)) >>> - (EngineState.executionState ^|-> Engine.State.sequences ^|-? index(seqObsId1) ^|-> Sequence.State.status).set(SequenceState.Running.init))(EngineState.default) + (EngineState.executionState ^|-? Engine.State.sequenceState(seqObsId1) ^|-> Sequence.State.status).set(SequenceState.Running.init))(EngineState.default) (for { q <- async.boundedQueue[IO, executeEngine.EventType](10) @@ -412,13 +412,13 @@ class SeqexecEngineSpec extends FlatSpec with Matchers { "SeqexecEngine" should "not run 2nd sequence because it's using the same resource" in { val s0 = (SeqexecEngine.loadSequenceEndo(seqObsId1, sequenceWithResources(seqObsId1, Set(Instrument.F2, TCS))) >>> SeqexecEngine.loadSequenceEndo(seqObsId2, sequenceWithResources(seqObsId2, Set(Instrument.F2))) >>> - (EngineState.executionState ^|-> Engine.State.sequences ^|-? index(seqObsId1) ^|-> Sequence.State.status).set(SequenceState.Running.init))(EngineState.default) + (EngineState.executionState ^|-? Engine.State.sequenceState(seqObsId1) ^|-> Sequence.State.status).set(SequenceState.Running.init))(EngineState.default) (for { q <- async.boundedQueue[IO, executeEngine.EventType](10) sf <- advanceOne(q, s0, seqexecEngine.start(q, seqObsId2, UserDetails("", ""), UUID.randomUUID())) } yield { - inside(sf.flatMap((EngineState.executionState ^|-> Engine.State.sequences ^|-? index(seqObsId2)).getOption).map(_.status)) { + inside(sf.flatMap((EngineState.executionState ^|-? Engine.State.sequenceState(seqObsId2)).getOption).map(_.status)) { case Some(status) => assert(status.isIdle) } }).unsafeRunSync @@ -428,13 +428,13 @@ class SeqexecEngineSpec extends FlatSpec with Matchers { it should "run 2nd sequence when there are no shared resources" in { val s0 = (SeqexecEngine.loadSequenceEndo(seqObsId1, sequenceWithResources(seqObsId1, Set(Instrument.F2, TCS))) >>> SeqexecEngine.loadSequenceEndo(seqObsId2, sequenceWithResources(seqObsId2, Set(Instrument.GmosS))) >>> - (EngineState.executionState ^|-> Engine.State.sequences ^|-? index(seqObsId1) ^|-> Sequence.State.status).set(SequenceState.Running.init))(EngineState.default) + (EngineState.executionState ^|-? Engine.State.sequenceState(seqObsId1) ^|-> Sequence.State.status).set(SequenceState.Running.init))(EngineState.default) (for { q <- async.boundedQueue[IO, executeEngine.EventType](10) sf <- advanceOne(q, s0, seqexecEngine.start(q, seqObsId2, UserDetails("", ""), UUID.randomUUID())) } yield { - inside(sf.flatMap((EngineState.executionState ^|-> Engine.State.sequences ^|-? index(seqObsId2)).getOption).map(_.status)) { + inside(sf.flatMap((EngineState.executionState ^|-? Engine.State.sequenceState(seqObsId2)).getOption).map(_.status)) { case Some(status) => assert(status.isRunning) } }).unsafeRunSync