diff --git a/rclcpp/include/rclcpp/executors/executor_entities_collection.hpp b/rclcpp/include/rclcpp/executors/executor_entities_collection.hpp index 517894a2a2..ef08492fe8 100644 --- a/rclcpp/include/rclcpp/executors/executor_entities_collection.hpp +++ b/rclcpp/include/rclcpp/executors/executor_entities_collection.hpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include @@ -72,24 +73,28 @@ void update_entities( std::function on_removed ) { - for (auto it = update_to.begin(); it != update_to.end(); ) { + for (auto it = update_to.begin_ordered(); it != update_to.end_ordered(); ) { if (update_from.count(it->first) == 0) { auto entity = it->second.entity.lock(); if (entity) { on_removed(entity); } - it = update_to.erase(it); + it = update_to.erase_ordered(it); } else { ++it; } } - for (auto it = update_from.begin(); it != update_from.end(); ++it) { + for (auto it = update_from.begin_ordered(); it != update_from.end_ordered(); ++it) { if (update_to.count(it->first) == 0) { auto entity = it->second.entity.lock(); if (entity) { on_added(entity); } - update_to.insert(*it); + bool inserted = update_to.insert(*it); + // Should never be false, so this is a defensive check, mark unused too + // in order to avoid a warning in release builds. + assert(inserted); + RCUTILS_UNUSED(inserted); } } } @@ -97,9 +102,18 @@ void update_entities( /// A collection of entities, indexed by their corresponding handles template class EntityCollection - : public std::unordered_map> { public: + /// Type of the map used for random access + using MapType = std::unordered_map>; + + /// Type of the vector for insertion order access + // Note, we cannot use typename MapType::value_type because it makes the first + // item in the pair const, which prevents copy assignment of the pair, which + // prevents std::vector::erase from working later... + using VectorType = std::vector>>; + /// Key type of the map using Key = const EntityKeyType *; @@ -125,6 +139,125 @@ class EntityCollection { update_entities(other, *this, on_added, on_removed); } + + // Below are some forwarded functions to the map and vector as appropriate. + + typename MapType::size_type count(const Key & key) const + { + return map_.count(key); + } + + typename MapType::iterator begin() + { + return map_.begin(); + } + + typename MapType::const_iterator begin() const + { + return map_.begin(); + } + + typename MapType::iterator end() + { + return map_.end(); + } + + typename MapType::const_iterator end() const + { + return map_.end(); + } + + typename VectorType::iterator begin_ordered() + { + return insertion_order_.begin(); + } + + typename VectorType::const_iterator begin_ordered() const + { + return insertion_order_.begin(); + } + + typename VectorType::iterator end_ordered() + { + return insertion_order_.end(); + } + + typename VectorType::const_iterator end_ordered() const + { + return insertion_order_.end(); + } + + typename MapType::const_iterator find(const Key & key) const + { + return map_.find(key); + } + + bool empty() const noexcept + { + return insertion_order_.empty(); + } + + typename VectorType::size_type size() const noexcept + { + return insertion_order_.size(); + } + + typename MapType::iterator erase(typename MapType::const_iterator pos) + { + // from: https://en.cppreference.com/w/cpp/container/unordered_map/erase + // The iterator pos must be valid and dereferenceable. + // Thus the end() iterator (which is valid, but is not dereferenceable) + // cannot be used as a value for pos. + // + // Therefore we can use pos-> here safely. + insertion_order_.erase( + std::remove_if( + insertion_order_.begin(), + insertion_order_.end(), + [&pos](auto value) { + return value.first == pos->first; + })); + return map_.erase(pos); + } + + typename VectorType::iterator erase_ordered(typename VectorType::const_iterator pos) + { + // from: https://en.cppreference.com/w/cpp/container/vector/erase + // The iterator pos must be valid and dereferenceable. Thus the end + // () iterator (which is valid, but is not dereferenceable) cannot be used + // as a value for pos. + // + // Therefore we can use pos-> here safely. + + assert(map_.erase(pos->first) == 1); + return insertion_order_.erase(pos); + } + + void clear() noexcept + { + insertion_order_.clear(); + return map_.clear(); + } + + /// Insert into the collection and return true if inserted, otherwise false + /** + * Insertion will fail, and return false, if attempting to insert a duplicate + * entity. + */ + [[nodiscard]] + bool insert(typename MapType::value_type value) + { + if (!map_.insert(value).second) { + // attempting to insert a duplicate entity + return false; + } + insertion_order_.push_back(value); + return true; + } + +private: + MapType map_; + VectorType insertion_order_; }; /// Represent the total set of entities for a single executor diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 95b0d7fcc8..c4ffa47928 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -673,7 +673,11 @@ Executor::collect_entities() current_notify_waitable_ = std::make_shared( *notify_waitable_); auto notify_waitable = std::static_pointer_cast(current_notify_waitable_); - collection.waitables.insert({notify_waitable.get(), {notify_waitable, {}}}); + bool inserted = collection.waitables.insert({notify_waitable.get(), {notify_waitable, {}}}); + // Should never be false, so this is a defensive check, mark unused too + // in order to avoid a warning in release builds. + assert(inserted); + RCUTILS_UNUSED(inserted); } // We must remove expired entities here, so that we don't continue to use older entities. diff --git a/rclcpp/src/rclcpp/executors/executor_entities_collection.cpp b/rclcpp/src/rclcpp/executors/executor_entities_collection.cpp index 68ac56b656..c1e18cb50e 100644 --- a/rclcpp/src/rclcpp/executors/executor_entities_collection.cpp +++ b/rclcpp/src/rclcpp/executors/executor_entities_collection.cpp @@ -43,10 +43,10 @@ size_t ExecutorEntitiesCollection::remove_expired_entities() { auto remove_entities = [](auto & collection) -> size_t { size_t removed = 0; - for (auto it = collection.begin(); it != collection.end(); ) { + for (auto it = collection.begin_ordered(); it != collection.end_ordered(); ) { if (it->second.entity.expired()) { ++removed; - it = collection.erase(it); + it = collection.erase_ordered(it); } else { ++it; } @@ -79,39 +79,59 @@ build_entities_collection( if (group_ptr->can_be_taken_from().load()) { group_ptr->collect_all_ptrs( [&collection, weak_group_ptr](const rclcpp::SubscriptionBase::SharedPtr & subscription) { - collection.subscriptions.insert( + bool inserted = collection.subscriptions.insert( { subscription->get_subscription_handle().get(), {subscription, weak_group_ptr} }); + // Should never be false, so this is a defensive check, mark unused too + // in order to avoid a warning in release builds. + assert(inserted); + RCUTILS_UNUSED(inserted); }, [&collection, weak_group_ptr](const rclcpp::ServiceBase::SharedPtr & service) { - collection.services.insert( + bool inserted = collection.services.insert( { service->get_service_handle().get(), {service, weak_group_ptr} }); + // Should never be false, so this is a defensive check, mark unused too + // in order to avoid a warning in release builds. + assert(inserted); + RCUTILS_UNUSED(inserted); }, [&collection, weak_group_ptr](const rclcpp::ClientBase::SharedPtr & client) { - collection.clients.insert( + bool inserted = collection.clients.insert( { client->get_client_handle().get(), {client, weak_group_ptr} }); + // Should never be false, so this is a defensive check, mark unused too + // in order to avoid a warning in release builds. + assert(inserted); + RCUTILS_UNUSED(inserted); }, [&collection, weak_group_ptr](const rclcpp::TimerBase::SharedPtr & timer) { - collection.timers.insert( + bool inserted = collection.timers.insert( { timer->get_timer_handle().get(), {timer, weak_group_ptr} }); + // Should never be false, so this is a defensive check, mark unused too + // in order to avoid a warning in release builds. + assert(inserted); + RCUTILS_UNUSED(inserted); }, [&collection, weak_group_ptr](const rclcpp::Waitable::SharedPtr & waitable) { - collection.waitables.insert( + bool inserted = collection.waitables.insert( { waitable.get(), {waitable, weak_group_ptr} }); + // Should never be false, so this is a defensive check, mark unused too + // in order to avoid a warning in release builds. + assert(inserted); + RCUTILS_UNUSED(inserted); } ); } diff --git a/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp b/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp index ce6a103ab2..a17860960a 100644 --- a/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp +++ b/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp @@ -511,9 +511,19 @@ EventsExecutor::add_notify_waitable_to_collection( { // The notify waitable is not associated to any group, so use an invalid one rclcpp::CallbackGroup::WeakPtr weak_group_ptr; - collection.insert( + bool inserted = collection.insert( { this->notify_waitable_.get(), {this->notify_waitable_, weak_group_ptr} }); + // Explicitly ignore if the notify waitable was not inserted because that means + // it was already inserted, which happens initially as it is explicitly added + // in the constructor as well as every time the collection is reset, so on + // the first reset there is a second insertion attempt. + // We could check before trying to insert, but that would require a "find" call + // on each refresh, which is expensive, and otherwise it would require additional + // state in this class to detect the initial case where it is added twice. + // Therefore we just insert and ignore it if it fails (the only way it fails + // is when a duplicate is inserted). + RCUTILS_UNUSED(inserted); } diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp index a82b702db5..84c24713f5 100644 --- a/rclcpp/test/rclcpp/executors/test_executors.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors.cpp @@ -37,8 +37,10 @@ #include "rclcpp/guard_condition.hpp" #include "rclcpp/rclcpp.hpp" #include "rclcpp/time_source.hpp" +#include "rcpputils/scope_exit.hpp" #include "test_msgs/msg/empty.hpp" +#include "test_msgs/srv/empty.hpp" #include "./executor_types.hpp" @@ -878,6 +880,370 @@ TEST(TestExecutors, testSpinWithNonDefaultContext) rclcpp::shutdown(non_default_context); } +// The purpose of this test is to check that the order of callbacks happen +// in some relation to the order of events and the order in which the callbacks +// were registered. +// This is not a guarantee of executor API, but it is a bit of UB that some +// have come to depend on, see: +// +// https://github.com/ros2/rclcpp/issues/2532 +// +// It should not be changed unless there's a good reason for it (users find it +// the least surprising outcome even if it is not guaranteed), but if there +// is a good reason for changing it, then the executors effected can be skipped, +// or the test can be removed. +// The purpose of this test is to catch this regressions and let the authors of +// the change read up on the above context and act accordingly. +TYPED_TEST(TestExecutors, deterministic_execution_order_ub) +{ + using ExecutorType = TypeParam; + + // number of each entity to test + constexpr size_t number_of_entities = 20; + std::vector forward(number_of_entities); + std::iota(std::begin(forward), std::end(forward), 0); + std::vector reverse(number_of_entities); + std::reverse_copy(std::begin(forward), std::end(forward), std::begin(reverse)); + + // The expected results vary based on the registration order (always 0..N-1), + // the call order (what this means varies based on the entity type), the + // entity types, and in some cases the executor type. + // It is also possible that the rmw implementation can play a role in the + // ordering, depending on how the executor uses the rmw layer. + // The follow structure and logic tries to capture these details. + // Each test case represents a case-entity pair, + // e.g. "forward call order for waitables" or "reverse call order for timers" + struct test_case + { + // If this is true, then the test case should be skipped. + bool should_skip; + // Order in which to invoke the entities, where that is possible to control. + // For example, the order in which we trigger() the waitables, or the + // order in which we set the timers up to execute (using increasing periods). + std::vector call_order; + // Order in which we expect the entities to be executed by the executor. + std::vector expected_execution_order; + }; + // tests cases are "test_name: {"entity type": {skip, call_order, expected_execution_order}" + std::map> test_cases = { + { + "forward call order", + { + {"waitable", {false, forward, forward}}, + {"subscription", {false, forward, forward}}, + {"service", {false, forward, forward}}, + {"timer", {false, forward, forward}} + } + }, + { + "reverse call order", + { + {"waitable", {false, reverse, forward}}, + {"subscription", {false, reverse, forward}}, + {"service", {false, reverse, forward}}, + // timers are always called in order of which expires soonest, so + // the registration order doesn't necessarily affect them + {"timer", {false, reverse, reverse}} + } + }, + }; + + // Note use this to exclude or modify expected results for executors if this + // undefined behavior doesn't hold for them: + if ( + std::is_same()) + { + // for the EventsExecutor the call order is the execution order because it + // tracks the individual events (triggers in the case of waitables) and + // executes in that order + test_cases["reverse call order"]["waitable"] = {false, reverse, reverse}; + // timers are unaffected by the above about waitables, as they are always + // executed in "call order" even in the other executors + // but, subscription and service execution order is driven by the rmw impl + // due to how the EventsExecutor uses the rmw interface, so we'll skip those + for (auto & test_case_pair : test_cases) { + for (auto & entity_test_case_pair : test_case_pair.second) { + if ( + entity_test_case_pair.first == "subscription" || + entity_test_case_pair.first == "service") + { + entity_test_case_pair.second = {true, {}, {}}; + } + } + } + } + + // Set up a situation with N waitables, added in order (1, ..., N) and then + // trigger them in various orders between calls to spin, to see that the order + // is impacted by the registration order (in most cases). + // Note that we always add/register, trigger, then wait/spin, because this + // undefined behavior related to execution order only applies to entities + // that were "ready" in between calls to spin, i.e. they appear to become + // "ready" to the executor at the "same time". + // Also note, that this ordering only applies within entities of the same type + // as well, there are other parts of the executor that determine the order + // between entity types, e.g. the default scheduling (at the time of writing) + // prefers timers, then subscriptions, then service servers, then service + // clients, and then waitables, see: Executor::get_next_ready_executable() + // But that might be different for different executors and may change in the + // future. + // So here we just test order withing a few different waitable instances only. + // Further down we test similar set ups with other entities like subscriptions + // and timers. + + constexpr bool automatically_add_to_executor_with_node = false; + auto isolated_callback_group = this->node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive, + automatically_add_to_executor_with_node); + + // perform each of the test cases for waitables + { + auto waitable_interfaces = this->node->get_node_waitables_interface(); + + std::vector> waitables; + for (size_t i = 0; i < number_of_entities; ++i) { + auto my_waitable = std::make_shared(); + waitable_interfaces->add_waitable(my_waitable, isolated_callback_group); + waitables.push_back(my_waitable); + } + + for (const auto & test_case_pair : test_cases) { + const std::string & test_case_name = test_case_pair.first; + const auto & test_case = test_case_pair.second.at("waitable"); + if (test_case.should_skip) { + continue; + } + + ExecutorType executor; + executor.add_callback_group(isolated_callback_group, this->node->get_node_base_interface()); + + RCPPUTILS_SCOPE_EXIT({ + for (size_t i = 0; i < number_of_entities; ++i) { + waitables[i]->set_on_execute_callback(nullptr); + } + }); + + std::vector actual_order; + for (size_t i : test_case.call_order) { + waitables[i]->set_on_execute_callback([&actual_order, i]() {actual_order.push_back(i);}); + waitables[i]->trigger(); + } + + while (actual_order.size() < number_of_entities && rclcpp::ok()) { + executor.spin_once(10s); // large timeout because it should normally exit quickly + } + + EXPECT_EQ(actual_order, test_case.expected_execution_order) + << "callback call order of waitables in test case '" << test_case_name + << "' different than expected, this may be a false positive, see test " + << "description"; + } + } + + // perform each of the test cases for subscriptions + { + const std::string test_topic_name = "~/deterministic_execution_order_ub"; + std::map> on_sub_data_callbacks; + std::vector::SharedPtr> subscriptions; + rclcpp::SubscriptionOptions so; + so.callback_group = isolated_callback_group; + for (size_t i = 0; i < number_of_entities; ++i) { + size_t next_sub_index = subscriptions.size(); + auto sub = this->node->template create_subscription( + test_topic_name, + 10, + [&on_sub_data_callbacks, &subscriptions, next_sub_index](const test_msgs::msg::Empty &) { + auto this_sub_pointer = subscriptions[next_sub_index].get(); + auto callback_for_sub_it = on_sub_data_callbacks.find(this_sub_pointer); + ASSERT_NE(callback_for_sub_it, on_sub_data_callbacks.end()); + auto on_sub_data_callback = callback_for_sub_it->second; + if (on_sub_data_callback) { + on_sub_data_callback(); + } + }, + so); + subscriptions.push_back(sub); + } + + for (const auto & test_case_pair : test_cases) { + const std::string & test_case_name = test_case_pair.first; + const auto & test_case = test_case_pair.second.at("subscription"); + if (test_case.should_skip) { + continue; + } + + ExecutorType executor; + executor.add_callback_group(isolated_callback_group, this->node->get_node_base_interface()); + + RCPPUTILS_SCOPE_EXIT({ + on_sub_data_callbacks.clear(); + }); + + std::vector actual_order; + for (size_t i = 0; i < number_of_entities; ++i) { + auto sub = subscriptions[i]; + on_sub_data_callbacks[sub.get()] = [&actual_order, i]() { + actual_order.push_back(i); + }; + } + + // create publisher and wait for all of the subscriptions to match + auto pub = this->node->template create_publisher(test_topic_name, 10); + size_t number_of_matches = pub->get_subscription_count(); + while (number_of_matches < number_of_entities && rclcpp::ok()) { + executor.spin_once(10s); // large timeout because it should normally exit quickly + number_of_matches = pub->get_subscription_count(); + } + + // publish once and wait for all subscriptions to be handled + pub->publish(test_msgs::msg::Empty()); + while (actual_order.size() < number_of_entities && rclcpp::ok()) { + executor.spin_once(10s); // large timeout because it should normally exit quickly + } + + EXPECT_EQ(actual_order, test_case.expected_execution_order) + << "callback call order of subscriptions in test case '" << test_case_name + << "' different than expected, this may be a false positive, see test " + << "description"; + } + } + + // perform each of the test cases for service servers + { + const std::string test_service_name = "~/deterministic_execution_order_ub"; + std::map> on_request_callbacks; + std::vector::SharedPtr> services; + std::vector::SharedPtr> clients; + for (size_t i = 0; i < number_of_entities; ++i) { + size_t next_srv_index = services.size(); + auto srv = this->node->template create_service( + test_service_name + "_" + std::to_string(i), + [&on_request_callbacks, &services, next_srv_index]( + std::shared_ptr, + std::shared_ptr + ) { + auto this_srv_pointer = services[next_srv_index].get(); + auto callback_for_srv_it = on_request_callbacks.find(this_srv_pointer); + ASSERT_NE(callback_for_srv_it, on_request_callbacks.end()); + auto on_request_callback = callback_for_srv_it->second; + if (on_request_callback) { + on_request_callback(); + } + }, + 10, + isolated_callback_group); + services.push_back(srv); + auto client = this->node->template create_client( + test_service_name + "_" + std::to_string(i), + 10, + isolated_callback_group + ); + clients.push_back(client); + } + + for (const auto & test_case_pair : test_cases) { + const std::string & test_case_name = test_case_pair.first; + const auto & test_case = test_case_pair.second.at("service"); + if (test_case.should_skip) { + continue; + } + + ExecutorType executor; + executor.add_callback_group(isolated_callback_group, this->node->get_node_base_interface()); + + RCPPUTILS_SCOPE_EXIT({ + on_request_callbacks.clear(); + }); + + std::vector actual_order; + for (size_t i = 0; i < number_of_entities; ++i) { + auto srv = services[i]; + on_request_callbacks[srv.get()] = [&actual_order, i]() { + actual_order.push_back(i); + }; + } + + // wait for all of the services to match + for (const auto & client : clients) { + bool matched = client->wait_for_service(10s); // long timeout, but should be quick + ASSERT_TRUE(matched); + } + + // send requests in order + for (size_t i : test_case.call_order) { + clients[i]->async_send_request(std::make_shared()); + } + + // wait for all the requests to be handled + while (actual_order.size() < number_of_entities && rclcpp::ok()) { + executor.spin_once(10s); // large timeout because it should normally exit quickly + } + + EXPECT_EQ(actual_order, test_case.expected_execution_order) + << "callback call order of service servers in test case '" << test_case_name + << "' different than expected, this may be a false positive, see test " + << "description"; + } + } + + // perform each of the test cases for timers + { + for (const auto & test_case_pair : test_cases) { + const std::string & test_case_name = test_case_pair.first; + const auto & test_case = test_case_pair.second.at("timer"); + if (test_case.should_skip) { + continue; + } + + std::map> timer_callbacks; + std::vector timers; + for (size_t i = 0; i < number_of_entities; ++i) { + // "call order" for timers will be simulated by setting them at different + // periods, with the "first" ones having the smallest period. + auto period = 1ms + std::chrono::milliseconds(test_case.call_order[i]); + auto timer = this->node->create_timer( + period, + [&timer_callbacks](rclcpp::TimerBase & timer) { + auto timer_callback_it = timer_callbacks.find(&timer); + ASSERT_NE(timer_callback_it, timer_callbacks.end()); + if (nullptr != timer_callback_it->second) { + timer_callback_it->second(); + } + }, + isolated_callback_group); + timers.push_back(timer); + } + + ExecutorType executor; + executor.add_callback_group(isolated_callback_group, this->node->get_node_base_interface()); + + RCPPUTILS_SCOPE_EXIT({ + timer_callbacks.clear(); + }); + + std::vector actual_order; + for (size_t i = 0; i < number_of_entities; ++i) { + ASSERT_LT(i, timers.size()); + auto & timer = timers[i]; + timer_callbacks[timer.get()] = [&actual_order, &timer, i]() { + actual_order.push_back(i); + // only allow execution once + timer->cancel(); + }; + } + + while (actual_order.size() < number_of_entities && rclcpp::ok()) { + executor.spin_once(10s); // large timeout because it should normally exit quickly + } + + EXPECT_EQ(actual_order, test_case.expected_execution_order) + << "callback call order of timers in test case '" << test_case_name + << "' different than expected, this may be a false positive, see test " + << "description"; + } + } +} + template class TestBusyWaiting : public ::testing::Test {