From b8853a26e9e2e6dcb1a138cdf21e0a1f63bf3866 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Fri, 24 Jan 2025 12:32:35 +0100 Subject: [PATCH] Add tests for `websocket_server` --- Cargo.toml | 3 +- src/sinks/websocket_server/sink.rs | 168 ++++++++++++++++++++++++++++- 2 files changed, 168 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e558409118d66..bdb3a40164f11 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -799,7 +799,8 @@ sinks-statsd = ["sinks-utils-udp", "tokio-util/net"] sinks-utils-udp = [] sinks-vector = ["sinks-utils-udp", "dep:tonic", "protobuf-build", "dep:prost"] sinks-websocket = ["dep:tokio-tungstenite"] -sinks-websocket-server = ["dep:tokio-tungstenite"] +sinks-websocket-server = ["dep:tokio-tungstenite", "sources-utils-http-auth", +"sources-utils-http-error", "sources-utils-http-prelude"] sinks-webhdfs = ["dep:opendal"] # Identifies that the build is a nightly build diff --git a/src/sinks/websocket_server/sink.rs b/src/sinks/websocket_server/sink.rs index 3abd8ee925a16..7379ca24538dc 100644 --- a/src/sinks/websocket_server/sink.rs +++ b/src/sinks/websocket_server/sink.rs @@ -103,8 +103,7 @@ impl WebSocketListenerSink { let header_callback = |req: &Request, response: Response| match auth.is_valid( &req.headers() .get(AUTHORIZATION) - .map(|h| h.to_str().ok()) - .flatten() + .and_then(|h| h.to_str().ok()) .map(ToString::to_string), ) { Ok(_) => Ok(response), @@ -227,3 +226,168 @@ impl StreamSink for WebSocketListenerSink { Ok(()) } } + +#[cfg(test)] +mod tests { + use futures::{channel::mpsc::UnboundedReceiver, SinkExt, Stream, StreamExt}; + use futures_util::stream; + use std::future::ready; + + use tokio::{task::JoinHandle, time}; + use vector_lib::sink::VectorSink; + + use super::*; + + use crate::{ + event::{Event, LogEvent}, + test_util::{ + components::{run_and_assert_sink_compliance, SINK_TAGS}, + next_addr, + }, + }; + + #[tokio::test] + async fn test_single_client() { + let event = Event::Log(LogEvent::from("foo")); + + let (mut sender, input_events) = build_test_event_channel(); + let address = next_addr(); + let port = address.port(); + + let websocket_sink = start_websocket_server_sink( + WebSocketListenerSinkConfig { + address, + ..Default::default() + }, + input_events, + ) + .await; + + let client_handle = attach_websocket_client(port, vec![event.clone()]).await; + sender.send(event).await.expect("Failed to send."); + + client_handle.await.unwrap(); + drop(sender); + websocket_sink.await.unwrap(); + } + + #[tokio::test] + async fn test_single_client_late_connect() { + let event1 = Event::Log(LogEvent::from("foo1")); + let event2 = Event::Log(LogEvent::from("foo2")); + + let (mut sender, input_events) = build_test_event_channel(); + let address = next_addr(); + let port = address.port(); + + let websocket_sink = start_websocket_server_sink( + WebSocketListenerSinkConfig { + address, + ..Default::default() + }, + input_events, + ) + .await; + + // Sending event 1 before client joined, the client should not received it + sender.send(event1).await.expect("Failed to send."); + + // Now connect the client + let client_handle = attach_websocket_client(port, vec![event2.clone()]).await; + + // Sending event 2, this one should be received by the client + sender.send(event2).await.expect("Failed to send."); + + client_handle.await.unwrap(); + drop(sender); + websocket_sink.await.unwrap(); + } + + #[tokio::test] + async fn test_multiple_clients() { + let event = Event::Log(LogEvent::from("foo")); + + let (mut sender, input_events) = build_test_event_channel(); + let address = next_addr(); + let port = address.port(); + + let websocket_sink = start_websocket_server_sink( + WebSocketListenerSinkConfig { + address, + ..Default::default() + }, + input_events, + ) + .await; + + let client_handle_1 = attach_websocket_client(port, vec![event.clone()]).await; + let client_handle_2 = attach_websocket_client(port, vec![event.clone()]).await; + sender.send(event).await.expect("Failed to send."); + + client_handle_1.await.unwrap(); + client_handle_2.await.unwrap(); + drop(sender); + websocket_sink.await.unwrap(); + } + + #[tokio::test] + async fn sink_spec_compliance() { + let event = Event::Log(LogEvent::from("foo")); + + let sink = WebSocketListenerSink::new(WebSocketListenerSinkConfig { + address: next_addr(), + ..Default::default() + }) + .unwrap(); + + run_and_assert_sink_compliance( + VectorSink::from_event_streamsink(sink), + stream::once(ready(event)), + &SINK_TAGS, + ) + .await; + } + + async fn start_websocket_server_sink( + config: WebSocketListenerSinkConfig, + events: S, + ) -> JoinHandle<()> + where + S: Stream + Send + 'static, + { + let sink = WebSocketListenerSink::new(config).unwrap(); + + let compliance_assertion = tokio::spawn(run_and_assert_sink_compliance( + VectorSink::from_event_streamsink(sink), + events, + &SINK_TAGS, + )); + + time::sleep(time::Duration::from_millis(100)).await; + + compliance_assertion + } + + async fn attach_websocket_client(port: u16, expected_events: Vec) -> JoinHandle<()> { + let (ws_stream, _) = tokio_tungstenite::connect_async(format!("ws://localhost:{port}")) + .await + .expect("Client failed to connect."); + let (_, rx) = ws_stream.split(); + tokio::spawn(async move { + let events = expected_events.clone(); + rx.take(events.len()) + .zip(stream::iter(events)) + .for_each(|(msg, expected)| async { + let msg_text = msg.unwrap().into_text().unwrap(); + let expected = serde_json::to_string(expected.into_log().value()).unwrap(); + assert_eq!(expected, msg_text); + }) + .await; + }) + } + + fn build_test_event_channel() -> (UnboundedSender, UnboundedReceiver) { + let (tx, rx) = futures::channel::mpsc::unbounded(); + (tx, rx) + } +}