Skip to content

Commit

Permalink
use new ntex timer api
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Aug 28, 2021
1 parent ccafaf1 commit d67d7fe
Show file tree
Hide file tree
Showing 9 changed files with 91 additions and 96 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.5.0-b.10] - 2021-08-28

* use new ntex's timer api

## [codec-0.7.2] - 2021-08-23

* Add `.get_properties_mut()` helper method to some frames
Expand Down
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-amqp"
version = "0.5.0-b.9"
version = "0.5.0-b.10"
authors = ["ntex contributors <[email protected]>"]
description = "AMQP 1.0 Client/Server framework"
documentation = "https://docs.rs/ntex-amqp"
Expand All @@ -24,8 +24,8 @@ default = []
frame-trace = []

[dependencies]
ntex = "0.4.0-b.2"
ntex-amqp-codec = "0.7.0"
ntex = "0.4.0-b.4"
ntex-amqp-codec = "0.7.2"

bitflags = "1.2"
derive_more = "0.99"
Expand Down
19 changes: 11 additions & 8 deletions src/client/connection.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use ntex::codec::{AsyncRead, AsyncWrite};
use ntex::framed::{Dispatcher as IoDispatcher, State as IoState, Timer};
use ntex::service::{fn_service, Service};
use ntex::time::Seconds;
use ntex::util::Ready;

use crate::codec::{AmqpCodec, AmqpFrame};
Expand All @@ -13,7 +14,7 @@ pub struct Client<Io, St = ()> {
state: IoState,
codec: AmqpCodec<AmqpFrame>,
connection: Connection,
keepalive: u16,
keepalive: Seconds,
remote_config: Configuration,
timer: Timer,
_st: State<St>,
Expand All @@ -29,7 +30,7 @@ where
state: IoState,
codec: AmqpCodec<AmqpFrame>,
connection: Connection,
keepalive: u16,
keepalive: Seconds,
remote_config: Configuration,
timer: Timer,
) -> Self {
Expand Down Expand Up @@ -80,16 +81,18 @@ where
self.connection,
fn_service(|_| Ready::<_, LinkError>::Ok(())),
fn_service(|_| Ready::<_, LinkError>::Ok(())),
self.remote_config.timeout_remote_secs(),
self.remote_config.timeout_remote_secs().into(),
)
.map(|_| Option::<AmqpFrame>::None);

let keepalive = if self.keepalive.non_zero() {
self.keepalive + Seconds(5)
} else {
Seconds::ZERO
};

IoDispatcher::new(self.io, self.codec, self.state, dispatcher, self.timer)
.keepalive_timeout(if self.keepalive == 0 {
0
} else {
self.keepalive + 5
})
.keepalive_timeout(keepalive)
.await
}
}
60 changes: 27 additions & 33 deletions src/client/connector.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
use std::{future::Future, marker::PhantomData, time::Duration};
use std::{future::Future, marker::PhantomData};

use ntex::codec::{AsyncRead, AsyncWrite};
use ntex::connect::{self, Address, Connect};
use ntex::framed::{State, Timer};
use ntex::rt::time::delay_for;
use ntex::service::Service;
use ntex::util::{select, ByteString, Either};
use ntex::time::{timeout, Millis, Seconds};
use ntex::util::{ByteString, Either};

#[cfg(feature = "openssl")]
use ntex::connect::openssl::{OpensslConnector, SslConnector};

#[cfg(feature = "rustls")]
use ntex::connect::rustls::{ClientConfig, RustlsConnector};

use crate::codec::protocol::{Frame, Milliseconds, ProtocolId, SaslCode, SaslFrameBody, SaslInit};
use crate::codec::protocol::{Frame, ProtocolId, SaslCode, SaslFrameBody, SaslInit};
use crate::codec::{types::Symbol, AmqpCodec, AmqpFrame, ProtocolIdCodec, SaslFrame};
use crate::{error::ProtocolIdError, Configuration, Connection};

