Skip to content

Commit

Permalink
Improve the reliability of rosbag2 tests (#1796) (#1806)
Browse files Browse the repository at this point in the history
* Remove wait_until_shutdown.

This has almost exactly the same functionality as wait_for_condition,
except for two things:

1.  It is templated on the Timeout type.
2.  It calls rclcpp::shutdown after the loop completes.

However, neither of those is necessary; all callers to it use
a std::chrono::duration, and all of the test fixtures already
call rclcpp::shutdown.  Thus, just remove it and make all
callers use wait_for_condition instead.



* Shutdown the async spinner node without rclcpp::shutdown.

That is, we really don't actually want to do a full
rclcpp shutdown here; we only want to stop spinning.
Accomplish that with an executor, and timing out
every 100 milliseconds to check if we are done yet.



* Small fixes to start_async_spin in rosbag2_tests.

Make sure it only spins as long as we haven't shutdown,
and that it wakes up every so often to check that fact.



* Wait for topics to be discovered during recorder->record().

The main reason for that is that these tests generally want
to test certain expectations around how many messages were
received.  However, if discovery takes longer than we expect,
then it could be the case that we "missed" messages at the
beginning because discovery hadn't yet completed.

Fix this by just waiting around for the recorder to get all
the subscriptions it expects before moving on with the test.



* Feedback from review.



* Switch to using MockRecorder.



* Fixes from review.



* Feedback from review.



* Apply suggestions from code review




* Switch to using spin, rather than spin_some.

That's because there is currently at least one bug
associated with spin_some in rclcpp.  However, it turns
out that we don't even need to use it, as we can just as
easily use spin() along with exec.cancel().



* Make sure to stop_spinning when we tear down the test.



* Use scopes to shutdown spinning.



* Nested contexts just to explicitly cleanup the async spinners.



* Update rosbag2_transport/test/rosbag2_transport/record_integration_fixture.hpp




* Apply the same fix to rosbag2_tests.



---------

Signed-off-by: Chris Lalancette <[email protected]>
Co-authored-by: Chris Lalancette <[email protected]>
Co-authored-by: Michael Orlov <[email protected]>
  • Loading branch information
3 people authored Sep 18, 2024
1 parent 5bfef44 commit 8b8b81d
Show file tree
Hide file tree
Showing 11 changed files with 286 additions and 199 deletions.
15 changes: 0 additions & 15 deletions rosbag2_test_common/include/rosbag2_test_common/wait_for.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,6 @@ bool spin_and_wait_for(Timeout timeout, const Node & node, Condition condition)
return true;
}

