Skip to content

KeYmaera X Code Snippets

Stefan Mitsch edited this page Mar 10, 2022 · 5 revisions

Exporting Mathematica QE Calls

Exporting Mathematica QE calls as generated by the builtin QE tactic takes two steps:

  1. Execute tactic prepareQE to normalize a formula and form its universal closure
  2. Transform the normalized universal closure to Mathematica syntax

The code below performs both steps:

package edu.cmu.cs.ls.keymaerax.btactics

import edu.cmu.cs.ls.keymaerax.parser.StringConverter._
import edu.cmu.cs.ls.keymaerax.btactics.TactixLibrary._
import edu.cmu.cs.ls.keymaerax.infrastruct.Augmentors.SequentAugmentor
import edu.cmu.cs.ls.keymaerax.lemma.Lemma
import edu.cmu.cs.ls.keymaerax.tools.ext.MathematicaQEToolBridge
import edu.cmu.cs.ls.keymaerax.tools.qe.MathematicaConversion
import org.scalatest.LoneElement._

class QETools extends TacticTestBase {

  "Prepare QE" should "prepare but not perform a QE call" in withMathematica { tool =>
    // transform formula according to the QE tactic (normalize, universal closure), result in KeYmaera X syntax
    proveBy("x>=2 ==> y>=1 -> x^2*y>=4 & \\forall z (z>0 -> x/z > 0)".asSequent, ToolTactics.prepareQE(List.empty, skip)).
      subgoals.loneElement shouldBe "==> \\forall y \\forall x (x>=2 & y>=1 -> x^2*y>=4 & \\forall z (z>0 -> x/z > 0))".asSequent
    proveBy("x>=2 ==> y>=1 -> x^2*y>=4 | \\forall z (z>0 -> x/z > 0)".asSequent, ToolTactics.prepareQE(List.empty, skip)).
      subgoals.loneElement shouldBe "==> \\forall z \\forall y \\forall x (x>=2 & y>=1 & z>0 -> x^2*y>=4 | x/z > 0)".asSequent
  }