Expand All @@ -23,8 +23,8 @@ use super::{connection::Client, error::ConnectError, SaslAuth};
pub struct Connector<A, T> {
connector: T,
config: Configuration,
handshake_timeout: u16,
disconnect_timeout: u16,
handshake_timeout: Seconds,
disconnect_timeout: Seconds,
lw: u16,
read_hw: u16,
write_hw: u16,
Expand All @@ -38,13 +38,13 @@ impl<A> Connector<A, ()> {
pub fn new() -> Connector<A, connect::Connector<A>> {
Connector {
connector: connect::Connector::default(),
handshake_timeout: 0,
disconnect_timeout: 3,
handshake_timeout: Seconds::ZERO,
disconnect_timeout: Seconds(3),
lw: 1024,
read_hw: 8 * 1024,
write_hw: 8 * 1024,
config: Configuration::default(),
timer: Timer::with(Duration::from_secs(1)),
timer: Timer::new(Millis::ONE_SEC),
_t: PhantomData,
}
}
Expand Down Expand Up @@ -79,11 +79,11 @@ where
self.config.max_frame_size as usize
}

/// Set idle time-out for the connection in seconds.
/// Set idle time-out for the connection.
///
/// By default idle time-out is set to 120 seconds
pub fn idle_timeout(&mut self, timeout: u16) -> &mut Self {
self.config.idle_time_out = (timeout * 1000) as Milliseconds;
pub fn idle_timeout(&mut self, timeout: Seconds) -> &mut Self {
self.config.idle_time_out = (timeout.seconds() * 1000) as u32;
self
}

Expand All @@ -95,25 +95,25 @@ where
self
}

/// Set handshake timeout in milliseconds.
/// Set handshake timeout.
///
/// Handshake includes `connect` packet and response `connect-ack`.
/// By default handshake timeuot is disabled.
pub fn handshake_timeout(mut self, timeout: u16) -> Self {
self.handshake_timeout = timeout as u16;
pub fn handshake_timeout(mut self, timeout: Seconds) -> Self {
self.handshake_timeout = timeout;
self
}

/// Set client connection disconnect timeout in milliseconds.
/// Set client connection disconnect timeout.
///
/// Defines a timeout for disconnect connection. If a disconnect procedure does not complete
/// within this time, the connection get dropped.
///
/// To disable timeout set value to 0.
///
/// By default disconnect timeout is set to 3 seconds.
pub fn disconnect_timeout(mut self, timeout: u16) -> Self {
self.disconnect_timeout = timeout as u16;
pub fn disconnect_timeout(mut self, timeout: Seconds) -> Self {
self.disconnect_timeout = timeout;
self
}

Expand Down Expand Up @@ -191,15 +191,12 @@ where
&self,
address: A,
) -> impl Future<Output = Result<Client<T::Response>, ConnectError>> {
if self.handshake_timeout > 0 {
let fut = select(
delay_for(Duration::from_millis(self.handshake_timeout as u64)),
self._connect(address),
);
if self.handshake_timeout.non_zero() {
let fut = timeout(self.handshake_timeout, self._connect(address));
Either::Left(async move {
match fut.await {
Either::Left(_) => Err(ConnectError::HandshakeTimeout),
Either::Right(res) => res.map_err(From::from),
Ok(res) => res.map_err(From::from),
Err(_) => Err(ConnectError::HandshakeTimeout),
}
})
} else {
Expand Down Expand Up @@ -252,15 +249,12 @@ where
addr: A,
auth: SaslAuth,
) -> impl Future<Output = Result<Client<T::Response>, ConnectError>> {
if self.handshake_timeout > 0 {
let fut = select(
delay_for(Duration::from_millis(self.handshake_timeout as u64)),
self._connect_sasl(addr, auth),
);
if self.handshake_timeout.non_zero() {
let fut = timeout(self.handshake_timeout, self._connect_sasl(addr, auth));
Either::Left(async move {
match fut.await {
Either::Left(_) => Err(ConnectError::HandshakeTimeout),
Either::Right(res) => res.map_err(From::from),
Ok(res) => res.map_err(From::from),
Err(_) => Err(ConnectError::HandshakeTimeout),
}
})
} else {
Expand Down Expand Up @@ -445,7 +439,7 @@ where
state,
codec,
connection,
config.timeout_secs() as u16,
config.timeout_secs(),
remote_config,
timer,
);
Expand Down
35 changes: 14 additions & 21 deletions src/dispatcher.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::{cell, fmt, future::Future, marker, pin::Pin, task::Context, task::Poll, time};
use std::{cell, fmt, future::Future, marker, pin::Pin, task::Context, task::Poll};

use ntex::framed::DispatchItem;
use ntex::rt::time::{sleep, Sleep};
use ntex::service::Service;
use ntex::time::{sleep, Millis, Sleep};
use ntex::util::{Either, Ready};

use crate::codec::protocol::Frame;
Expand All @@ -17,12 +17,10 @@ pub(crate) struct Dispatcher<Sr, Ctl: Service> {
ctl_service: Ctl,
ctl_fut: cell::RefCell<Option<(ControlFrame, Pin<Box<Ctl::Future>>)>>,
shutdown: cell::Cell<bool>,
expire: cell::RefCell<Pin<Box<Sleep>>>,
idle_timeout: time::Duration,
expire: Sleep,
idle_timeout: Millis,
}

const ZERO: time::Duration = time::Duration::from_nanos(0);

impl<Sr, Ctl> Dispatcher<Sr, Ctl>
where
Sr: Service<Request = types::Message, Response = ()>,
Expand All @@ -36,33 +34,28 @@ where
sink: Connection,
service: Sr,
ctl_service: Ctl,
idle_timeout: usize,
idle_timeout: Millis,
) -> Self {
let idle_timeout = time::Duration::from_secs(idle_timeout as u64);

Dispatcher {
sink,
service,
ctl_service,
idle_timeout,
ctl_fut: cell::RefCell::new(None),
shutdown: cell::Cell::new(false),
expire: cell::RefCell::new(Box::pin(sleep(idle_timeout))),
expire: sleep(idle_timeout),
}
}

fn handle_idle_timeout(&self, cx: &mut Context<'_>) {
let idle_timeout = self.idle_timeout;
if idle_timeout > ZERO {
let mut expire = self.expire.borrow_mut();
if Pin::new(&mut *expire).poll(cx).is_ready() {
log::trace!("Send keep-alive ping, timeout: {:?} secs", idle_timeout);
self.sink.post_frame(AmqpFrame::new(0, Frame::Empty));
expire
.as_mut()
.reset((time::Instant::now() + idle_timeout).into());
let _ = Pin::new(&mut *expire).poll(cx);
}
if self.idle_timeout.non_zero() && self.expire.poll_elapsed(cx).is_ready() {
log::trace!(
"Send keep-alive ping, timeout: {:?} secs",
self.idle_timeout
);
self.sink.post_frame(AmqpFrame::new(0, Frame::Empty));
self.expire.reset(self.idle_timeout);
self.handle_idle_timeout(cx);
}
}

Expand Down
14 changes: 7 additions & 7 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ extern crate derive_more;
#[macro_use]
extern crate log;

use ntex::util::ByteString;
use ntex::{time::Seconds, util::ByteString};
use ntex_amqp_codec::protocol::{Handle, Milliseconds, Open, OpenInner};
use uuid::Uuid;

Expand Down Expand Up @@ -122,19 +122,19 @@ impl Configuration {
}))
}

pub(crate) fn timeout_secs(&self) -> usize {
pub(crate) fn timeout_secs(&self) -> Seconds {
if self.idle_time_out > 0 {
(self.idle_time_out / 1000) as usize
Seconds::checked_new((self.idle_time_out / 1000) as usize)
} else {
0
Seconds::ZERO
}
}

pub(crate) fn timeout_remote_secs(&self) -> usize {
pub(crate) fn timeout_remote_secs(&self) -> Seconds {
if self.idle_time_out > 0 {
((self.idle_time_out as f32) * 0.75 / 1000.0) as usize
Seconds::checked_new(((self.idle_time_out as f32) * 0.75 / 1000.0) as usize)
} else {
0
Seconds::ZERO
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/server/handshake.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::rc::Rc;

use ntex::codec::{AsyncRead, AsyncWrite};
use ntex::framed::State;
use ntex::{framed::State, time::Seconds};

use crate::codec::protocol::{Frame, Open};
use crate::codec::{AmqpCodec, AmqpFrame};
Expand Down Expand Up @@ -162,11 +162,11 @@ pub struct HandshakeAck<Io, St> {
io: Io,
sink: Connection,
state: State,
idle_timeout: usize,
idle_timeout: Seconds,
}

impl<Io, St> HandshakeAck<Io, St> {
pub(crate) fn into_inner(self) -> (St, Io, Connection, State, usize) {
pub(crate) fn into_inner(self) -> (St, Io, Connection, State, Seconds) {
(self.st, self.io, self.sink, self.state, self.idle_timeout)
}
}
Loading

0 comments on commit d67d7fe

Please sign in to comment.