-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(sources): multicast udp socket support #22099
base: master
Are you sure you want to change the base?
Changes from all commits
a8563a3
ba9015f
1ab728f
b1e6c92
e63e2bf
0e32685
adad168
72131fb
12bf074
3018327
588bb09
c140d42
58223d5
6b96901
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
The `socket` source with `udp` mode now supports joining multicast groups via the `multicast_groups` option | ||
of that source. This allows the source to receive multicast packets from the specified multicast groups. | ||
|
||
Note that in order to work properly, the `socket` address must be set to `0.0.0.0` and not | ||
to `127.0.0.1` (localhost) or any other specific IP address. If other IP address is used, the host's interface | ||
will filter out the multicast packets as the packet target IP (multicast) would not match the host's interface IP. | ||
|
||
authors: jorgehermo9 |
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -2,7 +2,7 @@ use std::{ | |||
cell::RefCell, | ||||
collections::{BTreeMap, BTreeSet, HashMap, HashSet}, | ||||
hash::Hash, | ||||
net::SocketAddr, | ||||
net::{Ipv4Addr, SocketAddr}, | ||||
num::{ | ||||
NonZeroI16, NonZeroI32, NonZeroI64, NonZeroI8, NonZeroU16, NonZeroU32, NonZeroU64, | ||||
NonZeroU8, NonZeroUsize, | ||||
|
@@ -402,6 +402,31 @@ impl ToValue for SocketAddr { | |||
} | ||||
} | ||||
|
||||
impl Configurable for Ipv4Addr { | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Inspired from the |
||||
fn referenceable_name() -> Option<&'static str> { | ||||
Some("stdlib::Ipv4Addr") | ||||
} | ||||
|
||||
fn metadata() -> Metadata { | ||||
let mut metadata = Metadata::default(); | ||||
metadata.set_description("An IPv4 address."); | ||||
metadata | ||||
} | ||||
|
||||
fn generate_schema(_: &RefCell<SchemaGenerator>) -> Result<SchemaObject, GenerateError> { | ||||
// TODO: We don't need anything other than a string schema to (de)serialize a `Ipv4Addr`, | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added the same TODOs as in vector/lib/vector-config/src/stdlib.rs Line 391 in e63e2bf
|
||||
// but we eventually should have validation since the format for the possible permutations | ||||
// is well-known and can be easily codified. | ||||
Ok(generate_string_schema()) | ||||
} | ||||
} | ||||
|
||||
impl ToValue for Ipv4Addr { | ||||
fn to_value(&self) -> Value { | ||||
Value::String(self.to_string()) | ||||
} | ||||
} | ||||
|
||||
impl Configurable for PathBuf { | ||||
fn referenceable_name() -> Option<&'static str> { | ||||
Some("stdlib::PathBuf") | ||||
|
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -319,7 +319,7 @@ mod test { | |||
use approx::assert_relative_eq; | ||||
use std::{ | ||||
collections::HashMap, | ||||
net::{SocketAddr, UdpSocket}, | ||||
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}, | ||||
sync::{ | ||||
atomic::{AtomicBool, Ordering}, | ||||
Arc, | ||||
|
@@ -374,7 +374,7 @@ mod test { | |||
test_util::{ | ||||
collect_n, collect_n_limited, | ||||
components::{assert_source_compliance, SOCKET_PUSH_SOURCE_TAGS}, | ||||
next_addr, random_string, send_lines, send_lines_tls, wait_for_tcp, | ||||
next_addr, next_addr_any, random_string, send_lines, send_lines_tls, wait_for_tcp, | ||||
}, | ||||
tls::{self, TlsConfig, TlsEnableableConfig, TlsSourceConfig}, | ||||
SourceSender, | ||||
|
@@ -899,12 +899,27 @@ mod test { | |||
|
||||
//////// UDP TESTS //////// | ||||
fn send_lines_udp(addr: SocketAddr, lines: impl IntoIterator<Item = String>) -> SocketAddr { | ||||
send_packets_udp(addr, lines.into_iter().map(|line| line.into())) | ||||
send_lines_udp_from(next_addr(), addr, lines) | ||||
} | ||||
|
||||
fn send_lines_udp_from( | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. refactored this a little bit, because the socket from where we will send the packets must be on the same interface as where the source is listening. And, with multicast groups, |
||||
from: SocketAddr, | ||||
addr: SocketAddr, | ||||
lines: impl IntoIterator<Item = String>, | ||||
) -> SocketAddr { | ||||
send_packets_udp_from(from, addr, lines.into_iter().map(|line| line.into())) | ||||
} | ||||
|
||||
fn send_packets_udp(addr: SocketAddr, packets: impl IntoIterator<Item = Bytes>) -> SocketAddr { | ||||
let bind = next_addr(); | ||||
let socket = UdpSocket::bind(bind) | ||||
send_packets_udp_from(next_addr(), addr, packets) | ||||
} | ||||
|
||||
fn send_packets_udp_from( | ||||
from: SocketAddr, | ||||
addr: SocketAddr, | ||||
packets: impl IntoIterator<Item = Bytes>, | ||||
) -> SocketAddr { | ||||
let socket = UdpSocket::bind(from) | ||||
.map_err(|error| panic!("{:}", error)) | ||||
.ok() | ||||
.unwrap(); | ||||
|
@@ -926,7 +941,7 @@ mod test { | |||
thread::sleep(Duration::from_millis(10)); | ||||
|
||||
// Done | ||||
bind | ||||
from | ||||
} | ||||
|
||||
async fn init_udp_with_shutdown( | ||||
|
@@ -1303,6 +1318,114 @@ mod test { | |||
.await; | ||||
} | ||||
|
||||
#[tokio::test] | ||||
async fn multicast_udp_message() { | ||||
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async { | ||||
let (tx, mut rx) = SourceSender::new_test(); | ||||
// The socket address must be `IPADDR_ANY` (0.0.0.0) in order to receive multicast packets | ||||
let socket_address = next_addr_any(); | ||||
let multicast_ip_address: Ipv4Addr = "224.0.0.2".parse().unwrap(); | ||||
let multicast_socket_address = | ||||
SocketAddr::new(IpAddr::V4(multicast_ip_address), socket_address.port()); | ||||
let mut config = UdpConfig::from_address(socket_address.into()); | ||||
config.multicast_groups = vec![multicast_ip_address]; | ||||
init_udp_with_config(tx, config).await; | ||||
|
||||
// We must send packets to the same interface the `socket_address` is bound to | ||||
// in order to receive the multicast packets this `from` socket sends | ||||
// To do so, we use the `IPADDR_ANY` address | ||||
let from = next_addr_any(); | ||||
send_lines_udp_from(from, multicast_socket_address, ["test".to_string()]); | ||||
|
||||
let event = rx.next().await.expect("must receive an event"); | ||||
assert_eq!( | ||||
event.as_log()[log_schema().message_key().unwrap().to_string()], | ||||
"test".into() | ||||
); | ||||
}) | ||||
.await; | ||||
} | ||||
|
||||
#[tokio::test] | ||||
async fn multiple_multicast_addresses_udp_message() { | ||||
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async { | ||||
let (tx, mut rx) = SourceSender::new_test(); | ||||
let socket_address = next_addr_any(); | ||||
let multicast_ip_addresses = (2..12) | ||||
.map(|i| format!("224.0.0.{i}").parse().unwrap()) | ||||
.collect::<Vec<Ipv4Addr>>(); | ||||
let multicast_ip_socket_addresses = multicast_ip_addresses | ||||
.iter() | ||||
.map(|ip_address| SocketAddr::new(IpAddr::V4(*ip_address), socket_address.port())) | ||||
.collect::<Vec<SocketAddr>>(); | ||||
let mut config = UdpConfig::from_address(socket_address.into()); | ||||
config.multicast_groups = multicast_ip_addresses; | ||||
init_udp_with_config(tx, config).await; | ||||
|
||||
let from = next_addr_any(); | ||||
for multicast_ip_socket_address in multicast_ip_socket_addresses { | ||||
send_lines_udp_from( | ||||
from, | ||||
multicast_ip_socket_address, | ||||
[multicast_ip_socket_address.to_string()], | ||||
); | ||||
|
||||
let event = rx.next().await.expect("must receive an event"); | ||||
assert_eq!( | ||||
event.as_log()[log_schema().message_key().unwrap().to_string()], | ||||
multicast_ip_socket_address.to_string().into() | ||||
); | ||||
} | ||||
}) | ||||
.await; | ||||
} | ||||
|
||||
#[tokio::test] | ||||
async fn multicast_and_unicast_udp_message() { | ||||
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async { | ||||
let (tx, mut rx) = SourceSender::new_test(); | ||||
let socket_address = next_addr_any(); | ||||
let multicast_ip_address: Ipv4Addr = "224.0.0.2".parse().unwrap(); | ||||
let multicast_socket_address = | ||||
SocketAddr::new(IpAddr::V4(multicast_ip_address), socket_address.port()); | ||||
let mut config = UdpConfig::from_address(socket_address.into()); | ||||
config.multicast_groups = vec![multicast_ip_address]; | ||||
init_udp_with_config(tx, config).await; | ||||
|
||||
let from = next_addr_any(); | ||||
// Send packet to multicast address | ||||
send_lines_udp_from(from, multicast_socket_address, ["test".to_string()]); | ||||
let event = rx.next().await.expect("must receive an event"); | ||||
assert_eq!( | ||||
event.as_log()[log_schema().message_key().unwrap().to_string()], | ||||
"test".into() | ||||
); | ||||
|
||||
// Send packet to unicast address | ||||
send_lines_udp_from(from, socket_address, ["test".to_string()]); | ||||
let event = rx.next().await.expect("must receive an event"); | ||||
assert_eq!( | ||||
event.as_log()[log_schema().message_key().unwrap().to_string()], | ||||
"test".into() | ||||
); | ||||
}) | ||||
.await; | ||||
} | ||||
|
||||
#[tokio::test] | ||||
#[should_panic] | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Had to use vector/src/sources/socket/mod.rs Line 1018 in 12bf074
|
||||
async fn udp_invalid_multicast_group() { | ||||
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async { | ||||
let (tx, _rx) = SourceSender::new_test(); | ||||
let socket_address = next_addr_any(); | ||||
let invalid_multicast_ip_address: Ipv4Addr = "192.168.0.3".parse().unwrap(); | ||||
let mut config = UdpConfig::from_address(socket_address.into()); | ||||
config.multicast_groups = vec![invalid_multicast_ip_address]; | ||||
init_udp_with_config(tx, config).await; | ||||
}) | ||||
.await; | ||||
} | ||||
|
||||
////////////// UNIX TEST LIBS ////////////// | ||||
|
||||
#[cfg(unix)] | ||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,5 @@ | ||
use std::net::{Ipv4Addr, SocketAddr}; | ||
|
||
use super::default_host_key; | ||
use bytes::BytesMut; | ||
use chrono::Utc; | ||
|
@@ -41,6 +43,21 @@ pub struct UdpConfig { | |
#[configurable(derived)] | ||
address: SocketListenAddr, | ||
|
||
/// List of IPv4 multicast groups to join on socket's binding process. | ||
/// | ||
/// In order to read multicast packets, this source's listening address should be set to `0.0.0.0`. | ||
/// If any other address is used (such as `127.0.0.1` or an specific interface address), the | ||
/// listening interface will filter out all multicast packets received, | ||
/// as their target IP would be the one of the multicast group | ||
/// and it will not match the socket's bound IP. | ||
/// | ||
/// Note that this setting will only work if the source's address | ||
/// is an IPv4 address (IPv6 and systemd file descriptor as source's address are not supported | ||
/// with multicast groups). | ||
#[serde(default)] | ||
#[configurable(metadata(docs::examples = "['224.0.0.2', '224.0.0.4']"))] | ||
pub(super) multicast_groups: Vec<Ipv4Addr>, | ||
|
||
/// The maximum buffer size of incoming messages. | ||
/// | ||
/// Messages larger than this are truncated. | ||
|
@@ -118,6 +135,7 @@ impl UdpConfig { | |
pub fn from_address(address: SocketListenAddr) -> Self { | ||
Self { | ||
address, | ||
multicast_groups: Vec::new(), | ||
max_length: default_max_length(), | ||
host_key: None, | ||
port_key: default_port_key(), | ||
|
@@ -152,6 +170,33 @@ pub(super) fn udp( | |
}) | ||
})?; | ||
|
||
if !config.multicast_groups.is_empty() { | ||
socket.set_multicast_loop_v4(true).unwrap(); | ||
let listen_addr = match config.address() { | ||
SocketListenAddr::SocketAddr(SocketAddr::V4(addr)) => addr, | ||
SocketListenAddr::SocketAddr(SocketAddr::V6(_)) => { | ||
// We could support Ipv6 multicast with the | ||
// https://doc.rust-lang.org/std/net/struct.UdpSocket.html#method.join_multicast_v6 method | ||
// and specifying the interface index as `0`, in order to bind all interfaces. | ||
panic!("IPv6 multicast is not supported") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought about adding ipv6 support too... But as no one requested it yet, I thought i wouldn't be worth to implement & test it in this PR. Ayn thoughts about this? Should I include ipv6 support in this PR? |
||
} | ||
SocketListenAddr::SystemdFd(_) => { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We could use the multicast ip required setting 0.0.0.0 as a bind address to work. Otherwise, the interface would filter out packets targeting the multicast ip address. I think it would be difficult to get this working as I would have to investigate more about this, and no one requested it... Any thoughts? |
||
panic!("Multicast for systemd fd sockets is not supported") | ||
} | ||
}; | ||
for group_addr in config.multicast_groups { | ||
socket | ||
.join_multicast_v4(group_addr, *listen_addr.ip()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should I use see the method documentation here: https://doc.rust-lang.org/std/net/struct.UdpSocket.html#method.join_multicast_v4 and more info about ip multicast here: https://stackoverflow.com/questions/10692956/what-does-it-mean-to-bind-a-multicast-udp-socket There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Anyway, if either for example, this config: [sources.multicast_udp]
type = "socket"
mode = "udp"
address = "127.0.0.1:4242"
multicast_groups = ["224.0.0.2"]
[sinks.console]
type = "console"
inputs = ["multicast_udp"]
encoding.codec = "json" and this command wont work because the socket address is no If we hardcode this to Is this clear enough? |
||
.map_err(|error| { | ||
emit!(SocketBindError { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this considered a |
||
mode: SocketMode::Udp, | ||
error, | ||
}) | ||
})?; | ||
info!(message = "Joined multicast group.", group = %group_addr); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To give a bit of feedback about group joining |
||
} | ||
} | ||
|
||
if let Some(receive_buffer_bytes) = config.receive_buffer_bytes { | ||
if let Err(error) = net::set_receive_buffer_size(&socket, receive_buffer_bytes) { | ||
warn!(message = "Failed configuring receive buffer size on UDP socket.", %error); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -120,6 +120,10 @@ pub fn next_addr() -> SocketAddr { | |
next_addr_for_ip(IpAddr::V4(Ipv4Addr::LOCALHOST)) | ||
} | ||
|
||
pub fn next_addr_any() -> SocketAddr { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I had to add this method due to https://github.com/vectordotdev/vector/pull/22099/files#r1921563985 I needed an address that pointed to I thought about changing the |
||
next_addr_for_ip(IpAddr::V4(Ipv4Addr::UNSPECIFIED)) | ||
} | ||
|
||
pub fn next_addr_v6() -> SocketAddr { | ||
next_addr_for_ip(IpAddr::V6(Ipv6Addr::LOCALHOST)) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this notice needed if it was added too to website docs?