diff --git a/.github/workflows/bazelized_drake_ros.yml b/.github/workflows/bazelized_drake_ros.yml index 038aa9b6..f5f46438 100755 --- a/.github/workflows/bazelized_drake_ros.yml +++ b/.github/workflows/bazelized_drake_ros.yml @@ -34,6 +34,13 @@ jobs: run: du -hs $(readlink -f ~/.cache/bazel_ci) || true # Setup. + - name: Free up disk space + run: | + sudo rm -rf \ + /usr/share/dotnet \ + /opt/ghc \ + /usr/local/share/boost \ + "$AGENT_TOOLSDIRECTORY" || true - name: Simplify apt upgrades run: .github/simplify_apt_and_upgrades.sh - name: Configure drake_ros Bazel for CI diff --git a/bazel_ros2_rules/WORKSPACE b/bazel_ros2_rules/WORKSPACE index cb0ca1cd..b386eb55 100644 --- a/bazel_ros2_rules/WORKSPACE +++ b/bazel_ros2_rules/WORKSPACE @@ -15,3 +15,7 @@ http_archive( load("@bazel_skylib//:workspace.bzl", "bazel_skylib_workspace") bazel_skylib_workspace() + +load("//deps:defs.bzl", "add_bazel_ros2_rules_dependencies") + +add_bazel_ros2_rules_dependencies() diff --git a/bazel_ros2_rules/network_isolation/BUILD.bazel b/bazel_ros2_rules/network_isolation/BUILD.bazel new file mode 100644 index 00000000..f74865e1 --- /dev/null +++ b/bazel_ros2_rules/network_isolation/BUILD.bazel @@ -0,0 +1,33 @@ +cc_library( + name = "network_isolation_cc", + srcs = ["network_isolation.cc"], + hdrs = ["network_isolation.h"], + visibility = ["//visibility:public"], +) + +cc_binary( + name = "isolate", + srcs = ["isolate.cc"], + visibility = ["//visibility:public"], + deps = [ + ":network_isolation_cc", + ], +) + +# Create a CPython extension +cc_binary( + name = "network_isolation_py.so", + srcs = ["network_isolation_py.cc"], + linkshared = True, + linkstatic = True, + deps = [ + ":network_isolation_cc", + "@python_dev//:headers", + ], +) + +py_library( + name = "network_isolation_py", + data = [":network_isolation_py.so"], + visibility = ["//visibility:public"], +) diff --git a/bazel_ros2_rules/network_isolation/README.md b/bazel_ros2_rules/network_isolation/README.md new file mode 100644 index 00000000..79cd4fff --- /dev/null +++ b/bazel_ros2_rules/network_isolation/README.md @@ -0,0 +1,103 @@ +# Introduction +This directory contains tools and targets to isolate ros2 tests in this repository using linux network namespaces. +At its core, it uses the ``unshare()`` system call, and is meant to isolate ROS2 traffic. It creates a new user namespace, +new network namespace to prevent cross talk via the network, and new IPC namespace to prevent cross talk using shared memory. + +The existing dload_shim is used to pass on the required argument and isolate the tests. + +## Why do we need this ? +When running ROS2 tests in parallel, they might publish on the same topics and there might be cross talk between tests. RMW config or ROS domain id based isolation is possible, +but it does not scale well, due to limited ports and domain ids available. Linux namespaces provide a generic and a scalable way to solve this problem. + +## Why not isolate individual targets instead of tests ? +Isolation using the namespace approach requires 3 namespaces, or "credentials" for processes to talk to each other, or be in the same realm : IPC, user and network namespaces. +Tests are more generic than individual targets, as the tests are free to fork() or run any number of processes they want, and they'll live in the same namespace. + +If we do isolate a process 'A', how do we make sure the next process 'B' will live in the same namespace as 'A' (if they're meant to talk to each other) ? There needs to be an API to provide the above mentioned "credentials" to the new process +somehow, which out of scope of this feature for now. + +# Targets +The logic lives in the following targets, which are meant to be used with the bazel rules ``ros_cc_test()`` and ``ros_py_test()`` : +* ``network_isolation_cc`` (cc_library): This is where the core logic lives, and the ``unshare()`` call is run. +* ``network_isolation_py.so``(cpython extension) : Python binding for the network isolation logic. +* ``network_isolation_py`` (py_library) : Importable python module for the network isolation logic. + +Other than these, there is a standalone executable target called ``isolate`` which is meant to be used in a standalone way, and not with the +``ros_*_test()`` rules. It isolates the process in the first argument provided to it. This uses the same ``unshare()`` logic as ``network_isolation_cc``. + +# How do we use this feature ? +There are 3 ways to use this feature : + +## Using ``ros_cc_test()`` rule : +There is now an extra argument (``network_isolation``) available to the rule, so for e.g. we can modify a test in ``ros2_example_bazel_installed/ros2_example_apps/BUILD.bazel`` as : + +``` +ros_cc_test( + name = "talker_listener_cc_test", + size = "small", + srcs = ["test/talker_listener.cc"], + rmw_implementation = "rmw_cyclonedds_cpp", + network_isolation = True, + deps = [ + ":listener_cc", + ":talker_cc", + "@ros2//:rclcpp_cc", + "@ros2//:std_msgs_cc", + "@ros2//resources/rmw_isolation:rmw_isolation_cc", + ], +) +``` + +Whatever processes are spawned by this test will be contained in a namespace, and will be able to talk to each other, but not to any other ros nodes running outside of this test. + +## Using the ``ros_py_test()`` rule : +Similarly for the python test rule, we can add ``network_isolation`` to ``True``. Consider this section in ``drake_ros_examples/examples/iiwa_manipulator/BUILD.bazel`` : + +``` +ros_py_test( + name = "iiwa_manipulator_test", + network_isolation = True, + srcs = ["test/iiwa_manipulator_test.py"], + data = [ + ":iiwa_manipulator", + ":iiwa_manipulator_py", + ], + main = "test/iiwa_manipulator_test.py", + deps = [ + "@ros2//resources/bazel_ros_env:bazel_ros_env_py", + ], +) +``` + +Similarly, the ros nodes spawned in this test can only talk to each other, and not other nodes running on the system at the same time, even if you have multiple instances of this test running in different terminals. + + +## Using the ``isolate`` target: +As mentioned before, the ``isolate`` target is meant to be used in a generic way and will isolate **any** process provided to it provided it is available in the bazel sandbox, for e.g. : +``` +cd bazel_ros2_rules +bazel run //network_isolation:isolate -- /bin/bash -c "echo HELLO_WORLD" +``` + +Here, the bash command for printing "HELLO_WORLD" is isolated, and can be replaced with any ros2 command. For instance, if you have ros2 humble installed +outside of drake-ros, using debs, you could run a publisher and subscriber that would be isolated from each other : + +Lets start the talker from the demo nodes package: +``` +bazel run //network_isolation:isolate -- /bin/bash -c "source /opt/ros/humble/setup.bash && export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp && ros2 run demo_nodes_cpp talker" +``` + +In another terminal, run : +``` +bazel run //network_isolation:isolate -- /bin/bash -c "source /opt/ros/humble/setup.bash && export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp && ros2 run demo_nodes_cpp listener" +``` + +These 2 processes should not be able to talk to each other, when used with ``isolate``. + +# Testing this feature in CI +Among the 3 ways to use this feature, as far as CI is concerned, there is a test for the ``isolate`` target mechanism. + +The test target is called ``network_isolation_test`` and uses ``ros2_example_bazel_installed/test/network_isolation_test.py`` to run 5 (by default) talker-listener pairs which have an id attached to them. +They all publish and listen on the same topic at the same time, and expect no cross talk between the pairs. The number of pairs can be changed using the ``--id`` cmdline argument if needed. + +The other 2 ways to use this feature, using ``ros_*_test()`` rules, run on tests, and not on executable targets, so *writing a test for a test* seems like an anti-pattern. The ``isolate`` target uses the same logic, so should be sufficient for testing. diff --git a/bazel_ros2_rules/network_isolation/isolate.cc b/bazel_ros2_rules/network_isolation/isolate.cc new file mode 100644 index 00000000..032da1d3 --- /dev/null +++ b/bazel_ros2_rules/network_isolation/isolate.cc @@ -0,0 +1,35 @@ +#include + +#include +#include + +#include "network_isolation.h" + +void die(const char * message) { + std::cerr << "isolate: " << message << ".\n"; + exit(-1); +} + +int main(int argc, char ** argv) { + if (argc < 2) { + die("shim must be given a command to execute"); + } + + if (!network_isolation::create_linux_namespaces()) { + die("Failed to fully create isolated environment"); + } + + // Copy to a new array that terminates with a null pointer at the end. + std::vector new_argv; + for (int i = 1; i < argc; ++i) { + new_argv.push_back(argv[i]); + } + new_argv.push_back(nullptr); + + // Exec a new process - should never return! + execv(new_argv.at(0), &new_argv.at(0)); + + perror("execv"); + die("Call to execv failed"); + return -1; +} diff --git a/bazel_ros2_rules/network_isolation/network_isolation.cc b/bazel_ros2_rules/network_isolation/network_isolation.cc new file mode 100644 index 00000000..d601bb4b --- /dev/null +++ b/bazel_ros2_rules/network_isolation/network_isolation.cc @@ -0,0 +1,109 @@ +#include "network_isolation.h" + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace network_isolation { + +void error(const char * message) +{ + std::cerr << "create_linux_namesapces: " + << message << ":" << strerror(errno) << "\n"; +} + +bool create_linux_namespaces() +{ + int result = unshare(CLONE_NEWUSER | CLONE_NEWNET | CLONE_NEWIPC); + + if (result != 0) { + error("failed to call unshare"); + return false; + } + + // Assert there is exactly one network interface + struct ifaddrs *ifaddr; + + if (-1 == getifaddrs(&ifaddr)) { + error("could not get network interfaces"); + return false; + } + if (nullptr == ifaddr) { + error("there are no network interfaces"); + return false; + } + if (nullptr != ifaddr->ifa_next) { + error("there are multiple network interfaces"); + return false; + } + + // Need a socket to do ioctl stuff on + int fd = socket(AF_INET, SOCK_DGRAM, 0); + if( fd < 0 ){ + error("could not open a socket"); + freeifaddrs(ifaddr); + return false; + } + + struct ifreq ioctl_request; + + // Check what flags are set on the interface + strncpy(ioctl_request.ifr_name, ifaddr->ifa_name, IFNAMSIZ); + int err = ioctl(fd, SIOCGIFFLAGS, &ioctl_request); + if (0 != err) { + freeifaddrs(ifaddr); + error("failed to get interface flags"); + return false; + } + + // Expecting a loopback interface. + if (!(ioctl_request.ifr_flags & IFF_LOOPBACK)) { + error("the only interface is not a loopback interface"); + freeifaddrs(ifaddr); + return false; + } + + // Enable multicast + ioctl_request.ifr_flags |= IFF_MULTICAST; + // Bring up interface + ioctl_request.ifr_flags |= IFF_UP; + + err = ioctl(fd, SIOCSIFFLAGS, &ioctl_request); + if (0 != err) { + error("failed to set interface flags"); + freeifaddrs(ifaddr); + return false; + } + + // For programs that use both LCM and ROS, we need an LCM route ala + // sudo route add -net 224.0.0.0 netmask 240.0.0.0 dev lo + struct rtentry route = {}; + auto* dest = reinterpret_cast(&route.rt_dst); + dest->sin_family = AF_INET; + dest->sin_addr.s_addr = inet_addr("224.0.0.0"); + auto* mask = reinterpret_cast(&route.rt_genmask); + mask->sin_family = AF_INET; + mask->sin_addr.s_addr = inet_addr("240.0.0.0"); + std::string device{"lo"}; + route.rt_dev = device.data(); + route.rt_flags = RTF_UP; + err = ioctl(fd, SIOCADDRT, &route); + if (0 != err) { + error("failed to set route"); + freeifaddrs(ifaddr); + return false; + } + + freeifaddrs(ifaddr); + return true; +} +} // namespace network_isolation diff --git a/bazel_ros2_rules/network_isolation/network_isolation.h b/bazel_ros2_rules/network_isolation/network_isolation.h new file mode 100644 index 00000000..6c9e7cba --- /dev/null +++ b/bazel_ros2_rules/network_isolation/network_isolation.h @@ -0,0 +1,22 @@ +#pragma once + +namespace network_isolation { +/// Creates linux namespaces suitable for isolating ROS 2 traffic. +/// +/// The new namespaces are: +/// * A new user namespace to avoid needing CAP_SYS_ADMIN to create +/// network and IPC namespaces +/// * A new network namespace to prevent cross-talk via the network +/// * A new IPC namespaces to prevent cross-talk via shared memory +/// +/// It also configures network namespace to enable ROS 2 traffic. +/// At the end of a successful call the current process will be in +/// the created namespaces. +/// Depending on what part of the process fails, an unsuccessful +/// call may also leave the current process in new namespaces. +/// There is no way to undo this. +/// +/// \return true iff the namespaces were created successfully. +bool create_linux_namespaces(); + +} // namespace network_isolation diff --git a/bazel_ros2_rules/network_isolation/network_isolation_py.cc b/bazel_ros2_rules/network_isolation/network_isolation_py.cc new file mode 100644 index 00000000..aa0fbe0a --- /dev/null +++ b/bazel_ros2_rules/network_isolation/network_isolation_py.cc @@ -0,0 +1,12 @@ +#include + +#include "network_isolation/network_isolation.h" + +namespace py = pybind11; + +PYBIND11_MODULE(network_isolation_py, m) +{ + m.def("create_linux_namespaces", &network_isolation::create_linux_namespaces, R"pbdoc( + Creates linux namespaces suitable for isolating ROS 2 traffic. + )pbdoc"); +} diff --git a/bazel_ros2_rules/ros2/ros_cc.bzl b/bazel_ros2_rules/ros2/ros_cc.bzl index fbc7ffdc..b79febb4 100644 --- a/bazel_ros2_rules/ros2/ros_cc.bzl +++ b/bazel_ros2_rules/ros2/ros_cc.bzl @@ -166,6 +166,7 @@ def ros_cc_test( cc_binary_rule = native.cc_binary, cc_library_rule = native.cc_library, cc_test_rule = native.cc_test, + network_isolation = False, **kwargs): """ Builds a C/C++ test and wraps it with a shim that will inject the minimal @@ -211,6 +212,7 @@ def ros_cc_test( name = shim_name, target = ":" + noshim_name, env_changes = shim_env_changes, + network_isolation = network_isolation, **shim_kwargs ) @@ -220,4 +222,8 @@ def ros_cc_test( deps = ["@bazel_ros2_rules//ros2:dload_shim_cc"], tags = ["nolint"] + kwargs.get("tags", []), ) + if network_isolation: + kwargs["deps"].append( + "@bazel_ros2_rules//network_isolation:network_isolation_cc", + ) cc_test_rule(name = name, **kwargs) diff --git a/bazel_ros2_rules/ros2/ros_py.bzl b/bazel_ros2_rules/ros2/ros_py.bzl index 5184c8f3..edbdadbc 100644 --- a/bazel_ros2_rules/ros2/ros_py.bzl +++ b/bazel_ros2_rules/ros2/ros_py.bzl @@ -135,6 +135,7 @@ def ros_py_test( rmw_implementation = None, py_binary_rule = native.py_binary, py_test_rule = native.py_test, + network_isolation = False, **kwargs): """ Builds a Python test and wraps it with a shim that will inject the minimal @@ -176,6 +177,7 @@ def ros_py_test( name = shim_name, target = ":" + noshim_name, env_changes = shim_env_changes, + network_isolation = network_isolation, **shim_kwargs ) @@ -186,4 +188,8 @@ def ros_py_test( deps = ["@bazel_ros2_rules//ros2:dload_shim_py"], tags = ["nolint"] + kwargs.get("tags", []), ) + if network_isolation: + kwargs["deps"].append( + "@bazel_ros2_rules//network_isolation:network_isolation_py", + ) py_test_rule(name = name, **kwargs) diff --git a/bazel_ros2_rules/ros2/tools/dload.bzl b/bazel_ros2_rules/ros2/tools/dload.bzl index 2d3ea2e2..9a519c11 100644 --- a/bazel_ros2_rules/ros2/tools/dload.bzl +++ b/bazel_ros2_rules/ros2/tools/dload.bzl @@ -59,6 +59,7 @@ def get_dload_shim_attributes(): cfg = "target", ), "env_changes": attr.string_list_dict(), + "network_isolation": attr.bool(default = False), } def _workaround_issue311(ament_prefixes, env_changes): diff --git a/bazel_ros2_rules/ros2/tools/dload_cc.bzl b/bazel_ros2_rules/ros2/tools/dload_cc.bzl index 939cae6b..f98488fd 100644 --- a/bazel_ros2_rules/ros2/tools/dload_cc.bzl +++ b/bazel_ros2_rules/ros2/tools/dload_cc.bzl @@ -12,10 +12,19 @@ load( "get_dload_shim_attributes", ) +_ISOLATE_IMPORT = '#include "network_isolation/network_isolation.h"' +_ISOLATE_CALL_OR_RETURN = """\ +if (!network_isolation::create_linux_namespaces()) {{ + return -1; +}} +""" + _REEXEC_TEMPLATE = """\ #include "ros2/tools/dload_shim.h" +CC_ISOLATE_IMPORT int main(int argc, const char * argv[]) {{ + CC_ISOLATE_CALL const char * executable_path = "{executable_path}"; std::vector names = {names}; std::vector> actions = {actions}; @@ -24,12 +33,23 @@ int main(int argc, const char * argv[]) {{ }} """ +def _resolve_isolation(template, network_isolation): + isolate_import = "" + isolate_call = "" + if network_isolation: + isolate_import = _ISOLATE_IMPORT + isolate_call = _ISOLATE_CALL_OR_RETURN + template = template.replace("CC_ISOLATE_IMPORT", isolate_import) + template = template.replace("CC_ISOLATE_CALL", isolate_call) + return template + def _to_cc_list(collection): """Turn collection into a C++ aggregate initializer expression.""" return "{" + ", ".join(collection) + "}" def _dload_cc_reexec_impl(ctx): - return do_dload_shim(ctx, _REEXEC_TEMPLATE, _to_cc_list) + template = _resolve_isolation(_REEXEC_TEMPLATE, ctx.attr.network_isolation) + return do_dload_shim(ctx, template, _to_cc_list) dload_cc_reexec = rule( doc = """\ @@ -52,10 +72,12 @@ dload_cc_reexec = rule( _LDWRAP_TEMPLATE = """\ #include "ros2/tools/dload_shim.h" +CC_ISOLATE_IMPORT extern "C" int __real_main(int argc, char** argv); extern "C" int __wrap_main(int argc, char** argv) {{ + CC_ISOLATE_CALL std::vector names = {names}; std::vector> actions = {actions}; bazel_ros2_rules::ApplyEnvironmentActions(argv[0], names, actions); @@ -64,7 +86,8 @@ extern "C" int __wrap_main(int argc, char** argv) {{ """ def _dload_cc_ldwrap_impl(ctx): - return do_dload_shim(ctx, _LDWRAP_TEMPLATE, _to_cc_list) + template = _resolve_isolation(_LDWRAP_TEMPLATE, ctx.attr.network_isolation) + return do_dload_shim(ctx, template, _to_cc_list) dload_cc_ldwrap = rule( doc = """\ diff --git a/bazel_ros2_rules/ros2/tools/dload_py.bzl b/bazel_ros2_rules/ros2/tools/dload_py.bzl index 4aaf2195..c7f1ff32 100644 --- a/bazel_ros2_rules/ros2/tools/dload_py.bzl +++ b/bazel_ros2_rules/ros2/tools/dload_py.bzl @@ -13,23 +13,48 @@ load( "get_dload_shim_attributes", ) +_ISOLATE_IMPORT = """\ +import network_isolation.network_isolation_py as isolation +""" + +_ISOLATE_CALL_OR_RETURN = """\ +if not isolation.create_linux_namespaces(): + sys.exit(-1) +""" + _DLOAD_PY_SHIM_TEMPLATE = """\ assert __name__ == "__main__" from bazel_ros2_rules.ros2.tools.dload_shim import do_dload_shim +PY_ISOLATE_IMPORT +PY_ISOLATE_CALL executable_path = "{executable_path}" names = {names} actions = {actions} do_dload_shim(executable_path, names, actions) """ +def _resolve_isolation(template, network_isolation): + isolate_import = "" + isolate_call = "" + if network_isolation: + isolate_import = _ISOLATE_IMPORT + isolate_call = _ISOLATE_CALL_OR_RETURN + template = template.replace("PY_ISOLATE_IMPORT", isolate_import) + template = template.replace("PY_ISOLATE_CALL", isolate_call) + return template + def _to_py_list(collection): """Turn collection into a Python list expression.""" return "[" + ", ".join(collection) + "]" def _dload_py_shim_impl(ctx): - return do_dload_shim(ctx, _DLOAD_PY_SHIM_TEMPLATE, _to_py_list) + template = _resolve_isolation( + _DLOAD_PY_SHIM_TEMPLATE, + ctx.attr.network_isolation, + ) + return do_dload_shim(ctx, template, _to_py_list) dload_py_shim = rule( attrs = get_dload_shim_attributes(), diff --git a/ros2_example_bazel_installed/BUILD.bazel b/ros2_example_bazel_installed/BUILD.bazel index 29407957..6bc4d2f6 100644 --- a/ros2_example_bazel_installed/BUILD.bazel +++ b/ros2_example_bazel_installed/BUILD.bazel @@ -86,6 +86,28 @@ ros_cc_test( deps = ["@ros2//:console_bridge_vendor_cc"], ) +ros_py_binary( + name = "talker_listener_isolated_py", + srcs = ["test/talker_listener.py"], + main = "test/talker_listener.py", + deps = [ + "@ros2//:rclpy_py", + "@ros2//:std_msgs_py", + ], +) + +ros_py_test( + name = "network_isolation_test", + srcs = ["test/network_isolation_test.py"], + data = [ + "@bazel_ros2_rules//network_isolation:isolate", + ], + main = "test/network_isolation_test.py", + deps = [ + "//:talker_listener_isolated_py", + ], +) + # Provide a roll-up of all generated IDL types for `//tools:ros2`. py_library( name = "ros_msgs_all_py", diff --git a/ros2_example_bazel_installed/test/network_isolation_test.py b/ros2_example_bazel_installed/test/network_isolation_test.py new file mode 100644 index 00000000..d0fd69e6 --- /dev/null +++ b/ros2_example_bazel_installed/test/network_isolation_test.py @@ -0,0 +1,25 @@ +import argparse +import sys +from subprocess import Popen + +# This script spawns a bunch of talker-listener pairs +# publishing on the same topic. They are isolated using +# the network isolation mechanism, and hence should not +# cross talk. + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--number_of_isolated_pairs", type=int, default=5) + args = parser.parse_args() + + subprocess_list = [] + for i in range(args.number_of_isolated_pairs): + subprocess_list.append(Popen(['external/bazel_ros2_rules/network_isolation/isolate', + sys.executable, 'test/talker_listener.py', + '--id', str(i)])) + + for process in subprocess_list: + process.wait() + +if __name__ == "__main__": + main() diff --git a/ros2_example_bazel_installed/test/talker_listener.py b/ros2_example_bazel_installed/test/talker_listener.py new file mode 100644 index 00000000..e1f7ece4 --- /dev/null +++ b/ros2_example_bazel_installed/test/talker_listener.py @@ -0,0 +1,63 @@ +import argparse + +import rclpy +import rclpy.node +import std_msgs +from std_msgs.msg import Float64 +from rclpy.executors import MultiThreadedExecutor + +class Talker(rclpy.node.Node): + def __init__(self, id): + super().__init__('talker_' + str(id)) + self._publisher = self.create_publisher( + std_msgs.msg.Float64, 'chatter', 10) + self._timer = self.create_timer(0.1, self._timer_callback) + self._id = id + + def _timer_callback(self): + msg = std_msgs.msg.Float64() + msg.data = float(self._id) + self._publisher.publish(msg) + +class Listener(rclpy.node.Node): + def __init__(self, id): + super().__init__('listener_' + str(id)) + self._subscription = self.create_subscription( + std_msgs.msg.Float64, + 'chatter', + self._topic_callback, + 10) + timeout = self.declare_parameter('timeout', 2.0) + self._timer = self.create_timer( + timeout.value, self._timer_callback) + self._expected_messages_received = 0 + self._id = id + + def _topic_callback(self, msg): + assert msg.data == self._id, \ + f"I heard '{msg.data}' yet I was expecting '{self._id}'!" + self._expected_messages_received += 1 + + def _timer_callback(self): + assert self._expected_messages_received > 0, \ + f"I did not hear '{self._id}' even once!" + rclpy.shutdown() + +def main(): + # This script launches a pair of a talker and a listener + # that are bound to some numerical id. The talker publishes + # the id, and the listener expectes that id in the msg. + parser = argparse.ArgumentParser() + parser.add_argument("--id", type=int, default=0) + args = parser.parse_args() + + rclpy.init() + + executor = MultiThreadedExecutor() + executor.add_node(Talker(args.id)) + executor.add_node(Listener(args.id)) + + executor.spin() + +if __name__ == "__main__": + main()