Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sources): multicast udp socket support #22099

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
27 changes: 26 additions & 1 deletion lib/vector-config/src/stdlib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{
cell::RefCell,
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
hash::Hash,
net::SocketAddr,
net::{Ipv4Addr, SocketAddr},
num::{
NonZeroI16, NonZeroI32, NonZeroI64, NonZeroI8, NonZeroU16, NonZeroU32, NonZeroU64,
NonZeroU8, NonZeroUsize,
Expand Down Expand Up @@ -402,6 +402,31 @@ impl ToValue for SocketAddr {
}
}

impl Configurable for Ipv4Addr {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inspired from the SocketAddr implementation a few lines above

fn referenceable_name() -> Option<&'static str> {
Some("stdlib::Ipv4Addr")
}

fn metadata() -> Metadata {
let mut metadata = Metadata::default();
metadata.set_description("An IPv4 address.");
metadata
}

fn generate_schema(_: &RefCell<SchemaGenerator>) -> Result<SchemaObject, GenerateError> {
// TODO: We don't need anything other than a string schema to (de)serialize a `Ipv4Addr`,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the same TODOs as in

fn generate_schema(_: &RefCell<SchemaGenerator>) -> Result<SchemaObject, GenerateError> {

// but we eventually should have validation since the format for the possible permutations
// is well-known and can be easily codified.
Ok(generate_string_schema())
}
}

impl ToValue for Ipv4Addr {
fn to_value(&self) -> Value {
Value::String(self.to_string())
}
}

impl Configurable for PathBuf {
fn referenceable_name() -> Option<&'static str> {
Some("stdlib::PathBuf")
Expand Down
43 changes: 43 additions & 0 deletions src/sources/socket/udp.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::net::{Ipv4Addr, SocketAddr};

use super::default_host_key;
use bytes::BytesMut;
use chrono::Utc;
Expand Down Expand Up @@ -41,6 +43,17 @@ pub struct UdpConfig {
#[configurable(derived)]
address: SocketListenAddr,

/// TODO: document this.
/// TODO: The join multicast method should fail if the address is not SocketListenAddr::SocketAddr.
/// multicast wont work with systemd{N} fd sockets.
/// Also, if the address is IPv4, the multicast address should be IPv4 too.
/// TODO: should we support a list of groups or a single group? The `join_multicast` method supports
/// just one group per call.
/// TODO: document that we use `IPv4Addr` and not `SocketAddr` for the multicast groups because
/// the `join_multicast_v6` is not supported due to the need of using an interface index.s
#[serde(default)]
multicast_groups: Vec<Ipv4Addr>,

/// The maximum buffer size of incoming messages.
///
/// Messages larger than this are truncated.
Expand Down Expand Up @@ -118,6 +131,7 @@ impl UdpConfig {
pub fn from_address(address: SocketListenAddr) -> Self {
Self {
address,
multicast_groups: Vec::new(),
max_length: default_max_length(),
host_key: None,
port_key: default_port_key(),
Expand Down Expand Up @@ -152,6 +166,35 @@ pub(super) fn udp(
})
})?;

if !config.multicast_groups.is_empty() {
let listen_addr = match config.address() {
SocketListenAddr::SocketAddr(SocketAddr::V4(addr)) => addr,
SocketListenAddr::SocketAddr(SocketAddr::V6(_)) => {
todo!("handle this error, IPv6 multicast is not supported")
}
// TODO: if we need to support systemd{N} fd sockets, we should use the
// `UdpSocket::local_addr` method to get the address of the socket.
// that method can fail and I wonder if the user sets `IP_ADDR_ANY` (`0.0.0.0`) in the config,
// the `UdpSocket::local_addr` would return the real interface address that the
// socket is bound to, and not `IP_ADDR_ANY`. We need to use the same address
// for the multicast group join that the user has set in the config.
// if systemd{N} fd sockets are required to work too, we should investigate on this.
SocketListenAddr::SystemdFd(_) => todo!("handle this error"),
};
for group_addr in config.multicast_groups {
socket
.join_multicast_v4(group_addr, *listen_addr.ip())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should I use listen_addr.ip() here o just hardcode Ipv4Addr::UNSPECIFIED? If 0.0.0.0 is not set, the interface would filter out the packets targeting the multicast ip

see the method documentation here: https://doc.rust-lang.org/std/net/struct.UdpSocket.html#method.join_multicast_v4

and more info about ip multicast here: https://stackoverflow.com/questions/10692956/what-does-it-mean-to-bind-a-multicast-udp-socket

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Anyway, if either listen_addr.ip() or Ipv4Addr::UNSPECIFIED is used here, this would only if the source's address in config is 0.0.0.0 as setting it to 127.0.0.1 (for example) would filter out any packet going to the multicast addres

for example, this config:

[sources.multicast_udp]
type = "socket"
mode = "udp"
address = "127.0.0.1:4242"
multicast_groups = ["224.0.0.2"]


[sinks.console]
type = "console"
inputs = ["multicast_udp"]
encoding.codec = "json"

and this command echo hello | nc 224.0.0.2 4242 -u

wont work because the socket address is no 0..0.0.0.

If we hardcode this to Ipv4Addr::UNSPECIFIED, the behaviour would be the same; that config won't work.

Is this clear enough?

.map_err(|error| {
// TODO: is this considered a `SocketBindError`? or should we create a new error for this case?
emit!(SocketBindError {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this considered a SocketBindError? or should we create a new error for this case?

mode: SocketMode::Udp,
error,
})
})?;
// TODO: add debug (or info) logs here to inform the user that the socket has joined the multicast group.
}
}

if let Some(receive_buffer_bytes) = config.receive_buffer_bytes {
if let Err(error) = net::set_receive_buffer_size(&socket, receive_buffer_bytes) {
warn!(message = "Failed configuring receive buffer size on UDP socket.", %error);
Expand Down