diff --git a/rosbag2_test_common/include/rosbag2_test_common/wait_for.hpp b/rosbag2_test_common/include/rosbag2_test_common/wait_for.hpp index 9578e7243..bb13ef2f5 100644 --- a/rosbag2_test_common/include/rosbag2_test_common/wait_for.hpp +++ b/rosbag2_test_common/include/rosbag2_test_common/wait_for.hpp @@ -37,21 +37,6 @@ bool spin_and_wait_for(Timeout timeout, const Node & node, Condition condition) return true; } -template -bool wait_until_shutdown(Timeout timeout, Condition condition) -{ - using clock = std::chrono::system_clock; - auto start = clock::now(); - while (!condition()) { - if ((clock::now() - start) > timeout) { - return false; - } - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } - rclcpp::shutdown(); - return true; -} - template bool wait_until_condition( Condition condition, diff --git a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_get_service_info.cpp b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_get_service_info.cpp index 3ab7aeb98..401deb311 100644 --- a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_get_service_info.cpp +++ b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_get_service_info.cpp @@ -14,8 +14,10 @@ #include +#include #include #include +#include #include #include "rosbag2_cpp/info.hpp" @@ -65,6 +67,7 @@ class Rosbag2CPPGetServiceInfoTest void TearDown() override { + stop_spinning(); fs::remove_all(root_bag_path_); } @@ -81,22 +84,36 @@ class Rosbag2CPPGetServiceInfoTest template void start_async_spin(T node) { - node_spinner_future_ = std::async( - std::launch::async, - [node, this]() -> void { - rclcpp::executors::SingleThreadedExecutor exec; - exec.add_node(node); - while (!exit_from_node_spinner_) { - exec.spin_some(); - } - }); + if (exec_ == nullptr) { + exec_ = std::make_unique(); + exec_->add_node(node); + spin_thread_ = std::thread( + [this]() { + exec_->spin(); + }); + // Wait for the executor to start spinning in the newly spawned thread to avoid race condition + // with exec_->cancel() + using clock = std::chrono::steady_clock; + auto start = clock::now(); + while (!exec_->is_spinning() && (clock::now() - start) < std::chrono::seconds(5)) { + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + } + if (!exec_->is_spinning()) { + throw std::runtime_error("Failed to start spinning node"); + } + } else { + throw std::runtime_error("Already spinning a node, can't start a new node spin"); + } } void stop_spinning() { - exit_from_node_spinner_ = true; - if (node_spinner_future_.valid()) { - node_spinner_future_.wait(); + if (exec_ != nullptr) { + exec_->cancel(); + if (spin_thread_.joinable()) { + spin_thread_.join(); + } + exec_ = nullptr; } } @@ -146,8 +163,8 @@ class Rosbag2CPPGetServiceInfoTest // relative path to the root of the bag file. fs::path root_bag_path_; - std::future node_spinner_future_; - std::atomic_bool exit_from_node_spinner_{false}; + std::unique_ptr exec_{nullptr}; + std::thread spin_thread_; }; TEST_P(Rosbag2CPPGetServiceInfoTest, get_service_info_for_bag_with_topics_only) { @@ -194,36 +211,36 @@ TEST_P(Rosbag2CPPGetServiceInfoTest, get_service_info_for_bag_with_services_only std::move(writer), storage_options, record_options); recorder->record(); - start_async_spin(recorder); - auto cleanup_process_handle = rcpputils::make_scope_exit( - [&]() {stop_spinning();}); - - ASSERT_TRUE(service_client_manager->wait_for_service_to_be_ready()); - ASSERT_TRUE(wait_for_subscriptions(*recorder, {"/test_service/_service_event"})); - constexpr size_t num_service_requests = 3; - for (size_t i = 0; i < num_service_requests; i++) { - ASSERT_TRUE(service_client_manager->send_request()); - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - } - auto & writer_ref = recorder->get_writer_handle(); - auto & recorder_writer = - dynamic_cast(writer_ref.get_implementation_handle()); + start_async_spin(recorder); + { + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); + + ASSERT_TRUE(service_client_manager->wait_for_service_to_be_ready()); + ASSERT_TRUE(wait_for_subscriptions(*recorder, {"/test_service/_service_event"})); - // By default, only client introspection is enabled. - // For one request, service event topic get 2 messages. - size_t expected_messages = num_service_requests * 2; - auto ret = rosbag2_test_common::wait_until_condition( - [&recorder_writer, &expected_messages]() { - return recorder_writer.get_number_of_written_messages() >= expected_messages; - }, - std::chrono::seconds(5)); - EXPECT_TRUE(ret) << "Failed to capture " << expected_messages << " expected messages in time"; + for (size_t i = 0; i < num_service_requests; i++) { + ASSERT_TRUE(service_client_manager->send_request()); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } - recorder->stop(); - stop_spinning(); - cleanup_process_handle.cancel(); + auto & writer_ref = recorder->get_writer_handle(); + auto & recorder_writer = + dynamic_cast(writer_ref.get_implementation_handle()); + + // By default, only client introspection is enabled. + // For one request, service event topic get 2 messages. + size_t expected_messages = num_service_requests * 2; + auto ret = rosbag2_test_common::wait_until_condition( + [&recorder_writer, &expected_messages]() { + return recorder_writer.get_number_of_written_messages() >= expected_messages; + }, + std::chrono::seconds(5)); + EXPECT_TRUE(ret) << "Failed to capture " << expected_messages << " expected messages in time"; + + recorder->stop(); + } rosbag2_cpp::Info info; std::vector> ret_service_infos; @@ -272,45 +289,45 @@ TEST_P(Rosbag2CPPGetServiceInfoTest, get_service_info_for_bag_with_topics_and_se std::move(writer), storage_options, record_options); recorder->record(); - start_async_spin(recorder); - auto cleanup_process_handle = rcpputils::make_scope_exit( - [&]() {stop_spinning();}); - - ASSERT_TRUE( - wait_for_subscriptions( - *recorder, - {"/test_service1/_service_event", - "/test_service2/_service_event", - "/test_topic1", - "/test_topic2"} - ) - ); - constexpr size_t num_service_requests = 2; - for (size_t i = 0; i < num_service_requests; i++) { - ASSERT_TRUE(service_client_manager1->send_request()); - ASSERT_TRUE(service_client_manager2->send_request()); - std::this_thread::sleep_for(std::chrono::milliseconds(10)); + + start_async_spin(recorder); + { + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); + + ASSERT_TRUE( + wait_for_subscriptions( + *recorder, + {"/test_service1/_service_event", + "/test_service2/_service_event", + "/test_topic1", + "/test_topic2"} + ) + ); + + for (size_t i = 0; i < num_service_requests; i++) { + ASSERT_TRUE(service_client_manager1->send_request()); + ASSERT_TRUE(service_client_manager2->send_request()); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + pub_manager.run_publishers(); + + auto & writer_ref = recorder->get_writer_handle(); + auto & recorder_writer = + dynamic_cast(writer_ref.get_implementation_handle()); + + // By default, only client introspection is enabled. + // For one request, service event topic get 2 messages. + size_t expected_messages = num_service_requests * 2 + 2; + auto ret = rosbag2_test_common::wait_until_condition( + [&recorder_writer, &expected_messages]() { + return recorder_writer.get_number_of_written_messages() >= expected_messages; + }, + std::chrono::seconds(5)); + EXPECT_TRUE(ret) << "Failed to capture " << expected_messages << " expected messages in time"; + + recorder->stop(); } - pub_manager.run_publishers(); - - auto & writer_ref = recorder->get_writer_handle(); - auto & recorder_writer = - dynamic_cast(writer_ref.get_implementation_handle()); - - // By default, only client introspection is enabled. - // For one request, service event topic get 2 messages. - size_t expected_messages = num_service_requests * 2 + 2; - auto ret = rosbag2_test_common::wait_until_condition( - [&recorder_writer, &expected_messages]() { - return recorder_writer.get_number_of_written_messages() >= expected_messages; - }, - std::chrono::seconds(5)); - EXPECT_TRUE(ret) << "Failed to capture " << expected_messages << " expected messages in time"; - - recorder->stop(); - stop_spinning(); - cleanup_process_handle.cancel(); rosbag2_cpp::Info info; std::vector> ret_service_infos; diff --git a/rosbag2_transport/test/rosbag2_transport/record_integration_fixture.hpp b/rosbag2_transport/test/rosbag2_transport/record_integration_fixture.hpp index cd1089fec..c85f69cb9 100644 --- a/rosbag2_transport/test/rosbag2_transport/record_integration_fixture.hpp +++ b/rosbag2_transport/test/rosbag2_transport/record_integration_fixture.hpp @@ -12,26 +12,23 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include - +#include +#include #include #include +#include #include #include #include #include "rclcpp/rclcpp.hpp" -#include "rosbag2_transport/record_options.hpp" +#include "rosbag2_storage/serialized_bag_message.hpp" #include "rosbag2_test_common/memory_management.hpp" #include "rosbag2_transport_test_fixture.hpp" -using namespace ::testing; // NOLINT -using namespace rosbag2_transport; // NOLINT -using namespace std::chrono_literals; // NOLINT - #ifndef ROSBAG2_TRANSPORT__RECORD_INTEGRATION_FIXTURE_HPP_ #define ROSBAG2_TRANSPORT__RECORD_INTEGRATION_FIXTURE_HPP_ @@ -50,21 +47,43 @@ class RecordIntegrationTestFixture : public Rosbag2TransportTestFixture void TearDown() override { + stop_spinning(); rclcpp::shutdown(); } template void start_async_spin(T node) { - future_ = std::async( - std::launch::async, [node]() -> void {rclcpp::spin(node);}); + if (exec_ == nullptr) { + exec_ = std::make_unique(); + exec_->add_node(node); + spin_thread_ = std::thread( + [this]() { + exec_->spin(); + }); + // Wait for the executor to start spinning in the newly spawned thread to avoid race condition + // with exec_->cancel() + using clock = std::chrono::steady_clock; + auto start = clock::now(); + while (!exec_->is_spinning() && (clock::now() - start) < std::chrono::seconds(5)) { + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + } + if (!exec_->is_spinning()) { + throw std::runtime_error("Failed to start spinning node"); + } + } else { + throw std::runtime_error("Already spinning a node, can't start a new node spin"); + } } void stop_spinning() { - rclcpp::shutdown(); - if (future_.valid()) { - future_.wait(); + if (exec_ != nullptr) { + exec_->cancel(); + if (spin_thread_.joinable()) { + spin_thread_.join(); + } + exec_ = nullptr; } } @@ -83,8 +102,9 @@ class RecordIntegrationTestFixture : public Rosbag2TransportTestFixture return filtered_messages; } + std::unique_ptr exec_{nullptr}; MemoryManagement memory_; - std::future future_; + std::thread spin_thread_; }; #endif // ROSBAG2_TRANSPORT__RECORD_INTEGRATION_FIXTURE_HPP_ diff --git a/rosbag2_transport/test/rosbag2_transport/test_keyboard_controls.cpp b/rosbag2_transport/test/rosbag2_transport/test_keyboard_controls.cpp index 3097be472..60be757b6 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_keyboard_controls.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_keyboard_controls.cpp @@ -203,13 +203,12 @@ TEST_F(RecordIntegrationTestFixture, test_keyboard_controls) recorder->record(); - this->start_async_spin(recorder); + start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); EXPECT_THAT(recorder->is_paused(), true); keyboard_handler->simulate_key_press(rosbag2_transport::Recorder::kPauseResumeToggleKey); EXPECT_THAT(recorder->is_paused(), false); keyboard_handler->simulate_key_press(rosbag2_transport::Recorder::kPauseResumeToggleKey); EXPECT_THAT(recorder->is_paused(), true); - - this->stop_spinning(); } diff --git a/rosbag2_transport/test/rosbag2_transport/test_record.cpp b/rosbag2_transport/test/rosbag2_transport/test_record.cpp index dd5288eb5..2e7fc8106 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record.cpp @@ -52,30 +52,39 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are std::move(writer_), storage_options_, record_options); recorder->record(); - start_async_spin(recorder); - - ASSERT_TRUE(pub_manager.wait_for_matched(array_topic.c_str())); - ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str())); - - pub_manager.run_publishers(); + constexpr size_t expected_messages = 4; + std::vector> recorded_messages; + std::unordered_map< + std::string, + std::pair + > recorded_topics; - auto & writer = recorder->get_writer_handle(); - MockSequentialWriter & mock_writer = - static_cast(writer.get_implementation_handle()); + start_async_spin(recorder); + { + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); + + ASSERT_TRUE(pub_manager.wait_for_matched(array_topic.c_str())); + ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str())); + + pub_manager.run_publishers(); + + auto & writer = recorder->get_writer_handle(); + MockSequentialWriter & mock_writer = + static_cast(writer.get_implementation_handle()); + + auto ret = rosbag2_test_common::wait_until_condition( + [ =, &mock_writer]() { + return mock_writer.get_messages().size() >= expected_messages; + }, + std::chrono::seconds(5)); + EXPECT_TRUE(ret) << "failed to capture expected messages in time" << + "recorded messages = " << recorded_messages.size(); + recorded_messages = mock_writer.get_messages(); + recorded_topics = mock_writer.get_topics(); + } - constexpr size_t expected_messages = 4; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), - [ =, &mock_writer]() { - return mock_writer.get_messages().size() >= expected_messages; - }); - auto recorded_messages = mock_writer.get_messages(); - EXPECT_TRUE(ret) << "failed to capture expected messages in time" << - "recorded messages = " << recorded_messages.size(); EXPECT_THAT(recorded_messages, SizeIs(expected_messages)); - stop_spinning(); - auto recorded_topics = mock_writer.get_topics(); ASSERT_THAT(recorded_topics, SizeIs(2)); EXPECT_THAT(recorded_topics.at(string_topic).first.serialization_format, Eq("rmw_format")); EXPECT_THAT(recorded_topics.at(array_topic).first.serialization_format, Eq("rmw_format")); @@ -126,10 +135,12 @@ TEST_F(RecordIntegrationTestFixture, can_record_again_after_stop) std::move(writer_), storage_options_, record_options); recorder->record(); + start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); + auto & writer = recorder->get_writer_handle(); auto & mock_writer = dynamic_cast(writer.get_implementation_handle()); - start_async_spin(recorder); ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str())); pub_manager.run_publishers(); @@ -146,11 +157,11 @@ TEST_F(RecordIntegrationTestFixture, can_record_again_after_stop) // 4 because we're running recorder->record() and publishers twice constexpr size_t expected_messages = 4; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [ =, &mock_writer]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); auto recorded_messages = mock_writer.get_messages(); EXPECT_TRUE(ret) << "failed to capture expected messages in time"; EXPECT_THAT(recorded_messages, SizeIs(expected_messages)); @@ -187,6 +198,7 @@ TEST_F(RecordIntegrationTestFixture, qos_is_stored_in_metadata) recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(pub_manager.wait_for_matched(topic.c_str())); @@ -197,11 +209,11 @@ TEST_F(RecordIntegrationTestFixture, qos_is_stored_in_metadata) static_cast(writer.get_implementation_handle()); constexpr size_t expected_messages = 2; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [ =, &mock_writer]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); auto recorded_messages = mock_writer.get_messages(); EXPECT_TRUE(ret) << "failed to capture expected messages in time"; EXPECT_THAT(recorded_messages, SizeIs(expected_messages)); @@ -251,6 +263,7 @@ TEST_F(RecordIntegrationTestFixture, records_sensor_data) recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(pub_manager.wait_for_matched(topic.c_str())); @@ -261,11 +274,11 @@ TEST_F(RecordIntegrationTestFixture, records_sensor_data) static_cast(writer.get_implementation_handle()); constexpr size_t expected_messages = 2; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [ =, &mock_writer]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); auto recorded_messages = mock_writer.get_messages(); auto recorded_topics = mock_writer.get_topics(); EXPECT_TRUE(ret) << "failed to capture expected messages in time"; @@ -294,6 +307,7 @@ TEST_F(RecordIntegrationTestFixture, receives_latched_messages) recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(pub_manager.wait_for_matched(topic.c_str())); @@ -302,11 +316,11 @@ TEST_F(RecordIntegrationTestFixture, receives_latched_messages) static_cast(writer.get_implementation_handle()); size_t expected_messages = num_latched_messages; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [&mock_writer, &expected_messages]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); auto recorded_messages = mock_writer.get_messages(); auto recorded_topics = mock_writer.get_topics(); EXPECT_TRUE(ret) << "failed to capture expected messages in time"; @@ -428,6 +442,7 @@ TEST_F(RecordIntegrationTestFixture, write_split_callback_is_called) recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); auto & writer = recorder->get_writer_handle(); mock_writer = dynamic_cast(writer.get_implementation_handle()); @@ -436,14 +451,15 @@ TEST_F(RecordIntegrationTestFixture, write_split_callback_is_called) rosbag2_test_common::PublicationManager pub_manager; pub_manager.setup_publisher(string_topic, string_message, expected_messages); + ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str())); pub_manager.run_publishers(); - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [&mock_writer, &expected_messages]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); auto recorded_messages = mock_writer.get_messages(); EXPECT_TRUE(ret) << "failed to capture expected messages in time"; EXPECT_THAT(recorded_messages, SizeIs(expected_messages)); diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp index 597deb40a..2b2b85a40 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp @@ -30,6 +30,7 @@ #include "rosbag2_transport/recorder.hpp" +#include "mock_recorder.hpp" #include "record_integration_fixture.hpp" using namespace std::chrono_literals; // NOLINT @@ -56,6 +57,7 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(pub_manager.wait_for_matched(array_topic.c_str())); ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str())); @@ -66,11 +68,11 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are auto & mock_writer = dynamic_cast(writer.get_implementation_handle()); constexpr size_t expected_messages = 4; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [ =, &mock_writer]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); EXPECT_TRUE(ret) << "failed to capture expected messages in time"; auto recorded_messages = mock_writer.get_messages(); EXPECT_EQ(recorded_messages.size(), expected_messages); @@ -98,17 +100,25 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_services_a rosbag2_transport::RecordOptions record_options = {false, true, false, {}, {}, {}, {"/rosout"}, {}, {}, "rmw_format", 100ms}; - auto recorder = std::make_shared( + auto recorder = std::make_shared( std::move(writer_), storage_options_, record_options); recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(client_manager_1->wait_for_service_to_be_ready()); ASSERT_TRUE(client_manager_2->wait_for_service_to_be_ready()); + // At this point, we expect that the services /test_service_1 and /test_service_2, along with + // the event topics /test_service_1/_service_event and /test_service_2/_service_event are + // available to be recorded. However, wait_for_service_to_be_ready() only checks the services, + // not the event topics, so ask the recorder to make sure it has successfully subscribed to all. + ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered("/test_service_1/_service_event")); + ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered("/test_service_2/_service_event")); + // By default, only client introspection is enabled. - // For one request, service event topic get 2 messages. + // For one request, service event topic gets 2 messages. ASSERT_TRUE(client_manager_1->send_request()); ASSERT_TRUE(client_manager_2->send_request()); @@ -116,11 +126,11 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_services_a auto & mock_writer = dynamic_cast(writer.get_implementation_handle()); constexpr size_t expected_messages = 4; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [ =, &mock_writer]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); EXPECT_TRUE(ret) << "failed to capture expected messages in time"; auto recorded_messages = mock_writer.get_messages(); EXPECT_EQ(recorded_messages.size(), expected_messages); @@ -140,16 +150,24 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_topic_and_service_a rosbag2_transport::RecordOptions record_options = {true, true, false, {}, {}, {}, {"/rosout"}, {}, {}, "rmw_format", 100ms}; - auto recorder = std::make_shared( + auto recorder = std::make_shared( std::move(writer_), storage_options_, record_options); recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str())); ASSERT_TRUE(client_manager_1->wait_for_service_to_be_ready()); + // At this point, we expect that the service /test_service_1, along with the topic /string_topic, + // along with the event topic /test_service_1, along with the split topic /events/write_split are + // available to be recorded. However, wait_for_matched() and wait_for_service_to_be_ready() only + // check on the service and the topic, not the event or the split topic, so ask the recorder to + // make sure it has successfully subscribed to all. + ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered("/test_service/_service_event")); + pub_manager.run_publishers(); // By default, only client introspection is enabled. @@ -160,11 +178,11 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_topic_and_service_a auto & mock_writer = dynamic_cast(writer.get_implementation_handle()); constexpr size_t expected_messages = 3; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [ =, &mock_writer]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); EXPECT_TRUE(ret) << "failed to capture expected messages in time"; auto recorded_messages = mock_writer.get_messages(); EXPECT_EQ(recorded_messages.size(), expected_messages); diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_all_ignore_leaf_topics.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_all_ignore_leaf_topics.cpp index 955855d13..17080477a 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_all_ignore_leaf_topics.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_all_ignore_leaf_topics.cpp @@ -69,11 +69,11 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_two_topics_ignore_l static_cast(writer.get_implementation_handle()); constexpr size_t expected_messages = 2; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [ =, &mock_writer]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); auto recorded_messages = mock_writer.get_messages(); // We may receive additional messages from rosout, it doesn't matter, // as long as we have received at least as many total messages as we expect diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_all_include_unpublished_topics.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_all_include_unpublished_topics.cpp index 018e260f4..d0a07d33e 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_all_include_unpublished_topics.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_all_include_unpublished_topics.cpp @@ -23,6 +23,7 @@ #include "test_msgs/msg/basic_types.hpp" #include "test_msgs/message_fixtures.hpp" #include "rosbag2_test_common/publication_manager.hpp" +#include "rosbag2_test_common/wait_for.hpp" #include "record_integration_fixture.hpp" using namespace std::chrono_literals; // NOLINT @@ -40,6 +41,7 @@ TEST_F(RecordIntegrationTestFixture, record_all_include_unpublished_false_ignore auto recorder = std::make_shared(writer_, storage_options_, record_options); recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered(string_topic)); ASSERT_FALSE(recorder->topic_available_for_recording(string_topic)); @@ -58,6 +60,7 @@ TEST_F(RecordIntegrationTestFixture, record_all_include_unpublished_true_include auto recorder = std::make_shared(writer_, storage_options_, record_options); recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered(string_topic)); ASSERT_TRUE(recorder->topic_available_for_recording(string_topic)); @@ -78,6 +81,7 @@ TEST_F( auto recorder = std::make_shared(writer_, storage_options_, record_options); recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered(string_topic)); ASSERT_FALSE(recorder->topic_available_for_recording(string_topic)); diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_all_no_discovery.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_all_no_discovery.cpp index 5e30702e6..ce9181514 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_all_no_discovery.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_all_no_discovery.cpp @@ -44,6 +44,7 @@ TEST_F(RecordIntegrationTestFixture, record_all_without_discovery_ignores_later_ recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); rosbag2_test_common::PublicationManager pub_manager; pub_manager.setup_publisher(topic, string_message, 5); @@ -54,11 +55,11 @@ TEST_F(RecordIntegrationTestFixture, record_all_without_discovery_ignores_later_ static_cast(writer.get_implementation_handle()); constexpr size_t expected_messages = 0; - rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(2), + rosbag2_test_common::wait_until_condition( [ =, &mock_writer]() { return mock_writer.get_messages().size() > expected_messages; - }); + }, + std::chrono::seconds(2)); // We can't EXPECT anything here, since there may be some messages from rosout auto recorded_topics = mock_writer.get_topics(); diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp index 4b6476ac3..2e612310b 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp @@ -98,32 +98,38 @@ TEST_F(RecordIntegrationTestFixture, record_all_with_sim_time) std::move(writer_), storage_options_, record_options); recorder->record(); + constexpr size_t expected_messages = 10; + std::vector> recorded_messages; + std::unordered_map messages_per_topic; + start_async_spin(recorder); + { + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); - ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str())); + ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str())); - ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered(string_topic)); + ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered(string_topic)); - ASSERT_TRUE(recorder->topic_available_for_recording(string_topic)); + ASSERT_TRUE(recorder->topic_available_for_recording(string_topic)); - pub_manager.run_publishers(); + pub_manager.run_publishers(); - auto & writer = recorder->get_writer_handle(); - MockSequentialWriter & mock_writer = - static_cast(writer.get_implementation_handle()); + auto & writer = recorder->get_writer_handle(); + MockSequentialWriter & mock_writer = + static_cast(writer.get_implementation_handle()); - constexpr size_t expected_messages = 10; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), - [ =, &mock_writer]() { - return mock_writer.get_messages().size() >= expected_messages; - }); - auto recorded_messages = mock_writer.get_messages(); - EXPECT_TRUE(ret) << "failed to capture expected messages in time. " << - "recorded messages = " << recorded_messages.size(); - stop_spinning(); - - auto messages_per_topic = mock_writer.messages_per_topic(); + auto ret = rosbag2_test_common::wait_until_condition( + [ =, &mock_writer]() { + return mock_writer.get_messages().size() >= expected_messages; + }, + std::chrono::seconds(5)); + ASSERT_TRUE(ret) << "failed to capture expected messages in time. " << + "recorded messages = " << recorded_messages.size(); + recorded_messages = mock_writer.get_messages(); + messages_per_topic = mock_writer.messages_per_topic(); + } + + ASSERT_EQ(messages_per_topic.count(string_topic), 1u); EXPECT_EQ(messages_per_topic[string_topic], 5u); EXPECT_THAT(recorded_messages, SizeIs(Ge(expected_messages))); diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_regex.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_regex.cpp index f6d4d81fc..6cf26a4be 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_regex.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_regex.cpp @@ -33,6 +33,7 @@ #include "test_msgs/message_fixtures.hpp" #include "test_msgs/srv/basic_types.hpp" +#include "mock_recorder.hpp" #include "record_integration_fixture.hpp" using namespace std::chrono_literals; // NOLINT @@ -77,6 +78,7 @@ TEST_F(RecordIntegrationTestFixture, regex_topics_recording) recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(pub_manager.wait_for_matched(v1.c_str())); @@ -87,11 +89,11 @@ TEST_F(RecordIntegrationTestFixture, regex_topics_recording) static_cast(writer.get_implementation_handle()); constexpr size_t expected_messages = 3; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [ =, &mock_writer]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); auto recorded_messages = mock_writer.get_messages(); // We may receive additional messages from rosout, it doesn't matter, // as long as we have received at least as many total messages as we expect @@ -137,7 +139,6 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_regex_topic_recording) record_options.regex = regex; record_options.exclude_regex = topics_regex_to_exclude; - // TODO(karsten1987) Refactor this into publication manager rosbag2_test_common::PublicationManager pub_manager; pub_manager.setup_publisher(v1, test_string_messages[0], 3); @@ -151,6 +152,7 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_regex_topic_recording) recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(pub_manager.wait_for_matched(v1.c_str())); ASSERT_TRUE(pub_manager.wait_for_matched(v2.c_str())); @@ -162,11 +164,11 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_regex_topic_recording) static_cast(writer.get_implementation_handle()); constexpr size_t expected_messages = 3; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [ =, &mock_writer]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); auto recorded_messages = mock_writer.get_messages(); // We may receive additional messages from rosout, it doesn't matter, // as long as we have received at least as many total messages as we expect @@ -226,6 +228,7 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_topic_topic_recording) recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(pub_manager.wait_for_matched(v1.c_str())); ASSERT_TRUE(pub_manager.wait_for_matched(v2.c_str())); @@ -237,11 +240,11 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_topic_topic_recording) static_cast(writer.get_implementation_handle()); constexpr size_t expected_messages = 3; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [ =, &mock_writer]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); auto recorded_messages = mock_writer.get_messages(); // We may receive additional messages from rosout, it doesn't matter, // as long as we have received at least as many total messages as we expect @@ -290,11 +293,12 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_regex_service_recording) auto service_manager_b2 = std::make_shared>(b2); - auto recorder = std::make_shared( + auto recorder = std::make_shared( std::move(writer_), storage_options_, record_options); recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(service_manager_v1->wait_for_service_to_be_ready()); ASSERT_TRUE(service_manager_v2->wait_for_service_to_be_ready()); @@ -302,6 +306,14 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_regex_service_recording) ASSERT_TRUE(service_manager_b1->wait_for_service_to_be_ready()); ASSERT_TRUE(service_manager_b2->wait_for_service_to_be_ready()); + // At this point, we expect that the services /still_nice_service and /awesome_nice_service, + // along with the event topics /still_nice_service/_service_event + // and /awesome_nice_service/_service_event are available to be recorded. However, + // wait_for_service_to_be_ready() only checks the services, not the event topics, so ask the + // recorder to make sure it has successfully subscribed to all. + ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered(v1 + "/_service_event")); + ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered(v2 + "/_service_event")); + auto & writer = recorder->get_writer_handle(); auto & mock_writer = dynamic_cast(writer.get_implementation_handle()); @@ -312,11 +324,11 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_regex_service_recording) ASSERT_TRUE(service_manager_b2->send_request()); constexpr size_t expected_messages = 4; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [ =, &mock_writer]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); EXPECT_TRUE(ret) << "failed to capture expected messages in time"; auto recorded_messages = mock_writer.get_messages(); EXPECT_THAT(recorded_messages, SizeIs(expected_messages)); @@ -363,11 +375,12 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_service_service_recording auto service_manager_b2 = std::make_shared>(b2); - auto recorder = std::make_shared( + auto recorder = std::make_shared( std::move(writer_), storage_options_, record_options); recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(service_manager_v1->wait_for_service_to_be_ready()); ASSERT_TRUE(service_manager_v2->wait_for_service_to_be_ready()); @@ -375,6 +388,14 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_service_service_recording ASSERT_TRUE(service_manager_b1->wait_for_service_to_be_ready()); ASSERT_TRUE(service_manager_b2->wait_for_service_to_be_ready()); + // At this point, we expect that the services /still_nice_service and /awesome_nice_service, + // along with the event topics /still_nice_service/_service_event + // and /awesome_nice_service/_service_event are available to be recorded. However, + // wait_for_service_to_be_ready() only checks the services, not the event topics, so ask the + // recorder to make sure it has successfully subscribed to all. + ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered(v1 + "/_service_event")); + ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered(v2 + "/_service_event")); + auto & writer = recorder->get_writer_handle(); auto & mock_writer = dynamic_cast(writer.get_implementation_handle()); @@ -385,11 +406,11 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_service_service_recording ASSERT_TRUE(service_manager_b2->send_request()); constexpr size_t expected_messages = 4; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [ =, &mock_writer]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); EXPECT_TRUE(ret) << "failed to capture expected messages in time"; auto recorded_messages = mock_writer.get_messages(); EXPECT_THAT(recorded_messages, SizeIs(expected_messages));