-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(sources): multicast udp socket support #22099
base: master
Are you sure you want to change the base?
feat(sources): multicast udp socket support #22099
Conversation
@@ -402,6 +402,31 @@ impl ToValue for SocketAddr { | |||
} | |||
} | |||
|
|||
impl Configurable for Ipv4Addr { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Inspired from the SocketAddr
implementation a few lines above
This looks awesome🤯 |
Hi @nomalord perhaps you could test this? |
I don't really have experience with rust, but I'll try to compile the feature branch and update you🥹 |
Maybe this is helpful to you:
|
Thank you for popping in @pront 😃. I have left to include some tests to validate this PR and also add documentation & changelog. I will update this as soon as I can and mark the pr as non-draft then |
} | ||
|
||
fn generate_schema(_: &RefCell<SchemaGenerator>) -> Result<SchemaObject, GenerateError> { | ||
// TODO: We don't need anything other than a string schema to (de)serialize a `Ipv4Addr`, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added the same TODOs as in
vector/lib/vector-config/src/stdlib.rs
Line 391 in e63e2bf
fn generate_schema(_: &RefCell<SchemaGenerator>) -> Result<SchemaObject, GenerateError> { |
send_lines_udp_from(next_addr(), addr, lines) | ||
} | ||
|
||
fn send_lines_udp_from( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
refactored this a little bit, because the socket from where we will send the packets must be on the same interface as where the source is listening. And, with multicast groups, 127.0.0.1
seems to not work propertly and we have to use both 0.0.0.0
for the source's listening socket and for the socket that will send the packets. Hope I'm explaining this well..
.join_multicast_v4(group_addr, *listen_addr.ip()) | ||
.map_err(|error| { | ||
// TODO: is this considered a `SocketBindError`? or should we create a new error for this case? | ||
emit!(SocketBindError { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this considered a SocketBindError
? or should we create a new error for this case?
// We could support Ipv6 multicast with the | ||
// https://doc.rust-lang.org/std/net/struct.UdpSocket.html#method.join_multicast_v6 method | ||
// and specifying the interface index as `0`, in order to bind all interfaces. | ||
panic!("IPv6 multicast is not supported") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought about adding ipv6 support too... But as no one requested it yet, I thought i wouldn't be worth to implement & test it in this PR. Ayn thoughts about this? Should I include ipv6 support in this PR?
// socket is bound to, and not `IP_ADDR_ANY`. We need to use the same address | ||
// for the multicast group join that the user has set in the config. | ||
// if systemd{N} fd sockets are required to work too, we should investigate on this. | ||
SocketListenAddr::SystemdFd(_) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could use the UdpSocket::local_addr
with this socket, but I wonder if this would ever output IPADDR_ANY
(0.0.0.0) instead of the real interface address that socket is bound to.
multicast ip required setting 0.0.0.0 as a bind address to work. Otherwise, the interface would filter out packets targeting the multicast ip address.
I think it would be difficult to get this working as I would have to investigate more about this, and no one requested it... Any thoughts?
}; | ||
for group_addr in config.multicast_groups { | ||
socket | ||
.join_multicast_v4(group_addr, *listen_addr.ip()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should I use listen_addr.ip()
here o just hardcode Ipv4Addr::UNSPECIFIED
? If 0.0.0.0
is not set, the interface would filter out the packets targeting the multicast ip
see the method documentation here: https://doc.rust-lang.org/std/net/struct.UdpSocket.html#method.join_multicast_v4
and more info about ip multicast here: https://stackoverflow.com/questions/10692956/what-does-it-mean-to-bind-a-multicast-udp-socket
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Anyway, if either listen_addr.ip()
or Ipv4Addr::UNSPECIFIED
is used here, this would only if the source's address in config is 0.0.0.0
as setting it to 127.0.0.1
(for example) would filter out any packet going to the multicast addres
for example, this config:
[sources.multicast_udp]
type = "socket"
mode = "udp"
address = "127.0.0.1:4242"
multicast_groups = ["224.0.0.2"]
[sinks.console]
type = "console"
inputs = ["multicast_udp"]
encoding.codec = "json"
and this command echo hello | nc 224.0.0.2 4242 -u
wont work because the socket address is no 0..0.0.0
.
If we hardcode this to Ipv4Addr::UNSPECIFIED
, the behaviour would be the same; that config won't work.
Is this clear enough?
} | ||
|
||
#[tokio::test] | ||
#[should_panic] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Had to use #[should_panic]
here as init_udp_with_config
does not return a Result
to know whether the initialization failed or not
vector/src/sources/socket/mod.rs
Line 1018 in 12bf074
.unwrap(); |
error, | ||
}) | ||
})?; | ||
info!(message = "Joined multicast group.", group = %group_addr); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To give a bit of feedback about group joining
Closes #5732
This PR is still in draft. I have a few pending
TODOs
and also missing tests to propertly check this. Although, the happy path is working.In order to test this, use this vector config:
and with this command
you can see logs in vector.
I would like to receive some feedback about how the configuration of this setting should look like.
Also, note that IPv6 is not supported. We can work on that, but maybe it is not worth it if no one request that.
@nomalord take a look into this please, It would be great if you can build the binary from this branch and test if it works in your systems.
@dalesample as you were the first requester of this I also ping you, just in case (although this issue was created 4 years ago)