Skip to content

Commit

Permalink
Added basic scaffolding for ROS2 subscriber
Browse files Browse the repository at this point in the history
This is pretty inefficiently implemented for now. This only implements
the latest value subscriber.

Need to significantly overhaul the code to not use a functional
converter, but a type which would enable better compile time detection
of wrong usage. Possibly reuse rclcpp::TypeAdapter for this. That's in a
future commit tho. Right now this sets up some temporary
code/scaffolding.

Will need to implement an "all value" subscriber with a queue as well,
but this will happen after the TypeAdapter overhaul. Some
simplification/optimization might also occur before that.
  • Loading branch information
shuhaowu committed Aug 4, 2024
1 parent c6e4ef2 commit 989c7d1
Show file tree
Hide file tree
Showing 11 changed files with 280 additions and 16 deletions.
1 change: 1 addition & 0 deletions cmake/ros2.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ ament_target_dependencies(cactus_rt
)

add_subdirectory(examples/ros2/publisher)
add_subdirectory(examples/ros2/subscriber)

ament_package()
2 changes: 1 addition & 1 deletion examples/ros2/publisher/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class RealtimeROS2PublisherThread : public CyclicThread, public cactus_rt::ros2:
int main(int argc, char* argv[]) {
rclcpp::init(argc, argv);

App app;
App app("RTROS2Publisher");
app.StartTraceSession("build/data.perfetto");

auto thread = app.CreateROS2EnabledThread<RealtimeROS2PublisherThread>("RTROS2PublisherThread");
Expand Down
23 changes: 23 additions & 0 deletions examples/ros2/subscriber/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
find_package(std_msgs REQUIRED)

add_executable(rt_ros2_subscriber
main.cc
)

target_link_libraries(rt_ros2_subscriber
PRIVATE
cactus_rt
)

setup_cactus_rt_target_options(rt_ros2_subscriber)

ament_target_dependencies(rt_ros2_subscriber
PUBLIC
std_msgs
)

install(
TARGETS rt_ros2_subscriber
DESTINATION lib/${PROJECT_NAME}
)

97 changes: 97 additions & 0 deletions examples/ros2/subscriber/main.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#include <cactus_rt/ros2/app.h>
#include <cactus_rt/rt.h>

#include <chrono>
#include <iostream>
#include <memory>
#include <std_msgs/msg/int64.hpp>
#include <thread>

#include "cactus_rt/ros2/subscription.h"

using cactus_rt::CyclicThread;
using cactus_rt::ros2::App;
using cactus_rt::ros2::StampedValue;

struct RealtimeData {
int64_t data;

RealtimeData() = default;
explicit RealtimeData(int64_t d) : data(d) {}
};
using RosData = std_msgs::msg::Int64;

namespace {
RealtimeData ROS2ToRealtimeConverterFunc(const RosData& ros_data) {
// A bit of a silly example, but gets the point across.
RealtimeData rt_data(ros_data.data);
return rt_data;
}
} // namespace

class RealtimeROS2SubscriberThread : public CyclicThread, public cactus_rt::ros2::Ros2ThreadMixin {
int64_t loop_counter_ = 0;

std::shared_ptr<cactus_rt::ros2::SubscriptionLatest<RealtimeData, RosData>> subscription_;

static cactus_rt::CyclicThreadConfig CreateThreadConfig() {
cactus_rt::CyclicThreadConfig thread_config;
thread_config.period_ns = 1'000'000;
thread_config.cpu_affinity = std::vector<size_t>{2};
thread_config.SetFifoScheduler(80);

// thread_config.tracer_config.trace_sleep = true;
thread_config.tracer_config.trace_wakeup_latency = true;
return thread_config;
}

public:
RealtimeROS2SubscriberThread(const char* name) : CyclicThread(name, CreateThreadConfig()) {}

void InitializeForRos2() override {
subscription_ = ros2_adapter_->CreateSubscriptionForLatestValue<RealtimeData, RosData>("/hello", rclcpp::QoS(10), ROS2ToRealtimeConverterFunc);
}

int64_t GetLoopCounter() const {
return loop_counter_;
}

protected:
bool Loop(int64_t /*now*/) noexcept final {
loop_counter_++;
if (loop_counter_ % 10 == 0) {
StampedValue<RealtimeData> data;

{
const auto span = Tracer().WithSpan("ReadLatest");
data = subscription_->ReadLatest();
}

LOG_INFO(Logger(), "Loop {}: {}", loop_counter_, data.value.data);
}
return false;
}
};

int main(int argc, char* argv[]) {
rclcpp::init(argc, argv);

App app("RTROS2Subscriber");
app.StartTraceSession("build/subscriber.perfetto");

auto thread = app.CreateROS2EnabledThread<RealtimeROS2SubscriberThread>("RTROS2SubscriberThread");
app.RegisterThread(thread);

constexpr unsigned int time = 60;
std::cout << "Testing RT loop for " << time << " seconds.\n";

app.Start();

std::this_thread::sleep_for(std::chrono::seconds(time));

app.RequestStop();
app.Join();

std::cout << "Number of loops executed: " << thread->GetLoopCounter() << "\n";
return 0;
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,21 @@ class RealtimeReadableValue {
* This atomically reads the value. It returns a copy of the data.
*/
T Read() {
// TODO: static assert that T is a POD.

// TODO: need to figure out the atomic memory order here!
T* data_ptr = atomic_ptr_.exchange(nullptr);
T data = *data_ptr;
atomic_ptr_.store(data_ptr);
return data;
}

// TODO: maybe create a "loan" method that would not cause a copy in read, but
// can hold the data for longer.

// TODO: possibly create a Write method with a timeout that will simply fail
// if the RT threads holds on too long.

/**
* This atomically writes the value. It will copy the value into the storage
* and free a previous storage pointer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class RealtimeWritableValue {
}

void Write(const T& new_value) {
// TODO: static assert that T is a POD
auto i = idx_.fetch_or(kBusyMask) & kIdxMask;
buf_[i] = new_value;
idx_.store((i & kIdxMask) | kNewDataMask);
Expand Down
16 changes: 6 additions & 10 deletions include/cactus_rt/ros2/publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,22 @@

#include <readerwriterqueue.h>

#include <functional>
#include <optional>
#include <rclcpp/rclcpp.hpp>

#include "types.h"

namespace cactus_rt::ros2 {
class Ros2Adapter;

template <typename RealtimeT, typename RosT>
using RealtimeToROS2Converter = std::function<void(const RealtimeT&, RosT&)>;

template <typename RealtimeT, typename RosT>
using Ros2ToRealtimeConverter = std::function<RealtimeT(const RosT&)>;

class IPublisher {
friend class Ros2Adapter;

virtual bool DequeueAndPublishToRos() = 0;
virtual void FullyDrainAndPublishToRos() = 0;

public:
virtual ~IPublisher() = 0;
virtual ~IPublisher() = default;
};

template <typename RealtimeT, typename RosT>
Expand All @@ -35,18 +30,21 @@ class Publisher : public IPublisher {
bool DequeueAndPublishToRos() override {
RealtimeT rt_msg;

// 1 copy
const bool has_data = queue_.try_dequeue(rt_msg);
if (!has_data) {
return false;
}

if (converter_) {
auto loaned_msg = publisher_->borrow_loaned_message();
// 1 copy
converter_.value()(rt_msg, loaned_msg.get());
publisher_->publish(std::move(loaned_msg));
} else {
if constexpr (std::is_same_v<RealtimeT, RosT>) {
const auto* ros_msg = static_cast<const RosT*>(rt_msg);
// 1 copy
publisher_->publish(*ros_msg);
} else {
throw std::invalid_argument{"converter not specified but RealtimeT and RosT are not the same?!"};
Expand Down Expand Up @@ -77,8 +75,6 @@ class Publisher : public IPublisher {
moodycamel::ReaderWriterQueue<RealtimeT>&& queue
) : publisher_(publisher), converter_(converter), queue_(std::move(queue)) {}

~Publisher() override = default;

template <typename... Args>
bool Publish(Args&&... args) noexcept {
const bool success = queue_.try_emplace(std::forward<Args>(args)...);
Expand Down
29 changes: 26 additions & 3 deletions include/cactus_rt/ros2/ros2_adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
#include <type_traits>
#include <utility>

#include "publisher.h"
#include "./publisher.h"
#include "./subscription.h"
#include "cactus_rt/ros2/subscription.h"
#include "cactus_rt/ros2/types.h"

namespace cactus_rt::ros2 {

Expand All @@ -36,8 +39,9 @@ class Ros2Adapter {
// This means that during the timer callback, no subscribers, publishers, services, and etc. can be changed.
std::mutex mut_;

// Publisher data
std::vector<std::shared_ptr<IPublisher>> publishers_;
// Publishers and subscriptions
std::vector<std::shared_ptr<IPublisher>> publishers_;
std::vector<std::shared_ptr<ISubscription>> subscriptions_;

public:
Ros2Adapter(const std::string& name_, const Config& config);
Expand Down Expand Up @@ -73,6 +77,25 @@ class Ros2Adapter {
return publisher_bundle;
}

template <typename RealtimeT, typename RosT>
std::shared_ptr<SubscriptionLatest<RealtimeT, RosT>> CreateSubscriptionForLatestValue(
const std::string& topic_name,
const rclcpp::QoS& qos,
std::optional<Ros2ToRealtimeConverter<RealtimeT, RosT>> converter
) {
if (!converter) {
if constexpr (!(std::is_trivial_v<RosT> && std::is_standard_layout_v<RosT> && std::is_same_v<RosT, RealtimeT>)) {
throw std::invalid_argument("RosT and RealtimeT must be the same and must be a plain object for converter not to be specified");
}
}

auto subscription_bundle = SubscriptionLatest<RealtimeT, RosT>::Create(*this->ros_node_, topic_name, qos, converter);

const std::scoped_lock lock(mut_);
subscriptions_.push_back(subscription_bundle);
return subscription_bundle;
}

private:
void TimerCallback();
void DrainQueues();
Expand Down
103 changes: 103 additions & 0 deletions include/cactus_rt/ros2/subscription.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
#ifndef CACTUS_RT_ROS2_SUBSCRIPTION_H_
#define CACTUS_RT_ROS2_SUBSCRIPTION_H_

#include <memory>
#include <optional>
#include <rclcpp/rclcpp.hpp>

#include "../experimental/lockless/spsc/realtime_readable_value.h"
#include "cactus_rt/utils.h"
#include "types.h"

// Note: ROS subscription dispatch is here: https://github.com/ros2/rclcpp/blob/e10728c/rclcpp/include/rclcpp/any_subscription_callback.hpp#L481
// There are many methods that we can choose from.

namespace cactus_rt::ros2 {

class Ros2Adapter;

class ISubscription {
public:
virtual ~ISubscription() = default;
};

template <typename RealtimeT>
struct StampedValue {
int64_t received_time; // TODO: need to align this with elapsed time from the threads...
RealtimeT value;

StampedValue() = default;
StampedValue(int64_t t, const RealtimeT& v) : received_time(t), value(v) {}
};

// TODO: Theoretically it is possible to not use the type converter if we don't
// copy the data on read with RealtimeReadableValue.
template <typename RealtimeT, typename RosT>
class SubscriptionLatest : public ISubscription {
friend class Ros2Adapter;

using RealtimeReadableValue = cactus_rt::experimental::lockless::spsc::RealtimeReadableValue<StampedValue<RealtimeT>>;

typename rclcpp::Subscription<RosT>::SharedPtr ros_subscription_;
std::optional<Ros2ToRealtimeConverter<RealtimeT, RosT>> converter_;
RealtimeReadableValue latest_value_;

void SubscriptionCallback(const RosT& ros_msg) {
if (converter_) {
const RealtimeT& rt_msg = converter_.value()(ros_msg);

// 1 copy
const StampedValue<RealtimeT> stamped_value(NowNs(), rt_msg);

// 1 allocation and 1 copy.
latest_value_.Write(stamped_value);
} else {
if constexpr (std::is_same_v<RealtimeT, RosT>) {
const auto& rt_msg = static_cast<const RealtimeT&>(ros_msg);

// 1 copy
const StampedValue<RealtimeT> stamped_value(NowNs(), rt_msg);

// 1 allocation and 1 copy.
latest_value_.Write(stamped_value);
} else {
throw std::invalid_argument{"converter not specified but RealtimeT and RosT are not the same?!"};
}
}
}

static std::shared_ptr<SubscriptionLatest> Create(
rclcpp::Node& node,
const std::string& topic_name,
const rclcpp::QoS& qos,
std::optional<Ros2ToRealtimeConverter<RealtimeT, RosT>> converter
) {
auto subscription = std::make_shared<SubscriptionLatest<RealtimeT, RosT>>(converter);

auto* subscription_ptr = subscription.get();

// TODO: is there a better way to do this under than to capture the shared_pointer like this?
subscription->ros_subscription_ = node.create_subscription<RosT>(topic_name, qos, [subscription_ptr](const RosT& ros_msg) {
subscription_ptr->SubscriptionCallback(ros_msg);
});

return subscription;
}

public:
/**
* Constructs a subscription. Shouldn't be called directly. Only public to enable make_shared.
*
* @private
*/
explicit SubscriptionLatest(
std::optional<Ros2ToRealtimeConverter<RealtimeT, RosT>> converter
) : converter_(converter) {}

inline StampedValue<RealtimeT> ReadLatest() noexcept {
return latest_value_.Read();
}
};
} // namespace cactus_rt::ros2

#endif
14 changes: 14 additions & 0 deletions include/cactus_rt/ros2/types.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#ifndef CACTUS_RT_ROS2_TYPES_H_
#define CACTUS_RT_ROS2_TYPES_H_

#include <functional>

namespace cactus_rt::ros2 {
template <typename RealtimeT, typename RosT>
using Ros2ToRealtimeConverter = std::function<RealtimeT(const RosT&)>;

template <typename RealtimeT, typename RosT>
using RealtimeToROS2Converter = std::function<void(const RealtimeT&, RosT&)>;
} // namespace cactus_rt::ros2

#endif
Loading

0 comments on commit 989c7d1

Please sign in to comment.