Skip to content

Commit

Permalink
Do not handle transfers if connection is down
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Nov 27, 2023
1 parent 3f628a3 commit 50a897c
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 19 deletions.
6 changes: 6 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changes

## [0.8.6] - 2023-11-27

* Better server builder

* Do not handle transfers if connection is down

## [0.8.5] - 2023-11-12

* Update io
Expand Down
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-amqp"
version = "0.8.5"
version = "0.8.6"
authors = ["ntex contributors <[email protected]>"]
description = "AMQP 1.0 Client/Server framework"
documentation = "https://docs.rs/ntex-amqp"
Expand All @@ -24,7 +24,7 @@ default = []
frame-trace = []

[dependencies]
ntex = "0.7.9"
ntex = "0.7.12"
ntex-amqp-codec = "0.9.0"

bitflags = "2.4"
Expand Down
2 changes: 1 addition & 1 deletion src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl Connection {
if inner.state != ConnectionState::Normal {
return false;
}
inner.error.is_none()
inner.error.is_none() && !inner.io.is_closed()
}

/// Get waiter for `on_close` event
Expand Down
14 changes: 9 additions & 5 deletions src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,11 +293,15 @@ where

match action {
types::Action::Transfer(link) => {
return Either::Left(ServiceResult {
link: link.clone(),
fut: self.service.call(types::Message::Transfer(link)),
_t: marker::PhantomData,
});
return if self.sink.is_opened() {
Either::Left(ServiceResult {
link: link.clone(),
fut: self.service.call(types::Message::Transfer(link)),
_t: marker::PhantomData,
})
} else {
Either::Right(Ready::Ok(None))
};
}
types::Action::Flow(link, frm) => {
// apply flow to specific link
Expand Down
19 changes: 12 additions & 7 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,15 +118,15 @@ impl Configuration {
///
/// If max size is set to `0`, size is unlimited.
/// By default max size is set to `0`
pub fn max_size(mut self, size: usize) -> Self {
pub fn max_size(&mut self, size: usize) -> &mut Self {
self.max_size = size;
self
}

/// Set handshake timeout.
///
/// By default handshake timeout is 5 seconds.
pub fn handshake_timeout(mut self, timeout: Seconds) -> Self {
pub fn handshake_timeout(&mut self, timeout: Seconds) -> &mut Self {
self.handshake_timeout = timeout;
self
}
Expand All @@ -136,7 +136,7 @@ impl Configuration {
/// To disable timeout set value to 0.
///
/// By default keep-alive timeout is disabled.
pub fn keepalive_timeout(self, val: Seconds) -> Self {
pub fn keepalive_timeout(&mut self, val: Seconds) -> &mut Self {
self.disp_config.set_keepalive_timeout(val);
self
}
Expand All @@ -149,19 +149,24 @@ impl Configuration {
/// To disable timeout set value to 0.
///
/// By default disconnect timeout is set to 3 seconds.
pub fn disconnect_timeout(self, val: Seconds) -> Self {
pub fn disconnect_timeout(&mut self, val: Seconds) -> &mut Self {
self.disp_config.set_disconnect_timeout(val);
self
}

/// Set read rate parameters for single frame.
///
/// Set max timeout for reading single frame. If the client
/// sends `rate` amount of data, increase the timeout by 1 second for every.
/// Set read timeout, max timeout and rate for reading payload. If the client
/// sends `rate` amount of data within `timeout` period of time, extend timeout by `timeout` seconds.
/// But no more than `max_timeout` timeout.
///
/// By default frame read rate is disabled.
pub fn frame_read_rate(self, timeout: Seconds, max_timeout: Seconds, rate: u16) -> Self {
pub fn frame_read_rate(
&mut self,
timeout: Seconds,
max_timeout: Seconds,
rate: u16,
) -> &mut Self {
self.disp_config
.set_frame_read_rate(timeout, max_timeout, rate);
self
Expand Down
3 changes: 2 additions & 1 deletion src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ mod service;
pub use self::error::{HandshakeError, ServerError};
pub use self::handshake::{Handshake, HandshakeAck, HandshakeAmqp, HandshakeAmqpOpened};
pub use self::sasl::Sasl;
pub use self::service::Server;
pub use self::service::{Server, ServerBuilder};

pub use crate::codec::protocol::Transfer;
pub use crate::control::{ControlFrame, ControlFrameKind};
pub use crate::error::{Error, LinkError};
Expand Down
15 changes: 12 additions & 3 deletions src/server/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub struct Server<St, H, Ctl, Pb> {
pub struct ServerBuilder<St, H, Ctl> {
handshake: H,
control: Ctl,
config: Rc<Configuration>,
config: Configuration,
_t: marker::PhantomData<St>,
}

Expand Down Expand Up @@ -56,9 +56,9 @@ where
H: ServiceFactory<Handshake, Response = HandshakeAck<St>>,
{
ServerBuilder {
config,
handshake: handshake.into_factory(),
control: DefaultControlService::default(),
config: Rc::new(config),
_t: marker::PhantomData,
}
}
Expand All @@ -72,6 +72,15 @@ where
Ctl::InitError: fmt::Debug,
Error: From<Ctl::Error>,
{
/// Modify server configuration
pub fn config<F>(mut self, f: F) -> Self
where
F: FnOnce(&mut Configuration),
{
f(&mut self.config);
self
}

/// Service to call with control frames
pub fn control<F, S>(self, service: F) -> ServerBuilder<St, H, S>
where
Expand Down Expand Up @@ -99,7 +108,7 @@ where
Server {
handshake: self.handshake,
inner: Rc::new(ServerInner {
config: self.config,
config: Rc::new(self.config),
publish: service.into_factory(),
control: self.control,
_t: marker::PhantomData,
Expand Down

0 comments on commit 50a897c

Please sign in to comment.