Skip to content

Commit

Permalink
Removed duplicate launch file, added READMEs, added docs and typehint…
Browse files Browse the repository at this point in the history
…s, code quality stuff
  • Loading branch information
Mokaz committed Mar 10, 2024
1 parent 3389619 commit e03088b
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 208 deletions.
35 changes: 0 additions & 35 deletions asv_setup/launch/vessel_sim.launch.py

This file was deleted.

12 changes: 12 additions & 0 deletions guidance/hybridpath_guidance/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Hybrid Path Guidance

This package provides the implementation of hybrid path guidance for the Vortex ASV.

## Usage

To use the hybrid path guidance launch it using: `ros2 launch hybridpath_guidance hybridpath_guidance.launch`
Or alternatively, run it together with the hybridpath controller using the launch file `hybridpath.launch.py` in asv_setup

## Configuration

You can configure the behavior of the hybrid path guidance by modifying the parameters in the `config` directory.
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@
hybridpath_guidance:
lambda_val: 0.6 # Curvature constant
path_generator_order: 2 # Differentiability order
time_to_max_speed: 10.0 # Time to reach maximum speed
time_to_max_speed: 10.0 # Time to reach maximum speed
dt: 0.05 # Time step
Original file line number Diff line number Diff line change
Expand Up @@ -17,88 +17,106 @@ def __init__(self, waypoints):
parameters=[
('hybridpath_guidance.lambda_val', 0.6),
('hybridpath_guidance.path_generator_order', 2),
('hybridpath_guidance.time_to_max_speed', 10.0)
('hybridpath_guidance.time_to_max_speed', 10.0),
('hybridpath_guidance.dt', 0.05),
])
# Subscribers
self.state_subscriber_ = self.create_subscription(Odometry, "/sensor/seapath/odometry/ned", self.state_cb, 1)

# Publishers
self.state_subscriber_ = self.create_subscription(Odometry, "/sensor/seapath/odometry/ned", self.state_cb, 1)
self.guidance_publisher_ = self.create_publisher(HybridpathReference, "guidance/hybridpath/reference", 1)

# Init parameters
self.waypoints = waypoints
self.lambda_val = self.get_parameter('hybridpath_guidance.lambda_val').get_parameter_value().double_value
self.path_generator_order = self.get_parameter('hybridpath_guidance.path_generator_order').get_parameter_value().integer_value
self.time_to_max_speed = self.get_parameter('hybridpath_guidance.time_to_max_speed').get_parameter_value().double_value # Not used
# TODO: Add support for WaypointManager

self.waypoints_ = waypoints
self.lambda_val_ = self.get_parameter('hybridpath_guidance.lambda_val').get_parameter_value().double_value
self.path_generator_order_ = self.get_parameter('hybridpath_guidance.path_generator_order').get_parameter_value().integer_value
self.time_to_max_speed_ = self.get_parameter('hybridpath_guidance.time_to_max_speed').get_parameter_value().double_value # Not used
self.dt_ = self.get_parameter('hybridpath_guidance.dt').get_parameter_value().double_value

self.generator = HybridPathGenerator(waypoints, self.path_generator_order, self.lambda_val, 1)
self.path = self.generator.Path
self.dt = 0.05

self.eta_d = np.zeros(3)
self.eta_d_prev = np.zeros(3)

self.eta = np.zeros(3)
self.eta_prev = np.zeros(3)
self.nu = np.zeros(3)
self.tau = np.zeros(3)
self.s = 0
# self.sig = HybridPathSignals(self.path, self.s)
self.u_desired = 1
self.generator_ = HybridPathGenerator(waypoints, self.path_generator_order_, self.lambda_val_, 1)
self.path_ = self.generator_.Path

self.eta_d_ = np.zeros(3)
self.eta_d_prev_ = np.zeros(3)
self.eta_ = np.zeros(3)
self.eta_prev_ = np.zeros(3)
self.nu_ = np.zeros(3)
self.tau_ = np.zeros(3)
self.s_ = 0
self.u_desired_ = 1

self.get_logger().info("hybridpath_guidance_node started")

