From d8d09232cb8a4b9f438d05c0038b1c7a7f5e3382 Mon Sep 17 00:00:00 2001 From: Lennart Niecksch Date: Fri, 12 Jul 2024 11:02:41 +0200 Subject: [PATCH] Initial commit. --- .gitignore | 9 + CMakeLists.txt | 219 ++++++++++++++++++ LICENSE | 31 +++ README.md | 18 ++ cmake/FindLibRDKafka.cmake | 2 + config/producer.yaml | 13 ++ examples/ExampleConsumer.cpp | 27 +++ examples/ExampleProducer.cpp | 23 ++ include/ros_kafka/ros_kafka_common_config.h | 140 +++++++++++ .../ros_kafka/ros_kafka_message_consumer.h | 105 +++++++++ .../ros_kafka/ros_kafka_message_producer.h | 208 +++++++++++++++++ include/ros_kafka/ros_kafka_serialization.h | 60 +++++ include/ros_kafka/timestamp_util.h | 23 ++ include/ros_kafka/unassigned_partitioner.h | 21 ++ launch/example_consumer.launch | 32 +++ launch/example_producer.launch | 31 +++ package.xml | 34 +++ 17 files changed, 996 insertions(+) create mode 100644 .gitignore create mode 100644 CMakeLists.txt create mode 100644 LICENSE create mode 100644 README.md create mode 100644 cmake/FindLibRDKafka.cmake create mode 100644 config/producer.yaml create mode 100644 examples/ExampleConsumer.cpp create mode 100644 examples/ExampleProducer.cpp create mode 100644 include/ros_kafka/ros_kafka_common_config.h create mode 100644 include/ros_kafka/ros_kafka_message_consumer.h create mode 100644 include/ros_kafka/ros_kafka_message_producer.h create mode 100644 include/ros_kafka/ros_kafka_serialization.h create mode 100644 include/ros_kafka/timestamp_util.h create mode 100644 include/ros_kafka/unassigned_partitioner.h create mode 100644 launch/example_consumer.launch create mode 100644 launch/example_producer.launch create mode 100644 package.xml diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..499ca51 --- /dev/null +++ b/.gitignore @@ -0,0 +1,9 @@ +certs/* +.idea/* + +*.swp +*.swo +*.key +*.pem +*.req +ca-cert diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..0c99312 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,219 @@ +cmake_minimum_required(VERSION 3.0.2) +project(ros_kafka) +set(CMAKE_MODULE_PATH + ${CMAKE_CURRENT_SOURCE_DIR}/cmake + ${CMAKE_MODULE_PATH}) + + +## Compile as C++11, supported in ROS Kinetic and newer +# add_compile_options(-std=c++11) + +## Find catkin macros and libraries +## if COMPONENTS list like find_package(catkin REQUIRED COMPONENTS xyz) +## is used, also find other catkin packages +find_package(catkin REQUIRED COMPONENTS + roscpp + std_msgs +) +catkin_package(CFG_EXTRAS FindLibRDKafka.cmake) + +## System dependencies are found with CMake's conventions +find_package(Boost REQUIRED COMPONENTS system) +find_package(LibRDKafka REQUIRED) + + +## Uncomment this if the package has a setup.py. This macro ensures +## modules and global scripts declared therein get installed +## See http://ros.org/doc/api/catkin/html/user_guide/setup_dot_py.html +# catkin_python_setup() + +################################################ +## Declare ROS messages, services and actions ## +################################################ + +## To declare and build messages, services or actions from within this +## package, follow these steps: +## * Let MSG_DEP_SET be the set of packages whose message types you use in +## your messages/services/actions (e.g. std_msgs, actionlib_msgs, ...). +## * In the file package.xml: +## * add a build_depend tag for "message_generation" +## * add a build_depend and a exec_depend tag for each package in MSG_DEP_SET +## * If MSG_DEP_SET isn't empty the following dependency has been pulled in +## but can be declared for certainty nonetheless: +## * add a exec_depend tag for "message_runtime" +## * In this file (CMakeLists.txt): +## * add "message_generation" and every package in MSG_DEP_SET to +## find_package(catkin REQUIRED COMPONENTS ...) +## * add "message_runtime" and every package in MSG_DEP_SET to +## catkin_package(CATKIN_DEPENDS ...) +## * uncomment the add_*_files sections below as needed +## and list every .msg/.srv/.action file to be processed +## * uncomment the generate_messages entry below +## * add every package in MSG_DEP_SET to generate_messages(DEPENDENCIES ...) + +## Generate messages in the 'msg' folder +# add_message_files( +# FILES +# Message1.msg +# Message2.msg +# ) + +## Generate services in the 'srv' folder +# add_service_files( +# FILES +# Service1.srv +# Service2.srv +# ) + +## Generate actions in the 'action' folder +# add_action_files( +# FILES +# Action1.action +# Action2.action +# ) + +## Generate added messages and services with any dependencies listed here +# generate_messages( +# DEPENDENCIES +# sensor_msgs# std_msgs +# ) + +################################################ +## Declare ROS dynamic reconfigure parameters ## +################################################ + +## To declare and build dynamic reconfigure parameters within this +## package, follow these steps: +## * In the file package.xml: +## * add a build_depend and a exec_depend tag for "dynamic_reconfigure" +## * In this file (CMakeLists.txt): +## * add "dynamic_reconfigure" to +## find_package(catkin REQUIRED COMPONENTS ...) +## * uncomment the "generate_dynamic_reconfigure_options" section below +## and list every .cfg file to be processed + +## Generate dynamic reconfigure parameters in the 'cfg' folder +# generate_dynamic_reconfigure_options( +# cfg/DynReconf1.cfg +# cfg/DynReconf2.cfg +# ) + +################################### +## catkin specific configuration ## +################################### +## The catkin_package macro generates cmake config files for your package +## Declare things to be passed to dependent projects +## INCLUDE_DIRS: uncomment this if your package contains header files +## LIBRARIES: libraries you create in this project that dependent projects also need +## CATKIN_DEPENDS: catkin_packages dependent projects also need +## DEPENDS: system dependencies of this project that dependent projects also need +catkin_package( + INCLUDE_DIRS include +# LIBRARIES ros-kafka +# CATKIN_DEPENDS roscpp sensor_msgs std_msgs +DEPENDS LibRDKafka +) + +########### +## Build ## +########### + +## Specify additional locations of header files +## Your package locations should be listed before other locations +include_directories( + include + ${catkin_INCLUDE_DIRS} + ${LibRDKafka_INCLUDE_DIRS} +) + +## Declare a C++ library +# add_library(${PROJECT_NAME} +# src/${PROJECT_NAME}/ros-kafka.cpp +# ) + +## Add cmake target dependencies of the library +## as an example, code may need to be generated before libraries +## either from message generation or dynamic reconfigure +# add_dependencies(${PROJECT_NAME} ${${PROJECT_NAME}_EXPORTED_TARGETS} ${catkin_EXPORTED_TARGETS}) + +## Declare a C++ executable +## With catkin_make all packages are built within a single CMake context +## The recommended prefix ensures that target names across packages don't collide +add_executable(example_producer examples/ExampleProducer.cpp) +add_executable(example_consumer examples/ExampleConsumer.cpp) + + + +## Rename C++ executable without prefix +## The above recommended prefix causes long target names, the following renames the +## target back to the shorter version for ease of user use +## e.g. "rosrun someones_pkg node" instead of "rosrun someones_pkg someones_pkg_node" +# set_target_properties(${PROJECT_NAME}_node PROPERTIES OUTPUT_NAME node PREFIX "") + +## Add cmake target dependencies of the executable +## same as for the library above +add_dependencies(example_producer ${${PROJECT_NAME}_EXPORTED_TARGETS} ${catkin_EXPORTED_TARGETS}) +add_dependencies(example_consumer ${${PROJECT_NAME}_EXPORTED_TARGETS} ${catkin_EXPORTED_TARGETS}) + +## Specify libraries to link a library or executable target against + target_link_libraries(example_producer + ${catkin_LIBRARIES} + ${LibRDKafka_LIBRARIES} + ) + + target_link_libraries(example_consumer + ${catkin_LIBRARIES} + ${LibRDKafka_LIBRARIES} + ) + + +############# +## Install ## +############# + +# all install targets should use catkin DESTINATION variables +# See http://ros.org/doc/api/catkin/html/adv_user_guide/variables.html + +## Mark executable scripts (Python etc.) for installation +## in contrast to setup.py, you can choose the destination +# catkin_install_python(PROGRAMS +# scripts/my_python_script +# DESTINATION ${CATKIN_PACKAGE_BIN_DESTINATION} +# ) + +## Mark executables for installation +## See http://docs.ros.org/melodic/api/catkin/html/howto/format1/building_executables.html +install(TARGETS example_producer example_consumer + RUNTIME DESTINATION ${CATKIN_PACKAGE_BIN_DESTINATION} + ) + + +## Mark cpp header files for installation +install(DIRECTORY include/${PROJECT_NAME}/ + DESTINATION ${CATKIN_PACKAGE_INCLUDE_DESTINATION} + FILES_MATCHING PATTERN "*.h" + PATTERN ".svn" EXCLUDE + ) +# cmake find script +install(DIRECTORY cmake + DESTINATION ${CATKIN_PACKAGE_SHARE_DESTINATION} + PATTERN ".svn" EXCLUDE) +## Mark other files for installation (e.g. launch and bag files, etc.) +# install(FILES +# # myfile1 +# # myfile2 +# DESTINATION ${CATKIN_PACKAGE_SHARE_DESTINATION} +# ) + +############# +## Testing ## +############# + +## Add gtest based cpp test target and link libraries +# catkin_add_gtest(${PROJECT_NAME}-test test/test_ros-kafka.cpp) +# if(TARGET ${PROJECT_NAME}-test) +# target_link_libraries(${PROJECT_NAME}-test ${PROJECT_NAME}) +# endif() + +## Add folders to be run by python nosetests +# catkin_add_nosetests(test) diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..04e96eb --- /dev/null +++ b/LICENSE @@ -0,0 +1,31 @@ +Software License Agreement (BSD License) + + Copyright (c) 2023, DFKI GmbH + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions + are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above + copyright notice, this list of conditions and the following + disclaimer in the documentation and/or other materials provided + with the distribution. + * Neither the name of Willow Garage nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS + FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE + COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + POSSIBILITY OF SUCH DAMAGE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..95454fc --- /dev/null +++ b/README.md @@ -0,0 +1,18 @@ +***DISCLAIMER*** + +***DO NOT USE IN PRODUCTION!!!*** + + +Dependencies (except obviously ROS): +```console +sudo apt install librdkafka-dev +``` +If you want to use zstd compression one needs to compile librdkafka from source (on ubuntu 20.04). + +Very basic bridgin from ros<->kafka. +Basically two template headers based on the librdkafka tutorial. +Publishes serialized rosmsg on kafka topic assumes stamped message. +Or consumes from kafka topic to publish on ros topic. +See ScanProducer.cpp for the most simple example for producing to kafka +and RosClusterConsumer reads a MarkerArray from a kafka topic. +Custom (de-)serialization/partitioning is possible by passing a functor to the constructor. diff --git a/cmake/FindLibRDKafka.cmake b/cmake/FindLibRDKafka.cmake new file mode 100644 index 0000000..492f85f --- /dev/null +++ b/cmake/FindLibRDKafka.cmake @@ -0,0 +1,2 @@ +find_package(PkgConfig REQUIRED) +pkg_check_modules(LibRDKafka REQUIRED rdkafka++ rdkafka) diff --git a/config/producer.yaml b/config/producer.yaml new file mode 100644 index 0000000..33d7c44 --- /dev/null +++ b/config/producer.yaml @@ -0,0 +1,13 @@ +parameters: { + acks: "1", + message.max.bytes: "500000000", + queue.buffering.max.kbytes: "160000000000", + queue.buffering.max.messages: "10000000", + linger.ms: "80", + batch.num.messages: "16", + compression.type: "zstd", + compression.codec: "zstd", + compression.level: "3", + # 0 is infinite + message.timeout.ms: "0", + } diff --git a/examples/ExampleConsumer.cpp b/examples/ExampleConsumer.cpp new file mode 100644 index 0000000..9a60df0 --- /dev/null +++ b/examples/ExampleConsumer.cpp @@ -0,0 +1,27 @@ +#include +#include + +#include +#include + +#include +#include + + +int main(int argc, char **argv) +{ + ros::init(argc, argv, "ConsumerExample"); + + ros::NodeHandle nh("~"); + ros::Publisher pub = nh.advertise("chatter", 50); + + RosKafkaMessageConsumer consumer(pub, nh, &RosKafkaSerialization::deserializeToRosMsg); + + while(ros::ok()) + { + consumer.consume(); + ros::spinOnce(); + } + + return 0; +} diff --git a/examples/ExampleProducer.cpp b/examples/ExampleProducer.cpp new file mode 100644 index 0000000..122c975 --- /dev/null +++ b/examples/ExampleProducer.cpp @@ -0,0 +1,23 @@ +#include +#include +#include + +#include + +int main(int argc, char **argv) +{ + ros::init(argc, argv, "producer"); + + ros::NodeHandle n("~"); + RosKafkaMessageProducer prod(n, "chatter"); + + while(ros::ok()) + { + prod.poll(); + //ros::spinOnce(); + // mimic spin behaviour according to wiki.. + ros::getGlobalCallbackQueue()->callAvailable(ros::WallDuration(0.1)); + } + + return 0; +} diff --git a/include/ros_kafka/ros_kafka_common_config.h b/include/ros_kafka/ros_kafka_common_config.h new file mode 100644 index 0000000..0b2d4ca --- /dev/null +++ b/include/ros_kafka/ros_kafka_common_config.h @@ -0,0 +1,140 @@ +#ifndef ROS_KAFKA_COMMON_CONFIG_ +#define ROS_KAFKA_COMMON_CONFIG_ + +class RosKafkaCommonConfig{ + public: + RosKafkaCommonConfig(ros::NodeHandle nh) : + nh_(nh), + conf_(nullptr) + { + configFromRosParams(); + } + + + std::unique_ptr getConfig() + { + if(!conf_) + configFromRosParams(); + + return std::move(conf_); + } + private: + std::unique_ptr conf_; + ros::NodeHandle nh_; + + void configFromRosParams() + { + conf_ = std::unique_ptr(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL)); + std::string errstr; + std::string user, password, security_protocol, host; + if (nh_.getParam("host", host)) + { + if (conf_->set("bootstrap.servers", host, errstr) != + RdKafka::Conf::CONF_OK) { + std::cerr << errstr << std::endl; + } + } + + + /// SECURITY /// + if (nh_.getParam("security_protocol", security_protocol)) + { + if (conf_->set("security.protocol", security_protocol, errstr) != + RdKafka::Conf::CONF_OK) { + std::cerr << errstr << std::endl; + } + + /// SASL /// + if (security_protocol.find("sasl") != std::string::npos) { + /// TODO check if supported from builtin features. + /// TODO configurable + if (conf_->set("sasl.mechanism", "PLAIN", errstr) != + RdKafka::Conf::CONF_OK) { + std::cerr << errstr << std::endl; + } + + if (nh_.getParam("user", user)) + { + if (conf_->set("sasl.username", std::getenv(user.c_str()), errstr) != + RdKafka::Conf::CONF_OK) { + std::cerr << errstr << std::endl; + } + } + + if (nh_.getParam("password", password)) + { + if (conf_->set("sasl.password", std::getenv(user.c_str()), errstr) != + RdKafka::Conf::CONF_OK) { + std::cerr << errstr << std::endl; + } + } + } + + //// SSL /// + if (security_protocol.find("ssl") != std::string::npos) { + /// TODO check if supported + std::string ca_cert, client_cert, client_key, key_pw_var, endpoint_identification; + + if (nh_.getParam("ca_cert", ca_cert)) + { + if (conf_->set("ssl.ca.location", ca_cert, errstr) != + RdKafka::Conf::CONF_OK) { + std::cerr << errstr << std::endl; + } + } + + if (nh_.getParam("client_cert", client_cert)) + { + std::cout << " client cert " << client_cert << std::endl; + if (conf_->set("ssl.certificate.location", client_cert, errstr) != + RdKafka::Conf::CONF_OK) { + std::cerr << errstr << std::endl; + } + } + + if (nh_.getParam("client_key", client_key)) + { + std::cout << "client key" << client_key << std::endl; + if (conf_->set("ssl.key.location", client_key, errstr) != + RdKafka::Conf::CONF_OK) { + std::cerr << errstr << std::endl; + } + } + + if (nh_.getParam("key_pw_var", key_pw_var)) + { + if (conf_->set("ssl.key.password", std::getenv(key_pw_var.c_str()), errstr) != + RdKafka::Conf::CONF_OK) { + std::cerr << errstr << std::endl; + } + } + + if (nh_.getParam("endpoint_identification", endpoint_identification)) + { + if (conf_->set("ssl.endpoint.identification.algorithm", endpoint_identification, errstr) != + RdKafka::Conf::CONF_OK) { + std::cerr << errstr << std::endl; + } + } + } + } + + /// read arbitrary config from parameters namespace e.g. loaded from yaml /// + std::map params; + if(nh_.getParam("parameters", params)) + { + for(auto const& it : params) + { + if (conf_->set(it.first, it.second, errstr) != + RdKafka::Conf::CONF_OK) { + ROS_ERROR("INVALID PARAM/VALUE: %s", errstr.c_str()); + std::cerr << errstr << std::endl; + } + + } + } + } +}; + +#endif + diff --git a/include/ros_kafka/ros_kafka_message_consumer.h b/include/ros_kafka/ros_kafka_message_consumer.h new file mode 100644 index 0000000..1d8e410 --- /dev/null +++ b/include/ros_kafka/ros_kafka_message_consumer.h @@ -0,0 +1,105 @@ +#ifndef KAFKA_MESSAGE_CONSUMER_ +#define KAFKA_MESSAGE_CONSUMER_ + +// ROS +#include +#include + +// kafka +#include + +// ros kafka bridge +#include "ros_kafka_common_config.h" +#include "ros_kafka_serialization.h" + +template +class RosKafkaMessageConsumer +{ + public: + RosKafkaMessageConsumer( + publisherT pub, + ros::NodeHandle nh, + std::function(RdKafka::Message*)> deserializer = + &RosKafkaSerialization::deserializeToRosMsg) + : + nh_(nh), + deserializer_(deserializer), + pub_(pub) + + { + std::string errstr; + RosKafkaCommonConfig confHelper(nh_); + std::unique_ptr conf = confHelper.getConfig(); + + std::string group_id; + + if (nh.getParam("group_id", group_id)) + { + if (conf->set("group.id", group_id, errstr) != RdKafka::Conf::CONF_OK) { + ROS_ERROR("ERROR group id: %s", errstr.c_str()); + } + } + else + { + /// TODO exceptions + ROS_ERROR("NO GROUP ID"); + } + + // Create the cosumer + consumer_ = + std::unique_ptr(RdKafka::KafkaConsumer::create(conf.get(), errstr)); + + conf.reset(); + + if (!consumer_) { + ROS_ERROR("Failed to create consumer: %s", errstr.c_str()); + }else{ + // TODO should this be in commonconfig? + if(nh.getParam("kafka_topic", kafkaTopic_)) + { + ROS_INFO("Consuming from %s", kafkaTopic_.c_str()); + consumer_->subscribe({kafkaTopic_}); + } + else + { + ROS_INFO("NO KAFKA TOPIC DEFINED!"); + } + } + + } + + void consume(RdKafka::Message* kafka_msg) + { + + if(kafka_msg->err()) + { + // HANDLE ERRORS BUT THIS MAY JUST BE A LOT "TIMED OUT" Messages... + return; + } + else + { + boost::shared_ptr ros_msg = deserializer_(kafka_msg); + pub_.publish(ros_msg); + } + delete kafka_msg; + } + + void consume() + { + // TODO This should this be configurable + RdKafka::Message* msg = consumer_->consume(20); + if(msg) + consume(msg); + + } + + private: + std::unique_ptr consumer_; + ros::NodeHandle nh_; + std::string kafkaTopic_; + publisherT pub_; + std::function(RdKafka::Message* kafka_msg)> deserializer_; + +}; + +#endif diff --git a/include/ros_kafka/ros_kafka_message_producer.h b/include/ros_kafka/ros_kafka_message_producer.h new file mode 100644 index 0000000..4631b09 --- /dev/null +++ b/include/ros_kafka/ros_kafka_message_producer.h @@ -0,0 +1,208 @@ +#ifndef KAFKA_MESSAGE_PRODUCER_ +#define KAFKA_MESSAGE_PRODUCER_ + +// ROS +#include +#include + +// kafka +#include + +#include +#include + +#include +#include +#include + +#include "ros_kafka_common_config.h" +#include "ros_kafka_serialization.h" +#include "unassigned_partitioner.h" + +#include "timestamp_util.h" + + +// Slightly modified dr from examples. +//class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb { +// private: +// int i = 0; +// std::chrono::steady_clock::time_point begin; +// public: +// void setChrono() +// { +// begin = std::chrono::steady_clock::now(); +// } +// void dr_cb (RdKafka::Message &message) { +// +// std::chrono::steady_clock::time_point end = std::chrono::steady_clock::now(); +// /* If message.err() is non-zero the message delivery failed permanently +// * for the message. */ +// if (message.err()) +// { +// std::cerr << "% Message delivery failed: " << message.errstr() << std::endl; +// } +// else +// { +// ROS_INFO("Send frame %d in %ld", i, std::chrono::duration_cast(end - begin).count()); +// } +// } +//}; + +template +class RosKafkaMessageProducer +{ + public: + + RosKafkaMessageProducer(ros::NodeHandle nh, const std::string& rosTopic) : + RosKafkaMessageProducer(nh, + rosTopic, + &RosKafkaSerialization::serializeFromRosMsg, + &UAPartitioner::UAPartition) + {} + + RosKafkaMessageProducer(ros::NodeHandle nh, + const std::string& rosTopic, + std::function&)> serializer): + RosKafkaMessageProducer(nh, + rosTopic, + serializer, + &UAPartitioner::UAPartition) + {} + + + RosKafkaMessageProducer(ros::NodeHandle nh, + const std::string& rosTopic, + std::function&)> serializer, + std::function partitioner) + : + nh_(nh), + rosTopic_(rosTopic), + serializer_(serializer), + partitioner_(partitioner) + { + + + if(rosTopic != "") + { + /// TODO queue size configurable + sub_ = nh.subscribe(rosTopic, std::numeric_limits::max(), &RosKafkaMessageProducer::produceKafkaMsg, this); + } + + + // TODO should this be in commonconfig? + if(nh.getParam("kafka_topic", kafkaTopic_)) + { + ROS_INFO("Producing to %s", kafkaTopic_.c_str()); + } + else + { + ROS_INFO("NO KAFKA TOPIC DEFINED!"); + } + std::string errstr; + RosKafkaCommonConfig confHelper(nh_); + std::unique_ptr conf = confHelper.getConfig(); + producer_ = std::unique_ptr(RdKafka::Producer::create(conf.get(), errstr)); + conf.reset(); + + if (!producer_) { + std::cerr << "Failed to create producer: " << errstr << std::endl; + } + } + + void produceKafkaMsg(const msgT& msg) + { + boost::shared_array buffer; + // For measuring + //msgT msg2 = msg; + //msg2.header.stamp = ros::Time::now(); + //ros::Time ts = msg2.header.stamp; + //uint32_t serSize = serializer_(msg2, buffer); + + // Production + uint32_t serSize = serializer_(msg, buffer); + + /// use traits to use timestamp if present else now + ros::Time ts = getTimestampOrNow(msg); + + // Convert to ms + long ms_ts = (long) ts.sec * 1000 + (long) ts.nsec / 1000000; + bool retry = false; + do + { + RdKafka::ErrorCode err = + producer_->produce( + kafkaTopic_, + partitioner_(msg), + // TODO possible to transfer ownership?! + RdKafka::Producer::RK_MSG_COPY, + buffer.get(), serSize, + /* Key */ + NULL, 0, + ms_ts, + /* Message headers, if any */ + NULL, + /* Per-message opaque value passed to + * delivery report */ + NULL); + + if (err != RdKafka::ERR_NO_ERROR) { + + std::string errStr(RdKafka::err2str(err)); + ROS_INFO("Failed to produce %s", errStr.c_str()); + if (err == RdKafka::ERR__QUEUE_FULL) { + /// TODO SEE https://github.com/edenhill/librdkafka/issues/2247 + // TODO configurable behaviour. + producer_->poll(1000); + } + + // TODO configurable + retry = true; + } + } + while(retry); + + /* A producer application should continually serve + * the delivery report queue by calling poll() + * at frequent intervals. + * Either put the poll call in your main loop, or in a + * dedicated thread, or call it after every produce() call. + * Just make sure that poll() is still called + * during periods where you are not producing any messages + * to make sure previously produced messages have their + * delivery report callback served (and any other callbacks + * you register). */ + producer_->poll(0); + } + + + void poll() + { + producer_->poll(0); + } + + RosKafkaMessageProducer(RosKafkaMessageProducer&) = delete; + // Should/could this be kept and default?! + RosKafkaMessageProducer(RosKafkaMessageProducer&&) = delete; + RosKafkaMessageProducer& operator=(RosKafkaMessageProducer&) = delete; + + ~RosKafkaMessageProducer() + { + // TODO make configurable + producer_->flush(1 * 1000 /* wait for max 1 seconds */); + } + + + private: + + std::function&)> serializer_; + std::function partitioner_; + ros::NodeHandle nh_; + std::string rosTopic_; + ros::Subscriber sub_; + std::string kafkaTopic_; + std::unique_ptr producer_; +// ExampleDeliveryReportCb ex_dr_cb; +// std::unique_ptr msg_opaque_; +}; + +#endif diff --git a/include/ros_kafka/ros_kafka_serialization.h b/include/ros_kafka/ros_kafka_serialization.h new file mode 100644 index 0000000..58e53ad --- /dev/null +++ b/include/ros_kafka/ros_kafka_serialization.h @@ -0,0 +1,60 @@ +#ifndef ROS_KAFKA_SERIALIZATION_H_ +#define ROS_KAFKA_SERIALIZATION_H_ + +// ROS +#include +#include +#include + +// kafka +#include + + +namespace RosKafkaSerialization{ +template +boost::shared_ptr deserializeToRosMsg(RdKafka::Message* kafka_msg) +{ + boost::shared_ptr ros_msg = boost::shared_ptr(new msgT()); + kafka_msg->payload(); + ros::serialization::IStream stream(static_cast(kafka_msg->payload()), kafka_msg->len()); + ros::serialization::deserialize(stream, *ros_msg); + return ros_msg; +} + + +template <> +boost::shared_ptr deserializeToRosMsg(RdKafka::Message* kafka_msg) +{ + std::cout << "des img" << std::endl; + boost::shared_ptr ros_msg = boost::shared_ptr(new sensor_msgs::Image()); + kafka_msg->payload(); + ros::serialization::IStream stream(static_cast(kafka_msg->payload()), kafka_msg->len()); + ros::serialization::deserialize(stream, *ros_msg); + std::cout << ros_msg->data.size() << std::endl; + return ros_msg; +} + +template +uint32_t serializeFromRosMsg(const msgT& msg, boost::shared_array& buffer) +{ + uint32_t serSize = ros::serialization::serializationLength(msg); + // Maybe should be shared_ptr with array but we are using ros.. + buffer = boost::shared_array(new uint8_t[serSize]); + ros::serialization::OStream stream(buffer.get(), serSize); + ros::serialization::serialize(stream, msg); + return serSize; +} + +template <> +uint32_t serializeFromRosMsg(const sensor_msgs::Image& msg, boost::shared_array& buffer) +{ + uint32_t serSize = ros::serialization::serializationLength(msg); + // Maybe should be shared_ptr with array but we are using ros.. + buffer = boost::shared_array(new uint8_t[serSize]); + ros::serialization::OStream stream(buffer.get(), serSize); + ros::serialization::serialize(stream, msg); + return serSize; +} +} + +#endif diff --git a/include/ros_kafka/timestamp_util.h b/include/ros_kafka/timestamp_util.h new file mode 100644 index 0000000..1a88577 --- /dev/null +++ b/include/ros_kafka/timestamp_util.h @@ -0,0 +1,23 @@ +#ifndef TIMESTAMP_UTIL_ +#define TIMESTAMP_UTIL_ + +// see https://en.cppreference.com/w/cpp/types/void_t +// primary template handles types that have no nested ::type member: +template< class, class = void > +struct is_stamped_msg : std::false_type { }; + +// specialization recognizes types that do have a nested ::type member: +template< class msgT > +struct is_stamped_msg> : std::true_type { }; + + +/// use traits to use timestamp if present else now +template::value >* = nullptr > + ros::Time getTimestampOrNow(msgT msg) { return msg.header.stamp; } + +template::value >* = nullptr > + ros::Time getTimestampOrNow(msgT msg) { return ros::Time::now(); } + +#endif diff --git a/include/ros_kafka/unassigned_partitioner.h b/include/ros_kafka/unassigned_partitioner.h new file mode 100644 index 0000000..ebecd6b --- /dev/null +++ b/include/ros_kafka/unassigned_partitioner.h @@ -0,0 +1,21 @@ +#ifndef UNASSIGNED_PARTITIONER_H_ +#define UNASSIGNED_PARTITIONER_H_ + +// ROS +#include +#include +#include + +// kafka +#include + + +namespace UAPartitioner{ +template + int32_t UAPartition(const msgT& msg) + { + return RdKafka::Topic::PARTITION_UA; + } +} + +#endif diff --git a/launch/example_consumer.launch b/launch/example_consumer.launch new file mode 100644 index 0000000..8039859 --- /dev/null +++ b/launch/example_consumer.launch @@ -0,0 +1,32 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/launch/example_producer.launch b/launch/example_producer.launch new file mode 100644 index 0000000..b0ac89c --- /dev/null +++ b/launch/example_producer.launch @@ -0,0 +1,31 @@ + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/package.xml b/package.xml new file mode 100644 index 0000000..d14c75a --- /dev/null +++ b/package.xml @@ -0,0 +1,34 @@ + + + ros_kafka + 0.0.0 + Header library for writing from ROS to Kafka and vice versa + + Lennart Niecksch + Lennart Niecksch + BSD + + + + + + + catkin + roscpp + tf2 + + std_msgs + librdkafka + + roscpp + + std_msgs + librdkafka + + + roscpp + librdkafka + + std_msgs + +