Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Reworked subscriber examples
Browse files Browse the repository at this point in the history
shuhaowu committed Aug 5, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent f45dd63 commit 4e15a49
Showing 10 changed files with 229 additions and 146 deletions.
3 changes: 2 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@
"editor.defaultFormatter": "llvm-vs-code-extensions.vscode-clangd"
},
"clangd.arguments": [
"--compile-commands-dir=${workspaceFolder}/build/debug"
"--compile-commands-dir=${workspaceFolder}/build/debug",
"--header-insertion=never"
]
}
8 changes: 2 additions & 6 deletions examples/ros2/publisher/complex_data.cc
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
#include <cactus_rt/config.h>
#include <cactus_rt/cyclic_thread.h>
#include <cactus_rt/ros2/app.h>
#include <quill/detail/LogMacros.h>
#include <cactus_rt/rt.h>

#include <chrono>
#include <rclcpp/qos.hpp>
#include <memory>
#include <std_msgs/msg/float32_multi_array.hpp>

#include "cactus_rt/ros2/ros2_adapter.h"

struct MyCoefficientData {
float a;
float b;
7 changes: 1 addition & 6 deletions examples/ros2/publisher/simple_data.cc
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
#include <cactus_rt/config.h>
#include <cactus_rt/cyclic_thread.h>
#include <cactus_rt/ros2/app.h>
#include <quill/detail/LogMacros.h>
#include <cactus_rt/rt.h>

#include <chrono>
#include <memory>
#include <rclcpp/qos.hpp>
#include <std_msgs/msg/int64.hpp>

#include "cactus_rt/ros2/ros2_adapter.h"

using RealtimeType = std_msgs::msg::Int64;
using RosType = std_msgs::msg::Int64;

32 changes: 26 additions & 6 deletions examples/ros2/subscriber/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,23 +1,43 @@
find_package(std_msgs REQUIRED)

add_executable(rt_ros2_subscriber
main.cc
add_executable(rt_ros2_subscriber_simple_data
simple_data.cc
)

target_link_libraries(rt_ros2_subscriber
target_link_libraries(rt_ros2_subscriber_simple_data
PRIVATE
cactus_rt
)

setup_cactus_rt_target_options(rt_ros2_subscriber)
setup_cactus_rt_target_options(rt_ros2_subscriber_simple_data)

ament_target_dependencies(rt_ros2_subscriber
ament_target_dependencies(rt_ros2_subscriber_simple_data
PUBLIC
std_msgs
)

install(
TARGETS rt_ros2_subscriber
TARGETS rt_ros2_subscriber_simple_data
DESTINATION lib/${PROJECT_NAME}
)

add_executable(rt_ros2_subscriber_complex_data
complex_data.cc
)

target_link_libraries(rt_ros2_subscriber_complex_data
PRIVATE
cactus_rt
)

setup_cactus_rt_target_options(rt_ros2_subscriber_complex_data)

ament_target_dependencies(rt_ros2_subscriber_complex_data
PUBLIC
std_msgs
)

install(
TARGETS rt_ros2_subscriber_complex_data
DESTINATION lib/${PROJECT_NAME}
)
102 changes: 102 additions & 0 deletions examples/ros2/subscriber/complex_data.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
#include <cactus_rt/ros2/app.h>
#include <cactus_rt/rt.h>

#include <chrono>
#include <memory>
#include <std_msgs/msg/float32_multi_array.hpp>

struct MyCoefficientData {
float a;
float b;
float c;
float d;
};

using RealtimeType = MyCoefficientData;
using RosType = std_msgs::msg::Float32MultiArray;

template <>
struct rclcpp::TypeAdapter<RealtimeType, RosType> {
using is_specialized = std::true_type;
using custom_type = RealtimeType;
using ros_message_type = RosType;

static void convert_to_ros_message(const custom_type& source, ros_message_type& destination) {
destination.data.reserve(4);
destination.data.push_back(source.a);
destination.data.push_back(source.b);
destination.data.push_back(source.c);
destination.data.push_back(source.d);
}

static void convert_to_custom(const ros_message_type& source, custom_type& destination) {
destination.a = source.data.at(0);
destination.b = source.data.at(1);
destination.c = source.data.at(2);
destination.d = source.data.at(3);
}
};

class RTROS2SubscriberThread : public cactus_rt::CyclicThread, public cactus_rt::ros2::Ros2ThreadMixin {
int64_t last_msg_id_ = 0;
int64_t run_duration_;

std::shared_ptr<cactus_rt::ros2::SubscriptionLatest<RealtimeType, RosType>> subscription_;

static cactus_rt::CyclicThreadConfig CreateThreadConfig() {
cactus_rt::CyclicThreadConfig thread_config;
thread_config.period_ns = 1'000'000;
thread_config.cpu_affinity = std::vector<size_t>{2};
thread_config.SetFifoScheduler(80);

return thread_config;
}

public:
explicit RTROS2SubscriberThread(std::chrono::nanoseconds run_duration = std::chrono::seconds(30))
: cactus_rt::CyclicThread("RTROS2Subscriber", CreateThreadConfig()),
run_duration_(run_duration.count()) {}

void InitializeForRos2() override {
subscription_ = ros2_adapter_->CreateSubscriptionForLatestValue<RealtimeType, RosType>("/cactus_rt/complex", rclcpp::QoS(100));
}

protected:
bool Loop(int64_t elapsed_ns) noexcept override {
cactus_rt::ros2::StampedValue<MyCoefficientData> msg;
{
const auto span = Tracer().WithSpan("Subscription::ReadLatest");
msg = subscription_->ReadLatest();
}

if (msg.id != last_msg_id_) {
LOG_INFO(Logger(), "Got new data at {}: {} [{}, {}, {}, {}]", elapsed_ns, msg.id, msg.value.a, msg.value.b, msg.value.c, msg.value.d);
last_msg_id_ = msg.id;
}

return elapsed_ns > run_duration_;
}
};

int main(int argc, char* argv[]) {
rclcpp::init(argc, argv);

cactus_rt::ros2::App app("SimpleDataROS2Subscriber");
app.StartTraceSession("build/subscriber.perfetto");

constexpr std::chrono::seconds time(30);
std::cout << "Testing RT loop for " << time.count() << " seconds.\n";

auto thread = app.CreateROS2EnabledThread<RTROS2SubscriberThread>(time);
app.RegisterThread(thread);

app.Start();

std::this_thread::sleep_for(time);

app.RequestStop();
app.Join();

std::cout << "Done\n";
return 0;
}
104 changes: 0 additions & 104 deletions examples/ros2/subscriber/main.cc

This file was deleted.

77 changes: 77 additions & 0 deletions examples/ros2/subscriber/simple_data.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#include <cactus_rt/ros2/app.h>
#include <cactus_rt/rt.h>

#include <chrono>
#include <memory>
#include <std_msgs/msg/int64.hpp>

using RealtimeType = std_msgs::msg::Int64;
using RosType = std_msgs::msg::Int64;

class RTROS2SubscriberThread : public cactus_rt::CyclicThread, public cactus_rt::ros2::Ros2ThreadMixin {
int64_t last_msg_id_ = 0;
int64_t run_duration_;

// We force turn off the trivial data check, because the ROS message data type
// has a defined constructor with code in it. That said, the code really is
// pretty trivial so it is safe to use in real-time. We thus disable the trivial
// type check manually.
std::shared_ptr<cactus_rt::ros2::SubscriptionLatest<RealtimeType, RosType, false>> subscription_;

static cactus_rt::CyclicThreadConfig CreateThreadConfig() {
cactus_rt::CyclicThreadConfig thread_config;
thread_config.period_ns = 1'000'000;
thread_config.cpu_affinity = std::vector<size_t>{2};
thread_config.SetFifoScheduler(80);

return thread_config;
}

public:
explicit RTROS2SubscriberThread(std::chrono::nanoseconds run_duration = std::chrono::seconds(30))
: cactus_rt::CyclicThread("RTROS2Subscriber", CreateThreadConfig()),
run_duration_(run_duration.count()) {}

void InitializeForRos2() override {
subscription_ = ros2_adapter_->CreateSubscriptionForLatestValue<RealtimeType, RosType, false>("/cactus_rt/simple", rclcpp::QoS(100));
}

protected:
bool Loop(int64_t elapsed_ns) noexcept override {
cactus_rt::ros2::StampedValue<std_msgs::msg::Int64> msg;
{
const auto span = Tracer().WithSpan("Subscription::ReadLatest");
msg = subscription_->ReadLatest();
}

if (msg.id != last_msg_id_) {
LOG_INFO(Logger(), "Got new data at {}: {} {}", elapsed_ns, msg.id, msg.value.data);
last_msg_id_ = msg.id;
}

return elapsed_ns > run_duration_;
}
};

int main(int argc, char* argv[]) {
rclcpp::init(argc, argv);

cactus_rt::ros2::App app("SimpleDataROS2Subscriber");
app.StartTraceSession("build/subscriber.perfetto");

constexpr std::chrono::seconds time(30);
std::cout << "Testing RT loop for " << time.count() << " seconds.\n";

auto thread = app.CreateROS2EnabledThread<RTROS2SubscriberThread>(time);
app.RegisterThread(thread);

app.Start();

std::this_thread::sleep_for(time);

app.RequestStop();
app.Join();

std::cout << "Done\n";
return 0;
}
13 changes: 4 additions & 9 deletions include/cactus_rt/ros2/publisher.h
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#ifndef CACTUS_RT_ROS2_PUBLISHER_H_
#define CACTUS_RT_ROS2_PUBLISHER_H_

#include <quill/detail/LogMacros.h>
#include <readerwriterqueue.h>

#include <memory>
#include <rclcpp/rclcpp.hpp>
#include <type_traits>

@@ -20,16 +20,14 @@ class IPublisher {
virtual ~IPublisher() = default;
};

#define print_type_of(_x) your_type_was_<decltype(_x)>()

template <typename RealtimeT, typename RosT, bool CheckForTrivialRealtimeT = true>
class Publisher : public IPublisher {
friend class Ros2Adapter;

static_assert(!CheckForTrivialRealtimeT || std::is_trivial_v<RealtimeT>, "RealtimeT must be a trivial object to be real-time safe");

using NoConversion = std::is_same<RealtimeT, RosT>;
using AdaptedRosType = typename std::conditional<NoConversion::value, RosT, rclcpp::TypeAdapter<RealtimeT, RosT>>::type;
using AdaptedRosType = typename std::conditional_t<NoConversion::value, RosT, rclcpp::TypeAdapter<RealtimeT, RosT>>;

typename rclcpp::Publisher<AdaptedRosType>::SharedPtr publisher_;
moodycamel::ReaderWriterQueue<RealtimeT> queue_;
@@ -81,12 +79,9 @@ class Publisher : public IPublisher {
const rclcpp::QoS& qos,
const size_t rt_queue_size = 1000
) {
typename rclcpp::Publisher<AdaptedRosType>::SharedPtr publisher = node.create_publisher<AdaptedRosType>(topic_name, qos);
typename moodycamel::ReaderWriterQueue<RealtimeT> queue(rt_queue_size);

return std::make_shared<Publisher<RealtimeT, RosT, CheckForTrivialRealtimeT>>(
std::move(publisher),
std::move(queue)
node.create_publisher<AdaptedRosType>(topic_name, qos),
moodycamel::ReaderWriterQueue<RealtimeT>(rt_queue_size)
);
}

6 changes: 3 additions & 3 deletions include/cactus_rt/ros2/ros2_adapter.h
Original file line number Diff line number Diff line change
@@ -59,12 +59,12 @@ class Ros2Adapter {
return publisher;
}

template <typename RealtimeT, typename RosT>
std::shared_ptr<SubscriptionLatest<RealtimeT, RosT>> CreateSubscriptionForLatestValue(
template <typename RealtimeT, typename RosT, bool CheckForTrivialRealtimeT = true>
std::shared_ptr<SubscriptionLatest<RealtimeT, RosT, CheckForTrivialRealtimeT>> CreateSubscriptionForLatestValue(
const std::string& topic_name,
const rclcpp::QoS& qos
) {
auto subscriber = SubscriptionLatest<RealtimeT, RosT>::Create(*this->ros_node_, topic_name, qos);
auto subscriber = SubscriptionLatest<RealtimeT, RosT, CheckForTrivialRealtimeT>::Create(*this->ros_node_, topic_name, qos);

const std::scoped_lock lock(mut_);
subscriptions_.push_back(subscriber);
23 changes: 12 additions & 11 deletions include/cactus_rt/ros2/subscription.h
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@

#include <memory>
#include <rclcpp/rclcpp.hpp>
#include <type_traits>

#include "../experimental/lockless/spsc/realtime_readable_value.h"

@@ -20,26 +21,26 @@ class ISubscription {

template <typename RealtimeT>
struct StampedValue {
uint64_t id;
int64_t id;
RealtimeT value;

StampedValue() = default;
StampedValue(const uint64_t i, const RealtimeT& v) : id(i), value(v) {}
StampedValue(const int64_t i, const RealtimeT& v) : id(i), value(v) {}
};

// TODO: Theoretically it is possible to not use the type converter if we don't
// copy the data on read with RealtimeReadableValue.
template <typename RealtimeT, typename RosT>
template <typename RealtimeT, typename RosT, bool CheckForTrivialRealtimeT = true>
class SubscriptionLatest : public ISubscription {
friend class Ros2Adapter;

static_assert(std::is_trivial_v<RealtimeT>, "RealtimeT must be a trivial object to be real-time safe");
using AdaptedRosType = rclcpp::TypeAdapter<RealtimeT, RosT>;
static_assert(!CheckForTrivialRealtimeT || std::is_trivial_v<RealtimeT>, "RealtimeT must be a trivial object to be real-time safe");

using NoConversion = std::is_same<RealtimeT, RosT>;
using AdaptedRosType = typename std::conditional_t<NoConversion::value, RosT, rclcpp::TypeAdapter<RealtimeT, RosT>>;

using RealtimeReadableValue = cactus_rt::experimental::lockless::spsc::RealtimeReadableValue<StampedValue<RealtimeT>>;

typename rclcpp::Subscription<AdaptedRosType>::SharedPtr ros_subscription_;
uint64_t current_msg_id_ = 0;
int64_t current_msg_id_ = 0;
RealtimeReadableValue latest_value_;

void SubscriptionCallback(const RealtimeT& rt_msg) {
@@ -53,12 +54,12 @@ class SubscriptionLatest : public ISubscription {
latest_value_.Write(stamped_value);
}

static std::shared_ptr<SubscriptionLatest> Create(
static std::shared_ptr<SubscriptionLatest<RealtimeT, RosT, CheckForTrivialRealtimeT>> Create(
rclcpp::Node& node,
const std::string& topic_name,
const rclcpp::QoS& qos
) {
auto subscription = std::make_shared<SubscriptionLatest<RealtimeT, RosT>>();
auto subscription = std::make_shared<SubscriptionLatest<RealtimeT, RosT, CheckForTrivialRealtimeT>>();

subscription->ros_subscription_ = node.create_subscription<AdaptedRosType>(
topic_name,
@@ -80,7 +81,7 @@ class SubscriptionLatest : public ISubscription {
*/
SubscriptionLatest() = default;

inline StampedValue<RealtimeT> ReadLatest() noexcept {
StampedValue<RealtimeT> ReadLatest() noexcept {
return latest_value_.Read();
}
};

0 comments on commit 4e15a49

Please sign in to comment.