From 8129810408c9740c0302ed94d9bab163b4de51d3 Mon Sep 17 00:00:00 2001 From: Sergio Portoles Diez Date: Thu, 10 Sep 2020 23:46:48 +0200 Subject: [PATCH 1/5] rtt_roscomm: shortcut ROS spinner callback queue The patch introduces a new "queue" without a queue to bypass the global CallbackQueue of ROS for the subscriptions of Orocos messages. Instead, the network thread will immediately execute the callback, and will pass it to the global CallbackQueue only in case that it fails to execute with TryAgain status. Helper functions are provided to generate a subscriber with a custom implementation of CallbackQueueInterface. This method is chosen for the subscribers of the message transporter. --- rtt_roscomm/CMakeLists.txt | 4 + .../passthrough_callback_queue.hpp | 232 ++++++++++++++++++ .../rtt_rostopic_ros_msg_transporter.hpp | 10 +- .../src/passthrough_callback_queue.cpp | 24 ++ 4 files changed, 266 insertions(+), 4 deletions(-) create mode 100644 rtt_roscomm/include/rtt_roscomm/passthrough_callback_queue.hpp create mode 100644 rtt_roscomm/src/passthrough_callback_queue.cpp diff --git a/rtt_roscomm/CMakeLists.txt b/rtt_roscomm/CMakeLists.txt index 68881aaa..14c9949f 100644 --- a/rtt_roscomm/CMakeLists.txt +++ b/rtt_roscomm/CMakeLists.txt @@ -17,6 +17,10 @@ orocos_library(rtt_rostopic src/rtt_rostopic.cpp) target_link_libraries(rtt_rostopic ${catkin_LIBRARIES}) +orocos_library(passthrough_callback_queue + src/passthrough_callback_queue.cpp) +target_link_libraries(passthrough_callback_queue ${catkin_LIBRARIES}) + orocos_service(rtt_rostopic_service src/rtt_rostopic_service.cpp src/rtt_rostopic_ros_publish_activity.cpp) diff --git a/rtt_roscomm/include/rtt_roscomm/passthrough_callback_queue.hpp b/rtt_roscomm/include/rtt_roscomm/passthrough_callback_queue.hpp new file mode 100644 index 00000000..d18d67b8 --- /dev/null +++ b/rtt_roscomm/include/rtt_roscomm/passthrough_callback_queue.hpp @@ -0,0 +1,232 @@ + +#ifndef RTT_ROSCOMM__PASSTHROUGH_CALLBACK_QUEUE_HPP_ +#define RTT_ROSCOMM__PASSTHROUGH_CALLBACK_QUEUE_HPP_ + +#include +#include +#include +#include + +namespace rtt_roscomm { + class PassthroughCallbackQueue: public ros::CallbackQueueInterface + { + public: + PassthroughCallbackQueue(); + + /** + * Implementation of ros::CallbackQueueInterface::addCallback() + * + * @param callback aaa + * @param owner_id aaa + * + * @return void + */ + virtual void addCallback( + const ros::CallbackInterfacePtr &callback, + uint64_t owner_id=0); + + /** + * Implementation of ros::CallbackQueueInterface::removeByID() + * + * @param owner_id aaa + * + * @return void + */ + virtual void removeByID(uint64_t owner_id); + + private: + + /** + * Port where to write the message + */ + // base::PortInterface* output_port_ptr_; + + }; // class PassthroughCallbackQueue + + /** + * Helpers to generate subscribers with a custom ros::CallbackQueueInterface + */ + + // template + // ros::Subscriber subscribe(const std::string& topic, uint32_t queue_size, + // void(T::*fp)(const boost::shared_ptr&) const, T* obj, + // ros::CallbackQueueInterface* cq, ros::NodeHandle& nh, + // const TransportHints& transport_hints = ros::TransportHints()) + // { + // ros::SubscribeOptions ops; + // ops.template init(topic, queue_size, boost::bind(fp, obj, _1)); + // ops.transport_hints = transport_hints; + // ops.callback_queue = cq; + // return nh.subscribe(ops); + // } + + template + ros::Subscriber subscribe( + ros::NodeHandle& nh, ros::CallbackQueueInterface* cq, + const std::string& topic, uint32_t queue_size, void(T::*fp)(M), T* obj, + const ros::TransportHints& transport_hints = ros::TransportHints()) + { + ros::SubscribeOptions ops; + ops.template initByFullCallbackType(topic, queue_size, boost::bind(fp, obj, _1)); + ops.transport_hints = transport_hints; + ops.callback_queue = cq; + return nh.subscribe(ops); + } + + /// and the const version + template + ros::Subscriber subscribe( + ros::NodeHandle& nh, ros::CallbackQueueInterface* cq, + const std::string& topic, uint32_t queue_size, void(T::*fp)(M) const, T* obj, + const ros::TransportHints& transport_hints = ros::TransportHints()) + { + ros::SubscribeOptions ops; + ops.template initByFullCallbackType(topic, queue_size, boost::bind(fp, obj, _1)); + ops.transport_hints = transport_hints; + ops.callback_queue = cq; + return nh.subscribe(ops); + } + template + ros::Subscriber subscribe( + ros::NodeHandle& nh, ros::CallbackQueueInterface* cq, + const std::string& topic, uint32_t queue_size, + void(T::*fp)(const boost::shared_ptr&), T* obj, + const ros::TransportHints& transport_hints = ros::TransportHints()) + { + ros::SubscribeOptions ops; + ops.template init(topic, queue_size, boost::bind(fp, obj, _1)); + ops.transport_hints = transport_hints; + ops.callback_queue = cq; + return nh.subscribe(ops); + } + template + ros::Subscriber subscribe( + ros::NodeHandle& nh, ros::CallbackQueueInterface* cq, + const std::string& topic, uint32_t queue_size, + void(T::*fp)(const boost::shared_ptr&) const, T* obj, + const ros::TransportHints& transport_hints = ros::TransportHints()) + { + ros::SubscribeOptions ops; + ops.template init(topic, queue_size, boost::bind(fp, obj, _1)); + ops.transport_hints = transport_hints; + ops.callback_queue = cq; + return nh.subscribe(ops); + } + template + ros::Subscriber subscribe( + ros::NodeHandle& nh, ros::CallbackQueueInterface* cq, + const std::string& topic, uint32_t queue_size, void(T::*fp)(M), + const boost::shared_ptr& obj, + const ros::TransportHints& transport_hints = ros::TransportHints()) + { + ros::SubscribeOptions ops; + ops.template initByFullCallbackType(topic, queue_size, boost::bind(fp, obj.get(), _1)); + ops.tracked_object = obj; + ops.transport_hints = transport_hints; + ops.callback_queue = cq; + return nh.subscribe(ops); + } + + template + ros::Subscriber subscribe( + ros::NodeHandle& nh, ros::CallbackQueueInterface* cq, + const std::string& topic, uint32_t queue_size, void(T::*fp)(M) const, + const boost::shared_ptr& obj, + const ros::TransportHints& transport_hints = ros::TransportHints()) + { + ros::SubscribeOptions ops; + ops.template initByFullCallbackType(topic, queue_size, boost::bind(fp, obj.get(), _1)); + ops.tracked_object = obj; + ops.transport_hints = transport_hints; + ops.callback_queue = cq; + return nh.subscribe(ops); + } + template + ros::Subscriber subscribe( + ros::NodeHandle& nh, ros::CallbackQueueInterface* cq, + const std::string& topic, uint32_t queue_size, + void(T::*fp)(const boost::shared_ptr&), + const boost::shared_ptr& obj, + const ros::TransportHints& transport_hints = ros::TransportHints()) + { + ros::SubscribeOptions ops; + ops.template init(topic, queue_size, boost::bind(fp, obj.get(), _1)); + ops.tracked_object = obj; + ops.transport_hints = transport_hints; + ops.callback_queue = cq; + return nh.subscribe(ops); + } + template + ros::Subscriber subscribe( + ros::NodeHandle& nh, ros::CallbackQueueInterface* cq, + const std::string& topic, uint32_t queue_size, + void(T::*fp)(const boost::shared_ptr&) const, + const boost::shared_ptr& obj, + const ros::TransportHints& transport_hints = ros::TransportHints()) + { + ros::SubscribeOptions ops; + ops.template init(topic, queue_size, boost::bind(fp, obj.get(), _1)); + ops.tracked_object = obj; + ops.transport_hints = transport_hints; + ops.callback_queue = cq; + return nh.subscribe(ops); + } + template + ros::Subscriber subscribe( + ros::NodeHandle& nh, ros::CallbackQueueInterface* cq, + const std::string& topic, uint32_t queue_size, void(*fp)(M), + const ros::TransportHints& transport_hints = ros::TransportHints()) + { + ros::SubscribeOptions ops; + ops.template initByFullCallbackType(topic, queue_size, fp); + ops.transport_hints = transport_hints; + ops.callback_queue = cq; + return nh.subscribe(ops); + } + template + ros::Subscriber subscribe( + ros::NodeHandle& nh, ros::CallbackQueueInterface* cq, + const std::string& topic, uint32_t queue_size, + void(*fp)(const boost::shared_ptr&), + const ros::TransportHints& transport_hints = ros::TransportHints()) + { + ros::SubscribeOptions ops; + ops.template init(topic, queue_size, fp); + ops.transport_hints = transport_hints; + ops.callback_queue = cq; + return nh.subscribe(ops); + } + template + ros::Subscriber subscribe( + ros::NodeHandle& nh, ros::CallbackQueueInterface* cq, + const std::string& topic, uint32_t queue_size, + const boost::function&)>& callback, + const ros::VoidConstPtr& tracked_object = ros::VoidConstPtr(), + const ros::TransportHints& transport_hints = ros::TransportHints()) + { + ros::SubscribeOptions ops; + ops.template init(topic, queue_size, callback); + ops.tracked_object = tracked_object; + ops.transport_hints = transport_hints; + ops.callback_queue = cq; + return nh.subscribe(ops); + } + template + ros::Subscriber subscribe( + ros::NodeHandle& nh, ros::CallbackQueueInterface* cq, + const std::string& topic, uint32_t queue_size, + const boost::function& callback, + const ros::VoidConstPtr& tracked_object = ros::VoidConstPtr(), + const ros::TransportHints& transport_hints = ros::TransportHints()) + { + ros::SubscribeOptions ops; + ops.template initByFullCallbackType(topic, queue_size, callback); + ops.tracked_object = tracked_object; + ops.transport_hints = transport_hints; + ops.callback_queue = cq; + return nh.subscribe(ops); + } + +} // namespace rtt_roscomm + +#endif // RTT_ROSCOMM__PASSTHROUGH_CALLBACK_QUEUE_HPP_ diff --git a/rtt_roscomm/include/rtt_roscomm/rtt_rostopic_ros_msg_transporter.hpp b/rtt_roscomm/include/rtt_roscomm/rtt_rostopic_ros_msg_transporter.hpp index 4a963030..235e15ed 100644 --- a/rtt_roscomm/include/rtt_roscomm/rtt_rostopic_ros_msg_transporter.hpp +++ b/rtt_roscomm/include/rtt_roscomm/rtt_rostopic_ros_msg_transporter.hpp @@ -56,6 +56,7 @@ #include #include +#include #include #ifndef RTT_VERSION_GTE @@ -241,6 +242,7 @@ namespace rtt_roscomm { ros::NodeHandle ros_node; ros::NodeHandle ros_node_private; ros::Subscriber ros_sub; + PassthroughCallbackQueue prassthrough_callback_queue; public: /** @@ -264,9 +266,9 @@ namespace rtt_roscomm { RTT::log(RTT::Debug)<<"Creating ROS subscriber for port "<getName()<<" on topic "< 1 && topicname.at(0) == '~') { - ros_sub = ros_node_private.subscribe(policy.name_id.substr(1), policy.size > 0 ? policy.size : 1, &RosSubChannelElement::newData, this); // minimum queue_size 1 + ros_sub = subscribe(ros_node_private, &prassthrough_callback_queue, policy.name_id.substr(1), policy.size > 0 ? policy.size : 1, &RosSubChannelElement::newData, this); // minimum queue_size 1 } else { - ros_sub = ros_node.subscribe(policy.name_id, policy.size > 0 ? policy.size : 1, &RosSubChannelElement::newData, this); // minimum queue_size 1 + ros_sub = subscribe(ros_node, &prassthrough_callback_queue, policy.name_id, policy.size > 0 ? policy.size : 1, &RosSubChannelElement::newData, this); // minimum queue_size 1 } } @@ -350,6 +352,6 @@ namespace rtt_roscomm { return channel; } - }; -} + }; // class RosMsgTransporter +} // namespace rtt_roscomm #endif diff --git a/rtt_roscomm/src/passthrough_callback_queue.cpp b/rtt_roscomm/src/passthrough_callback_queue.cpp new file mode 100644 index 00000000..95e3d341 --- /dev/null +++ b/rtt_roscomm/src/passthrough_callback_queue.cpp @@ -0,0 +1,24 @@ + +#include "rtt_roscomm/passthrough_callback_queue.hpp" + +namespace rtt_roscomm { + +PassthroughCallbackQueue::PassthroughCallbackQueue() + {} + +void PassthroughCallbackQueue::addCallback( + const ros::CallbackInterfacePtr &callback, + uint64_t owner_id) +{ + // Call CallbackInterface::CallResult SubscriptionQueue::call() + if (ros::CallbackInterface::TryAgain == callback->call()) { + ros::getGlobalCallbackQueue()->addCallback(callback, owner_id); + } +} + +void PassthroughCallbackQueue::removeByID(uint64_t owner_id) +{ +} + +} // namespace rtt_roscomm + From 13afcbe8b04540ec0637383ec1eae3816bab81a0 Mon Sep 17 00:00:00 2001 From: Sergio Portoles Diez Date: Wed, 30 Sep 2020 19:54:26 +0200 Subject: [PATCH 2/5] rtt_roscomm: attend PR comments The patch mainly removes the use of helper functions to subscribe with a custom callback. Instead, a SubscribeOptions object is populated and used during the subscribe() call. The documentation is improved. --- .../passthrough_callback_queue.hpp | 252 +++--------------- .../rtt_rostopic_ros_msg_transporter.hpp | 21 +- .../src/passthrough_callback_queue.cpp | 13 +- 3 files changed, 48 insertions(+), 238 deletions(-) diff --git a/rtt_roscomm/include/rtt_roscomm/passthrough_callback_queue.hpp b/rtt_roscomm/include/rtt_roscomm/passthrough_callback_queue.hpp index d18d67b8..ae82f5ad 100644 --- a/rtt_roscomm/include/rtt_roscomm/passthrough_callback_queue.hpp +++ b/rtt_roscomm/include/rtt_roscomm/passthrough_callback_queue.hpp @@ -1,4 +1,3 @@ - #ifndef RTT_ROSCOMM__PASSTHROUGH_CALLBACK_QUEUE_HPP_ #define RTT_ROSCOMM__PASSTHROUGH_CALLBACK_QUEUE_HPP_ @@ -8,224 +7,39 @@ #include namespace rtt_roscomm { - class PassthroughCallbackQueue: public ros::CallbackQueueInterface - { - public: - PassthroughCallbackQueue(); - - /** - * Implementation of ros::CallbackQueueInterface::addCallback() - * - * @param callback aaa - * @param owner_id aaa - * - * @return void - */ - virtual void addCallback( - const ros::CallbackInterfacePtr &callback, - uint64_t owner_id=0); - - /** - * Implementation of ros::CallbackQueueInterface::removeByID() - * - * @param owner_id aaa - * - * @return void - */ - virtual void removeByID(uint64_t owner_id); - - private: - - /** - * Port where to write the message - */ - // base::PortInterface* output_port_ptr_; - - }; // class PassthroughCallbackQueue - - /** - * Helpers to generate subscribers with a custom ros::CallbackQueueInterface - */ - - // template - // ros::Subscriber subscribe(const std::string& topic, uint32_t queue_size, - // void(T::*fp)(const boost::shared_ptr&) const, T* obj, - // ros::CallbackQueueInterface* cq, ros::NodeHandle& nh, - // const TransportHints& transport_hints = ros::TransportHints()) - // { - // ros::SubscribeOptions ops; - // ops.template init(topic, queue_size, boost::bind(fp, obj, _1)); - // ops.transport_hints = transport_hints; - // ops.callback_queue = cq; - // return nh.subscribe(ops); - // } - - template - ros::Subscriber subscribe( - ros::NodeHandle& nh, ros::CallbackQueueInterface* cq, - const std::string& topic, uint32_t queue_size, void(T::*fp)(M), T* obj, - const ros::TransportHints& transport_hints = ros::TransportHints()) - { - ros::SubscribeOptions ops; - ops.template initByFullCallbackType(topic, queue_size, boost::bind(fp, obj, _1)); - ops.transport_hints = transport_hints; - ops.callback_queue = cq; - return nh.subscribe(ops); - } - - /// and the const version - template - ros::Subscriber subscribe( - ros::NodeHandle& nh, ros::CallbackQueueInterface* cq, - const std::string& topic, uint32_t queue_size, void(T::*fp)(M) const, T* obj, - const ros::TransportHints& transport_hints = ros::TransportHints()) - { - ros::SubscribeOptions ops; - ops.template initByFullCallbackType(topic, queue_size, boost::bind(fp, obj, _1)); - ops.transport_hints = transport_hints; - ops.callback_queue = cq; - return nh.subscribe(ops); - } - template - ros::Subscriber subscribe( - ros::NodeHandle& nh, ros::CallbackQueueInterface* cq, - const std::string& topic, uint32_t queue_size, - void(T::*fp)(const boost::shared_ptr&), T* obj, - const ros::TransportHints& transport_hints = ros::TransportHints()) - { - ros::SubscribeOptions ops; - ops.template init(topic, queue_size, boost::bind(fp, obj, _1)); - ops.transport_hints = transport_hints; - ops.callback_queue = cq; - return nh.subscribe(ops); - } - template - ros::Subscriber subscribe( - ros::NodeHandle& nh, ros::CallbackQueueInterface* cq, - const std::string& topic, uint32_t queue_size, - void(T::*fp)(const boost::shared_ptr&) const, T* obj, - const ros::TransportHints& transport_hints = ros::TransportHints()) - { - ros::SubscribeOptions ops; - ops.template init(topic, queue_size, boost::bind(fp, obj, _1)); - ops.transport_hints = transport_hints; - ops.callback_queue = cq; - return nh.subscribe(ops); - } - template - ros::Subscriber subscribe( - ros::NodeHandle& nh, ros::CallbackQueueInterface* cq, - const std::string& topic, uint32_t queue_size, void(T::*fp)(M), - const boost::shared_ptr& obj, - const ros::TransportHints& transport_hints = ros::TransportHints()) - { - ros::SubscribeOptions ops; - ops.template initByFullCallbackType(topic, queue_size, boost::bind(fp, obj.get(), _1)); - ops.tracked_object = obj; - ops.transport_hints = transport_hints; - ops.callback_queue = cq; - return nh.subscribe(ops); - } - - template - ros::Subscriber subscribe( - ros::NodeHandle& nh, ros::CallbackQueueInterface* cq, - const std::string& topic, uint32_t queue_size, void(T::*fp)(M) const, - const boost::shared_ptr& obj, - const ros::TransportHints& transport_hints = ros::TransportHints()) - { - ros::SubscribeOptions ops; - ops.template initByFullCallbackType(topic, queue_size, boost::bind(fp, obj.get(), _1)); - ops.tracked_object = obj; - ops.transport_hints = transport_hints; - ops.callback_queue = cq; - return nh.subscribe(ops); - } - template - ros::Subscriber subscribe( - ros::NodeHandle& nh, ros::CallbackQueueInterface* cq, - const std::string& topic, uint32_t queue_size, - void(T::*fp)(const boost::shared_ptr&), - const boost::shared_ptr& obj, - const ros::TransportHints& transport_hints = ros::TransportHints()) - { - ros::SubscribeOptions ops; - ops.template init(topic, queue_size, boost::bind(fp, obj.get(), _1)); - ops.tracked_object = obj; - ops.transport_hints = transport_hints; - ops.callback_queue = cq; - return nh.subscribe(ops); - } - template - ros::Subscriber subscribe( - ros::NodeHandle& nh, ros::CallbackQueueInterface* cq, - const std::string& topic, uint32_t queue_size, - void(T::*fp)(const boost::shared_ptr&) const, - const boost::shared_ptr& obj, - const ros::TransportHints& transport_hints = ros::TransportHints()) - { - ros::SubscribeOptions ops; - ops.template init(topic, queue_size, boost::bind(fp, obj.get(), _1)); - ops.tracked_object = obj; - ops.transport_hints = transport_hints; - ops.callback_queue = cq; - return nh.subscribe(ops); - } - template - ros::Subscriber subscribe( - ros::NodeHandle& nh, ros::CallbackQueueInterface* cq, - const std::string& topic, uint32_t queue_size, void(*fp)(M), - const ros::TransportHints& transport_hints = ros::TransportHints()) - { - ros::SubscribeOptions ops; - ops.template initByFullCallbackType(topic, queue_size, fp); - ops.transport_hints = transport_hints; - ops.callback_queue = cq; - return nh.subscribe(ops); - } - template - ros::Subscriber subscribe( - ros::NodeHandle& nh, ros::CallbackQueueInterface* cq, - const std::string& topic, uint32_t queue_size, - void(*fp)(const boost::shared_ptr&), - const ros::TransportHints& transport_hints = ros::TransportHints()) - { - ros::SubscribeOptions ops; - ops.template init(topic, queue_size, fp); - ops.transport_hints = transport_hints; - ops.callback_queue = cq; - return nh.subscribe(ops); - } - template - ros::Subscriber subscribe( - ros::NodeHandle& nh, ros::CallbackQueueInterface* cq, - const std::string& topic, uint32_t queue_size, - const boost::function&)>& callback, - const ros::VoidConstPtr& tracked_object = ros::VoidConstPtr(), - const ros::TransportHints& transport_hints = ros::TransportHints()) - { - ros::SubscribeOptions ops; - ops.template init(topic, queue_size, callback); - ops.tracked_object = tracked_object; - ops.transport_hints = transport_hints; - ops.callback_queue = cq; - return nh.subscribe(ops); - } - template - ros::Subscriber subscribe( - ros::NodeHandle& nh, ros::CallbackQueueInterface* cq, - const std::string& topic, uint32_t queue_size, - const boost::function& callback, - const ros::VoidConstPtr& tracked_object = ros::VoidConstPtr(), - const ros::TransportHints& transport_hints = ros::TransportHints()) - { - ros::SubscribeOptions ops; - ops.template initByFullCallbackType(topic, queue_size, callback); - ops.tracked_object = tracked_object; - ops.transport_hints = transport_hints; - ops.callback_queue = cq; - return nh.subscribe(ops); - } +class PassthroughCallbackQueue: public ros::CallbackQueueInterface +{ + public: + /** + * Implementation of ros::CallbackQueueInterface::addCallback() + * + * This method is executing the callback received instead of adding + * it to a queue. In this way, the queue is bypassed and immediately + * executed. + * + * @param callback callback to execute, instead of queueing + * @param owner_id Owner of the callback, in other implementations it might + * be used to remove the callback. In this implementation + * it has no effect. + * + * @return void + */ + virtual void addCallback( + const ros::CallbackInterfacePtr &callback, + uint64_t owner_id=0); + + /** + * Implementation of ros::CallbackQueueInterface::removeByID() + * + * No-op + * + * @param owner_id Not used. + * + * @return void + */ + virtual void removeByID(uint64_t owner_id) {} + +}; // class PassthroughCallbackQueue } // namespace rtt_roscomm diff --git a/rtt_roscomm/include/rtt_roscomm/rtt_rostopic_ros_msg_transporter.hpp b/rtt_roscomm/include/rtt_roscomm/rtt_rostopic_ros_msg_transporter.hpp index 235e15ed..80bf9ba1 100644 --- a/rtt_roscomm/include/rtt_roscomm/rtt_rostopic_ros_msg_transporter.hpp +++ b/rtt_roscomm/include/rtt_roscomm/rtt_rostopic_ros_msg_transporter.hpp @@ -148,8 +148,6 @@ namespace rtt_roscomm { } ~RosPubChannelElement() { - RTT::Logger::In in(topicname); -// RTT::log(RTT::Debug) << "Destroying RosPubChannelElement" << RTT::endlog(); act->removePublisher( this ); } @@ -241,8 +239,8 @@ namespace rtt_roscomm { std::string topicname; ros::NodeHandle ros_node; ros::NodeHandle ros_node_private; + PassthroughCallbackQueue passthrough_callback_queue; ros::Subscriber ros_sub; - PassthroughCallbackQueue prassthrough_callback_queue; public: /** @@ -258,7 +256,14 @@ namespace rtt_roscomm { ros_node(), ros_node_private("~") { - topicname=policy.name_id; + topicname = policy.name_id; + ros::SubscribeOptions ops; + ops.template initByFullCallbackType( + topicname, + policy.size > 0 ? policy.size : 1, // minimum queue_size 1 + boost::bind(&RosSubChannelElement::newData, this, _1)); + ops.callback_queue = &passthrough_callback_queue; + RTT::Logger::In in(topicname); if (port->getInterface() && port->getInterface()->getOwner()) { RTT::log(RTT::Debug)<<"Creating ROS subscriber for port "<getInterface()->getOwner()->getName()<<"."<getName()<<" on topic "<getName()<<" on topic "< 1 && topicname.at(0) == '~') { - ros_sub = subscribe(ros_node_private, &prassthrough_callback_queue, policy.name_id.substr(1), policy.size > 0 ? policy.size : 1, &RosSubChannelElement::newData, this); // minimum queue_size 1 + ops.topic = ops.topic.substr(1); + ros_sub = ros_node_private.subscribe(ops); } else { - ros_sub = subscribe(ros_node, &prassthrough_callback_queue, policy.name_id, policy.size > 0 ? policy.size : 1, &RosSubChannelElement::newData, this); // minimum queue_size 1 + ros_sub = ros_node.subscribe(ops); } } ~RosSubChannelElement() { - RTT::Logger::In in(topicname); -// RTT::log(RTT::Debug)<<"Destroying RosSubChannelElement"<call()) { + ros::CallbackInterface::CallResult result = callback->call(); + if (ros::CallbackInterface::TryAgain == result) { ros::getGlobalCallbackQueue()->addCallback(callback, owner_id); } } -void PassthroughCallbackQueue::removeByID(uint64_t owner_id) -{ -} - } // namespace rtt_roscomm - From 66c674263b520348a241a7435304f8de2671bd76 Mon Sep 17 00:00:00 2001 From: Sergio Portoles Diez Date: Thu, 1 Oct 2020 09:37:03 +0200 Subject: [PATCH 3/5] rtt_roscomm: add license to new files --- .../passthrough_callback_queue.hpp | 29 +++++++++++++++++++ .../src/passthrough_callback_queue.cpp | 29 +++++++++++++++++++ 2 files changed, 58 insertions(+) diff --git a/rtt_roscomm/include/rtt_roscomm/passthrough_callback_queue.hpp b/rtt_roscomm/include/rtt_roscomm/passthrough_callback_queue.hpp index ae82f5ad..efa4e7a1 100644 --- a/rtt_roscomm/include/rtt_roscomm/passthrough_callback_queue.hpp +++ b/rtt_roscomm/include/rtt_roscomm/passthrough_callback_queue.hpp @@ -1,3 +1,32 @@ +/* + * (C) 2020, Intermodalics BVBA + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the name of the copyright holder nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS + * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE + * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + #ifndef RTT_ROSCOMM__PASSTHROUGH_CALLBACK_QUEUE_HPP_ #define RTT_ROSCOMM__PASSTHROUGH_CALLBACK_QUEUE_HPP_ diff --git a/rtt_roscomm/src/passthrough_callback_queue.cpp b/rtt_roscomm/src/passthrough_callback_queue.cpp index 0f12655b..f16c71a4 100644 --- a/rtt_roscomm/src/passthrough_callback_queue.cpp +++ b/rtt_roscomm/src/passthrough_callback_queue.cpp @@ -1,3 +1,32 @@ +/* + * (C) 2020, Intermodalics BVBA + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the name of the copyright holder nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS + * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE + * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + #include "rtt_roscomm/passthrough_callback_queue.hpp" namespace rtt_roscomm { From 6c94d127529232c88d51f0851ebae6720950cd3c Mon Sep 17 00:00:00 2001 From: Sergio Portoles Diez Date: Thu, 1 Oct 2020 15:28:57 +0200 Subject: [PATCH 4/5] rtt_roscomm: remove redundant documentation --- .../rtt_roscomm/passthrough_callback_queue.hpp | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/rtt_roscomm/include/rtt_roscomm/passthrough_callback_queue.hpp b/rtt_roscomm/include/rtt_roscomm/passthrough_callback_queue.hpp index efa4e7a1..bf238c54 100644 --- a/rtt_roscomm/include/rtt_roscomm/passthrough_callback_queue.hpp +++ b/rtt_roscomm/include/rtt_roscomm/passthrough_callback_queue.hpp @@ -43,15 +43,11 @@ class PassthroughCallbackQueue: public ros::CallbackQueueInterface * Implementation of ros::CallbackQueueInterface::addCallback() * * This method is executing the callback received instead of adding - * it to a queue. In this way, the queue is bypassed and immediately - * executed. + * it to a queue. In this way, the queue is bypassed and the callback is + * immediately executed. * * @param callback callback to execute, instead of queueing - * @param owner_id Owner of the callback, in other implementations it might - * be used to remove the callback. In this implementation - * it has no effect. - * - * @return void + * @param owner_id Not used */ virtual void addCallback( const ros::CallbackInterfacePtr &callback, @@ -63,8 +59,6 @@ class PassthroughCallbackQueue: public ros::CallbackQueueInterface * No-op * * @param owner_id Not used. - * - * @return void */ virtual void removeByID(uint64_t owner_id) {} From 3a7367661144511fab50d39ec19b7b6ad3c11320 Mon Sep 17 00:00:00 2001 From: Sergio Portoles Diez Date: Thu, 1 Oct 2020 16:00:47 +0200 Subject: [PATCH 5/5] rtt_roscomm: use always ROS queue size of 1 for subscribers The patch modifies the rostopic queue size to always 1 when subscribing, bacause when shortcutting the callback that processes messages at ROS side, this buffer will be never fill. Instead, the buffering happens at the RTT data object buffer with the correct size according to the policy. --- .../include/rtt_roscomm/rtt_rostopic_ros_msg_transporter.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rtt_roscomm/include/rtt_roscomm/rtt_rostopic_ros_msg_transporter.hpp b/rtt_roscomm/include/rtt_roscomm/rtt_rostopic_ros_msg_transporter.hpp index 80bf9ba1..f8b763b0 100644 --- a/rtt_roscomm/include/rtt_roscomm/rtt_rostopic_ros_msg_transporter.hpp +++ b/rtt_roscomm/include/rtt_roscomm/rtt_rostopic_ros_msg_transporter.hpp @@ -260,7 +260,7 @@ namespace rtt_roscomm { ros::SubscribeOptions ops; ops.template initByFullCallbackType( topicname, - policy.size > 0 ? policy.size : 1, // minimum queue_size 1 + 1u, // Always 1, since data can be buffered in RTT buffer boost::bind(&RosSubChannelElement::newData, this, _1)); ops.callback_queue = &passthrough_callback_queue;