feat(sources): multicast udp socket support #22099

Open
wants to merge 14 commits into
base: master
8 changes: 8 additions & 0 deletions changelog.d/5732_multicast_udp_socket_sources.feature.md
@@ -0,0 +1,8 @@
The `socket` source with `udp` mode now supports joining multicast groups via the `multicast_groups` option
of that source. This allows the source to receive multicast packets from the specified multicast groups.

Note that in order to work properly, the `socket` address must be set to `0.0.0.0` and not
Contributor Author

Is this notice needed if it was also added to the website docs?

to `127.0.0.1` (localhost) or any other specific IP address. If any other IP address is used, the host's interface
will filter out the multicast packets, because the packet's destination IP (the multicast group) would not match the host's interface IP.

authors: jorgehermo9
27 changes: 26 additions & 1 deletion lib/vector-config/src/stdlib.rs
@@ -2,7 +2,7 @@ use std::{
cell::RefCell,
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
hash::Hash,
-net::SocketAddr,
+net::{Ipv4Addr, SocketAddr},
num::{
NonZeroI16, NonZeroI32, NonZeroI64, NonZeroI8, NonZeroU16, NonZeroU32, NonZeroU64,
NonZeroU8, NonZeroUsize,
@@ -402,6 +402,31 @@ impl ToValue for SocketAddr {
}
}

impl Configurable for Ipv4Addr {
Contributor Author

Modeled on the SocketAddr implementation a few lines above.

fn referenceable_name() -> Option<&'static str> {
Some("stdlib::Ipv4Addr")
}

fn metadata() -> Metadata {
let mut metadata = Metadata::default();
metadata.set_description("An IPv4 address.");
metadata
}

fn generate_schema(_: &RefCell<SchemaGenerator>) -> Result<SchemaObject, GenerateError> {
// TODO: We don't need anything other than a string schema to (de)serialize a `Ipv4Addr`,
Contributor Author

Added the same TODOs as in

fn generate_schema(_: &RefCell<SchemaGenerator>) -> Result<SchemaObject, GenerateError> {

// but we eventually should have validation since the format for the possible permutations
// is well-known and can be easily codified.
Ok(generate_string_schema())
}
}

impl ToValue for Ipv4Addr {
fn to_value(&self) -> Value {
Value::String(self.to_string())
}
}

impl Configurable for PathBuf {
fn referenceable_name() -> Option<&'static str> {
Some("stdlib::PathBuf")
135 changes: 129 additions & 6 deletions src/sources/socket/mod.rs
@@ -319,7 +319,7 @@ mod test {
use approx::assert_relative_eq;
use std::{
collections::HashMap,
-net::{SocketAddr, UdpSocket},
+net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
@@ -374,7 +374,7 @@ mod test {
test_util::{
collect_n, collect_n_limited,
components::{assert_source_compliance, SOCKET_PUSH_SOURCE_TAGS},
-next_addr, random_string, send_lines, send_lines_tls, wait_for_tcp,
+next_addr, next_addr_any, random_string, send_lines, send_lines_tls, wait_for_tcp,
},
tls::{self, TlsConfig, TlsEnableableConfig, TlsSourceConfig},
SourceSender,
@@ -899,12 +899,27 @@ mod test {

//////// UDP TESTS ////////
fn send_lines_udp(addr: SocketAddr, lines: impl IntoIterator<Item = String>) -> SocketAddr {
-send_packets_udp(addr, lines.into_iter().map(|line| line.into()))
+send_lines_udp_from(next_addr(), addr, lines)
}

fn send_lines_udp_from(
Contributor Author

I refactored this a little, because the socket that sends the packets must be on the same interface the source is listening on. With multicast groups, 127.0.0.1 does not seem to work properly, so both the source's listening socket and the sending socket have to use 0.0.0.0. I hope that explanation is clear.
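
To make the point about the sending socket concrete, here is a minimal std-only sketch (not the PR's test helper; `send_datagram_from` is a hypothetical name) that binds the sender to an explicit local address before sending, so the caller decides which local address the datagram leaves from.

```rust
use std::io;
use std::net::{SocketAddr, UdpSocket};

/// Hypothetical helper: bind a UDP socket to `from`, send `payload` to `to`,
/// and return the local address the sender actually got (useful when `from`
/// uses port 0 and the OS picks the port).
pub fn send_datagram_from(
    from: SocketAddr,
    to: SocketAddr,
    payload: &[u8],
) -> io::Result<SocketAddr> {
    let socket = UdpSocket::bind(from)?;
    socket.send_to(payload, to)?;
    socket.local_addr()
}
```

In the multicast tests, `from` would presumably be a `0.0.0.0` address and `to` the multicast group address with the source's port.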

from: SocketAddr,
addr: SocketAddr,
lines: impl IntoIterator<Item = String>,
) -> SocketAddr {
send_packets_udp_from(from, addr, lines.into_iter().map(|line| line.into()))
}

fn send_packets_udp(addr: SocketAddr, packets: impl IntoIterator<Item = Bytes>) -> SocketAddr {
-let bind = next_addr();
-let socket = UdpSocket::bind(bind)
+send_packets_udp_from(next_addr(), addr, packets)
+}
+
+fn send_packets_udp_from(
+from: SocketAddr,
+addr: SocketAddr,
+packets: impl IntoIterator<Item = Bytes>,
+) -> SocketAddr {
+let socket = UdpSocket::bind(from)
.map_err(|error| panic!("{:}", error))
.ok()
.unwrap();
@@ -926,7 +941,7 @@ mod test {
thread::sleep(Duration::from_millis(10));

// Done
-bind
+from
}

async fn init_udp_with_shutdown(
@@ -1303,6 +1318,114 @@ mod test {
.await;
}

#[tokio::test]
async fn multicast_udp_message() {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, mut rx) = SourceSender::new_test();
// The socket address must be `IPADDR_ANY` (0.0.0.0) in order to receive multicast packets
let socket_address = next_addr_any();
let multicast_ip_address: Ipv4Addr = "224.0.0.2".parse().unwrap();
let multicast_socket_address =
SocketAddr::new(IpAddr::V4(multicast_ip_address), socket_address.port());
let mut config = UdpConfig::from_address(socket_address.into());
config.multicast_groups = vec![multicast_ip_address];
init_udp_with_config(tx, config).await;

// We must send packets to the same interface the `socket_address` is bound to
// in order to receive the multicast packets this `from` socket sends
// To do so, we use the `IPADDR_ANY` address
let from = next_addr_any();
send_lines_udp_from(from, multicast_socket_address, ["test".to_string()]);

let event = rx.next().await.expect("must receive an event");
assert_eq!(
event.as_log()[log_schema().message_key().unwrap().to_string()],
"test".into()
);
})
.await;
}

#[tokio::test]
async fn multiple_multicast_addresses_udp_message() {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, mut rx) = SourceSender::new_test();
let socket_address = next_addr_any();
let multicast_ip_addresses = (2..12)
.map(|i| format!("224.0.0.{i}").parse().unwrap())
.collect::<Vec<Ipv4Addr>>();
let multicast_ip_socket_addresses = multicast_ip_addresses
.iter()
.map(|ip_address| SocketAddr::new(IpAddr::V4(*ip_address), socket_address.port()))
.collect::<Vec<SocketAddr>>();
let mut config = UdpConfig::from_address(socket_address.into());
config.multicast_groups = multicast_ip_addresses;
init_udp_with_config(tx, config).await;

let from = next_addr_any();
for multicast_ip_socket_address in multicast_ip_socket_addresses {
send_lines_udp_from(
from,
multicast_ip_socket_address,
[multicast_ip_socket_address.to_string()],
);

let event = rx.next().await.expect("must receive an event");
assert_eq!(
event.as_log()[log_schema().message_key().unwrap().to_string()],
multicast_ip_socket_address.to_string().into()
);
}
})
.await;
}

#[tokio::test]
async fn multicast_and_unicast_udp_message() {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, mut rx) = SourceSender::new_test();
let socket_address = next_addr_any();
let multicast_ip_address: Ipv4Addr = "224.0.0.2".parse().unwrap();
let multicast_socket_address =
SocketAddr::new(IpAddr::V4(multicast_ip_address), socket_address.port());
let mut config = UdpConfig::from_address(socket_address.into());
config.multicast_groups = vec![multicast_ip_address];
init_udp_with_config(tx, config).await;

let from = next_addr_any();
// Send packet to multicast address
send_lines_udp_from(from, multicast_socket_address, ["test".to_string()]);
let event = rx.next().await.expect("must receive an event");
assert_eq!(
event.as_log()[log_schema().message_key().unwrap().to_string()],
"test".into()
);

// Send packet to unicast address
send_lines_udp_from(from, socket_address, ["test".to_string()]);
let event = rx.next().await.expect("must receive an event");
assert_eq!(
event.as_log()[log_schema().message_key().unwrap().to_string()],
"test".into()
);
})
.await;
}

#[tokio::test]
#[should_panic]
Contributor Author

I had to use #[should_panic] here because init_udp_with_config does not return a Result, so the test cannot otherwise tell whether initialization failed.

.unwrap();
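
One possible alternative to `#[should_panic]`, sketched under the assumption that an up-front validation step is acceptable: a pure function that rejects non-multicast addresses and returns a `Result`, which a test can assert on directly. The name `check_multicast_groups` is hypothetical and is not part of this PR, which instead relies on the join call itself failing.

```rust
use std::net::Ipv4Addr;

/// Hypothetical pre-check: return the first address that is not in the IPv4
/// multicast range (224.0.0.0/4), or `Ok(())` if every entry is a valid group.
pub fn check_multicast_groups(groups: &[Ipv4Addr]) -> Result<(), Ipv4Addr> {
    match groups.iter().find(|group| !group.is_multicast()) {
        Some(invalid) => Err(*invalid),
        None => Ok(()),
    }
}
```

With such a check, the invalid-group test could assert on the returned `Err` value instead of expecting a panic.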

async fn udp_invalid_multicast_group() {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, _rx) = SourceSender::new_test();
let socket_address = next_addr_any();
let invalid_multicast_ip_address: Ipv4Addr = "192.168.0.3".parse().unwrap();
let mut config = UdpConfig::from_address(socket_address.into());
config.multicast_groups = vec![invalid_multicast_ip_address];
init_udp_with_config(tx, config).await;
})
.await;
}

////////////// UNIX TEST LIBS //////////////

#[cfg(unix)]
45 changes: 45 additions & 0 deletions src/sources/socket/udp.rs
@@ -1,3 +1,5 @@
use std::net::{Ipv4Addr, SocketAddr};

use super::default_host_key;
use bytes::BytesMut;
use chrono::Utc;
@@ -41,6 +43,21 @@ pub struct UdpConfig {
#[configurable(derived)]
address: SocketListenAddr,

/// List of IPv4 multicast groups to join on socket's binding process.
///
/// In order to read multicast packets, this source's listening address should be set to `0.0.0.0`.
/// If any other address is used (such as `127.0.0.1` or a specific interface address), the
/// listening interface will filter out all multicast packets received,
/// as their target IP would be the one of the multicast group
/// and it will not match the socket's bound IP.
///
/// Note that this setting will only work if the source's address
/// is an IPv4 address (IPv6 and systemd file descriptor as source's address are not supported
/// with multicast groups).
#[serde(default)]
#[configurable(metadata(docs::examples = "['224.0.0.2', '224.0.0.4']"))]
pub(super) multicast_groups: Vec<Ipv4Addr>,

/// The maximum buffer size of incoming messages.
///
/// Messages larger than this are truncated.
@@ -118,6 +135,7 @@ impl UdpConfig {
pub fn from_address(address: SocketListenAddr) -> Self {
Self {
address,
multicast_groups: Vec::new(),
max_length: default_max_length(),
host_key: None,
port_key: default_port_key(),
@@ -152,6 +170,33 @@ pub(super) fn udp(
})
})?;

if !config.multicast_groups.is_empty() {
socket.set_multicast_loop_v4(true).unwrap();
let listen_addr = match config.address() {
SocketListenAddr::SocketAddr(SocketAddr::V4(addr)) => addr,
SocketListenAddr::SocketAddr(SocketAddr::V6(_)) => {
// We could support Ipv6 multicast with the
// https://doc.rust-lang.org/std/net/struct.UdpSocket.html#method.join_multicast_v6 method
// and specifying the interface index as `0`, in order to bind all interfaces.
panic!("IPv6 multicast is not supported")
Contributor Author

I thought about adding IPv6 support too, but since no one has requested it yet, I didn't think it was worth implementing and testing in this PR. Any thoughts? Should I include IPv6 support in this PR?
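
For reference, a rough sketch of what dual-family joining could look like with the standard library's `UdpSocket` (the PR's socket type may take these arguments by value rather than by reference). `join_group` is a hypothetical name, and passing interface index `0` for IPv6 follows the std documentation, which describes it as "any interface".

```rust
use std::io;
use std::net::{IpAddr, Ipv4Addr, UdpSocket};

/// Hypothetical helper: join `group` on `socket`, dispatching on address family.
/// For IPv4 the second argument is the local interface address; for IPv6 it is
/// an interface index, where 0 means "any interface" per the std docs.
pub fn join_group(socket: &UdpSocket, group: IpAddr) -> io::Result<()> {
    match group {
        IpAddr::V4(group) => socket.join_multicast_v4(&group, &Ipv4Addr::UNSPECIFIED),
        IpAddr::V6(group) => socket.join_multicast_v6(&group, 0),
    }
}
```

This is untested against real IPv6 multicast traffic, so it only illustrates the shape of the API.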

}
SocketListenAddr::SystemdFd(_) => {
Contributor Author

We could call UdpSocket::local_addr on this socket, but I wonder whether it would ever return IPADDR_ANY (0.0.0.0) rather than the real interface address the socket is bound to.

Multicast requires 0.0.0.0 as the bind address to work. Otherwise, the interface would filter out packets targeting the multicast IP address.

I think this would be difficult to get working, as I would have to investigate further, and no one has requested it. Any thoughts?
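
A small sketch of the `local_addr` idea, using the standard library socket and a hypothetical helper name. On Linux, `local_addr` reports the address the socket was bound with, so a socket bound to `0.0.0.0` reports `0.0.0.0`; behaviour for sockets inherited from systemd and on other platforms is an assumption that would need checking.

```rust
use std::io;
use std::net::UdpSocket;

/// Hypothetical check: report whether an already-bound socket (for example
/// one inherited from systemd) is bound to the unspecified address.
pub fn is_bound_to_unspecified(socket: &UdpSocket) -> io::Result<bool> {
    Ok(socket.local_addr()?.ip().is_unspecified())
}
```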

panic!("Multicast for systemd fd sockets is not supported")
}
};
for group_addr in config.multicast_groups {
socket
.join_multicast_v4(group_addr, *listen_addr.ip())
Contributor Author

Should I use listen_addr.ip() here or just hardcode Ipv4Addr::UNSPECIFIED? If 0.0.0.0 is not set, the interface would filter out the packets targeting the multicast IP.

see the method documentation here: https://doc.rust-lang.org/std/net/struct.UdpSocket.html#method.join_multicast_v4

and more info about ip multicast here: https://stackoverflow.com/questions/10692956/what-does-it-mean-to-bind-a-multicast-udp-socket

Contributor Author

Either way, whether listen_addr.ip() or Ipv4Addr::UNSPECIFIED is used here, this only works if the source's address in the config is 0.0.0.0, since setting it to 127.0.0.1 (for example) would filter out any packet going to the multicast address.

For example, this config:

[sources.multicast_udp]
type = "socket"
mode = "udp"
address = "127.0.0.1:4242"
multicast_groups = ["224.0.0.2"]


[sinks.console]
type = "console"
inputs = ["multicast_udp"]
encoding.codec = "json"

and this command: echo hello | nc 224.0.0.2 4242 -u

won't work because the socket address is not 0.0.0.0.

If we hardcode this to Ipv4Addr::UNSPECIFIED, the behaviour would be the same: that config won't work.

Is this clear enough?
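
The rule described above can be sketched as a pure check. This is only an illustration with a hypothetical function name, not something the PR implements; it encodes the assumption that multicast groups only work with an IPv4 listen address of `0.0.0.0`.

```rust
use std::net::{Ipv4Addr, SocketAddr};

/// Hypothetical config check mirroring the rule described above: multicast
/// groups are only expected to work with an IPv4 listen address of 0.0.0.0.
pub fn multicast_listen_address_ok(address: SocketAddr, groups: &[Ipv4Addr]) -> bool {
    if groups.is_empty() {
        return true; // nothing to join, so any listen address is fine
    }
    match address {
        SocketAddr::V4(v4) => v4.ip().is_unspecified(),
        SocketAddr::V6(_) => false, // IPv6 listen addresses are not supported here
    }
}
```

Applied to the example config, `127.0.0.1:4242` with group `224.0.0.2` would be flagged, while `0.0.0.0:4242` would pass.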

.map_err(|error| {
emit!(SocketBindError {
Contributor Author

Is this considered a SocketBindError, or should we create a new error for this case?
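
If a dedicated error were preferred, it might look roughly like the following. This is a plain std-only sketch: `MulticastJoinError` is a hypothetical type, and it does not follow Vector's internal event conventions, which are not shown in this diff.

```rust
use std::fmt;
use std::io;
use std::net::Ipv4Addr;

/// Hypothetical error type (not part of Vector) that records which group failed.
#[derive(Debug)]
pub struct MulticastJoinError {
    pub group: Ipv4Addr,
    pub source: io::Error,
}

impl fmt::Display for MulticastJoinError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "failed to join multicast group {}: {}", self.group, self.source)
    }
}

impl std::error::Error for MulticastJoinError {
    // Expose the underlying I/O error so callers can inspect the cause.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        Some(&self.source)
    }
}
```

The main benefit over reusing a bind error would be that the failing group address appears in the message.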

mode: SocketMode::Udp,
error,
})
})?;
info!(message = "Joined multicast group.", group = %group_addr);
Contributor Author

This gives a bit of feedback about group joining.

}
}

if let Some(receive_buffer_bytes) = config.receive_buffer_bytes {
if let Err(error) = net::set_receive_buffer_size(&socket, receive_buffer_bytes) {
warn!(message = "Failed configuring receive buffer size on UDP socket.", %error);
4 changes: 4 additions & 0 deletions src/test_util/mod.rs
@@ -120,6 +120,10 @@ pub fn next_addr() -> SocketAddr {
next_addr_for_ip(IpAddr::V4(Ipv4Addr::LOCALHOST))
}

pub fn next_addr_any() -> SocketAddr {
Contributor Author

I had to add this function because of https://github.com/vectordotdev/vector/pull/22099/files#r1921563985

I needed an address that points to 0.0.0.0 instead of 127.0.0.1, which is what next_addr() returns.

I thought about changing the next_addr implementation to return Ipv4Addr::UNSPECIFIED instead of Ipv4Addr::LOCALHOST, but a bunch of other tests failed, so it is probably not worth changing that much.
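
How `next_addr_for_ip` picks ports is not shown in this diff, so the following is only a stand-in sketch of the general idea: ask the OS for a free UDP port on `0.0.0.0` by binding to port 0. The function name is hypothetical.

```rust
use std::io;
use std::net::{Ipv4Addr, SocketAddr, UdpSocket};

/// Hypothetical stand-in for a test helper: ask the OS for a free UDP port on
/// 0.0.0.0 by binding to port 0, then return the resulting address.
/// The socket is dropped on return, so another process could grab the port
/// before the caller binds it again.
pub fn free_udp_addr_any() -> io::Result<SocketAddr> {
    let socket = UdpSocket::bind((Ipv4Addr::UNSPECIFIED, 0))?;
    socket.local_addr()
}
```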

next_addr_for_ip(IpAddr::V4(Ipv4Addr::UNSPECIFIED))
}

pub fn next_addr_v6() -> SocketAddr {
next_addr_for_ip(IpAddr::V6(Ipv6Addr::LOCALHOST))
}
21 changes: 21 additions & 0 deletions website/cue/reference/components/sources/base/socket.cue
@@ -489,6 +489,27 @@ base: components: sources: socket: configuration: {
unix_stream: "Listen on a Unix domain socket (UDS), in stream mode."
}
}
multicast_groups: {
description: """
List of IPv4 multicast groups to join on socket's binding process.

In order to read multicast packets, this source's listening address should be set to `0.0.0.0`.
If any other address is used (such as `127.0.0.1` or a specific interface address), the
listening interface will filter out all multicast packets received,
as their target IP would be the one of the multicast group
and it will not match the socket's bound IP.

Note that this setting will only work if the source's address
is an IPv4 address (IPv6 and systemd file descriptor as source's address are not supported
with multicast groups).
"""
relevant_when: "mode = \"udp\""
required: false
type: array: {
default: []
items: type: string: examples: ["['224.0.0.2', '224.0.0.4']"]
}
}
path: {
description: """
The Unix socket path.