diff --git a/rust/foxglove/src/tests/websocket.rs b/rust/foxglove/src/tests/websocket.rs new file mode 100644 index 00000000..e4e798d5 --- /dev/null +++ b/rust/foxglove/src/tests/websocket.rs @@ -0,0 +1,519 @@ +use futures_util::{FutureExt, SinkExt, StreamExt}; +use serde_json::{json, Value}; +use std::sync::Arc; +use tokio_tungstenite::tungstenite::{self, http::HeaderValue, Message}; +use tungstenite::client::IntoClientRequest; + +use crate::websocket::{ + create_server, ClientMessage, ServerOptions, StatusLevel, SubscriptionId, SUBPROTOCOL, +}; +use crate::{collection, Channel, ChannelBuilder, LogContext, LogSink, Metadata, Schema}; + +fn new_channel(topic: &str, ctx: &LogContext) -> Arc { + ChannelBuilder::new(topic) + .message_encoding("message_encoding") + .schema(Schema::new( + "schema_name", + "schema_encoding", + b"schema_data", + )) + .metadata(collection! {"key".to_string() => "value".to_string()}) + .with_context(ctx) + .build() + .expect("Failed to create channel") +} + +#[tokio::test] +async fn test_client_connect() { + let server = create_server(ServerOptions { + session_id: Some("mock_sess_id".to_string()), + name: Some("mock_server".to_string()), + ..Default::default() + }); + let addr = server + .start("127.0.0.1", 0) + .await + .expect("Failed to start server"); + + let mut client_stream = connect_client(addr).await; + + let result = client_stream.next().await.expect("No message received"); + let msg = result.expect("Failed to parse message"); + let text = msg.into_text().expect("Failed to get message text"); + let server_info: Value = serde_json::from_str(&text).expect("Failed to parse server info"); + + assert_eq!(server_info["name"], "mock_server"); + assert_eq!(server_info["sessionId"], "mock_sess_id"); + + server.stop().await; +} + +#[tokio::test] +async fn test_handshake_with_unknown_subprotocol_fails_on_client() { + let server = create_server(ServerOptions::default()); + let addr = server + .start("127.0.0.1", 0) + .await + .expect("Failed to start server"); + + let mut request = format!("ws://{addr}/") + .into_client_request() + .expect("Failed to build request"); + + request.headers_mut().insert( + "sec-websocket-protocol", + HeaderValue::from_static("unknown"), + ); + + let result = tokio_tungstenite::connect_async(request).await; + assert!(result.is_err()); + assert_eq!( + result.unwrap_err().to_string(), + "WebSocket protocol error: SubProtocol error: Server sent no subprotocol" + ); +} + +#[tokio::test] +async fn test_handshake_with_multiple_subprotocols() { + let server = create_server(ServerOptions::default()); + let addr = server + .start("127.0.0.1", 0) + .await + .expect("Failed to start server"); + + let request = format!("ws://{addr}/") + .into_client_request() + .expect("Failed to build request"); + + let mut req1 = request.clone(); + let header = format!("{}, foxglove.sdk.v2", SUBPROTOCOL); + req1.headers_mut().insert( + "sec-websocket-protocol", + HeaderValue::from_str(&header).unwrap(), + ); + + let (_, response) = tokio_tungstenite::connect_async(req1) + .await + .expect("Failed to connect"); + + assert_eq!( + response.headers().get("sec-websocket-protocol"), + Some(&HeaderValue::from_static(SUBPROTOCOL)) + ); + + // In req2, the client's preferred (initial) subprotocol is not valid + let mut req2 = request.clone(); + let header = format!("unknown, {}, another", SUBPROTOCOL); + req2.headers_mut().insert( + "sec-websocket-protocol", + HeaderValue::from_str(&header).unwrap(), + ); + + let (_, response) = tokio_tungstenite::connect_async(req2) + .await + .expect("Failed to connect"); + + assert_eq!( + response.headers().get("sec-websocket-protocol"), + Some(&HeaderValue::from_static(SUBPROTOCOL)) + ); + + server.stop().await; +} + +#[tokio::test] +async fn test_advertise_to_client() { + let server = create_server(ServerOptions::default()); + + let ctx = LogContext::new(); + ctx.add_sink(server.clone()); + + let addr = server + .start("127.0.0.1", 0) + .await + .expect("Failed to start server"); + + let client_stream = connect_client(addr).await; + let (mut client_sender, mut client_receiver) = client_stream.split(); + + let msg = client_receiver.next().await.expect("No serverInfo sent"); + msg.expect("Invalid serverInfo"); + + let ch = new_channel("/foo", &ctx); + let metadata = Metadata::default(); + + server.log(&ch, b"foo bar", &metadata).unwrap(); + + let subscription_id = 1; + let result = client_receiver.next().await.expect("No advertisement sent"); + let advertisement = result.expect("Failed to parse advertisement"); + let text = advertisement.into_text().expect("Invalid advertisement"); + let msg: Value = serde_json::from_str(&text).expect("Failed to advertisement"); + assert_eq!(msg["op"], "advertise"); + assert_eq!( + msg["channels"][0]["id"].as_u64().unwrap(), + u64::from(ch.id()) + ); + + let subscribe = json!({ + "op": "subscribe", + "subscriptions": [ + { + "id": subscription_id, + "channelId": ch.id(), + } + ] + }); + client_sender + .send(Message::text(subscribe.to_string())) + .await + .expect("Failed to send"); + + // Allow the server to process the subscription + // FG-9723: replace this with an on_subscribe callback + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + server.log(&ch, b"{\"a\":1}", &metadata).unwrap(); + + let result = client_receiver.next().await.unwrap(); + let msg = result.expect("Failed to parse message"); + let data = msg.into_data(); + + assert_eq!(data[0], 0x01); // message data opcode + assert_eq!( + u32::from_le_bytes(data[1..=4].try_into().unwrap()), + subscription_id + ); + + server.stop().await; +} + +#[tokio::test] +async fn test_log_only_to_subscribers() { + let server = create_server(ServerOptions::default()); + + let ctx = LogContext::new(); + + ctx.add_sink(server.clone()); + + let ch1 = new_channel("/foo", &ctx); + let ch2 = new_channel("/bar", &ctx); + let ch3 = new_channel("/baz", &ctx); + + let addr = server + .start("127.0.0.1", 0) + .await + .expect("Failed to start server"); + + let mut client1 = connect_client(addr.clone()).await; + let mut client2 = connect_client(addr.clone()).await; + let mut client3 = connect_client(addr).await; + + // client1 subscribes to ch1; client2 subscribes to ch2; client3 unsubscribes from all + // Read the server info message from each + let _ = client1.next().await.expect("No serverInfo sent").unwrap(); + let _ = client2.next().await.expect("No serverInfo sent").unwrap(); + let _ = client3.next().await.expect("No serverInfo sent").unwrap(); + + // Read the channel advertisement from each client for all 3 channels + for _ in 0..3 { + let _ = client1 + .next() + .await + .expect("No advertisement sent") + .unwrap(); + let _ = client2 + .next() + .await + .expect("No advertisement sent") + .unwrap(); + let _ = client3 + .next() + .await + .expect("No advertisement sent") + .unwrap(); + } + + let subscribe1 = json!({ + "op": "subscribe", + "subscriptions": [ + { + "id": 1, + "channelId": ch1.id(), + } + ] + }); + client1 + .send(Message::text(subscribe1.to_string())) + .await + .expect("Failed to send"); + + let subscribe2 = json!({ + "op": "subscribe", + "subscriptions": [ + { + "id": 2, + "channelId": ch2.id(), + } + ] + }); + client2 + .send(Message::text(subscribe2.to_string())) + .await + .expect("Failed to send"); + + let unsubscribe_both = json!(ClientMessage::Unsubscribe { + subscription_ids: vec![SubscriptionId::new(1), SubscriptionId::new(2)] + }); + client3 + .send(Message::text(subscribe1.to_string())) + .await + .expect("Failed to send"); + client3 + .send(Message::text(subscribe2.to_string())) + .await + .expect("Failed to send"); + client3 + .send(Message::text(unsubscribe_both.to_string())) + .await + .expect("Failed to send"); + + // Allow the server to process the subscription + // FG-9723: replace this with an on_subscribe callback + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + let metadata = Metadata { + log_time: 123456, + ..Metadata::default() + }; + server.log(&ch1, b"channel1", &metadata).unwrap(); + server.log(&ch2, b"channel2", &metadata).unwrap(); + server.log(&ch3, b"channel3", &metadata).unwrap(); + + // Receive the message for client1 and client2 + let result = client1.next().await.unwrap(); + let msg = result.expect("Failed to parse message"); + let data = msg.into_data(); + assert_eq!(data[0], 0x01); // message data opcode + assert_eq!(u32::from_le_bytes(data[1..=4].try_into().unwrap()), 1); + assert_eq!(u64::from_le_bytes(data[5..=12].try_into().unwrap()), 123456); + assert_eq!(&data[13..], b"channel1"); + + let result = client2.next().await.unwrap(); + let msg = result.expect("Failed to parse message"); + let data = msg.into_data(); + assert_eq!(data[0], 0x01); // message data opcode + assert_eq!(u32::from_le_bytes(data[1..=4].try_into().unwrap()), 2); + assert_eq!(u64::from_le_bytes(data[5..=12].try_into().unwrap()), 123456); + assert_eq!(&data[13..], b"channel2"); + + // Client 3 should not receive any messages since it unsubscribed from all channels + let rs = client3.next().now_or_never(); + assert!(rs.is_none()); + + server.stop().await; +} + +#[tokio::test] +async fn test_error_when_client_publish_unsupported() { + // Server does not support clientPublish capability by default + let server = create_server(ServerOptions::default()); + let addr = server + .start("127.0.0.1", 0) + .await + .expect("Failed to start server"); + + let mut ws_client = connect_client(addr).await; + ws_client.next().await.expect("No serverInfo sent").ok(); + + let advertise = json!({ + "op": "advertise", + "channels": [ + { + "id": 1, + "topic": "/test", + "encoding": "json", + "schemaName": "test", + } + ] + }); + ws_client + .send(Message::text(advertise.to_string())) + .await + .expect("Failed to send advertisement"); + + // Server should respond with an error status + let result = ws_client.next().await.expect("No message received"); + let msg = result.expect("Failed to parse message"); + let msg = msg.into_text().expect("Failed to parse message text"); + let status: Value = serde_json::from_str(&msg).expect("Failed to parse status"); + assert_eq!(status["op"], "status"); + assert_eq!(status["level"], 2); + assert_eq!( + status["message"], + "Server does not support clientPublish capability" + ); + + ws_client.close(None).await.unwrap(); + server.stop().await; +} + +#[tokio::test] +async fn test_error_status_message() { + let server = create_server(ServerOptions::default()); + let addr = server + .start("127.0.0.1", 0) + .await + .expect("Failed to start server"); + + let mut ws_client = connect_client(addr).await; + + _ = ws_client.next().await.expect("No serverInfo sent"); + + ws_client + .send(Message::text("nonsense".to_string())) + .await + .expect("Failed to send message"); + + let result = ws_client.next().await.unwrap(); + let msg = result.expect("Failed to parse message"); + let text = msg.into_text().expect("Failed to get message text"); + let status: Value = serde_json::from_str(&text).expect("Failed to parse status"); + assert_eq!(status["level"], 2); + assert_eq!(status["message"], "Unsupported message: nonsense"); + + let msg = json!({ + "op": "subscribe", + "subscriptions": [{ "id": 1, "channelId": 555, }] + }); + ws_client + .send(Message::text(msg.to_string())) + .await + .expect("Failed to send message"); + + let result = ws_client.next().await.unwrap(); + let msg = result.expect("Failed to parse message"); + let text = msg.into_text().expect("Failed to get message text"); + let status: Value = serde_json::from_str(&text).expect("Failed to parse status"); + assert_eq!(status["level"], 2); + assert_eq!(status["message"], "Unknown channel ID: 555"); + + ws_client + .send(Message::binary(vec![0xff])) + .await + .expect("Failed to send message"); + + let result = ws_client.next().await.unwrap(); + let msg = result.expect("Failed to parse message"); + let text = msg.into_text().expect("Failed to get message text"); + let status: Value = serde_json::from_str(&text).expect("Failed to parse status"); + assert_eq!(status["level"], 2); + assert_eq!(status["message"], "Invalid binary opcode: 255"); + + server.stop().await; +} + +#[tokio::test] +async fn test_publish_status_message() { + let server = create_server(ServerOptions::default()); + let addr = server + .start("127.0.0.1", 0) + .await + .expect("Failed to start server"); + + let mut ws_client = connect_client(addr).await; + + _ = ws_client.next().await.expect("No serverInfo sent"); + + server.publish_status( + StatusLevel::Info, + "Hello, world!".to_string(), + Some("123".to_string()), + ); + server.publish_status( + StatusLevel::Error, + "Reactor core overload!".to_string(), + Some("abc".to_string()), + ); + + let msg = ws_client + .next() + .await + .expect("No message received") + .expect("Failed to parse message"); + let text = msg.into_text().expect("Failed to get message text"); + assert_eq!( + text, + r#"{"op":"status","level":1,"message":"Hello, world!","id":"123"}"# + ); + + let msg = ws_client + .next() + .await + .expect("No message received") + .expect("Failed to parse message"); + let text = msg.into_text().expect("Failed to get message text"); + assert_eq!( + text, + r#"{"op":"status","level":3,"message":"Reactor core overload!","id":"abc"}"# + ); +} + +#[tokio::test] +async fn test_remove_status() { + let server = create_server(ServerOptions::default()); + let addr = server + .start("127.0.0.1", 0) + .await + .expect("Failed to start server"); + + let mut ws_client1 = connect_client(addr.clone()).await; + let mut ws_client2 = connect_client(addr).await; + + _ = ws_client1.next().await.expect("No serverInfo sent"); + _ = ws_client2.next().await.expect("No serverInfo sent"); + + // These don't have to exist, and aren't checked + server.remove_status(vec!["123".to_string(), "abc".to_string()]); + + let msg = ws_client1 + .next() + .await + .expect("No message received") + .expect("Failed to parse message"); + let text = msg.into_text().expect("Failed to get message text"); + assert_eq!(text, r#"{"op":"removeStatus","statusIds":["123","abc"]}"#); + + let msg = ws_client2 + .next() + .await + .expect("No message received") + .expect("Failed to parse message"); + let text = msg.into_text().expect("Failed to get message text"); + assert_eq!(text, r#"{"op":"removeStatus","statusIds":["123","abc"]}"#); +} + +/// Connect to a server, ensuring the protocol header is set, and return the client WS stream +async fn connect_client( + addr: String, +) -> tokio_tungstenite::WebSocketStream> { + let mut request = format!("ws://{addr}/") + .into_client_request() + .expect("Failed to build request"); + + request.headers_mut().insert( + "sec-websocket-protocol", + HeaderValue::from_static(SUBPROTOCOL), + ); + + let (ws_stream, response) = tokio_tungstenite::connect_async(request) + .await + .expect("Failed to connect"); + + assert_eq!( + response.headers().get("sec-websocket-protocol"), + Some(&HeaderValue::from_static(SUBPROTOCOL)) + ); + + ws_stream +} diff --git a/rust/foxglove/src/websocket.rs b/rust/foxglove/src/websocket.rs index b10c8101..5116a8c9 100644 --- a/rust/foxglove/src/websocket.rs +++ b/rust/foxglove/src/websocket.rs @@ -4,7 +4,7 @@ use crate::websocket::protocol::client::Subscription; pub use crate::websocket::protocol::client::{ ClientChannel, ClientChannelId, ClientMessage, SubscriptionId, }; -pub use crate::websocket::protocol::server::Capability; +pub use crate::websocket::protocol::server::{Capability, Status, StatusLevel}; #[cfg(feature = "unstable")] pub use crate::websocket::protocol::server::{Parameter, ParameterType, ParameterValue}; use crate::{get_runtime_handle, Channel, FoxgloveError, LogSink, Metadata}; @@ -90,6 +90,7 @@ impl ChannelView<'_> { } pub(crate) const SUBPROTOCOL: &str = "foxglove.sdk.v1"; +const MAX_SEND_RETRIES: usize = 10; type WebsocketSender = SplitSink, Message>; @@ -235,6 +236,30 @@ impl ConnectedClient { } } + /// Send the message on the data plane, dropping up to retries older messages to make room, if necessary. + fn send_data_lossy(&self, message: Message, retries: usize) -> SendLossyResult { + send_lossy( + &self.addr, + &self.data_plane_tx, + &self.data_plane_rx, + message, + retries, + ) + } + + /// Send the message on the control plane, disconnecting the client if the channel is full. + fn send_control_msg(&self, message: Message) -> bool { + if let Err(TrySendError::Full(_)) = self.control_plane_tx.try_send(message) { + // TODO disconnect the slow client FG-10441 + tracing::error!( + "Client control plane is full for {}, dropping message", + self.addr + ); + return false; + } + true + } + fn handle_binary_message(&self, message: Message) { if message.is_empty() { tracing::debug!("Received empty binary message from {}", self.addr); @@ -470,26 +495,26 @@ impl ConnectedClient { /// Send an ad hoc error status message to the client, with the given message. fn send_error(&self, message: String) { - self.send_status(protocol::server::StatusLevel::Error, message, None); + self.send_status(Status::new(StatusLevel::Error, message)); } /// Send an ad hoc warning status message to the client, with the given message. + #[allow(dead_code)] fn send_warning(&self, message: String) { - self.send_status(protocol::server::StatusLevel::Warning, message, None); + self.send_status(Status::new(StatusLevel::Warning, message)); } - fn send_status( - &self, - level: protocol::server::StatusLevel, - message: String, - id: Option, - ) { - let status = protocol::server::Status { level, message, id }; + /// Send a status message to the client. + fn send_status(&self, status: Status) { let message = Message::text(serde_json::to_string(&status).unwrap()); - // If the message can't be sent, or the outbox is full, log a warning and continue. - self.data_plane_tx.try_send(message).unwrap_or_else(|err| { - tracing::warn!("Failed to send status to client {}: {err}", self.addr) - }); + match status.level { + StatusLevel::Info => { + self.send_data_lossy(message, MAX_SEND_RETRIES); + } + _ => { + self.send_control_msg(message); + } + } } } @@ -604,13 +629,7 @@ impl Server { let clients = self.clients.get(); for client in clients.iter() { - if let Err(err) = client - .control_plane_tx - .send_async(Message::text(message.clone())) - .await - { - tracing::error!("Error advertising channel to client {}: {err}", client.addr); - } else { + if client.send_control_msg(Message::text(message.clone())) { tracing::info!( "Advertised channel {} with id {} to client {}", channel.topic, @@ -627,16 +646,7 @@ impl Server { let message = protocol::server::unadvertise(channel_id); let clients = self.clients.get(); for client in clients.iter() { - if let Err(err) = client - .control_plane_tx - .send_async(Message::text(message.clone())) - .await - { - tracing::error!( - "Error unadvertising channel to client {}: {err}", - client.addr - ); - } else { + if client.send_control_msg(Message::text(message.clone())) { tracing::info!( "Unadvertised channel with id {} to client {}", channel_id, @@ -662,9 +672,7 @@ impl Server { let clients = self.clients.get(); for client in clients.iter() { - if let Err(err) = client.control_plane_tx.send_async(message.clone()).await { - tracing::error!("Failed to send time to client {}: {err}", client.addr); - } + client.send_control_msg(message.clone()); } } @@ -693,16 +701,25 @@ impl Server { if client_addr.is_some_and(|addr| addr != client.addr) { continue; } - if let Err(err) = client - .control_plane_tx - .send_async(Message::text(message.clone())) - .await - { - tracing::error!( - "Failed to send parameter values to client {}: {err}", - client.addr - ); - } + client.send_control_msg(Message::text(message.clone())); + } + } + + /// Send a message to all clients. + pub fn publish_status(&self, status: Status) { + let clients = self.clients.get(); + for client in clients.iter() { + client.send_status(status.clone()); + } + } + + /// Remove status messages by id from all clients. + pub fn remove_status(&self, status_ids: Vec) { + let remove = protocol::server::RemoveStatus { status_ids }; + let message = Message::text(serde_json::to_string(&remove).unwrap()); + let clients = self.clients.get(); + for client in clients.iter() { + client.send_control_msg(message.clone()); } } @@ -869,6 +886,7 @@ impl Server { #[derive(Debug, Clone, Copy)] enum SendLossyResult { Sent, + #[allow(dead_code)] SentLossy(usize), ExhaustedRetries, } @@ -881,19 +899,34 @@ enum SendLossyResult { /// in this manner, this function returns `SendLossyResult::SentLossy(dropped)`. If the maximum /// number of retries is reached, it returns `SendLossyResult::ExhaustedRetries`. fn send_lossy( + client_addr: &SocketAddr, tx: &flume::Sender, rx: &flume::Receiver, mut message: Message, retries: usize, ) -> SendLossyResult { + // If the queue is full, drop the oldest message(s). We do this because the websocket + // client is falling behind, and we either start dropping messages, or we'll end up + // buffering until we run out of memory. There's no point in that because the client is + // unlikely to catch up and be able to consume the messages. let mut dropped = 0; loop { match (dropped, tx.try_send(message)) { (0, Ok(_)) => return SendLossyResult::Sent, - (dropped, Ok(_)) => return SendLossyResult::SentLossy(dropped), + (dropped, Ok(_)) => { + tracing::warn!( + "outbox for client {} full, dropped {dropped} messages", + client_addr + ); + return SendLossyResult::SentLossy(dropped); + } (_, Err(TrySendError::Disconnected(_))) => unreachable!("we're holding rx"), (_, Err(TrySendError::Full(rejected))) => { if dropped >= retries { + tracing::warn!( + "outbox for client {} full, dropping message after 10 attempts", + client_addr + ); return SendLossyResult::ExhaustedRetries; } message = rejected; @@ -928,23 +961,7 @@ impl LogSink for Server { let message = Message::binary(buf); - // If the queue is full, drop the oldest message(s). We do this because the websocket - // client is falling behind, and we either start dropping messages, or we'll end up - // buffering until we run out of memory. There's no point in that because the client is - // unlikely to catch up and be able to consume the messages. - match send_lossy(&client.data_plane_tx, &client.data_plane_rx, message, 10) { - SendLossyResult::Sent => (), - SendLossyResult::SentLossy(dropped) => tracing::warn!( - "outbox for client {} full, dropped {dropped} messages", - client.addr - ), - SendLossyResult::ExhaustedRetries => { - tracing::warn!( - "outbox for client {} full, dropping message after 10 attempts", - client.addr - ) - } - }; + client.send_data_lossy(message, MAX_SEND_RETRIES); } Ok(()) } diff --git a/rust/foxglove/src/websocket/protocol/server.rs b/rust/foxglove/src/websocket/protocol/server.rs index c92a2513..bc2df573 100644 --- a/rust/foxglove/src/websocket/protocol/server.rs +++ b/rust/foxglove/src/websocket/protocol/server.rs @@ -90,7 +90,7 @@ pub enum ServerMessage { }, } -#[derive(Serialize_repr)] +#[derive(Debug, Copy, Clone, Serialize_repr)] #[repr(u8)] pub enum StatusLevel { #[allow(dead_code)] @@ -100,14 +100,37 @@ pub enum StatusLevel { Error = 2, } -#[derive(Serialize)] +#[derive(Debug, Clone, Serialize)] #[serde(tag = "op")] #[serde(rename = "status")] pub struct Status { - pub level: StatusLevel, - pub message: String, + pub(crate) level: StatusLevel, + pub(crate) message: String, #[serde(skip_serializing_if = "Option::is_none")] - pub id: Option, + pub(crate) id: Option, +} + +impl Status { + pub fn new(level: StatusLevel, message: String) -> Self { + Self { + level, + message, + id: None, + } + } + + pub fn with_id(mut self, id: String) -> Self { + self.id = Some(id); + self + } +} + +#[derive(Serialize)] +#[serde(tag = "op")] +#[serde(rename = "removeStatus")] +#[serde(rename_all = "camelCase")] +pub struct RemoveStatus { + pub status_ids: Vec, } /// A capability that the websocket server advertises to its clients. diff --git a/rust/foxglove/src/websocket/tests.rs b/rust/foxglove/src/websocket/tests.rs index 7eb8e948..1068f703 100644 --- a/rust/foxglove/src/websocket/tests.rs +++ b/rust/foxglove/src/websocket/tests.rs @@ -1,6 +1,7 @@ use assert_matches::assert_matches; use futures_util::{FutureExt, SinkExt, StreamExt}; use serde_json::{json, Value}; +use std::net::SocketAddr; use std::sync::Arc; use tokio_tungstenite::tungstenite::{self, http::HeaderValue, Message}; use tungstenite::client::IntoClientRequest; @@ -28,10 +29,12 @@ fn test_send_lossy() { const BACKLOG: usize = 4; const TOTAL: usize = 10; + let addr = SocketAddr::new("127.0.0.1".parse().unwrap(), 1234); + let (tx, rx) = flume::bounded(BACKLOG); for i in 0..BACKLOG { assert_matches!( - send_lossy(&tx, &rx, make_message(i), 0), + send_lossy(&addr, &tx, &rx, make_message(i), 0), SendLossyResult::Sent ); } @@ -39,11 +42,11 @@ fn test_send_lossy() { // The queue is full now. We'll only succeed with retries. for i in BACKLOG..TOTAL { assert_matches!( - send_lossy(&tx, &rx, make_message(TOTAL + i), 0), + send_lossy(&addr, &tx, &rx, make_message(TOTAL + i), 0), SendLossyResult::ExhaustedRetries ); assert_matches!( - send_lossy(&tx, &rx, make_message(i), 1), + send_lossy(&addr, &tx, &rx, make_message(i), 1), SendLossyResult::SentLossy(1) ); } diff --git a/rust/foxglove/src/websocket_server.rs b/rust/foxglove/src/websocket_server.rs index a4f65777..05ffc11d 100644 --- a/rust/foxglove/src/websocket_server.rs +++ b/rust/foxglove/src/websocket_server.rs @@ -4,12 +4,11 @@ use std::fmt::Debug; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; -use tokio::runtime::Handle; - -use crate::websocket::{create_server, Server, ServerOptions}; +use crate::websocket::{create_server, Server, ServerOptions, Status}; #[cfg(feature = "unstable")] use crate::websocket::{Capability, Parameter}; use crate::{get_runtime_handle, FoxgloveError, LogContext, LogSink}; +use tokio::runtime::Handle; /// A websocket server for live visualization. #[must_use] @@ -175,6 +174,24 @@ impl WebSocketServerHandle { .await; } + /// Publishes a status message to all clients. + /// + /// For more information, refer to the [Status][status] message specification. + /// + /// [status]: https://github.com/foxglove/ws-protocol/blob/main/docs/spec.md#status + pub fn publish_status(&self, status: Status) { + self.0.publish_status(status); + } + + /// Removes status messages by id from all clients. + /// + /// For more information, refer to the [Remove Status][remove-status] message specification. + /// + /// [remove-status]: https://github.com/foxglove/ws-protocol/blob/main/docs/spec.md#remove-status + pub fn remove_status(&self, status_ids: Vec) { + self.0.remove_status(status_ids); + } + /// Gracefully shutdown the websocket server. pub async fn stop(self) { let sink = self.0.clone() as Arc; @@ -206,6 +223,24 @@ impl WebSocketServerBlockingHandle { .block_on(self.0.publish_parameter_values(parameters)) } + /// Publishes a status message to all clients. + /// + /// For more information, refer to the [Status][status] message specification. + /// + /// [status]: https://github.com/foxglove/ws-protocol/blob/main/docs/spec.md#status + pub fn publish_status(&self, status: Status) { + self.0.publish_status(status); + } + + /// Removes status messages by id from all clients. + /// + /// For more information, refer to the [Remove Status][remove-status] message specification. + /// + /// [remove-status]: https://github.com/foxglove/ws-protocol/blob/main/docs/spec.md#remove-status + pub fn remove_status(&self, status_ids: Vec) { + self.0.remove_status(status_ids); + } + /// Gracefully shutdown the websocket server. pub fn stop(self) { self.0.runtime().clone().block_on(self.0.stop());