Skip to content

Commit

Permalink
Add publish by loaned message in GenericPublisher (ros2#1856)
Browse files Browse the repository at this point in the history
* Add publish by loaned message in GenericPublisher

Signed-off-by: Barry Xu <[email protected]>
Co-authored-by: Alban Tamisier <[email protected]>
  • Loading branch information
Barry-Xu-2018 and Alban Tamisier authored Mar 28, 2022
1 parent d302e3c commit 492770c
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 17 deletions.
17 changes: 17 additions & 0 deletions rclcpp/include/rclcpp/generic_publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
#include "rclcpp/typesupport_helpers.hpp"
#include "rclcpp/visibility_control.hpp"

#include "rmw/rmw.h"

namespace rclcpp
{

Expand Down Expand Up @@ -116,9 +118,24 @@ class GenericPublisher : public rclcpp::PublisherBase
RCLCPP_PUBLIC
void publish(const rclcpp::SerializedMessage & message);

/**
* Publish a rclcpp::SerializedMessage via loaned message after de-serialization.
*
* \param message a serialized message
* \throws anything rclcpp::exceptions::throw_from_rcl_error can show
*/
RCLCPP_PUBLIC
void publish_as_loaned_msg(const rclcpp::SerializedMessage & message);

private:
// The type support library should stay loaded, so it is stored in the GenericPublisher
std::shared_ptr<rcpputils::SharedLibrary> ts_lib_;

void * borrow_loaned_message();
void deserialize_message(
const rmw_serialized_message_t & serialized_message,
void * deserialized_msg);
void publish_loaned_message(void * loaned_message);
};

} // namespace rclcpp
Expand Down
2 changes: 2 additions & 0 deletions rclcpp/include/rclcpp/publisher_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,8 @@ class PublisherBase : public std::enable_shared_from_this<PublisherBase>
uint64_t intra_process_publisher_id_;

rmw_gid_t rmw_gid_;

const rosidl_message_type_support_t type_support_;
};

} // namespace rclcpp
Expand Down
45 changes: 45 additions & 0 deletions rclcpp/src/rclcpp/generic_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,49 @@ void GenericPublisher::publish(const rclcpp::SerializedMessage & message)
}
}

void GenericPublisher::publish_as_loaned_msg(const rclcpp::SerializedMessage & message)
{
auto loaned_message = borrow_loaned_message();
deserialize_message(message.get_rcl_serialized_message(), loaned_message);
publish_loaned_message(loaned_message);
}

void * GenericPublisher::borrow_loaned_message()
{
void * loaned_message = nullptr;
auto return_code = rcl_borrow_loaned_message(
get_publisher_handle().get(), &type_support_, &loaned_message);

if (return_code != RMW_RET_OK) {
if (return_code == RCL_RET_UNSUPPORTED) {
rclcpp::exceptions::throw_from_rcl_error(
return_code,
"current middleware cannot support loan messages");
} else {
rclcpp::exceptions::throw_from_rcl_error(return_code, "failed to borrow loaned msg");
}
}
return loaned_message;
}

void GenericPublisher::deserialize_message(
const rmw_serialized_message_t & serialized_message,
void * deserialized_msg)
{
auto return_code = rmw_deserialize(&serialized_message, &type_support_, deserialized_msg);
if (return_code != RMW_RET_OK) {
rclcpp::exceptions::throw_from_rcl_error(return_code, "failed to deserialize msg");
}
}

