-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(sources): multicast udp socket support #22099
base: master
Are you sure you want to change the base?
Changes from 1 commit
a8563a3
ba9015f
1ab728f
b1e6c92
e63e2bf
0e32685
adad168
72131fb
12bf074
3018327
588bb09
c140d42
58223d5
6b96901
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -2,7 +2,7 @@ use std::{ | |||
cell::RefCell, | ||||
collections::{BTreeMap, BTreeSet, HashMap, HashSet}, | ||||
hash::Hash, | ||||
net::SocketAddr, | ||||
net::{Ipv4Addr, SocketAddr}, | ||||
num::{ | ||||
NonZeroI16, NonZeroI32, NonZeroI64, NonZeroI8, NonZeroU16, NonZeroU32, NonZeroU64, | ||||
NonZeroU8, NonZeroUsize, | ||||
|
@@ -402,6 +402,31 @@ impl ToValue for SocketAddr { | |||
} | ||||
} | ||||
|
||||
impl Configurable for Ipv4Addr { | ||||
fn referenceable_name() -> Option<&'static str> { | ||||
Some("stdlib::Ipv4Addr") | ||||
} | ||||
|
||||
fn metadata() -> Metadata { | ||||
let mut metadata = Metadata::default(); | ||||
metadata.set_description("An IPv4 address."); | ||||
metadata | ||||
} | ||||
|
||||
fn generate_schema(_: &RefCell<SchemaGenerator>) -> Result<SchemaObject, GenerateError> { | ||||
// TODO: We don't need anything other than a string schema to (de)serialize a `Ipv4Addr`, | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added the same TODOs as in vector/lib/vector-config/src/stdlib.rs Line 391 in e63e2bf
|
||||
// but we eventually should have validation since the format for the possible permutations | ||||
// is well-known and can be easily codified. | ||||
Ok(generate_string_schema()) | ||||
} | ||||
} | ||||
|
||||
impl ToValue for Ipv4Addr { | ||||
fn to_value(&self) -> Value { | ||||
Value::String(self.to_string()) | ||||
} | ||||
} | ||||
|
||||
impl Configurable for PathBuf { | ||||
fn referenceable_name() -> Option<&'static str> { | ||||
Some("stdlib::PathBuf") | ||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,5 @@ | ||
use std::net::{Ipv4Addr, SocketAddr}; | ||
|
||
use super::default_host_key; | ||
use bytes::BytesMut; | ||
use chrono::Utc; | ||
|
@@ -41,6 +43,17 @@ pub struct UdpConfig { | |
#[configurable(derived)] | ||
address: SocketListenAddr, | ||
|
||
/// TODO: document this. | ||
/// TODO: The join multicast method should fail if the address is not SocketListenAddr::SocketAddr. | ||
/// multicast wont work with systemd{N} fd sockets. | ||
/// Also, if the address is IPv4, the multicast address should be IPv4 too. | ||
/// TODO: should we support a list of groups or a single group? The `join_multicast` method supports | ||
/// just one group per call. | ||
/// TODO: document that we use `IPv4Addr` and not `SocketAddr` for the multicast groups because | ||
/// the `join_multicast_v6` is not supported due to the need of using an interface index.s | ||
#[serde(default)] | ||
multicast_groups: Vec<Ipv4Addr>, | ||
|
||
/// The maximum buffer size of incoming messages. | ||
/// | ||
/// Messages larger than this are truncated. | ||
|
@@ -118,6 +131,7 @@ impl UdpConfig { | |
pub fn from_address(address: SocketListenAddr) -> Self { | ||
Self { | ||
address, | ||
multicast_groups: Vec::new(), | ||
max_length: default_max_length(), | ||
host_key: None, | ||
port_key: default_port_key(), | ||
|
@@ -152,6 +166,35 @@ pub(super) fn udp( | |
}) | ||
})?; | ||
|
||
if !config.multicast_groups.is_empty() { | ||
let listen_addr = match config.address() { | ||
SocketListenAddr::SocketAddr(SocketAddr::V4(addr)) => addr, | ||
SocketListenAddr::SocketAddr(SocketAddr::V6(_)) => { | ||
todo!("handle this error, IPv6 multicast is not supported") | ||
} | ||
// TODO: if we need to support systemd{N} fd sockets, we should use the | ||
// `UdpSocket::local_addr` method to get the address of the socket. | ||
// that method can fail and I wonder if the user sets `IP_ADDR_ANY` (`0.0.0.0`) in the config, | ||
// the `UdpSocket::local_addr` would return the real interface address that the | ||
// socket is bound to, and not `IP_ADDR_ANY`. We need to use the same address | ||
// for the multicast group join that the user has set in the config. | ||
// if systemd{N} fd sockets are required to work too, we should investigate on this. | ||
SocketListenAddr::SystemdFd(_) => todo!("handle this error"), | ||
}; | ||
for group_addr in config.multicast_groups { | ||
socket | ||
.join_multicast_v4(group_addr, *listen_addr.ip()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should I use see the method documentation here: https://doc.rust-lang.org/std/net/struct.UdpSocket.html#method.join_multicast_v4 and more info about ip multicast here: https://stackoverflow.com/questions/10692956/what-does-it-mean-to-bind-a-multicast-udp-socket There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Anyway, if either for example, this config: [sources.multicast_udp]
type = "socket"
mode = "udp"
address = "127.0.0.1:4242"
multicast_groups = ["224.0.0.2"]
[sinks.console]
type = "console"
inputs = ["multicast_udp"]
encoding.codec = "json" and this command wont work because the socket address is no If we hardcode this to Is this clear enough? |
||
.map_err(|error| { | ||
// TODO: is this considered a `SocketBindError`? or should we create a new error for this case? | ||
emit!(SocketBindError { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this considered a |
||
mode: SocketMode::Udp, | ||
error, | ||
}) | ||
})?; | ||
// TODO: add debug (or info) logs here to inform the user that the socket has joined the multicast group. | ||
} | ||
} | ||
|
||
if let Some(receive_buffer_bytes) = config.receive_buffer_bytes { | ||
if let Err(error) = net::set_receive_buffer_size(&socket, receive_buffer_bytes) { | ||
warn!(message = "Failed configuring receive buffer size on UDP socket.", %error); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Inspired from the
SocketAddr
implementation a few lines above