From d5070651f9049a94cf35d2296b6b97f65529ecf2 Mon Sep 17 00:00:00 2001 From: Carson McManus Date: Wed, 14 Feb 2024 08:56:56 -0500 Subject: [PATCH] balancer: send pongs in response to websocket pings (#1334) * balancer: send pongs in response to websocket pings closes #1320 * fix lint --- Cargo.lock | 1 + crates/harness-tests/Cargo.toml | 1 + crates/harness-tests/src/connection.rs | 39 +++++++++++++++++++++++++- crates/harness/src/client.rs | 4 +++ crates/harness/src/monolith.rs | 14 +++++++-- crates/ott-balancer/src/client.rs | 8 ++++++ crates/ott-balancer/src/connection.rs | 8 ++++++ 7 files changed, 72 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5bfbb8d6a..22a9d9aaf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -956,6 +956,7 @@ dependencies = [ "serde_json", "test-context", "tokio", + "tungstenite", ] [[package]] diff --git a/crates/harness-tests/Cargo.toml b/crates/harness-tests/Cargo.toml index b88c53cf2..671c50e5f 100644 --- a/crates/harness-tests/Cargo.toml +++ b/crates/harness-tests/Cargo.toml @@ -11,3 +11,4 @@ serde.workspace = true serde_json.workspace = true test-context.workspace = true tokio.workspace = true +tungstenite.workspace = true diff --git a/crates/harness-tests/src/connection.rs b/crates/harness-tests/src/connection.rs index b6cd72cf1..6ce36fdff 100644 --- a/crates/harness-tests/src/connection.rs +++ b/crates/harness-tests/src/connection.rs @@ -1,7 +1,8 @@ use std::time::Duration; -use harness::{Client, Monolith, TestRunner}; +use harness::{Client, Monolith, MonolithBuilder, TestRunner, WebsocketSender}; use test_context::test_context; +use tungstenite::Message; #[test_context(TestRunner)] #[tokio::test] @@ -26,3 +27,39 @@ async fn should_kick_clients_when_monolith_lost(ctx: &mut TestRunner) { assert!(!c1.connected()); assert!(!c2.connected()); } + +#[test_context(TestRunner)] +#[tokio::test] +async fn should_send_pongs_to_clients(ctx: &mut TestRunner) { + let mut m = MonolithBuilder::new().build(ctx).await; + m.show().await; + + let mut c1 = Client::new(ctx).expect("failed to create client"); + c1.join("foo").await; + + c1.send_raw(Message::Ping("foo".into())).await; + + while let Ok(recv) = c1.recv().await { + if recv.is_pong() { + assert_eq!(recv, Message::Pong("foo".into())); + return; + } + } + panic!("No pong received"); +} + +#[test_context(TestRunner)] +#[tokio::test] +async fn should_send_pongs_to_monolith(ctx: &mut TestRunner) { + let mut m = MonolithBuilder::new().build(ctx).await; + m.show().await; + + m.send_raw(Message::Ping("foo".into())).await; + m.wait_recv().await; + + let recv = m.collect_recv_raw(); + let pongs = recv.iter().filter(|msg| msg.is_pong()).count(); + assert_eq!(pongs, 1); + let pong = recv.iter().find(|msg| msg.is_pong()); + assert_eq!(pong, Some(&Message::Pong("foo".into()))); +} diff --git a/crates/harness/src/client.rs b/crates/harness/src/client.rs index 5dc26b88c..7e8372257 100644 --- a/crates/harness/src/client.rs +++ b/crates/harness/src/client.rs @@ -89,6 +89,10 @@ impl Client { while stream.next().await.is_some() {} } + /// Receive a message from the balancer. This will block until a message is received, or 200ms has passed. + /// + /// If it times out, the client will *not* be disconnected, and it will return `Err`. If the connection is closed, + /// it will return `Err`. If the message is a close message, the client will be disconnected, and it will return `Ok`. pub async fn recv(&mut self) -> anyhow::Result { if let Some(stream) = self.stream.as_mut() { match tokio::time::timeout(Duration::from_millis(200), stream.next()).await { diff --git a/crates/harness/src/monolith.rs b/crates/harness/src/monolith.rs index 122d34f37..8f7b5a994 100644 --- a/crates/harness/src/monolith.rs +++ b/crates/harness/src/monolith.rs @@ -122,11 +122,17 @@ impl Monolith { let to_send = { let mut state = state.lock().unwrap(); let parsed = match &msg { - Message::Text(msg) => serde_json::from_str(msg).expect("failed to parse B2M message"), + Message::Text(msg) => Some(serde_json::from_str(msg).expect("failed to parse B2M message")), + Message::Ping(_) => None, + Message::Pong(_) => None, _ => panic!("unexpected message type: {:?}", msg), }; - let to_send = behavior.on_msg(&parsed, &mut state); + let to_send = if let Some(parsed) = parsed { + behavior.on_msg(&parsed, &mut state) + } else { + vec![] + }; state.received_raw.push(msg); to_send }; @@ -272,6 +278,10 @@ impl Monolith { .collect() } + pub fn collect_recv_raw(&self) -> Vec { + self.state.lock().unwrap().received_raw.clone() + } + pub fn set_all_mock_http(&mut self, mocks: HashMap) { self.state.lock().unwrap().response_mocks = mocks; } diff --git a/crates/ott-balancer/src/client.rs b/crates/ott-balancer/src/client.rs index e9bb6c8b2..be971dee5 100644 --- a/crates/ott-balancer/src/client.rs +++ b/crates/ott-balancer/src/client.rs @@ -197,6 +197,14 @@ pub async fn client_entry<'r>( Some(msg) = stream.next() => { if let Ok(msg) = msg { + if let Message::Ping(ping) = msg { + if let Err(err) = stream.send(Message::Pong(ping)).await { + error!("Error sending pong to client: {:?}", err); + break; + } + continue; + } + if let Err(err) = client_link.inbound_send(msg).await { error!("Error sending client message to balancer: {:?}", err); break; diff --git a/crates/ott-balancer/src/connection.rs b/crates/ott-balancer/src/connection.rs index d4170883a..2d9472f72 100644 --- a/crates/ott-balancer/src/connection.rs +++ b/crates/ott-balancer/src/connection.rs @@ -183,6 +183,14 @@ async fn connect_and_maintain( msg = stream.next() => { if let Some(Ok(msg)) = msg { + if let Message::Ping(ping) = msg { + if let Err(err) = stream.send(Message::Pong(ping)).await { + error!("Error sending pong to monolith: {:?}", err); + break; + } + continue; + } + if let Err(err) = link .send_monolith_message(monolith_id, SocketMessage::Message(msg)) .await {