Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sources): multicast udp socket support #22099

Draft
wants to merge 9 commits into
base: master
Choose a base branch
from

Conversation

jorgehermo9
Copy link
Contributor

@jorgehermo9 jorgehermo9 commented Dec 31, 2024

Closes #5732

This PR is still in draft. I have a few pending TODOs and also missing tests to propertly check this. Although, the happy path is working.

In order to test this, use this vector config:

[sources.multicast_udp]
type = "socket"
mode = "udp"
address = "0.0.0.0:4242"
multicast_groups = ["224.0.0.2"]


[sinks.console]
type = "console"
inputs = ["multicast_udp"]
encoding.codec = "json"

and with this command

echo "hello" | nc 224.0.0.2 4242 -u

you can see logs in vector.
image

I would like to receive some feedback about how the configuration of this setting should look like.

Also, note that IPv6 is not supported. We can work on that, but maybe it is not worth it if no one request that.

@nomalord take a look into this please, It would be great if you can build the binary from this branch and test if it works in your systems.

@dalesample as you were the first requester of this I also ping you, just in case (although this issue was created 4 years ago)

@github-actions github-actions bot added the domain: sources Anything related to the Vector's sources label Dec 31, 2024
@@ -402,6 +402,31 @@ impl ToValue for SocketAddr {
}
}

impl Configurable for Ipv4Addr {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inspired from the SocketAddr implementation a few lines above

@nomalord
Copy link

This looks awesome🤯
As it's new years I won't be able to check our usecase until Thursday...
I will say though, that this looks like it answers our usecase exactly🙏🙏🙏🙏🙏🙏

@jorgehermo9 jorgehermo9 changed the title feat: multicast udp socket support feat(sources): multicast udp socket support Jan 2, 2025
@jorgehermo9
Copy link
Contributor Author

Hi @nomalord perhaps you could test this?

@nomalord
Copy link

nomalord commented Jan 9, 2025

I don't really have experience with rust, but I'll try to compile the feature branch and update you🥹

@pront
Copy link
Member

pront commented Jan 9, 2025

I don't really have experience with rust, but I'll try to compile the feature branch and update you🥹

Maybe this is helpful to you:

# cd /path/to/vector/repo
# brew install gh 
gh pr checkout 22099
cargo run -- --config /path/to/your/config.yaml

@jorgehermo9
Copy link
Contributor Author

Thank you for popping in @pront 😃. I have left to include some tests to validate this PR and also add documentation & changelog. I will update this as soon as I can and mark the pr as non-draft then

}

fn generate_schema(_: &RefCell<SchemaGenerator>) -> Result<SchemaObject, GenerateError> {
// TODO: We don't need anything other than a string schema to (de)serialize a `Ipv4Addr`,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the same TODOs as in

fn generate_schema(_: &RefCell<SchemaGenerator>) -> Result<SchemaObject, GenerateError> {

send_lines_udp_from(next_addr(), addr, lines)
}

fn send_lines_udp_from(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refactored this a little bit, because the socket from where we will send the packets must be on the same interface as where the source is listening. And, with multicast groups, 127.0.0.1 seems to not work propertly and we have to use both 0.0.0.0 for the source's listening socket and for the socket that will send the packets. Hope I'm explaining this well..

.join_multicast_v4(group_addr, *listen_addr.ip())
.map_err(|error| {
// TODO: is this considered a `SocketBindError`? or should we create a new error for this case?
emit!(SocketBindError {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this considered a SocketBindError? or should we create a new error for this case?

// We could support Ipv6 multicast with the
// https://doc.rust-lang.org/std/net/struct.UdpSocket.html#method.join_multicast_v6 method
// and specifying the interface index as `0`, in order to bind all interfaces.
panic!("IPv6 multicast is not supported")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about adding ipv6 support too... But as no one requested it yet, I thought i wouldn't be worth to implement & test it in this PR. Ayn thoughts about this? Should I include ipv6 support in this PR?

// socket is bound to, and not `IP_ADDR_ANY`. We need to use the same address
// for the multicast group join that the user has set in the config.
// if systemd{N} fd sockets are required to work too, we should investigate on this.
SocketListenAddr::SystemdFd(_) => {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could use the UdpSocket::local_addr with this socket, but I wonder if this would ever output IPADDR_ANY (0.0.0.0) instead of the real interface address that socket is bound to.

multicast ip required setting 0.0.0.0 as a bind address to work. Otherwise, the interface would filter out packets targeting the multicast ip address.

I think it would be difficult to get this working as I would have to investigate more about this, and no one requested it... Any thoughts?

};
for group_addr in config.multicast_groups {
socket
.join_multicast_v4(group_addr, *listen_addr.ip())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should I use listen_addr.ip() here o just hardcode Ipv4Addr::UNSPECIFIED? If 0.0.0.0 is not set, the interface would filter out the packets targeting the multicast ip

see the method documentation here: https://doc.rust-lang.org/std/net/struct.UdpSocket.html#method.join_multicast_v4

and more info about ip multicast here: https://stackoverflow.com/questions/10692956/what-does-it-mean-to-bind-a-multicast-udp-socket

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Anyway, if either listen_addr.ip() or Ipv4Addr::UNSPECIFIED is used here, this would only if the source's address in config is 0.0.0.0 as setting it to 127.0.0.1 (for example) would filter out any packet going to the multicast addres

for example, this config:

[sources.multicast_udp]
type = "socket"
mode = "udp"
address = "127.0.0.1:4242"
multicast_groups = ["224.0.0.2"]


[sinks.console]
type = "console"
inputs = ["multicast_udp"]
encoding.codec = "json"

and this command echo hello | nc 224.0.0.2 4242 -u

wont work because the socket address is no 0..0.0.0.

If we hardcode this to Ipv4Addr::UNSPECIFIED, the behaviour would be the same; that config won't work.

Is this clear enough?

}

#[tokio::test]
#[should_panic]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had to use #[should_panic] here as init_udp_with_config does not return a Result to know whether the initialization failed or not

.unwrap();

error,
})
})?;
info!(message = "Joined multicast group.", group = %group_addr);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To give a bit of feedback about group joining

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
domain: sources Anything related to the Vector's sources
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support multicast for UDP
3 participants