From 24c59c3f086480c9960c207527269fff61ae13c9 Mon Sep 17 00:00:00 2001 From: fhchl Date: Wed, 31 Jan 2024 10:38:42 +0100 Subject: [PATCH 01/10] WIP --- dynax/system.py | 1 + tests/test_systems.py | 12 ++++++++++++ 2 files changed, 13 insertions(+) diff --git a/dynax/system.py b/dynax/system.py index 1082407..ba5960e 100644 --- a/dynax/system.py +++ b/dynax/system.py @@ -97,6 +97,7 @@ def output(self, x, u, t): # these attributes should be set by subclasses n_states: int | Literal["scalar"] = static_field(init=False) n_inputs: int | Literal["scalar"] = static_field(init=False) + initial_state: Array | None = static_field(default=None) def __check_init__(self): # Check that required attributes are initialized diff --git a/tests/test_systems.py b/tests/test_systems.py index cf4d407..3b13cf6 100644 --- a/tests/test_systems.py +++ b/tests/test_systems.py @@ -144,3 +144,15 @@ def test_discrete_forward_model(): scipy_t, scipy_y, scipy_x = dlsim(scipy_sys, u, x0=x0, t=t) npt.assert_allclose(scipy_y, y, **tols) npt.assert_allclose(scipy_x, x, **tols) + + +def test_initial_state(): + class Sys(DynamicalSystem): + n_states = "scalar" + n_inputs = "scalar" + + def vector_field(self, x, u, t=None): + return x * 0.1 + u + + Sys(initial_state=1) + From 86129efe799135eff872707000fa79181fca2dc7 Mon Sep 17 00:00:00 2001 From: fhchl Date: Wed, 31 Jan 2024 18:56:29 +0100 Subject: [PATCH 02/10] add cool ascii flow charts and work on system.py --- docs/conf.py | 1 + dynax/estimation.py | 22 +++- dynax/evolution.py | 89 ++++++++----- dynax/example_models.py | 18 ++- dynax/linearize.py | 87 ++++++------- dynax/system.py | 271 +++++++++++++++++++++++++-------------- tests/test_estimation.py | 64 ++++----- tests/test_evolution.py | 114 ++++++++++++++++ tests/test_linearize.py | 37 +++--- tests/test_systems.py | 126 ++---------------- 10 files changed, 474 insertions(+), 355 deletions(-) create mode 100644 tests/test_evolution.py diff --git a/docs/conf.py b/docs/conf.py index 3e263b5..f89159a 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -20,6 +20,7 @@ "sphinx.ext.napoleon", "sphinx.ext.viewcode", "sphinxcontrib.bibtex", + "sphinxcontrib.aafig", "sphinx_autodoc_typehints", "nbsphinx", ] diff --git a/dynax/estimation.py b/dynax/estimation.py index b98818f..bc7d57d 100644 --- a/dynax/estimation.py +++ b/dynax/estimation.py @@ -150,7 +150,7 @@ def fit_least_squares( model: AbstractEvolution, t: ArrayLike, y: ArrayLike, - x0: ArrayLike, + x0: Optional[ArrayLike] = None, u: Optional[ArrayLike] = None, batched: bool = False, sigma: Optional[ArrayLike] = None, @@ -168,7 +168,11 @@ def fit_least_squares( """ t = jnp.asarray(t) y = jnp.asarray(y) - x0 = jnp.asarray(x0) + + if x0 is not None: + x0 = jnp.asarray(x0) + else: + x0 = model.system.initial_state if batched: # First axis holds experiments, second axis holds time. @@ -212,7 +216,7 @@ def residual_term(params): # this can use pmap, if batch size is smaller than CPU cores model = jax.vmap(model) # FIXME: ucoeffs not supported for Map - _, pred_y = model(x0, t=t, ucoeffs=ucoeffs) + _, pred_y = model(t=t, ucoeffs=ucoeffs, initial_state=x0) res = (y - pred_y) * weight return res.reshape(-1) @@ -246,7 +250,7 @@ def fit_multiple_shooting( model: AbstractEvolution, t: ArrayLike, y: ArrayLike, - x0: ArrayLike, + x0: Optional[ArrayLike] = None, u: Optional[Union[Callable[[float], Array], ArrayLike]] = None, num_shots: int = 1, continuity_penalty: float = 0.1, @@ -273,7 +277,11 @@ def fit_multiple_shooting( """ t = jnp.asarray(t) y = jnp.asarray(y) - x0 = jnp.asarray(x0) + + if x0 is not None: + x0 = jnp.asarray(x0) + else: + x0 = model.system.initial_state if u is None: msg = ( @@ -332,7 +340,7 @@ def fit_multiple_shooting( def residuals(params): x0s, model = unravel(params) x0s = jnp.concatenate((x0[None], x0s), axis=0) - xs_pred, ys_pred = jax.vmap(model)(x0s, t=ts0, ucoeffs=ucoeffs) + xs_pred, ys_pred = jax.vmap(model)(t=ts0, ucoeffs=ucoeffs, initial_state=x0s) # output residual res_y = ((ys - ys_pred) / std_y).reshape(-1) res_y = res_y / np.sqrt(len(res_y)) @@ -362,7 +370,7 @@ def transfer_function(sys: DynamicalSystem, to_states: bool = False, **kwargs): def H(s: complex): """Transfer-function at s.""" - identity = np.eye(linsys.n_states) + identity = np.eye(linsys.initial_state.size) phi_B = jnp.linalg.solve(s * identity - A, B) if to_states: return phi_B diff --git a/dynax/evolution.py b/dynax/evolution.py index 31d61ee..20a8f70 100644 --- a/dynax/evolution.py +++ b/dynax/evolution.py @@ -1,56 +1,74 @@ -from typing import Callable, Optional +from abc import abstractmethod +from typing import Callable, cast, Optional import diffrax as dfx import equinox as eqx import jax import jax.numpy as jnp -from jaxtyping import Array, ArrayLike, PyTree +from jaxtyping import Array, PyTree from .interpolation import spline_it from .system import DynamicalSystem from .util import broadcast_right, dim2shape +def check_shape(shape, dim, arg): + if not (dim == "scalar" and shape != ()) and not (shape[1:] == (dim,)): + raise ValueError(f"Argument {arg} of shape {shape} is size {dim}.") + + class AbstractEvolution(eqx.Module): """Abstract base-class for evolutions.""" - def __call__(self, x0: ArrayLike, t: Array, u: Array, **kwargs): - raise NotImplementedError + system: DynamicalSystem + @abstractmethod + def __call__( + self, t: Array, u: Optional[Array], initial_state: Optional[Array] + ) -> tuple[Array, Array]: + """Evolve an initial state along the vector field and compute output. -def check_shape(shape, dim, arg): - if not (dim == "scalar" and shape != ()) and not (shape[1:] == (dim,)): - raise ValueError(f"Argument {arg} of shape {shape} is size {dim}.") + Args: + t: The time periode over which to solve. + u: An optional input sequence of same length. + initial_state: An optional, fixed initial state used instead of + `system.initial_state`. + + """ + raise NotImplementedError class Flow(AbstractEvolution): - """Evolution function for continous-time dynamical system.""" + """Evolution for continous-time dynamical systems.""" - system: DynamicalSystem solver: dfx.AbstractAdaptiveSolver = eqx.static_field(default_factory=dfx.Dopri5) step: dfx.AbstractStepSizeController = eqx.static_field( default_factory=dfx.ConstantStepSize - ) + ) # TODO: replace with adaptive step size dt0: Optional[float] = eqx.static_field(default=None) def __call__( self, - x0: ArrayLike, t: Array, u: Optional[Array] = None, + initial_state: Optional[Array] = None, + *, ufun: Optional[Callable[[float], Array]] = None, ucoeffs: Optional[tuple[PyTree, PyTree, PyTree, PyTree]] = None, **diffeqsolve_kwargs, ) -> tuple[Array, Array]: """Solve initial value problem for state and output trajectories.""" + # Parse inputs. t = jnp.asarray(t) - x0 = jnp.asarray(x0) - # Check initial state shape - if x0.shape != dim2shape(self.system.n_states): - raise ValueError("Initial state dimenions do not match.") + if initial_state is not None: + x = jnp.asarray(initial_state) + if initial_state.shape != self.system.initial_state.shape: + raise ValueError("Initial state dimenions do not match.") + else: + initial_state = self.system.initial_state - # Prepare input function + # Prepare input function. if u is None and ufun is None and ucoeffs is None and self.system.n_inputs == 0: _ufun = lambda t: jnp.empty((0,)) elif ucoeffs is not None: @@ -66,7 +84,7 @@ def __call__( else: raise ValueError("Must specify one of u, ufun, or ucoeffs.") - # Check shape of ufun return values + # Check shape of ufun return values. out = jax.eval_shape(_ufun, 0.0) if not isinstance(out, jax.ShapeDtypeStruct): raise ValueError(f"ufun must return Arrays, not {type(out)}.") @@ -74,12 +92,12 @@ def __call__( if not out.shape == dim2shape(self.system.n_inputs): raise ValueError("Input dimensions do not match.") - # Solve ODE + # Solve ODE. diffeqsolve_default_options = dict( solver=self.solver, stepsize_controller=self.step, saveat=dfx.SaveAt(ts=t), - max_steps=50 * len(t), + max_steps=50 * len(t), # completely arbitrary number of steps adjoint=dfx.DirectAdjoint(), dt0=self.dt0 if self.dt0 is not None else t[1], ) @@ -90,40 +108,46 @@ def __call__( term, t0=t[0], t1=t[-1], - y0=x0, + y0=initial_state, args=self, # https://github.com/patrick-kidger/diffrax/issues/135 **diffeqsolve_default_options, ).ys + # Could be in general a Pytree, but we only allow Array states. + x = cast(Array, x) - # Compute output + # Compute output. y = jax.vmap(self.system.output)(x, u, t) return x, y class Map(AbstractEvolution): - """Flow map for evolving a discrete-time dynamical system.""" - - system: DynamicalSystem + """Evolution for discrete-time dynamical systems.""" def __call__( self, - x0: ArrayLike, t: Optional[Array] = None, u: Optional[Array] = None, + initial_state: Optional[Array] = None, + *, num_steps: Optional[int] = None, - ): + ) -> tuple[Array, Array]: """Solve discrete map.""" - x0 = jnp.asarray(x0) + + # Parse inputs. + if initial_state is not None: + x = jnp.asarray(initial_state) + if initial_state.shape != self.system.initial_state.shape: + raise ValueError("Initial state dimenions do not match.") + else: + initial_state = self.system.initial_state if t is not None: t = jnp.asarray(t) - num_steps = len(t) elif u is not None: u = jnp.asarray(u) - num_steps = len(u) elif num_steps is not None: - t = jnp.zeros(num_steps) + t = jnp.arange(num_steps) else: raise ValueError("must specify one of num_steps, t, or u.") @@ -139,14 +163,15 @@ def __call__( inputs = u unpack = lambda input: (None, input) + # Evolve. def scan_fun(state, input): t, u = unpack(input) next_state = self.system.vector_field(state, u, t) return next_state, state - _, x = jax.lax.scan(scan_fun, x0, inputs, length=num_steps) + _, x = jax.lax.scan(scan_fun, initial_state, inputs, length=num_steps) - # Compute output + # Compute output. y = jax.vmap(self.system.output)(x, u, t) return x, y diff --git a/dynax/example_models.py b/dynax/example_models.py index f4fe6d9..df741d3 100644 --- a/dynax/example_models.py +++ b/dynax/example_models.py @@ -13,7 +13,8 @@ class PlasticFlowLinElastic(DynamicalSystem): kappa: float = non_negative_field() alpha: float sigma_0: float - n_states = 2 + + initial_state = jnp.zeros(2) n_inputs = "scalar" def vector_field(self, x, u, t=None): @@ -41,7 +42,8 @@ class SpringMassDamper(DynamicalSystem): m: float r: float k: float - n_states = 2 + + initial_state = jnp.zeros(2) n_inputs = "scalar" def vector_field(self, x, u, t=None): @@ -61,7 +63,8 @@ class NonlinearDrag(ControlAffine): k: float m: float outputs: list[int] = static_field(default_factory=lambda: [0]) - n_states = 2 + + initial_state = jnp.zeros(2) n_inputs = "scalar" def f(self, x): @@ -80,7 +83,7 @@ def h(self, x): class Sastry9_9(ControlAffine): """Sastry Example 9.9""" - n_states = 3 + initial_state = jnp.zeros(3) n_inputs = "scalar" def f(self, x): @@ -94,13 +97,14 @@ def h(self, x): class LotkaVolterra(DynamicalSystem): - n_states = 2 - n_inputs = 0 alpha: float = non_negative_field() beta: float = non_negative_field() gamma: float = non_negative_field() delta: float = non_negative_field() + initial_state = jnp.ones(2) + n_inputs = 0 + def vector_field(self, x, u=None, t=None): x, y = x return jnp.array( @@ -118,6 +122,8 @@ class SpringMassWithBoucWenHysteresis(DynamicalSystem): n: float = non_negative_field(min_val=1.0) a: float = boxed_field(0.0, 1.0) + initial_state = jnp.zeros(3) + def vector_field(self, x, u=None, t=None): if u is None: u = 0 diff --git a/dynax/linearize.py b/dynax/linearize.py index bfb61d3..1335c6a 100644 --- a/dynax/linearize.py +++ b/dynax/linearize.py @@ -11,7 +11,13 @@ from jaxtyping import Array from .derivative import lie_derivative -from .system import ControlAffine, DynamicalSystem, LinearSystem +from .system import ( + _CoupledSystemMixin, + ControlAffine, + DynamicalSystem, + DynamicStateFeedbackSystem, + LinearSystem, +) # TODO: make this a method of ControlAffine @@ -211,39 +217,43 @@ def fn(u, args): return feedbacklaw -class DiscreteLinearizingSystem(DynamicalSystem): +class DiscreteLinearizingSystem(DynamicalSystem, _CoupledSystemMixin): r"""Dynamics computing linearizing feedback as output.""" - sys: ControlAffine - refsys: LinearSystem - feedbacklaw: Callable + _v: Callable n_inputs = "scalar" - def __init__(self, sys, refsys, reldeg, linearizing_output=None): + def __init__( + self, + sys: ControlAffine, + refsys: LinearSystem, + reldeg: int, + **fb_kwargs, + ): if sys.n_inputs != "scalar": raise ValueError("Only single input systems supported.") - self.sys = sys - self.refsys = refsys - self.n_states = self.sys.n_states + self.refsys.n_states + 1 - self.feedbacklaw = discrete_input_output_linearize( - sys, reldeg, refsys, linearizing_output + self._sys1 = sys + self._sys2 = refsys + self.initial_state = jnp.append( + self._pack_states(self._sys1.initial_state, self._sys2.initial_state), 0.0 ) + self._v = discrete_input_output_linearize(sys, reldeg, refsys, **fb_kwargs) - def vector_field(self, x, u, t=None): - x, z, v_last = x[: self.sys.n_states], x[self.sys.n_states : -1], x[-1] - v = self.feedbacklaw(x, z, u, v_last) - xn = self.sys.vector_field(x, v) - zn = self.refsys.vector_field(z, u) - return jnp.concatenate((xn, zn, jnp.array([v]))) - - def output(self, x, u, t=None): - x, z, v_last = x[: self.sys.n_states], x[self.sys.n_states : -1], x[-1] - v = self.feedbacklaw(x, z, u, v_last) # FIXME: feedback law called twice + def vector_field(self, x, u=None, t=None): + (x, z), v_last = self._unpack_states(x[:-1]), x[-1] + v = self._v(x, z, u, v_last) + xn = self._sys1.vector_field(x, v) + zn = self._sys2.vector_field(z, u) + return jnp.append(self._pack_states(xn, zn), v) + + def output(self, x, u=None, t=None): + (x, z), v_last = self._unpack_states(x[:-1]), x[-1] + v = self._v(x, z, u, v_last) # FIXME: feedback law called twice return v -class LinearizingSystem(DynamicalSystem): +class LinearizingSystem(DynamicStateFeedbackSystem): r"""Coupled ODE of nonlinear dynamics, linear reference and io linearizing law. .. math:: @@ -259,10 +269,6 @@ class LinearizingSystem(DynamicalSystem): """ - sys: ControlAffine - refsys: LinearSystem - feedbacklaw: Callable[[Array, Array, float], float] - n_inputs = "scalar" def __init__( @@ -270,29 +276,12 @@ def __init__( sys: ControlAffine, refsys: LinearSystem, reldeg: int, - feedbacklaw: Optional[Callable] = None, - linearizing_output: Optional[int] = None, + **fb_kwargs, ): - self.sys = sys - self.refsys = refsys - self.n_states = ( - self.sys.n_states + self.refsys.n_states - ) # FIXME: support "scalar" - if callable(feedbacklaw): - self.feedbacklaw = feedbacklaw - else: - self.feedbacklaw = input_output_linearize( - sys, reldeg, refsys, linearizing_output - ) - - def vector_field(self, x, u=None, t=None): - x, z = x[: self.sys.n_states], x[self.sys.n_states :] - y = self.feedbacklaw(x, z, u) - dx = self.sys.vector_field(x, y) - dz = self.refsys.vector_field(z, u) - return jnp.concatenate((dx, dz)) + v = input_output_linearize(sys, reldeg, refsys, **fb_kwargs) + super().__init__(sys, refsys, v) def output(self, x, u, t=None): - x, z = x[: self.sys.n_states], x[self.sys.n_states :] - ur = self.feedbacklaw(x, z, u) - return ur + x, z = self._unpack_states(x) + v = self._v(x, z, u) + return v diff --git a/dynax/system.py b/dynax/system.py index ba5960e..b127d21 100644 --- a/dynax/system.py +++ b/dynax/system.py @@ -1,5 +1,6 @@ """Classes for representing dynamical systems.""" +from abc import abstractmethod from collections.abc import Callable from dataclasses import field from typing import Literal @@ -63,6 +64,9 @@ def non_negative_field(min_val: float = 0.0, **kwargs): return boxed_field(lower=min_val, upper=np.inf, **kwargs) +# TODO: make abstract + + class DynamicalSystem(eqx.Module): r"""A continous-time dynamical system. @@ -94,48 +98,54 @@ def output(self, x, u, t): """ - # these attributes should be set by subclasses - n_states: int | Literal["scalar"] = static_field(init=False) + initial_state: Array = static_field(init=False) n_inputs: int | Literal["scalar"] = static_field(init=False) - initial_state: Array | None = static_field(default=None) def __check_init__(self): # Check that required attributes are initialized - required_attrs = ["n_states", "n_inputs"] + required_attrs = ["initial_state", "n_inputs"] for attr in required_attrs: if not hasattr(self, attr): raise AttributeError(f"Attribute '{attr}' not initialized.") # Check that vector_field returns Arrays or scalars and not PyTrees - x = jax.ShapeDtypeStruct(dim2shape(self.n_states), jnp.float64) + x = self.initial_state u = jax.ShapeDtypeStruct(dim2shape(self.n_inputs), jnp.float64) - t = 1.0 - out = jax.eval_shape(self.vector_field, x, u, t) - if not isinstance(out, jax.ShapeDtypeStruct): + try: + dx = jax.eval_shape(self.vector_field, x, u, t=1.0) + y = jax.eval_shape(self.output, x, u, t=1.0) + except Exception as e: + raise ValueError( + "Can not evaluate output shapes. Check your definitions!" + ) from e + if not isinstance(dx, jax.ShapeDtypeStruct): raise ValueError( - f"vector_field must return arrays or scalars, not {type(out)}" + f"vector_field must return arrays or scalars, not {type(dx)}" ) + if not isinstance(y, jax.ShapeDtypeStruct): + raise ValueError(f"outpuut must return arrays or scalars, not {type(y)}") + + @abstractmethod + def vector_field(self, x, u=None, t=None) -> Array: + """Compute state derivative.""" + raise NotImplementedError + + def output(self, x, u=None, t=None) -> Array: + """Compute output.""" + return x @property def n_outputs(self) -> int | Literal["scalar"]: # Compute output size - x = jax.ShapeDtypeStruct(dim2shape(self.n_states), jnp.float64) + x = self.initial_state u = jax.ShapeDtypeStruct(dim2shape(self.n_inputs), jnp.float64) y = jax.eval_shape(self.output, x, u, t=1.0) return "scalar" if y.ndim == 0 else y.shape[0] - def vector_field(self, x, u=None, t=None): - """Compute state derivative.""" - raise NotImplementedError - - def output(self, x, u=None, t=None): - """Compute output.""" - return x - def linearize(self, x0=None, u0=None, t=None) -> "LinearSystem": """Compute the approximate linearized system around a point.""" if x0 is None: - x0 = jnp.zeros(dim2shape(self.n_states)) + x0 = self.initial_state if u0 is None: u0 = jnp.zeros(dim2shape(self.n_inputs)) A, B, C, D = _linearize(self.vector_field, self.output, x0, u0, t) @@ -227,36 +237,40 @@ def __init__(self, A: ArrayLike, B: ArrayLike, C: ArrayLike, D: ArrayLike): self.C = jnp.array(C) self.D = jnp.array(D) - # Extract number of states and inputs from matrices - self.n_states = "scalar" if self.A.ndim == 0 else self.A.shape[0] - if self.n_states == "scalar": + @property + def initial_state(self) -> Array: # type: ignore + return jnp.array(0) if self.A.ndim == 0 else jnp.zeros(self.A.shape[0]) + + @property + def n_inputs(self) -> int | Literal["scalar"]: # type: ignore + if self.initial_state.ndim == 0: if self.B.ndim == 0: - self.n_inputs = "scalar" + return "scalar" elif self.B.ndim == 1: - self.n_inputs = self.B.size - else: - raise ValueError("Dimension mismatch.") + return self.B.size else: if self.B.ndim == 1: - self.n_inputs = "scalar" + return "scalar" elif self.B.ndim == 2: - self.n_inputs = self.B.shape[1] - else: - raise ValueError("Dimension mismatch.") + return self.B.shape[1] + raise ValueError("Dimension mismatch.") - def vector_field(self, x, u=None, t=None): + def vector_field(self, x, u=None, t=None) -> Array: out = self.A.dot(x) if u is not None: out += self.B.dot(u) return out - def output(self, x, u=None, t=None): + def output(self, x, u=None, t=None) -> Array: out = self.C.dot(x) if u is not None: out += self.D.dot(u) return out +# TODO: make abstract + + class ControlAffine(DynamicalSystem): r"""A control-affine dynamical system. @@ -276,7 +290,6 @@ def g(self, x): def h(self, x): return x - # FIXME: remove time dependence def vector_field(self, x, u=None, t=None): if u is None: u = 0 @@ -286,73 +299,122 @@ def output(self, x, u=None, t=None): return self.h(x) -class SeriesSystem(DynamicalSystem): - """Two systems in series.""" - +class _CoupledSystemMixin(eqx.Module): _sys1: DynamicalSystem _sys2: DynamicalSystem + def _pack_states(self, x1, x2) -> Array: + return jnp.concatenate( + ( + jnp.atleast_1d(x1), + jnp.atleast_1d(x2), + ) + ) + + def _unpack_states(self, x): + sys1_size = ( + 1 + if jnp.ndim(self._sys1.initial_state) == 0 + else self._sys1.initial_state.size + ) + return ( + x[:sys1_size].reshape(self._sys1.initial_state.shape), + x[sys1_size:].reshape(self._sys2.initial_state.shape), + ) + + +class SeriesSystem(DynamicalSystem, _CoupledSystemMixin): + r"""Two systems in series. + + .. math:: + + ẋ_1 &= f_1(x_1, u, t) \\ + y_1 &= h_1(x_1, u, t) \\ + ẋ_2 &= f_2(x_2, y1, t) \\ + y_2 &= h_2(x_2, y1, t) + + .. aafig:: + + +------+ +------+ + u --+->+ sys1 +--y1->+ sys2 +--> y2 + +------+ +------+ + + """ + def __init__(self, sys1: DynamicalSystem, sys2: DynamicalSystem): """ Args: sys1: system with n outputs sys2: system with n inputs + """ - assert sys1.n_outputs == sys2.n_inputs, "in- and outputs don't match" self._sys1 = sys1 self._sys2 = sys2 - self.n_states = sys1.n_states + sys2.n_states + self.initial_state = self._pack_states(sys1.initial_state, sys2.initial_state) self.n_inputs = sys1.n_inputs def vector_field(self, x, u=None, t=None): - x1 = x[: self._sys1.n_states] - x2 = x[self._sys1.n_states :] + x1, x2 = self._unpack_states(x) y1 = self._sys1.output(x1, u, t) dx1 = self._sys1.vector_field(x1, u, t) dx2 = self._sys2.vector_field(x2, y1, t) - return jnp.concatenate((jnp.atleast_1d(dx1), jnp.atleast_1d(dx2))) + return self._pack_states(dx1, dx2) def output(self, x, u=None, t=None): - x1 = x[: self._sys1.n_states] - x2 = x[self._sys1.n_states :] + x1, x2 = self._unpack_states(x) y1 = self._sys1.output(x1, u, t) y2 = self._sys2.output(x2, y1, t) return y2 -class FeedbackSystem(DynamicalSystem): - """Two systems connected via feedback.""" +class FeedbackSystem(DynamicalSystem, _CoupledSystemMixin): + r"""Two systems connected via feedback. + + .. math:: - _sys: DynamicalSystem - _fbsys: DynamicalSystem + ẋ_1 &= f_1(x_1, u + y_2, t) \\ + y_1 &= h_1(x_1, t) \\ + ẋ_2 &= f_2(x_2, y_1, t) \\ + y_2 &= h_2(x_2, y_1, t) \\ + + .. aafig:: + + +------+ + u --+->+ sys1 +--+-> y1 + ^ +------+ | + | | + y2| +------+ | + +--+ sys2 |<-+ + +------+ - def __init__(self, sys: DynamicalSystem, fbsys: DynamicalSystem): + """ + + def __init__(self, sys1: DynamicalSystem, sys2: DynamicalSystem): """ Args: - sys: system in forward path - fbsys: system in feedback path + sys1: system in forward path with n inputs + sys2: system in feedback path with n outputs """ - self._sys = sys - self._fbsys = fbsys - self.n_states = sys.n_states + fbsys.n_states - self.n_inputs = sys.n_inputs + self._sys1 = sys1 + self._sys2 = sys2 + self.initial_state = self._pack_states(sys1.initial_state, sys2.initial_state) + self.n_inputs = sys1.n_inputs def vector_field(self, x, u=None, t=None): if u is None: - u = np.zeros(self._sys.n_inputs) - x1 = x[: self._sys.n_states] - x2 = x[self._sys.n_states :] - y1 = self._sys.output(x1, None, t) - y2 = self._fbsys.output(x2, y1, t) - dx1 = self._sys.vector_field(x1, u + y2, t) - dx2 = self._fbsys.vector_field(x2, y1, t) - dx = jnp.concatenate((jnp.atleast_1d(dx1), jnp.atleast_1d(dx2))) + u = np.zeros(dim2shape(self._sys1.n_inputs)) + x1, x2 = self._unpack_states(x) + y1 = self._sys1.output(x1, None, t) + y2 = self._sys2.output(x2, y1, t) + dx1 = self._sys1.vector_field(x1, u + y2, t) + dx2 = self._sys2.vector_field(x2, y1, t) + dx = self._pack_states(dx1, dx2) return dx def output(self, x, u=None, t=None): - x1 = x[: self._sys.n_states] - y = self._sys.output(x1, None, t) + x1, _ = self._unpack_states(x) + y = self._sys1.output(x1, None, t) return y @@ -361,15 +423,26 @@ class StaticStateFeedbackSystem(DynamicalSystem): .. math:: - ẋ &= f(x, v(x, u), t) \\ + ẋ &= f(x, v(x), t) \\ y &= h(x, u, t) + .. aafig:: + + +-----+ + u --+------------->+ sys +----> y + ^ +--+--+ + | | + | | x + | +--------+ | + +--+ "v(x)" +<----+ + +--------+ + """ _sys: DynamicalSystem - _feedbacklaw: Callable + _v: Callable[[Array], Array] - def __init__(self, sys: DynamicalSystem, v: Callable[[Array, Array], Array]): + def __init__(self, sys: DynamicalSystem, v: Callable[[Array], Array]): """ Args: sys: system with vector field `f` and output `h` @@ -377,14 +450,12 @@ def __init__(self, sys: DynamicalSystem, v: Callable[[Array, Array], Array]): """ self._sys = sys - self._feedbacklaw = staticmethod(v) - self.n_states = sys.n_states + self._v = staticmethod(v) + self.initial_state = sys.initial_state self.n_inputs = sys.n_inputs def vector_field(self, x, u=None, t=None): - if u is None: - u = np.zeros(self._sys.n_inputs) - v = self._feedbacklaw(x, u) + v = self._v(x) dx = self._sys.vector_field(x, v, t) return dx @@ -393,50 +464,60 @@ def output(self, x, u=None, t=None): return y -class DynamicStateFeedbackSystem(DynamicalSystem): +class DynamicStateFeedbackSystem(DynamicalSystem, _CoupledSystemMixin): r"""System with dynamic state-feedback. .. math:: + + ẋ_1 &= f_1(x_1, v(x_1, x_2, u), t) \\ + ẋ_2 &= f_2(x_2, u, t) \\ + y &= h_1(x_1, u, t) - ẋ &= f_1(x, v(x, z, u), t) \\ - ż &= f_2(z, r, t) \\ - y &= h_1(x, u, t) + .. aafig:: + + +--------------+ +-----+ + u -+->+ v(x1, x2, u) +--v->+ sys +-> y + | +-+-------+----+ +--+--+ + | ^ ^ | + | | x2 | x1 | + | | +-------------+ + | +------+ + +->+ sys2 | + +------+ """ - _sys: DynamicalSystem - _sys2: DynamicalSystem - _feedbacklaw: Callable[[Array, Array, float], float] + _v: Callable[[Array, Array, float], float] def __init__( self, - sys: DynamicalSystem, + sys1: DynamicalSystem, sys2: DynamicalSystem, - v: Callable[[Array, Array, float], float], + v: Callable[[Array, Array, Array | float], float], ): r""" Args: - sys: system with vector field :math:`f_1` and output :math:`h` + sys1: system with vector field :math:`f_1` and output :math:`h` sys2: system with vector field :math:`f_2` v: dynamic feedback law :math:`v` """ - self._sys = sys + self._sys1 = sys1 self._sys2 = sys2 - self._feedbacklaw = v - self.n_states = sys.n_states + sys2.n_states - self.n_inputs = sys.n_inputs + self._v = staticmethod(v) + self.initial_state = self._pack_states(sys1.initial_state, sys2.initial_state) + self.n_inputs = sys1.n_inputs - def vector_field(self, xz, u=None, t=None): + def vector_field(self, x, u=None, t=None): if u is None: - u = np.zeros(self._sys.n_inputs) - x, z = xz[: self._sys.n_states], xz[self._sys.n_states :] - v = self._feedbacklaw(x, z, u) - dx = self._sys.vector_field(x, v, t) - dz = self._sys2.vector_field(z, u, t) + u = np.zeros(dim2shape(self._sys1.n_inputs)) + x1, x2 = self._unpack_states(x) + v = self._v(x1, x2, u) + dx = self._sys1.vector_field(x1, v, t) + dz = self._sys2.vector_field(x2, u, t) return jnp.concatenate((dx, dz)) - def output(self, xz, u=None, t=None): - x = xz[: self._sys.n_states] - y = self._sys.output(x, u, t) + def output(self, x, u=None, t=None): + x1, _ = self._unpack_states(x) + y = self._sys1.output(x1, u, t) return y diff --git a/tests/test_estimation.py b/tests/test_estimation.py index 979312b..5197bc2 100644 --- a/tests/test_estimation.py +++ b/tests/test_estimation.py @@ -31,14 +31,14 @@ def test_fit_least_squares(outputs): + np.sin(0.1 * 2 * np.pi * t) + np.sin(10 * 2 * np.pi * t) ) - x0 = [1.0, 0.0] + x0 = jnp.array([1.0, 0.0]) true_model = Flow(NonlinearDrag(1.0, 2.0, 3.0, 4.0, outputs)) - _, y_true = true_model(x0, t, u) + _, y_true = true_model(t, u, x0) # fit init_model = Flow(NonlinearDrag(1.0, 1.0, 1.0, 1.0, outputs)) pred_model = fit_least_squares(init_model, t, y_true, x0, u).result # check result - _, y_pred = pred_model(x0, t, u) + _, y_pred = pred_model(t, u, x0) npt.assert_allclose(y_pred, y_true, **tols) npt.assert_allclose( jax.tree_util.tree_flatten(pred_model)[0], @@ -62,12 +62,12 @@ def test_fit_least_squares_on_batch(): x0s = np.repeat(x0[None], us.shape[0], axis=0) ts = np.repeat(t[None], us.shape[0], axis=0) true_model = Flow(NonlinearDrag(1.0, 2.0, 3.0, 4.0)) - _, ys = jax.vmap(true_model)(x0s, ts, us) + _, ys = jax.vmap(true_model)(ts, us, x0s) # fit init_model = Flow(NonlinearDrag(1.0, 1.0, 1.0, 1.0)) pred_model = fit_least_squares(init_model, ts, ys, x0s, us, batched=True).result # check result - _, ys_pred = jax.vmap(pred_model)(x0s, ts, us) + _, ys_pred = jax.vmap(pred_model)(ts, us, x0s) npt.assert_allclose(ys_pred, ys, **tols) npt.assert_allclose( jax.tree_util.tree_flatten(pred_model)[0], @@ -84,7 +84,7 @@ def test_can_compute_jacfwd_with_implicit_methods(): def fun(m, r, k, x0=x0, solver_opt=solver_opt, t=t): model = Flow(SpringMassDamper(m, r, k), **solver_opt) - x_true, _ = model(x0, t, u=np.zeros_like(t)) + x_true, _ = model(t, u=jnp.zeros_like(t), initial_state=x0) return x_true jac = jax.jacfwd(fun, argnums=(0, 1, 2)) @@ -93,20 +93,20 @@ def fun(m, r, k, x0=x0, solver_opt=solver_opt, t=t): def test_fit_with_bounded_parameters(): # data - t = np.linspace(0, 1, 100) - x0 = [0.5, 0.5] + t = jnp.linspace(0, 1, 100) + x0 = jnp.array([0.5, 0.5]) solver_opt = dict(step=PIDController(rtol=1e-5, atol=1e-7)) true_model = Flow( LotkaVolterra(alpha=2 / 3, beta=4 / 3, gamma=1.0, delta=1.0), **solver_opt ) - x_true, _ = true_model(x0, t) + x_true, _ = true_model(t, initial_state=x0) # fit init_model = Flow( LotkaVolterra(alpha=1.0, beta=1.0, gamma=1.5, delta=2.0), **solver_opt ) pred_model = fit_least_squares(init_model, t, x_true, x0).result # check result - x_pred, _ = pred_model(x0, t) + x_pred, _ = pred_model(t, initial_state=x0) npt.assert_allclose(x_pred, x_true, **tols) npt.assert_allclose( jax.tree_util.tree_flatten(pred_model)[0], @@ -117,11 +117,12 @@ def test_fit_with_bounded_parameters(): def test_fit_with_bounded_parameters_and_ndarrays(): # model - class LotkaVolterra(DynamicalSystem): + class LotkaVolterraBounded(DynamicalSystem): alpha: float beta: float delta_gamma: Array = non_negative_field() - n_states = 2 + + initial_state = jnp.array((0.5, 0.5)) n_inputs = 0 def vector_field(self, x, u=None, t=None): @@ -132,22 +133,23 @@ def vector_field(self, x, u=None, t=None): ) # data - t = np.linspace(0, 1, 100) - x0 = [0.5, 0.5] + t = jnp.linspace(0, 1, 100) solver_opt = dict(step=PIDController(rtol=1e-5, atol=1e-7)) true_model = Flow( - LotkaVolterra(alpha=2 / 3, beta=4 / 3, delta_gamma=jnp.array([1.0, 1.0])), + LotkaVolterraBounded( + alpha=2 / 3, beta=4 / 3, delta_gamma=jnp.array([1.0, 1.0]) + ), **solver_opt, ) - x_true, _ = true_model(x0, t) + x_true, _ = true_model(t) # fit init_model = Flow( - LotkaVolterra(alpha=1.0, beta=1.0, delta_gamma=jnp.array([1.5, 2])), + LotkaVolterraBounded(alpha=1.0, beta=1.0, delta_gamma=jnp.array([1.5, 2])), **solver_opt, ) - pred_model = fit_least_squares(init_model, t, x_true, x0).result + pred_model = fit_least_squares(init_model, t, x_true).result # check result - x_pred, _ = pred_model(x0, t) + x_pred, _ = pred_model(t) npt.assert_allclose(x_pred, x_true, **tols) npt.assert_allclose( ravel_pytree(pred_model)[0], ravel_pytree(true_model)[0], **tols @@ -157,11 +159,11 @@ def vector_field(self, x, u=None, t=None): @pytest.mark.parametrize("num_shots", [1, 2, 3]) def test_fit_multiple_shooting_with_input(num_shots): # data - t = np.linspace(0, 10, 10000) - u = np.sin(1 * 2 * np.pi * t) - x0 = [1.0, 0.0] + t = jnp.linspace(0, 10, 10000) + u = jnp.sin(1 * 2 * np.pi * t) + x0 = jnp.array([1.0, 0.0]) true_model = Flow(SpringMassDamper(1.0, 2.0, 3.0)) - x_true, _ = true_model(x0, t, u) + x_true, _ = true_model(t, u, initial_state=x0) # fit init_model = Flow(SpringMassDamper(1.0, 1.0, 1.0)) pred_model = fit_multiple_shooting( @@ -175,7 +177,7 @@ def test_fit_multiple_shooting_with_input(num_shots): verbose=2, ).result # check result - x_pred, _ = pred_model(x0, t, u) + x_pred, _ = pred_model(t, u, initial_state=x0) npt.assert_allclose(x_pred, x_true, **tols) npt.assert_allclose( jax.tree_util.tree_flatten(pred_model)[0], @@ -187,13 +189,13 @@ def test_fit_multiple_shooting_with_input(num_shots): @pytest.mark.parametrize("num_shots", [1, 2, 3]) def test_fit_multiple_shooting_without_input(num_shots): # data - t = np.linspace(0, 1, 1000) - x0 = [0.5, 0.5] + t = jnp.linspace(0, 1, 1000) + x0 = jnp.array([0.5, 0.5]) solver_opt = dict(step=PIDController(rtol=1e-3, atol=1e-6)) true_model = Flow( LotkaVolterra(alpha=2 / 3, beta=4 / 3, gamma=1.0, delta=1.0), **solver_opt ) - x_true, _ = true_model(x0, t) + x_true, _ = true_model(t, initial_state=x0) # fit init_model = Flow( LotkaVolterra(alpha=1.0, beta=1.0, gamma=1.5, delta=2.0), **solver_opt @@ -202,7 +204,7 @@ def test_fit_multiple_shooting_without_input(num_shots): init_model, t, x_true, x0, num_shots=num_shots, continuity_penalty=1 ).result # check result - x_pred, _ = pred_model(x0, t) + x_pred, _ = pred_model(t, initial_state=x0) npt.assert_allclose(x_pred, x_true, atol=1e-3, rtol=1e-3) npt.assert_allclose( jax.tree_util.tree_flatten(pred_model)[0], @@ -215,7 +217,7 @@ def test_fit_multiple_shooting_without_input(num_shots): def test_transfer_function(): sys = SpringMassDamper(1.0, 1.0, 1.0) sr = 100 - f = np.linspace(0, sr / 2, 100) + f = jnp.linspace(0, sr / 2, 100) s = 2 * np.pi * f * 1j H = jax.vmap(transfer_function(sys))(s)[:, 0] H_true = 1 / (sys.m * s**2 + sys.r * s + sys.k) @@ -227,14 +229,14 @@ def test_csd_matching(): # model sys = SpringMassDamper(1.0, 1.0, 1.0) model = Flow(sys, step=PIDController(rtol=1e-4, atol=1e-6)) - x0 = np.zeros(sys.n_states) + x0 = np.zeros(jnp.shape(sys.initial_state)) # input duration = 1000 sr = 50 t = np.arange(int(duration * sr)) / sr u = np.random.normal(size=len(t)) # output - _, y = model(x0, t, u) + _, y = model(t, u, initial_state=x0) # fit init_sys = SpringMassDamper(1.0, 1.0, 1.0) fitted_sys = fit_csd_matching(init_sys, u, y, sr, nperseg=1024, verbose=1).result diff --git a/tests/test_evolution.py b/tests/test_evolution.py new file mode 100644 index 0000000..98ea8fd --- /dev/null +++ b/tests/test_evolution.py @@ -0,0 +1,114 @@ +import diffrax as dfx +import jax.numpy as jnp +import numpy as np +import numpy.testing as npt +from scipy.signal import dlsim, dlti + +from dynax import DynamicalSystem, Flow, LinearSystem, Map + + +tols = dict(rtol=1e-04, atol=1e-06) + + +class SecondOrder(DynamicalSystem): + """Second-order, linear system with constant coefficients.""" + + b: float + c: float + + n_inputs = 0 + initial_state = jnp.array([0.0, 0.0]) + + def vector_field(self, x, u=None, t=None): + """ddx + b dx + c x = u as first order with x1=x and x2=dx.""" + x1, x2 = x + dx1 = x2 + dx2 = -self.b * x2 - self.c * x1 + return jnp.array([dx1, dx2]) + + def output(self, x, u=None, t=None): + x1, _ = x + return x1 + + +def test_forward_model_crit_damp(): + b = 2 + c = 1 # critical damping as b**2 == 4*c + sys = SecondOrder(b, c) + + def x(t, x0, dx0): + """Solution to critically damped linear second-order system.""" + C2 = x0 + C1 = b / 2 * C2 + return np.exp(-b * t / 2) * (C1 * t + C2) + + x0 = jnp.array([1, 0]) # x(t=0)=1, dx(t=0)=0 + t = jnp.linspace(0, 1) + model = Flow(sys, step=dfx.PIDController(rtol=1e-7, atol=1e-9)) + x_pred = model(t, initial_state=x0)[1] + x_true = x(t, *x0) + assert np.allclose(x_true, x_pred) + + +def test_forward_model_lin_sys(): + b = 2 + c = 1 # critical damping as b**2 == 4*c + uconst = 1 + + A = jnp.array([[0, 1], [-c, -b]]) + B = jnp.array([[0], [1]]) + C = jnp.array([[1, 0]]) + D = jnp.zeros((1, 1)) + sys = LinearSystem(A, B, C, D) + + def x(t, x0, dx0, uconst): + """Solution to critically damped linear second-order system.""" + C2 = x0 - uconst / c + C1 = b / 2 * C2 + return np.exp(-b * t / 2) * (C1 * t + C2) + uconst / c + + x0 = jnp.array([1, 0]) # x(t=0)=1, dx(t=0)=0 + t = jnp.linspace(0, 1) + u = jnp.ones(t.shape + (1,)) * uconst + model = Flow(sys, step=dfx.PIDController(rtol=1e-7, atol=1e-9)) + x_pred = model(t, u, initial_state=x0)[1] + x_true = x(t, x0[0], x0[1], uconst) + assert np.allclose(x_true, x_pred) + + +def test_discrete_forward_model(): + b = 2 + c = 1 # critical damping as b**2 == 4*c + t = jnp.arange(50) + u = jnp.sin(1 / len(t) * 2 * np.pi * t)[:, None] # single input + x0 = jnp.array([1.0, 0.0]) + A = jnp.array([[0, 1], [-c, -b]]) + B = jnp.array([[0], [1]]) + C = jnp.array([[1, 0]]) + D = jnp.zeros((1, 1)) + # test just input + sys = LinearSystem(A, B, C, D) + model = Map(sys) + x, y = model(u=u, initial_state=x0) # ours + scipy_sys = dlti(A, B, C, D) + _, scipy_y, scipy_x = dlsim(scipy_sys, u, x0=x0) + npt.assert_allclose(scipy_y, y, **tols) + npt.assert_allclose(scipy_x, x, **tols) + # test input and time (results should be same) + x, y = model(u=u, t=t, initial_state=x0) + scipy_t, scipy_y, scipy_x = dlsim(scipy_sys, u, x0=x0, t=t) + npt.assert_allclose(scipy_y, y, **tols) + npt.assert_allclose(scipy_x, x, **tols) + + +def test_initial_state(): + class Sys(DynamicalSystem): + n_inputs = "scalar" + initial_state = jnp.array(1.0) + + def vector_field(self, x, u, t=None): + return x * 0.1 + u + + t = jnp.arange(5) + u = jnp.zeros(5) + x, y = Flow(Sys())(t, u) diff --git a/tests/test_linearize.py b/tests/test_linearize.py index feb3dec..df146be 100644 --- a/tests/test_linearize.py +++ b/tests/test_linearize.py @@ -28,7 +28,8 @@ class SpringMassDamperWithOutput(ControlAffine): r: float = 0.1 k: float = 0.1 out: int = 0 - n_states = 2 + + initial_state = jnp.zeros(2) n_inputs = "scalar" def f(self, x): @@ -92,7 +93,7 @@ def test_linearize_lin2lin(): def test_linearize_dyn2lin(): class ScalarScalar(DynamicalSystem): - n_states = "scalar" + initial_state = jnp.array(0.0) n_inputs = "scalar" def vector_field(self, x, u, t): @@ -123,15 +124,15 @@ def test_input_output_linearize_single_output(): """Feedback linearized system equals system linearized around x0.""" sys = NonlinearDrag(0.1, 0.1, 0.1, 0.1) ref = sys.linearize() - xs = np.random.normal(size=(100, sys.n_states)) + xs = np.random.normal(size=(100,) + sys.initial_state.shape) reldeg = relative_degree(sys, xs) feedbacklaw = input_output_linearize(sys, reldeg, ref) feedback_sys = DynamicStateFeedbackSystem(sys, ref, feedbacklaw) - t = np.linspace(0, 0.1) - u = np.sin(t) + t = jnp.linspace(0, 0.1) + u = jnp.sin(t) npt.assert_allclose( - Flow(ref)(np.zeros(sys.n_states), t, u)[1], - Flow(feedback_sys)(np.zeros(feedback_sys.n_states), t, u)[1], + Flow(ref)(t, u)[1], + Flow(feedback_sys)(t, u)[1], **tols, ) @@ -142,19 +143,19 @@ def test_input_output_linearize_multiple_outputs(): ref = sys.linearize() for out_idx in range(2): out_idx = 1 - xs = np.random.normal(size=(100, sys.n_states)) + xs = np.random.normal(size=(100,) + sys.initial_state.shape) reldeg = relative_degree(sys, xs, output=out_idx) feedbacklaw = input_output_linearize(sys, reldeg, ref, output=out_idx) feedback_sys = DynamicStateFeedbackSystem(sys, ref, feedbacklaw) - t = np.linspace(0, 1) - u = np.sin(t) * 0.1 - y_ref = Flow(ref)(np.zeros(sys.n_states), t, u)[1] - y = Flow(feedback_sys)(np.zeros(feedback_sys.n_states), t, u)[1] + t = jnp.linspace(0, 1) + u = jnp.sin(t) * 0.1 + y_ref = Flow(ref)(t, u)[1] + y = Flow(feedback_sys)(t, u)[1] npt.assert_allclose(y_ref[:, out_idx], y[:, out_idx], **tols) class Lee7_4_5(DynamicalSystem): - n_states = 2 + initial_state = jnp.zeros(2) n_inputs = "scalar" def vector_field(self, x, u, t=None): @@ -174,10 +175,10 @@ def test_discrete_input_output_linearize(): assert reldeg == 2 feedback_sys = DiscreteLinearizingSystem(sys, refsys, reldeg) - t = np.linspace(0, 0.001, 10) - u = np.cos(t) * 0.1 - _, v = Map(feedback_sys)(np.zeros(2 + 2 + 1), t, u) - _, y = Map(sys)(np.zeros(2), t, u) - _, y_ref = Map(refsys)(np.zeros(2), t, u) + t = jnp.linspace(0, 0.001, 10) + u = jnp.cos(t) * 0.1 + _, v = Map(feedback_sys)(t, u) + _, y = Map(sys)(t, u) + _, y_ref = Map(refsys)(t, u) npt.assert_allclose(y_ref, y, **tols) diff --git a/tests/test_systems.py b/tests/test_systems.py index 3b13cf6..5c410bf 100644 --- a/tests/test_systems.py +++ b/tests/test_systems.py @@ -1,13 +1,7 @@ -import diffrax as dfx -import jax.numpy as jnp import numpy as np import numpy.testing as npt -from scipy.signal import dlsim, dlti -from dynax import DynamicalSystem, FeedbackSystem, Flow, LinearSystem, Map, SeriesSystem - - -tols = dict(rtol=1e-04, atol=1e-06) +from dynax import FeedbackSystem, LinearSystem, SeriesSystem def test_series(): @@ -25,12 +19,12 @@ def test_series(): sys2 = LinearSystem(A2, B2, C2, D2) sys = SeriesSystem(sys1, sys2) linsys = sys.linearize() - assert np.array_equal( + npt.assert_array_equal( linsys.A, np.block([[A1, np.zeros((n1, n2))], [B2.dot(C1), A2]]) ) - assert np.array_equal(linsys.B, np.block([[B1], [B2.dot(D1)]])) - assert np.array_equal(linsys.C, np.block([[D2.dot(C1), C2]])) - assert np.array_equal(linsys.D, D2.dot(D1)) + npt.assert_array_equal(linsys.B, np.block([[B1], [B2.dot(D1)]])) + npt.assert_array_equal(linsys.C, np.block([[D2.dot(C1), C2]])) + npt.assert_array_equal(linsys.D, D2.dot(D1)) def test_feedback(): @@ -48,111 +42,9 @@ def test_feedback(): sys2 = LinearSystem(A2, B2, C2, D2) sys = FeedbackSystem(sys1, sys2) linsys = sys.linearize() - assert np.array_equal( + npt.assert_array_equal( linsys.A, np.block([[A1 + B1 @ D2 @ C1, B1 @ C2], [B2 @ C1, A2]]) ) - assert np.array_equal(linsys.B, np.block([[B1], [np.zeros((n2, m1))]])) - assert np.array_equal(linsys.C, np.block([[C1, np.zeros((p1, n2))]])) - assert np.array_equal(linsys.D, np.zeros((p1, m1))) - - -class SecondOrder(DynamicalSystem): - """Second-order, linear system with constant coefficients.""" - - b: float - c: float - n_states = 2 - n_inputs = 0 - - def vector_field(self, x, u=None, t=None): - """ddx + b dx + c x = u as first order with x1=x and x2=dx.""" - x1, x2 = x - dx1 = x2 - dx2 = -self.b * x2 - self.c * x1 - return jnp.array([dx1, dx2]) - - def output(self, x, u=None, t=None): - x1, _ = x - return x1 - - -def test_forward_model_crit_damp(): - b = 2 - c = 1 # critical damping as b**2 == 4*c - sys = SecondOrder(b, c) - - def x(t, x0, dx0): - """Solution to critically damped linear second-order system.""" - C2 = x0 - C1 = b / 2 * C2 - return np.exp(-b * t / 2) * (C1 * t + C2) - - x0 = jnp.array([1, 0]) # x(t=0)=1, dx(t=0)=0 - t = np.linspace(0, 1) - model = Flow(sys, step=dfx.PIDController(rtol=1e-7, atol=1e-9)) - x_pred = model(x0, t)[1] - x_true = x(t, *x0) - assert np.allclose(x_true, x_pred) - - -def test_forward_model_lin_sys(): - b = 2 - c = 1 # critical damping as b**2 == 4*c - uconst = 1 - - A = jnp.array([[0, 1], [-c, -b]]) - B = jnp.array([[0], [1]]) - C = jnp.array([[1, 0]]) - D = jnp.zeros((1, 1)) - sys = LinearSystem(A, B, C, D) - - def x(t, x0, dx0, uconst): - """Solution to critically damped linear second-order system.""" - C2 = x0 - uconst / c - C1 = b / 2 * C2 - return np.exp(-b * t / 2) * (C1 * t + C2) + uconst / c - - x0 = jnp.array([1, 0]) # x(t=0)=1, dx(t=0)=0 - t = np.linspace(0, 1) - u = np.ones(t.shape + (1,)) * uconst - model = Flow(sys, step=dfx.PIDController(rtol=1e-7, atol=1e-9)) - x_pred = model(x0, t, u)[1] - x_true = x(t, x0[0], x0[1], uconst) - assert np.allclose(x_true, x_pred) - - -def test_discrete_forward_model(): - b = 2 - c = 1 # critical damping as b**2 == 4*c - t = jnp.arange(50) - u = jnp.sin(1 / len(t) * 2 * np.pi * t)[:, None] # single input - x0 = jnp.array([1.0, 0.0]) - A = jnp.array([[0, 1], [-c, -b]]) - B = jnp.array([[0], [1]]) - C = jnp.array([[1, 0]]) - D = jnp.zeros((1, 1)) - # test just input - sys = LinearSystem(A, B, C, D) - model = Map(sys) - x, y = model(x0, u=u) # ours - scipy_sys = dlti(A, B, C, D) - _, scipy_y, scipy_x = dlsim(scipy_sys, u, x0=x0) - npt.assert_allclose(scipy_y, y, **tols) - npt.assert_allclose(scipy_x, x, **tols) - # test input and time (results should be same) - x, y = model(x0, u=u, t=t) - scipy_t, scipy_y, scipy_x = dlsim(scipy_sys, u, x0=x0, t=t) - npt.assert_allclose(scipy_y, y, **tols) - npt.assert_allclose(scipy_x, x, **tols) - - -def test_initial_state(): - class Sys(DynamicalSystem): - n_states = "scalar" - n_inputs = "scalar" - - def vector_field(self, x, u, t=None): - return x * 0.1 + u - - Sys(initial_state=1) - + npt.assert_array_equal(linsys.B, np.block([[B1], [np.zeros((n2, m1))]])) + npt.assert_array_equal(linsys.C, np.block([[C1, np.zeros((p1, n2))]])) + npt.assert_array_equal(linsys.D, np.zeros((p1, m1))) From ae3f4c1f7904221852de17e66d7ffa03e0c1d7f5 Mon Sep 17 00:00:00 2001 From: fhchl Date: Thu, 1 Feb 2024 16:44:41 +0100 Subject: [PATCH 03/10] test: run also notebooks --- examples/fit_ode.ipynb | 116 +++++++++++++++++++++++------------------ tests/test_examples.py | 20 +++++-- 2 files changed, 82 insertions(+), 54 deletions(-) diff --git a/examples/fit_ode.ipynb b/examples/fit_ode.ipynb index 8a34322..55d9ffe 100644 --- a/examples/fit_ode.ipynb +++ b/examples/fit_ode.ipynb @@ -73,20 +73,28 @@ "cell_type": "code", "execution_count": 2, "metadata": {}, - "outputs": [], + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "An NVIDIA GPU may be present on this machine, but a CUDA-enabled jaxlib is not installed. Falling back to cpu.\n" + ] + } + ], "source": [ "class NonlinearDrag(DynamicalSystem):\n", - " # Set the number of states (order of system) and inputs.\n", - " n_states = 2\n", - " n_inputs = 1\n", - "\n", " # Declare parameters as dataclass fields.\n", " m: float\n", " r: float = non_negative_field()\n", " r2: float = boxed_field(lower=0.01, upper=1)\n", " k: float = boxed_field(lower=1e-3, upper=2)\n", "\n", - " # Define the vector field of the dynamical system\n", + " # Set the initial state of the system and the number of inputs.\n", + " initial_state = jnp.zeros(2)\n", + " n_inputs = \"scalar\"\n", + "\n", + " # Define the vector field of the dynamical system.\n", " def vector_field(self, x, u, t):\n", " x1, x2 = x\n", " return jnp.array(\n", @@ -109,7 +117,7 @@ }, { "cell_type": "code", - "execution_count": 10, + "execution_count": 3, "metadata": {}, "outputs": [ { @@ -118,11 +126,11 @@ "text": [ "true forward model: Flow(\n", " system=NonlinearDrag(\n", - " n_states(static)=2,\n", - " n_inputs(static)=1,\n", + " initial_state(static)=f64[2],\n", + " n_inputs(static)='scalar',\n", " m=1.0,\n", " r(boxed: (0.0, inf))=2.0,\n", - " r2(boxed: (0.01, 1))=0.15,\n", + " r2(boxed: (0.01, 1))=3.0,\n", " k(boxed: (0.001, 2))=1.0\n", " ),\n", " solver(static)=Dopri8(scan_kind=None),\n", @@ -150,7 +158,7 @@ ], "source": [ "true_model = Flow(\n", - " system=NonlinearDrag(m=1.0, r=2.0, r2=0.15, k=1.0),\n", + " system=NonlinearDrag(m=1.0, r=2.0, r2=3., k=1.0),\n", " solver=diffrax.Dopri8(),\n", " step=diffrax.PIDController(rtol=1e-3, atol=1e-6),\n", ")\n", @@ -166,12 +174,12 @@ }, { "cell_type": "code", - "execution_count": 27, + "execution_count": 4, "metadata": {}, "outputs": [ { "data": { - "image/png": "", + "image/png": "", "text/plain": [ "
" ] @@ -182,15 +190,13 @@ ], "source": [ "# time\n", - "t = np.linspace(0, 10, 1000)\n", + "t = jnp.linspace(0, 2, 20)\n", "# forcing signal\n", - "u = np.sin(t * 2 * np.pi)\n", - "# zero initial state\n", - "x0 = [0.0, 0.0]\n", + "u = 100*jnp.sin(t * 2 * np.pi)\n", "# x are the states and y is the output\n", - "x, y = true_model(x0, t, u)\n", + "x, y = true_model(t, u)\n", "# add noise to measurement\n", - "yn = y + np.random.normal(size=y.shape, scale=10)\n", + "yn = y + np.random.normal(size=y.shape, scale=100)\n", "\n", "plt.plot(t, yn, label=\"y+n\")\n", "plt.plot(t, y, label=\"y\")\n", @@ -206,7 +212,7 @@ }, { "cell_type": "code", - "execution_count": 28, + "execution_count": 5, "metadata": {}, "outputs": [ { @@ -215,12 +221,12 @@ "text": [ "initial system: Flow(\n", " system=NonlinearDrag(\n", - " n_states(static)=2,\n", - " n_inputs(static)=1,\n", - " m=2.0,\n", + " initial_state(static)=f64[2],\n", + " n_inputs(static)='scalar',\n", + " m=1.0,\n", " r(boxed: (0.0, inf))=1.0,\n", " r2(boxed: (0.01, 1))=1.0,\n", - " k(boxed: (0.001, 2))=2.0\n", + " k(boxed: (0.001, 2))=1.0\n", " ),\n", " solver(static)=Tsit5(scan_kind=None),\n", " step(static)=PIDController(\n", @@ -247,7 +253,7 @@ ], "source": [ "init_model = Flow(\n", - " system=NonlinearDrag(m=2.0, r=1.0, r2=1.0, k=2.0),\n", + " system=NonlinearDrag(m=1.0, r=1.0, r2=1.0, k=1.0),\n", " solver=diffrax.Tsit5(),\n", " step=diffrax.PIDController(rtol=1e-3, atol=1e-6),\n", ")\n", @@ -263,7 +269,7 @@ }, { "cell_type": "code", - "execution_count": 29, + "execution_count": 6, "metadata": {}, "outputs": [ { @@ -271,28 +277,36 @@ "output_type": "stream", "text": [ " Iteration Total nfev Cost Cost reduction Step norm Optimality \n", - " 0 1 5.1826e-01 4.19e-01 \n", - " 1 2 1.3715e-01 3.81e-01 1.01e+00 1.11e-01 \n", - " 2 3 1.2552e-01 1.16e-02 1.15e-01 8.01e-03 \n", - " 3 10 1.2550e-01 2.50e-05 2.57e-05 1.15e-02 \n", - " 4 13 1.2548e-01 1.87e-05 1.32e-06 8.07e-03 \n", - " 5 18 1.2548e-01 0.00e+00 0.00e+00 8.07e-03 \n", + " 0 1 1.6927e+00 5.15e-01 \n", + " 1 2 1.2288e-01 1.57e+00 6.59e+00 7.46e-02 \n", + " 2 3 2.1478e-02 1.01e-01 2.72e+00 1.70e-02 \n", + " 3 4 1.8862e-02 2.62e-03 3.15e-01 2.02e-03 \n", + " 4 5 1.8099e-02 7.63e-04 3.60e-01 8.95e-04 \n", + " 5 6 1.7903e-02 1.96e-04 1.97e-01 1.32e-04 \n", + " 6 9 1.7900e-02 3.55e-06 4.54e-03 1.17e-04 \n", + " 7 12 1.7899e-02 5.33e-07 6.28e-04 1.16e-04 \n", + " 8 13 1.7898e-02 1.27e-06 1.27e-03 1.15e-04 \n", + " 9 14 1.7897e-02 1.37e-06 2.57e-03 1.10e-04 \n", + " 10 16 1.7896e-02 5.21e-07 1.27e-03 1.08e-04 \n", + " 11 17 1.7893e-02 3.46e-06 1.40e-03 1.07e-04 \n", + " 12 19 1.7892e-02 6.18e-07 7.11e-04 1.06e-04 \n", + " 13 27 1.7892e-02 0.00e+00 0.00e+00 1.06e-04 \n", "`xtol` termination condition is satisfied.\n", - "Function evaluations 18, initial cost 5.1826e-01, final cost 1.2548e-01, first-order optimality 8.07e-03.\n", + "Function evaluations 27, initial cost 1.6927e+00, final cost 1.7892e-02, first-order optimality 1.06e-04.\n", "fitted system: NonlinearDrag(\n", - " n_states(static)=2,\n", - " n_inputs(static)=1,\n", - " m=Array(0.96404107, dtype=float64),\n", - " r(boxed: (0.0, inf))=Array(1.86260874, dtype=float64),\n", - " r2(boxed: (0.01, 1))=Array(0.99998108, dtype=float64),\n", - " k(boxed: (0.001, 2))=Array(1.0077622, dtype=float64)\n", + " initial_state(static)=Array([0., 0.], dtype=float64),\n", + " n_inputs(static)='scalar',\n", + " m=Array(1.15256173, dtype=float64),\n", + " r(boxed: (0.0, inf))=Array(10.09924961, dtype=float64),\n", + " r2(boxed: (0.01, 1))=Array(1., dtype=float64),\n", + " k(boxed: (0.001, 2))=Array(1.86684786, dtype=float64)\n", ")\n", - "Normalized mean squared error: [0.30761926]\n" + "Normalized mean squared error: [0.07727285]\n" ] } ], "source": [ - "res = fit_least_squares(model=init_model, t=t, y=yn, x0=x0, u=u, verbose=2)\n", + "res = fit_least_squares(model=init_model, t=t, y=yn, u=u, verbose=2)\n", "pred_model = res.result\n", "print(\"fitted system:\", pretty(pred_model.system))\n", "print(\"Normalized mean squared error:\", res.nrmse)" @@ -307,12 +321,12 @@ }, { "cell_type": "code", - "execution_count": 30, + "execution_count": 7, "metadata": {}, "outputs": [ { "data": { - "image/png": "", + "image/png": "", "text/plain": [ "
" ] @@ -322,7 +336,7 @@ } ], "source": [ - "x_pred, _ = pred_model(x0, t, u)\n", + "x_pred, _ = pred_model(t, u)\n", "\n", "plt.plot(t, yn, label=\"measurement\")\n", "plt.plot(t, res.y_pred, label=\"prediction\")\n", @@ -341,12 +355,12 @@ }, { "cell_type": "code", - "execution_count": 31, + "execution_count": 8, "metadata": {}, "outputs": [ { "data": { - "image/png": "", + "image/png": "", "text/plain": [ "
" ] @@ -382,17 +396,17 @@ }, { "cell_type": "code", - "execution_count": 32, + "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ - "Flow.system.m: 0.000\n", - "Flow.system.r: 0.012\n", - "Flow.system.r2: 0.069\n", - "Flow.system.k: 0.000\n" + "Flow.system.m: 0.022\n", + "Flow.system.r: 1.810\n", + "Flow.system.r2: 0.378\n", + "Flow.system.k: 0.241\n" ] } ], diff --git a/tests/test_examples.py b/tests/test_examples.py index 5f9b28c..f5ec052 100644 --- a/tests/test_examples.py +++ b/tests/test_examples.py @@ -1,13 +1,27 @@ import pathlib import runpy +import nbformat import pytest +from nbconvert.preprocessors import ExecutePreprocessor examples = pathlib.Path(__file__, "..", "..", "examples").resolve().glob("*.py") +notebooks = pathlib.Path(__file__, "..", "..", "examples").resolve().glob("*.ipynb") @pytest.mark.slow -@pytest.mark.parametrize("examples", examples) -def test_examples_run_without_error(examples): - runpy.run_path(examples) +@pytest.mark.parametrize("example", examples, ids=lambda x: str(x.name)) +def test_examples_run_without_error(example): + runpy.run_path(example) + + +@pytest.mark.slow +@pytest.mark.parametrize("notebook", notebooks, ids=lambda x: str(x.name)) +def test_notebooks_dont_change(notebook): + with open(notebook) as f: + nb = nbformat.read(f, as_version=4) + try: + ExecutePreprocessor(timeout=60).preprocess(nb) + except Exception as e: + raise Exception(f"Running the notebook {notebook} failed") from e From 3f716d3ce3cc41e62bdc82e8a2facaf3835bab17 Mon Sep 17 00:00:00 2001 From: fhchl Date: Thu, 1 Feb 2024 16:45:29 +0100 Subject: [PATCH 04/10] fix: use ufun --- dynax/evolution.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dynax/evolution.py b/dynax/evolution.py index 20a8f70..09bf952 100644 --- a/dynax/evolution.py +++ b/dynax/evolution.py @@ -69,18 +69,18 @@ def __call__( initial_state = self.system.initial_state # Prepare input function. - if u is None and ufun is None and ucoeffs is None and self.system.n_inputs == 0: - _ufun = lambda t: jnp.empty((0,)) - elif ucoeffs is not None: + if ucoeffs is not None: path = dfx.CubicInterpolation(t, ucoeffs) _ufun = path.evaluate - elif callable(u): + elif callable(ufun): _ufun = u elif u is not None: u = jnp.asarray(u) if len(t) != u.shape[0]: raise ValueError("t and u must have matching first dimension.") _ufun = spline_it(t, u) + elif self.system.n_inputs == 0: + _ufun = lambda t: jnp.empty((0,)) else: raise ValueError("Must specify one of u, ufun, or ucoeffs.") From 72fad3ae9c902c9ac5df48e650cbe2ff04b32b24 Mon Sep 17 00:00:00 2001 From: fhchl Date: Thu, 1 Feb 2024 21:33:55 +0100 Subject: [PATCH 05/10] docs: don't expand jaxtyping types --- docs/source/conf.py | 20 ++++++++++++++++---- dynax/estimation.py | 2 +- dynax/system.py | 2 +- 3 files changed, 18 insertions(+), 6 deletions(-) diff --git a/docs/source/conf.py b/docs/source/conf.py index f89159a..36d6396 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -5,6 +5,8 @@ # -- Project information ----------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#project-information +import typing + project = "Dynax" copyright = "2023, Franz M. Heuchel" @@ -14,6 +16,7 @@ # https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration extensions = [ + "sphinx.ext.napoleon", "sphinx.ext.autodoc", "sphinx.ext.autosummary", "sphinx.ext.todo", @@ -21,7 +24,7 @@ "sphinx.ext.viewcode", "sphinxcontrib.bibtex", "sphinxcontrib.aafig", - "sphinx_autodoc_typehints", + # "sphinx_autodoc_typehints", "nbsphinx", ] @@ -41,11 +44,20 @@ } autoclass_content = "both" -# autodoc_typehints = "signature" -# typehints_use_signature = "True" +autodoc_typehints = "signature" +typehints_use_signature = True + + +napoleon_include_init_with_doc = True +napoleon_preprocess_types = True +napoleon_attr_annotations = True # -- Options for HTML output ------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-html-output -html_theme = "furo" +html_theme = "sphinx_rtd_theme" html_static_path = ["_static"] + +# Short type docs for jaxtyping's types +# https://github.com/patrick-kidger/pytkdocs_tweaks/blob/2a7ce453e315f526d792f689e61d56ecaa4ab000/pytkdocs_tweaks/__init__.py#L283 +typing.GENERATING_DOCUMENTATION = True # pyright: ignore diff --git a/dynax/estimation.py b/dynax/estimation.py index bc7d57d..d6ed789 100644 --- a/dynax/estimation.py +++ b/dynax/estimation.py @@ -13,7 +13,7 @@ import scipy.signal as sig from jax import Array from jax.flatten_util import ravel_pytree -from jax.typing import ArrayLike +from jaxtyping import ArrayLike from numpy.typing import NDArray from scipy.linalg import pinvh from scipy.optimize import least_squares, OptimizeResult diff --git a/dynax/system.py b/dynax/system.py index b127d21..2dbf60b 100644 --- a/dynax/system.py +++ b/dynax/system.py @@ -10,7 +10,7 @@ import jax.numpy as jnp import numpy as np from jax import Array -from jax.typing import ArrayLike +from jaxtyping import ArrayLike from .util import dim2shape From 3563b668274e7043a5ac8fa2107d963b2f5b16c2 Mon Sep 17 00:00:00 2001 From: fhchl Date: Fri, 2 Feb 2024 10:41:43 +0100 Subject: [PATCH 06/10] fix examples --- dynax/linearize.py | 4 ++-- .../fit_multiple_shooting_second_order_sys.py | 10 ++++----- examples/fit_second_order_sys.py | 9 ++++---- examples/linearize_discrete_time.py | 21 +++++++++---------- 4 files changed, 20 insertions(+), 24 deletions(-) diff --git a/dynax/linearize.py b/dynax/linearize.py index 1335c6a..e9958d4 100644 --- a/dynax/linearize.py +++ b/dynax/linearize.py @@ -226,8 +226,8 @@ class DiscreteLinearizingSystem(DynamicalSystem, _CoupledSystemMixin): def __init__( self, - sys: ControlAffine, - refsys: LinearSystem, + sys: DynamicalSystem, + refsys: DynamicalSystem, reldeg: int, **fb_kwargs, ): diff --git a/examples/fit_multiple_shooting_second_order_sys.py b/examples/fit_multiple_shooting_second_order_sys.py index 429745a..ad35b20 100644 --- a/examples/fit_multiple_shooting_second_order_sys.py +++ b/examples/fit_multiple_shooting_second_order_sys.py @@ -35,7 +35,7 @@ class NonlinearDrag(ControlAffine): k: float # Set the number of states (order of system), the number of inputs - n_states = 2 + initial_state = jnp.zeros(2) n_inputs = "scalar" # Define the dynamical system via the methods f, g, and h @@ -68,8 +68,7 @@ def h(self, x): ), axis=0, ) -initial_x = [0.0, 0.0] -x_train, y_train = true_model(initial_x, t_train, u_train) +x_train, y_train = true_model(t_train, u_train) # create our model system with some initial parameters initial_sys = NonlinearDrag(m=1.0, r=1.0, r2=1.0, k=1.0) @@ -83,7 +82,6 @@ def h(self, x): model=init_model, t=t_train, y=y_train, - x0=initial_x, u=u_train, verbose=2, num_shots=num_shots, @@ -96,10 +94,10 @@ def h(self, x): print("fitted system:", tree_pformat(model.system)) # check the results -x_pred, y_pred = model(initial_x, t_train, u_train) +x_pred, y_pred = model(t_train, u_train) # plot -xs_pred, _ = jax.vmap(model)(x0s, ts0, us) +xs_pred, _ = jax.vmap(model)(ts0, us, initial_state=x0s) plt.plot(t_train, x_train, "k--", label="target") for i in range(num_shots): plt.plot(ts[i], xs_pred[i], label="multiple shooting", color=f"C{i}") diff --git a/examples/fit_second_order_sys.py b/examples/fit_second_order_sys.py index f85cf04..69ee4b6 100644 --- a/examples/fit_second_order_sys.py +++ b/examples/fit_second_order_sys.py @@ -29,7 +29,7 @@ class NonlinearDrag(ControlAffine): k: float # Set the number of states (order of system), the number of in- and outputs. - n_states = 2 + initial_state = jnp.zeros(2) n_inputs = "scalar" # Define the dynamical system via the methods f, g, and h @@ -57,8 +57,7 @@ def h(self, x): samplerate = 1 / t_train[1] np.random.seed(42) u_train = np.random.normal(size=len(t_train)) -initial_x = [0.0, 0.0] -x_train, y_train = true_model(initial_x, t_train, u_train) +x_train, y_train = true_model(t_train, u_train) # create our model system with some initial parameters initial_sys = NonlinearDrag(m=1.0, r=1.0, r2=1.0, k=1.0) @@ -76,12 +75,12 @@ def h(self, x): init_model = Flow(initial_sys) # Fit all parameters with previously estimated parameters as a starting guess. pred_model = fit_least_squares( - model=init_model, t=t_train, y=y_train, x0=initial_x, u=u_train, verbose=0 + model=init_model, t=t_train, y=y_train, u=u_train, verbose=0 ).result print("fitted system:", pred_model.system) # check the results -x_pred, y_pred = pred_model(initial_x, t_train, u_train) +x_pred, y_pred = pred_model(t_train, u_train) assert np.allclose(x_train, x_pred) plt.plot(t_train, x_train, "--", label="target") diff --git a/examples/linearize_discrete_time.py b/examples/linearize_discrete_time.py index c8cef09..c641a3e 100644 --- a/examples/linearize_discrete_time.py +++ b/examples/linearize_discrete_time.py @@ -20,9 +20,10 @@ class Recurrent(DynamicalSystem): n_inputs = "scalar" def __init__(self, hidden_size, *, key): - input_size = 1 - self.cell = GRUCell(input_size, hidden_size, use_bias=False, key=key) - self.n_states = hidden_size + self.cell = GRUCell( + input_size=1, hidden_size=hidden_size, use_bias=False, key=key + ) + self.initial_state = jnp.zeros(hidden_size) def vector_field(self, x, u, t=None): return self.cell(jnp.array([u]), x) @@ -51,14 +52,14 @@ def output(self, x, u=None, t=None): # degree of the nonlinear system. Here we test for the relative degree with a set of # points and inputs. reldeg = discrete_relative_degree( - system, np.random.normal(size=(inputs.size, system.n_states)), inputs + system, np.random.normal(size=(len(inputs),) + system.initial_state.shape), inputs ) print("Relative degree of nonlinear system:", reldeg) print( "Relative degree of reference system:", discrete_relative_degree( reference_system, - np.random.normal(size=(inputs.size, reference_system.n_states)), + np.random.normal(size=(len(inputs),) + reference_system.initial_state.shape), inputs, ), ) @@ -69,14 +70,12 @@ def output(self, x, u=None, t=None): # The output of this system when driven with the reference input is the linearizing # input. The coupled system as an extra state used internally. -_, linearizing_inputs = Map(linearizing_system)( - jnp.zeros(system.n_states + reference_system.n_states + 1), u=inputs -) +_, linearizing_inputs = Map(linearizing_system)(u=inputs) # Lets simulate the original system, the linear reference and the linearized system. -states_orig, output_orig = Map(system)(x0=jnp.zeros(hidden_size), u=inputs) -_, output_ref = Map(reference_system)(x0=jnp.zeros(reference_system.n_states), u=inputs) -_, output_linearized = Map(system)(jnp.zeros(hidden_size), u=linearizing_inputs) +states_orig, output_orig = Map(system)(u=inputs) +_, output_ref = Map(reference_system)(u=inputs) +_, output_linearized = Map(system)(u=linearizing_inputs) assert np.allclose(output_ref, output_linearized) From 7749cd7a617e4987a767f2003973dbdb5d0054d4 Mon Sep 17 00:00:00 2001 From: fhchl Date: Fri, 2 Feb 2024 11:56:11 +0100 Subject: [PATCH 07/10] fix tests --- pyproject.toml | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index d1e4295..80cff55 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,13 +13,8 @@ dependencies = ["jax>=0.4.23", "diffrax>=0.5"] [project.optional-dependencies] dev = [ "pytest", + "jupyter", "matplotlib", - "sphinx", - "sphinx-autobuild", - "sphinx_autodoc_typehints", - "furo", - "sphinxcontrib-bibtex", - "nbsphinx", "pre-commit", ] From 1e856a140a1f4447f6d91eecf0e5abafaac9c66d Mon Sep 17 00:00:00 2001 From: fhchl Date: Sun, 4 Feb 2024 20:54:47 +0100 Subject: [PATCH 08/10] remove x0 parameters from fit functions --- .gitignore | 2 +- dynax/estimation.py | 44 +++++++++------------- dynax/evolution.py | 15 +++++--- dynax/example_models.py | 2 +- examples/fit_ode.ipynb | 79 +++++++++++++++++++--------------------- tests/test_estimation.py | 76 +++++++++++++++++++------------------- tests/test_evolution.py | 4 +- 7 files changed, 106 insertions(+), 116 deletions(-) diff --git a/.gitignore b/.gitignore index 28200c7..e4a81e3 100644 --- a/.gitignore +++ b/.gitignore @@ -5,7 +5,7 @@ experiments .coverage htmlcov build -docs/source/_build +_build docs/generated *.pytest_cache .pytype diff --git a/dynax/estimation.py b/dynax/estimation.py index d6ed789..be043c7 100644 --- a/dynax/estimation.py +++ b/dynax/estimation.py @@ -92,7 +92,7 @@ def _compute_covariance( def _least_squares( f: Callable[[Array], Array], - x0: NDArray, + init_params: Array, bounds: tuple[list, list], reg_term: Optional[Callable[[Array], Array]] = None, x_scale: bool = True, @@ -105,7 +105,7 @@ def _least_squares( # Add regularization term _f = f _reg_term = reg_term # https://github.com/python/mypy/issues/7268 - f = lambda x: jnp.concatenate((_f(x), _reg_term(x))) + f = lambda params: jnp.concatenate((_f(params), _reg_term(params))) if verbose_mse: # Scale cost to mean-squared error @@ -117,15 +117,17 @@ def f(params): if x_scale: # Scale parameters and bounds by initial values - norm = np.where(np.asarray(x0) != 0, np.abs(x0), 1) - x0 = x0 / norm + norm = np.where(np.asarray(init_params) != 0, np.abs(init_params), 1) + init_params = init_params / norm ___f = f - f = lambda x: ___f(x * norm) + f = lambda params: ___f(params * norm) bounds = (np.array(bounds[0]) / norm, np.array(bounds[1]) / norm) fun = MemoizeJac(jax.jit(lambda x: value_and_jacfwd(f, x))) jac = fun.derivative - res = least_squares(fun, x0, bounds=bounds, jac=jac, x_scale="jac", **kwargs) + res = least_squares( + fun, init_params, bounds=bounds, jac=jac, x_scale="jac", **kwargs + ) if x_scale: # Unscale parameters @@ -139,8 +141,8 @@ def f(params): if reg_term is not None: # Remove regularization from residuals and Jacobian and cost - res.fun = res.fun[: -len(x0)] - res.jac = res.jac[: -len(x0)] + res.fun = res.fun[: -len(init_params)] + res.jac = res.jac[: -len(init_params)] res.cost = np.sum(res.fun**2) / 2 return res @@ -150,7 +152,6 @@ def fit_least_squares( model: AbstractEvolution, t: ArrayLike, y: ArrayLike, - x0: Optional[ArrayLike] = None, u: Optional[ArrayLike] = None, batched: bool = False, sigma: Optional[ArrayLike] = None, @@ -169,11 +170,6 @@ def fit_least_squares( t = jnp.asarray(t) y = jnp.asarray(y) - if x0 is not None: - x0 = jnp.asarray(x0) - else: - x0 = model.system.initial_state - if batched: # First axis holds experiments, second axis holds time. std_y = np.std(y, axis=1, keepdims=True) @@ -216,7 +212,7 @@ def residual_term(params): # this can use pmap, if batch size is smaller than CPU cores model = jax.vmap(model) # FIXME: ucoeffs not supported for Map - _, pred_y = model(t=t, ucoeffs=ucoeffs, initial_state=x0) + _, pred_y = model(t=t, ucoeffs=ucoeffs) res = (y - pred_y) * weight return res.reshape(-1) @@ -250,7 +246,6 @@ def fit_multiple_shooting( model: AbstractEvolution, t: ArrayLike, y: ArrayLike, - x0: Optional[ArrayLike] = None, u: Optional[Union[Callable[[float], Array], ArrayLike]] = None, num_shots: int = 1, continuity_penalty: float = 0.1, @@ -278,11 +273,6 @@ def fit_multiple_shooting( t = jnp.asarray(t) y = jnp.asarray(y) - if x0 is not None: - x0 = jnp.asarray(x0) - else: - x0 = model.system.initial_state - if u is None: msg = ( f"t, y must have same number of samples, but have shapes " @@ -308,11 +298,13 @@ def fit_multiple_shooting( t = t[:num_samples] y = y[:num_samples] + n_states = len(model.system.initial_state) + # TODO: use numpy for everything that is not jitted # Divide signals into segments. ts = _moving_window(t, num_samples_per_segment, num_samples_per_segment - 1) ys = _moving_window(y, num_samples_per_segment, num_samples_per_segment - 1) - x0s = np.zeros((num_shots - 1, len(x0))) + x0s = np.zeros((num_shots - 1, n_states)) ucoeffs = None if u is not None: @@ -329,8 +321,8 @@ def fit_multiple_shooting( std_y = np.std(y, axis=0) parameter_bounds = _get_bounds(model) state_bounds = ( - (num_shots - 1) * len(x0) * [-np.inf], - (num_shots - 1) * len(x0) * [np.inf], + (num_shots - 1) * n_states * [-np.inf], + (num_shots - 1) * n_states * [np.inf], ) bounds = ( state_bounds[0] + parameter_bounds[0], @@ -339,7 +331,7 @@ def fit_multiple_shooting( def residuals(params): x0s, model = unravel(params) - x0s = jnp.concatenate((x0[None], x0s), axis=0) + x0s = jnp.concatenate((model.system.initial_state[None], x0s), axis=0) xs_pred, ys_pred = jax.vmap(model)(t=ts0, ucoeffs=ucoeffs, initial_state=x0s) # output residual res_y = ((ys - ys_pred) / std_y).reshape(-1) @@ -353,7 +345,7 @@ def residuals(params): res = _least_squares(residuals, init_params, bounds, x_scale=False, **kwargs) x0s, res.result = unravel(res.x) - res.x0s = np.asarray(jnp.concatenate((x0[None], x0s), axis=0)) + res.x0s = jnp.concatenate((res.result.system.initial_state[None], x0s), axis=0) res.ts = np.asarray(ts) res.ts0 = np.asarray(ts0) diff --git a/dynax/evolution.py b/dynax/evolution.py index 09bf952..57c2b0d 100644 --- a/dynax/evolution.py +++ b/dynax/evolution.py @@ -42,10 +42,9 @@ class Flow(AbstractEvolution): """Evolution for continous-time dynamical systems.""" solver: dfx.AbstractAdaptiveSolver = eqx.static_field(default_factory=dfx.Dopri5) - step: dfx.AbstractStepSizeController = eqx.static_field( - default_factory=dfx.ConstantStepSize - ) # TODO: replace with adaptive step size - dt0: Optional[float] = eqx.static_field(default=None) + stepsize_controller: dfx.AbstractStepSizeController = eqx.static_field( + default_factory=lambda: dfx.ConstantStepSize() + ) def __call__( self, @@ -95,11 +94,15 @@ def __call__( # Solve ODE. diffeqsolve_default_options = dict( solver=self.solver, - stepsize_controller=self.step, + stepsize_controller=self.stepsize_controller, saveat=dfx.SaveAt(ts=t), max_steps=50 * len(t), # completely arbitrary number of steps adjoint=dfx.DirectAdjoint(), - dt0=self.dt0 if self.dt0 is not None else t[1], + dt0=( + t[1] + if isinstance(self.stepsize_controller, dfx.ConstantStepSize) + else None + ), ) diffeqsolve_default_options |= diffeqsolve_kwargs vector_field = lambda t, x, self: self.system.vector_field(x, _ufun(t), t) diff --git a/dynax/example_models.py b/dynax/example_models.py index df741d3..eb60a9a 100644 --- a/dynax/example_models.py +++ b/dynax/example_models.py @@ -102,7 +102,7 @@ class LotkaVolterra(DynamicalSystem): gamma: float = non_negative_field() delta: float = non_negative_field() - initial_state = jnp.ones(2) + initial_state = jnp.ones(2) * 0.5 n_inputs = 0 def vector_field(self, x, u=None, t=None): diff --git a/examples/fit_ode.ipynb b/examples/fit_ode.ipynb index 55d9ffe..857d6e6 100644 --- a/examples/fit_ode.ipynb +++ b/examples/fit_ode.ipynb @@ -117,7 +117,7 @@ }, { "cell_type": "code", - "execution_count": 3, + "execution_count": 4, "metadata": {}, "outputs": [ { @@ -134,7 +134,7 @@ " k(boxed: (0.001, 2))=1.0\n", " ),\n", " solver(static)=Dopri8(scan_kind=None),\n", - " step(static)=PIDController(\n", + " stepsize_controller(static)=PIDController(\n", " rtol=0.001,\n", " atol=1e-06,\n", " pcoeff=0,\n", @@ -150,17 +150,16 @@ " norm=,\n", " safety=0.9,\n", " error_order=None\n", - " ),\n", - " dt0(static)=None\n", + " )\n", ")\n" ] } ], "source": [ "true_model = Flow(\n", - " system=NonlinearDrag(m=1.0, r=2.0, r2=3., k=1.0),\n", + " system=NonlinearDrag(m=1.0, r=2.0, r2=3.0, k=1.0),\n", " solver=diffrax.Dopri8(),\n", - " step=diffrax.PIDController(rtol=1e-3, atol=1e-6),\n", + " stepsize_controller=diffrax.PIDController(rtol=1e-3, atol=1e-6),\n", ")\n", "print(\"true forward model:\", true_model)" ] @@ -174,12 +173,12 @@ }, { "cell_type": "code", - "execution_count": 4, + "execution_count": 5, "metadata": {}, "outputs": [ { "data": { - "image/png": "", + "image/png": "", "text/plain": [ "
" ] @@ -192,7 +191,7 @@ "# time\n", "t = jnp.linspace(0, 2, 20)\n", "# forcing signal\n", - "u = 100*jnp.sin(t * 2 * np.pi)\n", + "u = 100 * jnp.sin(t * 2 * np.pi)\n", "# x are the states and y is the output\n", "x, y = true_model(t, u)\n", "# add noise to measurement\n", @@ -212,7 +211,7 @@ }, { "cell_type": "code", - "execution_count": 5, + "execution_count": 7, "metadata": {}, "outputs": [ { @@ -229,7 +228,7 @@ " k(boxed: (0.001, 2))=1.0\n", " ),\n", " solver(static)=Tsit5(scan_kind=None),\n", - " step(static)=PIDController(\n", + " stepsize_controller(static)=PIDController(\n", " rtol=0.001,\n", " atol=1e-06,\n", " pcoeff=0,\n", @@ -245,8 +244,7 @@ " norm=,\n", " safety=0.9,\n", " error_order=None\n", - " ),\n", - " dt0(static)=None\n", + " )\n", ")\n" ] } @@ -255,7 +253,7 @@ "init_model = Flow(\n", " system=NonlinearDrag(m=1.0, r=1.0, r2=1.0, k=1.0),\n", " solver=diffrax.Tsit5(),\n", - " step=diffrax.PIDController(rtol=1e-3, atol=1e-6),\n", + " stepsize_controller=diffrax.PIDController(rtol=1e-3, atol=1e-6),\n", ")\n", "print(\"initial system:\", init_model)" ] @@ -269,7 +267,7 @@ }, { "cell_type": "code", - "execution_count": 6, + "execution_count": 8, "metadata": {}, "outputs": [ { @@ -277,31 +275,28 @@ "output_type": "stream", "text": [ " Iteration Total nfev Cost Cost reduction Step norm Optimality \n", - " 0 1 1.6927e+00 5.15e-01 \n", - " 1 2 1.2288e-01 1.57e+00 6.59e+00 7.46e-02 \n", - " 2 3 2.1478e-02 1.01e-01 2.72e+00 1.70e-02 \n", - " 3 4 1.8862e-02 2.62e-03 3.15e-01 2.02e-03 \n", - " 4 5 1.8099e-02 7.63e-04 3.60e-01 8.95e-04 \n", - " 5 6 1.7903e-02 1.96e-04 1.97e-01 1.32e-04 \n", - " 6 9 1.7900e-02 3.55e-06 4.54e-03 1.17e-04 \n", - " 7 12 1.7899e-02 5.33e-07 6.28e-04 1.16e-04 \n", - " 8 13 1.7898e-02 1.27e-06 1.27e-03 1.15e-04 \n", - " 9 14 1.7897e-02 1.37e-06 2.57e-03 1.10e-04 \n", - " 10 16 1.7896e-02 5.21e-07 1.27e-03 1.08e-04 \n", - " 11 17 1.7893e-02 3.46e-06 1.40e-03 1.07e-04 \n", - " 12 19 1.7892e-02 6.18e-07 7.11e-04 1.06e-04 \n", - " 13 27 1.7892e-02 0.00e+00 0.00e+00 1.06e-04 \n", + " 0 1 1.6932e+00 5.19e-01 \n", + " 1 2 1.2050e-01 1.57e+00 6.56e+00 7.58e-02 \n", + " 2 3 2.1584e-02 9.89e-02 2.69e+00 1.97e-02 \n", + " 3 4 1.8803e-02 2.78e-03 3.26e-01 2.40e-03 \n", + " 4 5 1.8095e-02 7.08e-04 3.40e-01 2.85e-03 \n", + " 5 6 1.7878e-02 2.17e-04 2.22e-01 1.79e-03 \n", + " 6 7 1.7854e-02 2.34e-05 1.01e-01 8.56e-04 \n", + " 7 10 1.7842e-02 1.26e-05 6.96e-03 7.91e-05 \n", + " 8 16 1.7842e-02 7.74e-08 1.66e-05 7.91e-05 \n", + " 9 18 1.7842e-02 1.64e-09 8.29e-06 7.95e-05 \n", + " 10 23 1.7842e-02 0.00e+00 0.00e+00 7.95e-05 \n", "`xtol` termination condition is satisfied.\n", - "Function evaluations 27, initial cost 1.6927e+00, final cost 1.7892e-02, first-order optimality 1.06e-04.\n", + "Function evaluations 23, initial cost 1.6932e+00, final cost 1.7842e-02, first-order optimality 7.95e-05.\n", "fitted system: NonlinearDrag(\n", " initial_state(static)=Array([0., 0.], dtype=float64),\n", " n_inputs(static)='scalar',\n", - " m=Array(1.15256173, dtype=float64),\n", - " r(boxed: (0.0, inf))=Array(10.09924961, dtype=float64),\n", + " m=Array(1.16036016, dtype=float64),\n", + " r(boxed: (0.0, inf))=Array(10.03637659, dtype=float64),\n", " r2(boxed: (0.01, 1))=Array(1., dtype=float64),\n", - " k(boxed: (0.001, 2))=Array(1.86684786, dtype=float64)\n", + " k(boxed: (0.001, 2))=Array(1.91832787, dtype=float64)\n", ")\n", - "Normalized mean squared error: [0.07727285]\n" + "Normalized mean squared error: [0.07717369]\n" ] } ], @@ -321,12 +316,12 @@ }, { "cell_type": "code", - "execution_count": 7, + "execution_count": 9, "metadata": {}, "outputs": [ { "data": { - "image/png": "", + "image/png": "", "text/plain": [ "
" ] @@ -355,12 +350,12 @@ }, { "cell_type": "code", - "execution_count": 8, + "execution_count": 10, "metadata": {}, "outputs": [ { "data": { - "image/png": "", + "image/png": "", "text/plain": [ "
" ] @@ -396,7 +391,7 @@ }, { "cell_type": "code", - "execution_count": 9, + "execution_count": 11, "metadata": {}, "outputs": [ { @@ -404,9 +399,9 @@ "output_type": "stream", "text": [ "Flow.system.m: 0.022\n", - "Flow.system.r: 1.810\n", - "Flow.system.r2: 0.378\n", - "Flow.system.k: 0.241\n" + "Flow.system.r: 1.785\n", + "Flow.system.r2: 0.372\n", + "Flow.system.k: 0.235\n" ] } ], diff --git a/tests/test_estimation.py b/tests/test_estimation.py index 5197bc2..d19aee5 100644 --- a/tests/test_estimation.py +++ b/tests/test_estimation.py @@ -19,26 +19,27 @@ from dynax.example_models import LotkaVolterra, NonlinearDrag, SpringMassDamper -tols = dict(rtol=1e-05, atol=1e-05) +tols = dict(rtol=1e-02, atol=1e-04) @pytest.mark.parametrize("outputs", [[0], [0, 1]]) def test_fit_least_squares(outputs): # data - t = np.linspace(0, 2, 200) + t = np.linspace(0, 1, 100) u = ( - np.sin(1 * 2 * np.pi * t) + 0.1 * np.sin(1 * 2 * np.pi * t) + np.sin(0.1 * 2 * np.pi * t) + np.sin(10 * 2 * np.pi * t) ) - x0 = jnp.array([1.0, 0.0]) - true_model = Flow(NonlinearDrag(1.0, 2.0, 3.0, 4.0, outputs)) - _, y_true = true_model(t, u, x0) + true_model = Flow( + NonlinearDrag(1.0, 2.0, 3.0, 4.0, outputs), + ) + _, y_true = true_model(t, u) # fit - init_model = Flow(NonlinearDrag(1.0, 1.0, 1.0, 1.0, outputs)) - pred_model = fit_least_squares(init_model, t, y_true, x0, u).result + init_model = Flow(NonlinearDrag(1.0, 2.0, 3.0, 4.0, outputs)) + pred_model = fit_least_squares(init_model, t, y_true, u, verbose=2).result # check result - _, y_pred = pred_model(t, u, x0) + _, y_pred = pred_model(t, u) npt.assert_allclose(y_pred, y_true, **tols) npt.assert_allclose( jax.tree_util.tree_flatten(pred_model)[0], @@ -49,7 +50,7 @@ def test_fit_least_squares(outputs): def test_fit_least_squares_on_batch(): # data - t = np.linspace(0, 2, 200) + t = np.linspace(0, 1, 100) us = np.stack( ( np.sin(1 * 2 * np.pi * t), @@ -58,16 +59,18 @@ def test_fit_least_squares_on_batch(): ), axis=0, ) - x0 = np.array([1.0, 0.0]) - x0s = np.repeat(x0[None], us.shape[0], axis=0) ts = np.repeat(t[None], us.shape[0], axis=0) - true_model = Flow(NonlinearDrag(1.0, 2.0, 3.0, 4.0)) - _, ys = jax.vmap(true_model)(ts, us, x0s) + true_model = Flow( + NonlinearDrag(1.0, 2.0, 3.0, 4.0), + ) + _, ys = jax.vmap(true_model)(ts, us) # fit - init_model = Flow(NonlinearDrag(1.0, 1.0, 1.0, 1.0)) - pred_model = fit_least_squares(init_model, ts, ys, x0s, us, batched=True).result + init_model = Flow( + NonlinearDrag(1.0, 2.0, 3.0, 4.0), + ) + pred_model = fit_least_squares(init_model, ts, ys, us, batched=True).result # check result - _, ys_pred = jax.vmap(pred_model)(ts, us, x0s) + _, ys_pred = jax.vmap(pred_model)(ts, us) npt.assert_allclose(ys_pred, ys, **tols) npt.assert_allclose( jax.tree_util.tree_flatten(pred_model)[0], @@ -80,7 +83,9 @@ def test_can_compute_jacfwd_with_implicit_methods(): # don't get catched by https://github.com/patrick-kidger/diffrax/issues/135 t = jnp.linspace(0, 1, 10) x0 = jnp.array([1.0, 0.0]) - solver_opt = dict(solver=Kvaerno5(), step=PIDController(atol=1e-6, rtol=1e-3)) + solver_opt = dict( + solver=Kvaerno5(), stepsize_controller=PIDController(atol=1e-6, rtol=1e-3) + ) def fun(m, r, k, x0=x0, solver_opt=solver_opt, t=t): model = Flow(SpringMassDamper(m, r, k), **solver_opt) @@ -94,19 +99,18 @@ def fun(m, r, k, x0=x0, solver_opt=solver_opt, t=t): def test_fit_with_bounded_parameters(): # data t = jnp.linspace(0, 1, 100) - x0 = jnp.array([0.5, 0.5]) - solver_opt = dict(step=PIDController(rtol=1e-5, atol=1e-7)) + solver_opt = dict(stepsize_controller=PIDController(rtol=1e-5, atol=1e-7)) true_model = Flow( LotkaVolterra(alpha=2 / 3, beta=4 / 3, gamma=1.0, delta=1.0), **solver_opt ) - x_true, _ = true_model(t, initial_state=x0) + x_true, _ = true_model(t) # fit init_model = Flow( LotkaVolterra(alpha=1.0, beta=1.0, gamma=1.5, delta=2.0), **solver_opt ) - pred_model = fit_least_squares(init_model, t, x_true, x0).result + pred_model = fit_least_squares(init_model, t, x_true).result # check result - x_pred, _ = pred_model(t, initial_state=x0) + x_pred, _ = pred_model(t) npt.assert_allclose(x_pred, x_true, **tols) npt.assert_allclose( jax.tree_util.tree_flatten(pred_model)[0], @@ -134,7 +138,7 @@ def vector_field(self, x, u=None, t=None): # data t = jnp.linspace(0, 1, 100) - solver_opt = dict(step=PIDController(rtol=1e-5, atol=1e-7)) + solver_opt = dict(stepsize_controller=PIDController(rtol=1e-5, atol=1e-7)) true_model = Flow( LotkaVolterraBounded( alpha=2 / 3, beta=4 / 3, delta_gamma=jnp.array([1.0, 1.0]) @@ -159,25 +163,23 @@ def vector_field(self, x, u=None, t=None): @pytest.mark.parametrize("num_shots", [1, 2, 3]) def test_fit_multiple_shooting_with_input(num_shots): # data - t = jnp.linspace(0, 10, 10000) + t = jnp.linspace(0, 1, 200) u = jnp.sin(1 * 2 * np.pi * t) - x0 = jnp.array([1.0, 0.0]) true_model = Flow(SpringMassDamper(1.0, 2.0, 3.0)) - x_true, _ = true_model(t, u, initial_state=x0) + x_true, _ = true_model(t, u) # fit init_model = Flow(SpringMassDamper(1.0, 1.0, 1.0)) pred_model = fit_multiple_shooting( init_model, t, x_true, - x0, u, continuity_penalty=1, num_shots=num_shots, verbose=2, ).result # check result - x_pred, _ = pred_model(t, u, initial_state=x0) + x_pred, _ = pred_model(t, u) npt.assert_allclose(x_pred, x_true, **tols) npt.assert_allclose( jax.tree_util.tree_flatten(pred_model)[0], @@ -189,22 +191,21 @@ def test_fit_multiple_shooting_with_input(num_shots): @pytest.mark.parametrize("num_shots", [1, 2, 3]) def test_fit_multiple_shooting_without_input(num_shots): # data - t = jnp.linspace(0, 1, 1000) - x0 = jnp.array([0.5, 0.5]) - solver_opt = dict(step=PIDController(rtol=1e-3, atol=1e-6)) + t = jnp.linspace(0, 1, 200) + solver_opt = dict(stepsize_controller=PIDController(rtol=1e-3, atol=1e-6)) true_model = Flow( LotkaVolterra(alpha=2 / 3, beta=4 / 3, gamma=1.0, delta=1.0), **solver_opt ) - x_true, _ = true_model(t, initial_state=x0) + x_true, _ = true_model(t) # fit init_model = Flow( LotkaVolterra(alpha=1.0, beta=1.0, gamma=1.5, delta=2.0), **solver_opt ) pred_model = fit_multiple_shooting( - init_model, t, x_true, x0, num_shots=num_shots, continuity_penalty=1 + init_model, t, x_true, num_shots=num_shots, continuity_penalty=1 ).result # check result - x_pred, _ = pred_model(t, initial_state=x0) + x_pred, _ = pred_model(t) npt.assert_allclose(x_pred, x_true, atol=1e-3, rtol=1e-3) npt.assert_allclose( jax.tree_util.tree_flatten(pred_model)[0], @@ -228,15 +229,14 @@ def test_csd_matching(): np.random.seed(123) # model sys = SpringMassDamper(1.0, 1.0, 1.0) - model = Flow(sys, step=PIDController(rtol=1e-4, atol=1e-6)) - x0 = np.zeros(jnp.shape(sys.initial_state)) + model = Flow(sys, stepsize_controller=PIDController(rtol=1e-4, atol=1e-6)) # input duration = 1000 sr = 50 t = np.arange(int(duration * sr)) / sr u = np.random.normal(size=len(t)) # output - _, y = model(t, u, initial_state=x0) + _, y = model(t, u) # fit init_sys = SpringMassDamper(1.0, 1.0, 1.0) fitted_sys = fit_csd_matching(init_sys, u, y, sr, nperseg=1024, verbose=1).result diff --git a/tests/test_evolution.py b/tests/test_evolution.py index 98ea8fd..a994952 100644 --- a/tests/test_evolution.py +++ b/tests/test_evolution.py @@ -44,7 +44,7 @@ def x(t, x0, dx0): x0 = jnp.array([1, 0]) # x(t=0)=1, dx(t=0)=0 t = jnp.linspace(0, 1) - model = Flow(sys, step=dfx.PIDController(rtol=1e-7, atol=1e-9)) + model = Flow(sys, stepsize_controller=dfx.PIDController(rtol=1e-7, atol=1e-9)) x_pred = model(t, initial_state=x0)[1] x_true = x(t, *x0) assert np.allclose(x_true, x_pred) @@ -70,7 +70,7 @@ def x(t, x0, dx0, uconst): x0 = jnp.array([1, 0]) # x(t=0)=1, dx(t=0)=0 t = jnp.linspace(0, 1) u = jnp.ones(t.shape + (1,)) * uconst - model = Flow(sys, step=dfx.PIDController(rtol=1e-7, atol=1e-9)) + model = Flow(sys, stepsize_controller=dfx.PIDController(rtol=1e-7, atol=1e-9)) x_pred = model(t, u, initial_state=x0)[1] x_true = x(t, x0[0], x0[1], uconst) assert np.allclose(x_true, x_pred) From 855a11d616e9847cbd249bb521d1ee72bdf3ff20 Mon Sep 17 00:00:00 2001 From: fhchl Date: Sun, 11 Feb 2024 15:00:28 +0100 Subject: [PATCH 09/10] add example for estimating initial state --- examples/fit_initial_state.py | 96 +++++++++++++++++++++++++++++++++++ 1 file changed, 96 insertions(+) create mode 100644 examples/fit_initial_state.py diff --git a/examples/fit_initial_state.py b/examples/fit_initial_state.py new file mode 100644 index 0000000..52d8e6d --- /dev/null +++ b/examples/fit_initial_state.py @@ -0,0 +1,96 @@ +"""Example: fit a second-order nonlinear system to data.""" + + +import jax.numpy as jnp +import matplotlib.pyplot as plt +import numpy as np + +from dynax import ControlAffine, fit_csd_matching, fit_least_squares, Flow, free_field + + +# Define a dynamical system of the form +# +# ẋ = f(x) + g(x)u +# y = h(x) +# +# The `ControlAffine` class inherits from eqinox.Module which inherits from +# `dataclasses.dataclass`. +class NonlinearDrag(ControlAffine): + """Spring-mass-damper system with nonliner drag. + + .. math:: m ẍ + r ẋ + r2 ẋ |ẋ| + k x = u + y = x + + """ + + # Declare parameters as dataclass fields. + m: float + r: float + r2: float + k: float + + # The initial_state attribute is static by default. If we want to make it learnable + # we must declare it using the `free_field` function. + initial_state: jnp.ndarray = free_field(init=True) + + n_inputs = "scalar" + + # Define the dynamical system via the methods f, g, and h + def f(self, x): + x1, x2 = x + return jnp.array( + [x2, (-self.r * x2 - self.r2 * jnp.abs(x2) * x2 - self.k * x1) / self.m] + ) + + def g(self, x): + return jnp.array([0.0, 1.0 / self.m]) + + def h(self, x): + return x[0] + + +# initiate a dynamical system representing the some "true" parameters +true_system = NonlinearDrag( + m=1.0, r=2.0, r2=0.1, k=4.0, initial_state=jnp.array([1.0, 1.0]) +) +# combine ODE system with ODE solver (Dopri5 and constant stepsize by default) +true_model = Flow(true_system) +print("true system:", true_system) + +# some training data using the true model. This could be your measurement data. +t_train = np.linspace(0, 10, 1000) +samplerate = 1 / t_train[1] +np.random.seed(42) +u_train = np.random.normal(size=len(t_train)) +x_train, y_train = true_model(t_train, u_train) + +# create our model system with some initial parameters +initial_sys = NonlinearDrag( + m=1.0, r=1.0, r2=1.0, k=1.0, initial_state=jnp.array([0.0, 0.0]) +) +print("initial system:", initial_sys) + +# If we have long-duration, wide-band input data we can fit the linear +# parameters by matching the transfer-functions. In this example the result is +# not very good. +initial_sys = fit_csd_matching( + initial_sys, u_train, y_train, samplerate, nperseg=100 +).result +print("linear params fitted:", initial_sys) + +# Combine the ODE with an ODE solver +init_model = Flow(initial_sys) +# Fit all parameters with previously estimated parameters as a starting guess. +pred_model = fit_least_squares( + model=init_model, t=t_train, y=y_train, u=u_train, verbose=0 +).result +print("fitted system:", pred_model.system) + +# check the results +x_pred, y_pred = pred_model(t_train, u_train) +assert np.allclose(x_train, x_pred) + +plt.plot(t_train, x_train, label="target") +plt.plot(t_train, x_pred, "--", label="prediction") +plt.legend() +plt.show() From f93ee877c906c9c88cfa13974c63ac48917f55a1 Mon Sep 17 00:00:00 2001 From: fhchl Date: Sun, 11 Feb 2024 15:00:39 +0100 Subject: [PATCH 10/10] fix typo --- dynax/system.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dynax/system.py b/dynax/system.py index 2dbf60b..89d1451 100644 --- a/dynax/system.py +++ b/dynax/system.py @@ -49,7 +49,7 @@ def boxed_field(lower: float, upper: float, **kwargs): def free_field(**kwargs): - """Remove the value constrained from attribute, e.g. when subclassing.""" + """Remove the value constraint from attribute, e.g. when subclassing.""" try: metadata = dict(kwargs["metadata"]) except KeyError: