From 492770c12f5427effdb7f24a03aebc55b2532d63 Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Mon, 28 Mar 2022 13:06:17 +0800 Subject: [PATCH] Add publish by loaned message in GenericPublisher (#1856) * Add publish by loaned message in GenericPublisher Signed-off-by: Barry Xu Co-authored-by: Alban Tamisier --- rclcpp/include/rclcpp/generic_publisher.hpp | 17 ++++ rclcpp/include/rclcpp/publisher_base.hpp | 2 + rclcpp/src/rclcpp/generic_publisher.cpp | 45 +++++++++ rclcpp/src/rclcpp/publisher_base.cpp | 4 +- rclcpp/test/rclcpp/test_generic_pubsub.cpp | 101 ++++++++++++++++---- 5 files changed, 152 insertions(+), 17 deletions(-) diff --git a/rclcpp/include/rclcpp/generic_publisher.hpp b/rclcpp/include/rclcpp/generic_publisher.hpp index e379f09427..e1b46002bc 100644 --- a/rclcpp/include/rclcpp/generic_publisher.hpp +++ b/rclcpp/include/rclcpp/generic_publisher.hpp @@ -31,6 +31,8 @@ #include "rclcpp/typesupport_helpers.hpp" #include "rclcpp/visibility_control.hpp" +#include "rmw/rmw.h" + namespace rclcpp { @@ -116,9 +118,24 @@ class GenericPublisher : public rclcpp::PublisherBase RCLCPP_PUBLIC void publish(const rclcpp::SerializedMessage & message); + /** + * Publish a rclcpp::SerializedMessage via loaned message after de-serialization. + * + * \param message a serialized message + * \throws anything rclcpp::exceptions::throw_from_rcl_error can show + */ + RCLCPP_PUBLIC + void publish_as_loaned_msg(const rclcpp::SerializedMessage & message); + private: // The type support library should stay loaded, so it is stored in the GenericPublisher std::shared_ptr ts_lib_; + + void * borrow_loaned_message(); + void deserialize_message( + const rmw_serialized_message_t & serialized_message, + void * deserialized_msg); + void publish_loaned_message(void * loaned_message); }; } // namespace rclcpp diff --git a/rclcpp/include/rclcpp/publisher_base.hpp b/rclcpp/include/rclcpp/publisher_base.hpp index cdbd4bb758..8416757a53 100644 --- a/rclcpp/include/rclcpp/publisher_base.hpp +++ b/rclcpp/include/rclcpp/publisher_base.hpp @@ -346,6 +346,8 @@ class PublisherBase : public std::enable_shared_from_this uint64_t intra_process_publisher_id_; rmw_gid_t rmw_gid_; + + const rosidl_message_type_support_t type_support_; }; } // namespace rclcpp diff --git a/rclcpp/src/rclcpp/generic_publisher.cpp b/rclcpp/src/rclcpp/generic_publisher.cpp index 733aa5dd5c..87ace1eb19 100644 --- a/rclcpp/src/rclcpp/generic_publisher.cpp +++ b/rclcpp/src/rclcpp/generic_publisher.cpp @@ -31,4 +31,49 @@ void GenericPublisher::publish(const rclcpp::SerializedMessage & message) } } +void GenericPublisher::publish_as_loaned_msg(const rclcpp::SerializedMessage & message) +{ + auto loaned_message = borrow_loaned_message(); + deserialize_message(message.get_rcl_serialized_message(), loaned_message); + publish_loaned_message(loaned_message); +} + +void * GenericPublisher::borrow_loaned_message() +{ + void * loaned_message = nullptr; + auto return_code = rcl_borrow_loaned_message( + get_publisher_handle().get(), &type_support_, &loaned_message); + + if (return_code != RMW_RET_OK) { + if (return_code == RCL_RET_UNSUPPORTED) { + rclcpp::exceptions::throw_from_rcl_error( + return_code, + "current middleware cannot support loan messages"); + } else { + rclcpp::exceptions::throw_from_rcl_error(return_code, "failed to borrow loaned msg"); + } + } + return loaned_message; +} + +void GenericPublisher::deserialize_message( + const rmw_serialized_message_t & serialized_message, + void * deserialized_msg) +{ + auto return_code = rmw_deserialize(&serialized_message, &type_support_, deserialized_msg); + if (return_code != RMW_RET_OK) { + rclcpp::exceptions::throw_from_rcl_error(return_code, "failed to deserialize msg"); + } +} + +void GenericPublisher::publish_loaned_message(void * loaned_message) +{ + auto return_code = rcl_publish_loaned_message( + get_publisher_handle().get(), loaned_message, NULL); + + if (return_code != RCL_RET_OK) { + rclcpp::exceptions::throw_from_rcl_error(return_code, "failed to publish loaned message"); + } +} + } // namespace rclcpp diff --git a/rclcpp/src/rclcpp/publisher_base.cpp b/rclcpp/src/rclcpp/publisher_base.cpp index dd027f2e65..723d56e0d5 100644 --- a/rclcpp/src/rclcpp/publisher_base.cpp +++ b/rclcpp/src/rclcpp/publisher_base.cpp @@ -47,7 +47,9 @@ PublisherBase::PublisherBase( const rosidl_message_type_support_t & type_support, const rcl_publisher_options_t & publisher_options) : rcl_node_handle_(node_base->get_shared_rcl_node_handle()), - intra_process_is_enabled_(false), intra_process_publisher_id_(0) + intra_process_is_enabled_(false), + intra_process_publisher_id_(0), + type_support_(type_support) { auto custom_deleter = [node_handle = this->rcl_node_handle_](rcl_publisher_t * rcl_pub) { diff --git a/rclcpp/test/rclcpp/test_generic_pubsub.cpp b/rclcpp/test/rclcpp/test_generic_pubsub.cpp index ff34ec52b3..0d2bec588a 100644 --- a/rclcpp/test/rclcpp/test_generic_pubsub.cpp +++ b/rclcpp/test/rclcpp/test_generic_pubsub.cpp @@ -61,34 +61,37 @@ class RclcppGenericNodeFixture : public Test publishers_.push_back(publisher); } - std::vector subscribe_raw_messages( - size_t expected_messages_number, const std::string & topic_name, const std::string & type) + template + std::vector subscribe_raw_messages( + size_t expected_recv_msg_count, const std::string & topic_name, const std::string & type) { - std::vector messages; + std::vector messages; size_t counter = 0; auto subscription = node_->create_generic_subscription( topic_name, type, rclcpp::QoS(1), - [&counter, &messages](std::shared_ptr message) { - test_msgs::msg::Strings string_message; - rclcpp::Serialization serializer; - serializer.deserialize_message(message.get(), &string_message); - messages.push_back(string_message.string_value); + [&counter, &messages, this](std::shared_ptr message) { + T2 deserialized_message; + rclcpp::Serialization serializer; + serializer.deserialize_message(message.get(), &deserialized_message); + messages.push_back(this->get_data_from_msg(deserialized_message)); counter++; }); - while (counter < expected_messages_number) { + while (counter < expected_recv_msg_count) { rclcpp::spin_some(node_); } return messages; } - rclcpp::SerializedMessage serialize_string_message(const std::string & message) + template + rclcpp::SerializedMessage serialize_message(const T1 & data) { - test_msgs::msg::Strings string_message; - string_message.string_value = message; - rclcpp::Serialization ser; + T2 message; + write_message(data, message); + + rclcpp::Serialization ser; SerializedMessage result; - ser.serialize_message(&string_message, &result); + ser.serialize_message(&message, &result); return result; } @@ -115,6 +118,27 @@ class RclcppGenericNodeFixture : public Test std::shared_ptr node_; rclcpp::Node::SharedPtr publisher_node_; std::vector> publishers_; + +private: + void write_message(const std::string & data, test_msgs::msg::Strings & message) + { + message.string_value = data; + } + + void write_message(const int64_t & data, test_msgs::msg::BasicTypes & message) + { + message.int64_value = data; + } + + std::string get_data_from_msg(const test_msgs::msg::Strings & message) + { + return message.string_value; + } + + int64_t get_data_from_msg(const test_msgs::msg::BasicTypes & message) + { + return message.int64_value; + } }; @@ -130,7 +154,7 @@ TEST_F(RclcppGenericNodeFixture, publisher_and_subscriber_work) auto subscriber_future_ = std::async( std::launch::async, [this, topic_name, type] { - return subscribe_raw_messages(1, topic_name, type); + return subscribe_raw_messages(1, topic_name, type); }); // TODO(karsten1987): Port 'wait_for_sub' to rclcpp @@ -147,7 +171,7 @@ TEST_F(RclcppGenericNodeFixture, publisher_and_subscriber_work) ASSERT_TRUE(success); for (const auto & message : test_messages) { - publisher->publish(serialize_string_message(message)); + publisher->publish(serialize_message(message)); } auto subscribed_messages = subscriber_future_.get(); @@ -155,6 +179,51 @@ TEST_F(RclcppGenericNodeFixture, publisher_and_subscriber_work) EXPECT_THAT(subscribed_messages[0], StrEq("Hello World")); } +TEST_F(RclcppGenericNodeFixture, publish_loaned_msg_work) +{ + // We currently publish more messages because they can get lost + std::vector test_messages = {100, 100}; + std::string topic_name = "/int64_topic"; + std::string type = "test_msgs/msg/BasicTypes"; + + auto publisher = node_->create_generic_publisher(topic_name, type, rclcpp::QoS(1)); + + if (publisher->can_loan_messages()) { + auto subscriber_future_ = std::async( + std::launch::deferred, [this, topic_name, type] { + return subscribe_raw_messages( + 1, topic_name, type); + }); + + auto allocator = node_->get_node_options().allocator(); + auto success = false; + auto ret = rcl_wait_for_subscribers( + node_->get_node_base_interface()->get_rcl_node_handle(), + &allocator, + topic_name.c_str(), + 1u, + static_cast(2e9), + &success); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + ASSERT_TRUE(success); + + for (const auto & message : test_messages) { + publisher->publish_as_loaned_msg( + serialize_message(message)); + } + + auto subscribed_messages = subscriber_future_.get(); + EXPECT_THAT(subscribed_messages, SizeIs(Not(0))); + EXPECT_EQ(subscribed_messages[0], test_messages[0]); + } else { + ASSERT_THROW( + { + publisher->publish_as_loaned_msg( + serialize_message(test_messages[0])); + }, rclcpp::exceptions::RCLError); + } +} + TEST_F(RclcppGenericNodeFixture, generic_subscription_uses_qos) { // If the GenericSubscription does not use the provided QoS profile,