template<typename Timeout, typename Condition>
bool wait_until_shutdown(Timeout timeout, Condition condition)
{
using clock = std::chrono::system_clock;
auto start = clock::now();
while (!condition()) {
if ((clock::now() - start) > timeout) {
return false;
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
rclcpp::shutdown();
return true;
}

template<typename Condition>
bool wait_until_condition(
Condition condition,
Expand Down
171 changes: 94 additions & 77 deletions rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_get_service_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@

#include <gmock/gmock.h>

#include <atomic>
#include <chrono>
#include <filesystem>
#include <stdexcept>
#include <string>

#include "rosbag2_cpp/info.hpp"
Expand Down Expand Up @@ -65,6 +67,7 @@ class Rosbag2CPPGetServiceInfoTest

void TearDown() override
{
stop_spinning();
fs::remove_all(root_bag_path_);
}

Expand All @@ -81,22 +84,36 @@ class Rosbag2CPPGetServiceInfoTest
template<class T>
void start_async_spin(T node)
{
node_spinner_future_ = std::async(
std::launch::async,
[node, this]() -> void {
rclcpp::executors::SingleThreadedExecutor exec;
exec.add_node(node);
while (!exit_from_node_spinner_) {
exec.spin_some();
}
});
if (exec_ == nullptr) {
exec_ = std::make_unique<rclcpp::executors::SingleThreadedExecutor>();
exec_->add_node(node);
spin_thread_ = std::thread(
[this]() {
exec_->spin();
});
// Wait for the executor to start spinning in the newly spawned thread to avoid race condition
// with exec_->cancel()
using clock = std::chrono::steady_clock;
auto start = clock::now();
while (!exec_->is_spinning() && (clock::now() - start) < std::chrono::seconds(5)) {
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
if (!exec_->is_spinning()) {
throw std::runtime_error("Failed to start spinning node");
}
} else {
throw std::runtime_error("Already spinning a node, can't start a new node spin");
}
}

void stop_spinning()
{
exit_from_node_spinner_ = true;
if (node_spinner_future_.valid()) {
node_spinner_future_.wait();
if (exec_ != nullptr) {
exec_->cancel();
if (spin_thread_.joinable()) {
spin_thread_.join();
}
exec_ = nullptr;
}
}

Expand Down Expand Up @@ -146,8 +163,8 @@ class Rosbag2CPPGetServiceInfoTest

// relative path to the root of the bag file.
fs::path root_bag_path_;
std::future<void> node_spinner_future_;
std::atomic_bool exit_from_node_spinner_{false};
std::unique_ptr<rclcpp::executors::SingleThreadedExecutor> exec_{nullptr};
std::thread spin_thread_;
};

TEST_P(Rosbag2CPPGetServiceInfoTest, get_service_info_for_bag_with_topics_only) {
Expand Down Expand Up @@ -194,36 +211,36 @@ TEST_P(Rosbag2CPPGetServiceInfoTest, get_service_info_for_bag_with_services_only
std::move(writer), storage_options, record_options);
recorder->record();

start_async_spin(recorder);
auto cleanup_process_handle = rcpputils::make_scope_exit(
[&]() {stop_spinning();});

ASSERT_TRUE(service_client_manager->wait_for_service_to_be_ready());
ASSERT_TRUE(wait_for_subscriptions(*recorder, {"/test_service/_service_event"}));

constexpr size_t num_service_requests = 3;
for (size_t i = 0; i < num_service_requests; i++) {
ASSERT_TRUE(service_client_manager->send_request());
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}

auto & writer_ref = recorder->get_writer_handle();
auto & recorder_writer =
dynamic_cast<SequentialWriterForTest &>(writer_ref.get_implementation_handle());
start_async_spin(recorder);
{
auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();});

ASSERT_TRUE(service_client_manager->wait_for_service_to_be_ready());
ASSERT_TRUE(wait_for_subscriptions(*recorder, {"/test_service/_service_event"}));

// By default, only client introspection is enabled.
// For one request, service event topic get 2 messages.
size_t expected_messages = num_service_requests * 2;
auto ret = rosbag2_test_common::wait_until_condition(
[&recorder_writer, &expected_messages]() {
return recorder_writer.get_number_of_written_messages() >= expected_messages;
},
std::chrono::seconds(5));
EXPECT_TRUE(ret) << "Failed to capture " << expected_messages << " expected messages in time";
for (size_t i = 0; i < num_service_requests; i++) {
ASSERT_TRUE(service_client_manager->send_request());
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}

recorder->stop();
stop_spinning();
cleanup_process_handle.cancel();
auto & writer_ref = recorder->get_writer_handle();
auto & recorder_writer =
dynamic_cast<SequentialWriterForTest &>(writer_ref.get_implementation_handle());

// By default, only client introspection is enabled.
// For one request, service event topic get 2 messages.
size_t expected_messages = num_service_requests * 2;
auto ret = rosbag2_test_common::wait_until_condition(
[&recorder_writer, &expected_messages]() {
return recorder_writer.get_number_of_written_messages() >= expected_messages;
},
std::chrono::seconds(5));
EXPECT_TRUE(ret) << "Failed to capture " << expected_messages << " expected messages in time";

recorder->stop();
}

rosbag2_cpp::Info info;
std::vector<std::shared_ptr<rosbag2_cpp::rosbag2_service_info_t>> ret_service_infos;
Expand Down Expand Up @@ -272,45 +289,45 @@ TEST_P(Rosbag2CPPGetServiceInfoTest, get_service_info_for_bag_with_topics_and_se
std::move(writer), storage_options, record_options);
recorder->record();

start_async_spin(recorder);
auto cleanup_process_handle = rcpputils::make_scope_exit(
[&]() {stop_spinning();});

ASSERT_TRUE(
wait_for_subscriptions(
*recorder,
{"/test_service1/_service_event",
"/test_service2/_service_event",
"/test_topic1",
"/test_topic2"}
)
);

constexpr size_t num_service_requests = 2;
for (size_t i = 0; i < num_service_requests; i++) {
ASSERT_TRUE(service_client_manager1->send_request());
ASSERT_TRUE(service_client_manager2->send_request());
std::this_thread::sleep_for(std::chrono::milliseconds(10));

start_async_spin(recorder);
{
auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();});

ASSERT_TRUE(
wait_for_subscriptions(
*recorder,
{"/test_service1/_service_event",
"/test_service2/_service_event",
"/test_topic1",
"/test_topic2"}
)
);

for (size_t i = 0; i < num_service_requests; i++) {
ASSERT_TRUE(service_client_manager1->send_request());
ASSERT_TRUE(service_client_manager2->send_request());
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
pub_manager.run_publishers();

auto & writer_ref = recorder->get_writer_handle();
auto & recorder_writer =
dynamic_cast<SequentialWriterForTest &>(writer_ref.get_implementation_handle());

// By default, only client introspection is enabled.
// For one request, service event topic get 2 messages.
size_t expected_messages = num_service_requests * 2 + 2;
auto ret = rosbag2_test_common::wait_until_condition(
[&recorder_writer, &expected_messages]() {
return recorder_writer.get_number_of_written_messages() >= expected_messages;
},
std::chrono::seconds(5));
EXPECT_TRUE(ret) << "Failed to capture " << expected_messages << " expected messages in time";

recorder->stop();
}
pub_manager.run_publishers();

auto & writer_ref = recorder->get_writer_handle();
auto & recorder_writer =
dynamic_cast<SequentialWriterForTest &>(writer_ref.get_implementation_handle());

// By default, only client introspection is enabled.
// For one request, service event topic get 2 messages.
size_t expected_messages = num_service_requests * 2 + 2;
auto ret = rosbag2_test_common::wait_until_condition(
[&recorder_writer, &expected_messages]() {
return recorder_writer.get_number_of_written_messages() >= expected_messages;
},
std::chrono::seconds(5));
EXPECT_TRUE(ret) << "Failed to capture " << expected_messages << " expected messages in time";

recorder->stop();
stop_spinning();
cleanup_process_handle.cancel();

rosbag2_cpp::Info info;
std::vector<std::shared_ptr<rosbag2_cpp::rosbag2_service_info_t>> ret_service_infos;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <gmock/gmock.h>

#include <atomic>
#include <chrono>
#include <future>
#include <memory>
#include <stdexcept>
#include <string>
#include <vector>
#include <utility>

#include "rclcpp/rclcpp.hpp"

#include "rosbag2_transport/record_options.hpp"
#include "rosbag2_storage/serialized_bag_message.hpp"

#include "rosbag2_test_common/memory_management.hpp"

#include "rosbag2_transport_test_fixture.hpp"

using namespace ::testing; // NOLINT
using namespace rosbag2_transport; // NOLINT
using namespace std::chrono_literals; // NOLINT

#ifndef ROSBAG2_TRANSPORT__RECORD_INTEGRATION_FIXTURE_HPP_
#define ROSBAG2_TRANSPORT__RECORD_INTEGRATION_FIXTURE_HPP_

Expand All @@ -50,21 +47,43 @@ class RecordIntegrationTestFixture : public Rosbag2TransportTestFixture

void TearDown() override
{
stop_spinning();
rclcpp::shutdown();
}

template<class T>
void start_async_spin(T node)
{
future_ = std::async(
std::launch::async, [node]() -> void {rclcpp::spin(node);});
if (exec_ == nullptr) {
exec_ = std::make_unique<rclcpp::executors::SingleThreadedExecutor>();
exec_->add_node(node);
spin_thread_ = std::thread(
[this]() {
exec_->spin();
});
// Wait for the executor to start spinning in the newly spawned thread to avoid race condition
// with exec_->cancel()
using clock = std::chrono::steady_clock;
auto start = clock::now();
while (!exec_->is_spinning() && (clock::now() - start) < std::chrono::seconds(5)) {
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
if (!exec_->is_spinning()) {
throw std::runtime_error("Failed to start spinning node");
}
} else {
throw std::runtime_error("Already spinning a node, can't start a new node spin");
}
}

void stop_spinning()
{
rclcpp::shutdown();
if (future_.valid()) {
future_.wait();
if (exec_ != nullptr) {
exec_->cancel();
if (spin_thread_.joinable()) {
spin_thread_.join();
}
exec_ = nullptr;
}
}

Expand All @@ -83,8 +102,9 @@ class RecordIntegrationTestFixture : public Rosbag2TransportTestFixture
return filtered_messages;
}

std::unique_ptr<rclcpp::executors::SingleThreadedExecutor> exec_{nullptr};
MemoryManagement memory_;
std::future<void> future_;
std::thread spin_thread_;
};

#endif // ROSBAG2_TRANSPORT__RECORD_INTEGRATION_FIXTURE_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -203,13 +203,12 @@ TEST_F(RecordIntegrationTestFixture, test_keyboard_controls)

recorder->record();

this->start_async_spin(recorder);
start_async_spin(recorder);
auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();});

EXPECT_THAT(recorder->is_paused(), true);
keyboard_handler->simulate_key_press(rosbag2_transport::Recorder::kPauseResumeToggleKey);
EXPECT_THAT(recorder->is_paused(), false);
keyboard_handler->simulate_key_press(rosbag2_transport::Recorder::kPauseResumeToggleKey);
EXPECT_THAT(recorder->is_paused(), true);

this->stop_spinning();
}
Loading

0 comments on commit 8b8b81d

Please sign in to comment.