  "Export" should "export in Mathematica syntax" in withMathematica { tool =>
    val preparedQE = proveBy("x>=2 ==> y>=1 -> x^2*y>=4 | \\forall z (z>0 -> x/z > 0)".asSequent,
      ToolTactics.prepareQE(List.empty, skip)).subgoals.loneElement.toFormula

    // export to Mathematica Reduce/Resolve by accessing private export methods; result in Mathematica syntax
    val qeTool = tool.invokePrivate(PrivateMethod[MathematicaQEToolBridge[Lemma]]('mQE)()).qeTool
    val mexpr = qeTool.invokePrivate(PrivateMethod[MathematicaConversion.MExpr]('qe)(preparedQE))
    println(mexpr)
  }
}

ModelPlex API

ModelPlex Model Monitors for Non-Solvable ODEs

Model monitors check whether the system behaves consistently with the modeled differential equations. Automated synthesis of model monitors assumes that the differential equations are solvable in order to derive exact monitor conditions that describe the expected behavior. For differential equations without polynomial solutions (e.g., non-linear differential equations), instead of using solutions ModelPlex can inspect proofs to find safety-relevant regions that confine the differential equations.

val Some(spec) = KeYmaeraXArchiveParser.getEntry("A KeYmaera X specification with a proof", io.Source.fromFile("/path/to/spec.kyx").mkString)
ModelPlex.createNonlinearModelApprox(spec.name, spec.tactics.head._3, spec.defs)(spec.model)

In order to approximate differential equations from proofs, the specification file must contain a proof tactic, e.g., as in the entry "ModelPlex/Partial Observability/Curved Ground Robot Motion is Safe" in partialobservability.kyx.

ModelPlex then reads off safety regions from differential invariant (diffInvariant) and differential cut (dC) steps in the proof. The discrete approximation has the shape below, e.g., see the entry "ModelPlex/Partial Observability/Approximated Curved Ground Robot Motion is Safe" in partialobservability.kyx for an example of a discrete approximation derived from the specification above.

HP ghosts ::= { x_0:=x; };
HP approxPlant ::= { x:=*; };
HP evolDomTest ::= { ?Q(x); };

HP discretePlant ::= {
  ghosts;
  evolDomTest;
  approxPlant;
  evolDomTest;
};

The intuition behind this discrete approximation is that not all safety proofs rely on the exact differential equations to prove safety. Some only need to know that the continuous behavior stayed inside a certain region as documented by differential cut/differential weakening steps in the proof and the evolution domain constraint in the model.

This discrete approximation step is possible when the tactics in spec.tactics.head._3 have a certain shape: all differential equation reasoning first builds a region by using differential cut/differential invariant steps, and then discards the exact continuous behavior using differential weakening, as illustrated in the tactic shape below.

implyR(1);
loop("J(x)", 1); <(
  "Init": QE,
  "Post": QE,
  "Step":
    composeb(1);
    choiceb(1);
    andR(1); <(
      "[left branch;]p(x)":
        diffInvariant("I_1(x,x_0)", 1);
        diffInvariant("I_2(x,x_0)", 1);
        dW(1);
        QE,
      "[right branch;]p(x)":
        diffInvariant("I_1(x,x_0)", 1);
        diffInvariant("I_3(x,x_0)", 1);
        dW(1);
        QE
    )
)

All branches in the proof first perform diffInvariant steps to describe the relevant safety region, and then end in dW. The procedure ModelPlex.createNonlinearModelApprox picks up the conditions I_1, I_2, and I_3 from the proof. The resulting model monitors then check the true continuous behavior for staying inside these conditions.

Partial Observability ModelPlex

ModelPlex for partially observable systems enables uniform treatment of various sources of uncertainty and unobservability in ModelPlex, including sensor measurement uncertainty, actuator disturbance, and unknown quantities/parameters. Partial ModelPlex monitors can retroactively add uncertainty to idealized hybrid system models.

As an example, the instructions here use a model of a water tank with flow disturbance.

  1. Read a KeYmaera X specification from a file

    val Some(spec) = KeYmaeraXArchiveParser.getEntry("A KeYmaera X specification", io.Source.fromFile("/path/to/spec.kyx").mkString)
    // e.g. val Some(spec) = KeYmaeraXArchiveParser.getEntry("ModelPlex/Partial Observability/Water tank with flow disturbance is safe",
    //   io.Source.fromInputStream(getClass.getResourceAsStream("/keymaerax-projects/modelplex/partialobservability.kyx")).mkString)
    val model = spec.model.exhaustiveSubst(USubst(entry.defs.substs.filter(_.what.isInstanceOf[SystemConst]))).asInstanceOf[Formula]

    The specification must be of the shape assumptions -> [prg;]safe, where assumptions is a first-order formula, prg is a hybrid program (not a game), and safe a dL formula. If the specification uses definitions, all programs must be expanded fully, but function and predicate symbols may stay unexpanded.

  2. Determine the state variables of the specification

    val stateVars = StaticSemantics.boundVars(model).toSet.filter(_.isInstanceOf[BaseVariable]).toList.asInstanceOf[List[BaseVariable]]

    ModelPlex monitors all variables that are bound by the model. If some of those variables are not observable in the system (e.g., because there are no sensors directly providing measurements for a certain quantity) or only observable with measurement uncertainty, they are still state variables but will be characterized using existential quantifiers. Model parameters can also be treated as unobservable.

  3. Specify sensors (optional)

    val unobservable: ListMap[NamedSymbol, Option[Formula]] = ListMap(
       Variable("l") -> Some("0 <= lU() & lS-lU() <= l & l <= lS+lU()".asFormula),
       Variable("fd") -> None,
       Function("D", None, Unit, Real) -> None)

    In this example, we declare that the water tank load l is not observable as an exact quantity as mandated by the model, but our system provides a sensor lS subject to measurement uncertainty lU(), which becomes a parameter of the monitor. For actuation, the example model is more detailed by distinguishing between the flow f chosen by the controller and the actual flow fd and its disturbance bounds D. The true fd is entirely unobservable and we may not know D, so declare them both as None.

If all bound variables of the model are observable, define val unobservable = ListMap.empty.

  1. Create the ModelPlex conjecture

    val ModelPlexConjecture(init, dLMonitorCondition, assumptions) = ModelPlex.createMonitorSpecificationConjecture(model, stateVars, unobservable)

    The result is a monitor condition in dL with existentially quantified unobservable quantities and their optional sensor specifications.

  2. Symbolically execute the program

    val foMonitorCondition = proveBy(dLMonitorCondition, ModelPlex.controllerMonitorByChase(1))

    Symbolic execution applies program proof rules until the effect of all hybrid programs in the model are determined in first-order form. The result contains (existential) quantifiers but no programs. None of the unobservable quantities is free in foMonitorCondition, but they may still occur existentially quantified. Steps 6+7 remove those quantifiers.

  3. Search for existential witnesses

    val simplifiedFOMonitorCondition = proveBy(foMonitorCondition, ModelPlex.optimizationOneWithSearch(Some(tool), assumptions,
      unobservable.keySet.toList, Some(ModelPlex.mxSimplify))(1))

    Tactic ModelPlex.optimizationOneWithSearch attempts to find witnesses for existential quantifiers in the first-order monitor condition and instantiates existential quantifiers with those witnesses (if any). With Some(ModelPlex.mxSimplify) we attempt to additionally simplify the monitor condition. This step can be omitted by providing None. Usually, the tactic finds some witnesses but not for all the existential quantifiers, so that in the next step we use quantifier elimination to obtain an equivalent quantifier-free formula.

  4. Eliminate remaining existential quantifiers: this step applies a backend for quantifier elimination to find an equivalent quantifier-free monitor formula. Even though decidable in theory, quantifier elimination is computationally expensive in practice. As a result, the automated tactics for this step often exceed any reasonable time budget, but there are several options to scale the process more manually.

    1. With partialQE verbatim in the backend tool

      val qfMonitorCondition = proveBy(simplifiedFOMonitorCondition, partialQE & ModelPlex.mxSimplify(1) & ModelPlex.reassociate)

      The steps ModelPlex.mxSimplify(1) and ModelPlex.reassociate beautify the monitor condition but can be omitted if too expensive.

    2. With function-abbreviating mxPartialQE and backend tool

      In larger models, the first-order monitor conditions can become quite complex and the backend procedures used in partialQE may not provide results within a reasonable time budget. In this case, manual alternatives can make progress, but need careful inspection because they do not come with an unbroken chain of proofs. The next snippet abbreviates function symbols whose arguments are not quantified in order to reduce arithmetic complexity.

      assert(simplifiedFOMonitorCondition.subgoals.size == 1 && simplifiedFOMonitorCondition.subgoals.head.ante.isEmpty && simplifiedFOMonitorCondition.subgoals.head.succ.size == 1)
      val qfMonitorCondition = ModelPlex.mxPartialQE(simplifiedFOMonitorCondition.subgoals.head.succ.head, entry.defs, tool)
    3. With manual splitting

      If abbreviations are not sufficient, additional preprocessing as below may split the single partialQE attempt into multiple separate attempts.

      assert(simplifiedFOMonitorCondition.subgoals.size == 1 && simplifiedFOMonitorCondition.subgoals.head.ante.isEmpty && simplifiedFOMonitorCondition.subgoals.head.succ.size == 1)
      val kernelPos = PosInExpr(List.fill(unobservable.size)(0))
      val (ctx, mxKernel) = Context.at(simplifiedFOMonitorCondition.subgoals.head.succ.head, kernelPos)
      val normalizedSimplifiedFOMonitorComponents = FormulaTools.disjuncts(FormulaTools.disjunctiveNormalForm(mxKernel.asInstanceOf[Formula])).map(ctx)
      
      val qfMonitorCondition = 
        normalizedSimplifiedFOMonitorComponents.map(f => tool.simplify(ModelPlex.mxPartialQE(f, entry.defs, tool), assumptions)).reduceRightOption(Or).getOrElse(True)
    4. With stepwise existential quantifiers

      If arithmetic complexity is still unmanageable, observability can be tackled in multiple runs: in Step 3, try to not specify all unobservable quantities at once and continue Steps 4-7 to determine a quantifier-free monitor condition on that subset first. Then, one-by-one, add existential quantifiers to the quantifier-free monitor condition and eliminate to obtain a quantifier-free monitor condition with one less observable quantity; continue until all unobservable quantities are eliminated from the monitor condition.

    5. With nested stepwise existential quantifiers and variables replacing function symbols

      In some cases, the arithmetic backend may not handle function symbols well and return formulas containing terms 0[]. These terms do not have a corresponding representation in KeYmaera X and therefore result in a conversion exception. The snippet below temporarily replaces 0-ary function symbols with variables to avoid this issue.

      def stepwisePartialQE(fml: Formula): Formula = fml match {
         case Exists(x, p) =>
           val monitorComponents = FormulaTools.disjuncts(stepwisePartialQE(p)).map(Exists(x, _))           
           val componentResults = monitorComponents.zipWithIndex.map({ case (f, i) =>
              val consts = StaticSemantics.symbols(f).
                 filter({ case Function(_, _, Unit, Real, false) => true case _ => false }).
                 map({ case fn@Function(n, i, Unit, s, _) => FuncOf(fn, Nothing) -> Variable(n, i, s) }).toList
              val fml = consts.foldLeft(f)({ case (fml, (fn, v)) => fml.replaceAll(fn, v) })
              val result = tool.simplify(ModelPlex.mxPartialQE(fml, entry.defs, tool), assumptions)
              consts.foldLeft(result)({ case (fml, (fn, v)) => fml.replaceAll(v, fn) })
              })
           FormulaTools.disjunctiveNormalForm(componentResults.
              reduceRightOption[Formula]({ case (p, q) => tool.simplify(Or(p, q), assumptions) }).getOrElse(True))
         case _ => FormulaTools.disjunctiveNormalForm(fml)
      }
      
      val qfMonitorCondition = stepwisePartialQE(simplifiedFOMonitorCondition.subgoals.head.succ.head)
  5. Rephrase monitor as test program

    val monitorProgram = proveBy(qfMonitorCondition, ModelPlex.chaseToTests(combineTests=false)(1))

    Prepare for exporting code by rephrasing the monitor condition as a hybrid program that represents each of the monitor components as a test.

  6. Export code, for example to Python

    assert(monitorProgram.subgoals.size == 1 && monitorProgram.subgoals.head.ante.isEmpty && monitorProgram.subgoals.head.succ.size == 1)
    val prg = monitorProgram.subgoals.head.succ.head
    val inputs = CodeGenerator.getInputs(prg)
    val sensorsForUnobservables = unobservable.flatMap({
       case (_, Some(v)) => StaticSemantics.freeVars(v).toSet -- stateVars
       case _ => Set.empty[Variable]
    }).toList
    val observable = stateVars.toSet.diff(unobservable.keySet.filter(_.isInstanceOf[BaseVariable]).map(_.asInstanceOf[BaseVariable])) ++
       sensorsForUnobservables.map(_.asInstanceOf[BaseVariable])
    val (monitorCodeDefs, monitorCodeBody) = (new PythonGenerator(new PythonMonitorGenerator('resist, entry.defs), init, entry.defs))(prg, observable, inputs, "Water tank monitor")

    Determine which quantities are inputs to the monitor, which variables represent sensors for unobservable quantities, and which variables are directly observable. Then instruct the code generator to emit monitor code, here in Python.

The full example with all steps is listed below.

val Some(entry) = KeYmaeraXArchiveParser.getEntry("ModelPlex/Partial Observability/Water tank with flow disturbance is safe",
    io.Source.fromInputStream(getClass.getResourceAsStream("/keymaerax-projects/modelplex/partialobservability.kyx")).mkString)
val model = entry.model.exhaustiveSubst(USubst(entry.defs.substs.filter(_.what.isInstanceOf[SystemConst]))).asInstanceOf[Formula]

val stateVars = StaticSemantics.boundVars(model).toSet.filter(_.isInstanceOf[BaseVariable]).toList.asInstanceOf[List[BaseVariable]]
val unobservable = ListMap[NamedSymbol, Option[Formula]](
    Variable("l") -> Some("0 <= lU() & lS-lU() <= l & l <= lS+lU()".asFormula),
    Variable("fd") -> None,
    Function("D", None, Unit, Real) -> None
)
val ModelPlexConjecture(init, dLMonitorCondition, assumptions) = ModelPlex.createMonitorSpecificationConjecture(model, stateVars, unobservable)
println("dL monitor condition: " + dLMonitorCondition.prettyString)

val chaseStartPos = List.fill(unobservable.size)(0) ++ (if (unobservable.values.exists(_.isDefined)) 1::Nil else Nil)
val foMonitorCondition = proveBy(dLMonitorCondition, ModelPlex.modelMonitorByChase(1, chaseStartPos))
println("First-order monitor condition: " + foMonitorCondition)

val simplifiedFOMonitorCondition = proveBy(foMonitorCondition, ModelPlex.optimizationOneWithSearch(Some(tool), assumptions,
    unobservable.keySet.toList, Some(ModelPlex.mxSimplify))(1))
println("Witness condition: " + simplifiedFOMonitorCondition.prettyString)

def stepwisePartialQE(fml: Formula): Formula = fml match {
    case Exists(x, p) =>
    val monitorComponents = FormulaTools.disjuncts(stepwisePartialQE(p)).map(Exists(x, _))
    val componentResults = monitorComponents.zipWithIndex.map({ case (f, i) =>
        //@note backend may not always support f[] well in the context of existential quantifiers
        val consts = StaticSemantics.symbols(f).
           filter({ case Function(_, _, Unit, Real, false) => true case _ => false }).
           map({ case fn@Function(n, i, Unit, s, _) => FuncOf(fn, Nothing) -> Variable(n, i, s) }).toList
        val fml = consts.foldLeft(f)({ case (fml, (fn, v)) => fml.replaceAll(fn, v) })
        val result = tool.simplify(ModelPlex.mxPartialQE(fml, entry.defs, tool), assumptions)
        consts.foldLeft(result)({ case (fml, (fn, v)) => fml.replaceAll(v, fn) })
    })
    FormulaTools.disjunctiveNormalForm(componentResults.
        reduceRightOption[Formula]({ case (p, q) => tool.simplify(Or(p, q), assumptions) }).getOrElse(True))
    case _ => FormulaTools.disjunctiveNormalForm(fml)
}

assert(simplifiedFOMonitorCondition.subgoals.size == 1 && simplifiedFOMonitorCondition.subgoals.head.ante.isEmpty && simplifiedFOMonitorCondition.subgoals.head.succ.size == 1)
val qfMonitorCondition = stepwisePartialQE(simplifiedFOMonitorCondition.subgoals.head.succ.head)

println("QF monitor condition: " + qfMonitorCondition.prettyString)

val monitorProgram = proveBy(qfMonitorCondition, ModelPlex.chaseToTests(combineTests=false)(1))
println("Monitor program: " + monitorProgram.prettyString)

assert(monitorProgram.subgoals.size == 1 && monitorProgram.subgoals.head.ante.isEmpty && monitorProgram.subgoals.head.succ.size == 1)
val prg = monitorProgram.subgoals.head.succ.head
val inputs = CodeGenerator.getInputs(prg)
val sensorsForUnobservables = unobservable.flatMap({
    case (_, Some(v)) => StaticSemantics.freeVars(v).toSet -- stateVars
    case _ => Set.empty[Variable]
}).toList
val observable = stateVars.toSet.diff(unobservable.keySet.filter(_.isInstanceOf[BaseVariable]).map(_.asInstanceOf[BaseVariable])) ++
    sensorsForUnobservables.map(_.asInstanceOf[BaseVariable])
val (monitorCodeDefs, monitorCodeBody) = (new PythonGenerator(new PythonMonitorGenerator('resist, entry.defs), init, entry.defs))(prg, observable, inputs, "Water tank monitor")
println("Monitor Python implementation:\n" + monitorCodeDefs + "\n" + monitorCodeBody)