def calculate_desired_states(self, dt):
sig = HybridPathSignals(self.path, self.s)
v_ref, v_ref_s = sig.calc_vs(self.u_desired)
self.s += v_ref * dt
pd = sig.pd # Reference position
psi_d = sig.psi # Reference heading
self.eta_d = np.array([pd[0], pd[1], psi_d])
psi0 = self.eta_d_prev[2]
psi1 = self.eta_d[2]
def calculate_desired_states(self, dt: float) -> tuple[float, float, float, float, np.ndarray, np.ndarray, np.ndarray]:
"""
Calculate the desired states for the guidance system
Args:
dt (float): Time step
Returns:
w_ref, v_ref, v_ref_t, v_ref_s, eta_d, eta_d_s, eta_d_ss
"""
sig = HybridPathSignals(self.path_, self.s_)
v_ref, v_ref_s = sig.calc_vs(self.u_desired_)
self.s_ += v_ref * dt
pd = sig.pd # Reference position
psi_d = sig.psi # Reference heading
self.eta_d_ = np.array([pd[0], pd[1], psi_d])
psi0 = self.eta_d_prev_[2]
psi1 = self.eta_d_[2]
psi_vec = np.array([psi0, psi1])
psi_vec = np.unwrap(psi_vec, period=2*np.pi)
self.eta_d[2] = psi_vec[1]
psi_vec = np.unwrap(psi_vec, period=2 * np.pi)
self.eta_d_[2] = psi_vec[1]

# Variables needed for tau
#w_ref = 0
v_ref_t = 0 # Constant speed
v_ref_t = 0 # Constant speed
eta_d_s = np.array([sig.pd_der[0][0], sig.pd_der[0][1], sig.psi_der])
eta_d_ss = np.array([sig.pd_der[1][0], sig.pd_der[1][1], sig.psi_dder])
R, R_trsp = Rot(self.eta_prev[2])
eta_error = self.eta_prev - self.eta_d
R, R_trsp = Rot(self.eta_prev_[2])
eta_error = self.eta_prev_ - self.eta_d_
my = 0.5
w_ref = my * (eta_d_s @ R @ (R_trsp @ eta_error)) / (np.linalg.norm(eta_d_s)**2)
w_ref = my * (eta_d_s @ R @ (R_trsp @ eta_error)) / (np.linalg.norm(eta_d_s) ** 2)

return w_ref, v_ref, v_ref_t, v_ref_s, self.eta_d_, eta_d_s, eta_d_ss

return w_ref, v_ref, v_ref_t, v_ref_s, self.eta_d, eta_d_s, eta_d_ss
def update_path(self, waypoints: np.ndarray) -> None:
"""
Update the path generator with new waypoints
def update_path(self, waypoints):
self.generator = HybridPathGenerator(waypoints, self.path_generator_order, self.lambda_val, 1)
self.path = self.generator.Path
Args:
waypoints (np.ndarray): New waypoints
"""
self.generator_ = HybridPathGenerator(waypoints, self.path_generator_order_, self.lambda_val_, 1)
self.path_ = self.generator_.Path

def state_cb(self, msg: Odometry) -> None:
"""
Callback function for the state message. Calculates the desired states and publishes the reference message.
Args:
msg (Odometry): The state message.
"""

def state_cb(self, msg):
state = odometrymsg_to_state(msg)
self.eta, self.nu = state[:3], state[3:]
w_ref, v_ref, v_ref_t, v_ref_s, eta_d, eta_d_s, eta_d_ss = self.calculate_desired_states(self.dt)
self.eta_, self.nu_ = state[:3], state[3:]
w_ref, v_ref, v_ref_t, v_ref_s, eta_d, eta_d_s, eta_d_ss = self.calculate_desired_states(self.dt_)

# Send (eta[:,i-1], nu[:,i-1], w, v_ref, v_ref_t, v_ref_s, eta_d[:,i], eta_d_s, eta_d_ss)
hp_ref_msg = HybridpathReference()
hp_ref_msg.w_ref = w_ref
hp_ref_msg.v_ref = v_ref
hp_ref_msg.v_ref_t = float(v_ref_t)
hp_ref_msg.v_ref_s = float(v_ref_s)

