From 50a897c59925770fea9b898f20a14599565ab636 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 27 Nov 2023 18:09:04 +0600 Subject: [PATCH] Do not handle transfers if connection is down --- CHANGES.md | 6 ++++++ Cargo.toml | 4 ++-- src/connection.rs | 2 +- src/dispatcher.rs | 14 +++++++++----- src/lib.rs | 19 ++++++++++++------- src/server/mod.rs | 3 ++- src/server/service.rs | 15 ++++++++++++--- 7 files changed, 44 insertions(+), 19 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index a85c262..15dcbbe 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,11 @@ # Changes +## [0.8.6] - 2023-11-27 + +* Better server builder + +* Do not handle transfers if connection is down + ## [0.8.5] - 2023-11-12 * Update io diff --git a/Cargo.toml b/Cargo.toml index 8ed6938..3dc01df 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-amqp" -version = "0.8.5" +version = "0.8.6" authors = ["ntex contributors "] description = "AMQP 1.0 Client/Server framework" documentation = "https://docs.rs/ntex-amqp" @@ -24,7 +24,7 @@ default = [] frame-trace = [] [dependencies] -ntex = "0.7.9" +ntex = "0.7.12" ntex-amqp-codec = "0.9.0" bitflags = "2.4" diff --git a/src/connection.rs b/src/connection.rs index c47f31f..a02c712 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -82,7 +82,7 @@ impl Connection { if inner.state != ConnectionState::Normal { return false; } - inner.error.is_none() + inner.error.is_none() && !inner.io.is_closed() } /// Get waiter for `on_close` event diff --git a/src/dispatcher.rs b/src/dispatcher.rs index dfa255e..b012af1 100644 --- a/src/dispatcher.rs +++ b/src/dispatcher.rs @@ -293,11 +293,15 @@ where match action { types::Action::Transfer(link) => { - return Either::Left(ServiceResult { - link: link.clone(), - fut: self.service.call(types::Message::Transfer(link)), - _t: marker::PhantomData, - }); + return if self.sink.is_opened() { + Either::Left(ServiceResult { + link: link.clone(), + fut: self.service.call(types::Message::Transfer(link)), + _t: marker::PhantomData, + }) + } else { + Either::Right(Ready::Ok(None)) + }; } types::Action::Flow(link, frm) => { // apply flow to specific link diff --git a/src/lib.rs b/src/lib.rs index 19b5f80..17eafdc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -118,7 +118,7 @@ impl Configuration { /// /// If max size is set to `0`, size is unlimited. /// By default max size is set to `0` - pub fn max_size(mut self, size: usize) -> Self { + pub fn max_size(&mut self, size: usize) -> &mut Self { self.max_size = size; self } @@ -126,7 +126,7 @@ impl Configuration { /// Set handshake timeout. /// /// By default handshake timeout is 5 seconds. - pub fn handshake_timeout(mut self, timeout: Seconds) -> Self { + pub fn handshake_timeout(&mut self, timeout: Seconds) -> &mut Self { self.handshake_timeout = timeout; self } @@ -136,7 +136,7 @@ impl Configuration { /// To disable timeout set value to 0. /// /// By default keep-alive timeout is disabled. - pub fn keepalive_timeout(self, val: Seconds) -> Self { + pub fn keepalive_timeout(&mut self, val: Seconds) -> &mut Self { self.disp_config.set_keepalive_timeout(val); self } @@ -149,19 +149,24 @@ impl Configuration { /// To disable timeout set value to 0. /// /// By default disconnect timeout is set to 3 seconds. - pub fn disconnect_timeout(self, val: Seconds) -> Self { + pub fn disconnect_timeout(&mut self, val: Seconds) -> &mut Self { self.disp_config.set_disconnect_timeout(val); self } /// Set read rate parameters for single frame. /// - /// Set max timeout for reading single frame. If the client - /// sends `rate` amount of data, increase the timeout by 1 second for every. + /// Set read timeout, max timeout and rate for reading payload. If the client + /// sends `rate` amount of data within `timeout` period of time, extend timeout by `timeout` seconds. /// But no more than `max_timeout` timeout. /// /// By default frame read rate is disabled. - pub fn frame_read_rate(self, timeout: Seconds, max_timeout: Seconds, rate: u16) -> Self { + pub fn frame_read_rate( + &mut self, + timeout: Seconds, + max_timeout: Seconds, + rate: u16, + ) -> &mut Self { self.disp_config .set_frame_read_rate(timeout, max_timeout, rate); self diff --git a/src/server/mod.rs b/src/server/mod.rs index 17169a7..386bc01 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -6,7 +6,8 @@ mod service; pub use self::error::{HandshakeError, ServerError}; pub use self::handshake::{Handshake, HandshakeAck, HandshakeAmqp, HandshakeAmqpOpened}; pub use self::sasl::Sasl; -pub use self::service::Server; +pub use self::service::{Server, ServerBuilder}; + pub use crate::codec::protocol::Transfer; pub use crate::control::{ControlFrame, ControlFrameKind}; pub use crate::error::{Error, LinkError}; diff --git a/src/server/service.rs b/src/server/service.rs index 1486e04..c700d5f 100644 --- a/src/server/service.rs +++ b/src/server/service.rs @@ -22,7 +22,7 @@ pub struct Server { pub struct ServerBuilder { handshake: H, control: Ctl, - config: Rc, + config: Configuration, _t: marker::PhantomData, } @@ -56,9 +56,9 @@ where H: ServiceFactory>, { ServerBuilder { + config, handshake: handshake.into_factory(), control: DefaultControlService::default(), - config: Rc::new(config), _t: marker::PhantomData, } } @@ -72,6 +72,15 @@ where Ctl::InitError: fmt::Debug, Error: From, { + /// Modify server configuration + pub fn config(mut self, f: F) -> Self + where + F: FnOnce(&mut Configuration), + { + f(&mut self.config); + self + } + /// Service to call with control frames pub fn control(self, service: F) -> ServerBuilder where @@ -99,7 +108,7 @@ where Server { handshake: self.handshake, inner: Rc::new(ServerInner { - config: self.config, + config: Rc::new(self.config), publish: service.into_factory(), control: self.control, _t: marker::PhantomData,