Skip to content

Commit

Permalink
Merge pull request #33 from dfki-ric/fix/spatial_scaling
Browse files Browse the repository at this point in the history
Fix spatial scaling
  • Loading branch information
AlexanderFabisch authored Nov 23, 2023
2 parents 34fbaf0 + 5de8701 commit 0e072f4
Show file tree
Hide file tree
Showing 12 changed files with 641 additions and 169 deletions.
33 changes: 24 additions & 9 deletions examples/plot_dmp_scaling.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,16 @@
The standard DMP definition of Ijspeert et al. (2013) does not scale well,
when the original demonstration has both the start and the goal state close
together: small perturbation in the start and/or goal state often result in
large accelerations. '''To fix this issue, our implementation of the DMP does
not scale the forcing term by $(g - y_0)$!''' This example demonstrates the
behavior of this DMP implementation in these cases.
large accelerations. To fix this issue, our implementation of the DMP does
not scale the forcing term by $(g - y_0)$. We also adopted the modification of
P. Pastor, H. Hoffmann, T. Asfour, S. Schaal:
Learning and Generalization of Motor Skills by Learning from Demonstration
Note that this has to be activated explicitly by setting smooth scaling in the
constructor of the DMP. This behavior is implemented for all types of DMPs.
This example demonstrates the behavior of this DMP implementation in these
cases.
"""
import numpy as np
import matplotlib.pyplot as plt
Expand All @@ -18,22 +25,30 @@
T = np.linspace(0, 1, 101)
x = np.sin(T ** 2 * 1.99 * np.pi)
y = np.cos(T ** 2 * 1.99 * np.pi)
Y = np.column_stack((x, y))
z = qx = qy = qz = np.zeros_like(x)
qw = np.ones_like(x)
Y = np.column_stack((x, y, z, qw, qx, qy, qz, x, y, z, qw, qx, qy, qz))
start = Y[0]
goal = Y[-1]
new_goal = np.array([0.5, 0.5])
Y_shifted = Y - goal[np.newaxis] + new_goal[np.newaxis]
new_start = np.array([0.5, 0.5, 0, 1, 0, 0, 0, 0.5, 0.5, 0, 1, 0, 0, 0])
new_goal = np.array([-0.5, 0.5, 0, 1, 0, 0, 0, -0.5, 0.5, 0, 1, 0, 0, 0])
Y_start_shifted = Y - start[np.newaxis] + new_start[np.newaxis]
Y_goal_shifted = Y - goal[np.newaxis] + new_goal[np.newaxis]

dmp = DMP(n_dims=len(start), execution_time=1.0, dt=0.01, n_weights_per_dim=20)
dmp = DMP(n_dims=len(start), execution_time=1.0, dt=0.01, n_weights_per_dim=20,
smooth_scaling=True)
dmp.imitate(T, Y)
dmp.configure(goal_y=new_goal)
dmp.configure(start_y=new_start, goal_y=new_goal)
_, Y_dmp = dmp.open_loop()

plt.plot(Y[:, 0], Y[:, 1], label=r"Demonstration, $g \approx y_0$", ls="--")
plt.plot(Y_shifted[:, 0], Y_shifted[:, 1], label="Original shape with new goal", ls="--")
plt.plot(Y_start_shifted[:, 0], Y_start_shifted[:, 1], label="Original shape with new start", ls="--")
plt.plot(Y_goal_shifted[:, 0], Y_goal_shifted[:, 1], label="Original shape with new goal", ls="--")
plt.plot(Y_dmp[:, 0], Y_dmp[:, 1], label="DMP with new goal", lw=5, alpha=0.5)
plt.scatter(Y_dmp[:, 0], Y_dmp[:, 1], alpha=0.5)
plt.scatter(start[0], start[1], c="r", label="Goal of demonstration: $g$")
plt.scatter(goal[0], goal[1], c="g", label="Start of demonstration: $y_0$")
plt.scatter(new_start[0], new_start[1], c="c", label="New start: $y_0'$")
plt.scatter(new_goal[0], new_goal[1], c="y", label="New goal: $g'$")
plt.xlabel("$y_1$")
plt.ylabel("$y_2$")
Expand Down
124 changes: 96 additions & 28 deletions movement_primitives/dmp/_cartesian_dmp.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from ._forcing_term import ForcingTerm
from ._canonical_system import canonical_system_alpha
from ._dmp import (dmp_open_loop, dmp_imitate, ridge_regression,
DMP_STEP_FUNCTIONS, DEFAULT_DMP_STEP_FUNCTION)
DMP_STEP_FUNCTIONS, DEFAULT_DMP_STEP_FUNCTION, phase)


def dmp_step_quaternion_python(
Expand All @@ -16,7 +16,8 @@ def dmp_step_quaternion_python(
forcing_term,
coupling_term=None,
coupling_term_precomputed=None,
int_dt=0.001):
int_dt=0.001,
smooth_scaling=False):
"""Integrate quaternion DMP for one step with Euler integration.
Parameters
Expand Down Expand Up @@ -77,6 +78,11 @@ def dmp_step_quaternion_python(
int_dt : float, optional (default: 0.001)
Time delta used internally for integration.
smooth_scaling : bool, optional (default: False)
Avoids jumps during the beginning of DMP execution when the goal
is changed and the trajectory is scaled by interpolating between
the old and new scaling of the trajectory.
Raises
------
ValueError
Expand Down Expand Up @@ -107,14 +113,25 @@ def dmp_step_quaternion_python(
cd += coupling_term_precomputed[0]
cdd += coupling_term_precomputed[1]

f = forcing_term(current_t).squeeze()
z = forcing_term.phase(current_t, int_dt)
f = forcing_term.forcing_term(z).squeeze()

if smooth_scaling:
goal_y_minus_start_y = pr.compact_axis_angle_from_quaternion(
pr.concatenate_quaternions(goal_y, pr.q_conj(start_y)))
smoothing = beta_y * z * goal_y_minus_start_y
else:
smoothing = 0.0

current_ydd[:] = (
alpha_y * (beta_y * pr.compact_axis_angle_from_quaternion(
pr.concatenate_quaternions(
goal_y, pr.q_conj(current_y)))
- execution_time * current_yd)
+ f + cdd) / execution_time ** 2
alpha_y * (
beta_y * pr.compact_axis_angle_from_quaternion(pr.concatenate_quaternions(goal_y, pr.q_conj(current_y)))
- execution_time * current_yd
- smoothing
)
+ f
+ cdd
) / execution_time ** 2
current_yd += dt * current_ydd + cd / execution_time
current_y[:] = pr.concatenate_quaternions(
pr.quaternion_from_compact_axis_angle(dt * current_yd), current_y)
Expand Down Expand Up @@ -144,7 +161,16 @@ class CartesianDMP(DMPBase):
Orientation in Cartesian space dynamic movement primitives (2014),
IEEE International Conference on Robotics and Automation (ICRA),
pp. 2997-3004, doi: 10.1109/ICRA.2014.6907291,
https://ieeexplore.ieee.org/document/6907291
https://ieeexplore.ieee.org/document/6907291,
https://acat-project.eu/modules/BibtexModule/uploads/PDF/udenemecpetric2014.pdf
(if smooth scaling is activated) with modification of scaling proposed by
P. Pastor, H. Hoffmann, T. Asfour, S. Schaal:
Learning and Generalization of Motor Skills by Learning from Demonstration,
2009 IEEE International Conference on Robotics and Automation,
Kobe, Japan, 2009, pp. 763-768, doi: 10.1109/ROBOT.2009.5152385,
https://h2t.iar.kit.edu/pdf/Pastor2009.pdf
While the dimension of the state space is 7, the dimension of the
velocity, acceleration, and forcing term is 6.
Expand All @@ -163,6 +189,11 @@ class CartesianDMP(DMPBase):
int_dt : float, optional (default: 0.001)
Time difference for Euler integration.
smooth_scaling : bool, optional (default: False)
Avoids jumps during the beginning of DMP execution when the goal
is changed and the trajectory is scaled by interpolating between
the old and new scaling of the trajectory.
Attributes
----------
execution_time_ : float
Expand All @@ -174,12 +205,13 @@ class CartesianDMP(DMPBase):
"""
def __init__(
self, execution_time=1.0, dt=0.01, n_weights_per_dim=10,
int_dt=0.001):
int_dt=0.001, smooth_scaling=False):
super(CartesianDMP, self).__init__(7, 6)
self._execution_time = execution_time
self.dt_ = dt
self.n_weights_per_dim = n_weights_per_dim
self.int_dt = int_dt
self.smooth_scaling = smooth_scaling

self._init_forcing_term()

Expand Down Expand Up @@ -258,7 +290,8 @@ def step(self, last_y, last_yd, coupling_term=None,
self.alpha_y, self.beta_y,
self.forcing_term_pos,
coupling_term=coupling_term,
int_dt=self.int_dt)
int_dt=self.int_dt,
smooth_scaling=self.smooth_scaling)
quaternion_step_function(
self.last_t, self.t,
self.current_y[3:], self.current_yd[3:],
Expand All @@ -268,7 +301,8 @@ def step(self, last_y, last_yd, coupling_term=None,
self.alpha_y, self.beta_y,
self.forcing_term_rot,
coupling_term=coupling_term,
int_dt=self.int_dt)
int_dt=self.int_dt,
smooth_scaling=self.smooth_scaling)
return np.copy(self.current_y), np.copy(self.current_yd)

def open_loop(self, run_t=None, coupling_term=None,
Expand Down Expand Up @@ -312,7 +346,8 @@ def open_loop(self, run_t=None, coupling_term=None,
self.forcing_term_pos,
coupling_term,
run_t, self.int_dt,
step_function=step_function)
step_function=step_function,
smooth_scaling=self.smooth_scaling)
try:
quaternion_step_function = CARTESIAN_DMP_STEP_FUNCTIONS[
quaternion_step_function]
Expand All @@ -327,7 +362,8 @@ def open_loop(self, run_t=None, coupling_term=None,
self.forcing_term_rot,
coupling_term,
run_t, self.int_dt,
quaternion_step_function)
quaternion_step_function,
self.smooth_scaling)
return T, np.hstack((Yp, Yr))

def imitate(self, T, Y, regularization_coefficient=0.0,
Expand Down Expand Up @@ -355,15 +391,17 @@ def imitate(self, T, Y, regularization_coefficient=0.0,
alpha_y=self.alpha_y, beta_y=self.beta_y,
overlap=self.forcing_term_pos.overlap,
alpha_z=self.forcing_term_pos.alpha_z,
allow_final_velocity=allow_final_velocity)[0]
allow_final_velocity=allow_final_velocity,
smooth_scaling=self.smooth_scaling)[0]
self.forcing_term_rot.weights_[:, :] = dmp_quaternion_imitation(
T, Y[:, 3:],
n_weights_per_dim=self.n_weights_per_dim,
regularization_coefficient=regularization_coefficient,
alpha_y=self.alpha_y, beta_y=self.beta_y,
overlap=self.forcing_term_rot.overlap,
alpha_z=self.forcing_term_rot.alpha_z,
allow_final_velocity=allow_final_velocity)[0]
allow_final_velocity=allow_final_velocity,
smooth_scaling=self.smooth_scaling)[0]

self.configure(start_y=Y[0], goal_y=Y[-1])

Expand Down Expand Up @@ -395,7 +433,7 @@ def set_weights(self, weights):

def dmp_quaternion_imitation(
T, Y, n_weights_per_dim, regularization_coefficient, alpha_y, beta_y,
overlap, alpha_z, allow_final_velocity):
overlap, alpha_z, allow_final_velocity, smooth_scaling=False):
"""Compute weights and metaparameters of quaternion DMP.
Parameters
Expand Down Expand Up @@ -428,6 +466,11 @@ def dmp_quaternion_imitation(
allow_final_velocity : bool
Whether a final velocity is allowed. Will be set to 0 otherwise.
smooth_scaling : bool, optional (default: False)
Avoids jumps during the beginning of DMP execution when the goal
is changed and the trajectory is scaled by interpolating between
the old and new scaling of the trajectory.
Returns
-------
weights : array, shape (3, n_weights_per_dim)
Expand Down Expand Up @@ -457,16 +500,19 @@ def dmp_quaternion_imitation(
forcing_term = ForcingTerm(
3, n_weights_per_dim, T[-1], T[0], overlap, alpha_z)
F, start_y, start_yd, start_ydd, goal_y, goal_yd, goal_ydd = \
determine_forces_quaternion(T, Y, alpha_y, beta_y,
allow_final_velocity) # n_steps x n_dims
determine_forces_quaternion(
T, Y, alpha_y, beta_y, alpha_z, allow_final_velocity,
smooth_scaling) # n_steps x n_dims

X = forcing_term.design_matrix(T) # n_weights_per_dim x n_steps

return (ridge_regression(X, F, regularization_coefficient),
start_y, start_yd, start_ydd, goal_y, goal_yd, goal_ydd)


def determine_forces_quaternion(T, Y, alpha_y, beta_y, allow_final_velocity):
def determine_forces_quaternion(
T, Y, alpha_y, beta_y, alpha_z, allow_final_velocity,
smooth_scaling=False):
"""Determine forces that the forcing term should generate.
Parameters
Expand All @@ -483,9 +529,17 @@ def determine_forces_quaternion(T, Y, alpha_y, beta_y, allow_final_velocity):
beta_y : float
Parameter of the transformation system.
alpha_z : float
Parameter of the canonical system.
allow_final_velocity : bool
Whether a final velocity is allowed. Will be set to 0 otherwise.
smooth_scaling : bool, optional (default: False)
Avoids jumps during the beginning of DMP execution when the goal
is changed and the trajectory is scaled by interpolating between
the old and new scaling of the trajectory.
Returns
-------
F : array, shape (n_steps, n_dims)
Expand Down Expand Up @@ -524,22 +578,31 @@ def determine_forces_quaternion(T, Y, alpha_y, beta_y, allow_final_velocity):

execution_time = T[-1] - T[0]
goal_y = Y[-1]
start_y = Y[0]
goal_y_minus_start_y = pr.compact_axis_angle_from_quaternion(
pr.concatenate_quaternions(goal_y, pr.q_conj(start_y)))
S = phase(T, alpha_z, T[-1], T[0])
F = np.empty((len(T), n_dims))
for t in range(len(T)):
F[t, :] = (
execution_time ** 2 * Ydd[t]
- alpha_y * (beta_y * pr.compact_axis_angle_from_quaternion(
pr.concatenate_quaternions(
goal_y, pr.q_conj(Y[t])))
- Yd[t] * execution_time))
if smooth_scaling:
smoothing = beta_y * S[t] * goal_y_minus_start_y
else:
smoothing = 0.0
F[t, :] = execution_time ** 2 * Ydd[t] - alpha_y * (
beta_y * pr.compact_axis_angle_from_quaternion(
pr.concatenate_quaternions(goal_y, pr.q_conj(Y[t])))
- execution_time * Yd[t]
- smoothing
)
return F, Y[0], Yd[0], Ydd[0], Y[-1], Yd[-1], Ydd[-1]


def dmp_open_loop_quaternion(
goal_t, start_t, dt, start_y, goal_y, alpha_y, beta_y, forcing_term,
coupling_term=None, run_t=None, int_dt=0.001,
quaternion_step_function=CARTESIAN_DMP_STEP_FUNCTIONS[
DEFAULT_CARTESIAN_DMP_STEP_FUNCTION]):
DEFAULT_CARTESIAN_DMP_STEP_FUNCTION],
smooth_scaling=False):
"""Run Cartesian DMP without external feedback.
Parameters
Expand Down Expand Up @@ -581,6 +644,11 @@ def dmp_open_loop_quaternion(
quaternion_step_function : callable, optional (default: cython code if available)
DMP integration function.
smooth_scaling : bool, optional (default: False)
Avoids jumps during the beginning of DMP execution when the goal
is changed and the trajectory is scaled by interpolating between
the old and new scaling of the trajectory.
Returns
-------
T : array, shape (n_steps,)
Expand Down Expand Up @@ -612,7 +680,7 @@ def dmp_open_loop_quaternion(
goal_t=goal_t, start_t=start_t,
alpha_y=alpha_y, beta_y=beta_y,
forcing_term=forcing_term, coupling_term=coupling_term,
int_dt=int_dt)
int_dt=int_dt, smooth_scaling=smooth_scaling)
Y[i] = current_y

return T, Y
Loading

0 comments on commit 0e072f4

Please sign in to comment.