Skip to content

Commit

Permalink
feat: send periodic pings for keep alives (#103)
Browse files Browse the repository at this point in the history
#93 & #94 added a notion of keep alives to graphql-ws-client. But the
implementation caused the connection to drop whenever no messages were
received for the specified period. This isn't how I'd usually expect a
keep alive to work though - I'd usually expect it to send pings during
inactive periods, and only drop the connection if a few of those pings
were not replied to.

This PR implements that behaviour instead. It's a slightly breaking
change over the earlier implementation, but that was not yet released so
that seems fine.
  • Loading branch information
obmarg authored Jun 8, 2024
1 parent 16cb857 commit 7259683
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 40 deletions.
56 changes: 27 additions & 29 deletions src/next/actor.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
use std::{
collections::{hash_map::Entry, HashMap},
future::{pending, IntoFuture},
time::Duration,
future::IntoFuture,
};

use futures::{
channel::mpsc,
future::{BoxFuture, FusedFuture},
pin_mut, FutureExt, SinkExt, StreamExt,
};
use futures::{channel::mpsc, future::BoxFuture, stream::BoxStream, FutureExt, SinkExt, StreamExt};
use serde_json::{json, Value};

use crate::{
Expand All @@ -19,6 +14,7 @@ use crate::{

use super::{
connection::{Connection, Message},
keepalive::KeepAliveSettings,
ConnectionCommand,
};

Expand All @@ -32,20 +28,22 @@ pub struct ConnectionActor {
client: Option<mpsc::Receiver<ConnectionCommand>>,
connection: Box<dyn Connection + Send>,
operations: HashMap<usize, mpsc::Sender<Value>>,
keep_alive_duration: Option<Duration>,
keep_alive: KeepAliveSettings,
keep_alive_actor: BoxStream<'static, ConnectionCommand>,
}

impl ConnectionActor {
pub(super) fn new(
connection: Box<dyn Connection + Send>,
client: mpsc::Receiver<ConnectionCommand>,
keep_alive_duration: Option<Duration>,
keep_alive: KeepAliveSettings,
) -> Self {
ConnectionActor {
client: Some(client),
connection,
operations: HashMap::new(),
keep_alive_duration,
keep_alive_actor: Box::pin(keep_alive.run()),
keep_alive,
}
}

Expand Down Expand Up @@ -98,6 +96,7 @@ impl ConnectionActor {
code: Some(code),
reason: Some(reason),
}),
ConnectionCommand::Ping => Some(Message::Ping),
}
}

Expand Down Expand Up @@ -162,20 +161,15 @@ impl ConnectionActor {
if let Some(client) = &mut self.client {
let mut next_command = client.next().fuse();
let mut next_message = self.connection.receive().fuse();

let keep_alive = delay_or_pending(self.keep_alive_duration);
pin_mut!(keep_alive);
let mut next_keep_alive = self.keep_alive_actor.next().fuse();

futures::select! {
_ = keep_alive => {
warning!(
"No messages received within keep-alive ({:?}s) from server. Closing the connection",
self.keep_alive_duration
);
return Some(Next::Command(ConnectionCommand::Close(
4503,
"Service unavailable. keep-alive failure".to_string(),
)));
command = next_keep_alive => {
let Some(command) = command else {
return self.keep_alive.report_timeout();
};

return Some(Next::Command(command));
},
command = next_command => {
let Some(command) = command else {
Expand All @@ -186,6 +180,7 @@ impl ConnectionActor {
return Some(Next::Command(command));
},
message = next_message => {
self.keep_alive_actor = Box::pin(self.keep_alive.run());
return Some(Next::Message(message?));
},
}
Expand Down Expand Up @@ -261,12 +256,15 @@ impl Event {
}
}

fn delay_or_pending(duration: Option<Duration>) -> impl FusedFuture {
async move {
match duration {
Some(duration) => futures_timer::Delay::new(duration).await,
None => pending::<()>().await,
}
impl KeepAliveSettings {
fn report_timeout(&self) -> Option<Next> {
warning!(
"No messages received within keep-alive ({:?}s) from server. Closing the connection",
self.interval.unwrap()
);
Some(Next::Command(ConnectionCommand::Close(
4503,
"Service unavailable. keep-alive failure".to_string(),
)))
}
.fuse()
}
31 changes: 20 additions & 11 deletions src/next/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{graphql::GraphqlOperation, logging::trace, protocol::Event, Error};
use super::{
actor::ConnectionActor,
connection::{Connection, Message},
keepalive::KeepAliveSettings,
Client, Subscription,
};

Expand All @@ -30,7 +31,7 @@ pub struct ClientBuilder {
payload: Option<serde_json::Value>,
subscription_buffer_size: Option<usize>,
connection: Box<dyn Connection + Send>,
keep_alive_duration: Option<Duration>,
keep_alive: KeepAliveSettings,
}

impl super::Client {
Expand All @@ -53,7 +54,7 @@ impl super::Client {
payload: None,
subscription_buffer_size: None,
connection: Box::new(connection),
keep_alive_duration: None,
keep_alive: KeepAliveSettings::default(),
}
}
}
Expand Down Expand Up @@ -82,13 +83,21 @@ impl ClientBuilder {
}
}

/// Sets the duration within which if Client does not receive Ping messages
/// the connection will be closed
pub fn keep_alive_duration(self, new: Duration) -> Self {
ClientBuilder {
keep_alive_duration: Some(new),
..self
}
/// Sets the interval between keep alives.
///
/// Any incoming messages automatically reset this interval so keep alives may not be sent
/// on busy connections even if this is set.
pub fn keep_alive_interval(mut self, new: Duration) -> Self {
self.keep_alive.interval = Some(new);
self
}

/// The number of keepalive retries before a connection is considered broken.
///
/// This defaults to 3, but has no effect if `keep_alive_interval` is not called.
pub fn keep_alive_retries(mut self, count: usize) -> Self {
self.keep_alive.retries = count;
self
}

/// Initialise a Client and use it to run a single subscription
Expand Down Expand Up @@ -157,8 +166,8 @@ impl ClientBuilder {
let Self {
payload,
subscription_buffer_size,
keep_alive_duration,
mut connection,
keep_alive,
} = self;

connection.send(Message::init(payload)).await?;
Expand Down Expand Up @@ -207,7 +216,7 @@ impl ClientBuilder {

let (command_sender, command_receiver) = mpsc::channel(5);

let actor = ConnectionActor::new(connection, command_receiver, keep_alive_duration);
let actor = ConnectionActor::new(connection, command_receiver, keep_alive);

let client = Client::new_internal(command_sender, subscription_buffer_size.unwrap_or(5));

Expand Down
73 changes: 73 additions & 0 deletions src/next/keepalive.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use std::{future::pending, time::Duration};

use futures::Stream;

use crate::ConnectionCommand;

#[derive(Clone)]
pub(super) struct KeepAliveSettings {
/// How often to send a keep alive ping
pub(super) interval: Option<Duration>,

/// How many pings can be sent without receiving a reply before
/// the connection is considered dropped
pub(super) retries: usize,
}

impl Default for KeepAliveSettings {
fn default() -> Self {
Self {
interval: None,
retries: 3,
}
}
}

enum KeepAliveState {
Running,
StartedKeepAlive,
TimingOut { failure_count: usize },
}

impl KeepAliveSettings {
pub(super) fn run(&self) -> impl Stream<Item = ConnectionCommand> + 'static {
let settings = self.clone();

futures::stream::unfold(KeepAliveState::Running, move |mut state| async move {
match settings.interval {
Some(duration) => futures_timer::Delay::new(duration).await,
None => pending::<()>().await,
}

match state {
KeepAliveState::Running => {
state = KeepAliveState::StartedKeepAlive;
}
KeepAliveState::StartedKeepAlive => {
state = KeepAliveState::TimingOut { failure_count: 0 };
}
KeepAliveState::TimingOut { failure_count } => {
state = KeepAliveState::TimingOut {
failure_count: failure_count + 1,
};
}
}

if state.failure_count() > settings.retries {
// returning None aborts
return None;
}

Some((ConnectionCommand::Ping, state))
})
}
}

impl KeepAliveState {
pub fn failure_count(&self) -> usize {
match self {
KeepAliveState::Running | KeepAliveState::StartedKeepAlive => 0,
KeepAliveState::TimingOut { failure_count } => *failure_count,
}
}
}
2 changes: 2 additions & 0 deletions src/next/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::{
mod actor;
mod builder;
mod connection;
mod keepalive;
mod stream;

pub use self::{
Expand Down Expand Up @@ -130,6 +131,7 @@ pub(super) enum ConnectionCommand {
sender: mpsc::Sender<Value>,
id: usize,
},
Ping,
Cancel(usize),
Close(u16, String),
}
Expand Down

0 comments on commit 7259683

Please sign in to comment.