-
Notifications
You must be signed in to change notification settings - Fork 39
KeYmaera X Code Snippets
Exporting Mathematica QE calls as generated by the builtin QE
tactic takes two steps:
- Execute tactic
prepareQE
to normalize a formula and form its universal closure - Transform the normalized universal closure to Mathematica syntax
The code below performs both steps:
package edu.cmu.cs.ls.keymaerax.btactics
import edu.cmu.cs.ls.keymaerax.parser.StringConverter._
import edu.cmu.cs.ls.keymaerax.btactics.TactixLibrary._
import edu.cmu.cs.ls.keymaerax.infrastruct.Augmentors.SequentAugmentor
import edu.cmu.cs.ls.keymaerax.lemma.Lemma
import edu.cmu.cs.ls.keymaerax.tools.ext.MathematicaQEToolBridge
import edu.cmu.cs.ls.keymaerax.tools.qe.MathematicaConversion
import org.scalatest.LoneElement._
class QETools extends TacticTestBase {
"Prepare QE" should "prepare but not perform a QE call" in withMathematica { tool =>
// transform formula according to the QE tactic (normalize, universal closure), result in KeYmaera X syntax
proveBy("x>=2 ==> y>=1 -> x^2*y>=4 & \\forall z (z>0 -> x/z > 0)".asSequent, ToolTactics.prepareQE(List.empty, skip)).
subgoals.loneElement shouldBe "==> \\forall y \\forall x (x>=2 & y>=1 -> x^2*y>=4 & \\forall z (z>0 -> x/z > 0))".asSequent
proveBy("x>=2 ==> y>=1 -> x^2*y>=4 | \\forall z (z>0 -> x/z > 0)".asSequent, ToolTactics.prepareQE(List.empty, skip)).
subgoals.loneElement shouldBe "==> \\forall z \\forall y \\forall x (x>=2 & y>=1 & z>0 -> x^2*y>=4 | x/z > 0)".asSequent
}
"Export" should "export in Mathematica syntax" in withMathematica { tool =>
val preparedQE = proveBy("x>=2 ==> y>=1 -> x^2*y>=4 | \\forall z (z>0 -> x/z > 0)".asSequent,
ToolTactics.prepareQE(List.empty, skip)).subgoals.loneElement.toFormula
// export to Mathematica Reduce/Resolve by accessing private export methods; result in Mathematica syntax
val qeTool = tool.invokePrivate(PrivateMethod[MathematicaQEToolBridge[Lemma]]('mQE)()).qeTool
val mexpr = qeTool.invokePrivate(PrivateMethod[MathematicaConversion.MExpr]('qe)(preparedQE))
println(mexpr)
}
}
Model monitors check whether the system behaves consistently with the modeled differential equations. Automated synthesis of model monitors assumes that the differential equations are solvable in order to derive exact monitor conditions that describe the expected behavior. For differential equations without polynomial solutions (e.g., non-linear differential equations), instead of using solutions ModelPlex can inspect proofs to find safety-relevant regions that confine the differential equations.
val Some(spec) = KeYmaeraXArchiveParser.getEntry("A KeYmaera X specification with a proof", io.Source.fromFile("/path/to/spec.kyx").mkString)
ModelPlex.createNonlinearModelApprox(spec.name, spec.tactics.head._3, spec.defs)(spec.model)
In order to approximate differential equations from proofs, the specification file must contain a proof tactic, e.g., as in the entry "ModelPlex/Partial Observability/Curved Ground Robot Motion is Safe" in partialobservability.kyx.
ModelPlex then reads off safety regions from differential invariant (diffInvariant
) and differential cut (dC
) steps in the proof. The discrete approximation has the shape below, e.g., see the entry "ModelPlex/Partial Observability/Approximated Curved Ground Robot Motion is Safe" in partialobservability.kyx for an example of a discrete approximation derived from the specification above.
HP ghosts ::= { x_0:=x; };
HP approxPlant ::= { x:=*; };
HP evolDomTest ::= { ?Q(x); };
HP discretePlant ::= {
ghosts;
evolDomTest;
approxPlant;
evolDomTest;
};
The intuition behind this discrete approximation is that not all safety proofs rely on the exact differential equations to prove safety. Some only need to know that the continuous behavior stayed inside a certain region as documented by differential cut/differential weakening steps in the proof and the evolution domain constraint in the model.
This discrete approximation step is possible when the tactics in spec.tactics.head._3
have a certain shape: all differential equation reasoning first builds a region by using differential cut/differential invariant steps, and then discards the exact continuous behavior using differential weakening, as illustrated in the tactic shape below.
implyR(1);
loop("J(x)", 1); <(
"Init": QE,
"Post": QE,
"Step":
composeb(1);
choiceb(1);
andR(1); <(
"[left branch;]p(x)":
diffInvariant("I_1(x,x_0)", 1);
diffInvariant("I_2(x,x_0)", 1);
dW(1);
QE,
"[right branch;]p(x)":
diffInvariant("I_1(x,x_0)", 1);
diffInvariant("I_3(x,x_0)", 1);
dW(1);
QE
)
)
All branches in the proof first perform diffInvariant
steps to describe the relevant safety region, and then end in dW
. The procedure ModelPlex.createNonlinearModelApprox
picks up the conditions I_1
, I_2
, and I_3
from the proof. The resulting model monitors then check the true continuous behavior for staying inside these conditions.
ModelPlex for partially observable systems enables uniform treatment of various sources of uncertainty and unobservability in ModelPlex, including sensor measurement uncertainty, actuator disturbance, and unknown quantities/parameters. Partial ModelPlex monitors can retroactively add uncertainty to idealized hybrid system models.
As an example, the instructions here use a model of a water tank with flow disturbance.
-
Read a KeYmaera X specification from a file
val Some(spec) = KeYmaeraXArchiveParser.getEntry("A KeYmaera X specification", io.Source.fromFile("/path/to/spec.kyx").mkString) // e.g. val Some(spec) = KeYmaeraXArchiveParser.getEntry("ModelPlex/Partial Observability/Water tank with flow disturbance is safe", // io.Source.fromInputStream(getClass.getResourceAsStream("/keymaerax-projects/modelplex/partialobservability.kyx")).mkString) val model = spec.model.exhaustiveSubst(USubst(entry.defs.substs.filter(_.what.isInstanceOf[SystemConst]))).asInstanceOf[Formula]
The specification must be of the shape
assumptions -> [prg;]safe
, whereassumptions
is a first-order formula,prg
is a hybrid program (not a game), andsafe
a dL formula. If the specification uses definitions, all programs must be expanded fully, but function and predicate symbols may stay unexpanded. -
Determine the state variables of the specification
val stateVars = StaticSemantics.boundVars(model).toSet.filter(_.isInstanceOf[BaseVariable]).toList.asInstanceOf[List[BaseVariable]]
ModelPlex monitors all variables that are bound by the model. If some of those variables are not observable in the system (e.g., because there are no sensors directly providing measurements for a certain quantity) or only observable with measurement uncertainty, they are still state variables but will be characterized using existential quantifiers. Model parameters can also be treated as unobservable.
-
Specify sensors (optional)
val unobservable: ListMap[NamedSymbol, Option[Formula]] = ListMap( Variable("l") -> Some("0 <= lU() & lS-lU() <= l & l <= lS+lU()".asFormula), Variable("fd") -> None, Function("D", None, Unit, Real) -> None)
In this example, we declare that the water tank load
l
is not observable as an exact quantity as mandated by the model, but our system provides a sensorlS
subject to measurement uncertaintylU()
, which becomes a parameter of the monitor. For actuation, the example model is more detailed by distinguishing between the flowf
chosen by the controller and the actual flowfd
and its disturbance boundsD
. The truefd
is entirely unobservable and we may not knowD
, so declare them both asNone
.
If all bound variables of the model are observable, define val unobservable = ListMap.empty
.
-
Create the ModelPlex conjecture
val ModelPlexConjecture(init, dLMonitorCondition, assumptions) = ModelPlex.createMonitorSpecificationConjecture(model, stateVars, unobservable)
The result is a monitor condition in dL with existentially quantified unobservable quantities and their optional sensor specifications.
-
Symbolically execute the program
val foMonitorCondition = proveBy(dLMonitorCondition, ModelPlex.controllerMonitorByChase(1))
Symbolic execution applies program proof rules until the effect of all hybrid programs in the model are determined in first-order form. The result contains (existential) quantifiers but no programs. None of the unobservable quantities is free in
foMonitorCondition
, but they may still occur existentially quantified. Steps 6+7 remove those quantifiers. -
Search for existential witnesses
val simplifiedFOMonitorCondition = proveBy(foMonitorCondition, ModelPlex.optimizationOneWithSearch(Some(tool), assumptions, unobservable.keySet.toList, Some(ModelPlex.mxSimplify))(1))
Tactic
ModelPlex.optimizationOneWithSearch
attempts to find witnesses for existential quantifiers in the first-order monitor condition and instantiates existential quantifiers with those witnesses (if any). WithSome(ModelPlex.mxSimplify)
we attempt to additionally simplify the monitor condition. This step can be omitted by providingNone
. Usually, the tactic finds some witnesses but not for all the existential quantifiers, so that in the next step we use quantifier elimination to obtain an equivalent quantifier-free formula. -
Eliminate remaining existential quantifiers: this step applies a backend for quantifier elimination to find an equivalent quantifier-free monitor formula. Even though decidable in theory, quantifier elimination is computationally expensive in practice. As a result, the automated tactics for this step often exceed any reasonable time budget, but there are several options to scale the process more manually.
-
With
partialQE
verbatim in the backend toolval qfMonitorCondition = proveBy(simplifiedFOMonitorCondition, partialQE & ModelPlex.mxSimplify(1) & ModelPlex.reassociate)
The steps
ModelPlex.mxSimplify(1)
andModelPlex.reassociate
beautify the monitor condition but can be omitted if too expensive. -
With function-abbreviating
mxPartialQE
and backend toolIn larger models, the first-order monitor conditions can become quite complex and the backend procedures used in
partialQE
may not provide results within a reasonable time budget. In this case, manual alternatives can make progress, but need careful inspection because they do not come with an unbroken chain of proofs. The next snippet abbreviates function symbols whose arguments are not quantified in order to reduce arithmetic complexity.assert(simplifiedFOMonitorCondition.subgoals.size == 1 && simplifiedFOMonitorCondition.subgoals.head.ante.isEmpty && simplifiedFOMonitorCondition.subgoals.head.succ.size == 1) val qfMonitorCondition = ModelPlex.mxPartialQE(simplifiedFOMonitorCondition.subgoals.head.succ.head, entry.defs, tool)
-
With manual splitting
If abbreviations are not sufficient, additional preprocessing as below may split the single
partialQE
attempt into multiple separate attempts.assert(simplifiedFOMonitorCondition.subgoals.size == 1 && simplifiedFOMonitorCondition.subgoals.head.ante.isEmpty && simplifiedFOMonitorCondition.subgoals.head.succ.size == 1) val kernelPos = PosInExpr(List.fill(unobservable.size)(0)) val (ctx, mxKernel) = Context.at(simplifiedFOMonitorCondition.subgoals.head.succ.head, kernelPos) val normalizedSimplifiedFOMonitorComponents = FormulaTools.disjuncts(FormulaTools.disjunctiveNormalForm(mxKernel.asInstanceOf[Formula])).map(ctx) val qfMonitorCondition = normalizedSimplifiedFOMonitorComponents.map(f => tool.simplify(ModelPlex.mxPartialQE(f, entry.defs, tool), assumptions)).reduceRightOption(Or).getOrElse(True)
-
With stepwise existential quantifiers
If arithmetic complexity is still unmanageable, observability can be tackled in multiple runs: in Step 3, try to not specify all unobservable quantities at once and continue Steps 4-7 to determine a quantifier-free monitor condition on that subset first. Then, one-by-one, add existential quantifiers to the quantifier-free monitor condition and eliminate to obtain a quantifier-free monitor condition with one less observable quantity; continue until all unobservable quantities are eliminated from the monitor condition.
-
With nested stepwise existential quantifiers and variables replacing function symbols
In some cases, the arithmetic backend may not handle function symbols well and return formulas containing terms
0[]
. These terms do not have a corresponding representation in KeYmaera X and therefore result in a conversion exception. The snippet below temporarily replaces 0-ary function symbols with variables to avoid this issue.def stepwisePartialQE(fml: Formula): Formula = fml match { case Exists(x, p) => val monitorComponents = FormulaTools.disjuncts(stepwisePartialQE(p)).map(Exists(x, _)) val componentResults = monitorComponents.zipWithIndex.map({ case (f, i) => val consts = StaticSemantics.symbols(f). filter({ case Function(_, _, Unit, Real, false) => true case _ => false }). map({ case fn@Function(n, i, Unit, s, _) => FuncOf(fn, Nothing) -> Variable(n, i, s) }).toList val fml = consts.foldLeft(f)({ case (fml, (fn, v)) => fml.replaceAll(fn, v) }) val result = tool.simplify(ModelPlex.mxPartialQE(fml, entry.defs, tool), assumptions) consts.foldLeft(result)({ case (fml, (fn, v)) => fml.replaceAll(v, fn) }) }) FormulaTools.disjunctiveNormalForm(componentResults. reduceRightOption[Formula]({ case (p, q) => tool.simplify(Or(p, q), assumptions) }).getOrElse(True)) case _ => FormulaTools.disjunctiveNormalForm(fml) } val qfMonitorCondition = stepwisePartialQE(simplifiedFOMonitorCondition.subgoals.head.succ.head)
-
-
Rephrase monitor as test program
val monitorProgram = proveBy(qfMonitorCondition, ModelPlex.chaseToTests(combineTests=false)(1))
Prepare for exporting code by rephrasing the monitor condition as a hybrid program that represents each of the monitor components as a test.
-
Export code, for example to Python
assert(monitorProgram.subgoals.size == 1 && monitorProgram.subgoals.head.ante.isEmpty && monitorProgram.subgoals.head.succ.size == 1) val prg = monitorProgram.subgoals.head.succ.head val inputs = CodeGenerator.getInputs(prg) val sensorsForUnobservables = unobservable.flatMap({ case (_, Some(v)) => StaticSemantics.freeVars(v).toSet -- stateVars case _ => Set.empty[Variable] }).toList val observable = stateVars.toSet.diff(unobservable.keySet.filter(_.isInstanceOf[BaseVariable]).map(_.asInstanceOf[BaseVariable])) ++ sensorsForUnobservables.map(_.asInstanceOf[BaseVariable]) val (monitorCodeDefs, monitorCodeBody) = (new PythonGenerator(new PythonMonitorGenerator('resist, entry.defs), init, entry.defs))(prg, observable, inputs, "Water tank monitor")
Determine which quantities are inputs to the monitor, which variables represent sensors for unobservable quantities, and which variables are directly observable. Then instruct the code generator to emit monitor code, here in Python.
The full example with all steps is listed below.
val Some(entry) = KeYmaeraXArchiveParser.getEntry("ModelPlex/Partial Observability/Water tank with flow disturbance is safe",
io.Source.fromInputStream(getClass.getResourceAsStream("/keymaerax-projects/modelplex/partialobservability.kyx")).mkString)
val model = entry.model.exhaustiveSubst(USubst(entry.defs.substs.filter(_.what.isInstanceOf[SystemConst]))).asInstanceOf[Formula]
val stateVars = StaticSemantics.boundVars(model).toSet.filter(_.isInstanceOf[BaseVariable]).toList.asInstanceOf[List[BaseVariable]]
val unobservable = ListMap[NamedSymbol, Option[Formula]](
Variable("l") -> Some("0 <= lU() & lS-lU() <= l & l <= lS+lU()".asFormula),
Variable("fd") -> None,
Function("D", None, Unit, Real) -> None
)
val ModelPlexConjecture(init, dLMonitorCondition, assumptions) = ModelPlex.createMonitorSpecificationConjecture(model, stateVars, unobservable)
println("dL monitor condition: " + dLMonitorCondition.prettyString)
val chaseStartPos = List.fill(unobservable.size)(0) ++ (if (unobservable.values.exists(_.isDefined)) 1::Nil else Nil)
val foMonitorCondition = proveBy(dLMonitorCondition, ModelPlex.modelMonitorByChase(1, chaseStartPos))
println("First-order monitor condition: " + foMonitorCondition)
val simplifiedFOMonitorCondition = proveBy(foMonitorCondition, ModelPlex.optimizationOneWithSearch(Some(tool), assumptions,
unobservable.keySet.toList, Some(ModelPlex.mxSimplify))(1))
println("Witness condition: " + simplifiedFOMonitorCondition.prettyString)
def stepwisePartialQE(fml: Formula): Formula = fml match {
case Exists(x, p) =>
val monitorComponents = FormulaTools.disjuncts(stepwisePartialQE(p)).map(Exists(x, _))
val componentResults = monitorComponents.zipWithIndex.map({ case (f, i) =>
//@note backend may not always support f[] well in the context of existential quantifiers
val consts = StaticSemantics.symbols(f).
filter({ case Function(_, _, Unit, Real, false) => true case _ => false }).
map({ case fn@Function(n, i, Unit, s, _) => FuncOf(fn, Nothing) -> Variable(n, i, s) }).toList
val fml = consts.foldLeft(f)({ case (fml, (fn, v)) => fml.replaceAll(fn, v) })
val result = tool.simplify(ModelPlex.mxPartialQE(fml, entry.defs, tool), assumptions)
consts.foldLeft(result)({ case (fml, (fn, v)) => fml.replaceAll(v, fn) })
})
FormulaTools.disjunctiveNormalForm(componentResults.
reduceRightOption[Formula]({ case (p, q) => tool.simplify(Or(p, q), assumptions) }).getOrElse(True))
case _ => FormulaTools.disjunctiveNormalForm(fml)
}
assert(simplifiedFOMonitorCondition.subgoals.size == 1 && simplifiedFOMonitorCondition.subgoals.head.ante.isEmpty && simplifiedFOMonitorCondition.subgoals.head.succ.size == 1)
val qfMonitorCondition = stepwisePartialQE(simplifiedFOMonitorCondition.subgoals.head.succ.head)
println("QF monitor condition: " + qfMonitorCondition.prettyString)
val monitorProgram = proveBy(qfMonitorCondition, ModelPlex.chaseToTests(combineTests=false)(1))
println("Monitor program: " + monitorProgram.prettyString)
assert(monitorProgram.subgoals.size == 1 && monitorProgram.subgoals.head.ante.isEmpty && monitorProgram.subgoals.head.succ.size == 1)
val prg = monitorProgram.subgoals.head.succ.head
val inputs = CodeGenerator.getInputs(prg)
val sensorsForUnobservables = unobservable.flatMap({
case (_, Some(v)) => StaticSemantics.freeVars(v).toSet -- stateVars
case _ => Set.empty[Variable]
}).toList
val observable = stateVars.toSet.diff(unobservable.keySet.filter(_.isInstanceOf[BaseVariable]).map(_.asInstanceOf[BaseVariable])) ++
sensorsForUnobservables.map(_.asInstanceOf[BaseVariable])
val (monitorCodeDefs, monitorCodeBody) = (new PythonGenerator(new PythonMonitorGenerator('resist, entry.defs), init, entry.defs))(prg, observable, inputs, "Water tank monitor")
println("Monitor Python implementation:\n" + monitorCodeDefs + "\n" + monitorCodeBody)