From 3753f67b379a1ca0ec1646d90b1f128cd042182e Mon Sep 17 00:00:00 2001 From: Shuhao Wu Date: Sun, 4 Aug 2024 20:26:58 -0400 Subject: [PATCH] Reworked subscriber examples --- .vscode/settings.json | 3 +- examples/ros2/publisher/complex_data.cc | 8 +- examples/ros2/publisher/simple_data.cc | 7 +- examples/ros2/subscriber/CMakeLists.txt | 32 +++++-- examples/ros2/subscriber/complex_data.cc | 102 ++++++++++++++++++++++ examples/ros2/subscriber/main.cc | 104 ----------------------- examples/ros2/subscriber/simple_data.cc | 77 +++++++++++++++++ include/cactus_rt/ros2/publisher.h | 13 +-- include/cactus_rt/ros2/ros2_adapter.h | 6 +- include/cactus_rt/ros2/subscription.h | 23 ++--- src/cactus_rt/ros2/ros2_adapter.cc | 4 - 11 files changed, 229 insertions(+), 150 deletions(-) create mode 100644 examples/ros2/subscriber/complex_data.cc delete mode 100644 examples/ros2/subscriber/main.cc create mode 100644 examples/ros2/subscriber/simple_data.cc diff --git a/.vscode/settings.json b/.vscode/settings.json index 31a9bc8..c225c18 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -5,6 +5,7 @@ "editor.defaultFormatter": "llvm-vs-code-extensions.vscode-clangd" }, "clangd.arguments": [ - "--compile-commands-dir=${workspaceFolder}/build/debug" + "--compile-commands-dir=${workspaceFolder}/build/debug", + "--header-insertion=never" ] } diff --git a/examples/ros2/publisher/complex_data.cc b/examples/ros2/publisher/complex_data.cc index f05c72e..eb5d259 100644 --- a/examples/ros2/publisher/complex_data.cc +++ b/examples/ros2/publisher/complex_data.cc @@ -1,14 +1,10 @@ -#include -#include #include -#include +#include #include -#include +#include #include -#include "cactus_rt/ros2/ros2_adapter.h" - struct MyCoefficientData { float a; float b; diff --git a/examples/ros2/publisher/simple_data.cc b/examples/ros2/publisher/simple_data.cc index 7afc07b..2e3f458 100644 --- a/examples/ros2/publisher/simple_data.cc +++ b/examples/ros2/publisher/simple_data.cc @@ -1,15 +1,10 @@ -#include -#include #include -#include +#include #include #include -#include #include -#include "cactus_rt/ros2/ros2_adapter.h" - using RealtimeType = std_msgs::msg::Int64; using RosType = std_msgs::msg::Int64; diff --git a/examples/ros2/subscriber/CMakeLists.txt b/examples/ros2/subscriber/CMakeLists.txt index 725a44d..2663257 100644 --- a/examples/ros2/subscriber/CMakeLists.txt +++ b/examples/ros2/subscriber/CMakeLists.txt @@ -1,23 +1,43 @@ find_package(std_msgs REQUIRED) -add_executable(rt_ros2_subscriber - main.cc +add_executable(rt_ros2_subscriber_simple_data + simple_data.cc ) -target_link_libraries(rt_ros2_subscriber +target_link_libraries(rt_ros2_subscriber_simple_data PRIVATE cactus_rt ) -setup_cactus_rt_target_options(rt_ros2_subscriber) +setup_cactus_rt_target_options(rt_ros2_subscriber_simple_data) -ament_target_dependencies(rt_ros2_subscriber +ament_target_dependencies(rt_ros2_subscriber_simple_data PUBLIC std_msgs ) install( - TARGETS rt_ros2_subscriber + TARGETS rt_ros2_subscriber_simple_data DESTINATION lib/${PROJECT_NAME} ) +add_executable(rt_ros2_subscriber_complex_data + complex_data.cc +) + +target_link_libraries(rt_ros2_subscriber_complex_data + PRIVATE + cactus_rt +) + +setup_cactus_rt_target_options(rt_ros2_subscriber_complex_data) + +ament_target_dependencies(rt_ros2_subscriber_complex_data + PUBLIC + std_msgs +) + +install( + TARGETS rt_ros2_subscriber_complex_data + DESTINATION lib/${PROJECT_NAME} +) diff --git a/examples/ros2/subscriber/complex_data.cc b/examples/ros2/subscriber/complex_data.cc new file mode 100644 index 0000000..a0b16cf --- /dev/null +++ b/examples/ros2/subscriber/complex_data.cc @@ -0,0 +1,102 @@ +#include +#include + +#include +#include +#include + +struct MyCoefficientData { + float a; + float b; + float c; + float d; +}; + +using RealtimeType = MyCoefficientData; +using RosType = std_msgs::msg::Float32MultiArray; + +template <> +struct rclcpp::TypeAdapter { + using is_specialized = std::true_type; + using custom_type = RealtimeType; + using ros_message_type = RosType; + + static void convert_to_ros_message(const custom_type& source, ros_message_type& destination) { + destination.data.reserve(4); + destination.data.push_back(source.a); + destination.data.push_back(source.b); + destination.data.push_back(source.c); + destination.data.push_back(source.d); + } + + static void convert_to_custom(const ros_message_type& source, custom_type& destination) { + destination.a = source.data.at(0); + destination.b = source.data.at(1); + destination.c = source.data.at(2); + destination.d = source.data.at(3); + } +}; + +class RTROS2SubscriberThread : public cactus_rt::CyclicThread, public cactus_rt::ros2::Ros2ThreadMixin { + int64_t last_msg_id_ = 0; + int64_t run_duration_; + + std::shared_ptr> subscription_; + + static cactus_rt::CyclicThreadConfig CreateThreadConfig() { + cactus_rt::CyclicThreadConfig thread_config; + thread_config.period_ns = 1'000'000; + thread_config.cpu_affinity = std::vector{2}; + thread_config.SetFifoScheduler(80); + + return thread_config; + } + + public: + explicit RTROS2SubscriberThread(std::chrono::nanoseconds run_duration = std::chrono::seconds(30)) + : cactus_rt::CyclicThread("RTROS2Subscriber", CreateThreadConfig()), + run_duration_(run_duration.count()) {} + + void InitializeForRos2() override { + subscription_ = ros2_adapter_->CreateSubscriptionForLatestValue("/cactus_rt/complex", rclcpp::QoS(100)); + } + + protected: + bool Loop(int64_t elapsed_ns) noexcept override { + cactus_rt::ros2::StampedValue msg; + { + const auto span = Tracer().WithSpan("Subscription::ReadLatest"); + msg = subscription_->ReadLatest(); + } + + if (msg.id != last_msg_id_) { + LOG_INFO(Logger(), "Got new data at {}: {} [{}, {}, {}, {}]", elapsed_ns, msg.id, msg.value.a, msg.value.b, msg.value.c, msg.value.d); + last_msg_id_ = msg.id; + } + + return elapsed_ns > run_duration_; + } +}; + +int main(int argc, char* argv[]) { + rclcpp::init(argc, argv); + + cactus_rt::ros2::App app("SimpleDataROS2Subscriber"); + app.StartTraceSession("build/subscriber.perfetto"); + + constexpr std::chrono::seconds time(30); + std::cout << "Testing RT loop for " << time.count() << " seconds.\n"; + + auto thread = app.CreateROS2EnabledThread(time); + app.RegisterThread(thread); + + app.Start(); + + std::this_thread::sleep_for(time); + + app.RequestStop(); + app.Join(); + + std::cout << "Done\n"; + return 0; +} diff --git a/examples/ros2/subscriber/main.cc b/examples/ros2/subscriber/main.cc deleted file mode 100644 index a1459c9..0000000 --- a/examples/ros2/subscriber/main.cc +++ /dev/null @@ -1,104 +0,0 @@ -#include -#include - -#include -#include -#include -#include -#include -#include -#include - -using cactus_rt::CyclicThread; -using cactus_rt::ros2::App; -using cactus_rt::ros2::StampedValue; - -struct RealtimeData { - int64_t data; - - RealtimeData() = default; - explicit RealtimeData(int64_t d) : data(d) {} -}; -using RosData = std_msgs::msg::Int64; - -template <> -struct rclcpp::TypeAdapter { - using is_specialized = std::true_type; - using custom_type = RealtimeData; - using ros_message_type = RosData; - - static void convert_to_ros_message(const custom_type& source, ros_message_type& destination) { - destination.data = source.data; - } - - static void convert_to_custom(const ros_message_type& source, custom_type& destination) { - destination.data = source.data; - } -}; - -class RealtimeROS2SubscriberThread : public CyclicThread, public cactus_rt::ros2::Ros2ThreadMixin { - int64_t loop_counter_ = 0; - - std::shared_ptr> subscription_; - - static cactus_rt::CyclicThreadConfig CreateThreadConfig() { - cactus_rt::CyclicThreadConfig thread_config; - thread_config.period_ns = 1'000'000; - thread_config.cpu_affinity = std::vector{2}; - thread_config.SetFifoScheduler(80); - - // thread_config.tracer_config.trace_sleep = true; - thread_config.tracer_config.trace_wakeup_latency = true; - return thread_config; - } - - public: - RealtimeROS2SubscriberThread(const char* name) : CyclicThread(name, CreateThreadConfig()) {} - - void InitializeForRos2() override { - subscription_ = ros2_adapter_->CreateSubscriptionForLatestValue("/hello", rclcpp::QoS(10)); - } - - int64_t GetLoopCounter() const { - return loop_counter_; - } - - protected: - bool Loop(int64_t /*now*/) noexcept final { - loop_counter_++; - if (loop_counter_ % 10 == 0) { - StampedValue data; - - { - const auto span = Tracer().WithSpan("ReadLatest"); - data = subscription_->ReadLatest(); - } - - LOG_INFO(Logger(), "Loop {}: {} {}", loop_counter_, data.id, data.value.data); - } - return false; - } -}; - -int main(int argc, char* argv[]) { - rclcpp::init(argc, argv); - - App app("RTROS2Subscriber"); - app.StartTraceSession("build/subscriber.perfetto"); - - auto thread = app.CreateROS2EnabledThread("RTROS2SubscriberThread"); - app.RegisterThread(thread); - - constexpr unsigned int time = 60; - std::cout << "Testing RT loop for " << time << " seconds.\n"; - - app.Start(); - - std::this_thread::sleep_for(std::chrono::seconds(time)); - - app.RequestStop(); - app.Join(); - - std::cout << "Number of loops executed: " << thread->GetLoopCounter() << "\n"; - return 0; -} diff --git a/examples/ros2/subscriber/simple_data.cc b/examples/ros2/subscriber/simple_data.cc new file mode 100644 index 0000000..a805ac0 --- /dev/null +++ b/examples/ros2/subscriber/simple_data.cc @@ -0,0 +1,77 @@ +#include +#include + +#include +#include +#include + +using RealtimeType = std_msgs::msg::Int64; +using RosType = std_msgs::msg::Int64; + +class RTROS2SubscriberThread : public cactus_rt::CyclicThread, public cactus_rt::ros2::Ros2ThreadMixin { + int64_t last_msg_id_ = 0; + int64_t run_duration_; + + // We force turn off the trivial data check, because the ROS message data type + // has a defined constructor with code in it. That said, the code really is + // pretty trivial so it is safe to use in real-time. We thus disable the trivial + // type check manually. + std::shared_ptr> subscription_; + + static cactus_rt::CyclicThreadConfig CreateThreadConfig() { + cactus_rt::CyclicThreadConfig thread_config; + thread_config.period_ns = 1'000'000; + thread_config.cpu_affinity = std::vector{2}; + thread_config.SetFifoScheduler(80); + + return thread_config; + } + + public: + explicit RTROS2SubscriberThread(std::chrono::nanoseconds run_duration = std::chrono::seconds(30)) + : cactus_rt::CyclicThread("RTROS2Subscriber", CreateThreadConfig()), + run_duration_(run_duration.count()) {} + + void InitializeForRos2() override { + subscription_ = ros2_adapter_->CreateSubscriptionForLatestValue("/cactus_rt/simple", rclcpp::QoS(100)); + } + + protected: + bool Loop(int64_t elapsed_ns) noexcept override { + cactus_rt::ros2::StampedValue msg; + { + const auto span = Tracer().WithSpan("Subscription::ReadLatest"); + msg = subscription_->ReadLatest(); + } + + if (msg.id != last_msg_id_) { + LOG_INFO(Logger(), "Got new data at {}: {} {}", elapsed_ns, msg.id, msg.value.data); + last_msg_id_ = msg.id; + } + + return elapsed_ns > run_duration_; + } +}; + +int main(int argc, char* argv[]) { + rclcpp::init(argc, argv); + + cactus_rt::ros2::App app("SimpleDataROS2Subscriber"); + app.StartTraceSession("build/subscriber.perfetto"); + + constexpr std::chrono::seconds time(30); + std::cout << "Testing RT loop for " << time.count() << " seconds.\n"; + + auto thread = app.CreateROS2EnabledThread(time); + app.RegisterThread(thread); + + app.Start(); + + std::this_thread::sleep_for(time); + + app.RequestStop(); + app.Join(); + + std::cout << "Done\n"; + return 0; +} diff --git a/include/cactus_rt/ros2/publisher.h b/include/cactus_rt/ros2/publisher.h index c99478d..506bfd5 100644 --- a/include/cactus_rt/ros2/publisher.h +++ b/include/cactus_rt/ros2/publisher.h @@ -1,9 +1,9 @@ #ifndef CACTUS_RT_ROS2_PUBLISHER_H_ #define CACTUS_RT_ROS2_PUBLISHER_H_ -#include #include +#include #include #include @@ -20,8 +20,6 @@ class IPublisher { virtual ~IPublisher() = default; }; -#define print_type_of(_x) your_type_was_() - template class Publisher : public IPublisher { friend class Ros2Adapter; @@ -29,7 +27,7 @@ class Publisher : public IPublisher { static_assert(!CheckForTrivialRealtimeT || std::is_trivial_v, "RealtimeT must be a trivial object to be real-time safe"); using NoConversion = std::is_same; - using AdaptedRosType = typename std::conditional>::type; + using AdaptedRosType = typename std::conditional_t>; typename rclcpp::Publisher::SharedPtr publisher_; moodycamel::ReaderWriterQueue queue_; @@ -81,12 +79,9 @@ class Publisher : public IPublisher { const rclcpp::QoS& qos, const size_t rt_queue_size = 1000 ) { - typename rclcpp::Publisher::SharedPtr publisher = node.create_publisher(topic_name, qos); - typename moodycamel::ReaderWriterQueue queue(rt_queue_size); - return std::make_shared>( - std::move(publisher), - std::move(queue) + node.create_publisher(topic_name, qos), + moodycamel::ReaderWriterQueue(rt_queue_size) ); } diff --git a/include/cactus_rt/ros2/ros2_adapter.h b/include/cactus_rt/ros2/ros2_adapter.h index c80acc4..1899872 100644 --- a/include/cactus_rt/ros2/ros2_adapter.h +++ b/include/cactus_rt/ros2/ros2_adapter.h @@ -59,12 +59,12 @@ class Ros2Adapter { return publisher; } - template - std::shared_ptr> CreateSubscriptionForLatestValue( + template + std::shared_ptr> CreateSubscriptionForLatestValue( const std::string& topic_name, const rclcpp::QoS& qos ) { - auto subscriber = SubscriptionLatest::Create(*this->ros_node_, topic_name, qos); + auto subscriber = SubscriptionLatest::Create(*this->ros_node_, topic_name, qos); const std::scoped_lock lock(mut_); subscriptions_.push_back(subscriber); diff --git a/include/cactus_rt/ros2/subscription.h b/include/cactus_rt/ros2/subscription.h index 952d452..ccd1508 100644 --- a/include/cactus_rt/ros2/subscription.h +++ b/include/cactus_rt/ros2/subscription.h @@ -3,6 +3,7 @@ #include #include +#include #include "../experimental/lockless/spsc/realtime_readable_value.h" @@ -20,26 +21,26 @@ class ISubscription { template struct StampedValue { - uint64_t id; + int64_t id; RealtimeT value; StampedValue() = default; - StampedValue(const uint64_t i, const RealtimeT& v) : id(i), value(v) {} + StampedValue(const int64_t i, const RealtimeT& v) : id(i), value(v) {} }; -// TODO: Theoretically it is possible to not use the type converter if we don't -// copy the data on read with RealtimeReadableValue. -template +template class SubscriptionLatest : public ISubscription { friend class Ros2Adapter; - static_assert(std::is_trivial_v, "RealtimeT must be a trivial object to be real-time safe"); - using AdaptedRosType = rclcpp::TypeAdapter; + static_assert(!CheckForTrivialRealtimeT || std::is_trivial_v, "RealtimeT must be a trivial object to be real-time safe"); + + using NoConversion = std::is_same; + using AdaptedRosType = typename std::conditional_t>; using RealtimeReadableValue = cactus_rt::experimental::lockless::spsc::RealtimeReadableValue>; typename rclcpp::Subscription::SharedPtr ros_subscription_; - uint64_t current_msg_id_ = 0; + int64_t current_msg_id_ = 0; RealtimeReadableValue latest_value_; void SubscriptionCallback(const RealtimeT& rt_msg) { @@ -53,12 +54,12 @@ class SubscriptionLatest : public ISubscription { latest_value_.Write(stamped_value); } - static std::shared_ptr Create( + static std::shared_ptr> Create( rclcpp::Node& node, const std::string& topic_name, const rclcpp::QoS& qos ) { - auto subscription = std::make_shared>(); + auto subscription = std::make_shared>(); subscription->ros_subscription_ = node.create_subscription( topic_name, @@ -80,7 +81,7 @@ class SubscriptionLatest : public ISubscription { */ SubscriptionLatest() = default; - inline StampedValue ReadLatest() noexcept { + StampedValue ReadLatest() noexcept { return latest_value_.Read(); } }; diff --git a/src/cactus_rt/ros2/ros2_adapter.cc b/src/cactus_rt/ros2/ros2_adapter.cc index 9226d02..a1d0fbc 100644 --- a/src/cactus_rt/ros2/ros2_adapter.cc +++ b/src/cactus_rt/ros2/ros2_adapter.cc @@ -2,8 +2,6 @@ #include -#include - namespace cactus_rt::ros2 { Ros2Adapter::Ros2Adapter(const std::string& name, const Ros2Adapter::Config& config) @@ -18,8 +16,6 @@ void Ros2Adapter::TimerCallback() { void Ros2Adapter::DrainQueues() { const std::scoped_lock lock(mut_); - const auto& logger = this->ros_node_->get_logger(); - for (const auto& publisher : publishers_) { // Hopefully the thread is not publishing so quickly that a single // publisher monopolizes all resources. That said, if that happens the