From 32cc7790371b2cc05949320b7fbfd40367599ad0 Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Sat, 10 Jun 2023 13:38:44 -0700 Subject: [PATCH 1/3] Fix for possible freeze in Recorder::stop() (#1381) * Fix for possible freeze in Recorder::stop() - It was possible a freeze in recorder due to the race condition when calling Recorder::stop() while event publisher thread hasn't been fully started yet. Signed-off-by: Michael Orlov * Move event_publisher_thread_wake_cv_.notify_all() out of the mutex lock Signed-off-by: Michael Orlov --------- Signed-off-by: Michael Orlov (cherry picked from commit c6cc69a4ed7bc25b494051148e99ab0e7d22cc90) # Conflicts: # rosbag2_transport/src/rosbag2_transport/recorder.cpp --- .../src/rosbag2_transport/recorder.cpp | 546 ++++++++++++++++++ 1 file changed, 546 insertions(+) diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.cpp b/rosbag2_transport/src/rosbag2_transport/recorder.cpp index a76777e02e..2bb78de822 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.cpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.cpp @@ -40,6 +40,552 @@ namespace rosbag2_transport { +<<<<<<< HEAD +======= +class RecorderImpl +{ +public: + RecorderImpl( + rclcpp::Node * owner, + std::shared_ptr writer, + std::shared_ptr keyboard_handler, + const rosbag2_storage::StorageOptions & storage_options, + const rosbag2_transport::RecordOptions & record_options); + + ~RecorderImpl(); + + void record(); + + /// @brief Stopping recording and closing writer. + /// The record() can be called again after stop(). + void stop(); + + const rosbag2_cpp::Writer & get_writer_handle(); + + /// Pause the recording. + void pause(); + + /// Resume recording. + void resume(); + + /// Pause if it was recording, continue recording if paused. + void toggle_paused(); + + /// Return the current paused state. + bool is_paused(); + + std::unordered_map get_requested_or_available_topics(); + + /// Public members for access by wrapper + std::unordered_set topics_warned_about_incompatibility_; + std::shared_ptr writer_; + rosbag2_storage::StorageOptions storage_options_; + rosbag2_transport::RecordOptions record_options_; + std::atomic stop_discovery_ = false; + std::unordered_map> subscriptions_; + +private: + void topics_discovery(); + + std::unordered_map + get_missing_topics(const std::unordered_map & all_topics); + + void subscribe_topics( + const std::unordered_map & topics_and_types); + + void subscribe_topic(const rosbag2_storage::TopicMetadata & topic); + + std::shared_ptr create_subscription( + const std::string & topic_name, const std::string & topic_type, const rclcpp::QoS & qos); + + /** + * Find the QoS profile that should be used for subscribing. + * + * Uses the override from record_options, if it is specified for this topic. + * Otherwise, falls back to Rosbag2QoS::adapt_request_to_offers + * + * \param topic_name The full name of the topic, with namespace (ex. /arm/joint_status). + * \return The QoS profile to be used for subscribing. + */ + rclcpp::QoS subscription_qos_for_topic(const std::string & topic_name) const; + + // Serialize all currently offered QoS profiles for a topic into a YAML list. + std::string serialized_offered_qos_profiles_for_topic( + const std::vector & topics_endpoint_info) const; + + void warn_if_new_qos_for_subscribed_topic(const std::string & topic_name); + + void event_publisher_thread_main(); + bool event_publisher_thread_should_wake(); + + rclcpp::Node * node; + std::unique_ptr topic_filter_; + std::future discovery_future_; + std::string serialization_format_; + std::unordered_map topic_qos_profile_overrides_; + std::unordered_set topic_unknown_types_; + rclcpp::Service::SharedPtr srv_is_paused_; + rclcpp::Service::SharedPtr srv_pause_; + rclcpp::Service::SharedPtr srv_resume_; + rclcpp::Service::SharedPtr srv_snapshot_; + rclcpp::Service::SharedPtr srv_split_bagfile_; + + std::atomic paused_ = false; + std::atomic in_recording_ = false; + std::shared_ptr keyboard_handler_; + KeyboardHandler::callback_handle_t toggle_paused_key_callback_handle_ = + KeyboardHandler::invalid_handle; + + // Variables for event publishing + rclcpp::Publisher::SharedPtr split_event_pub_; + std::atomic event_publisher_thread_should_exit_ = false; + std::atomic write_split_has_occurred_ = false; + rosbag2_cpp::bag_events::BagSplitInfo bag_split_info_; + std::mutex event_publisher_thread_mutex_; + std::condition_variable event_publisher_thread_wake_cv_; + std::thread event_publisher_thread_; +}; + +RecorderImpl::RecorderImpl( + rclcpp::Node * owner, + std::shared_ptr writer, + std::shared_ptr keyboard_handler, + const rosbag2_storage::StorageOptions & storage_options, + const rosbag2_transport::RecordOptions & record_options) +: writer_(std::move(writer)), + storage_options_(storage_options), + record_options_(record_options), + stop_discovery_(record_options_.is_discovery_disabled), + node(owner), + paused_(record_options.start_paused), + keyboard_handler_(std::move(keyboard_handler)) +{ + std::string key_str = enum_key_code_to_str(Recorder::kPauseResumeToggleKey); + toggle_paused_key_callback_handle_ = + keyboard_handler_->add_key_press_callback( + [this](KeyboardHandler::KeyCode /*key_code*/, + KeyboardHandler::KeyModifiers /*key_modifiers*/) {this->toggle_paused();}, + Recorder::kPauseResumeToggleKey); + topic_filter_ = std::make_unique(record_options, node->get_node_graph_interface()); + // show instructions + RCLCPP_INFO_STREAM( + node->get_logger(), + "Press " << key_str << " for pausing/resuming"); + + for (auto & topic : record_options_.topics) { + topic = rclcpp::expand_topic_or_service_name( + topic, node->get_name(), + node->get_namespace(), false); + } +} + +RecorderImpl::~RecorderImpl() +{ + keyboard_handler_->delete_key_press_callback(toggle_paused_key_callback_handle_); + stop(); +} + +void RecorderImpl::stop() +{ + stop_discovery_ = true; + if (discovery_future_.valid()) { + auto status = discovery_future_.wait_for(2 * record_options_.topic_polling_interval); + if (status != std::future_status::ready) { + RCLCPP_ERROR_STREAM( + node->get_logger(), + "discovery_future_.wait_for(" << record_options_.topic_polling_interval.count() << + ") return status: " << (status == std::future_status::timeout ? "timeout" : "deferred")); + } + } + paused_ = true; + subscriptions_.clear(); + writer_->close(); // Call writer->close() to finalize current bag file and write metadata + + { + std::lock_guard lock(event_publisher_thread_mutex_); + event_publisher_thread_should_exit_ = true; + } + event_publisher_thread_wake_cv_.notify_all(); + if (event_publisher_thread_.joinable()) { + event_publisher_thread_.join(); + } + in_recording_ = false; + RCLCPP_INFO(node->get_logger(), "Recording stopped"); +} + +void RecorderImpl::record() +{ + if (in_recording_.exchange(true)) { + RCLCPP_WARN_STREAM( + node->get_logger(), + "Called Recorder::record() while already in recording, dismissing request."); + return; + } + paused_ = record_options_.start_paused; + topic_qos_profile_overrides_ = record_options_.topic_qos_profile_overrides; + if (record_options_.rmw_serialization_format.empty()) { + throw std::runtime_error("No serialization format specified!"); + } + + writer_->open( + storage_options_, + {rmw_get_serialization_format(), record_options_.rmw_serialization_format}); + + // Only expose snapshot service when mode is enabled + if (storage_options_.snapshot_mode) { + srv_snapshot_ = node->create_service( + "~/snapshot", + [this]( + const std::shared_ptr/* request_header */, + const std::shared_ptr/* request */, + const std::shared_ptr response) + { + response->success = writer_->take_snapshot(); + }); + } + + srv_split_bagfile_ = node->create_service( + "~/split_bagfile", + [this]( + const std::shared_ptr/* request_header */, + const std::shared_ptr/* request */, + const std::shared_ptr/* response */) + { + writer_->split_bagfile(); + }); + + srv_pause_ = node->create_service( + "~/pause", + [this]( + const std::shared_ptr/* request_header */, + const std::shared_ptr/* request */, + const std::shared_ptr/* response */) + { + pause(); + }); + + srv_resume_ = node->create_service( + "~/resume", + [this]( + const std::shared_ptr/* request_header */, + const std::shared_ptr/* request */, + const std::shared_ptr/* response */) + { + resume(); + }); + + srv_is_paused_ = node->create_service( + "~/is_paused", + [this]( + const std::shared_ptr/* request_header */, + const std::shared_ptr/* request */, + const std::shared_ptr response) + { + response->paused = is_paused(); + }); + + // Start the thread that will publish events + event_publisher_thread_ = std::thread(&RecorderImpl::event_publisher_thread_main, this); + + split_event_pub_ = + node->create_publisher("events/write_split", 1); + rosbag2_cpp::bag_events::WriterEventCallbacks callbacks; + callbacks.write_split_callback = + [this](rosbag2_cpp::bag_events::BagSplitInfo & info) { + { + std::lock_guard lock(event_publisher_thread_mutex_); + bag_split_info_ = info; + write_split_has_occurred_ = true; + } + event_publisher_thread_wake_cv_.notify_all(); + }; + writer_->add_event_callbacks(callbacks); + + serialization_format_ = record_options_.rmw_serialization_format; + RCLCPP_INFO(node->get_logger(), "Listening for topics..."); + subscribe_topics(get_requested_or_available_topics()); + + if (!record_options_.is_discovery_disabled) { + discovery_future_ = + std::async(std::launch::async, std::bind(&RecorderImpl::topics_discovery, this)); + } + RCLCPP_INFO(node->get_logger(), "Recording..."); +} + +void RecorderImpl::event_publisher_thread_main() +{ + RCLCPP_INFO(node->get_logger(), "Event publisher thread: Starting"); + while (!event_publisher_thread_should_exit_.load()) { + std::unique_lock lock(event_publisher_thread_mutex_); + event_publisher_thread_wake_cv_.wait( + lock, + [this] {return event_publisher_thread_should_wake();}); + + if (write_split_has_occurred_) { + write_split_has_occurred_ = false; + + auto message = rosbag2_interfaces::msg::WriteSplitEvent(); + message.closed_file = bag_split_info_.closed_file; + message.opened_file = bag_split_info_.opened_file; + split_event_pub_->publish(message); + } + } + RCLCPP_INFO(node->get_logger(), "Event publisher thread: Exiting"); +} + +bool RecorderImpl::event_publisher_thread_should_wake() +{ + return write_split_has_occurred_ || event_publisher_thread_should_exit_; +} + +const rosbag2_cpp::Writer & RecorderImpl::get_writer_handle() +{ + return *writer_; +} + +void RecorderImpl::pause() +{ + paused_.store(true); + RCLCPP_INFO_STREAM(node->get_logger(), "Pausing recording."); +} + +void RecorderImpl::resume() +{ + paused_.store(false); + RCLCPP_INFO_STREAM(node->get_logger(), "Resuming recording."); +} + +void RecorderImpl::toggle_paused() +{ + if (paused_.load()) { + this->resume(); + } else { + this->pause(); + } +} + +bool RecorderImpl::is_paused() +{ + return paused_.load(); +} + +void RecorderImpl::topics_discovery() +{ + while (rclcpp::ok() && stop_discovery_ == false) { + auto topics_to_subscribe = + get_requested_or_available_topics(); + for (const auto & topic_and_type : topics_to_subscribe) { + warn_if_new_qos_for_subscribed_topic(topic_and_type.first); + } + auto missing_topics = get_missing_topics(topics_to_subscribe); + subscribe_topics(missing_topics); + + if (!record_options_.topics.empty() && subscriptions_.size() == record_options_.topics.size()) { + RCLCPP_INFO( + node->get_logger(), + "All requested topics are subscribed. Stopping discovery..."); + return; + } + std::this_thread::sleep_for(record_options_.topic_polling_interval); + } +} + +std::unordered_map +RecorderImpl::get_requested_or_available_topics() +{ + auto all_topics_and_types = node->get_topic_names_and_types(); + return topic_filter_->filter_topics(all_topics_and_types); +} + +std::unordered_map +RecorderImpl::get_missing_topics(const std::unordered_map & all_topics) +{ + std::unordered_map missing_topics; + for (const auto & i : all_topics) { + if (subscriptions_.find(i.first) == subscriptions_.end()) { + missing_topics.emplace(i.first, i.second); + } + } + return missing_topics; +} + + +void RecorderImpl::subscribe_topics( + const std::unordered_map & topics_and_types) +{ + for (const auto & topic_with_type : topics_and_types) { + auto endpoint_infos = node->get_publishers_info_by_topic(topic_with_type.first); + subscribe_topic( + { + topic_with_type.first, + topic_with_type.second, + serialization_format_, + serialized_offered_qos_profiles_for_topic(endpoint_infos), + type_description_hash_for_topic(endpoint_infos), + }); + } +} + +void RecorderImpl::subscribe_topic(const rosbag2_storage::TopicMetadata & topic) +{ + // Need to create topic in writer before we are trying to create subscription. Since in + // callback for subscription we are calling writer_->write(bag_message); and it could happened + // that callback called before we reached out the line: writer_->create_topic(topic) + writer_->create_topic(topic); + + Rosbag2QoS subscription_qos{subscription_qos_for_topic(topic.name)}; + auto subscription = create_subscription(topic.name, topic.type, subscription_qos); + if (subscription) { + subscriptions_.insert({topic.name, subscription}); + RCLCPP_INFO_STREAM( + node->get_logger(), + "Subscribed to topic '" << topic.name << "'"); + } else { + writer_->remove_topic(topic); + subscriptions_.erase(topic.name); + } +} + +std::shared_ptr +RecorderImpl::create_subscription( + const std::string & topic_name, const std::string & topic_type, const rclcpp::QoS & qos) +{ + auto subscription = node->create_generic_subscription( + topic_name, + topic_type, + qos, + [this, topic_name, topic_type](std::shared_ptr message) { + if (!paused_.load()) { + writer_->write(message, topic_name, topic_type, node->get_clock()->now()); + } + }); + return subscription; +} + +std::string RecorderImpl::serialized_offered_qos_profiles_for_topic( + const std::vector & topics_endpoint_info) const +{ + YAML::Node offered_qos_profiles; + for (const auto & info : topics_endpoint_info) { + offered_qos_profiles.push_back(Rosbag2QoS(info.qos_profile())); + } + return YAML::Dump(offered_qos_profiles); +} + +std::string type_hash_to_string(const rosidl_type_hash_t & type_hash) +{ + if (type_hash.version == 0) { + // version is unset, this is an empty type hash. + return ""; + } + if (type_hash.version > 1) { + // this is a version we don't know how to serialize + ROSBAG2_TRANSPORT_LOG_WARN_STREAM( + "attempted to stringify type hash with unknown version " << type_hash.version); + return ""; + } + rcutils_allocator_t allocator = rcutils_get_default_allocator(); + char * stringified_type_hash = nullptr; + rcutils_ret_t status = rosidl_stringify_type_hash(&type_hash, allocator, &stringified_type_hash); + std::string result = ""; + if (status == RCUTILS_RET_OK) { + result = stringified_type_hash; + } + if (stringified_type_hash != nullptr) { + allocator.deallocate(stringified_type_hash, allocator.state); + } + return result; +} + +std::string type_description_hash_for_topic( + const std::vector & topics_endpoint_info) +{ + rosidl_type_hash_t result_hash = rosidl_get_zero_initialized_type_hash(); + for (const auto & info : topics_endpoint_info) { + // If all endpoint infos provide the same type hash, return it. Otherwise return an empty + // string to signal that the type description hash for this topic cannot be determined. + rosidl_type_hash_t endpoint_hash = info.topic_type_hash(); + if (endpoint_hash.version == 0) { + continue; + } + if (result_hash.version == 0) { + result_hash = endpoint_hash; + continue; + } + bool difference_detected = (endpoint_hash.version != result_hash.version); + difference_detected |= ( + 0 != memcmp(endpoint_hash.value, result_hash.value, ROSIDL_TYPE_HASH_SIZE)); + if (difference_detected) { + std::string result_string = type_hash_to_string(result_hash); + std::string endpoint_string = type_hash_to_string(endpoint_hash); + ROSBAG2_TRANSPORT_LOG_WARN_STREAM( + "type description hashes for topic type '" << info.topic_type() << "' conflict: '" << + result_string << "' != '" << endpoint_string << "'"); + return ""; + } + } + return type_hash_to_string(result_hash); +} + +rclcpp::QoS RecorderImpl::subscription_qos_for_topic(const std::string & topic_name) const +{ + if (topic_qos_profile_overrides_.count(topic_name)) { + RCLCPP_INFO_STREAM( + node->get_logger(), + "Overriding subscription profile for " << topic_name); + return topic_qos_profile_overrides_.at(topic_name); + } + return Rosbag2QoS::adapt_request_to_offers( + topic_name, node->get_publishers_info_by_topic(topic_name)); +} + +void RecorderImpl::warn_if_new_qos_for_subscribed_topic(const std::string & topic_name) +{ + auto existing_subscription = subscriptions_.find(topic_name); + if (existing_subscription == subscriptions_.end()) { + // Not subscribed yet + return; + } + if (topics_warned_about_incompatibility_.count(topic_name) > 0) { + // Already warned about this topic + return; + } + const auto actual_qos = existing_subscription->second->get_actual_qos(); + const auto & used_profile = actual_qos.get_rmw_qos_profile(); + auto publishers_info = node->get_publishers_info_by_topic(topic_name); + for (const auto & info : publishers_info) { + auto new_profile = info.qos_profile().get_rmw_qos_profile(); + bool incompatible_reliability = + new_profile.reliability == RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT && + used_profile.reliability != RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT; + bool incompatible_durability = + new_profile.durability == RMW_QOS_POLICY_DURABILITY_VOLATILE && + used_profile.durability != RMW_QOS_POLICY_DURABILITY_VOLATILE; + + if (incompatible_reliability) { + RCLCPP_WARN_STREAM( + node->get_logger(), + "A new publisher for subscribed topic " << topic_name << " " + "was found offering RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT, " + "but rosbag already subscribed requesting RMW_QOS_POLICY_RELIABILITY_RELIABLE. " + "Messages from this new publisher will not be recorded."); + topics_warned_about_incompatibility_.insert(topic_name); + } else if (incompatible_durability) { + RCLCPP_WARN_STREAM( + node->get_logger(), + "A new publisher for subscribed topic " << topic_name << " " + "was found offering RMW_QOS_POLICY_DURABILITY_VOLATILE, " + "but rosbag2 already subscribed requesting RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL. " + "Messages from this new publisher will not be recorded."); + topics_warned_about_incompatibility_.insert(topic_name); + } + } +} + +/////////////////////////////// +// Recorder public interface + +>>>>>>> c6cc69a (Fix for possible freeze in Recorder::stop() (#1381)) Recorder::Recorder( const std::string & node_name, const rclcpp::NodeOptions & node_options) From 7b15851c403002ef0b8c6c5274f50b51a25673da Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Tue, 13 Jun 2023 16:37:30 -0700 Subject: [PATCH 2/3] Resolve merge conflicts Signed-off-by: Michael Orlov --- .../include/rosbag2_transport/recorder.hpp | 5 +- .../src/rosbag2_transport/recorder.cpp | 571 +----------------- 2 files changed, 20 insertions(+), 556 deletions(-) diff --git a/rosbag2_transport/include/rosbag2_transport/recorder.hpp b/rosbag2_transport/include/rosbag2_transport/recorder.hpp index efc9eb0c36..8071f1e40c 100644 --- a/rosbag2_transport/include/rosbag2_transport/recorder.hpp +++ b/rosbag2_transport/include/rosbag2_transport/recorder.hpp @@ -164,6 +164,7 @@ class Recorder : public rclcpp::Node std::unordered_set topic_unknown_types_; rclcpp::Service::SharedPtr srv_snapshot_; std::atomic paused_ = false; + std::atomic in_recording_ = false; // Keyboard handler std::shared_ptr keyboard_handler_; // Toogle paused key callback handle @@ -172,8 +173,8 @@ class Recorder : public rclcpp::Node // Variables for event publishing rclcpp::Publisher::SharedPtr split_event_pub_; - bool event_publisher_thread_should_exit_ = false; - bool write_split_has_occurred_ = false; + std::atomic event_publisher_thread_should_exit_ = false; + std::atomic write_split_has_occurred_ = false; rosbag2_cpp::bag_events::BagSplitInfo bag_split_info_; std::mutex event_publisher_thread_mutex_; std::condition_variable event_publisher_thread_wake_cv_; diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.cpp b/rosbag2_transport/src/rosbag2_transport/recorder.cpp index 2bb78de822..3b2fc77275 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.cpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.cpp @@ -40,552 +40,6 @@ namespace rosbag2_transport { -<<<<<<< HEAD -======= -class RecorderImpl -{ -public: - RecorderImpl( - rclcpp::Node * owner, - std::shared_ptr writer, - std::shared_ptr keyboard_handler, - const rosbag2_storage::StorageOptions & storage_options, - const rosbag2_transport::RecordOptions & record_options); - - ~RecorderImpl(); - - void record(); - - /// @brief Stopping recording and closing writer. - /// The record() can be called again after stop(). - void stop(); - - const rosbag2_cpp::Writer & get_writer_handle(); - - /// Pause the recording. - void pause(); - - /// Resume recording. - void resume(); - - /// Pause if it was recording, continue recording if paused. - void toggle_paused(); - - /// Return the current paused state. - bool is_paused(); - - std::unordered_map get_requested_or_available_topics(); - - /// Public members for access by wrapper - std::unordered_set topics_warned_about_incompatibility_; - std::shared_ptr writer_; - rosbag2_storage::StorageOptions storage_options_; - rosbag2_transport::RecordOptions record_options_; - std::atomic stop_discovery_ = false; - std::unordered_map> subscriptions_; - -private: - void topics_discovery(); - - std::unordered_map - get_missing_topics(const std::unordered_map & all_topics); - - void subscribe_topics( - const std::unordered_map & topics_and_types); - - void subscribe_topic(const rosbag2_storage::TopicMetadata & topic); - - std::shared_ptr create_subscription( - const std::string & topic_name, const std::string & topic_type, const rclcpp::QoS & qos); - - /** - * Find the QoS profile that should be used for subscribing. - * - * Uses the override from record_options, if it is specified for this topic. - * Otherwise, falls back to Rosbag2QoS::adapt_request_to_offers - * - * \param topic_name The full name of the topic, with namespace (ex. /arm/joint_status). - * \return The QoS profile to be used for subscribing. - */ - rclcpp::QoS subscription_qos_for_topic(const std::string & topic_name) const; - - // Serialize all currently offered QoS profiles for a topic into a YAML list. - std::string serialized_offered_qos_profiles_for_topic( - const std::vector & topics_endpoint_info) const; - - void warn_if_new_qos_for_subscribed_topic(const std::string & topic_name); - - void event_publisher_thread_main(); - bool event_publisher_thread_should_wake(); - - rclcpp::Node * node; - std::unique_ptr topic_filter_; - std::future discovery_future_; - std::string serialization_format_; - std::unordered_map topic_qos_profile_overrides_; - std::unordered_set topic_unknown_types_; - rclcpp::Service::SharedPtr srv_is_paused_; - rclcpp::Service::SharedPtr srv_pause_; - rclcpp::Service::SharedPtr srv_resume_; - rclcpp::Service::SharedPtr srv_snapshot_; - rclcpp::Service::SharedPtr srv_split_bagfile_; - - std::atomic paused_ = false; - std::atomic in_recording_ = false; - std::shared_ptr keyboard_handler_; - KeyboardHandler::callback_handle_t toggle_paused_key_callback_handle_ = - KeyboardHandler::invalid_handle; - - // Variables for event publishing - rclcpp::Publisher::SharedPtr split_event_pub_; - std::atomic event_publisher_thread_should_exit_ = false; - std::atomic write_split_has_occurred_ = false; - rosbag2_cpp::bag_events::BagSplitInfo bag_split_info_; - std::mutex event_publisher_thread_mutex_; - std::condition_variable event_publisher_thread_wake_cv_; - std::thread event_publisher_thread_; -}; - -RecorderImpl::RecorderImpl( - rclcpp::Node * owner, - std::shared_ptr writer, - std::shared_ptr keyboard_handler, - const rosbag2_storage::StorageOptions & storage_options, - const rosbag2_transport::RecordOptions & record_options) -: writer_(std::move(writer)), - storage_options_(storage_options), - record_options_(record_options), - stop_discovery_(record_options_.is_discovery_disabled), - node(owner), - paused_(record_options.start_paused), - keyboard_handler_(std::move(keyboard_handler)) -{ - std::string key_str = enum_key_code_to_str(Recorder::kPauseResumeToggleKey); - toggle_paused_key_callback_handle_ = - keyboard_handler_->add_key_press_callback( - [this](KeyboardHandler::KeyCode /*key_code*/, - KeyboardHandler::KeyModifiers /*key_modifiers*/) {this->toggle_paused();}, - Recorder::kPauseResumeToggleKey); - topic_filter_ = std::make_unique(record_options, node->get_node_graph_interface()); - // show instructions - RCLCPP_INFO_STREAM( - node->get_logger(), - "Press " << key_str << " for pausing/resuming"); - - for (auto & topic : record_options_.topics) { - topic = rclcpp::expand_topic_or_service_name( - topic, node->get_name(), - node->get_namespace(), false); - } -} - -RecorderImpl::~RecorderImpl() -{ - keyboard_handler_->delete_key_press_callback(toggle_paused_key_callback_handle_); - stop(); -} - -void RecorderImpl::stop() -{ - stop_discovery_ = true; - if (discovery_future_.valid()) { - auto status = discovery_future_.wait_for(2 * record_options_.topic_polling_interval); - if (status != std::future_status::ready) { - RCLCPP_ERROR_STREAM( - node->get_logger(), - "discovery_future_.wait_for(" << record_options_.topic_polling_interval.count() << - ") return status: " << (status == std::future_status::timeout ? "timeout" : "deferred")); - } - } - paused_ = true; - subscriptions_.clear(); - writer_->close(); // Call writer->close() to finalize current bag file and write metadata - - { - std::lock_guard lock(event_publisher_thread_mutex_); - event_publisher_thread_should_exit_ = true; - } - event_publisher_thread_wake_cv_.notify_all(); - if (event_publisher_thread_.joinable()) { - event_publisher_thread_.join(); - } - in_recording_ = false; - RCLCPP_INFO(node->get_logger(), "Recording stopped"); -} - -void RecorderImpl::record() -{ - if (in_recording_.exchange(true)) { - RCLCPP_WARN_STREAM( - node->get_logger(), - "Called Recorder::record() while already in recording, dismissing request."); - return; - } - paused_ = record_options_.start_paused; - topic_qos_profile_overrides_ = record_options_.topic_qos_profile_overrides; - if (record_options_.rmw_serialization_format.empty()) { - throw std::runtime_error("No serialization format specified!"); - } - - writer_->open( - storage_options_, - {rmw_get_serialization_format(), record_options_.rmw_serialization_format}); - - // Only expose snapshot service when mode is enabled - if (storage_options_.snapshot_mode) { - srv_snapshot_ = node->create_service( - "~/snapshot", - [this]( - const std::shared_ptr/* request_header */, - const std::shared_ptr/* request */, - const std::shared_ptr response) - { - response->success = writer_->take_snapshot(); - }); - } - - srv_split_bagfile_ = node->create_service( - "~/split_bagfile", - [this]( - const std::shared_ptr/* request_header */, - const std::shared_ptr/* request */, - const std::shared_ptr/* response */) - { - writer_->split_bagfile(); - }); - - srv_pause_ = node->create_service( - "~/pause", - [this]( - const std::shared_ptr/* request_header */, - const std::shared_ptr/* request */, - const std::shared_ptr/* response */) - { - pause(); - }); - - srv_resume_ = node->create_service( - "~/resume", - [this]( - const std::shared_ptr/* request_header */, - const std::shared_ptr/* request */, - const std::shared_ptr/* response */) - { - resume(); - }); - - srv_is_paused_ = node->create_service( - "~/is_paused", - [this]( - const std::shared_ptr/* request_header */, - const std::shared_ptr/* request */, - const std::shared_ptr response) - { - response->paused = is_paused(); - }); - - // Start the thread that will publish events - event_publisher_thread_ = std::thread(&RecorderImpl::event_publisher_thread_main, this); - - split_event_pub_ = - node->create_publisher("events/write_split", 1); - rosbag2_cpp::bag_events::WriterEventCallbacks callbacks; - callbacks.write_split_callback = - [this](rosbag2_cpp::bag_events::BagSplitInfo & info) { - { - std::lock_guard lock(event_publisher_thread_mutex_); - bag_split_info_ = info; - write_split_has_occurred_ = true; - } - event_publisher_thread_wake_cv_.notify_all(); - }; - writer_->add_event_callbacks(callbacks); - - serialization_format_ = record_options_.rmw_serialization_format; - RCLCPP_INFO(node->get_logger(), "Listening for topics..."); - subscribe_topics(get_requested_or_available_topics()); - - if (!record_options_.is_discovery_disabled) { - discovery_future_ = - std::async(std::launch::async, std::bind(&RecorderImpl::topics_discovery, this)); - } - RCLCPP_INFO(node->get_logger(), "Recording..."); -} - -void RecorderImpl::event_publisher_thread_main() -{ - RCLCPP_INFO(node->get_logger(), "Event publisher thread: Starting"); - while (!event_publisher_thread_should_exit_.load()) { - std::unique_lock lock(event_publisher_thread_mutex_); - event_publisher_thread_wake_cv_.wait( - lock, - [this] {return event_publisher_thread_should_wake();}); - - if (write_split_has_occurred_) { - write_split_has_occurred_ = false; - - auto message = rosbag2_interfaces::msg::WriteSplitEvent(); - message.closed_file = bag_split_info_.closed_file; - message.opened_file = bag_split_info_.opened_file; - split_event_pub_->publish(message); - } - } - RCLCPP_INFO(node->get_logger(), "Event publisher thread: Exiting"); -} - -bool RecorderImpl::event_publisher_thread_should_wake() -{ - return write_split_has_occurred_ || event_publisher_thread_should_exit_; -} - -const rosbag2_cpp::Writer & RecorderImpl::get_writer_handle() -{ - return *writer_; -} - -void RecorderImpl::pause() -{ - paused_.store(true); - RCLCPP_INFO_STREAM(node->get_logger(), "Pausing recording."); -} - -void RecorderImpl::resume() -{ - paused_.store(false); - RCLCPP_INFO_STREAM(node->get_logger(), "Resuming recording."); -} - -void RecorderImpl::toggle_paused() -{ - if (paused_.load()) { - this->resume(); - } else { - this->pause(); - } -} - -bool RecorderImpl::is_paused() -{ - return paused_.load(); -} - -void RecorderImpl::topics_discovery() -{ - while (rclcpp::ok() && stop_discovery_ == false) { - auto topics_to_subscribe = - get_requested_or_available_topics(); - for (const auto & topic_and_type : topics_to_subscribe) { - warn_if_new_qos_for_subscribed_topic(topic_and_type.first); - } - auto missing_topics = get_missing_topics(topics_to_subscribe); - subscribe_topics(missing_topics); - - if (!record_options_.topics.empty() && subscriptions_.size() == record_options_.topics.size()) { - RCLCPP_INFO( - node->get_logger(), - "All requested topics are subscribed. Stopping discovery..."); - return; - } - std::this_thread::sleep_for(record_options_.topic_polling_interval); - } -} - -std::unordered_map -RecorderImpl::get_requested_or_available_topics() -{ - auto all_topics_and_types = node->get_topic_names_and_types(); - return topic_filter_->filter_topics(all_topics_and_types); -} - -std::unordered_map -RecorderImpl::get_missing_topics(const std::unordered_map & all_topics) -{ - std::unordered_map missing_topics; - for (const auto & i : all_topics) { - if (subscriptions_.find(i.first) == subscriptions_.end()) { - missing_topics.emplace(i.first, i.second); - } - } - return missing_topics; -} - - -void RecorderImpl::subscribe_topics( - const std::unordered_map & topics_and_types) -{ - for (const auto & topic_with_type : topics_and_types) { - auto endpoint_infos = node->get_publishers_info_by_topic(topic_with_type.first); - subscribe_topic( - { - topic_with_type.first, - topic_with_type.second, - serialization_format_, - serialized_offered_qos_profiles_for_topic(endpoint_infos), - type_description_hash_for_topic(endpoint_infos), - }); - } -} - -void RecorderImpl::subscribe_topic(const rosbag2_storage::TopicMetadata & topic) -{ - // Need to create topic in writer before we are trying to create subscription. Since in - // callback for subscription we are calling writer_->write(bag_message); and it could happened - // that callback called before we reached out the line: writer_->create_topic(topic) - writer_->create_topic(topic); - - Rosbag2QoS subscription_qos{subscription_qos_for_topic(topic.name)}; - auto subscription = create_subscription(topic.name, topic.type, subscription_qos); - if (subscription) { - subscriptions_.insert({topic.name, subscription}); - RCLCPP_INFO_STREAM( - node->get_logger(), - "Subscribed to topic '" << topic.name << "'"); - } else { - writer_->remove_topic(topic); - subscriptions_.erase(topic.name); - } -} - -std::shared_ptr -RecorderImpl::create_subscription( - const std::string & topic_name, const std::string & topic_type, const rclcpp::QoS & qos) -{ - auto subscription = node->create_generic_subscription( - topic_name, - topic_type, - qos, - [this, topic_name, topic_type](std::shared_ptr message) { - if (!paused_.load()) { - writer_->write(message, topic_name, topic_type, node->get_clock()->now()); - } - }); - return subscription; -} - -std::string RecorderImpl::serialized_offered_qos_profiles_for_topic( - const std::vector & topics_endpoint_info) const -{ - YAML::Node offered_qos_profiles; - for (const auto & info : topics_endpoint_info) { - offered_qos_profiles.push_back(Rosbag2QoS(info.qos_profile())); - } - return YAML::Dump(offered_qos_profiles); -} - -std::string type_hash_to_string(const rosidl_type_hash_t & type_hash) -{ - if (type_hash.version == 0) { - // version is unset, this is an empty type hash. - return ""; - } - if (type_hash.version > 1) { - // this is a version we don't know how to serialize - ROSBAG2_TRANSPORT_LOG_WARN_STREAM( - "attempted to stringify type hash with unknown version " << type_hash.version); - return ""; - } - rcutils_allocator_t allocator = rcutils_get_default_allocator(); - char * stringified_type_hash = nullptr; - rcutils_ret_t status = rosidl_stringify_type_hash(&type_hash, allocator, &stringified_type_hash); - std::string result = ""; - if (status == RCUTILS_RET_OK) { - result = stringified_type_hash; - } - if (stringified_type_hash != nullptr) { - allocator.deallocate(stringified_type_hash, allocator.state); - } - return result; -} - -std::string type_description_hash_for_topic( - const std::vector & topics_endpoint_info) -{ - rosidl_type_hash_t result_hash = rosidl_get_zero_initialized_type_hash(); - for (const auto & info : topics_endpoint_info) { - // If all endpoint infos provide the same type hash, return it. Otherwise return an empty - // string to signal that the type description hash for this topic cannot be determined. - rosidl_type_hash_t endpoint_hash = info.topic_type_hash(); - if (endpoint_hash.version == 0) { - continue; - } - if (result_hash.version == 0) { - result_hash = endpoint_hash; - continue; - } - bool difference_detected = (endpoint_hash.version != result_hash.version); - difference_detected |= ( - 0 != memcmp(endpoint_hash.value, result_hash.value, ROSIDL_TYPE_HASH_SIZE)); - if (difference_detected) { - std::string result_string = type_hash_to_string(result_hash); - std::string endpoint_string = type_hash_to_string(endpoint_hash); - ROSBAG2_TRANSPORT_LOG_WARN_STREAM( - "type description hashes for topic type '" << info.topic_type() << "' conflict: '" << - result_string << "' != '" << endpoint_string << "'"); - return ""; - } - } - return type_hash_to_string(result_hash); -} - -rclcpp::QoS RecorderImpl::subscription_qos_for_topic(const std::string & topic_name) const -{ - if (topic_qos_profile_overrides_.count(topic_name)) { - RCLCPP_INFO_STREAM( - node->get_logger(), - "Overriding subscription profile for " << topic_name); - return topic_qos_profile_overrides_.at(topic_name); - } - return Rosbag2QoS::adapt_request_to_offers( - topic_name, node->get_publishers_info_by_topic(topic_name)); -} - -void RecorderImpl::warn_if_new_qos_for_subscribed_topic(const std::string & topic_name) -{ - auto existing_subscription = subscriptions_.find(topic_name); - if (existing_subscription == subscriptions_.end()) { - // Not subscribed yet - return; - } - if (topics_warned_about_incompatibility_.count(topic_name) > 0) { - // Already warned about this topic - return; - } - const auto actual_qos = existing_subscription->second->get_actual_qos(); - const auto & used_profile = actual_qos.get_rmw_qos_profile(); - auto publishers_info = node->get_publishers_info_by_topic(topic_name); - for (const auto & info : publishers_info) { - auto new_profile = info.qos_profile().get_rmw_qos_profile(); - bool incompatible_reliability = - new_profile.reliability == RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT && - used_profile.reliability != RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT; - bool incompatible_durability = - new_profile.durability == RMW_QOS_POLICY_DURABILITY_VOLATILE && - used_profile.durability != RMW_QOS_POLICY_DURABILITY_VOLATILE; - - if (incompatible_reliability) { - RCLCPP_WARN_STREAM( - node->get_logger(), - "A new publisher for subscribed topic " << topic_name << " " - "was found offering RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT, " - "but rosbag already subscribed requesting RMW_QOS_POLICY_RELIABILITY_RELIABLE. " - "Messages from this new publisher will not be recorded."); - topics_warned_about_incompatibility_.insert(topic_name); - } else if (incompatible_durability) { - RCLCPP_WARN_STREAM( - node->get_logger(), - "A new publisher for subscribed topic " << topic_name << " " - "was found offering RMW_QOS_POLICY_DURABILITY_VOLATILE, " - "but rosbag2 already subscribed requesting RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL. " - "Messages from this new publisher will not be recorded."); - topics_warned_about_incompatibility_.insert(topic_name); - } - } -} - -/////////////////////////////// -// Recorder public interface - ->>>>>>> c6cc69a (Fix for possible freeze in Recorder::stop() (#1381)) Recorder::Recorder( const std::string & node_name, const rclcpp::NodeOptions & node_options) @@ -667,7 +121,13 @@ void Recorder::stop() { stop_discovery_ = true; if (discovery_future_.valid()) { - discovery_future_.wait(); + auto status = discovery_future_.wait_for(2 * record_options_.topic_polling_interval); + if (status != std::future_status::ready) { + RCLCPP_ERROR_STREAM( + get_logger(), + "discovery_future_.wait_for(" << record_options_.topic_polling_interval.count() << + ") return status: " << (status == std::future_status::timeout ? "timeout" : "deferred")); + } } paused_ = true; subscriptions_.clear(); @@ -681,10 +141,18 @@ void Recorder::stop() if (event_publisher_thread_.joinable()) { event_publisher_thread_.join(); } + in_recording_ = false; + RCLCPP_INFO(get_logger(), "Recording stopped"); } void Recorder::record() { + if (in_recording_.exchange(true)) { + RCLCPP_WARN_STREAM( + get_logger(), + "Called Recorder::record() while already in recording, dismissing request."); + return; + } stop_discovery_ = record_options_.is_discovery_disabled; paused_ = record_options_.start_paused; topic_qos_profile_overrides_ = record_options_.topic_qos_profile_overrides; @@ -736,15 +204,13 @@ void Recorder::record() discovery_future_ = std::async(std::launch::async, std::bind(&Recorder::topics_discovery, this)); } + RCLCPP_INFO(get_logger(), "Recording..."); } void Recorder::event_publisher_thread_main() { RCLCPP_INFO(get_logger(), "Event publisher thread: Starting"); - - bool should_exit = false; - - while (!should_exit) { + while (!event_publisher_thread_should_exit_.load()) { std::unique_lock lock(event_publisher_thread_mutex_); event_publisher_thread_wake_cv_.wait( lock, @@ -768,10 +234,7 @@ void Recorder::event_publisher_thread_main() "Failed to publish message on '/events/write_split' topic."); } } - - should_exit = event_publisher_thread_should_exit_; } - RCLCPP_INFO(get_logger(), "Event publisher thread: Exiting"); } From 4348b22d07ba04a975848c38dd2c32467a85cfd3 Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Sun, 18 Jun 2023 00:45:30 -0700 Subject: [PATCH 3/3] Remove `in_recording_` variable for ABI compatibility Signed-off-by: Michael Orlov --- rosbag2_transport/include/rosbag2_transport/recorder.hpp | 2 +- rosbag2_transport/src/rosbag2_transport/recorder.cpp | 7 ------- 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/rosbag2_transport/include/rosbag2_transport/recorder.hpp b/rosbag2_transport/include/rosbag2_transport/recorder.hpp index 8071f1e40c..0ce39ca1b2 100644 --- a/rosbag2_transport/include/rosbag2_transport/recorder.hpp +++ b/rosbag2_transport/include/rosbag2_transport/recorder.hpp @@ -164,7 +164,7 @@ class Recorder : public rclcpp::Node std::unordered_set topic_unknown_types_; rclcpp::Service::SharedPtr srv_snapshot_; std::atomic paused_ = false; - std::atomic in_recording_ = false; + // Keyboard handler std::shared_ptr keyboard_handler_; // Toogle paused key callback handle diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.cpp b/rosbag2_transport/src/rosbag2_transport/recorder.cpp index 3b2fc77275..4d0c2649a1 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.cpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.cpp @@ -141,18 +141,11 @@ void Recorder::stop() if (event_publisher_thread_.joinable()) { event_publisher_thread_.join(); } - in_recording_ = false; RCLCPP_INFO(get_logger(), "Recording stopped"); } void Recorder::record() { - if (in_recording_.exchange(true)) { - RCLCPP_WARN_STREAM( - get_logger(), - "Called Recorder::record() while already in recording, dismissing request."); - return; - } stop_discovery_ = record_options_.is_discovery_disabled; paused_ = record_options_.start_paused; topic_qos_profile_overrides_ = record_options_.topic_qos_profile_overrides;