hp_ref_msg.eta = Pose2D(x=self.eta[0], y=self.eta[1], theta=self.eta[2])
hp_ref_msg.nu = Pose2D(x=self.nu[0], y=self.nu[1], theta=self.nu[2])
hp_ref_msg.eta = Pose2D(x=self.eta_[0], y=self.eta_[1], theta=self.eta_[2])
hp_ref_msg.nu = Pose2D(x=self.nu_[0], y=self.nu_[1], theta=self.nu_[2])
hp_ref_msg.eta_d = Pose2D(x=eta_d[0], y=eta_d[1], theta=eta_d[2])
hp_ref_msg.eta_d_s = Pose2D(x=eta_d_s[0], y=eta_d_s[1], theta=eta_d_s[2])
hp_ref_msg.eta_d_ss = Pose2D(x=eta_d_ss[0], y=eta_d_ss[1], theta=eta_d_ss[2])

self.guidance_publisher_.publish(hp_ref_msg)
self.eta_d_prev = self.eta_d
self.eta_prev = self.eta

self.eta_d_prev_ = self.eta_d_
self.eta_prev_ = self.eta_


def main(args=None):
Expand Down
12 changes: 12 additions & 0 deletions motion/hybridpath_controller/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Hybrid Path Controller

This package provides the implementation of hybrid path controller for the Vortex ASV.

## Usage

To use the hybrid path guidance launch it using: `ros2 launch hybridpath_controller hybridpath_controller.launch`
Or alternatively, run it together with the hybridpath guidance using the launch file `hybridpath.launch.py` in asv_setup

## Configuration

You can configure the behavior of the hybrid path controller by modifying the parameters in the `config` directory.
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,22 @@ def __init__(self):
('hybridpath_controller.kappa', 0.5)
])

self.kappa = self.get_parameter('hybridpath_controller.kappa').get_parameter_value().double_value
self.kappa_ = self.get_parameter('hybridpath_controller.kappa').get_parameter_value().double_value # Unused, currently hardcoded in AdaptiveBackstep

self.AB_controller = AdaptiveBackstep()
self.AB_controller_ = AdaptiveBackstep()

self.get_logger().info("hybridpath_controller_node started")

def hpref_cb(self, msg: HybridpathReference):
"""
Callback function for the HybridpathReference message. This function calculates the control law and publishes thruster wrench messages.
Args:
msg (HybridpathReference): The HybridpathReference message containing the reference values.
Returns:
None
"""
w_ref = msg.w_ref
v_ref = msg.v_ref
v_ref_t = msg.v_ref_t
Expand All @@ -37,133 +46,18 @@ def hpref_cb(self, msg: HybridpathReference):
eta_d_s = np.array([msg.eta_d_s.x, msg.eta_d_s.y, msg.eta_d_s.theta])
eta_d_ss = np.array([msg.eta_d_ss.x, msg.eta_d_ss.y, msg.eta_d_ss.theta])

tau = self.AB_controller.control_law(eta, nu, w_ref, v_ref, v_ref_t, v_ref_s, eta_d, eta_d_s, eta_d_ss)
tau = self.AB_controller_.control_law(eta, nu, w_ref, v_ref, v_ref_t, v_ref_s, eta_d, eta_d_s, eta_d_ss)

wrench_msg = Wrench()
wrench_msg.force.x = float(tau[0])
wrench_msg.force.y = float(tau[1]) # Assuming tau[1] is a single-element array
wrench_msg.force.y = float(tau[1])
wrench_msg.force.z = 0.0
wrench_msg.torque.x = 0.0
wrench_msg.torque.y = 0.0
wrench_msg.torque.z = float(tau[2]) # Assuming tau[2] is a single-element array
wrench_msg.torque.z = float(tau[2])

self.wrench_publisher_.publish(wrench_msg)

# def run_simulation(self, waypoints, T, dt):
# generator = HybridPathGenerator(waypoints, self.path_generator_order, self.lambda_val, 1)
# path = generator.Path
# time = np.arange(0, T, dt)

# # Initial vectors
# eta_d = np.zeros((3, len(time)))
# eta = np.zeros((3, len(time)))
# eta_d[:,0] = np.array([waypoints[0,0], waypoints[0,1], np.pi/2])
# eta[:,0] = np.array([waypoints[0,0], waypoints[0,1], np.pi/2])
# nu = np.zeros((3, len(time)))
# tau = np.zeros((3, len(time)))
# AB = AdaptiveBackstep()
# s = 0
# sig = HybridPathSignals(path, s)