void GenericPublisher::publish_loaned_message(void * loaned_message)
{
auto return_code = rcl_publish_loaned_message(
get_publisher_handle().get(), loaned_message, NULL);

if (return_code != RCL_RET_OK) {
rclcpp::exceptions::throw_from_rcl_error(return_code, "failed to publish loaned message");
}
}

} // namespace rclcpp
4 changes: 3 additions & 1 deletion rclcpp/src/rclcpp/publisher_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ PublisherBase::PublisherBase(
const rosidl_message_type_support_t & type_support,
const rcl_publisher_options_t & publisher_options)
: rcl_node_handle_(node_base->get_shared_rcl_node_handle()),
intra_process_is_enabled_(false), intra_process_publisher_id_(0)
intra_process_is_enabled_(false),
intra_process_publisher_id_(0),
type_support_(type_support)
{
auto custom_deleter = [node_handle = this->rcl_node_handle_](rcl_publisher_t * rcl_pub)
{
Expand Down
101 changes: 85 additions & 16 deletions rclcpp/test/rclcpp/test_generic_pubsub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,34 +61,37 @@ class RclcppGenericNodeFixture : public Test
publishers_.push_back(publisher);
}

std::vector<std::string> subscribe_raw_messages(
size_t expected_messages_number, const std::string & topic_name, const std::string & type)
template<typename T1, typename T2>
std::vector<T1> subscribe_raw_messages(
size_t expected_recv_msg_count, const std::string & topic_name, const std::string & type)
{
std::vector<std::string> messages;
std::vector<T1> messages;
size_t counter = 0;
auto subscription = node_->create_generic_subscription(
topic_name, type, rclcpp::QoS(1),
[&counter, &messages](std::shared_ptr<rclcpp::SerializedMessage> message) {
test_msgs::msg::Strings string_message;
rclcpp::Serialization<test_msgs::msg::Strings> serializer;
serializer.deserialize_message(message.get(), &string_message);
messages.push_back(string_message.string_value);
[&counter, &messages, this](std::shared_ptr<rclcpp::SerializedMessage> message) {
T2 deserialized_message;
rclcpp::Serialization<T2> serializer;
serializer.deserialize_message(message.get(), &deserialized_message);
messages.push_back(this->get_data_from_msg(deserialized_message));
counter++;
});

while (counter < expected_messages_number) {
while (counter < expected_recv_msg_count) {
rclcpp::spin_some(node_);
}
return messages;
}

rclcpp::SerializedMessage serialize_string_message(const std::string & message)
template<typename T1, typename T2>
rclcpp::SerializedMessage serialize_message(const T1 & data)
{
test_msgs::msg::Strings string_message;
string_message.string_value = message;
rclcpp::Serialization<test_msgs::msg::Strings> ser;
T2 message;
write_message(data, message);

rclcpp::Serialization<T2> ser;
SerializedMessage result;
ser.serialize_message(&string_message, &result);
ser.serialize_message(&message, &result);
return result;
}

Expand All @@ -115,6 +118,27 @@ class RclcppGenericNodeFixture : public Test
std::shared_ptr<rclcpp::Node> node_;
rclcpp::Node::SharedPtr publisher_node_;
std::vector<std::shared_ptr<rclcpp::PublisherBase>> publishers_;

private:
void write_message(const std::string & data, test_msgs::msg::Strings & message)
{
message.string_value = data;
}

void write_message(const int64_t & data, test_msgs::msg::BasicTypes & message)
{
message.int64_value = data;
}

std::string get_data_from_msg(const test_msgs::msg::Strings & message)
{
return message.string_value;
}

int64_t get_data_from_msg(const test_msgs::msg::BasicTypes & message)
{
return message.int64_value;
}
};


Expand All @@ -130,7 +154,7 @@ TEST_F(RclcppGenericNodeFixture, publisher_and_subscriber_work)

auto subscriber_future_ = std::async(
std::launch::async, [this, topic_name, type] {
return subscribe_raw_messages(1, topic_name, type);
return subscribe_raw_messages<std::string, test_msgs::msg::Strings>(1, topic_name, type);
});

// TODO(karsten1987): Port 'wait_for_sub' to rclcpp
Expand All @@ -147,14 +171,59 @@ TEST_F(RclcppGenericNodeFixture, publisher_and_subscriber_work)
ASSERT_TRUE(success);

for (const auto & message : test_messages) {
publisher->publish(serialize_string_message(message));
publisher->publish(serialize_message<std::string, test_msgs::msg::Strings>(message));
}

auto subscribed_messages = subscriber_future_.get();
EXPECT_THAT(subscribed_messages, SizeIs(Not(0)));
EXPECT_THAT(subscribed_messages[0], StrEq("Hello World"));
}

TEST_F(RclcppGenericNodeFixture, publish_loaned_msg_work)
{
// We currently publish more messages because they can get lost
std::vector<int64_t> test_messages = {100, 100};
std::string topic_name = "/int64_topic";
std::string type = "test_msgs/msg/BasicTypes";

auto publisher = node_->create_generic_publisher(topic_name, type, rclcpp::QoS(1));

if (publisher->can_loan_messages()) {
auto subscriber_future_ = std::async(
std::launch::deferred, [this, topic_name, type] {
return subscribe_raw_messages<int64_t, test_msgs::msg::BasicTypes>(
1, topic_name, type);
});

auto allocator = node_->get_node_options().allocator();
auto success = false;
auto ret = rcl_wait_for_subscribers(
node_->get_node_base_interface()->get_rcl_node_handle(),
&allocator,
topic_name.c_str(),
1u,
static_cast<rcutils_duration_value_t>(2e9),
&success);
ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
ASSERT_TRUE(success);

for (const auto & message : test_messages) {
publisher->publish_as_loaned_msg(
serialize_message<int64_t, test_msgs::msg::BasicTypes>(message));
}

auto subscribed_messages = subscriber_future_.get();
EXPECT_THAT(subscribed_messages, SizeIs(Not(0)));
EXPECT_EQ(subscribed_messages[0], test_messages[0]);
} else {
ASSERT_THROW(
{
publisher->publish_as_loaned_msg(
serialize_message<int64_t, test_msgs::msg::BasicTypes>(test_messages[0]));
}, rclcpp::exceptions::RCLError);
}
}

TEST_F(RclcppGenericNodeFixture, generic_subscription_uses_qos)
{
// If the GenericSubscription does not use the provided QoS profile,
Expand Down

0 comments on commit 492770c

Please sign in to comment.