Skip to content

Commit

Permalink
refac: make LinearSystem sublass of ControlAffine
Browse files Browse the repository at this point in the history
  • Loading branch information
fhchl committed Mar 3, 2024
1 parent 4a3812f commit dc2fe79
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 88 deletions.
1 change: 0 additions & 1 deletion dynax/derivative.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from jax.experimental.jet import jet


@lru_cache
def lie_derivative(f, h, n=1):
r"""Return n-th directional derivative of h along f.
Expand Down
50 changes: 28 additions & 22 deletions dynax/linearize.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,25 @@


# TODO: make this a method of ControlAffine
def relative_degree(
sys: ControlAffine, xs, max_reldeg=10, output: Optional[int] = None
) -> int:
def relative_degree(sys: ControlAffine, xs: Array, output: Optional[int] = None) -> int:
"""Estimate relative degree of system on region xs."""
# TODO: when ControlAffine has y = h(x) + i(x)u, include test for n = 0,
# i.e. i(x) == 0 for all x in xs.
assert sys.n_inputs in ["scalar", 1]
if sys.n_inputs not in ["scalar", 1]:
raise ValueError("System must be single input.")
if output is None:
# Make sure system has single output
msg = f"Output is None, but system has {sys.n_outputs} outputs."
assert sys.n_outputs in ["scalar", 1], msg
if sys.n_outputs not in ["scalar", 1]:
raise ValueError(f"Output is None, but system has {sys.n_outputs} outputs.")
h = sys.h
else:
h = lambda *args, **kwargs: sys.h(*args, **kwargs)[output]

for n in range(1, max_reldeg + 1):
LgLfn1h = lie_derivative(sys.g, lie_derivative(sys.f, h, n - 1))
res = jax.vmap(LgLfn1h)(xs)
max_reldeg = jnp.size(sys.initial_state)
for n in range(0, max_reldeg + 1):
if n == 0:
res = jax.vmap(sys.i)(xs)
else:
LgLfn1h = lie_derivative(sys.g, lie_derivative(sys.f, h, n - 1))
res = jax.vmap(LgLfn1h)(xs)
if np.all(res == 0.0):
continue
elif np.all(res != 0.0):
Expand All @@ -62,7 +63,7 @@ def input_output_linearize(
output: Optional[int] = None,
asymptotic: Optional[Sequence] = None,
reg: Optional[float] = None,
) -> Callable[[Array, Array, float], float]:
) -> Callable[[Array, Array, float], Array]:
"""Construct input-output linearizing feedback law.
Args:
Expand All @@ -80,7 +81,7 @@ def input_output_linearize(
Only single-input-single-output systems are currently supported.
"""
assert sys.n_inputs == ref.n_inputs, "systems habe same input dimension"
assert sys.n_inputs == ref.n_inputs, "systems have same input dimension"
assert sys.n_inputs in [1, "scalar"]

if output is None:
Expand All @@ -99,24 +100,28 @@ def input_output_linearize(

if asymptotic is None:

def feedbacklaw(x: Array, z: Array, v: float) -> float:
def feedbacklaw(x: Array, z: Array, v: float) -> Array:
y_reldeg_ref = cAn.dot(z) + cAnm1b * v
y_reldeg = Lfnh(x)
return (y_reldeg_ref - y_reldeg) / LgLfnm1h(x)
out = (y_reldeg_ref - y_reldeg) / LgLfnm1h(x)
return out if sys.n_inputs != "scalar" else out.squeeze()

else:
msg = f"asymptotic must be of length {reldeg=} but, {len(asymptotic)=}"
assert len(asymptotic) == reldeg, msg
if len(asymptotic) != reldeg:
raise ValueError(
f"asymptotic must be of length {reldeg=} but, {len(asymptotic)=}"
)

coeffs = np.concatenate(([1], asymptotic))
msg = "Polynomial must be Hurwitz"
assert np.all(np.real(np.roots(coeffs)) <= 0)
if not np.all(np.real(np.roots(coeffs)) <= 0):
raise ValueError("Polynomial must be Hurwitz")

alphas = asymptotic

cAis = [c.dot(np.linalg.matrix_power(A, i)) for i in range(reldeg)]
Lfihs = [lie_derivative(sys.f, h, i) for i in range(reldeg)]

def feedbacklaw(x: Array, z: Array, v: float) -> float:
def feedbacklaw(x: Array, z: Array, v: float) -> Array:
y_reldeg_ref = cAn.dot(z) + cAnm1b * v
y_reldeg = Lfnh(x)
ae0s = jnp.array(
Expand All @@ -127,10 +132,11 @@ def feedbacklaw(x: Array, z: Array, v: float) -> float:
)
error = y_reldeg_ref - y_reldeg + jnp.sum(ae0s)
if reg is None:
return error / LgLfnm1h(x)
out = error / LgLfnm1h(x)
else:
l = LgLfnm1h(x)
return error * l / (l + reg)
out = error * l / (l + reg)
return out if sys.n_inputs != "scalar" else out.squeeze()

return feedbacklaw

Expand Down
129 changes: 65 additions & 64 deletions dynax/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,13 @@ class DynamicalSystem(eqx.Module):
ẋ &= f(x, u, t) \\
y &= h(x, u, t)
Subclasses must set values for attributes n_states, n_inputs, and implement the
`vector_field` method. Use the optional `output` method to describe measurent
Subclasses must set values for attributes n_states, n_inputs, and implement the
`vector_field` method. Use the optional `output` method to describe measurent
equations. Otherwise, the total state is returned as output.
In most cases, it is not needed to define a custom __init__ method, as
`DynamicalSystem` is a dataclass.
Example::
class IntegratorAndGain(DynamicalSystem):
Expand All @@ -108,7 +108,7 @@ def __check_init__(self):
if not hasattr(self, attr):
raise AttributeError(f"Attribute '{attr}' not initialized.")

# Check that vector_field returns Arrays or scalars and not PyTrees
# Check that vector_field and output returns Arrays or scalars and not PyTrees
x = self.initial_state
u = jax.ShapeDtypeStruct(dim2shape(self.n_inputs), jnp.float64)
try:
Expand All @@ -118,12 +118,11 @@ def __check_init__(self):
raise ValueError(
"Can not evaluate output shapes. Check your definitions!"
) from e
if not isinstance(dx, jax.ShapeDtypeStruct):
raise ValueError(
f"vector_field must return arrays or scalars, not {type(dx)}"
)
if not isinstance(y, jax.ShapeDtypeStruct):
raise ValueError(f"outpuut must return arrays or scalars, not {type(y)}")
for val, func in zip((dx, y), ("vector_field, output")): # noqa: B905
if not isinstance(val, jax.ShapeDtypeStruct):
raise ValueError(
f"{func} must return arrays or scalars, not {type(val)}"
)

@abstractmethod
def vector_field(self, x, u=None, t=None) -> Array:
Expand Down Expand Up @@ -212,7 +211,44 @@ def linearize(self, x0=None, u0=None, t=None) -> "LinearSystem":
# (x, pytree_interal_states_x)


class LinearSystem(DynamicalSystem):
class ControlAffine(DynamicalSystem):
r"""A control-affine dynamical system.
.. math::
ẋ &= f(x) + g(x)u \\
y &= h(x) + i(x)u
"""

@abstractmethod
def f(self, x: Array) -> Array:
pass

@abstractmethod
def g(self, x: Array) -> Array:
pass

def h(self, x: Array) -> Array:
return x

def i(self, x: Array) -> Array:
return jnp.array(0.0)

def vector_field(self, x, u=None, t=None):
out = self.f(x)
if u is not None:
out += self.g(x).dot(u)
return out

def output(self, x, u=None, t=None):
out = self.h(x)
if u is not None:
out += self.i(x).dot(u)
return out


class LinearSystem(ControlAffine):
r"""A linear, time-invariant dynamical system.
.. math::
Expand All @@ -222,10 +258,6 @@ class LinearSystem(DynamicalSystem):
"""

# TODO: could be subclass of control-affine? Two blocking problems:
# - may h depend on u? Needed for D. If so, then one could compute
# relative degree.

A: Array
B: Array
C: Array
Expand Down Expand Up @@ -255,48 +287,17 @@ def n_inputs(self) -> int | Literal["scalar"]: # type: ignore
return self.B.shape[1]
raise ValueError("Dimension mismatch.")

def vector_field(self, x, u=None, t=None) -> Array:
out = self.A.dot(x)
if u is not None:
out += self.B.dot(u)
return out

def output(self, x, u=None, t=None) -> Array:
out = self.C.dot(x)
if u is not None:
out += self.D.dot(u)
return out


# TODO: make abstract


class ControlAffine(DynamicalSystem):
r"""A control-affine dynamical system.
.. math::
ẋ &= f(x) + g(x)u \\
y &= h(x)
"""

def f(self, x):
raise NotImplementedError
return self.A.dot(x)

def g(self, x):
raise NotImplementedError
return self.B

def h(self, x):
return x
return self.C.dot(x)

def vector_field(self, x, u=None, t=None):
if u is None:
u = 0
return self.f(x) + self.g(x) * u

def output(self, x, u=None, t=None):
return self.h(x)
def i(self, x):
return self.D


class _CoupledSystemMixin(eqx.Module):
Expand Down Expand Up @@ -334,7 +335,7 @@ class SeriesSystem(DynamicalSystem, _CoupledSystemMixin):
y_2 &= h_2(x_2, y1, t)
.. aafig::
+------+ +------+
u --+->+ sys1 +--y1->+ sys2 +--> y2
+------+ +------+
Expand Down Expand Up @@ -369,7 +370,7 @@ def output(self, x, u=None, t=None):

class FeedbackSystem(DynamicalSystem, _CoupledSystemMixin):
r"""Two systems connected via feedback.
.. math::
ẋ_1 &= f_1(x_1, u + y_2, t) \\
Expand All @@ -378,7 +379,7 @@ class FeedbackSystem(DynamicalSystem, _CoupledSystemMixin):
y_2 &= h_2(x_2, y_1, t) \\
.. aafig::
+------+
u --+->+ sys1 +--+-> y1
^ +------+ |
Expand Down Expand Up @@ -427,12 +428,12 @@ class StaticStateFeedbackSystem(DynamicalSystem):
y &= h(x, u, t)
.. aafig::
+-----+
u --+------------->+ sys +----> y
^ +--+--+
| |
| | x
^ +--+--+
| |
| | x
| +--------+ |
+--+ "v(x)" +<----+
+--------+
Expand Down Expand Up @@ -468,22 +469,22 @@ class DynamicStateFeedbackSystem(DynamicalSystem, _CoupledSystemMixin):
r"""System with dynamic state-feedback.
.. math::
ẋ_1 &= f_1(x_1, v(x_1, x_2, u), t) \\
ẋ_2 &= f_2(x_2, u, t) \\
y &= h_1(x_1, u, t)
.. aafig::
+--------------+ +-----+
u -+->+ v(x1, x2, u) +--v->+ sys +-> y
| +-+-------+----+ +--+--+
| +-+-------+----+ +--+--+
| ^ ^ |
| | x2 | x1 |
| | +-------------+
| +------+
+->+ sys2 |
+------+
| +------+
+->+ sys2 |
+------+
"""

Expand Down
2 changes: 1 addition & 1 deletion tests/test_linearize.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def test_relative_degree():

def test_discrete_relative_degree():
xs = np.random.normal(size=(100, 2))
us = np.random.normal(size=(100, 1))
us = np.random.normal(size=(100))

sys = SpringMassDamperWithOutput(out=0)
assert discrete_relative_degree(sys, xs, us) == 2
Expand Down

0 comments on commit dc2fe79

Please sign in to comment.