diff --git a/rclcpp_action/src/client.cpp b/rclcpp_action/src/client.cpp index 2d5018d5af..054730fd37 100644 --- a/rclcpp_action/src/client.cpp +++ b/rclcpp_action/src/client.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include "rcl_action/action_client.h" #include "rcl_action/wait.h" @@ -31,6 +32,67 @@ namespace rclcpp_action { +struct ClientBaseData +{ + struct FeedbackReadyData + { + FeedbackReadyData(rcl_ret_t retIn, std::shared_ptr msg) + : ret(retIn), feedback_message(msg) {} + rcl_ret_t ret; + std::shared_ptr feedback_message; + }; + struct StatusReadyData + { + StatusReadyData(rcl_ret_t retIn, std::shared_ptr msg) + : ret(retIn), status_message(msg) {} + rcl_ret_t ret; + std::shared_ptr status_message; + }; + struct GoalResponseData + { + GoalResponseData(rcl_ret_t retIn, rmw_request_id_t header, std::shared_ptr response) + : ret(retIn), response_header(header), goal_response(response) {} + rcl_ret_t ret; + rmw_request_id_t response_header; + std::shared_ptr goal_response; + }; + struct CancelResponseData + { + CancelResponseData(rcl_ret_t retIn, rmw_request_id_t header, std::shared_ptr response) + : ret(retIn), response_header(header), cancel_response(response) {} + rcl_ret_t ret; + rmw_request_id_t response_header; + std::shared_ptr cancel_response; + }; + struct ResultResponseData + { + ResultResponseData(rcl_ret_t retIn, rmw_request_id_t header, std::shared_ptr response) + : ret(retIn), response_header(header), result_response(response) {} + rcl_ret_t ret; + rmw_request_id_t response_header; + std::shared_ptr result_response; + }; + + std::variant< + FeedbackReadyData, + StatusReadyData, + GoalResponseData, + CancelResponseData, + ResultResponseData + > data; + + explicit ClientBaseData(FeedbackReadyData && data_in) + : data(std::move(data_in)) {} + explicit ClientBaseData(StatusReadyData && data_in) + : data(std::move(data_in)) {} + explicit ClientBaseData(GoalResponseData && data_in) + : data(std::move(data_in)) {} + explicit ClientBaseData(CancelResponseData && data_in) + : data(std::move(data_in)) {} + explicit ClientBaseData(ResultResponseData && data_in) + : data(std::move(data_in)) {} +}; + class ClientBaseImpl { public: @@ -94,11 +156,13 @@ class ClientBaseImpl size_t num_clients{0u}; size_t num_services{0u}; - bool is_feedback_ready{false}; - bool is_status_ready{false}; - bool is_goal_response_ready{false}; - bool is_cancel_response_ready{false}; - bool is_result_response_ready{false}; + // Lock for action_client_ + std::recursive_mutex action_client_mutex_; + + // next ready event for taking, will be set by is_ready and will be processed by take_data + std::atomic next_ready_event; + // used to indicate that next_ready_event has no ready event for processing + static constexpr size_t NO_EVENT_READY = std::numeric_limits::max(); rclcpp::Context::SharedPtr context_; rclcpp::node_interfaces::NodeGraphInterface::WeakPtr node_graph_; @@ -142,6 +206,7 @@ bool ClientBase::action_server_is_ready() const { bool is_ready; + std::lock_guard lock(pimpl_->action_client_mutex_); rcl_ret_t ret = rcl_action_server_is_available( this->pimpl_->node_handle.get(), this->pimpl_->client_handle.get(), @@ -255,6 +320,7 @@ ClientBase::get_number_of_ready_services() void ClientBase::add_to_wait_set(rcl_wait_set_t * wait_set) { + std::lock_guard lock(pimpl_->action_client_mutex_); rcl_ret_t ret = rcl_action_wait_set_add_action_client( wait_set, pimpl_->client_handle.get(), nullptr, nullptr); if (RCL_RET_OK != ret) { @@ -265,23 +331,56 @@ ClientBase::add_to_wait_set(rcl_wait_set_t * wait_set) bool ClientBase::is_ready(rcl_wait_set_t * wait_set) { - rcl_ret_t ret = rcl_action_client_wait_set_get_entities_ready( - wait_set, pimpl_->client_handle.get(), - &pimpl_->is_feedback_ready, - &pimpl_->is_status_ready, - &pimpl_->is_goal_response_ready, - &pimpl_->is_cancel_response_ready, - &pimpl_->is_result_response_ready); - if (RCL_RET_OK != ret) { - rclcpp::exceptions::throw_from_rcl_error( - ret, "failed to check for any ready entities"); + bool is_feedback_ready{false}; + bool is_status_ready{false}; + bool is_goal_response_ready{false}; + bool is_cancel_response_ready{false}; + bool is_result_response_ready{false}; + + rcl_ret_t ret; + { + std::lock_guard lock(pimpl_->action_client_mutex_); + ret = rcl_action_client_wait_set_get_entities_ready( + wait_set, pimpl_->client_handle.get(), + &is_feedback_ready, + &is_status_ready, + &is_goal_response_ready, + &is_cancel_response_ready, + &is_result_response_ready); + if (RCL_RET_OK != ret) { + rclcpp::exceptions::throw_from_rcl_error( + ret, "failed to check for any ready entities"); + } + } + + pimpl_->next_ready_event = ClientBaseImpl::NO_EVENT_READY; + + if (is_feedback_ready) { + pimpl_->next_ready_event = static_cast(EntityType::FeedbackSubscription); + return true; + } + + if (is_status_ready) { + pimpl_->next_ready_event = static_cast(EntityType::StatusSubscription); + return true; + } + + if (is_goal_response_ready) { + pimpl_->next_ready_event = static_cast(EntityType::GoalClient); + return true; + } + + if (is_result_response_ready) { + pimpl_->next_ready_event = static_cast(EntityType::ResultClient); + return true; + } + + if (is_cancel_response_ready) { + pimpl_->next_ready_event = static_cast(EntityType::CancelClient); + return true; } - return - pimpl_->is_feedback_ready || - pimpl_->is_status_ready || - pimpl_->is_goal_response_ready || - pimpl_->is_cancel_response_ready || - pimpl_->is_result_response_ready; + + return false; } void @@ -432,7 +531,6 @@ ClientBase::set_callback_to_entity( } }; - // Set it temporarily to the new callback, while we replace the old one. // This two-step setting, prevents a gap where the old std::function has // been replaced but the middleware hasn't been told about the new one yet. @@ -550,140 +648,159 @@ ClientBase::clear_on_ready_callback() std::shared_ptr ClientBase::take_data() { - if (pimpl_->is_feedback_ready) { - std::shared_ptr feedback_message = this->create_feedback_message(); - rcl_ret_t ret = rcl_action_take_feedback( - pimpl_->client_handle.get(), feedback_message.get()); - return std::static_pointer_cast( - std::make_shared>>( - ret, feedback_message)); - } else if (pimpl_->is_status_ready) { - std::shared_ptr status_message = this->create_status_message(); - rcl_ret_t ret = rcl_action_take_status( - pimpl_->client_handle.get(), status_message.get()); - return std::static_pointer_cast( - std::make_shared>>( - ret, status_message)); - } else if (pimpl_->is_goal_response_ready) { - rmw_request_id_t response_header; - std::shared_ptr goal_response = this->create_goal_response(); - rcl_ret_t ret = rcl_action_take_goal_response( - pimpl_->client_handle.get(), &response_header, goal_response.get()); - return std::static_pointer_cast( - std::make_shared>>( - ret, response_header, goal_response)); - } else if (pimpl_->is_result_response_ready) { - rmw_request_id_t response_header; - std::shared_ptr result_response = this->create_result_response(); - rcl_ret_t ret = rcl_action_take_result_response( - pimpl_->client_handle.get(), &response_header, result_response.get()); - return std::static_pointer_cast( - std::make_shared>>( - ret, response_header, result_response)); - } else if (pimpl_->is_cancel_response_ready) { - rmw_request_id_t response_header; - std::shared_ptr cancel_response = this->create_cancel_response(); - rcl_ret_t ret = rcl_action_take_cancel_response( - pimpl_->client_handle.get(), &response_header, cancel_response.get()); - return std::static_pointer_cast( - std::make_shared>>( - ret, response_header, cancel_response)); - } else { - throw std::runtime_error("Taking data from action client but nothing is ready"); + // next_ready_event is an atomic, caching localy + size_t next_ready_event = pimpl_->next_ready_event.exchange(ClientBaseImpl::NO_EVENT_READY); + + if (next_ready_event == ClientBaseImpl::NO_EVENT_READY) { + // there is a known bug in iron, that take_data might be called multiple + // times. Therefore instead of throwing, we just return a nullptr as a workaround. + return nullptr; } + + return take_data_by_entity_id(next_ready_event); } std::shared_ptr ClientBase::take_data_by_entity_id(size_t id) { + std::shared_ptr data_ptr; + rcl_ret_t ret; + // Mark as ready the entity from which we want to take data switch (static_cast(id)) { case EntityType::GoalClient: - pimpl_->is_goal_response_ready = true; + { + rmw_request_id_t response_header; + std::shared_ptr goal_response; + { + std::lock_guard lock(pimpl_->action_client_mutex_); + + goal_response = this->create_goal_response(); + ret = rcl_action_take_goal_response( + pimpl_->client_handle.get(), &response_header, goal_response.get()); + } + data_ptr = std::make_shared( + ClientBaseData::GoalResponseData( + ret, response_header, goal_response)); + } break; case EntityType::ResultClient: - pimpl_->is_result_response_ready = true; + { + rmw_request_id_t response_header; + std::shared_ptr result_response; + { + std::lock_guard lock(pimpl_->action_client_mutex_); + result_response = this->create_result_response(); + ret = rcl_action_take_result_response( + pimpl_->client_handle.get(), &response_header, result_response.get()); + } + data_ptr = + std::make_shared( + ClientBaseData::ResultResponseData( + ret, response_header, result_response)); + } break; case EntityType::CancelClient: - pimpl_->is_cancel_response_ready = true; + { + rmw_request_id_t response_header; + std::shared_ptr cancel_response; + { + std::lock_guard lock(pimpl_->action_client_mutex_); + cancel_response = this->create_cancel_response(); + ret = rcl_action_take_cancel_response( + pimpl_->client_handle.get(), &response_header, cancel_response.get()); + } + data_ptr = + std::make_shared( + ClientBaseData::CancelResponseData( + ret, response_header, cancel_response)); + } break; case EntityType::FeedbackSubscription: - pimpl_->is_feedback_ready = true; + { + std::shared_ptr feedback_message; + { + std::lock_guard lock(pimpl_->action_client_mutex_); + feedback_message = this->create_feedback_message(); + ret = rcl_action_take_feedback( + pimpl_->client_handle.get(), feedback_message.get()); + } + data_ptr = + std::make_shared( + ClientBaseData::FeedbackReadyData( + ret, feedback_message)); + } break; case EntityType::StatusSubscription: - pimpl_->is_status_ready = true; + { + std::shared_ptr status_message; + { + std::lock_guard lock(pimpl_->action_client_mutex_); + status_message = this->create_status_message(); + ret = rcl_action_take_status( + pimpl_->client_handle.get(), status_message.get()); + } + data_ptr = + std::make_shared( + ClientBaseData::StatusReadyData( + ret, status_message)); + } break; } - return take_data(); + return std::static_pointer_cast(data_ptr); } void -ClientBase::execute(std::shared_ptr & data) +ClientBase::execute(std::shared_ptr & data_in) { - if (!data) { - throw std::runtime_error("'data' is empty"); + if (!data_in) { + // workaround, if take_data was called multiple timed, it returns a nullptr + // normally we should throw here, but as an API stable bug fix, we just ignore this... + return; } - if (pimpl_->is_feedback_ready) { - auto shared_ptr = std::static_pointer_cast>>(data); - auto ret = std::get<0>(*shared_ptr); - pimpl_->is_feedback_ready = false; - if (RCL_RET_OK == ret) { - auto feedback_message = std::get<1>(*shared_ptr); - this->handle_feedback_message(feedback_message); - } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != ret) { - rclcpp::exceptions::throw_from_rcl_error(ret, "error taking feedback"); - } - } else if (pimpl_->is_status_ready) { - auto shared_ptr = std::static_pointer_cast>>(data); - auto ret = std::get<0>(*shared_ptr); - pimpl_->is_status_ready = false; - if (RCL_RET_OK == ret) { - auto status_message = std::get<1>(*shared_ptr); - this->handle_status_message(status_message); - } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != ret) { - rclcpp::exceptions::throw_from_rcl_error(ret, "error taking status"); - } - } else if (pimpl_->is_goal_response_ready) { - auto shared_ptr = std::static_pointer_cast< - std::tuple>>(data); - auto ret = std::get<0>(*shared_ptr); - pimpl_->is_goal_response_ready = false; - if (RCL_RET_OK == ret) { - auto response_header = std::get<1>(*shared_ptr); - auto goal_response = std::get<2>(*shared_ptr); - this->handle_goal_response(response_header, goal_response); - } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != ret) { - rclcpp::exceptions::throw_from_rcl_error(ret, "error taking goal response"); - } - } else if (pimpl_->is_result_response_ready) { - auto shared_ptr = std::static_pointer_cast< - std::tuple>>(data); - auto ret = std::get<0>(*shared_ptr); - pimpl_->is_result_response_ready = false; - if (RCL_RET_OK == ret) { - auto response_header = std::get<1>(*shared_ptr); - auto result_response = std::get<2>(*shared_ptr); - this->handle_result_response(response_header, result_response); - } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != ret) { - rclcpp::exceptions::throw_from_rcl_error(ret, "error taking result response"); - } - } else if (pimpl_->is_cancel_response_ready) { - auto shared_ptr = std::static_pointer_cast< - std::tuple>>(data); - auto ret = std::get<0>(*shared_ptr); - pimpl_->is_cancel_response_ready = false; - if (RCL_RET_OK == ret) { - auto response_header = std::get<1>(*shared_ptr); - auto cancel_response = std::get<2>(*shared_ptr); - this->handle_cancel_response(response_header, cancel_response); - } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != ret) { - rclcpp::exceptions::throw_from_rcl_error(ret, "error taking cancel response"); - } - } else { - throw std::runtime_error("Executing action client but nothing is ready"); - } + std::shared_ptr data_ptr = std::static_pointer_cast(data_in); + + std::visit( + [&](auto && data) -> void { + using T = std::decay_t; + if constexpr (std::is_same_v) { + if (RCL_RET_OK == data.ret) { + this->handle_feedback_message(data.feedback_message); + } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != data.ret) { + rclcpp::exceptions::throw_from_rcl_error(data.ret, "error taking feedback"); + } + } + if constexpr (std::is_same_v) { + if (RCL_RET_OK == data.ret) { + this->handle_status_message(data.status_message); + } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != data.ret) { + rclcpp::exceptions::throw_from_rcl_error(data.ret, "error taking status"); + } + } + if constexpr (std::is_same_v) { + if (RCL_RET_OK == data.ret) { + this->handle_goal_response(data.response_header, data.goal_response); + } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != data.ret) { + rclcpp::exceptions::throw_from_rcl_error(data.ret, "error taking goal response"); + } + } + if constexpr (std::is_same_v) { + if (RCL_RET_OK == data.ret) { + this->handle_result_response(data.response_header, data.result_response); + } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != data.ret) { + rclcpp::exceptions::throw_from_rcl_error(data.ret, "error taking result response"); + } + } + if constexpr (std::is_same_v) { + if (RCL_RET_OK == data.ret) { + this->handle_cancel_response(data.response_header, data.cancel_response); + } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != data.ret) { + rclcpp::exceptions::throw_from_rcl_error(data.ret, "error taking cancel response"); + } + } + }, data_ptr->data); } } // namespace rclcpp_action diff --git a/rclcpp_action/src/server.cpp b/rclcpp_action/src/server.cpp index 780b2ba20c..1f615fb89b 100644 --- a/rclcpp_action/src/server.cpp +++ b/rclcpp_action/src/server.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include "rcl_action/action_server.h" @@ -33,8 +34,50 @@ using rclcpp_action::ServerBase; using rclcpp_action::GoalUUID; +struct ServerBaseData; + namespace rclcpp_action { + +struct ServerBaseData +{ + using GoalRequestData = std::tuple< + rcl_ret_t, + const rcl_action_goal_info_t, + rmw_request_id_t, + std::shared_ptr + >; + + using CancelRequestData = std::tuple< + rcl_ret_t, + std::shared_ptr, + rmw_request_id_t + >; + + using ResultRequestData = std::tuple, rmw_request_id_t>; + + using GoalExpiredData = struct Empty {}; + + std::variant data; + + explicit ServerBaseData(GoalRequestData && data_in) + : data(std::move(data_in)) {} + explicit ServerBaseData(CancelRequestData && data_in) + : data(std::move(data_in)) {} + explicit ServerBaseData(ResultRequestData && data_in) + : data(std::move(data_in)) {} + explicit ServerBaseData(GoalExpiredData && data_in) + : data(std::move(data_in)) {} +}; + +enum class ActionEventType : std::size_t +{ + GoalService, + ResultService, + CancelService, + Expired, +}; + class ServerBaseImpl { public: @@ -60,11 +103,6 @@ class ServerBaseImpl size_t num_services_ = 0; size_t num_guard_conditions_ = 0; - std::atomic goal_request_ready_{false}; - std::atomic cancel_request_ready_{false}; - std::atomic result_request_ready_{false}; - std::atomic goal_expired_{false}; - // Lock for unordered_maps std::recursive_mutex unordered_map_mutex_; @@ -75,8 +113,15 @@ class ServerBaseImpl // rcl goal handles are kept so api to send result doesn't try to access freed memory std::unordered_map> goal_handles_; + + // next ready event for taking, will be set by is_ready and will be processed by take_data + std::atomic next_ready_event; + // used to indicate that next_ready_event has no ready event for processing + static constexpr size_t NO_EVENT_READY = std::numeric_limits::max(); + rclcpp::Logger logger_; }; + } // namespace rclcpp_action ServerBase::ServerBase( @@ -194,124 +239,170 @@ ServerBase::is_ready(rcl_wait_set_t * wait_set) &goal_expired); } - pimpl_->goal_request_ready_ = goal_request_ready; - pimpl_->cancel_request_ready_ = cancel_request_ready; - pimpl_->result_request_ready_ = result_request_ready; - pimpl_->goal_expired_ = goal_expired; - if (RCL_RET_OK != ret) { rclcpp::exceptions::throw_from_rcl_error(ret); } - return pimpl_->goal_request_ready_.load() || - pimpl_->cancel_request_ready_.load() || - pimpl_->result_request_ready_.load() || - pimpl_->goal_expired_.load(); -} + pimpl_->next_ready_event = ServerBaseImpl::NO_EVENT_READY; -std::shared_ptr -ServerBase::take_data() -{ - if (pimpl_->goal_request_ready_.load()) { - rcl_ret_t ret; - rcl_action_goal_info_t goal_info = rcl_action_get_zero_initialized_goal_info(); - rmw_request_id_t request_header; + if (goal_request_ready) { + pimpl_->next_ready_event = static_cast(ActionEventType::GoalService); + return true; + } - std::lock_guard lock(pimpl_->action_server_reentrant_mutex_); + if (cancel_request_ready) { + pimpl_->next_ready_event = static_cast(ActionEventType::CancelService); + return true; + } - std::shared_ptr message = create_goal_request(); - ret = rcl_action_take_goal_request( - pimpl_->action_server_.get(), - &request_header, - message.get()); - - return std::static_pointer_cast( - std::make_shared - >>( - ret, - goal_info, - request_header, message)); - } else if (pimpl_->cancel_request_ready_.load()) { - rcl_ret_t ret; - rmw_request_id_t request_header; + if (result_request_ready) { + pimpl_->next_ready_event = static_cast(ActionEventType::ResultService); + return true; + } - // Initialize cancel request - auto request = std::make_shared(); + if (goal_expired) { + pimpl_->next_ready_event = static_cast(ActionEventType::Expired); + return true; + } - std::lock_guard lock(pimpl_->action_server_reentrant_mutex_); - ret = rcl_action_take_cancel_request( - pimpl_->action_server_.get(), - &request_header, - request.get()); + return false; +} - return std::static_pointer_cast( - std::make_shared - , - rmw_request_id_t>>(ret, request, request_header)); - } else if (pimpl_->result_request_ready_.load()) { - rcl_ret_t ret; - // Get the result request message - rmw_request_id_t request_header; - std::shared_ptr result_request = create_result_request(); - std::lock_guard lock(pimpl_->action_server_reentrant_mutex_); - ret = rcl_action_take_result_request( - pimpl_->action_server_.get(), &request_header, result_request.get()); +std::shared_ptr +ServerBase::take_data() +{ + size_t next_ready_event = pimpl_->next_ready_event.exchange(ServerBaseImpl::NO_EVENT_READY); - return std::static_pointer_cast( - std::make_shared, rmw_request_id_t>>( - ret, result_request, request_header)); - } else if (pimpl_->goal_expired_.load()) { + if (next_ready_event == ServerBaseImpl::NO_EVENT_READY) { + // there is a known bug in iron, that take_data might be called multiple + // times. Therefore instead of throwing, we just return a nullptr as a workaround. return nullptr; - } else { - throw std::runtime_error("Taking data from action server but nothing is ready"); } + + return take_data_by_entity_id(next_ready_event); } std::shared_ptr ServerBase::take_data_by_entity_id(size_t id) { + static_assert( + static_cast(EntityType::GoalService) == + static_cast(ActionEventType::GoalService)); + static_assert( + static_cast(EntityType::ResultService) == + static_cast(ActionEventType::ResultService)); + static_assert( + static_cast(EntityType::CancelService) == + static_cast(ActionEventType::CancelService)); + + std::shared_ptr data_ptr; // Mark as ready the entity from which we want to take data - switch (static_cast(id)) { - case EntityType::GoalService: - pimpl_->goal_request_ready_ = true; + switch (static_cast(id)) { + case ActionEventType::GoalService: + { + rcl_ret_t ret; + rcl_action_goal_info_t goal_info = rcl_action_get_zero_initialized_goal_info(); + rmw_request_id_t request_header; + + std::lock_guard lock(pimpl_->action_server_reentrant_mutex_); + + std::shared_ptr message = create_goal_request(); + ret = rcl_action_take_goal_request( + pimpl_->action_server_.get(), + &request_header, + message.get()); + + data_ptr = std::make_shared( + ServerBaseData::GoalRequestData(ret, goal_info, request_header, message)); + } break; - case EntityType::ResultService: - pimpl_->result_request_ready_ = true; + case ActionEventType::ResultService: + { + rcl_ret_t ret; + // Get the result request message + rmw_request_id_t request_header; + std::shared_ptr result_request = create_result_request(); + std::lock_guard lock(pimpl_->action_server_reentrant_mutex_); + ret = rcl_action_take_result_request( + pimpl_->action_server_.get(), &request_header, result_request.get()); + + data_ptr = + std::make_shared( + ServerBaseData::ResultRequestData(ret, result_request, request_header)); + } break; - case EntityType::CancelService: - pimpl_->cancel_request_ready_ = true; + case ActionEventType::CancelService: + { + rcl_ret_t ret; + rmw_request_id_t request_header; + + // Initialize cancel request + auto request = std::make_shared(); + + std::lock_guard lock(pimpl_->action_server_reentrant_mutex_); + ret = rcl_action_take_cancel_request( + pimpl_->action_server_.get(), + &request_header, + request.get()); + + data_ptr = + std::make_shared( + ServerBaseData::CancelRequestData(ret, request, request_header)); + } + break; + case ActionEventType::Expired: + { + data_ptr = + std::make_shared(ServerBaseData::GoalExpiredData()); + } break; } - return take_data(); + return std::static_pointer_cast(data_ptr); } void -ServerBase::execute(std::shared_ptr & data) +ServerBase::execute(std::shared_ptr & data_in) { - if (!data && !pimpl_->goal_expired_.load()) { - throw std::runtime_error("'data' is empty"); - } - - if (pimpl_->goal_request_ready_.load()) { - execute_goal_request_received(data); - } else if (pimpl_->cancel_request_ready_.load()) { - execute_cancel_request_received(data); - } else if (pimpl_->result_request_ready_.load()) { - execute_result_request_received(data); - } else if (pimpl_->goal_expired_.load()) { - execute_check_expired_goals(); - } else { - throw std::runtime_error("Executing action server but nothing is ready"); + if (!data_in) { + // workaround, if take_data was called multiple timed, it returns a nullptr + // normally we should throw here, but as an API stable bug fix, we just ignore this... + return; } + + std::shared_ptr data_ptr = std::static_pointer_cast(data_in); + + std::visit( + [&](auto && data) -> void { + using T = std::decay_t; + if constexpr (std::is_same_v) { + execute_goal_request_received(data_in); + } + if constexpr (std::is_same_v) { + execute_cancel_request_received(data_in); + } + if constexpr (std::is_same_v) { + execute_result_request_received(data_in); + } + if constexpr (std::is_same_v) { + execute_check_expired_goals(); + } + }, + data_ptr->data); } void ServerBase::execute_goal_request_received(std::shared_ptr & data) { - auto shared_ptr = std::static_pointer_cast - >>(data); - rcl_ret_t ret = std::get<0>(*shared_ptr); + std::shared_ptr data_ptr = std::static_pointer_cast(data); + const ServerBaseData::GoalRequestData & gData( + std::get(data_ptr->data)); + + rcl_ret_t ret = std::get<0>(gData); + rcl_action_goal_info_t goal_info = std::get<1>(gData); + rmw_request_id_t request_header = std::get<2>(gData); + const std::shared_ptr message = std::get<3>(gData); + if (RCL_RET_ACTION_SERVER_TAKE_FAILED == ret) { // Ignore take failure because connext fails if it receives a sample without valid data. // This happens when a client shuts down and connext receives a sample saying the client is @@ -320,14 +411,6 @@ ServerBase::execute_goal_request_received(std::shared_ptr & data) } else if (RCL_RET_OK != ret) { rclcpp::exceptions::throw_from_rcl_error(ret); } - rcl_action_goal_info_t goal_info = std::get<1>(*shared_ptr); - rmw_request_id_t request_header = std::get<2>(*shared_ptr); - std::shared_ptr message = std::get<3>(*shared_ptr); - - bool expected = true; - if (!pimpl_->goal_request_ready_.compare_exchange_strong(expected, false)) { - return; - } GoalUUID uuid = get_goal_id_from_goal_request(message.get()); convert(uuid, &goal_info); @@ -411,10 +494,15 @@ ServerBase::execute_goal_request_received(std::shared_ptr & data) void ServerBase::execute_cancel_request_received(std::shared_ptr & data) { - auto shared_ptr = std::static_pointer_cast - , - rmw_request_id_t>>(data); - auto ret = std::get<0>(*shared_ptr); + std::shared_ptr data_ptr = std::static_pointer_cast(data); + const ServerBaseData::CancelRequestData & gData( + std::get(data_ptr->data)); + + rcl_ret_t ret = std::get<0>(gData); + std::shared_ptr request = std::get<1>(gData); + rmw_request_id_t request_header = std::get<2>(gData); + + if (RCL_RET_ACTION_SERVER_TAKE_FAILED == ret) { // Ignore take failure because connext fails if it receives a sample without valid data. // This happens when a client shuts down and connext receives a sample saying the client is @@ -423,9 +511,6 @@ ServerBase::execute_cancel_request_received(std::shared_ptr & data) } else if (RCL_RET_OK != ret) { rclcpp::exceptions::throw_from_rcl_error(ret); } - auto request = std::get<1>(*shared_ptr); - auto request_header = std::get<2>(*shared_ptr); - pimpl_->cancel_request_ready_ = false; // Convert c++ message to C message rcl_action_cancel_request_t cancel_request = rcl_action_get_zero_initialized_cancel_request(); @@ -510,9 +595,14 @@ ServerBase::execute_cancel_request_received(std::shared_ptr & data) void ServerBase::execute_result_request_received(std::shared_ptr & data) { - auto shared_ptr = std::static_pointer_cast - , rmw_request_id_t>>(data); - auto ret = std::get<0>(*shared_ptr); + std::shared_ptr data_ptr = std::static_pointer_cast(data); + const ServerBaseData::ResultRequestData & gData( + std::get(data_ptr->data)); + + rcl_ret_t ret = std::get<0>(gData); + std::shared_ptr result_request = std::get<1>(gData); + rmw_request_id_t request_header = std::get<2>(gData); + if (RCL_RET_ACTION_SERVER_TAKE_FAILED == ret) { // Ignore take failure because connext fails if it receives a sample without valid data. // This happens when a client shuts down and connext receives a sample saying the client is @@ -521,10 +611,7 @@ ServerBase::execute_result_request_received(std::shared_ptr & data) } else if (RCL_RET_OK != ret) { rclcpp::exceptions::throw_from_rcl_error(ret); } - auto result_request = std::get<1>(*shared_ptr); - auto request_header = std::get<2>(*shared_ptr); - pimpl_->result_request_ready_ = false; std::shared_ptr result_response; // check if the goal exists