# # Simulation loop
# for i in range(1,len(time)):
# if time[i] < self.time_to_max_speed :
# u_desired = 1/10 * time[i]
# else:
# u_desired = 1 # Desired speed
# v_ref, v_ref_s = sig.calc_vs(u_desired)
# s += v_ref * dt
# sig = HybridPathSignals(path, s)
# pd = sig.pd # Reference position
# psi_d = sig.psi # Reference heading
# eta_d[:,i] = np.array([pd[0], pd[1], psi_d])
# psi0 = eta_d[2,i-1]
# psi1 = eta_d[2,i]
# psi_vec = np.array([psi0, psi1])
# psi_vec = np.unwrap(psi_vec, period=2*np.pi)
# eta_d[2,i] = psi_vec[1]

# # Variables needed for tau
# #w = 0
# v_ref_t = 0 # Constant speed
# eta_d_s = np.array([sig.pd_der[0][0], sig.pd_der[0][1], sig.psi_der])
# eta_d_ss = np.array([sig.pd_der[1][0], sig.pd_der[1][1], sig.psi_dder])
# R, R_trsp = AB.R(eta[2,i-1])
# eta_error = eta[:,i-1] - eta_d[:,i]
# my = 0.5
# w = my * (eta_d_s @ R @ (R_trsp @ eta_error)) / (np.linalg.norm(eta_d_s)**2)

# tau[:,i] = AB.control_law(eta[:,i-1], nu[:,i-1], w, v_ref, v_ref_t, v_ref_s, eta_d[:,i], eta_d_s, eta_d_ss)

# # Step in nu and eta
# nu_dot = np.linalg.inv(AB.M) @ tau[:, i] - np.linalg.inv(AB.M) @ AB.D @ nu[:,i-1]
# nu[:,i] = nu[:,i-1] + nu_dot * dt
# eta_dot = R @ nu[:,i]
# eta[:,i] = eta[:,i-1] + eta_dot * dt

# psi0 = eta[2,i-1]
# psi1 = eta[2,i]
# psi_vec = np.array([psi0, psi1])
# psi_vec = np.unwrap(psi_vec, period=2*np.pi)
# eta[2,i] = psi_vec[1]

# # Plotting
# plt.figure()
# plt.plot(eta_d[1,:], eta_d[0,:], label='Reference path', zorder = 0)
# plt.plot(eta[1,:], eta[0,:], label='Actual path', zorder = 1)
# for i in range(0, len(eta_d[2]), 100):
# plt.quiver(eta[1,i], eta[0,i], np.sin(eta[2,i]), np.cos(eta[2,i]), zorder = 2)
# plt.title('Actual path vs reference path')
# plt.xlabel('y [m]')
# plt.ylabel('x [m]')
# plt.gca().set_axisbelow(True)
# plt.grid()
# plt.legend()

# plt.figure()
# plt.plot(time, eta[0,:], label='Actual x')
# plt.plot(time, eta_d[0,:], label='Reference x')
# plt.plot(time, eta[1,:], label='Actual y')
# plt.plot(time, eta_d[1,:], label='Reference y')
# plt.title('Actual position vs reference position')
# plt.xlabel('Time [s]')
# plt.ylabel('Position [m]')
# plt.gca().set_axisbelow(True)
# plt.grid()
# plt.legend()

# plt.figure()
# plt.plot(time, eta[2,:], label='Actual heading')
# plt.plot(time, eta_d[2,:], label='Reference heading')
# plt.title('Actual heading vs reference heading')
# plt.xlabel('Time [s]')
# plt.ylabel('Heading [rad]')
# plt.gca().set_axisbelow(True)
# plt.grid()
# plt.legend()

# plt.figure()
# plt.plot(time, nu[0,:], label='Surge velocity')
# plt.plot(time, nu[1,:], label='Sway velocity')
# plt.title('velocity')
# plt.xlabel('Time [s]')
# plt.ylabel('Velocity [m/s]')
# plt.gca().set_axisbelow(True)
# plt.grid()
# plt.legend()

# plt.figure()
# plt.plot(time, tau[0,:], label='Surge force')
# plt.plot(time, tau[1,:], label='Sway force')
# plt.title('Force')
# plt.xlabel('Time [s]')
# plt.ylabel('Force [N]')
# plt.gca().set_axisbelow(True)
# plt.grid()
# plt.legend()

# plt.show()

def main(args=None):
rclpy.init(args=args)
node = HybridPathControllerNode()
Expand Down

0 comments on commit e03088b

Please sign in to comment.