Skip to content

Commit

Permalink
balancer: send pongs in response to websocket pings (#1334)
Browse files Browse the repository at this point in the history
* balancer: send pongs in response to websocket pings

closes #1320

* fix lint
  • Loading branch information
dyc3 authored Feb 14, 2024
1 parent 4f834f0 commit d507065
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/harness-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ serde.workspace = true
serde_json.workspace = true
test-context.workspace = true
tokio.workspace = true
tungstenite.workspace = true
39 changes: 38 additions & 1 deletion crates/harness-tests/src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::time::Duration;

use harness::{Client, Monolith, TestRunner};
use harness::{Client, Monolith, MonolithBuilder, TestRunner, WebsocketSender};
use test_context::test_context;
use tungstenite::Message;

#[test_context(TestRunner)]
#[tokio::test]
Expand All @@ -26,3 +27,39 @@ async fn should_kick_clients_when_monolith_lost(ctx: &mut TestRunner) {
assert!(!c1.connected());
assert!(!c2.connected());
}

#[test_context(TestRunner)]
#[tokio::test]
async fn should_send_pongs_to_clients(ctx: &mut TestRunner) {
let mut m = MonolithBuilder::new().build(ctx).await;
m.show().await;

let mut c1 = Client::new(ctx).expect("failed to create client");
c1.join("foo").await;

c1.send_raw(Message::Ping("foo".into())).await;

while let Ok(recv) = c1.recv().await {
if recv.is_pong() {
assert_eq!(recv, Message::Pong("foo".into()));
return;
}
}
panic!("No pong received");
}

#[test_context(TestRunner)]
#[tokio::test]
async fn should_send_pongs_to_monolith(ctx: &mut TestRunner) {
let mut m = MonolithBuilder::new().build(ctx).await;
m.show().await;

m.send_raw(Message::Ping("foo".into())).await;
m.wait_recv().await;

let recv = m.collect_recv_raw();
let pongs = recv.iter().filter(|msg| msg.is_pong()).count();
assert_eq!(pongs, 1);
let pong = recv.iter().find(|msg| msg.is_pong());
assert_eq!(pong, Some(&Message::Pong("foo".into())));
}
4 changes: 4 additions & 0 deletions crates/harness/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ impl Client {
while stream.next().await.is_some() {}
}

/// Receive a message from the balancer. This will block until a message is received, or 200ms has passed.
///
/// If it times out, the client will *not* be disconnected, and it will return `Err`. If the connection is closed,
/// it will return `Err`. If the message is a close message, the client will be disconnected, and it will return `Ok`.
pub async fn recv(&mut self) -> anyhow::Result<Message> {
if let Some(stream) = self.stream.as_mut() {
match tokio::time::timeout(Duration::from_millis(200), stream.next()).await {
Expand Down
14 changes: 12 additions & 2 deletions crates/harness/src/monolith.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,17 @@ impl Monolith {
let to_send = {
let mut state = state.lock().unwrap();
let parsed = match &msg {
Message::Text(msg) => serde_json::from_str(msg).expect("failed to parse B2M message"),
Message::Text(msg) => Some(serde_json::from_str(msg).expect("failed to parse B2M message")),
Message::Ping(_) => None,
Message::Pong(_) => None,
_ => panic!("unexpected message type: {:?}", msg),
};

let to_send = behavior.on_msg(&parsed, &mut state);
let to_send = if let Some(parsed) = parsed {
behavior.on_msg(&parsed, &mut state)
} else {
vec![]
};
state.received_raw.push(msg);
to_send
};
Expand Down Expand Up @@ -272,6 +278,10 @@ impl Monolith {
.collect()
}

pub fn collect_recv_raw(&self) -> Vec<Message> {
self.state.lock().unwrap().received_raw.clone()
}

pub fn set_all_mock_http(&mut self, mocks: HashMap<String, (MockRespParts, Bytes)>) {
self.state.lock().unwrap().response_mocks = mocks;
}
Expand Down
8 changes: 8 additions & 0 deletions crates/ott-balancer/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,14 @@ pub async fn client_entry<'r>(

Some(msg) = stream.next() => {
if let Ok(msg) = msg {
if let Message::Ping(ping) = msg {
if let Err(err) = stream.send(Message::Pong(ping)).await {
error!("Error sending pong to client: {:?}", err);
break;
}
continue;
}

if let Err(err) = client_link.inbound_send(msg).await {
error!("Error sending client message to balancer: {:?}", err);
break;
Expand Down
8 changes: 8 additions & 0 deletions crates/ott-balancer/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,14 @@ async fn connect_and_maintain(

msg = stream.next() => {
if let Some(Ok(msg)) = msg {
if let Message::Ping(ping) = msg {
if let Err(err) = stream.send(Message::Pong(ping)).await {
error!("Error sending pong to monolith: {:?}", err);
break;
}
continue;
}

if let Err(err) = link
.send_monolith_message(monolith_id, SocketMessage::Message(msg))
.await {
Expand Down

0 comments on commit d507065

Please sign in to comment.