From 7a51f00c6c23b49f02320a229326540991cb7008 Mon Sep 17 00:00:00 2001 From: mauropasse Date: Tue, 13 Aug 2024 15:41:17 +0100 Subject: [PATCH] Iron: Action IPC Fixes (#151) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Fixes for intra-process actions (#144) * Fixes for intra-process Actions * Fixes for Clang builds * Fix deadlock * Server to store results until client requests them * Fix feedback/result data race See https://github.com/ros2/rclcpp/issues/2451 * Add missing mutex * Check return value of intra_process_action_send --------- Co-authored-by: Mauro Passerino * Fix IPC Actions data race (#147) * Check if goal was sent through IPC before send responses * Add intra_process_action_server_is_available API to intra-process Client --------- Co-authored-by: Mauro Passerino * Fix data race in Actions: Part 2 (#148) * Fix data race in Actions: Part 2 * Fix warning - copy elision --------- Co-authored-by: Mauro Passerino * fix: Fixed race condition in action server between is_ready and take"… (#2531) * fix: Fixed race condition in action server between is_ready and take" (#2495) Some background information: is_ready, take_data and execute data may be called from different threads in any order. The code in the old state expected them to be called in series, without interruption. This lead to multiple race conditions, as the state of the pimpl objects was altered by the three functions in a non thread safe way. Co-authored-by: William Woodall Signed-off-by: Janosch Machowinski * fix: added workaround for call to double calls to take_data This adds a workaround for a known bug in the executor in iron. Signed-off-by: Janosch Machowinski --------- Signed-off-by: Janosch Machowinski Co-authored-by: Janosch Machowinski Co-authored-by: William Woodall --------- Signed-off-by: Janosch Machowinski Co-authored-by: Mauro Passerino Co-authored-by: jmachowinski Co-authored-by: Janosch Machowinski Co-authored-by: William Woodall --- .../experimental/intra_process_manager.hpp | 4 +- .../include/rclcpp_action/client.hpp | 26 +- .../include/rclcpp_action/server.hpp | 3 +- rclcpp_action/src/client.cpp | 377 +++++++++------ rclcpp_action/src/server.cpp | 432 +++++++++++++----- 5 files changed, 591 insertions(+), 251 deletions(-) diff --git a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp index 26d3b8ebd5..3a410231c3 100644 --- a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp @@ -1061,7 +1061,7 @@ class IntraProcessManager auto ptr = MessageAllocTraits::allocate(allocator, 1); MessageAllocTraits::construct(allocator, ptr, *message); - subscription->provide_intra_process_data(std::move(MessageUniquePtr(ptr, deleter))); + subscription->provide_intra_process_data(MessageUniquePtr(ptr, deleter)); } continue; @@ -1104,7 +1104,7 @@ class IntraProcessManager MessageAllocTraits::construct(allocator, ptr, *message); ros_message_subscription->provide_intra_process_message( - std::move(MessageUniquePtr(ptr, deleter))); + MessageUniquePtr(ptr, deleter)); } } } diff --git a/rclcpp_action/include/rclcpp_action/client.hpp b/rclcpp_action/include/rclcpp_action/client.hpp index fcac2e02b3..ca62174f80 100644 --- a/rclcpp_action/include/rclcpp_action/client.hpp +++ b/rclcpp_action/include/rclcpp_action/client.hpp @@ -97,6 +97,16 @@ class ClientBase : public rclcpp::Waitable ); } + /// Return true if there is an intra-process action server that is ready to take goal requests. + bool + intra_process_action_server_is_available() + { + if (auto ipm = weak_ipm_.lock()) { + return ipm->action_server_is_available(ipc_action_client_id_); + } + return false; + } + // ------------- // Waitables API @@ -835,12 +845,18 @@ class Client : public ClientBase // the server might be available in another process or was configured to not use IPC. if (intra_process_server_available) { size_t hashed_guuid = std::hash()(goal_handle->get_goal_id()); - ipc_action_client_->store_result_response_callback( - hashed_guuid, result_response_callback); - intra_process_send_done = ipm->template intra_process_action_send_result_request( - ipc_action_client_id_, - std::move(goal_result_request)); + // Determine if goal was sent through inter or intra process by checking the goal ID + bool goal_sent_by_ipc = ipm->get_action_client_id_from_goal_uuid(hashed_guuid); + + if (goal_sent_by_ipc) { + ipc_action_client_->store_result_response_callback( + hashed_guuid, result_response_callback); + + intra_process_send_done = ipm->template intra_process_action_send_result_request( + ipc_action_client_id_, + std::move(goal_result_request)); + } } } diff --git a/rclcpp_action/include/rclcpp_action/server.hpp b/rclcpp_action/include/rclcpp_action/server.hpp index e1a78c7e47..0f598270e9 100644 --- a/rclcpp_action/include/rclcpp_action/server.hpp +++ b/rclcpp_action/include/rclcpp_action/server.hpp @@ -675,7 +675,8 @@ class Server : public ServerBase, public std::enable_shared_from_thisaction_name_.c_str()); + this->action_name_); + return true; } diff --git a/rclcpp_action/src/client.cpp b/rclcpp_action/src/client.cpp index 40e1804dc3..1f16f25aa9 100644 --- a/rclcpp_action/src/client.cpp +++ b/rclcpp_action/src/client.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include "rcl_action/action_client.h" #include "rcl_action/wait.h" @@ -31,6 +32,67 @@ namespace rclcpp_action { +struct ClientBaseData +{ + struct FeedbackReadyData + { + FeedbackReadyData(rcl_ret_t retIn, std::shared_ptr msg) + : ret(retIn), feedback_message(msg) {} + rcl_ret_t ret; + std::shared_ptr feedback_message; + }; + struct StatusReadyData + { + StatusReadyData(rcl_ret_t retIn, std::shared_ptr msg) + : ret(retIn), status_message(msg) {} + rcl_ret_t ret; + std::shared_ptr status_message; + }; + struct GoalResponseData + { + GoalResponseData(rcl_ret_t retIn, rmw_request_id_t header, std::shared_ptr response) + : ret(retIn), response_header(header), goal_response(response) {} + rcl_ret_t ret; + rmw_request_id_t response_header; + std::shared_ptr goal_response; + }; + struct CancelResponseData + { + CancelResponseData(rcl_ret_t retIn, rmw_request_id_t header, std::shared_ptr response) + : ret(retIn), response_header(header), cancel_response(response) {} + rcl_ret_t ret; + rmw_request_id_t response_header; + std::shared_ptr cancel_response; + }; + struct ResultResponseData + { + ResultResponseData(rcl_ret_t retIn, rmw_request_id_t header, std::shared_ptr response) + : ret(retIn), response_header(header), result_response(response) {} + rcl_ret_t ret; + rmw_request_id_t response_header; + std::shared_ptr result_response; + }; + + std::variant< + FeedbackReadyData, + StatusReadyData, + GoalResponseData, + CancelResponseData, + ResultResponseData + > data; + + explicit ClientBaseData(FeedbackReadyData && data_in) + : data(std::move(data_in)) {} + explicit ClientBaseData(StatusReadyData && data_in) + : data(std::move(data_in)) {} + explicit ClientBaseData(GoalResponseData && data_in) + : data(std::move(data_in)) {} + explicit ClientBaseData(CancelResponseData && data_in) + : data(std::move(data_in)) {} + explicit ClientBaseData(ResultResponseData && data_in) + : data(std::move(data_in)) {} +}; + class ClientBaseImpl { public: @@ -94,11 +156,13 @@ class ClientBaseImpl size_t num_clients{0u}; size_t num_services{0u}; - bool is_feedback_ready{false}; - bool is_status_ready{false}; - bool is_goal_response_ready{false}; - bool is_cancel_response_ready{false}; - bool is_result_response_ready{false}; + // Lock for action_client_ + std::recursive_mutex action_client_mutex_; + + // next ready event for taking, will be set by is_ready and will be processed by take_data + std::atomic next_ready_event; + // used to indicate that next_ready_event has no ready event for processing + static constexpr size_t NO_EVENT_READY = std::numeric_limits::max(); rclcpp::Context::SharedPtr context_; rclcpp::node_interfaces::NodeGraphInterface::WeakPtr node_graph_; @@ -142,6 +206,7 @@ bool ClientBase::action_server_is_ready() const { bool is_ready; + std::lock_guard lock(pimpl_->action_client_mutex_); rcl_ret_t ret = rcl_action_server_is_available( this->pimpl_->node_handle.get(), this->pimpl_->client_handle.get(), @@ -255,6 +320,7 @@ ClientBase::get_number_of_ready_services() void ClientBase::add_to_wait_set(rcl_wait_set_t * wait_set) { + std::lock_guard lock(pimpl_->action_client_mutex_); rcl_ret_t ret = rcl_action_wait_set_add_action_client( wait_set, pimpl_->client_handle.get(), nullptr, nullptr); if (RCL_RET_OK != ret) { @@ -265,23 +331,56 @@ ClientBase::add_to_wait_set(rcl_wait_set_t * wait_set) bool ClientBase::is_ready(rcl_wait_set_t * wait_set) { - rcl_ret_t ret = rcl_action_client_wait_set_get_entities_ready( - wait_set, pimpl_->client_handle.get(), - &pimpl_->is_feedback_ready, - &pimpl_->is_status_ready, - &pimpl_->is_goal_response_ready, - &pimpl_->is_cancel_response_ready, - &pimpl_->is_result_response_ready); - if (RCL_RET_OK != ret) { - rclcpp::exceptions::throw_from_rcl_error( - ret, "failed to check for any ready entities"); + bool is_feedback_ready{false}; + bool is_status_ready{false}; + bool is_goal_response_ready{false}; + bool is_cancel_response_ready{false}; + bool is_result_response_ready{false}; + + rcl_ret_t ret; + { + std::lock_guard lock(pimpl_->action_client_mutex_); + ret = rcl_action_client_wait_set_get_entities_ready( + wait_set, pimpl_->client_handle.get(), + &is_feedback_ready, + &is_status_ready, + &is_goal_response_ready, + &is_cancel_response_ready, + &is_result_response_ready); + if (RCL_RET_OK != ret) { + rclcpp::exceptions::throw_from_rcl_error( + ret, "failed to check for any ready entities"); + } + } + + pimpl_->next_ready_event = ClientBaseImpl::NO_EVENT_READY; + + if (is_feedback_ready) { + pimpl_->next_ready_event = static_cast(EntityType::FeedbackSubscription); + return true; + } + + if (is_status_ready) { + pimpl_->next_ready_event = static_cast(EntityType::StatusSubscription); + return true; + } + + if (is_goal_response_ready) { + pimpl_->next_ready_event = static_cast(EntityType::GoalClient); + return true; + } + + if (is_result_response_ready) { + pimpl_->next_ready_event = static_cast(EntityType::ResultClient); + return true; + } + + if (is_cancel_response_ready) { + pimpl_->next_ready_event = static_cast(EntityType::CancelClient); + return true; } - return - pimpl_->is_feedback_ready || - pimpl_->is_status_ready || - pimpl_->is_goal_response_ready || - pimpl_->is_cancel_response_ready || - pimpl_->is_result_response_ready; + + return false; } void @@ -432,7 +531,6 @@ ClientBase::set_callback_to_entity( } }; - // Set it temporarily to the new callback, while we replace the old one. // This two-step setting, prevents a gap where the old std::function has // been replaced but the middleware hasn't been told about the new one yet. @@ -550,140 +648,159 @@ ClientBase::clear_on_ready_callback() std::shared_ptr ClientBase::take_data() { - if (pimpl_->is_goal_response_ready) { - rmw_request_id_t response_header; - std::shared_ptr goal_response = this->create_goal_response(); - rcl_ret_t ret = rcl_action_take_goal_response( - pimpl_->client_handle.get(), &response_header, goal_response.get()); - return std::static_pointer_cast( - std::make_shared>>( - ret, response_header, goal_response)); - } else if (pimpl_->is_result_response_ready) { - rmw_request_id_t response_header; - std::shared_ptr result_response = this->create_result_response(); - rcl_ret_t ret = rcl_action_take_result_response( - pimpl_->client_handle.get(), &response_header, result_response.get()); - return std::static_pointer_cast( - std::make_shared>>( - ret, response_header, result_response)); - } else if (pimpl_->is_cancel_response_ready) { - rmw_request_id_t response_header; - std::shared_ptr cancel_response = this->create_cancel_response(); - rcl_ret_t ret = rcl_action_take_cancel_response( - pimpl_->client_handle.get(), &response_header, cancel_response.get()); - return std::static_pointer_cast( - std::make_shared>>( - ret, response_header, cancel_response)); - } else if (pimpl_->is_feedback_ready) { - std::shared_ptr feedback_message = this->create_feedback_message(); - rcl_ret_t ret = rcl_action_take_feedback( - pimpl_->client_handle.get(), feedback_message.get()); - return std::static_pointer_cast( - std::make_shared>>( - ret, feedback_message)); - } else if (pimpl_->is_status_ready) { - std::shared_ptr status_message = this->create_status_message(); - rcl_ret_t ret = rcl_action_take_status( - pimpl_->client_handle.get(), status_message.get()); - return std::static_pointer_cast( - std::make_shared>>( - ret, status_message)); - } else { - throw std::runtime_error("Taking data from action client but nothing is ready"); + // next_ready_event is an atomic, caching localy + size_t next_ready_event = pimpl_->next_ready_event.exchange(ClientBaseImpl::NO_EVENT_READY); + + if (next_ready_event == ClientBaseImpl::NO_EVENT_READY) { + // there is a known bug in iron, that take_data might be called multiple + // times. Therefore instead of throwing, we just return a nullptr as a workaround. + return nullptr; } + + return take_data_by_entity_id(next_ready_event); } std::shared_ptr ClientBase::take_data_by_entity_id(size_t id) { + std::shared_ptr data_ptr; + rcl_ret_t ret; + // Mark as ready the entity from which we want to take data switch (static_cast(id)) { case EntityType::GoalClient: - pimpl_->is_goal_response_ready = true; + { + rmw_request_id_t response_header; + std::shared_ptr goal_response; + { + std::lock_guard lock(pimpl_->action_client_mutex_); + + goal_response = this->create_goal_response(); + ret = rcl_action_take_goal_response( + pimpl_->client_handle.get(), &response_header, goal_response.get()); + } + data_ptr = std::make_shared( + ClientBaseData::GoalResponseData( + ret, response_header, goal_response)); + } break; case EntityType::ResultClient: - pimpl_->is_result_response_ready = true; + { + rmw_request_id_t response_header; + std::shared_ptr result_response; + { + std::lock_guard lock(pimpl_->action_client_mutex_); + result_response = this->create_result_response(); + ret = rcl_action_take_result_response( + pimpl_->client_handle.get(), &response_header, result_response.get()); + } + data_ptr = + std::make_shared( + ClientBaseData::ResultResponseData( + ret, response_header, result_response)); + } break; case EntityType::CancelClient: - pimpl_->is_cancel_response_ready = true; + { + rmw_request_id_t response_header; + std::shared_ptr cancel_response; + { + std::lock_guard lock(pimpl_->action_client_mutex_); + cancel_response = this->create_cancel_response(); + ret = rcl_action_take_cancel_response( + pimpl_->client_handle.get(), &response_header, cancel_response.get()); + } + data_ptr = + std::make_shared( + ClientBaseData::CancelResponseData( + ret, response_header, cancel_response)); + } break; case EntityType::FeedbackSubscription: - pimpl_->is_feedback_ready = true; + { + std::shared_ptr feedback_message; + { + std::lock_guard lock(pimpl_->action_client_mutex_); + feedback_message = this->create_feedback_message(); + ret = rcl_action_take_feedback( + pimpl_->client_handle.get(), feedback_message.get()); + } + data_ptr = + std::make_shared( + ClientBaseData::FeedbackReadyData( + ret, feedback_message)); + } break; case EntityType::StatusSubscription: - pimpl_->is_status_ready = true; + { + std::shared_ptr status_message; + { + std::lock_guard lock(pimpl_->action_client_mutex_); + status_message = this->create_status_message(); + ret = rcl_action_take_status( + pimpl_->client_handle.get(), status_message.get()); + } + data_ptr = + std::make_shared( + ClientBaseData::StatusReadyData( + ret, status_message)); + } break; } - return take_data(); + return std::static_pointer_cast(data_ptr); } void -ClientBase::execute(std::shared_ptr & data) +ClientBase::execute(std::shared_ptr & data_in) { - if (!data) { - throw std::runtime_error("'data' is empty"); + if (!data_in) { + // workaround, if take_data was called multiple timed, it returns a nullptr + // normally we should throw here, but as an API stable bug fix, we just ignore this... + return; } - if (pimpl_->is_goal_response_ready) { - auto shared_ptr = std::static_pointer_cast< - std::tuple>>(data); - auto ret = std::get<0>(*shared_ptr); - pimpl_->is_goal_response_ready = false; - if (RCL_RET_OK == ret) { - auto response_header = std::get<1>(*shared_ptr); - auto goal_response = std::get<2>(*shared_ptr); - this->handle_goal_response(response_header, goal_response); - } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != ret) { - rclcpp::exceptions::throw_from_rcl_error(ret, "error taking goal response"); - } - } else if (pimpl_->is_result_response_ready) { - auto shared_ptr = std::static_pointer_cast< - std::tuple>>(data); - auto ret = std::get<0>(*shared_ptr); - pimpl_->is_result_response_ready = false; - if (RCL_RET_OK == ret) { - auto response_header = std::get<1>(*shared_ptr); - auto result_response = std::get<2>(*shared_ptr); - this->handle_result_response(response_header, result_response); - } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != ret) { - rclcpp::exceptions::throw_from_rcl_error(ret, "error taking result response"); - } - } else if (pimpl_->is_cancel_response_ready) { - auto shared_ptr = std::static_pointer_cast< - std::tuple>>(data); - auto ret = std::get<0>(*shared_ptr); - pimpl_->is_cancel_response_ready = false; - if (RCL_RET_OK == ret) { - auto response_header = std::get<1>(*shared_ptr); - auto cancel_response = std::get<2>(*shared_ptr); - this->handle_cancel_response(response_header, cancel_response); - } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != ret) { - rclcpp::exceptions::throw_from_rcl_error(ret, "error taking cancel response"); - } - } else if (pimpl_->is_feedback_ready) { - auto shared_ptr = std::static_pointer_cast>>(data); - auto ret = std::get<0>(*shared_ptr); - pimpl_->is_feedback_ready = false; - if (RCL_RET_OK == ret) { - auto feedback_message = std::get<1>(*shared_ptr); - this->handle_feedback_message(feedback_message); - } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != ret) { - rclcpp::exceptions::throw_from_rcl_error(ret, "error taking feedback"); - } - } else if (pimpl_->is_status_ready) { - auto shared_ptr = std::static_pointer_cast>>(data); - auto ret = std::get<0>(*shared_ptr); - pimpl_->is_status_ready = false; - if (RCL_RET_OK == ret) { - auto status_message = std::get<1>(*shared_ptr); - this->handle_status_message(status_message); - } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != ret) { - rclcpp::exceptions::throw_from_rcl_error(ret, "error taking status"); - } - } else { - throw std::runtime_error("Executing action client but nothing is ready"); - } + std::shared_ptr data_ptr = std::static_pointer_cast(data_in); + + std::visit( + [&](auto && data) -> void { + using T = std::decay_t; + if constexpr (std::is_same_v) { + if (RCL_RET_OK == data.ret) { + this->handle_feedback_message(data.feedback_message); + } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != data.ret) { + rclcpp::exceptions::throw_from_rcl_error(data.ret, "error taking feedback"); + } + } + if constexpr (std::is_same_v) { + if (RCL_RET_OK == data.ret) { + this->handle_status_message(data.status_message); + } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != data.ret) { + rclcpp::exceptions::throw_from_rcl_error(data.ret, "error taking status"); + } + } + if constexpr (std::is_same_v) { + if (RCL_RET_OK == data.ret) { + this->handle_goal_response(data.response_header, data.goal_response); + } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != data.ret) { + rclcpp::exceptions::throw_from_rcl_error(data.ret, "error taking goal response"); + } + } + if constexpr (std::is_same_v) { + if (RCL_RET_OK == data.ret) { + this->handle_result_response(data.response_header, data.result_response); + } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != data.ret) { + rclcpp::exceptions::throw_from_rcl_error(data.ret, "error taking result response"); + } + } + if constexpr (std::is_same_v) { + if (RCL_RET_OK == data.ret) { + this->handle_cancel_response(data.response_header, data.cancel_response); + } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != data.ret) { + rclcpp::exceptions::throw_from_rcl_error(data.ret, "error taking cancel response"); + } + } + }, data_ptr->data); } void diff --git a/rclcpp_action/src/server.cpp b/rclcpp_action/src/server.cpp index 1da91d9c45..78934d17ed 100644 --- a/rclcpp_action/src/server.cpp +++ b/rclcpp_action/src/server.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include "rcl_action/action_server.h" @@ -33,8 +34,50 @@ using rclcpp_action::ServerBase; using rclcpp_action::GoalUUID; +struct ServerBaseData; + namespace rclcpp_action { + +struct ServerBaseData +{ + using GoalRequestData = std::tuple< + rcl_ret_t, + const rcl_action_goal_info_t, + rmw_request_id_t, + std::shared_ptr + >; + + using CancelRequestData = std::tuple< + rcl_ret_t, + std::shared_ptr, + rmw_request_id_t + >; + + using ResultRequestData = std::tuple, rmw_request_id_t>; + + using GoalExpiredData = struct Empty {}; + + std::variant data; + + explicit ServerBaseData(GoalRequestData && data_in) + : data(std::move(data_in)) {} + explicit ServerBaseData(CancelRequestData && data_in) + : data(std::move(data_in)) {} + explicit ServerBaseData(ResultRequestData && data_in) + : data(std::move(data_in)) {} + explicit ServerBaseData(GoalExpiredData && data_in) + : data(std::move(data_in)) {} +}; + +enum class ActionEventType : std::size_t +{ + GoalService, + ResultService, + CancelService, + Expired, +}; + class ServerBaseImpl { public: @@ -60,11 +103,6 @@ class ServerBaseImpl size_t num_services_ = 0; size_t num_guard_conditions_ = 0; - std::atomic goal_request_ready_{false}; - std::atomic cancel_request_ready_{false}; - std::atomic result_request_ready_{false}; - std::atomic goal_expired_{false}; - // Lock for unordered_maps std::recursive_mutex unordered_map_mutex_; @@ -75,8 +113,15 @@ class ServerBaseImpl // rcl goal handles are kept so api to send result doesn't try to access freed memory std::unordered_map> goal_handles_; + + // next ready event for taking, will be set by is_ready and will be processed by take_data + std::atomic next_ready_event; + // used to indicate that next_ready_event has no ready event for processing + static constexpr size_t NO_EVENT_READY = std::numeric_limits::max(); + rclcpp::Logger logger_; }; + } // namespace rclcpp_action ServerBase::ServerBase( @@ -194,124 +239,170 @@ ServerBase::is_ready(rcl_wait_set_t * wait_set) &goal_expired); } - pimpl_->goal_request_ready_ = goal_request_ready; - pimpl_->cancel_request_ready_ = cancel_request_ready; - pimpl_->result_request_ready_ = result_request_ready; - pimpl_->goal_expired_ = goal_expired; - if (RCL_RET_OK != ret) { rclcpp::exceptions::throw_from_rcl_error(ret); } - return pimpl_->goal_request_ready_.load() || - pimpl_->cancel_request_ready_.load() || - pimpl_->result_request_ready_.load() || - pimpl_->goal_expired_.load(); -} + pimpl_->next_ready_event = ServerBaseImpl::NO_EVENT_READY; -std::shared_ptr -ServerBase::take_data() -{ - if (pimpl_->goal_request_ready_.load()) { - rcl_ret_t ret; - rcl_action_goal_info_t goal_info = rcl_action_get_zero_initialized_goal_info(); - rmw_request_id_t request_header; + if (goal_request_ready) { + pimpl_->next_ready_event = static_cast(ActionEventType::GoalService); + return true; + } - std::lock_guard lock(pimpl_->action_server_reentrant_mutex_); + if (cancel_request_ready) { + pimpl_->next_ready_event = static_cast(ActionEventType::CancelService); + return true; + } - std::shared_ptr message = create_goal_request(); - ret = rcl_action_take_goal_request( - pimpl_->action_server_.get(), - &request_header, - message.get()); - - return std::static_pointer_cast( - std::make_shared - >>( - ret, - goal_info, - request_header, message)); - } else if (pimpl_->cancel_request_ready_.load()) { - rcl_ret_t ret; - rmw_request_id_t request_header; + if (result_request_ready) { + pimpl_->next_ready_event = static_cast(ActionEventType::ResultService); + return true; + } - // Initialize cancel request - auto request = std::make_shared(); + if (goal_expired) { + pimpl_->next_ready_event = static_cast(ActionEventType::Expired); + return true; + } - std::lock_guard lock(pimpl_->action_server_reentrant_mutex_); - ret = rcl_action_take_cancel_request( - pimpl_->action_server_.get(), - &request_header, - request.get()); + return false; +} - return std::static_pointer_cast( - std::make_shared - , - rmw_request_id_t>>(ret, request, request_header)); - } else if (pimpl_->result_request_ready_.load()) { - rcl_ret_t ret; - // Get the result request message - rmw_request_id_t request_header; - std::shared_ptr result_request = create_result_request(); - std::lock_guard lock(pimpl_->action_server_reentrant_mutex_); - ret = rcl_action_take_result_request( - pimpl_->action_server_.get(), &request_header, result_request.get()); +std::shared_ptr +ServerBase::take_data() +{ + size_t next_ready_event = pimpl_->next_ready_event.exchange(ServerBaseImpl::NO_EVENT_READY); - return std::static_pointer_cast( - std::make_shared, rmw_request_id_t>>( - ret, result_request, request_header)); - } else if (pimpl_->goal_expired_.load()) { + if (next_ready_event == ServerBaseImpl::NO_EVENT_READY) { + // there is a known bug in iron, that take_data might be called multiple + // times. Therefore instead of throwing, we just return a nullptr as a workaround. return nullptr; - } else { - throw std::runtime_error("Taking data from action server but nothing is ready"); } + + return take_data_by_entity_id(next_ready_event); } std::shared_ptr ServerBase::take_data_by_entity_id(size_t id) { + static_assert( + static_cast(EntityType::GoalService) == + static_cast(ActionEventType::GoalService)); + static_assert( + static_cast(EntityType::ResultService) == + static_cast(ActionEventType::ResultService)); + static_assert( + static_cast(EntityType::CancelService) == + static_cast(ActionEventType::CancelService)); + + std::shared_ptr data_ptr; // Mark as ready the entity from which we want to take data - switch (static_cast(id)) { - case EntityType::GoalService: - pimpl_->goal_request_ready_ = true; + switch (static_cast(id)) { + case ActionEventType::GoalService: + { + rcl_ret_t ret; + rcl_action_goal_info_t goal_info = rcl_action_get_zero_initialized_goal_info(); + rmw_request_id_t request_header; + + std::lock_guard lock(pimpl_->action_server_reentrant_mutex_); + + std::shared_ptr message = create_goal_request(); + ret = rcl_action_take_goal_request( + pimpl_->action_server_.get(), + &request_header, + message.get()); + + data_ptr = std::make_shared( + ServerBaseData::GoalRequestData(ret, goal_info, request_header, message)); + } break; - case EntityType::ResultService: - pimpl_->result_request_ready_ = true; + case ActionEventType::ResultService: + { + rcl_ret_t ret; + // Get the result request message + rmw_request_id_t request_header; + std::shared_ptr result_request = create_result_request(); + std::lock_guard lock(pimpl_->action_server_reentrant_mutex_); + ret = rcl_action_take_result_request( + pimpl_->action_server_.get(), &request_header, result_request.get()); + + data_ptr = + std::make_shared( + ServerBaseData::ResultRequestData(ret, result_request, request_header)); + } break; - case EntityType::CancelService: - pimpl_->cancel_request_ready_ = true; + case ActionEventType::CancelService: + { + rcl_ret_t ret; + rmw_request_id_t request_header; + + // Initialize cancel request + auto request = std::make_shared(); + + std::lock_guard lock(pimpl_->action_server_reentrant_mutex_); + ret = rcl_action_take_cancel_request( + pimpl_->action_server_.get(), + &request_header, + request.get()); + + data_ptr = + std::make_shared( + ServerBaseData::CancelRequestData(ret, request, request_header)); + } + break; + case ActionEventType::Expired: + { + data_ptr = + std::make_shared(ServerBaseData::GoalExpiredData()); + } break; } - return take_data(); + return std::static_pointer_cast(data_ptr); } void -ServerBase::execute(std::shared_ptr & data) +ServerBase::execute(std::shared_ptr & data_in) { - if (!data && !pimpl_->goal_expired_.load()) { - throw std::runtime_error("'data' is empty"); + if (!data_in) { + // workaround, if take_data was called multiple timed, it returns a nullptr + // normally we should throw here, but as an API stable bug fix, we just ignore this... + return; } - if (pimpl_->goal_request_ready_.load()) { - execute_goal_request_received(data); - } else if (pimpl_->cancel_request_ready_.load()) { - execute_cancel_request_received(data); - } else if (pimpl_->result_request_ready_.load()) { - execute_result_request_received(data); - } else if (pimpl_->goal_expired_.load()) { - execute_check_expired_goals(); - } else { - throw std::runtime_error("Executing action server but nothing is ready"); - } + std::shared_ptr data_ptr = std::static_pointer_cast(data_in); + + std::visit( + [&](auto && data) -> void { + using T = std::decay_t; + if constexpr (std::is_same_v) { + execute_goal_request_received(data_in); + } + if constexpr (std::is_same_v) { + execute_cancel_request_received(data_in); + } + if constexpr (std::is_same_v) { + execute_result_request_received(data_in); + } + if constexpr (std::is_same_v) { + execute_check_expired_goals(); + } + }, + data_ptr->data); } void ServerBase::execute_goal_request_received(std::shared_ptr & data) { - auto shared_ptr = std::static_pointer_cast - >>(data); - rcl_ret_t ret = std::get<0>(*shared_ptr); + std::shared_ptr data_ptr = std::static_pointer_cast(data); + const ServerBaseData::GoalRequestData & gData( + std::get(data_ptr->data)); + + rcl_ret_t ret = std::get<0>(gData); + rcl_action_goal_info_t goal_info = std::get<1>(gData); + rmw_request_id_t request_header = std::get<2>(gData); + const std::shared_ptr message = std::get<3>(gData); + if (RCL_RET_ACTION_SERVER_TAKE_FAILED == ret) { // Ignore take failure because connext fails if it receives a sample without valid data. // This happens when a client shuts down and connext receives a sample saying the client is @@ -320,14 +411,6 @@ ServerBase::execute_goal_request_received(std::shared_ptr & data) } else if (RCL_RET_OK != ret) { rclcpp::exceptions::throw_from_rcl_error(ret); } - rcl_action_goal_info_t goal_info = std::get<1>(*shared_ptr); - rmw_request_id_t request_header = std::get<2>(*shared_ptr); - std::shared_ptr message = std::get<3>(*shared_ptr); - - bool expected = true; - if (!pimpl_->goal_request_ready_.compare_exchange_strong(expected, false)) { - return; - } GoalUUID uuid = get_goal_id_from_goal_request(message.get()); convert(uuid, &goal_info); @@ -361,8 +444,36 @@ ServerBase::execute_goal_request_received(std::shared_ptr & data) // if goal is accepted, create a goal handle, and store it if (GoalResponse::ACCEPT_AND_EXECUTE == status || GoalResponse::ACCEPT_AND_DEFER == status) { RCLCPP_DEBUG(pimpl_->logger_, "Accepted goal %s", to_string(uuid).c_str()); + // rcl_action will set time stamp + auto deleter = [](rcl_action_goal_handle_t * ptr) + { + if (nullptr != ptr) { + rcl_ret_t fail_ret = rcl_action_goal_handle_fini(ptr); + if (RCL_RET_OK != fail_ret) { + RCLCPP_DEBUG( + rclcpp::get_logger("rclcpp_action"), + "failed to fini rcl_action_goal_handle_t in deleter"); + } + delete ptr; + } + }; + rcl_action_goal_handle_t * rcl_handle; + { + std::lock_guard lock(pimpl_->action_server_reentrant_mutex_); + rcl_handle = rcl_action_accept_new_goal(pimpl_->action_server_.get(), &goal_info); + } + if (!rcl_handle) { + throw std::runtime_error("Failed to accept new goal\n"); + } - auto handle = get_rcl_action_goal_handle(goal_info, uuid); + std::shared_ptr handle(new rcl_action_goal_handle_t, deleter); + // Copy out goal handle since action server storage disappears when it is fini'd + *handle = *rcl_handle; + + { + std::lock_guard lock(pimpl_->unordered_map_mutex_); + pimpl_->goal_handles_[uuid] = handle; + } if (GoalResponse::ACCEPT_AND_EXECUTE == status) { // Change status to executing @@ -479,12 +590,15 @@ ServerBase::process_cancel_request(rcl_action_cancel_request_t & cancel_request) void ServerBase::execute_cancel_request_received(std::shared_ptr & data) { - pimpl_->cancel_request_ready_ = false; + std::shared_ptr data_ptr = std::static_pointer_cast(data); + const ServerBaseData::CancelRequestData & gData( + std::get(data_ptr->data)); + + rcl_ret_t ret = std::get<0>(gData); + std::shared_ptr request = std::get<1>(gData); + rmw_request_id_t request_header = std::get<2>(gData); + - auto shared_ptr = std::static_pointer_cast - , - rmw_request_id_t>>(data); - auto ret = std::get<0>(*shared_ptr); if (RCL_RET_ACTION_SERVER_TAKE_FAILED == ret) { // Ignore take failure because connext fails if it receives a sample without valid data. // This happens when a client shuts down and connext receives a sample saying the client is @@ -493,8 +607,6 @@ ServerBase::execute_cancel_request_received(std::shared_ptr & data) } else if (RCL_RET_OK != ret) { rclcpp::exceptions::throw_from_rcl_error(ret); } - auto request = std::get<1>(*shared_ptr); - auto request_header = std::get<2>(*shared_ptr); // Convert c++ message to C message rcl_action_cancel_request_t cancel_request = rcl_action_get_zero_initialized_cancel_request(); @@ -502,7 +614,53 @@ ServerBase::execute_cancel_request_received(std::shared_ptr & data) cancel_request.goal_info.stamp.sec = request->goal_info.stamp.sec; cancel_request.goal_info.stamp.nanosec = request->goal_info.stamp.nanosec; - auto response = process_cancel_request(cancel_request); + // Get a list of goal info that should be attempted to be cancelled + rcl_action_cancel_response_t cancel_response = rcl_action_get_zero_initialized_cancel_response(); + + { + std::lock_guard lock(pimpl_->action_server_reentrant_mutex_); + ret = rcl_action_process_cancel_request( + pimpl_->action_server_.get(), + &cancel_request, + &cancel_response); + } + + if (RCL_RET_OK != ret) { + rclcpp::exceptions::throw_from_rcl_error(ret); + } + + RCPPUTILS_SCOPE_EXIT( + { + ret = rcl_action_cancel_response_fini(&cancel_response); + if (RCL_RET_OK != ret) { + RCLCPP_ERROR(pimpl_->logger_, "Failed to fini cancel response"); + } + }); + + auto response = std::make_shared(); + + response->return_code = cancel_response.msg.return_code; + auto & goals = cancel_response.msg.goals_canceling; + // For each canceled goal, call cancel callback + for (size_t i = 0; i < goals.size; ++i) { + const rcl_action_goal_info_t & goal_info = goals.data[i]; + GoalUUID uuid; + convert(goal_info, &uuid); + auto response_code = call_handle_cancel_callback(uuid); + if (CancelResponse::ACCEPT == response_code) { + action_msgs::msg::GoalInfo cpp_info; + cpp_info.goal_id.uuid = uuid; + cpp_info.stamp.sec = goal_info.stamp.sec; + cpp_info.stamp.nanosec = goal_info.stamp.nanosec; + response->goals_canceling.push_back(cpp_info); + } + } + + // If the user rejects all individual requests to cancel goals, + // then we consider the top-level cancel request as rejected. + if (goals.size >= 1u && 0u == response->goals_canceling.size()) { + response->return_code = action_msgs::srv::CancelGoal::Response::ERROR_REJECTED; + } if (!response->goals_canceling.empty()) { // at least one goal state changed, publish a new status message @@ -533,9 +691,14 @@ ServerBase::execute_cancel_request_received(std::shared_ptr & data) void ServerBase::execute_result_request_received(std::shared_ptr & data) { - auto shared_ptr = std::static_pointer_cast - , rmw_request_id_t>>(data); - auto ret = std::get<0>(*shared_ptr); + std::shared_ptr data_ptr = std::static_pointer_cast(data); + const ServerBaseData::ResultRequestData & gData( + std::get(data_ptr->data)); + + rcl_ret_t ret = std::get<0>(gData); + std::shared_ptr result_request = std::get<1>(gData); + rmw_request_id_t request_header = std::get<2>(gData); + if (RCL_RET_ACTION_SERVER_TAKE_FAILED == ret) { // Ignore take failure because connext fails if it receives a sample without valid data. // This happens when a client shuts down and connext receives a sample saying the client is @@ -544,10 +707,7 @@ ServerBase::execute_result_request_received(std::shared_ptr & data) } else if (RCL_RET_OK != ret) { rclcpp::exceptions::throw_from_rcl_error(ret); } - auto result_request = std::get<1>(*shared_ptr); - auto request_header = std::get<2>(*shared_ptr); - pimpl_->result_request_ready_ = false; std::shared_ptr result_response; // check if the goal exists @@ -678,10 +838,56 @@ ServerBase::get_status_array() void ServerBase::publish_status() { - auto status_msg = get_status_array(); + rcl_ret_t ret; + + // We need to hold the lock across this entire method because + // rcl_action_server_get_goal_handles() returns an internal pointer to the + // goal data. + std::lock_guard lock(pimpl_->action_server_reentrant_mutex_); + + // Get all goal handles known to C action server + rcl_action_goal_handle_t ** goal_handles = NULL; + size_t num_goals = 0; + ret = rcl_action_server_get_goal_handles( + pimpl_->action_server_.get(), &goal_handles, &num_goals); + + if (RCL_RET_OK != ret) { + rclcpp::exceptions::throw_from_rcl_error(ret); + } + + auto status_msg = std::make_shared(); + status_msg->status_list.reserve(num_goals); + // Populate a c++ status message with the goals and their statuses + rcl_action_goal_status_array_t c_status_array = + rcl_action_get_zero_initialized_goal_status_array(); + ret = rcl_action_get_goal_status_array(pimpl_->action_server_.get(), &c_status_array); + if (RCL_RET_OK != ret) { + rclcpp::exceptions::throw_from_rcl_error(ret); + } + + RCPPUTILS_SCOPE_EXIT( + { + ret = rcl_action_goal_status_array_fini(&c_status_array); + if (RCL_RET_OK != ret) { + RCLCPP_ERROR(pimpl_->logger_, "Failed to fini status array message"); + } + }); + + for (size_t i = 0; i < c_status_array.msg.status_list.size; ++i) { + auto & c_status_msg = c_status_array.msg.status_list.data[i]; + + action_msgs::msg::GoalStatus msg; + msg.status = c_status_msg.status; + // Convert C goal info to C++ goal info + convert(c_status_msg.goal_info, &msg.goal_info.goal_id.uuid); + msg.goal_info.stamp.sec = c_status_msg.goal_info.stamp.sec; + msg.goal_info.stamp.nanosec = c_status_msg.goal_info.stamp.nanosec; + + status_msg->status_list.push_back(msg); + } // Publish the message through the status publisher - rcl_ret_t ret = rcl_action_publish_status(pimpl_->action_server_.get(), status_msg.get()); + ret = rcl_action_publish_status(pimpl_->action_server_.get(), status_msg.get()); if (RCL_RET_OK != ret) { rclcpp::exceptions::throw_from_rcl_error(ret);