Skip to content

Commit

Permalink
feat: add multicast support (#7)
Browse files Browse the repository at this point in the history
* feat: added multicast supporty

* fix: fixed multicast with different sensors simultaneously

Signed-off-by: Kenzo Lobos-Tsunekawa <[email protected]>

---------

Signed-off-by: Kenzo Lobos-Tsunekawa <[email protected]>
  • Loading branch information
knzo25 authored Mar 29, 2024
1 parent 5b007e2 commit e7e25ed
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 4 deletions.
2 changes: 2 additions & 0 deletions udp_driver/include/boost_udp_driver/udp_driver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class UdpDriver
const std::string & remote_ip, uint16_t remote_port,
const std::string & host_ip, uint16_t host_port);
void init_receiver(const std::string & ip, uint16_t port);
void init_receiver(const std::string & remote_ip, uint16_t remote_port, const std::string & host_ip, uint16_t host_port);
void init_receiver(const std::string & remote_ip, uint16_t remote_port, const std::string & host_ip, uint16_t host_port, size_t buffer_size);
void init_receiver(const std::string & ip, uint16_t port, size_t buffer_size);


Expand Down
16 changes: 16 additions & 0 deletions udp_driver/include/boost_udp_driver/udp_socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ namespace udp_driver
{

using Functor = std::function<void (const std::vector<uint8_t> &)>;
using FunctorWithSender = std::function<void (const std::vector<uint8_t> &, const std::string & sender)>;

class UdpSocket
{
Expand Down Expand Up @@ -64,6 +65,7 @@ class UdpSocket
void close();
bool isOpen() const;
void bind();
void setMulticast(bool value);

/*
* Blocking Send Operation
Expand All @@ -85,6 +87,11 @@ class UdpSocket
*/
void asyncReceive(Functor func);

/*
* NonBlocking Receive Operation with Sender information
*/
void asyncReceiveWithSender(FunctorWithSender func);

private:
void asyncSendHandler(
const boost::system::error_code & error,
Expand All @@ -94,15 +101,24 @@ class UdpSocket
const boost::system::error_code & error,
std::size_t bytes_transferred);

void asyncReceiveHandler2(
const boost::system::error_code & error,
std::size_t bytes_transferred);

private:
const drivers::common::IoContext & m_ctx;
boost::asio::ip::udp::socket m_udp_socket;
boost::asio::ip::udp::endpoint m_remote_endpoint;
boost::asio::ip::udp::endpoint m_host_endpoint;
boost::asio::ip::udp::endpoint m_any_endpoint;
Functor m_func;
FunctorWithSender m_func_with_sender;
static constexpr size_t m_default_recv_buffer_size{2048};
size_t m_recv_buffer_size;
bool m_use_multicast;
std::vector<uint8_t> m_recv_buffer;

boost::asio::ip::udp::endpoint sender_endpoint_;
};

} // namespace udp_driver
Expand Down
12 changes: 12 additions & 0 deletions udp_driver/src/udp_driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,18 @@ void UdpDriver::init_receiver(const std::string & ip, uint16_t port)
m_receiver.reset(new UdpSocket(m_ctx, ip, port));
}

void UdpDriver::init_receiver(const std::string & remote_ip, uint16_t remote_port,
const std::string & host_ip, uint16_t host_port)
{
m_receiver.reset(new UdpSocket(m_ctx, remote_ip, remote_port, host_ip, host_port));
}

void UdpDriver::init_receiver(const std::string & remote_ip, uint16_t remote_port,
const std::string & host_ip, uint16_t host_port, size_t buffer_size)
{
m_receiver.reset(new UdpSocket(m_ctx, remote_ip, remote_port, host_ip, host_port, buffer_size));
}

