diff --git a/udp_driver/include/boost_udp_driver/udp_driver.hpp b/udp_driver/include/boost_udp_driver/udp_driver.hpp index a109bd2..df4139b 100644 --- a/udp_driver/include/boost_udp_driver/udp_driver.hpp +++ b/udp_driver/include/boost_udp_driver/udp_driver.hpp @@ -39,6 +39,8 @@ class UdpDriver const std::string & remote_ip, uint16_t remote_port, const std::string & host_ip, uint16_t host_port); void init_receiver(const std::string & ip, uint16_t port); + void init_receiver(const std::string & remote_ip, uint16_t remote_port, const std::string & host_ip, uint16_t host_port); + void init_receiver(const std::string & remote_ip, uint16_t remote_port, const std::string & host_ip, uint16_t host_port, size_t buffer_size); void init_receiver(const std::string & ip, uint16_t port, size_t buffer_size); diff --git a/udp_driver/include/boost_udp_driver/udp_socket.hpp b/udp_driver/include/boost_udp_driver/udp_socket.hpp index 140d403..e3974b8 100644 --- a/udp_driver/include/boost_udp_driver/udp_socket.hpp +++ b/udp_driver/include/boost_udp_driver/udp_socket.hpp @@ -30,6 +30,7 @@ namespace udp_driver { using Functor = std::function &)>; +using FunctorWithSender = std::function &, const std::string & sender)>; class UdpSocket { @@ -64,6 +65,7 @@ class UdpSocket void close(); bool isOpen() const; void bind(); + void setMulticast(bool value); /* * Blocking Send Operation @@ -85,6 +87,11 @@ class UdpSocket */ void asyncReceive(Functor func); + /* + * NonBlocking Receive Operation with Sender information + */ + void asyncReceiveWithSender(FunctorWithSender func); + private: void asyncSendHandler( const boost::system::error_code & error, @@ -94,15 +101,24 @@ class UdpSocket const boost::system::error_code & error, std::size_t bytes_transferred); + void asyncReceiveHandler2( + const boost::system::error_code & error, + std::size_t bytes_transferred); + private: const drivers::common::IoContext & m_ctx; boost::asio::ip::udp::socket m_udp_socket; boost::asio::ip::udp::endpoint m_remote_endpoint; boost::asio::ip::udp::endpoint m_host_endpoint; + boost::asio::ip::udp::endpoint m_any_endpoint; Functor m_func; + FunctorWithSender m_func_with_sender; static constexpr size_t m_default_recv_buffer_size{2048}; size_t m_recv_buffer_size; + bool m_use_multicast; std::vector m_recv_buffer; + + boost::asio::ip::udp::endpoint sender_endpoint_; }; } // namespace udp_driver diff --git a/udp_driver/src/udp_driver.cpp b/udp_driver/src/udp_driver.cpp index 5420eb2..bd3d384 100644 --- a/udp_driver/src/udp_driver.cpp +++ b/udp_driver/src/udp_driver.cpp @@ -47,6 +47,18 @@ void UdpDriver::init_receiver(const std::string & ip, uint16_t port) m_receiver.reset(new UdpSocket(m_ctx, ip, port)); } +void UdpDriver::init_receiver(const std::string & remote_ip, uint16_t remote_port, + const std::string & host_ip, uint16_t host_port) +{ + m_receiver.reset(new UdpSocket(m_ctx, remote_ip, remote_port, host_ip, host_port)); +} + +void UdpDriver::init_receiver(const std::string & remote_ip, uint16_t remote_port, + const std::string & host_ip, uint16_t host_port, size_t buffer_size) +{ + m_receiver.reset(new UdpSocket(m_ctx, remote_ip, remote_port, host_ip, host_port, buffer_size)); +} + void UdpDriver::init_receiver(const std::string & ip, uint16_t port, size_t buffer_size) { m_receiver.reset(new UdpSocket(m_ctx, ip, port, buffer_size)); diff --git a/udp_driver/src/udp_socket.cpp b/udp_driver/src/udp_socket.cpp index ea2384c..7ff4dbf 100644 --- a/udp_driver/src/udp_socket.cpp +++ b/udp_driver/src/udp_socket.cpp @@ -41,7 +41,9 @@ UdpSocket::UdpSocket( m_udp_socket(ctx.ios()), m_remote_endpoint(boost::asio::ip::address::from_string(remote_ip), remote_port), m_host_endpoint(boost::asio::ip::address::from_string(host_ip), host_port), - m_recv_buffer_size(recv_buffer_size) + m_any_endpoint(boost::asio::ip::address::from_string("0.0.0.0"), host_port), + m_recv_buffer_size(recv_buffer_size), + m_use_multicast(false) { m_remote_endpoint = remote_ip.empty() ? boost::asio::ip::udp::endpoint{boost::asio::ip::udp::v4(), remote_port} : @@ -98,7 +100,7 @@ size_t UdpSocket::receive(std::vector & buff) std::size_t len = m_udp_socket.receive_from( boost::asio::buffer(buff), - m_host_endpoint, + sender_endpoint, 0, error); @@ -121,16 +123,31 @@ void UdpSocket::asyncSend(std::vector & buff) void UdpSocket::asyncReceive(Functor func) { + boost::asio::ip::udp::endpoint sender_endpoint; + m_func = std::move(func); m_udp_socket.async_receive_from( boost::asio::buffer(m_recv_buffer), - m_host_endpoint, + sender_endpoint, [this](boost::system::error_code error, std::size_t bytes_transferred) { asyncReceiveHandler(error, bytes_transferred); }); } +void UdpSocket::asyncReceiveWithSender(FunctorWithSender func) +{ + m_func_with_sender = std::move(func); + m_udp_socket.async_receive_from( + boost::asio::buffer(m_recv_buffer), + sender_endpoint_, + [this](boost::system::error_code error, std::size_t bytes_transferred) + { + std::cout << "asyncReceiveWithSender" << std::endl << std::endl; + asyncReceiveHandler2(error, bytes_transferred); + }); +} + void UdpSocket::asyncSendHandler( const boost::system::error_code & error, std::size_t bytes_transferred) @@ -166,6 +183,31 @@ void UdpSocket::asyncReceiveHandler( } } +void UdpSocket::asyncReceiveHandler2( + const boost::system::error_code & error, + std::size_t bytes_transferred) +{ + (void)bytes_transferred; + if (error) { + RCLCPP_ERROR_STREAM(rclcpp::get_logger("UdpSocket::asyncReceiveHandler"), error.message()); + return; + } + + if (bytes_transferred > 0 && m_func_with_sender) { + m_recv_buffer.resize(bytes_transferred); + m_func_with_sender(m_recv_buffer, sender_endpoint_.address().to_string()); + m_recv_buffer.resize(m_recv_buffer_size); + m_udp_socket.async_receive_from( + boost::asio::buffer(m_recv_buffer), + sender_endpoint_, + [this](boost::system::error_code error, std::size_t bytes_tf) + { + m_recv_buffer.resize(bytes_tf); + asyncReceiveHandler2(error, bytes_tf); + }); + } +} + std::string UdpSocket::remote_ip() const { return m_remote_endpoint.address().to_string(); @@ -208,7 +250,17 @@ bool UdpSocket::isOpen() const void UdpSocket::bind() { - m_udp_socket.bind(m_host_endpoint); + if (m_use_multicast) { + m_udp_socket.bind(m_any_endpoint); + m_udp_socket.set_option(boost::asio::ip::multicast::join_group(m_remote_endpoint.address().to_v4(), m_host_endpoint.address().to_v4())); + } else { + m_udp_socket.bind(m_host_endpoint); + } +} + +void UdpSocket::setMulticast(bool value) +{ + m_use_multicast = value; } } // namespace udp_driver