diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 4b75cc8711..38e51c7f79 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -467,6 +467,14 @@ class Executor void execute_any_executable(AnyExecutable & any_exec); + /// Trigger the notify guard condition to wake up the executor. + /** + * This function is thread safe. + */ + RCLCPP_PUBLIC + void + trigger_executor_notify(); + /// Run subscription executable. /** * Do necessary setup and tear-down as well as executing the subscription. @@ -520,6 +528,27 @@ class Executor void wait_for_work(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); + /// Prepare the wait set to bee waited on. + /** + * Builds a set of waitable entities, which are passed to the middleware. + * After building wait set, waits on middleware to notify. + * \throws std::runtime_error if the wait set can be cleared + */ + RCLCPP_PUBLIC + void + prepare_work(); + + /// Block until more work becomes avilable or timeout is reached. + /** + * Builds a set of waitable entities, which are passed to the middleware. + * After building wait set, waits on middleware to notify. + * \param[in] timeout duration to wait for new work to become available. + * \throws std::runtime_error if the wait set can be cleared + */ + RCLCPP_PUBLIC + void + wait_for_work_simple(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); + /// Check for executable in ready state and populate union structure. /** * \param[out] any_executable populated union structure of ready executable @@ -568,6 +597,10 @@ class Executor mutable std::mutex mutex_; + mutable std::mutex notify_mutex_; + + std::vector wait_for_work_cbgs_; + /// The context associated with this executor. std::shared_ptr context_; @@ -595,6 +628,10 @@ class Executor rclcpp::WaitSet wait_set_ RCPPUTILS_TSA_GUARDED_BY(mutex_); std::optional> wait_result_ RCPPUTILS_TSA_GUARDED_BY(mutex_); + /// Previous WaitSet to be waited on. + rclcpp::WaitSet previous_wait_set_ RCPPUTILS_TSA_GUARDED_BY(mutex_); + std::optional> previous_wait_result_ RCPPUTILS_TSA_GUARDED_BY(mutex_); + /// Hold the current state of the collection being waited on by the waitset rclcpp::executors::ExecutorEntitiesCollection current_collection_ RCPPUTILS_TSA_GUARDED_BY( mutex_); diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 70e2f69989..3533179918 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -26,6 +26,7 @@ #include "rcl/allocator.h" #include "rcl/error_handling.h" #include "rclcpp/executors/executor_notify_waitable.hpp" +#include "rclcpp/logging.hpp" #include "rclcpp/subscription_wait_set_mask.hpp" #include "rcpputils/scope_exit.hpp" @@ -496,9 +497,18 @@ Executor::execute_any_executable(AnyExecutable & any_exec) const std::shared_ptr & const_data = any_exec.data; any_exec.waitable->execute(const_data); } +} - // Reset the callback_group, regardless of type - any_exec.callback_group->can_be_taken_from().store(true); +void +Executor::trigger_executor_notify() +{ + try { + interrupt_guard_condition_->trigger(); + } catch (const rclcpp::exceptions::RCLError & ex) { + throw std::runtime_error( + std::string( + "Failed to trigger guard condition on callback group change: ") + ex.what()); + } } template @@ -770,6 +780,53 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout) } } +void Executor::prepare_work() +{ + { + std::lock_guard guard(mutex_); + if (this->entities_need_rebuild_.exchange(false) || current_collection_.empty()) { + RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "Rebuilding executor entities"); + this->collect_entities(); + } + + auto callback_groups = this->collector_.get_all_callback_groups(); + wait_for_work_cbgs_.resize(callback_groups.size()); + for(const auto & w_ptr : callback_groups) { + auto shr_ptr = w_ptr.lock(); + if(shr_ptr) { + wait_for_work_cbgs_.push_back(std::move(shr_ptr)); + } + } + } +} + +void +Executor::wait_for_work_simple(std::chrono::nanoseconds timeout) +{ + TRACETOOLS_TRACEPOINT(rclcpp_executor_wait_for_work, timeout.count()); + + this->wait_result_.reset(); + this->wait_result_.emplace(wait_set_.wait(timeout)); + + // drop references to the callback groups, before trying to execute anything + wait_for_work_cbgs_.clear(); + + if (!this->wait_result_ || this->wait_result_->kind() == WaitResultKind::Empty) { + RCUTILS_LOG_WARN_NAMED( + "rclcpp", + "empty wait set received in wait(). This should never happen."); + } else { + if (this->wait_result_->kind() == WaitResultKind::Ready && current_notify_waitable_) { + RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "Getting wait set"); + auto & rcl_wait_set = this->wait_result_->get_wait_set().get_rcl_wait_set(); + if (current_notify_waitable_->is_ready(rcl_wait_set)) { + RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "Executing notify waitable"); + current_notify_waitable_->execute(current_notify_waitable_->take_data()); + } + } + } +} + bool Executor::get_next_ready_executable(AnyExecutable & any_executable) { @@ -896,6 +953,7 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable) bool Executor::get_next_executable(AnyExecutable & any_executable, std::chrono::nanoseconds timeout) { + notify_mutex_.lock(); bool success = false; // Check to see if there are any subscriptions or timers needing service // TODO(wjwwood): improve run to run efficiency of this function @@ -903,6 +961,8 @@ Executor::get_next_executable(AnyExecutable & any_executable, std::chrono::nanos // If there are none if (!success) { // Wait for subscriptions or timers to work on + prepare_work(); + notify_mutex_.unlock(); wait_for_work(timeout); if (!spinning.load()) { return false; @@ -910,6 +970,9 @@ Executor::get_next_executable(AnyExecutable & any_executable, std::chrono::nanos // Try again success = get_next_ready_executable(any_executable); } + else { + notify_mutex_.unlock(); + } return success; } diff --git a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp index 378fdfb8fa..1752747a25 100644 --- a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp @@ -98,15 +98,15 @@ MultiThreadedExecutor::run([[maybe_unused]] size_t this_thread_number) execute_any_executable(any_exec); - if (any_exec.callback_group && - any_exec.callback_group->type() == CallbackGroupType::MutuallyExclusive) { - try { - interrupt_guard_condition_->trigger(); - } catch (const rclcpp::exceptions::RCLError & ex) { - throw std::runtime_error( - std::string( - "Failed to trigger guard condition on callback group change: ") + ex.what()); + std::lock_guard wait_lock{notify_mutex_}; + + any_exec.callback_group->can_be_taken_from().store(true); + + if (any_exec.callback_group && + any_exec.callback_group->type() == CallbackGroupType::MutuallyExclusive) + { + trigger_executor_notify(); } } diff --git a/rclcpp/test/rclcpp/executors/test_multi_threaded_executor.cpp b/rclcpp/test/rclcpp/executors/test_multi_threaded_executor.cpp index 09dfa03f90..59c9735458 100644 --- a/rclcpp/test/rclcpp/executors/test_multi_threaded_executor.cpp +++ b/rclcpp/test/rclcpp/executors/test_multi_threaded_executor.cpp @@ -102,3 +102,54 @@ TEST_F(TestMultiThreadedExecutor, timer_over_take) { executor.add_node(node); executor.spin(); } + +/* + Test that no tasks are starved + */ +TEST_F(TestMultiThreadedExecutor, starvation) { + rclcpp::executors::MultiThreadedExecutor executor(rclcpp::ExecutorOptions(), + 2u); + + std::shared_ptr node = + std::make_shared("test_multi_threaded_executor_starvation"); + + std::atomic_int timer_one_count{0}; + std::atomic_int timer_two_count{0}; + + rclcpp::TimerBase::SharedPtr timer_one; + rclcpp::TimerBase::SharedPtr timer_two; + + auto timer_one_callback = [&timer_one_count, &timer_two_count]() { + std::this_thread::sleep_for(100ms); + + timer_one_count++; + + auto diff = std::abs(timer_one_count - timer_two_count); + + std::cout << "Difference in counts: " << diff << std::endl; + + if (timer_one_count > 10 || timer_two_count > 10) { + rclcpp::shutdown(); + ASSERT_LE(diff, 1); + } + }; + + auto timer_two_callback = [&timer_one_count, &timer_two_count]() { + std::this_thread::sleep_for(100ms); + + timer_two_count++; + + auto diff = std::abs(timer_one_count - timer_two_count); + + if (timer_one_count > 10 || timer_two_count > 10) { + rclcpp::shutdown(); + ASSERT_LE(diff, 1); + } + }; + + timer_one = node->create_wall_timer(10ms, timer_one_callback); + timer_two = node->create_wall_timer(10ms, timer_two_callback); + + executor.add_node(node); + executor.spin(); +}