void UdpDriver::init_receiver(const std::string & ip, uint16_t port, size_t buffer_size)
{
m_receiver.reset(new UdpSocket(m_ctx, ip, port, buffer_size));
Expand Down
60 changes: 56 additions & 4 deletions udp_driver/src/udp_socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ UdpSocket::UdpSocket(
m_udp_socket(ctx.ios()),
m_remote_endpoint(boost::asio::ip::address::from_string(remote_ip), remote_port),
m_host_endpoint(boost::asio::ip::address::from_string(host_ip), host_port),
m_recv_buffer_size(recv_buffer_size)
m_any_endpoint(boost::asio::ip::address::from_string("0.0.0.0"), host_port),
m_recv_buffer_size(recv_buffer_size),
m_use_multicast(false)
{
m_remote_endpoint = remote_ip.empty() ?
boost::asio::ip::udp::endpoint{boost::asio::ip::udp::v4(), remote_port} :
Expand Down Expand Up @@ -98,7 +100,7 @@ size_t UdpSocket::receive(std::vector<uint8_t> & buff)

std::size_t len = m_udp_socket.receive_from(
boost::asio::buffer(buff),
m_host_endpoint,
sender_endpoint,
0,
error);

Expand All @@ -121,16 +123,31 @@ void UdpSocket::asyncSend(std::vector<uint8_t> & buff)

void UdpSocket::asyncReceive(Functor func)
{
boost::asio::ip::udp::endpoint sender_endpoint;

m_func = std::move(func);
m_udp_socket.async_receive_from(
boost::asio::buffer(m_recv_buffer),
m_host_endpoint,
sender_endpoint,
[this](boost::system::error_code error, std::size_t bytes_transferred)
{
asyncReceiveHandler(error, bytes_transferred);
});
}

void UdpSocket::asyncReceiveWithSender(FunctorWithSender func)
{
m_func_with_sender = std::move(func);
m_udp_socket.async_receive_from(
boost::asio::buffer(m_recv_buffer),
sender_endpoint_,
[this](boost::system::error_code error, std::size_t bytes_transferred)
{
std::cout << "asyncReceiveWithSender" << std::endl << std::endl;
asyncReceiveHandler2(error, bytes_transferred);
});
}

void UdpSocket::asyncSendHandler(
const boost::system::error_code & error,
std::size_t bytes_transferred)
Expand Down Expand Up @@ -166,6 +183,31 @@ void UdpSocket::asyncReceiveHandler(
}
}

void UdpSocket::asyncReceiveHandler2(
const boost::system::error_code & error,
std::size_t bytes_transferred)
{
(void)bytes_transferred;
if (error) {
RCLCPP_ERROR_STREAM(rclcpp::get_logger("UdpSocket::asyncReceiveHandler"), error.message());
return;
}

if (bytes_transferred > 0 && m_func_with_sender) {
m_recv_buffer.resize(bytes_transferred);
m_func_with_sender(m_recv_buffer, sender_endpoint_.address().to_string());
m_recv_buffer.resize(m_recv_buffer_size);
m_udp_socket.async_receive_from(
boost::asio::buffer(m_recv_buffer),
sender_endpoint_,
[this](boost::system::error_code error, std::size_t bytes_tf)
{
m_recv_buffer.resize(bytes_tf);
asyncReceiveHandler2(error, bytes_tf);
});
}
}

std::string UdpSocket::remote_ip() const
{
return m_remote_endpoint.address().to_string();
Expand Down Expand Up @@ -208,7 +250,17 @@ bool UdpSocket::isOpen() const

void UdpSocket::bind()
{
m_udp_socket.bind(m_host_endpoint);
if (m_use_multicast) {
m_udp_socket.bind(m_any_endpoint);
m_udp_socket.set_option(boost::asio::ip::multicast::join_group(m_remote_endpoint.address().to_v4(), m_host_endpoint.address().to_v4()));
} else {
m_udp_socket.bind(m_host_endpoint);
}
}

void UdpSocket::setMulticast(bool value)
{
m_use_multicast = value;
}

} // namespace udp_driver
Expand Down

0 comments on commit e7e25ed

Please sign in to comment.