From ec01e48e928dba07bd4353a2216f204abe60a60f Mon Sep 17 00:00:00 2001 From: Ralph Scharpf Date: Tue, 30 Nov 2021 21:50:02 +0100 Subject: [PATCH 1/2] Shutdown connection when socket gets disconnected. --- src/client/client.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/client/client.rs b/src/client/client.rs index 10e8157..431efa2 100644 --- a/src/client/client.rs +++ b/src/client/client.rs @@ -916,6 +916,8 @@ impl IoTask { match read { Err(Error::Disconnected) => { + // Drop Connection as socket disconnected itself + self.shutdown_conn().await; self.tx_recv_published.send(Err(Error::Disconnected)).await .map_err(Error::from_std_err)?; } From c9763041eb487d8e70b4a38c3477eff32b26db0a Mon Sep 17 00:00:00 2001 From: Ralph Scharpf Date: Fri, 18 Mar 2022 20:53:46 +0100 Subject: [PATCH 2/2] Added support for last will handling --- src/client/builder.rs | 12 ++++++++++++ src/client/client.rs | 13 ++++++++++--- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/src/client/builder.rs b/src/client/builder.rs index 78c8aa0..7336125 100644 --- a/src/client/builder.rs +++ b/src/client/builder.rs @@ -3,6 +3,7 @@ use crate::{ Client, ClientOptions, client::ConnectionMode, + value_types::Publish, KeepAlive, }, Error, Result, @@ -12,6 +13,7 @@ use crate::{ }; use url::Url; +use Publish as LastWill; #[cfg(any(feature = "tls", feature = "websocket"))] use ::rustls; @@ -37,6 +39,7 @@ pub struct ClientBuilder { connection_mode: ConnectionMode, automatic_connect: Option, connect_retry_delay: Option, + last_will: Option, } impl ClientBuilder { @@ -58,6 +61,7 @@ impl ClientBuilder { connection_mode: self.connection_mode.clone(), automatic_connect: self.automatic_connect.unwrap_or(true), connect_retry_delay: self.connect_retry_delay.unwrap_or(Duration::from_secs(30)), + last_will: self.last_will.clone(), }) } @@ -104,6 +108,14 @@ impl ClientBuilder { Ok(self) } + /// Set the last will testament topic + /// + /// Topic published by broker as connection to (this) client is lost + pub fn set_last_will(&mut self, lwt: &LastWill) -> &mut Self { + self.last_will = Some(lwt.to_owned()); + self + } + /// Set username to authenticate with. /// /// The default value is no username. diff --git a/src/client/client.rs b/src/client/client.rs index 431efa2..93608b7 100644 --- a/src/client/client.rs +++ b/src/client/client.rs @@ -74,6 +74,7 @@ use tokio::{ #[cfg(feature = "tls")] use tokio_rustls::{self, webpki::DNSNameRef, TlsConnector}; use url::Url; +use Publish as LastWill; /// An MQTT client. /// @@ -128,6 +129,7 @@ pub(crate) struct ClientOptions { pub(crate) connection_mode: ConnectionMode, pub(crate) automatic_connect: bool, pub(crate) connect_retry_delay: Duration, + pub(crate) last_will: Option, } impl fmt::Debug for ClientOptions { @@ -144,6 +146,7 @@ impl fmt::Debug for ClientOptions { .field("operation_timeout", &self.operation_timeout) .field("automatic_connect", &self.automatic_connect) .field("connect_retry_delay", &self.connect_retry_delay) + .field("last_will", &self.last_will) .finish() } } @@ -643,7 +646,12 @@ fn connect_packet(opts: &ClientOptions) -> Result { Some(cid) => cid.to_owned(), }, clean_session: true, // TODO - last_will: None, // TODO + last_will: opts.last_will.as_ref().map(|lwt| mqttrs::LastWill { + topic: lwt.topic().to_owned(), + message: lwt.payload().to_owned(), + qos: lwt.qos(), + retain: lwt.retain(), + }), username: opts.username.clone(), password: opts.password.clone(), })) @@ -746,7 +754,7 @@ impl IoTask { _ => panic!("Not reached"), }; let conn = connect_packet(&self.options)?; - debug!("IoTask: Sending connect packet"); + debug!("IoTask: Sending connect packet: {:?}", conn); Self::write_packet(&self.options, c, &conn).await?; let read = Self::read_packet(&mut c.stream, &mut c.read_buf, @@ -1141,7 +1149,6 @@ impl Default for ConnectionMode { } } - #[cfg(test)] mod test { use super::Client;