diff --git a/src/next/actor.rs b/src/next/actor.rs index dfe924f..84f35e0 100644 --- a/src/next/actor.rs +++ b/src/next/actor.rs @@ -1,14 +1,9 @@ use std::{ collections::{hash_map::Entry, HashMap}, - future::{pending, IntoFuture}, - time::Duration, + future::IntoFuture, }; -use futures::{ - channel::mpsc, - future::{BoxFuture, FusedFuture}, - pin_mut, FutureExt, SinkExt, StreamExt, -}; +use futures::{channel::mpsc, future::BoxFuture, stream::BoxStream, FutureExt, SinkExt, StreamExt}; use serde_json::{json, Value}; use crate::{ @@ -19,6 +14,7 @@ use crate::{ use super::{ connection::{Connection, Message}, + keepalive::KeepAliveSettings, ConnectionCommand, }; @@ -32,20 +28,22 @@ pub struct ConnectionActor { client: Option>, connection: Box, operations: HashMap>, - keep_alive_duration: Option, + keep_alive: KeepAliveSettings, + keep_alive_actor: BoxStream<'static, ConnectionCommand>, } impl ConnectionActor { pub(super) fn new( connection: Box, client: mpsc::Receiver, - keep_alive_duration: Option, + keep_alive: KeepAliveSettings, ) -> Self { ConnectionActor { client: Some(client), connection, operations: HashMap::new(), - keep_alive_duration, + keep_alive_actor: Box::pin(keep_alive.run()), + keep_alive, } } @@ -98,6 +96,7 @@ impl ConnectionActor { code: Some(code), reason: Some(reason), }), + ConnectionCommand::Ping => Some(Message::Ping), } } @@ -162,20 +161,15 @@ impl ConnectionActor { if let Some(client) = &mut self.client { let mut next_command = client.next().fuse(); let mut next_message = self.connection.receive().fuse(); - - let keep_alive = delay_or_pending(self.keep_alive_duration); - pin_mut!(keep_alive); + let mut next_keep_alive = self.keep_alive_actor.next().fuse(); futures::select! { - _ = keep_alive => { - warning!( - "No messages received within keep-alive ({:?}s) from server. Closing the connection", - self.keep_alive_duration - ); - return Some(Next::Command(ConnectionCommand::Close( - 4503, - "Service unavailable. keep-alive failure".to_string(), - ))); + command = next_keep_alive => { + let Some(command) = command else { + return self.keep_alive.report_timeout(); + }; + + return Some(Next::Command(command)); }, command = next_command => { let Some(command) = command else { @@ -186,6 +180,7 @@ impl ConnectionActor { return Some(Next::Command(command)); }, message = next_message => { + self.keep_alive_actor = Box::pin(self.keep_alive.run()); return Some(Next::Message(message?)); }, } @@ -261,12 +256,15 @@ impl Event { } } -fn delay_or_pending(duration: Option) -> impl FusedFuture { - async move { - match duration { - Some(duration) => futures_timer::Delay::new(duration).await, - None => pending::<()>().await, - } +impl KeepAliveSettings { + fn report_timeout(&self) -> Option { + warning!( + "No messages received within keep-alive ({:?}s) from server. Closing the connection", + self.interval.unwrap() + ); + Some(Next::Command(ConnectionCommand::Close( + 4503, + "Service unavailable. keep-alive failure".to_string(), + ))) } - .fuse() } diff --git a/src/next/builder.rs b/src/next/builder.rs index ce25ffb..8b3e44f 100644 --- a/src/next/builder.rs +++ b/src/next/builder.rs @@ -8,6 +8,7 @@ use crate::{graphql::GraphqlOperation, logging::trace, protocol::Event, Error}; use super::{ actor::ConnectionActor, connection::{Connection, Message}, + keepalive::KeepAliveSettings, Client, Subscription, }; @@ -30,7 +31,7 @@ pub struct ClientBuilder { payload: Option, subscription_buffer_size: Option, connection: Box, - keep_alive_duration: Option, + keep_alive: KeepAliveSettings, } impl super::Client { @@ -53,7 +54,7 @@ impl super::Client { payload: None, subscription_buffer_size: None, connection: Box::new(connection), - keep_alive_duration: None, + keep_alive: KeepAliveSettings::default(), } } } @@ -82,13 +83,21 @@ impl ClientBuilder { } } - /// Sets the duration within which if Client does not receive Ping messages - /// the connection will be closed - pub fn keep_alive_duration(self, new: Duration) -> Self { - ClientBuilder { - keep_alive_duration: Some(new), - ..self - } + /// Sets the interval between keep alives. + /// + /// Any incoming messages automatically reset this interval so keep alives may not be sent + /// on busy connections even if this is set. + pub fn keep_alive_interval(mut self, new: Duration) -> Self { + self.keep_alive.interval = Some(new); + self + } + + /// The number of keepalive retries before a connection is considered broken. + /// + /// This defaults to 3, but has no effect if `keep_alive_interval` is not called. + pub fn keep_alive_retries(mut self, count: usize) -> Self { + self.keep_alive.retries = count; + self } /// Initialise a Client and use it to run a single subscription @@ -157,8 +166,8 @@ impl ClientBuilder { let Self { payload, subscription_buffer_size, - keep_alive_duration, mut connection, + keep_alive, } = self; connection.send(Message::init(payload)).await?; @@ -207,7 +216,7 @@ impl ClientBuilder { let (command_sender, command_receiver) = mpsc::channel(5); - let actor = ConnectionActor::new(connection, command_receiver, keep_alive_duration); + let actor = ConnectionActor::new(connection, command_receiver, keep_alive); let client = Client::new_internal(command_sender, subscription_buffer_size.unwrap_or(5)); diff --git a/src/next/keepalive.rs b/src/next/keepalive.rs new file mode 100644 index 0000000..2f6f4a6 --- /dev/null +++ b/src/next/keepalive.rs @@ -0,0 +1,73 @@ +use std::{future::pending, time::Duration}; + +use futures::Stream; + +use crate::ConnectionCommand; + +#[derive(Clone)] +pub(super) struct KeepAliveSettings { + /// How often to send a keep alive ping + pub(super) interval: Option, + + /// How many pings can be sent without receiving a reply before + /// the connection is considered dropped + pub(super) retries: usize, +} + +impl Default for KeepAliveSettings { + fn default() -> Self { + Self { + interval: None, + retries: 3, + } + } +} + +enum KeepAliveState { + Running, + StartedKeepAlive, + TimingOut { failure_count: usize }, +} + +impl KeepAliveSettings { + pub(super) fn run(&self) -> impl Stream + 'static { + let settings = self.clone(); + + futures::stream::unfold(KeepAliveState::Running, move |mut state| async move { + match settings.interval { + Some(duration) => futures_timer::Delay::new(duration).await, + None => pending::<()>().await, + } + + match state { + KeepAliveState::Running => { + state = KeepAliveState::StartedKeepAlive; + } + KeepAliveState::StartedKeepAlive => { + state = KeepAliveState::TimingOut { failure_count: 0 }; + } + KeepAliveState::TimingOut { failure_count } => { + state = KeepAliveState::TimingOut { + failure_count: failure_count + 1, + }; + } + } + + if state.failure_count() > settings.retries { + // returning None aborts + return None; + } + + Some((ConnectionCommand::Ping, state)) + }) + } +} + +impl KeepAliveState { + pub fn failure_count(&self) -> usize { + match self { + KeepAliveState::Running | KeepAliveState::StartedKeepAlive => 0, + KeepAliveState::TimingOut { failure_count } => *failure_count, + } + } +} diff --git a/src/next/mod.rs b/src/next/mod.rs index afc45c7..da00f47 100644 --- a/src/next/mod.rs +++ b/src/next/mod.rs @@ -18,6 +18,7 @@ use crate::{ mod actor; mod builder; mod connection; +mod keepalive; mod stream; pub use self::{ @@ -130,6 +131,7 @@ pub(super) enum ConnectionCommand { sender: mpsc::Sender, id: usize, }, + Ping, Cancel(usize), Close(u16, String), }