From 7c6d0c942831441c4aa84ecedfb19c627d3f426d Mon Sep 17 00:00:00 2001 From: jpovixwm <53523617+jpovixwm@users.noreply.github.com> Date: Thu, 19 Dec 2024 00:38:34 +0100 Subject: [PATCH] enhancement(socket sink): support unix datagram mode (#21762) * enhancement(socket sink): support unix datagram mode * 5269_support_unix_datagram_mode_in_socket_sink.enhancement.md: fix linter error * sinks/util/{udp,unix}.rs: abstract out common logic into sinks/util/datagram.rs * sinks/util/service/net/unix: use sinks/util/unix/UnixEither and move impls there * remove problematic feature gates for 'sinks-socket' and 'sinks-statsd' * use std type and spawn blocking * basic_unix_datagram_sink: attempt to reduce flakiness * socket sink: ignore unix_mode on macOS --------- Co-authored-by: Pavlos Rontidis --- ...atagram_mode_in_socket_sink.enhancement.md | 8 + src/internal_events/unix.rs | 2 - src/sinks/socket.rs | 74 +++++- src/sinks/util/datagram.rs | 106 ++++++++ src/sinks/util/mod.rs | 3 +- src/sinks/util/service/net/mod.rs | 4 +- src/sinks/util/service/net/unix.rs | 36 +-- src/sinks/util/udp.rs | 71 ++---- src/sinks/util/unix.rs | 241 ++++++++++++++++-- .../components/sinks/base/socket.cue | 16 ++ 10 files changed, 426 insertions(+), 135 deletions(-) create mode 100644 changelog.d/5269_support_unix_datagram_mode_in_socket_sink.enhancement.md create mode 100644 src/sinks/util/datagram.rs diff --git a/changelog.d/5269_support_unix_datagram_mode_in_socket_sink.enhancement.md b/changelog.d/5269_support_unix_datagram_mode_in_socket_sink.enhancement.md new file mode 100644 index 0000000000000..13e41be657c1d --- /dev/null +++ b/changelog.d/5269_support_unix_datagram_mode_in_socket_sink.enhancement.md @@ -0,0 +1,8 @@ +The `socket` sink now supports the `unix_mode` configuration option that specifies the Unix socket mode to use. Valid values: + +- `Stream` (default) - Stream-oriented (`SOCK_STREAM`) +- `Datagram` - Datagram-oriented (`SOCK_DGRAM`) + +This option only applies when `mode = "unix"`, and is unavailable on macOS, where `SOCK_STREAM` is always used for Unix sockets. + +authors: jpovixwm diff --git a/src/internal_events/unix.rs b/src/internal_events/unix.rs index 3b0e56db8fcf6..34fb63b80f5f5 100644 --- a/src/internal_events/unix.rs +++ b/src/internal_events/unix.rs @@ -65,14 +65,12 @@ impl InternalEvent for UnixSocketError<'_, E> { } } -#[cfg(all(unix, any(feature = "sinks-socket", feature = "sinks-statsd")))] #[derive(Debug)] pub struct UnixSocketSendError<'a, E> { pub(crate) error: &'a E, pub path: &'a std::path::Path, } -#[cfg(all(unix, any(feature = "sinks-socket", feature = "sinks-statsd")))] impl InternalEvent for UnixSocketSendError<'_, E> { fn emit(self) { let reason = "Unix socket send error."; diff --git a/src/sinks/socket.rs b/src/sinks/socket.rs index d84ad27e4294a..0fba354fb2999 100644 --- a/src/sinks/socket.rs +++ b/src/sinks/socket.rs @@ -159,12 +159,12 @@ impl SinkConfig for SocketSinkConfig { #[cfg(test)] mod test { - #[cfg(unix)] - use std::path::PathBuf; use std::{ future::ready, net::{SocketAddr, UdpSocket}, }; + #[cfg(unix)] + use std::{os::unix::net::UnixDatagram, path::PathBuf}; use futures::stream::StreamExt; use futures_util::stream; @@ -196,14 +196,42 @@ mod test { crate::test_util::test_generate_config::(); } - async fn test_udp(addr: SocketAddr) { - let receiver = UdpSocket::bind(addr).unwrap(); + enum DatagramSocket { + Udp(UdpSocket), + #[cfg(unix)] + Unix(UnixDatagram), + } + + enum DatagramSocketAddr { + Udp(SocketAddr), + #[cfg(unix)] + Unix(PathBuf), + } + + async fn test_datagram(datagram_addr: DatagramSocketAddr) { + let receiver = match &datagram_addr { + DatagramSocketAddr::Udp(addr) => DatagramSocket::Udp(UdpSocket::bind(addr).unwrap()), + #[cfg(unix)] + DatagramSocketAddr::Unix(path) => { + DatagramSocket::Unix(UnixDatagram::bind(path).unwrap()) + } + }; let config = SocketSinkConfig { - mode: Mode::Udp(UdpMode { - config: UdpSinkConfig::from_address(addr.to_string()), - encoding: JsonSerializerConfig::default().into(), - }), + mode: match &datagram_addr { + DatagramSocketAddr::Udp(addr) => Mode::Udp(UdpMode { + config: UdpSinkConfig::from_address(addr.to_string()), + encoding: JsonSerializerConfig::default().into(), + }), + #[cfg(unix)] + DatagramSocketAddr::Unix(path) => Mode::Unix(UnixMode { + config: UnixSinkConfig::new( + path.to_path_buf(), + crate::sinks::util::service::net::UnixMode::Datagram, + ), + encoding: (None::, JsonSerializerConfig::default()).into(), + }), + }, acknowledgements: Default::default(), }; @@ -218,9 +246,13 @@ mod test { .expect("Running sink failed"); let mut buf = [0; 256]; - let (size, _src_addr) = receiver - .recv_from(&mut buf) - .expect("Did not receive message"); + let size = match &receiver { + DatagramSocket::Udp(sock) => { + sock.recv_from(&mut buf).expect("Did not receive message").0 + } + #[cfg(unix)] + DatagramSocket::Unix(sock) => sock.recv(&mut buf).expect("Did not receive message"), + }; let packet = String::from_utf8(buf[..size].to_vec()).expect("Invalid data received"); let data = serde_json::from_str::(&packet).expect("Invalid JSON received"); @@ -234,14 +266,25 @@ mod test { async fn udp_ipv4() { trace_init(); - test_udp(next_addr()).await; + test_datagram(DatagramSocketAddr::Udp(next_addr())).await; } #[tokio::test] async fn udp_ipv6() { trace_init(); - test_udp(next_addr_v6()).await; + test_datagram(DatagramSocketAddr::Udp(next_addr_v6())).await; + } + + #[cfg(unix)] + #[tokio::test] + async fn unix_datagram() { + trace_init(); + + test_datagram(DatagramSocketAddr::Unix(temp_uds_path( + "unix_datagram_socket_test", + ))) + .await; } #[tokio::test] @@ -292,7 +335,10 @@ mod test { let config = SocketSinkConfig { mode: Mode::Unix(UnixMode { - config: UnixSinkConfig::new(out_path), + config: UnixSinkConfig::new( + out_path, + crate::sinks::util::service::net::UnixMode::Stream, + ), encoding: (None::, NativeJsonSerializerConfig).into(), }), acknowledgements: Default::default(), diff --git a/src/sinks/util/datagram.rs b/src/sinks/util/datagram.rs new file mode 100644 index 0000000000000..c9baf175855e8 --- /dev/null +++ b/src/sinks/util/datagram.rs @@ -0,0 +1,106 @@ +#[cfg(unix)] +use std::path::PathBuf; + +use bytes::BytesMut; +use futures::{stream::BoxStream, StreamExt}; +use futures_util::stream::Peekable; +use tokio::net::UdpSocket; +#[cfg(unix)] +use tokio::net::UnixDatagram; +use tokio_util::codec::Encoder; +use vector_lib::internal_event::RegisterInternalEvent; +use vector_lib::internal_event::{ByteSize, BytesSent, InternalEventHandle}; +use vector_lib::EstimatedJsonEncodedSizeOf; + +use crate::{ + codecs::Transformer, + event::{Event, EventStatus, Finalizable}, + internal_events::{SocketEventsSent, SocketMode, SocketSendError, UdpSendIncompleteError}, +}; + +#[cfg(unix)] +use crate::internal_events::{UnixSendIncompleteError, UnixSocketSendError}; + +pub enum DatagramSocket { + Udp(UdpSocket), + #[cfg(unix)] + Unix(UnixDatagram, PathBuf), +} + +pub async fn send_datagrams>( + input: &mut Peekable>, + mut socket: DatagramSocket, + transformer: &Transformer, + encoder: &mut E, + bytes_sent: &::Handle, +) { + while let Some(mut event) = input.next().await { + let byte_size = event.estimated_json_encoded_size_of(); + + transformer.transform(&mut event); + + let finalizers = event.take_finalizers(); + let mut bytes = BytesMut::new(); + + // Errors are handled by `Encoder`. + if encoder.encode(event, &mut bytes).is_err() { + continue; + } + + match send_datagram(&mut socket, &bytes).await { + Ok(()) => { + emit!(SocketEventsSent { + mode: match socket { + DatagramSocket::Udp(_) => SocketMode::Udp, + #[cfg(unix)] + DatagramSocket::Unix(..) => SocketMode::Unix, + }, + count: 1, + byte_size, + }); + + bytes_sent.emit(ByteSize(bytes.len())); + finalizers.update_status(EventStatus::Delivered); + } + Err(error) => { + match socket { + DatagramSocket::Udp(_) => emit!(SocketSendError { + mode: SocketMode::Udp, + error + }), + #[cfg(unix)] + DatagramSocket::Unix(_, path) => { + emit!(UnixSocketSendError { + path: path.as_path(), + error: &error + }) + } + }; + finalizers.update_status(EventStatus::Errored); + return; + } + } + } +} + +async fn send_datagram(socket: &mut DatagramSocket, buf: &[u8]) -> tokio::io::Result<()> { + let sent = match socket { + DatagramSocket::Udp(udp) => udp.send(buf).await, + #[cfg(unix)] + DatagramSocket::Unix(uds, _) => uds.send(buf).await, + }?; + if sent != buf.len() { + match socket { + DatagramSocket::Udp(_) => emit!(UdpSendIncompleteError { + data_size: buf.len(), + sent, + }), + #[cfg(unix)] + DatagramSocket::Unix(..) => emit!(UnixSendIncompleteError { + data_size: buf.len(), + sent, + }), + } + } + Ok(()) +} diff --git a/src/sinks/util/mod.rs b/src/sinks/util/mod.rs index c71b49163f0d2..990095ed3452f 100644 --- a/src/sinks/util/mod.rs +++ b/src/sinks/util/mod.rs @@ -6,6 +6,7 @@ pub mod batch; pub mod buffer; pub mod builder; pub mod compressor; +pub mod datagram; pub mod encoding; pub mod http; pub mod metadata; @@ -23,7 +24,7 @@ pub mod tcp; #[cfg(any(test, feature = "test-utils"))] pub mod test; pub mod udp; -#[cfg(all(any(feature = "sinks-socket", feature = "sinks-statsd"), unix))] +#[cfg(unix)] pub mod unix; pub mod uri; pub mod zstd; diff --git a/src/sinks/util/service/net/mod.rs b/src/sinks/util/service/net/mod.rs index 3ad532f747994..aa5f6451a4a69 100644 --- a/src/sinks/util/service/net/mod.rs +++ b/src/sinks/util/service/net/mod.rs @@ -12,7 +12,7 @@ use std::{ }; #[cfg(unix)] -use std::path::PathBuf; +use {crate::sinks::util::unix::UnixEither, std::path::PathBuf}; use crate::{ internal_events::{ @@ -33,7 +33,7 @@ pub use self::unix::{UnixConnectorConfig, UnixMode}; use self::tcp::TcpConnector; use self::udp::UdpConnector; #[cfg(unix)] -use self::unix::{UnixConnector, UnixEither}; +use self::unix::UnixConnector; use futures_util::{future::BoxFuture, FutureExt}; use snafu::{ResultExt, Snafu}; diff --git a/src/sinks/util/service/net/unix.rs b/src/sinks/util/service/net/unix.rs index f0655015f06fe..0cae976b04630 100644 --- a/src/sinks/util/service/net/unix.rs +++ b/src/sinks/util/service/net/unix.rs @@ -1,18 +1,11 @@ -use std::{ - io, - os::fd::{AsFd, BorrowedFd}, - path::{Path, PathBuf}, -}; +use std::path::{Path, PathBuf}; use snafu::ResultExt; -use tokio::{ - io::AsyncWriteExt, - net::{UnixDatagram, UnixStream}, -}; +use tokio::net::{UnixDatagram, UnixStream}; use vector_lib::configurable::configurable_component; -use crate::net; +use crate::{net, sinks::util::unix::UnixEither}; use super::{net_error::*, ConnectorType, NetError, NetworkConnector}; @@ -74,29 +67,6 @@ impl UnixConnectorConfig { } } -pub(super) enum UnixEither { - Datagram(UnixDatagram), - Stream(UnixStream), -} - -impl UnixEither { - pub(super) async fn send(&mut self, buf: &[u8]) -> io::Result { - match self { - Self::Datagram(datagram) => datagram.send(buf).await, - Self::Stream(stream) => stream.write_all(buf).await.map(|_| buf.len()), - } - } -} - -impl AsFd for UnixEither { - fn as_fd(&self) -> BorrowedFd<'_> { - match self { - Self::Datagram(datagram) => datagram.as_fd(), - Self::Stream(stream) => stream.as_fd(), - } - } -} - #[derive(Clone)] pub(super) struct UnixConnector { path: PathBuf, diff --git a/src/sinks/util/udp.rs b/src/sinks/util/udp.rs index 96b0c0393e37c..e681e7fd0dc66 100644 --- a/src/sinks/util/udp.rs +++ b/src/sinks/util/udp.rs @@ -5,24 +5,22 @@ use std::{ }; use async_trait::async_trait; -use bytes::BytesMut; use futures::{stream::BoxStream, FutureExt, StreamExt}; use snafu::{ResultExt, Snafu}; use tokio::{net::UdpSocket, time::sleep}; use tokio_util::codec::Encoder; use vector_lib::configurable::configurable_component; -use vector_lib::internal_event::{ByteSize, BytesSent, InternalEventHandle, Protocol, Registered}; -use vector_lib::EstimatedJsonEncodedSizeOf; +use vector_lib::internal_event::{BytesSent, Protocol, Registered}; -use super::SinkBuildError; +use super::{ + datagram::{send_datagrams, DatagramSocket}, + SinkBuildError, +}; use crate::{ codecs::Transformer, dns, - event::{Event, EventStatus, Finalizable}, - internal_events::{ - SocketEventsSent, SocketMode, SocketSendError, UdpSendIncompleteError, - UdpSocketConnectionEstablished, UdpSocketOutgoingConnectionError, - }, + event::Event, + internal_events::{UdpSocketConnectionEstablished, UdpSocketOutgoingConnectionError}, net, sinks::{ util::{retries::ExponentialBackoff, StreamSink}, @@ -198,58 +196,21 @@ where let mut encoder = self.encoder.clone(); while Pin::new(&mut input).peek().await.is_some() { - let mut socket = self.connector.connect_backoff().await; - while let Some(mut event) = input.next().await { - let byte_size = event.estimated_json_encoded_size_of(); - - self.transformer.transform(&mut event); - - let finalizers = event.take_finalizers(); - let mut bytes = BytesMut::new(); - - // Errors are handled by `Encoder`. - if encoder.encode(event, &mut bytes).is_err() { - continue; - } - - match udp_send(&mut socket, &bytes).await { - Ok(()) => { - emit!(SocketEventsSent { - mode: SocketMode::Udp, - count: 1, - byte_size, - }); - - self.bytes_sent.emit(ByteSize(bytes.len())); - finalizers.update_status(EventStatus::Delivered); - } - Err(error) => { - emit!(SocketSendError { - mode: SocketMode::Udp, - error - }); - finalizers.update_status(EventStatus::Errored); - break; - } - } - } + let socket = self.connector.connect_backoff().await; + send_datagrams( + &mut input, + DatagramSocket::Udp(socket), + &self.transformer, + &mut encoder, + &self.bytes_sent, + ) + .await; } Ok(()) } } -async fn udp_send(socket: &mut UdpSocket, buf: &[u8]) -> tokio::io::Result<()> { - let sent = socket.send(buf).await?; - if sent != buf.len() { - emit!(UdpSendIncompleteError { - data_size: buf.len(), - sent, - }); - } - Ok(()) -} - pub(super) const fn find_bind_address(remote_addr: &SocketAddr) -> SocketAddr { match remote_addr { SocketAddr::V4(_) => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0), diff --git a/src/sinks/util/unix.rs b/src/sinks/util/unix.rs index b12434c347cd6..9afde6d053ad3 100644 --- a/src/sinks/util/unix.rs +++ b/src/sinks/util/unix.rs @@ -1,13 +1,26 @@ -use std::{path::PathBuf, pin::Pin, time::Duration}; +use std::{ + io, + os::fd::{AsFd, BorrowedFd}, + path::PathBuf, + pin::Pin, + time::Duration, +}; use async_trait::async_trait; use bytes::{Bytes, BytesMut}; use futures::{stream::BoxStream, SinkExt, StreamExt}; use snafu::{ResultExt, Snafu}; -use tokio::{net::UnixStream, time::sleep}; +use tokio::{ + io::AsyncWriteExt, + net::{UnixDatagram, UnixStream}, + time::sleep, +}; use tokio_util::codec::Encoder; -use vector_lib::configurable::configurable_component; use vector_lib::json_size::JsonSize; +use vector_lib::{ + configurable::configurable_component, + internal_event::{BytesSent, Protocol}, +}; use vector_lib::{ByteSizeOf, EstimatedJsonEncodedSizeOf}; use crate::{ @@ -21,6 +34,7 @@ use crate::{ sinks::{ util::{ retries::ExponentialBackoff, + service::net::UnixMode, socket_bytes_sink::{BytesSink, ShutdownCheck}, EncodedEvent, StreamSink, }, @@ -28,6 +42,8 @@ use crate::{ }, }; +use super::datagram::{send_datagrams, DatagramSocket}; + #[derive(Debug, Snafu)] pub enum UnixError { #[snafu(display("Failed connecting to socket at path {}: {}", path.display(), source))] @@ -35,6 +51,9 @@ pub enum UnixError { source: tokio::io::Error, path: PathBuf, }, + + #[snafu(display("Failed to bind socket: {}.", source))] + FailedToBind { source: std::io::Error }, } /// A Unix Domain Socket sink. @@ -46,11 +65,22 @@ pub struct UnixSinkConfig { /// This should be an absolute path. #[configurable(metadata(docs::examples = "/path/to/socket"))] pub path: PathBuf, + + /// The Unix socket mode to use. + /// + /// Unavailable on macOS, where the mode is always `Stream`. + #[cfg_attr(target_os = "macos", serde(skip))] + #[serde(default = "default_unix_mode")] + unix_mode: UnixMode, +} + +const fn default_unix_mode() -> UnixMode { + UnixMode::Stream } impl UnixSinkConfig { - pub const fn new(path: PathBuf) -> Self { - Self { path } + pub const fn new(path: PathBuf, unix_mode: UnixMode) -> Self { + Self { path, unix_mode } } pub fn build( @@ -62,7 +92,7 @@ impl UnixSinkConfig { + Sync + 'static, ) -> crate::Result<(VectorSink, Healthcheck)> { - let connector = UnixConnector::new(self.path.clone()); + let connector = UnixConnector::new(self.path.clone(), self.unix_mode); let sink = UnixSink::new(connector.clone(), transformer, encoder); Ok(( VectorSink::from_event_streamsink(sink), @@ -71,14 +101,38 @@ impl UnixSinkConfig { } } +pub enum UnixEither { + Datagram(UnixDatagram), + Stream(UnixStream), +} + +impl UnixEither { + pub(super) async fn send(&mut self, buf: &[u8]) -> io::Result { + match self { + Self::Datagram(datagram) => datagram.send(buf).await, + Self::Stream(stream) => stream.write_all(buf).await.map(|_| buf.len()), + } + } +} + +impl AsFd for UnixEither { + fn as_fd(&self) -> BorrowedFd<'_> { + match self { + Self::Datagram(datagram) => datagram.as_fd(), + Self::Stream(stream) => stream.as_fd(), + } + } +} + #[derive(Debug, Clone)] struct UnixConnector { pub path: PathBuf, + mode: UnixMode, } impl UnixConnector { - const fn new(path: PathBuf) -> Self { - Self { path } + const fn new(path: PathBuf, mode: UnixMode) -> Self { + Self { path, mode } } const fn fresh_backoff() -> ExponentialBackoff { @@ -88,15 +142,30 @@ impl UnixConnector { .max_delay(Duration::from_secs(60)) } - async fn connect(&self) -> Result { - UnixStream::connect(&self.path) - .await - .context(ConnectionSnafu { - path: self.path.clone(), - }) + async fn connect(&self) -> Result { + match self.mode { + UnixMode::Stream => UnixStream::connect(&self.path) + .await + .context(ConnectionSnafu { + path: self.path.clone(), + }) + .map(UnixEither::Stream), + UnixMode::Datagram => { + UnixDatagram::unbound() + .context(FailedToBindSnafu) + .and_then(|datagram| { + datagram + .connect(&self.path) + .context(ConnectionSnafu { + path: self.path.clone(), + }) + .map(|_| UnixEither::Datagram(datagram)) + }) + } + } } - async fn connect_backoff(&self) -> UnixStream { + async fn connect_backoff(&self) -> UnixEither { let mut backoff = Self::fresh_backoff(); loop { match self.connect().await { @@ -139,18 +208,22 @@ where } async fn connect(&mut self) -> BytesSink { - let stream = self.connector.connect_backoff().await; + let stream = match self.connector.connect_backoff().await { + UnixEither::Stream(stream) => stream, + UnixEither::Datagram(_) => unreachable!("connect is only called with Stream mode"), + }; BytesSink::new(stream, |_| ShutdownCheck::Alive, SocketMode::Unix) } -} -#[async_trait] -impl StreamSink for UnixSink -where - E: Encoder + Clone + Send + Sync, -{ + async fn run_internal(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + match self.connector.mode { + UnixMode::Stream => self.run_stream(input).await, + UnixMode::Datagram => self.run_datagram(input).await, + } + } + // Same as TcpSink, more details there. - async fn run(mut self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + async fn run_stream(mut self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { let mut encoder = self.encoder.clone(); let transformer = self.transformer.clone(); let mut input = input @@ -197,12 +270,50 @@ where Ok(()) } + + async fn run_datagram(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + let bytes_sent = register!(BytesSent::from(Protocol::UNIX)); + let mut input = input.peekable(); + + let mut encoder = self.encoder.clone(); + while Pin::new(&mut input).peek().await.is_some() { + let socket = match self.connector.connect_backoff().await { + UnixEither::Datagram(datagram) => datagram, + UnixEither::Stream(_) => { + unreachable!("run_datagram is only called with Datagram mode") + } + }; + + send_datagrams( + &mut input, + DatagramSocket::Unix(socket, self.connector.path.clone()), + &self.transformer, + &mut encoder, + &bytes_sent, + ) + .await; + } + + Ok(()) + } +} + +#[async_trait] +impl StreamSink for UnixSink +where + E: Encoder + Clone + Send + Sync, +{ + async fn run(mut self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + self.run_internal(input).await + } } #[cfg(test)] mod tests { use tokio::net::UnixListener; - use vector_lib::codecs::{encoding::Framer, NewlineDelimitedEncoder, TextSerializerConfig}; + use vector_lib::codecs::{ + encoding::Framer, BytesEncoder, NewlineDelimitedEncoder, TextSerializerConfig, + }; use super::*; use crate::{ @@ -219,9 +330,9 @@ mod tests { #[tokio::test] async fn unix_sink_healthcheck() { - let good_path = temp_uds_path("valid_uds"); + let good_path = temp_uds_path("valid_stream_uds"); let _listener = UnixListener::bind(&good_path).unwrap(); - assert!(UnixSinkConfig::new(good_path) + assert!(UnixSinkConfig::new(good_path.clone(), UnixMode::Stream) .build( Default::default(), Encoder::<()>::new(TextSerializerConfig::default().build().into()) @@ -230,9 +341,30 @@ mod tests { .1 .await .is_ok()); + assert!( + UnixSinkConfig::new(good_path.clone(), UnixMode::Datagram) + .build( + Default::default(), + Encoder::<()>::new(TextSerializerConfig::default().build().into()) + ) + .unwrap() + .1 + .await + .is_err(), + "datagram mode should fail when attempting to send into a stream mode UDS" + ); let bad_path = temp_uds_path("no_one_listening"); - assert!(UnixSinkConfig::new(bad_path) + assert!(UnixSinkConfig::new(bad_path.clone(), UnixMode::Stream) + .build( + Default::default(), + Encoder::<()>::new(TextSerializerConfig::default().build().into()) + ) + .unwrap() + .1 + .await + .is_err()); + assert!(UnixSinkConfig::new(bad_path.clone(), UnixMode::Datagram) .build( Default::default(), Encoder::<()>::new(TextSerializerConfig::default().build().into()) @@ -252,7 +384,7 @@ mod tests { let mut receiver = CountReceiver::receive_lines_unix(out_path.clone()); // Set up Sink - let config = UnixSinkConfig::new(out_path); + let config = UnixSinkConfig::new(out_path, UnixMode::Stream); let (sink, _healthcheck) = config .build( Default::default(), @@ -276,4 +408,57 @@ mod tests { // Receive the data sent by the Sink to the receiver assert_eq!(input_lines, receiver.await); } + + #[cfg_attr(target_os = "macos", ignore)] + #[tokio::test] + async fn basic_unix_datagram_sink() { + let num_lines = 1000; + let out_path = temp_uds_path("unix_datagram_test"); + + // Set up listener to receive events from the Sink. + let receiver = std::os::unix::net::UnixDatagram::bind(out_path.clone()).unwrap(); + let (ready_tx, ready_rx) = tokio::sync::oneshot::channel(); + + // Listen in the background to avoid blocking + let handle = tokio::task::spawn_blocking(move || { + let mut output_lines = Vec::::with_capacity(num_lines); + + ready_tx.send(()).expect("failed to signal readiness"); + for _ in 0..num_lines { + let mut buf = [0; 101]; + let (size, _) = receiver + .recv_from(&mut buf) + .expect("Did not receive message"); + let line = String::from_utf8_lossy(&buf[..size]).to_string(); + output_lines.push(line); + } + + output_lines + }); + ready_rx.await.expect("failed to receive ready signal"); + + // Set up Sink + let config = UnixSinkConfig::new(out_path.clone(), UnixMode::Datagram); + let (sink, _healthcheck) = config + .build( + Default::default(), + Encoder::::new( + BytesEncoder.into(), + TextSerializerConfig::default().build().into(), + ), + ) + .unwrap(); + + // Send the test data + let (input_lines, events) = random_lines_with_stream(100, num_lines, None); + + assert_sink_compliance(&SINK_TAGS, async move { sink.run(events).await }) + .await + .expect("Running sink failed"); + + // Receive the data sent by the Sink to the receiver + let output_lines = handle.await.expect("UDS Datagram receiver failed"); + + assert_eq!(input_lines, output_lines); + } } diff --git a/website/cue/reference/components/sinks/base/socket.cue b/website/cue/reference/components/sinks/base/socket.cue index df3f0c6aae57c..55fa8321652c3 100644 --- a/website/cue/reference/components/sinks/base/socket.cue +++ b/website/cue/reference/components/sinks/base/socket.cue @@ -588,4 +588,20 @@ base: components: sinks: socket: configuration: { } } } + unix_mode: { + description: """ + The Unix socket mode to use. + + Unavailable on macOS, where the mode is always `Stream`. + """ + relevant_when: "mode = \"unix\"" + required: false + type: string: { + default: "Stream" + enum: { + Datagram: "Datagram-oriented (`SOCK_DGRAM`)." + Stream: "Stream-oriented (`SOCK_STREAM`)." + } + } + } }