From 93983a0a684bf33cc2c69a296ca59e19550cae97 Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Thu, 8 Jun 2023 22:05:33 -0700 Subject: [PATCH 1/3] Fix for rosbag2_transport::Recorder failures due to unhandled exceptions Signed-off-by: Michael Orlov --- .../src/rosbag2_transport/recorder.cpp | 61 +++++++++++--- .../src/rosbag2_transport/topic_filter.cpp | 79 ++++++++++--------- 2 files changed, 91 insertions(+), 49 deletions(-) diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.cpp b/rosbag2_transport/src/rosbag2_transport/recorder.cpp index d94d1ccb87..fd5d02106e 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.cpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.cpp @@ -328,7 +328,13 @@ void RecorderImpl::event_publisher_thread_main() auto message = rosbag2_interfaces::msg::WriteSplitEvent(); message.closed_file = bag_split_info_.closed_file; message.opened_file = bag_split_info_.opened_file; - split_event_pub_->publish(message); + try { + split_event_pub_->publish(message); + } catch (const std::exception & e) { + RCLCPP_ERROR_STREAM( + node->get_logger(), + "Failed to publish message on '/events/write_split' topic. \nError: " << e.what()); + } } } RCLCPP_INFO(node->get_logger(), "Event publisher thread: Exiting"); @@ -373,8 +379,7 @@ bool RecorderImpl::is_paused() void RecorderImpl::topics_discovery() { while (rclcpp::ok() && stop_discovery_ == false) { - auto topics_to_subscribe = - get_requested_or_available_topics(); + auto topics_to_subscribe = get_requested_or_available_topics(); for (const auto & topic_and_type : topics_to_subscribe) { warn_if_new_qos_for_subscribed_topic(topic_and_type.first); } @@ -394,7 +399,13 @@ void RecorderImpl::topics_discovery() std::unordered_map RecorderImpl::get_requested_or_available_topics() { - auto all_topics_and_types = node->get_topic_names_and_types(); + std::map> all_topics_and_types; + try { + all_topics_and_types = node->get_topic_names_and_types(); + } catch (const std::exception & e) { + RCLCPP_ERROR_STREAM( + node->get_logger(), "Failed to get topic names and types from node \nError: " << e.what()); + } return topic_filter_->filter_topics(all_topics_and_types); } @@ -415,7 +426,16 @@ void RecorderImpl::subscribe_topics( const std::unordered_map & topics_and_types) { for (const auto & topic_with_type : topics_and_types) { - auto endpoint_infos = node->get_publishers_info_by_topic(topic_with_type.first); + std::vector endpoint_infos; + try { + endpoint_infos = node->get_publishers_info_by_topic(topic_with_type.first); + } catch (const std::exception & e) { + RCLCPP_ERROR_STREAM( + node->get_logger(), "Failed to get publishers info for '" << topic_with_type.first << + "' topic. \nError: " << e.what()); + continue; + } + subscribe_topic( { topic_with_type.first, @@ -435,13 +455,19 @@ void RecorderImpl::subscribe_topic(const rosbag2_storage::TopicMetadata & topic) writer_->create_topic(topic); Rosbag2QoS subscription_qos{subscription_qos_for_topic(topic.name)}; - auto subscription = create_subscription(topic.name, topic.type, subscription_qos); - if (subscription) { - subscriptions_.insert({topic.name, subscription}); - RCLCPP_INFO_STREAM( - node->get_logger(), - "Subscribed to topic '" << topic.name << "'"); - } else { + try { + auto subscription = create_subscription(topic.name, topic.type, subscription_qos); + if (subscription) { + subscriptions_.insert({topic.name, subscription}); + RCLCPP_INFO_STREAM(node->get_logger(), "Subscribed to topic '" << topic.name << "'"); + } else { + writer_->remove_topic(topic); + subscriptions_.erase(topic.name); + } + } catch (const std::exception & e) { + RCLCPP_ERROR_STREAM( + node->get_logger(), "Failed to create subscription for '" << topic.name << + "' topic. \nError: " << e.what()); writer_->remove_topic(topic); subscriptions_.erase(topic.name); } @@ -553,7 +579,16 @@ void RecorderImpl::warn_if_new_qos_for_subscribed_topic(const std::string & topi } const auto actual_qos = existing_subscription->second->get_actual_qos(); const auto & used_profile = actual_qos.get_rmw_qos_profile(); - auto publishers_info = node->get_publishers_info_by_topic(topic_name); + + std::vector publishers_info; + try { + publishers_info = node->get_publishers_info_by_topic(topic_name); + } catch (const std::exception & e) { + RCLCPP_ERROR_STREAM( + node->get_logger(), "Failed to get publishers info for '" << topic_name << + "' topic \nError: " << e.what()); + } + for (const auto & info : publishers_info) { auto new_profile = info.qos_profile().get_rmw_qos_profile(); bool incompatible_reliability = diff --git a/rosbag2_transport/src/rosbag2_transport/topic_filter.cpp b/rosbag2_transport/src/rosbag2_transport/topic_filter.cpp index b82ee0f0fe..9f2d917c08 100644 --- a/rosbag2_transport/src/rosbag2_transport/topic_filter.cpp +++ b/rosbag2_transport/src/rosbag2_transport/topic_filter.cpp @@ -117,49 +117,56 @@ std::unordered_map TopicFilter::filter_topics( bool TopicFilter::take_topic( const std::string & topic_name, const std::vector & topic_types) { - if (!has_single_type(topic_name, topic_types)) { - return false; - } + try { + if (!has_single_type(topic_name, topic_types)) { + return false; + } - const std::string & topic_type = topic_types[0]; - if (!allow_unknown_types_ && !type_is_known(topic_name, topic_type)) { - return false; - } + const std::string & topic_type = topic_types[0]; + if (!allow_unknown_types_ && !type_is_known(topic_name, topic_type)) { + return false; + } - if (!record_options_.include_hidden_topics && topic_is_hidden(topic_name)) { - RCUTILS_LOG_WARN_ONCE_NAMED( - ROSBAG2_TRANSPORT_PACKAGE_NAME, - "Hidden topics are not recorded. Enable them with --include-hidden-topics"); - return false; - } + if (!record_options_.include_hidden_topics && topic_is_hidden(topic_name)) { + RCUTILS_LOG_WARN_ONCE_NAMED( + ROSBAG2_TRANSPORT_PACKAGE_NAME, + "Hidden topics are not recorded. Enable them with --include-hidden-topics"); + return false; + } - if (!record_options_.include_unpublished_topics && node_graph_ && - topic_is_unpublished(topic_name, *node_graph_)) - { - return false; - } + if (!record_options_.include_unpublished_topics && node_graph_ && + topic_is_unpublished(topic_name, *node_graph_)) + { + return false; + } - if (record_options_.ignore_leaf_topics && node_graph_ && - is_leaf_topic(topic_name, *node_graph_)) - { - return false; - } + if (record_options_.ignore_leaf_topics && node_graph_ && + is_leaf_topic(topic_name, *node_graph_)) + { + return false; + } - if (!record_options_.topics.empty() && !topic_in_list(topic_name, record_options_.topics)) { - return false; - } + if (!record_options_.topics.empty() && !topic_in_list(topic_name, record_options_.topics)) { + return false; + } - std::regex exclude_regex(record_options_.exclude); - if (!record_options_.exclude.empty() && std::regex_search(topic_name, exclude_regex)) { - return false; - } + std::regex exclude_regex(record_options_.exclude); + if (!record_options_.exclude.empty() && std::regex_search(topic_name, exclude_regex)) { + return false; + } - std::regex include_regex(record_options_.regex); - if ( - !record_options_.all && // All takes precedence over regex - !record_options_.regex.empty() && // empty regex matches nothing, but should be ignored - !std::regex_search(topic_name, include_regex)) - { + std::regex include_regex(record_options_.regex); + if ( + !record_options_.all && // All takes precedence over regex + !record_options_.regex.empty() && // empty regex matches nothing, but should be ignored + !std::regex_search(topic_name, include_regex)) + { + return false; + } + } catch (const std::exception & e) { + ROSBAG2_TRANSPORT_LOG_ERROR_STREAM( + "Failed to get information about topic '" << topic_name << + "' in TopicFilter. \nError: " << e.what()); return false; } From b04f79f0901d96a805bed5ec66ecbe95e29949e7 Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Sat, 10 Jun 2023 19:23:18 -0700 Subject: [PATCH 2/3] Revert "Fix for rosbag2_transport::Recorder failures due to unhandled exceptions" This reverts commit 767d440d637f5170bf490142bf55e6f84bfeee6b. Signed-off-by: Michael Orlov --- .../src/rosbag2_transport/recorder.cpp | 61 +++----------- .../src/rosbag2_transport/topic_filter.cpp | 79 +++++++++---------- 2 files changed, 49 insertions(+), 91 deletions(-) diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.cpp b/rosbag2_transport/src/rosbag2_transport/recorder.cpp index fd5d02106e..d94d1ccb87 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.cpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.cpp @@ -328,13 +328,7 @@ void RecorderImpl::event_publisher_thread_main() auto message = rosbag2_interfaces::msg::WriteSplitEvent(); message.closed_file = bag_split_info_.closed_file; message.opened_file = bag_split_info_.opened_file; - try { - split_event_pub_->publish(message); - } catch (const std::exception & e) { - RCLCPP_ERROR_STREAM( - node->get_logger(), - "Failed to publish message on '/events/write_split' topic. \nError: " << e.what()); - } + split_event_pub_->publish(message); } } RCLCPP_INFO(node->get_logger(), "Event publisher thread: Exiting"); @@ -379,7 +373,8 @@ bool RecorderImpl::is_paused() void RecorderImpl::topics_discovery() { while (rclcpp::ok() && stop_discovery_ == false) { - auto topics_to_subscribe = get_requested_or_available_topics(); + auto topics_to_subscribe = + get_requested_or_available_topics(); for (const auto & topic_and_type : topics_to_subscribe) { warn_if_new_qos_for_subscribed_topic(topic_and_type.first); } @@ -399,13 +394,7 @@ void RecorderImpl::topics_discovery() std::unordered_map RecorderImpl::get_requested_or_available_topics() { - std::map> all_topics_and_types; - try { - all_topics_and_types = node->get_topic_names_and_types(); - } catch (const std::exception & e) { - RCLCPP_ERROR_STREAM( - node->get_logger(), "Failed to get topic names and types from node \nError: " << e.what()); - } + auto all_topics_and_types = node->get_topic_names_and_types(); return topic_filter_->filter_topics(all_topics_and_types); } @@ -426,16 +415,7 @@ void RecorderImpl::subscribe_topics( const std::unordered_map & topics_and_types) { for (const auto & topic_with_type : topics_and_types) { - std::vector endpoint_infos; - try { - endpoint_infos = node->get_publishers_info_by_topic(topic_with_type.first); - } catch (const std::exception & e) { - RCLCPP_ERROR_STREAM( - node->get_logger(), "Failed to get publishers info for '" << topic_with_type.first << - "' topic. \nError: " << e.what()); - continue; - } - + auto endpoint_infos = node->get_publishers_info_by_topic(topic_with_type.first); subscribe_topic( { topic_with_type.first, @@ -455,19 +435,13 @@ void RecorderImpl::subscribe_topic(const rosbag2_storage::TopicMetadata & topic) writer_->create_topic(topic); Rosbag2QoS subscription_qos{subscription_qos_for_topic(topic.name)}; - try { - auto subscription = create_subscription(topic.name, topic.type, subscription_qos); - if (subscription) { - subscriptions_.insert({topic.name, subscription}); - RCLCPP_INFO_STREAM(node->get_logger(), "Subscribed to topic '" << topic.name << "'"); - } else { - writer_->remove_topic(topic); - subscriptions_.erase(topic.name); - } - } catch (const std::exception & e) { - RCLCPP_ERROR_STREAM( - node->get_logger(), "Failed to create subscription for '" << topic.name << - "' topic. \nError: " << e.what()); + auto subscription = create_subscription(topic.name, topic.type, subscription_qos); + if (subscription) { + subscriptions_.insert({topic.name, subscription}); + RCLCPP_INFO_STREAM( + node->get_logger(), + "Subscribed to topic '" << topic.name << "'"); + } else { writer_->remove_topic(topic); subscriptions_.erase(topic.name); } @@ -579,16 +553,7 @@ void RecorderImpl::warn_if_new_qos_for_subscribed_topic(const std::string & topi } const auto actual_qos = existing_subscription->second->get_actual_qos(); const auto & used_profile = actual_qos.get_rmw_qos_profile(); - - std::vector publishers_info; - try { - publishers_info = node->get_publishers_info_by_topic(topic_name); - } catch (const std::exception & e) { - RCLCPP_ERROR_STREAM( - node->get_logger(), "Failed to get publishers info for '" << topic_name << - "' topic \nError: " << e.what()); - } - + auto publishers_info = node->get_publishers_info_by_topic(topic_name); for (const auto & info : publishers_info) { auto new_profile = info.qos_profile().get_rmw_qos_profile(); bool incompatible_reliability = diff --git a/rosbag2_transport/src/rosbag2_transport/topic_filter.cpp b/rosbag2_transport/src/rosbag2_transport/topic_filter.cpp index 9f2d917c08..b82ee0f0fe 100644 --- a/rosbag2_transport/src/rosbag2_transport/topic_filter.cpp +++ b/rosbag2_transport/src/rosbag2_transport/topic_filter.cpp @@ -117,56 +117,49 @@ std::unordered_map TopicFilter::filter_topics( bool TopicFilter::take_topic( const std::string & topic_name, const std::vector & topic_types) { - try { - if (!has_single_type(topic_name, topic_types)) { - return false; - } + if (!has_single_type(topic_name, topic_types)) { + return false; + } - const std::string & topic_type = topic_types[0]; - if (!allow_unknown_types_ && !type_is_known(topic_name, topic_type)) { - return false; - } + const std::string & topic_type = topic_types[0]; + if (!allow_unknown_types_ && !type_is_known(topic_name, topic_type)) { + return false; + } - if (!record_options_.include_hidden_topics && topic_is_hidden(topic_name)) { - RCUTILS_LOG_WARN_ONCE_NAMED( - ROSBAG2_TRANSPORT_PACKAGE_NAME, - "Hidden topics are not recorded. Enable them with --include-hidden-topics"); - return false; - } + if (!record_options_.include_hidden_topics && topic_is_hidden(topic_name)) { + RCUTILS_LOG_WARN_ONCE_NAMED( + ROSBAG2_TRANSPORT_PACKAGE_NAME, + "Hidden topics are not recorded. Enable them with --include-hidden-topics"); + return false; + } - if (!record_options_.include_unpublished_topics && node_graph_ && - topic_is_unpublished(topic_name, *node_graph_)) - { - return false; - } + if (!record_options_.include_unpublished_topics && node_graph_ && + topic_is_unpublished(topic_name, *node_graph_)) + { + return false; + } - if (record_options_.ignore_leaf_topics && node_graph_ && - is_leaf_topic(topic_name, *node_graph_)) - { - return false; - } + if (record_options_.ignore_leaf_topics && node_graph_ && + is_leaf_topic(topic_name, *node_graph_)) + { + return false; + } - if (!record_options_.topics.empty() && !topic_in_list(topic_name, record_options_.topics)) { - return false; - } + if (!record_options_.topics.empty() && !topic_in_list(topic_name, record_options_.topics)) { + return false; + } - std::regex exclude_regex(record_options_.exclude); - if (!record_options_.exclude.empty() && std::regex_search(topic_name, exclude_regex)) { - return false; - } + std::regex exclude_regex(record_options_.exclude); + if (!record_options_.exclude.empty() && std::regex_search(topic_name, exclude_regex)) { + return false; + } - std::regex include_regex(record_options_.regex); - if ( - !record_options_.all && // All takes precedence over regex - !record_options_.regex.empty() && // empty regex matches nothing, but should be ignored - !std::regex_search(topic_name, include_regex)) - { - return false; - } - } catch (const std::exception & e) { - ROSBAG2_TRANSPORT_LOG_ERROR_STREAM( - "Failed to get information about topic '" << topic_name << - "' in TopicFilter. \nError: " << e.what()); + std::regex include_regex(record_options_.regex); + if ( + !record_options_.all && // All takes precedence over regex + !record_options_.regex.empty() && // empty regex matches nothing, but should be ignored + !std::regex_search(topic_name, include_regex)) + { return false; } From 1c79e8a31b2389dff4e4b7861518e3994e73309c Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Sat, 10 Jun 2023 19:55:52 -0700 Subject: [PATCH 3/3] Handle exceptions in event publisher and discovery threads - Exceptions potentially may come from the underlying node operations when we are invalidating context via rclcpp::shutdown(). We need to have possibility to correct finish recording in this case. Signed-off-by: Michael Orlov --- .../src/rosbag2_transport/recorder.cpp | 48 +++++++++++++------ 1 file changed, 33 insertions(+), 15 deletions(-) diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.cpp b/rosbag2_transport/src/rosbag2_transport/recorder.cpp index d94d1ccb87..7a893a4cb4 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.cpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.cpp @@ -285,11 +285,12 @@ void RecorderImpl::record() response->paused = is_paused(); }); + split_event_pub_ = + node->create_publisher("events/write_split", 1); + // Start the thread that will publish events event_publisher_thread_ = std::thread(&RecorderImpl::event_publisher_thread_main, this); - split_event_pub_ = - node->create_publisher("events/write_split", 1); rosbag2_cpp::bag_events::WriterEventCallbacks callbacks; callbacks.write_split_callback = [this](rosbag2_cpp::bag_events::BagSplitInfo & info) { @@ -328,7 +329,17 @@ void RecorderImpl::event_publisher_thread_main() auto message = rosbag2_interfaces::msg::WriteSplitEvent(); message.closed_file = bag_split_info_.closed_file; message.opened_file = bag_split_info_.opened_file; - split_event_pub_->publish(message); + try { + split_event_pub_->publish(message); + } catch (const std::exception & e) { + RCLCPP_ERROR_STREAM( + node->get_logger(), + "Failed to publish message on '/events/write_split' topic. \nError: " << e.what()); + } catch (...) { + RCLCPP_ERROR_STREAM( + node->get_logger(), + "Failed to publish message on '/events/write_split' topic."); + } } } RCLCPP_INFO(node->get_logger(), "Event publisher thread: Exiting"); @@ -373,19 +384,26 @@ bool RecorderImpl::is_paused() void RecorderImpl::topics_discovery() { while (rclcpp::ok() && stop_discovery_ == false) { - auto topics_to_subscribe = - get_requested_or_available_topics(); - for (const auto & topic_and_type : topics_to_subscribe) { - warn_if_new_qos_for_subscribed_topic(topic_and_type.first); - } - auto missing_topics = get_missing_topics(topics_to_subscribe); - subscribe_topics(missing_topics); + try { + auto topics_to_subscribe = get_requested_or_available_topics(); + for (const auto & topic_and_type : topics_to_subscribe) { + warn_if_new_qos_for_subscribed_topic(topic_and_type.first); + } + auto missing_topics = get_missing_topics(topics_to_subscribe); + subscribe_topics(missing_topics); - if (!record_options_.topics.empty() && subscriptions_.size() == record_options_.topics.size()) { - RCLCPP_INFO( - node->get_logger(), - "All requested topics are subscribed. Stopping discovery..."); - return; + if (!record_options_.topics.empty() && + subscriptions_.size() == record_options_.topics.size()) + { + RCLCPP_INFO( + node->get_logger(), + "All requested topics are subscribed. Stopping discovery..."); + return; + } + } catch (const std::exception & e) { + RCLCPP_ERROR_STREAM(node->get_logger(), "Failure in topics discovery.\nError: " << e.what()); + } catch (...) { + RCLCPP_ERROR_STREAM(node->get_logger(), "Failure in topics discovery."); } std::this_thread::sleep_for(record_options_.topic_polling_interval); }