diff --git a/CHANGES.md b/CHANGES.md index 06a64be..4948e77 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,13 @@ # Changes +## [0.5.0-b.5] - 2021-08-11 + +* Refactor server dispatch process + +## [codec-0.6.2] - 2021-08-11 + +* Add helper methods to Transfer type + ## [0.5.0-b.3] - 2021-08-10 * Add Session::connection() method, returns ref to Connection diff --git a/Cargo.toml b/Cargo.toml index ef204e4..72a0a62 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-amqp" -version = "0.5.0-b.4" +version = "0.5.0-b.5" authors = ["ntex contributors "] description = "AMQP 1.0 Client/Server framework" documentation = "https://docs.rs/ntex-amqp" @@ -25,7 +25,7 @@ frame-trace = [] [dependencies] ntex = "0.4.0-b.1" -ntex-amqp-codec = "0.6.1" +ntex-amqp-codec = "0.6.2" bitflags = "1.2" derive_more = "0.99" diff --git a/codec/Cargo.toml b/codec/Cargo.toml index 4c51a9b..266ec2f 100644 --- a/codec/Cargo.toml +++ b/codec/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-amqp-codec" -version = "0.6.1" +version = "0.6.2" description = "AMQP 1.0 Protocol Codec" authors = ["Nikolay Kim ", "Max Gortman ", "Mike Yagley "] license = "MIT/Apache-2.0" diff --git a/codec/src/protocol/mod.rs b/codec/src/protocol/mod.rs index 945bf1e..10c916b 100644 --- a/codec/src/protocol/mod.rs +++ b/codec/src/protocol/mod.rs @@ -5,7 +5,7 @@ use derive_more::From; use ntex_bytes::{BufMut, ByteString, Bytes, BytesMut}; use uuid::Uuid; -use super::codec::{self, DecodeFormatted, Encode}; +use super::codec::{self, Decode, DecodeFormatted, Encode}; use super::error::AmqpParseError; use super::message::Message; use super::types::*; @@ -253,18 +253,21 @@ impl TransferBody { } impl From for TransferBody { + #[inline] fn from(msg: Message) -> Self { Self::Message(Box::new(msg)) } } impl Encode for TransferBody { + #[inline] fn encoded_size(&self) -> usize { match self { TransferBody::Data(ref data) => data.len(), TransferBody::Message(ref data) => data.encoded_size(), } } + #[inline] fn encode(&self, dst: &mut BytesMut) { match *self { TransferBody::Data(ref data) => dst.put_slice(data), @@ -272,3 +275,22 @@ impl Encode for TransferBody { } } } + +impl Transfer { + #[inline] + pub fn get_body(&self) -> Option<&Bytes> { + match self.body { + Some(TransferBody::Data(ref b)) => Some(b), + _ => None, + } + } + + #[inline] + pub fn load_message(&self) -> Result { + if let Some(TransferBody::Data(ref b)) = self.body { + Ok(T::decode(b)?.1) + } else { + Err(AmqpParseError::UnexpectedType("body")) + } + } +} diff --git a/examples/server.rs b/examples/server.rs index 9b72563..c08a914 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -7,7 +7,7 @@ async fn server( ) -> Result< Box< dyn Service< - Request = server::Transfer<()>, + Request = server::Transfer, Response = server::Outcome, Error = AmqpError, Future = Ready, diff --git a/src/client/connection.rs b/src/client/connection.rs index f83d5aa..71644dc 100644 --- a/src/client/connection.rs +++ b/src/client/connection.rs @@ -16,7 +16,7 @@ pub struct Client { keepalive: u16, remote_config: Configuration, timer: Timer, - st: State, + _st: State, } impl Client @@ -41,7 +41,7 @@ where keepalive, remote_config, timer, - st: State::new(()), + _st: State::new(()), } } } @@ -68,7 +68,7 @@ where keepalive: self.keepalive, remote_config: self.remote_config, timer: self.timer, - st: State::new(st), + _st: State::new(st), } } @@ -77,9 +77,8 @@ where /// Default handler closes connection on any control message. pub async fn start_default(self) -> Result<(), DispatcherError> { let dispatcher = Dispatcher::new( - self.st, self.connection, - fn_service(|_| Ready::<_, LinkError>::Err(LinkError::force_detach())), + fn_service(|_| Ready::<_, LinkError>::Ok(())), fn_service(|_| Ready::<_, LinkError>::Ok(())), self.remote_config.timeout_remote_secs(), ) diff --git a/src/connection.rs b/src/connection.rs index 1642f6a..05a774d 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -4,12 +4,11 @@ use ntex::channel::{condition::Condition, condition::Waiter, oneshot}; use ntex::framed::State; use ntex::util::{HashMap, Ready}; -use crate::cell::Cell; -use crate::codec::protocol::{Begin, Close, End, Error, Frame}; -use crate::codec::{AmqpCodec, AmqpCodecError, AmqpFrame}; -use crate::error::AmqpProtocolError; +use crate::codec::protocol::{Begin, Close, End, Error, Frame, Role}; +use crate::codec::{AmqpCodec, AmqpFrame}; use crate::session::{Session, SessionInner}; -use crate::Configuration; +use crate::sndlink::{SenderLink, SenderLinkInner}; +use crate::{cell::Cell, error::AmqpProtocolError, types::Action, Configuration}; #[derive(Clone)] pub struct Connection(pub(crate) Cell); @@ -154,62 +153,6 @@ impl Connection { } } - /// Get session by remote id. This method panics if session does not exists or in opening/closing state. - pub(crate) fn get_remote_session(&self, id: usize) -> Option> { - let inner = self.0.get_ref(); - inner.sessions_map.get(&(id as u16)).and_then(|token| { - inner.sessions.get(*token).and_then(|channel| { - if let ChannelState::Established(ref session) = channel { - Some(session.clone()) - } else { - None - } - }) - }) - } - - pub(crate) fn register_remote_session( - &self, - channel_id: u16, - begin: &Begin, - ) -> Result<(), AmqpCodecError> { - trace!("remote session opened: {:?}", channel_id); - - let cell = self.0.clone(); - let inner = self.0.get_mut(); - let entry = inner.sessions.vacant_entry(); - let token = entry.key(); - - let session = Cell::new(SessionInner::new( - token, - false, - Connection(cell), - token as u16, - begin.next_outgoing_id(), - begin.incoming_window(), - begin.outgoing_window(), - )); - entry.insert(ChannelState::Established(session)); - inner.sessions_map.insert(channel_id, token); - - let begin = Begin { - remote_channel: Some(channel_id), - next_outgoing_id: 1, - incoming_window: std::u32::MAX, - outgoing_window: begin.incoming_window(), - handle_max: std::u32::MAX, - offered_capabilities: None, - desired_capabilities: None, - properties: None, - }; - - inner - .state - .write() - .encode(AmqpFrame::new(token as u16, begin.into()), &inner.codec) - .map(|_| ()) - } - pub(crate) fn post_frame(&self, frame: AmqpFrame) { #[cfg(feature = "frame-trace")] log::trace!("outcoming: {:#?}", frame); @@ -246,6 +189,47 @@ impl ConnectionInner { } } + pub(crate) fn register_remote_session( + &mut self, + channel_id: u16, + begin: &Begin, + cell: &Cell, + ) -> Result<(), AmqpProtocolError> { + trace!("remote session opened: {:?}", channel_id); + + let entry = self.sessions.vacant_entry(); + let token = entry.key(); + + let session = Cell::new(SessionInner::new( + token, + false, + Connection(cell.clone()), + token as u16, + begin.next_outgoing_id(), + begin.incoming_window(), + begin.outgoing_window(), + )); + entry.insert(ChannelState::Established(session)); + self.sessions_map.insert(channel_id, token); + + let begin = Begin { + remote_channel: Some(channel_id), + next_outgoing_id: 1, + incoming_window: std::u32::MAX, + outgoing_window: begin.incoming_window(), + handle_max: std::u32::MAX, + offered_capabilities: None, + desired_capabilities: None, + properties: None, + }; + + self.state + .write() + .encode(AmqpFrame::new(token as u16, begin.into()), &self.codec) + .map(|_| ()) + .map_err(AmqpProtocolError::Codec) + } + pub(crate) fn complete_session_creation( &mut self, channel_id: u16, @@ -290,12 +274,15 @@ impl ConnectionInner { pub(crate) fn handle_frame( &mut self, frame: AmqpFrame, - ) -> Result, AmqpProtocolError> { - if let Frame::Empty = frame.performative() { - return Ok(None); + inner: &Cell, + ) -> Result { + let (channel_id, frame) = frame.into_parts(); + + if let Frame::Empty = frame { + return Ok(Action::None); } - if let Frame::Close(ref close) = frame.performative() { + if let Frame::Close(ref close) = frame { self.set_error(AmqpProtocolError::Closed(close.error.clone())); if self.st == ConnectionState::Closing { @@ -307,40 +294,38 @@ impl ConnectionInner { self.post_frame(AmqpFrame::new(0, close.into())); self.st = ConnectionState::RemoteClose; } - return Ok(None); + return Ok(Action::None); } if self.error.is_some() { error!("Connection closed but new framed is received: {:?}", frame); - return Ok(None); + return Ok(Action::None); } // get local session id - let state = if let Some(token) = self.sessions_map.get(&frame.channel_id()) { + let state = if let Some(token) = self.sessions_map.get(&channel_id) { if let Some(state) = self.sessions.get_mut(*token) { state } else { log::error!("Inconsistent internal state"); - let (id, frame) = frame.into_parts(); return Err(AmqpProtocolError::UnknownSession( - id as usize, + channel_id as usize, Box::new(frame), )); } } else { // we dont have channel info, only Begin frame is allowed on new channel - return if let Frame::Begin(ref begin) = frame.performative() { + return if let Frame::Begin(begin) = frame { // response Begin for open session if let Some(id) = begin.remote_channel() { - self.complete_session_creation(frame.channel_id(), id, begin); - Ok(None) + self.complete_session_creation(channel_id, id, &begin); } else { - Ok(Some(frame)) + self.register_remote_session(channel_id, &begin, inner)?; } + Ok(Action::None) } else { - let (id, frame) = frame.into_parts(); Err(AmqpProtocolError::UnknownSession( - id as usize, + channel_id as usize, Box::new(frame), )) }; @@ -349,53 +334,64 @@ impl ConnectionInner { // handle session frames match state { ChannelState::Opening(_, _) => { - error!("Unexpected opening state: {}", frame.channel_id()); - Err(AmqpProtocolError::UnexpectedOpeningState(Box::new( - frame.into_parts().1, - ))) + error!("Unexpected opening state: {}", channel_id); + Err(AmqpProtocolError::UnexpectedOpeningState(Box::new(frame))) } - ChannelState::Established(ref mut session) => match frame.performative() { + ChannelState::Established(ref mut session) => match frame { Frame::Attach(attach) => { let cell = session.clone(); - if session.get_mut().handle_attach(attach, cell) { - Ok(None) + if session.get_mut().handle_attach(&attach, cell) { + Ok(Action::None) } else { - Ok(Some(frame)) + match attach.role { + Role::Receiver => { + // remotly opened sender link + let link = SenderLink::new(Cell::new(SenderLinkInner::with( + &attach, + session.clone(), + ))); + Ok(Action::AttachSender(link, attach)) + } + Role::Sender => { + // receiver link + let link = session + .get_mut() + .attach_remote_receiver_link(session.clone(), attach); + Ok(Action::AttachReceiver(link)) + } + } } } - Frame::Flow(_) | Frame::Detach(_) => Ok(Some(frame)), + // Frame::Detach(frm) => Ok(Action::Detach(frm)), Frame::End(remote_end) => { - trace!("Remote session end: {}", frame.channel_id()); + trace!("Remote session end: {}", channel_id); let end = End { error: None }; session .get_mut() .set_error(AmqpProtocolError::SessionEnded(remote_end.error.clone())); let id = session.get_mut().id(); self.post_frame(AmqpFrame::new(id, end.into())); - if let Some(token) = self.sessions_map.remove(&frame.channel_id()) { + if let Some(token) = self.sessions_map.remove(&channel_id) { self.sessions.remove(token); } - Ok(None) - } - _ => { - session.get_mut().handle_frame(frame.into_parts().1); - Ok(None) + Ok(Action::None) } + _ => session.get_mut().handle_frame(frame), }, - ChannelState::Closing(ref mut tx) => match frame.performative() { + ChannelState::Closing(ref mut tx) => match frame { Frame::End(frm) => { trace!("Session end is confirmed: {:?}", frm); if let Some(tx) = tx.take() { let _ = tx.send(Ok(())); } - if let Some(token) = self.sessions_map.remove(&frame.channel_id()) { + if let Some(token) = self.sessions_map.remove(&channel_id) { self.sessions.remove(token); } - Ok(None) + Ok(Action::None) } frm => { trace!("Got frame after initiated session end: {:?}", frm); - Ok(None) + Ok(Action::None) } }, } diff --git a/src/dispatcher.rs b/src/dispatcher.rs index 0004f55..8d803b0 100644 --- a/src/dispatcher.rs +++ b/src/dispatcher.rs @@ -1,20 +1,17 @@ -use std::{cell::RefCell, fmt, future::Future, pin::Pin, task::Context, task::Poll, time}; +use std::{cell::RefCell, fmt, future::Future, marker, pin::Pin, task::Context, task::Poll, time}; use ntex::framed::DispatchItem; use ntex::rt::time::{sleep, Sleep}; use ntex::service::Service; -use ntex::util::Ready; +use ntex::util::{Either, Ready}; -use crate::cell::Cell; -use crate::codec::protocol::{Frame, Role}; +use crate::codec::protocol::Frame; use crate::codec::{AmqpCodec, AmqpFrame}; use crate::error::{AmqpProtocolError, DispatcherError, Error}; -use crate::sndlink::{SenderLink, SenderLinkInner}; -use crate::{connection::Connection, types, ControlFrame, ControlFrameKind, State}; +use crate::{connection::Connection, types, ControlFrame, ControlFrameKind, ReceiverLink}; /// Amqp server dispatcher service. -pub(crate) struct Dispatcher { - state: State, +pub(crate) struct Dispatcher { sink: Connection, service: Sr, ctl_service: Ctl, @@ -24,9 +21,9 @@ pub(crate) struct Dispatcher { idle_timeout: usize, } -impl Dispatcher +impl Dispatcher where - Sr: Service, Response = ()>, + Sr: Service, Sr::Error: 'static, Sr::Future: 'static, Ctl: Service, @@ -34,7 +31,6 @@ where Error: From + From, { pub(crate) fn new( - state: State, sink: Connection, service: Sr, ctl_service: Ctl, @@ -42,7 +38,6 @@ where ) -> Self { Dispatcher { sink, - state, service, ctl_service, idle_timeout, @@ -119,14 +114,13 @@ where match frame.0.get_mut().kind { ControlFrameKind::AttachReceiver(ref link) => { let link = link.clone(); - let fut = self - .service - .call(types::Link::new(link.clone(), self.state.clone())); + let fut = self.service.call(types::Message::Attached(link.clone())); ntex::rt::spawn(async move { - let res = fut.await; - match res { - Ok(_) => link.close().await, - Err(err) => link.close_with_error(Error::from(err)).await, + if let Err(err) = fut.await { + let _ = link.close_with_error(Error::from(err)).await; + } else { + link.confirm_receiver_link(); + link.set_link_credit(50); } }); } @@ -136,14 +130,14 @@ where .get_mut() .attach_remote_sender_link(frm, link.inner.clone()); } - ControlFrameKind::Flow(ref frm, _) => { - frame.session_cell().get_mut().apply_flow(frm); + ControlFrameKind::Flow(ref frm, ref link) => { + frame.session_cell().get_mut().handle_flow(frm, Some(&link)); } - ControlFrameKind::DetachSender(ref mut frm, _) => { - frame.session_cell().get_mut().handle_detach(frm); + ControlFrameKind::DetachSender(_, _) => { + // frame.session_cell().get_mut().handle_detach(frm); } - ControlFrameKind::DetachReceiver(ref mut frm, _) => { - frame.session_cell().get_mut().handle_detach(frm); + ControlFrameKind::DetachReceiver(_, _) => { + // frame.session_cell().get_mut().handle_detach(frm); } ControlFrameKind::ProtocolError(ref err) => return Err(err.clone().into()), ControlFrameKind::Closed(_) => (), @@ -153,9 +147,9 @@ where } } -impl Service for Dispatcher +impl Service for Dispatcher where - Sr: Service, Response = ()>, + Sr: Service, Sr::Error: fmt::Debug + 'static, Sr::Future: 'static, Ctl: Service, @@ -166,20 +160,23 @@ where type Request = DispatchItem>; type Response = (); type Error = DispatcherError; - type Future = Ready; + type Future = Either, Ready>; fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { + // idle ttimeout + self.handle_idle_timeout(cx); + // process control frame let res0 = !self.handle_control_fut(cx)?; // check readiness let res1 = self.service.poll_ready(cx).map_err(|err| { - error!("Error during publish service readiness check: {:?}", err); + error!("Publish service readiness check failed: {:?}", err); let _ = self.sink.close_with_error(err); DispatcherError::Service })?; let res2 = self.ctl_service.poll_ready(cx).map_err(|err| { - error!("Error during control service readiness check: {:?}", err); + error!("Control service readiness check failed: {:?}", err); let _ = self.sink.close_with_error(err); DispatcherError::Service })?; @@ -220,142 +217,129 @@ where fn call(&self, request: Self::Request) -> Self::Future { match request { DispatchItem::Item(frame) => { - #[cfg(feature = "frame-trace")] + // #[cfg(feature = "frame-trace")] log::trace!("incoming: {:#?}", frame); - let item = try_ready_err!(self + let action = match self .sink .0 .get_mut() - .handle_frame(frame) - .map_err(DispatcherError::Protocol)); - let frame = if let Some(item) = item { - item - } else { - return Ready::Ok(()); + .handle_frame(frame, &self.sink.0) + .map_err(DispatcherError::Protocol) + { + Ok(a) => a, + Err(e) => return Either::Right(Ready::Err(e)), }; - let (channel_id, frame) = frame.into_parts(); - - // remote session - if let Frame::Begin(frm) = frame { - return Ready::from( - self.sink - .register_remote_session(channel_id, &frm) - .map_err(DispatcherError::Codec), - ); - } - - let id = channel_id as usize; - let session = match self.sink.get_remote_session(id) { - Some(session) => session, - None => { - return Ready::from(Err(AmqpProtocolError::UnknownSession( - id, - Box::new(frame), - ) - .into())) + match action { + types::Action::Transfer(link) => { + return Either::Left(ServiceResult { + link: link.clone(), + fut: self.service.call(types::Message::Transfer(link)), + _t: marker::PhantomData, + }); } - }; - - let result = match frame { - Frame::Flow(frm) => { + types::Action::Flow(link, frm) => { // apply flow to specific link - if let Some(link_id) = frm.handle { - // TODO: close session if link is not found - if let Some(link) = session.get_sender_link_by_handle(link_id) { - let frame = ControlFrame::new( - session.clone(), - ControlFrameKind::Flow(frm, link.clone()), - ); - *self.ctl_fut.borrow_mut() = - Some((frame.clone(), Box::pin(self.ctl_service.call(frame)))); - return Ready::from(Ok(())); - } - } - session.get_mut().apply_flow(&frm); - Ok(()) + let frame = ControlFrame::new( + link.session().inner.clone(), + ControlFrameKind::Flow(frm, link.clone()), + ); + *self.ctl_fut.borrow_mut() = + Some((frame.clone(), Box::pin(self.ctl_service.call(frame)))); } - Frame::Attach(attach) => { - match attach.role { - Role::Receiver => { - // remotly opened sender link - let link = SenderLink::new(Cell::new(SenderLinkInner::with( - &attach, - session.clone(), - ))); - let frame = ControlFrame::new( - session, - ControlFrameKind::AttachSender(Box::new(attach), link), - ); - - *self.ctl_fut.borrow_mut() = - Some((frame.clone(), Box::pin(self.ctl_service.call(frame)))); - Ok(()) - } - Role::Sender => { - // receiver link - let link = session - .get_mut() - .attach_remote_receiver_link(session.clone(), attach); + types::Action::AttachSender(link, frame) => { + let frame = ControlFrame::new( + link.session().inner.clone(), + ControlFrameKind::AttachSender(Box::new(frame), link), + ); - let frame = ControlFrame::new( - session, - ControlFrameKind::AttachReceiver(link), - ); - *self.ctl_fut.borrow_mut() = - Some((frame.clone(), Box::pin(self.ctl_service.call(frame)))); - Ok(()) - } - } + *self.ctl_fut.borrow_mut() = + Some((frame.clone(), Box::pin(self.ctl_service.call(frame)))); } - Frame::Detach(frm) => { - if let Some(link) = session.get_sender_link_by_handle(frm.handle) { - let frame = ControlFrame::new( - session.clone(), - ControlFrameKind::DetachSender(frm, link.clone()), - ); - *self.ctl_fut.borrow_mut() = - Some((frame.clone(), Box::pin(self.ctl_service.call(frame)))); - } else if let Some(link) = session.get_receiver_link_by_handle(frm.handle) { - let frame = ControlFrame::new( - session.clone(), - ControlFrameKind::DetachReceiver(frm, link.clone()), - ); - *self.ctl_fut.borrow_mut() = - Some((frame.clone(), Box::pin(self.ctl_service.call(frame)))); - } else { - session.get_mut().handle_frame(Frame::Detach(frm)); - } - Ok(()) + types::Action::AttachReceiver(link) => { + let frame = ControlFrame::new( + link.session().inner.clone(), + ControlFrameKind::AttachReceiver(link), + ); + *self.ctl_fut.borrow_mut() = + Some((frame.clone(), Box::pin(self.ctl_service.call(frame)))); + } + types::Action::DetachSender(link, frm) => { + let frame = ControlFrame::new( + link.session().inner.clone(), + ControlFrameKind::DetachSender(frm, link.clone()), + ); + *self.ctl_fut.borrow_mut() = + Some((frame.clone(), Box::pin(self.ctl_service.call(frame)))); } - _ => Err(AmqpProtocolError::Unexpected(Box::new(frame)).into()), + types::Action::DetachReceiver(link, frm) => { + let frame = ControlFrame::new( + link.session().inner.clone(), + ControlFrameKind::DetachReceiver(frm, link.clone()), + ); + *self.ctl_fut.borrow_mut() = + Some((frame.clone(), Box::pin(self.ctl_service.call(frame)))); + } + types::Action::None => (), }; - Ready::from(result) + Either::Right(Ready::Ok(())) } DispatchItem::EncoderError(err) | DispatchItem::DecoderError(err) => { let frame = ControlFrame::new_kind(ControlFrameKind::ProtocolError(err.into())); *self.ctl_fut.borrow_mut() = Some((frame.clone(), Box::pin(self.ctl_service.call(frame)))); - Ready::from(Ok(())) + Either::Right(Ready::Ok(())) } DispatchItem::KeepAliveTimeout => { self.sink .0 .get_mut() .set_error(AmqpProtocolError::KeepAliveTimeout); - Ready::from(Ok(())) + Either::Right(Ready::Ok(())) } DispatchItem::IoError(_) => { self.sink .0 .get_mut() .set_error(AmqpProtocolError::Disconnected); - Ready::from(Ok(())) + Either::Right(Ready::Ok(())) } DispatchItem::WBackPressureEnabled | DispatchItem::WBackPressureDisabled => { - Ready::from(Ok(())) + Either::Right(Ready::Ok(())) + } + } + } +} + +pin_project_lite::pin_project! { + pub struct ServiceResult { + #[pin] + fut: F, + link: ReceiverLink, + _t: marker::PhantomData, + } +} + +impl Future for ServiceResult +where + F: Future>, + E: fmt::Debug, + Error: From, +{ + type Output = Result<(), DispatcherError>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + + match this.fut.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Ok(_)) => Poll::Ready(Ok(())), + Poll::Ready(Err(e)) => { + log::trace!("Service error {:?}", e); + let _ = this.link.close_with_error(e); + Poll::Ready(Ok::<_, DispatcherError>(())) } } } diff --git a/src/error.rs b/src/error.rs index dd9004a..538cdd1 100644 --- a/src/error.rs +++ b/src/error.rs @@ -41,6 +41,12 @@ pub enum AmqpProtocolError { Disconnected, #[display(fmt = "Unknown session: {} {:?}", _0, _1)] UnknownSession(usize, Box), + #[display(fmt = "Session {}, Unknown link: {} {:?}", session, link_handle, frame)] + UnknownLink { + session: usize, + link_handle: u32, + frame: Box, + }, #[display(fmt = "Connection closed, error: {:?}", _0)] Closed(Option), #[display(fmt = "Session ended, error: {:?}", _0)] @@ -49,8 +55,8 @@ pub enum AmqpProtocolError { LinkDetached(Option), #[display(fmt = "Unexpected frame for opening state, got: {:?}", _0)] UnexpectedOpeningState(Box), - #[display(fmt = "Unexpected frame, got: {:?}", _0)] - Unexpected(Box), + #[display(fmt = "{}, Unexpected frame: {:?}", _0, _1)] + Unexpected(&'static str, Box), } impl From for AmqpProtocolError { diff --git a/src/hb.rs b/src/hb.rs deleted file mode 100644 index bf93b86..0000000 --- a/src/hb.rs +++ /dev/null @@ -1,92 +0,0 @@ -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::time::Duration; - -use ntex::rt::time::{sleep_until, Instant, Sleep}; -use ntex::util::time::LowResTimeService; - -pub(crate) enum HeartbeatAction { - None, - Heartbeat, - Close, -} - -pub(crate) struct Heartbeat { - expire_local: Instant, - expire_remote: Instant, - local: Duration, - remote: Option, - time: LowResTimeService, - delay: Pin>, -} - -impl Heartbeat { - pub(crate) fn new(local: Duration, remote: Option, time: LowResTimeService) -> Self { - let now = Instant::from_std(time.now()); - let delay = if let Some(remote) = remote { - Box::pin(sleep_until(now + std::cmp::min(local, remote))) - } else { - Box::pin(sleep_until(now + local)) - }; - - Heartbeat { - expire_local: now, - expire_remote: now, - local, - remote, - time, - delay, - } - } - - pub(crate) fn update_local(&mut self, update: bool) { - if update { - self.expire_local = Instant::from_std(self.time.now()); - } - } - - pub(crate) fn update_remote(&mut self, update: bool) { - if update && self.remote.is_some() { - self.expire_remote = Instant::from_std(self.time.now()); - } - } - - fn next_expire(&self) -> Instant { - if let Some(remote) = self.remote { - let t1 = self.expire_local + self.local; - let t2 = self.expire_remote + remote; - if t1 < t2 { - t1 - } else { - t2 - } - } else { - self.expire_local + self.local - } - } - - pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> HeartbeatAction { - match Pin::new(&mut self.delay).poll(cx) { - Poll::Ready(_) => { - let mut act = HeartbeatAction::None; - let dl = self.delay.deadline(); - if dl >= self.expire_local + self.local { - // close connection - return HeartbeatAction::Close; - } - if let Some(remote) = self.remote { - if dl >= self.expire_remote + remote { - // send heartbeat - act = HeartbeatAction::Heartbeat; - } - } - let expire = self.next_expire(); - self.delay.as_mut().reset(expire); - let _ = Pin::new(&mut self.delay).poll(cx); - act - } - Poll::Pending => HeartbeatAction::None, - } - } -} diff --git a/src/lib.rs b/src/lib.rs index 74490a2..db3e00c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,9 +10,6 @@ use ntex::util::ByteString; use ntex_amqp_codec::protocol::{Handle, Milliseconds, Open}; use uuid::Uuid; -#[macro_use] -mod utils; - mod cell; pub mod client; mod connection; @@ -21,7 +18,6 @@ mod default; mod dispatcher; pub mod error; pub mod error_code; -mod hb; mod rcvlink; mod router; pub mod server; diff --git a/src/rcvlink.rs b/src/rcvlink.rs index 70740ff..c7cf427 100644 --- a/src/rcvlink.rs +++ b/src/rcvlink.rs @@ -1,23 +1,35 @@ -use std::{collections::VecDeque, future::Future, pin::Pin, task::Context, task::Poll}; +use std::{collections::VecDeque, future::Future, hash, pin::Pin, task::Context, task::Poll}; use ntex::util::{ByteString, BytesMut}; -use ntex::Stream; -use ntex::{channel::oneshot, task::LocalWaker}; +use ntex::{channel::oneshot, task::LocalWaker, Stream}; use ntex_amqp_codec::protocol::{ Attach, DeliveryNumber, Disposition, Error, Handle, LinkError, ReceiverSettleMode, Role, SenderSettleMode, Source, TerminusDurability, TerminusExpiryPolicy, Transfer, TransferBody, }; use ntex_amqp_codec::Encode; -use crate::cell::Cell; -use crate::error::AmqpProtocolError; use crate::session::{Session, SessionInner}; +use crate::{cell::Cell, error::AmqpProtocolError, types::Action}; #[derive(Clone, Debug)] pub struct ReceiverLink { pub(crate) inner: Cell, } +impl Eq for ReceiverLink {} + +impl PartialEq for ReceiverLink { + fn eq(&self, other: &ReceiverLink) -> bool { + (self.inner.get_ref() as *const _ as usize) == (other.inner.get_ref() as *const _ as usize) + } +} + +impl hash::Hash for ReceiverLink { + fn hash(&self, state: &mut H) { + (self.inner.get_ref() as *const _ as usize).hash(state); + } +} + impl ReceiverLink { pub(crate) fn new(inner: Cell) -> ReceiverLink { ReceiverLink { inner } @@ -43,7 +55,7 @@ impl ReceiverLink { &self.inner.get_ref().attach } - pub(crate) fn open(&mut self) { + pub(crate) fn confirm_receiver_link(&self) { let inner = self.inner.get_mut(); inner .session @@ -63,6 +75,16 @@ impl ReceiverLink { self.inner.get_mut().set_max_partial_transfer(size); } + /// Check transfer frame + pub fn has_transfers(&self) -> bool { + !self.inner.get_mut().queue.is_empty() + } + + /// Get transfer frame + pub fn get_transfer(&self) -> Option { + self.inner.get_mut().queue.pop_front() + } + /// Send disposition frame pub fn send_disposition(&self, disp: Disposition) { self.inner @@ -100,7 +122,7 @@ impl ReceiverLink { let inner = self.inner.get_mut(); inner.closed = true; inner.error = error; - inner.reader_task.wake(); + inner.wake(); } } @@ -161,17 +183,21 @@ impl ReceiverLinkInner { handle, session: Session::new(session), closed: false, - reader_task: LocalWaker::new(), queue: VecDeque::with_capacity(4), credit: 0, error: None, partial_body: None, partial_body_max: 262_144, delivery_count: attach.initial_delivery_count().unwrap_or(0), + reader_task: LocalWaker::new(), attach, } } + fn wake(&self) { + self.reader_task.wake(); + } + pub(crate) fn detached(&mut self) { // drop pending transfers self.queue.clear(); @@ -191,7 +217,7 @@ impl ReceiverLinkInner { .get_mut() .detach_receiver_link(self.handle, true, error, tx); } - self.reader_task.wake(); + self.wake(); async move { match rx.await { @@ -215,7 +241,11 @@ impl ReceiverLinkInner { } #[allow(clippy::unnecessary_unwrap)] - pub(crate) fn handle_transfer(&mut self, mut transfer: Transfer) { + pub(crate) fn handle_transfer( + &mut self, + mut transfer: Transfer, + inner: &Cell, + ) -> Result { if self.credit == 0 { // check link credit let err = Error { @@ -224,6 +254,7 @@ impl ReceiverLinkInner { info: None, }; let _ = self.close(Some(err)); + Ok(Action::None) } else { self.credit -= 1; @@ -242,7 +273,7 @@ impl ReceiverLinkInner { info: None, }; let _ = self.close(Some(err)); - return; + return Ok(Action::None); } } @@ -255,7 +286,7 @@ impl ReceiverLinkInner { info: None, }; let _ = self.close(Some(err)); - return; + return Ok(Action::None); } transfer_body.encode(body); @@ -269,8 +300,11 @@ impl ReceiverLinkInner { self.queue.back_mut().unwrap().body = Some(TransferBody::Data(partial_body.unwrap().freeze())); if self.queue.len() == 1 { - self.reader_task.wake() + self.wake(); } + Ok(Action::Transfer(ReceiverLink { + inner: inner.clone(), + })) } else { log::error!("Inconsistent state, bug"); let err = Error { @@ -279,7 +313,10 @@ impl ReceiverLinkInner { info: None, }; let _ = self.close(Some(err)); + Ok(Action::None) } + } else { + Ok(Action::None) } } else if transfer.more { if transfer.delivery_id.is_none() { @@ -289,6 +326,7 @@ impl ReceiverLinkInner { info: None, }; let _ = self.close(Some(err)); + Ok(Action::None) } else { let body = if let Some(body) = transfer.body.take() { match body { @@ -304,13 +342,17 @@ impl ReceiverLinkInner { }; self.partial_body = Some(body); self.queue.push_back(transfer); + Ok(Action::None) } } else { self.delivery_count += 1; self.queue.push_back(transfer); if self.queue.len() == 1 { - self.reader_task.wake() + self.wake(); } + Ok(Action::Transfer(ReceiverLink { + inner: inner.clone(), + })) } } } diff --git a/src/router.rs b/src/router.rs index 4cb2b7b..2205b32 100644 --- a/src/router.rs +++ b/src/router.rs @@ -1,17 +1,18 @@ -use std::task::{Context, Poll}; -use std::{convert::TryFrom, future::Future, marker::PhantomData, pin::Pin}; +use std::{convert::TryFrom, future::Future, marker, pin::Pin, rc::Rc, task::Context, task::Poll}; use ntex::router::{IntoPattern, Router as PatternRouter}; use ntex::service::{boxed, fn_factory_with_config, IntoServiceFactory, Service, ServiceFactory}; -use ntex::util::{Either, Ready}; -use ntex::Stream; +use ntex::util::{Either, HashMap, Ready}; -use crate::codec::protocol::{DeliveryNumber, DeliveryState, Disposition, Error, Rejected, Role}; +use crate::codec::protocol::{ + DeliveryNumber, DeliveryState, Disposition, Error, Rejected, Role, Transfer, +}; use crate::error::LinkError; -use crate::types::{Link, Outcome, Transfer}; +use crate::types::{Link, Message, Outcome}; use crate::{cell::Cell, rcvlink::ReceiverLink, State}; -type Handle = boxed::BoxServiceFactory, Transfer, Outcome, Error, Error>; +type Handle = boxed::BoxServiceFactory, Transfer, Outcome, Error, Error>; +type HandleService = boxed::BoxService; pub struct Router(Vec<(Vec, Handle)>); @@ -30,7 +31,7 @@ impl Router { where T: IntoPattern, F: IntoServiceFactory, - U: ServiceFactory, Request = Transfer, Response = Outcome>, + U: ServiceFactory, Request = Transfer, Response = Outcome>, Error: From + From, Outcome: TryFrom, { @@ -46,7 +47,7 @@ impl Router { self, ) -> impl ServiceFactory< Config = State, - Request = Link, + Request = Message, Response = (), Error = Error, InitError = std::convert::Infallible, @@ -55,22 +56,28 @@ impl Router { for (addr, hnd) in self.0 { router.path(addr, hnd); } - let router = Cell::new(router.finish()); + let router = Rc::new(router.finish()); - fn_factory_with_config(move |_: State| { - Ready::Ok(RouterService { + fn_factory_with_config(move |state: State| { + Ready::Ok(RouterService(Cell::new(RouterServiceInner { + state, router: router.clone(), - }) + handlers: HashMap::default(), + }))) }) } } -struct RouterService { - router: Cell>>, +struct RouterService(Cell>); + +struct RouterServiceInner { + state: State, + router: Rc>>, + handlers: HashMap>, } impl Service for RouterService { - type Request = Link; + type Request = Message; type Response = (); type Error = Error; type Future = Either, RouterServiceResponse>; @@ -80,59 +87,76 @@ impl Service for RouterService { Poll::Ready(Ok(())) } - fn call(&self, mut link: Link) -> Self::Future { - let path = link - .frame() - .target - .as_ref() - .and_then(|target| target.address.as_ref().cloned()); - - if let Some(path) = path { - link.path_mut().set(path); - if let Some((hnd, _info)) = self.router.recognize(link.path_mut()) { - trace!("Create handler service for {}", link.path().get_ref()); - let fut = hnd.new_service(link.clone()); - Either::Right(RouterServiceResponse { - link: link.link.clone(), - app_state: link.state.clone(), - state: RouterServiceResponseState::NewService(fut), - }) - } else { - trace!( - "Target address is not recognized: {}", - link.path().get_ref() - ); - Either::Left(Ready::Err( - LinkError::force_detach() - .description(format!( - "Target address is not supported: {}", + fn call(&self, msg: Message) -> Self::Future { + match msg { + Message::Attached(link) => { + let path = link + .frame() + .target + .as_ref() + .and_then(|target| target.address.as_ref().cloned()); + + if let Some(path) = path { + let inner = self.0.get_mut(); + let mut link = Link::new(link, inner.state.clone(), path); + if let Some((hnd, _info)) = inner.router.recognize(link.path_mut()) { + trace!("Create handler service for {}", link.path().get_ref()); + let fut = hnd.new_service(link.clone()); + inner.handlers.insert(link.receiver().clone(), None); + Either::Right(RouterServiceResponse { + inner: self.0.clone(), + link: link.link.clone(), + state: RouterServiceResponseState::NewService(fut), + }) + } else { + trace!( + "Target address is not recognized: {}", link.path().get_ref() + ); + Either::Left(Ready::Err( + LinkError::force_detach() + .description(format!( + "Target address is not supported: {}", + link.path().get_ref() + )) + .into(), )) - .into(), - )) + } + } else { + Either::Left(Ready::Err( + LinkError::force_detach() + .description("Target address is required") + .into(), + )) + } + } + Message::Transfer(link) => { + if let Some(Some(_)) = self.0.get_ref().handlers.get(&link) { + if let Some(tr) = link.get_transfer() { + return Either::Right(RouterServiceResponse { + inner: self.0.clone(), + link, + state: RouterServiceResponseState::Service(Some(tr)), + }); + } + } + Either::Left(Ready::Ok(())) } - } else { - Either::Left(Ready::Err( - LinkError::force_detach() - .description("Target address is required") - .into(), - )) } } } struct RouterServiceResponse { + inner: Cell>, link: ReceiverLink, - app_state: State, - state: RouterServiceResponseState, + state: RouterServiceResponseState, } -enum RouterServiceResponseState { - Service(boxed::BoxService, Outcome, Error>), +enum RouterServiceResponseState { + Service(Option), + Transfer(Pin>>>, u32), NewService( - Pin< - Box, Outcome, Error>, Error>>>, - >, + Pin, Error>>>>, ), } @@ -141,18 +165,72 @@ impl Future for RouterServiceResponse { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut this = self.as_mut(); - let mut link = this.link.clone(); - let app_state = this.app_state.clone(); + let link = this.link.clone(); + let inner = this.inner.clone(); loop { match this.state { - RouterServiceResponseState::Service(ref mut srv) => { - // check readiness - match srv.poll_ready(cx) { - Poll::Ready(Ok(_)) => (), - Poll::Pending => { + RouterServiceResponseState::Service(ref mut tr) => { + if let Some(Some(srv)) = inner.handlers.get(&link) { + // check readiness + match srv.poll_ready(cx) { + Poll::Ready(Ok(_)) => (), + Poll::Pending => { + log::trace!( + "Handler service is not ready for {}", + this.link + .frame() + .target + .as_ref() + .map(|t| t + .address + .as_ref() + .map(|s| s.as_ref()) + .unwrap_or("")) + .unwrap_or("") + ); + return Poll::Pending; + } + Poll::Ready(Err(e)) => { + log::trace!("Service readiness check failed: {:?}", e); + let _ = this.link.close_with_error( + LinkError::force_detach().description(format!("error: {}", e)), + ); + return Poll::Ready(Ok(())); + } + } + + let tr = tr.take().unwrap(); + let delivery_id = match tr.delivery_id { + None => { + // #2.7.5 delivery_id MUST be set. batching is handled on lower level + let _ = this.link.close_with_error( + LinkError::force_detach() + .description("delivery_id MUST be set"), + ); + return Poll::Ready(Ok(())); + } + Some(delivery_id) => { + if this.link.credit() == 0 { + // self.has_credit = self.link.credit() != 0; + this.link.set_link_credit(50); + } + delivery_id + } + }; + + this.state = + RouterServiceResponseState::Transfer(srv.call(tr), delivery_id); + } else { + return Poll::Ready(Ok(())); + } + } + RouterServiceResponseState::Transfer(ref mut fut, delivery_id) => { + match Pin::new(fut).poll(cx) { + Poll::Ready(Ok(outcome)) => { log::trace!( - "Handler service is not ready for {}", + "Outcome is ready {:?} for {}", + outcome, this.link .frame() .target @@ -160,78 +238,22 @@ impl Future for RouterServiceResponse { .map(|t| t.address.as_ref().map(|s| s.as_ref()).unwrap_or("")) .unwrap_or("") ); - return Poll::Pending; + settle(&mut this.link, delivery_id, outcome.into_delivery_state()); } + Poll::Pending => return Poll::Pending, Poll::Ready(Err(e)) => { - log::trace!("Service readiness check failed: {:?}", e); - let _ = this.link.close_with_error( - LinkError::force_detach().description(format!("error: {}", e)), + log::trace!("Service response error: {:?}", e); + settle( + &mut this.link, + delivery_id, + DeliveryState::Rejected(Rejected { error: Some(e) }), ); - return Poll::Ready(Ok(())); } } - - match Pin::new(&mut link).poll_next(cx) { - Poll::Ready(Some(Ok(transfer))) => { - match transfer.delivery_id { - None => { - // #2.7.5 delivery_id MUST be set. batching is handled on lower level - if transfer.delivery_id.is_none() { - let _ = this.link.close_with_error( - LinkError::force_detach() - .description("delivery_id MUST be set"), - ); - return Poll::Ready(Ok(())); - } - } - Some(delivery_id) => { - if link.credit() == 0 { - // self.has_credit = self.link.credit() != 0; - link.set_link_credit(50); - } - - let msg = - Transfer::new(app_state.clone(), transfer, link.clone()); - - let mut fut = srv.call(msg); - match Pin::new(&mut fut).poll(cx) { - Poll::Ready(Ok(outcome)) => settle( - &mut this.link, - delivery_id, - outcome.into_delivery_state(), - ), - Poll::Pending => { - ntex::rt::spawn(HandleMessage { - fut, - delivery_id, - link: this.link.clone(), - }); - } - Poll::Ready(Err(e)) => { - log::trace!("Service response error: {:?}", e); - settle( - &mut this.link, - delivery_id, - DeliveryState::Rejected(Rejected { - error: Some(e), - }), - ) - } - } - } - } - } - Poll::Ready(None) => { - // TODO: shutdown service - log::trace!("Link is gone"); - return Poll::Ready(Ok(())); - } - Poll::Pending => return Poll::Pending, - Poll::Ready(Some(Err(e))) => { - log::trace!("Link is failed: {:?}", e); - let _ = this.link.close_with_error(LinkError::force_detach()); - return Poll::Ready(Ok(())); - } + if let Some(tr) = this.link.get_transfer() { + this.state = RouterServiceResponseState::Service(Some(tr)); + } else { + return Poll::Ready(Ok(())); } } RouterServiceResponseState::NewService(ref mut fut) => match Pin::new(fut).poll(cx) @@ -246,10 +268,15 @@ impl Future for RouterServiceResponse { .map(|t| t.address.as_ref().map(|s| s.as_ref()).unwrap_or("")) .unwrap_or("") ); - this.link.open(); - this.link.set_link_credit(50); - this.state = RouterServiceResponseState::Service(srv); - continue; + this.inner + .get_mut() + .handlers + .insert(this.link.clone(), Some(srv)); + if let Some(tr) = this.link.get_transfer() { + this.state = RouterServiceResponseState::Service(Some(tr)); + } else { + return Poll::Ready(Ok(())); + } } Poll::Ready(Err(e)) => { log::error!( @@ -338,32 +365,32 @@ fn settle(link: &mut ReceiverLink, id: DeliveryNumber, state: DeliveryState) { struct ResourceServiceFactory { factory: T, - _t: PhantomData, + _t: marker::PhantomData, } impl ResourceServiceFactory where S: 'static, - T: ServiceFactory, Request = Transfer, Response = Outcome> + 'static, + T: ServiceFactory, Request = Transfer, Response = Outcome> + 'static, Error: From + From, Outcome: TryFrom, { fn create(factory: T) -> Handle { boxed::factory(ResourceServiceFactory { factory, - _t: PhantomData, + _t: marker::PhantomData, }) } } impl ServiceFactory for ResourceServiceFactory where - T: ServiceFactory, Request = Transfer, Response = Outcome>, + T: ServiceFactory, Request = Transfer, Response = Outcome>, Error: From + From, Outcome: TryFrom, { type Config = Link; - type Request = Transfer; + type Request = Transfer; type Response = Outcome; type Error = Error; type InitError = Error; @@ -373,7 +400,7 @@ where fn new_service(&self, cfg: Link) -> Self::Future { ResourceServiceFactoryFut { fut: self.factory.new_service(cfg), - _t: PhantomData, + _t: marker::PhantomData, } } } @@ -381,13 +408,13 @@ where pin_project_lite::pin_project! { struct ResourceServiceFactoryFut { #[pin] fut: T::Future, - _t: PhantomData, + _t: marker::PhantomData, } } impl Future for ResourceServiceFactoryFut where - T: ServiceFactory, Request = Transfer, Response = Outcome>, + T: ServiceFactory, Request = Transfer, Response = Outcome>, Error: From + From, Outcome: TryFrom, { @@ -401,23 +428,23 @@ where }; Poll::Ready(Ok(ResourceService { service, - _t: PhantomData, + _t: marker::PhantomData, })) } } struct ResourceService { service: T, - _t: PhantomData, + _t: marker::PhantomData, } impl Service for ResourceService where - T: Service, Response = Outcome>, + T: Service, Error: From, Outcome: TryFrom, { - type Request = Transfer; + type Request = Transfer; type Response = Outcome; type Error = Error; type Future = ResourceServiceFut; @@ -433,10 +460,10 @@ where } #[inline] - fn call(&self, req: Transfer) -> Self::Future { + fn call(&self, req: Transfer) -> Self::Future { ResourceServiceFut { fut: self.service.call(req), - _t: PhantomData, + _t: marker::PhantomData, } } } @@ -444,13 +471,13 @@ where pin_project_lite::pin_project! { struct ResourceServiceFut { #[pin] fut: T::Future, - _t: PhantomData, + _t: marker::PhantomData, } } impl Future for ResourceServiceFut where - T: Service, Response = Outcome>, + T: Service, Error: From, Outcome: TryFrom, { diff --git a/src/server/mod.rs b/src/server/mod.rs index 98278fb..17169a7 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -7,8 +7,9 @@ pub use self::error::{HandshakeError, ServerError}; pub use self::handshake::{Handshake, HandshakeAck, HandshakeAmqp, HandshakeAmqpOpened}; pub use self::sasl::Sasl; pub use self::service::Server; +pub use crate::codec::protocol::Transfer; pub use crate::control::{ControlFrame, ControlFrameKind}; pub use crate::error::{Error, LinkError}; pub use crate::router::Router; pub use crate::state::State; -pub use crate::types::{Link, Outcome, Transfer}; +pub use crate::types::{Link, Outcome}; diff --git a/src/server/service.rs b/src/server/service.rs index cbc2e72..97fe34b 100644 --- a/src/server/service.rs +++ b/src/server/service.rs @@ -5,9 +5,8 @@ use ntex::framed::{Dispatcher as FramedDispatcher, State as IoState, Timer}; use ntex::service::{IntoServiceFactory, Service, ServiceFactory}; use crate::codec::{protocol::ProtocolId, AmqpCodec, AmqpFrame, ProtocolIdCodec, ProtocolIdError}; -use crate::dispatcher::Dispatcher; -use crate::types::Link; use crate::{default::DefaultControlService, Configuration, Connection, ControlFrame, State}; +use crate::{dispatcher::Dispatcher, types::Message}; use super::handshake::{Handshake, HandshakeAck}; use super::{Error, HandshakeError, ServerError}; @@ -170,7 +169,7 @@ where > where F: IntoServiceFactory, - Pb: ServiceFactory, Request = Link, Response = ()> + 'static, + Pb: ServiceFactory, Request = Message, Response = ()> + 'static, Pb::Error: fmt::Debug, Pb::InitError: fmt::Debug, Error: From + From, @@ -211,7 +210,7 @@ where Ctl: ServiceFactory, Request = ControlFrame, Response = ()> + 'static, Ctl::Error: fmt::Debug, Ctl::InitError: fmt::Debug, - Pb: ServiceFactory, Request = Link, Response = ()> + 'static, + Pb: ServiceFactory, Request = Message, Response = ()> + 'static, Pb::Error: fmt::Debug, Pb::InitError: fmt::Debug, Error: From + From, @@ -253,7 +252,7 @@ where Ctl: ServiceFactory, Request = ControlFrame, Response = ()> + 'static, Ctl::Error: fmt::Debug, Ctl::InitError: fmt::Debug, - Pb: ServiceFactory, Request = Link, Response = ()> + 'static, + Pb: ServiceFactory, Request = Message, Response = ()> + 'static, Pb::Error: fmt::Debug, Pb::InitError: fmt::Debug, Error: From + From, @@ -309,7 +308,7 @@ where ServerError::ControlServiceError })?; - let dispatcher = Dispatcher::new(st, sink, pb_srv, ctl_srv, idle_timeout) + let dispatcher = Dispatcher::new(sink, pb_srv, ctl_srv, idle_timeout) .map(|_| Option::::None); FramedDispatcher::new(io, codec, state, dispatcher, inner.time.clone()) @@ -342,7 +341,7 @@ where Io: AsyncRead + AsyncWrite + Unpin + 'static, H: Service, Response = HandshakeAck>, Ctl: ServiceFactory, Request = ControlFrame, Response = ()> + 'static, - Pb: ServiceFactory, Request = Link, Response = ()> + 'static, + Pb: ServiceFactory, Request = Message, Response = ()> + 'static, { let state = IoState::with_params( inner.read_hw, diff --git a/src/session.rs b/src/session.rs index 7df1959..a844fb8 100644 --- a/src/session.rs +++ b/src/session.rs @@ -11,11 +11,11 @@ use ntex_amqp_codec::protocol::{ }; use ntex_amqp_codec::AmqpFrame; -use crate::cell::Cell; use crate::connection::Connection; use crate::error::AmqpProtocolError; use crate::rcvlink::{ReceiverLink, ReceiverLinkBuilder, ReceiverLinkInner}; use crate::sndlink::{DeliveryPromise, SenderLink, SenderLinkBuilder, SenderLinkInner}; +use crate::{cell::Cell, types::Action}; const INITIAL_OUTGOING_ID: TransferNumber = 0; @@ -532,28 +532,54 @@ impl SessionInner { None } - pub(crate) fn handle_frame(&mut self, frame: Frame) { + pub(crate) fn handle_frame(&mut self, frame: Frame) -> Result { if self.error.is_none() { match frame { - Frame::Flow(flow) => self.apply_flow(&flow), + Frame::Flow(flow) => { + // apply link flow + if let Some(Either::Left(link)) = flow + .handle() + .and_then(|h| self.remote_handles.get(&h).copied()) + .and_then(|h| self.links.get_mut(h)) + { + if let SenderLinkState::Established(ref mut link) = link { + return Ok(Action::Flow(link.clone(), flow)); + } else { + warn!("Received flow frame"); + } + } + self.handle_flow(&flow, None); + Ok(Action::None) + } Frame::Disposition(disp) => { if let Some(sender) = self.disposition_subscribers.remove(&disp.first) { let _ = sender.send(disp); } else { self.settle_deliveries(disp); } + Ok(Action::None) } Frame::Transfer(transfer) => { let idx = if let Some(idx) = self.remote_handles.get(&transfer.handle()) { *idx } else { error!("Transfer's link {:?} is unknown", transfer.handle()); - return; + return Err(AmqpProtocolError::UnknownLink { + session: self.id, + link_handle: transfer.handle(), + frame: Box::new(Frame::Transfer(transfer)), + }); }; if let Some(link) = self.links.get_mut(idx) { match link { - Either::Left(_) => error!("Got trasfer from sender link"), + Either::Left(_) => { + error!("Got trasfer from sender link"); + Err(AmqpProtocolError::Unexpected( + "Got trasfer from sender link", + Box::new(Frame::Transfer(transfer)), + )) + } Either::Right(link) => match link { ReceiverLinkState::Opening(_) => { error!( @@ -561,6 +587,9 @@ impl SessionInner { transfer.handle(), idx ); + Err(AmqpProtocolError::UnexpectedOpeningState(Box::new( + Frame::Transfer(transfer), + ))) } ReceiverLinkState::OpeningLocal(_) => { error!( @@ -568,28 +597,34 @@ impl SessionInner { transfer.handle(), idx ); + Err(AmqpProtocolError::UnexpectedOpeningState(Box::new( + Frame::Transfer(transfer), + ))) } ReceiverLinkState::Established(link) => { // self.outgoing_window -= 1; let _ = self.next_incoming_id.wrapping_add(1); - link.inner.get_mut().handle_transfer(transfer); + link.inner.get_mut().handle_transfer(transfer, &link.inner) } - ReceiverLinkState::Closing(_) => (), + ReceiverLinkState::Closing(_) => Ok(Action::None), }, } } else { - error!( - "Remote link handle mapped to non-existing link: {} -> {}", - transfer.handle(), - idx - ); + Err(AmqpProtocolError::UnknownLink { + session: self.id, + link_handle: transfer.handle(), + frame: Box::new(Frame::Transfer(transfer)), + }) } } - Frame::Detach(mut detach) => { - self.handle_detach(&mut detach); + Frame::Detach(detach) => Ok(self.handle_detach(detach)), + frame => { + error!("Unexpected frame: {:?}", frame); + Ok(Action::None) } - frame => error!("Unexpected frame: {:?}", frame), } + } else { + Ok(Action::None) } } @@ -661,24 +696,27 @@ impl SessionInner { } /// Handle `Detach` frame. - pub(crate) fn handle_detach(&mut self, detach: &mut Detach) { + pub(crate) fn handle_detach(&mut self, mut frame: Detach) -> Action { // get local link instance - let idx = if let Some(idx) = self.remote_handles.get(&detach.handle()) { + let idx = if let Some(idx) = self.remote_handles.get(&frame.handle()) { *idx - } else if self.links.contains(detach.handle() as usize) { - detach.handle() as usize + } else if self.links.contains(frame.handle() as usize) { + frame.handle() as usize } else { // should not happen, error - log::info!("Detaching unknown link: {:?}", detach); - return; + log::info!("Detaching unknown link: {:?}", frame); + return Action::None; }; + let handle = frame.handle(); + let mut action = Action::None; + let remove = if let Some(link) = self.links.get_mut(idx) { match link { Either::Left(link) => match link { SenderLinkState::Opening(ref mut tx) => { if let Some(tx) = tx.take() { - let err = AmqpProtocolError::LinkDetached(detach.error.clone()); + let err = AmqpProtocolError::LinkDetached(frame.error.clone()); let _ = tx.send(Err(err)); } true @@ -688,7 +726,7 @@ impl SessionInner { let detach = Detach { handle: link.inner.get_ref().id(), closed: true, - error: detach.error.clone(), + error: frame.error.clone(), }; let err = AmqpProtocolError::LinkDetached(detach.error.clone()); @@ -715,6 +753,7 @@ impl SessionInner { link.inner.get_mut().detached(err); self.sink .post_frame(AmqpFrame::new(self.remote_channel_id, detach.into())); + action = Action::DetachSender(link.clone(), frame); true } SenderLinkState::Closing(_) => true, @@ -724,7 +763,7 @@ impl SessionInner { ReceiverLinkState::OpeningLocal(ref mut item) => { if let Some((inner, tx)) = item.take() { inner.get_mut().detached(); - if let Some(err) = detach.error.clone() { + if let Some(err) = frame.error.clone() { let _ = tx.send(Err(AmqpProtocolError::LinkDetached(Some(err)))); } else { let _ = tx.send(Err(AmqpProtocolError::LinkDetached(None))); @@ -736,7 +775,7 @@ impl SessionInner { true } ReceiverLinkState::Established(link) => { - link.remote_closed(detach.error.take()); + link.remote_closed(frame.error.take()); // detach from remote endpoint let detach = Detach { @@ -748,12 +787,14 @@ impl SessionInner { // detach rcv link self.sink .post_frame(AmqpFrame::new(self.remote_channel_id, detach.into())); + + action = Action::DetachReceiver(link.clone(), frame); true } ReceiverLinkState::Closing(tx) => { // detach confirmation if let Some(tx) = tx.take() { - if let Some(err) = detach.error.clone() { + if let Some(err) = frame.error.clone() { let _ = tx.send(Err(AmqpProtocolError::LinkDetached(Some(err)))); } else { let _ = tx.send(Ok(())); @@ -769,8 +810,9 @@ impl SessionInner { if remove { self.links.remove(idx); - self.remote_handles.remove(&detach.handle()); + self.remote_handles.remove(&handle); } + action } fn settle_deliveries(&mut self, disposition: Disposition) { @@ -816,7 +858,7 @@ impl SessionInner { } } - pub(crate) fn apply_flow(&mut self, flow: &Flow) { + pub(crate) fn handle_flow(&mut self, flow: &Flow, link: Option<&SenderLink>) { // # AMQP1.0 2.5.6 self.next_incoming_id = flow.next_outgoing_id(); self.remote_outgoing_window = flow.outgoing_window(); @@ -841,18 +883,6 @@ impl SessionInner { } } - // apply link flow - if let Some(Either::Left(link)) = flow - .handle() - .and_then(|h| self.remote_handles.get(&h).copied()) - .and_then(|h| self.links.get_mut(h)) - { - if let SenderLinkState::Established(ref mut link) = link { - link.inner.get_mut().apply_flow(flow); - } else { - warn!("Received flow frame"); - } - } if flow.echo() { let flow = Flow { next_incoming_id: if self.local { @@ -873,6 +903,11 @@ impl SessionInner { }; self.post_frame(flow.into()); } + + // apply link flow + if let Some(link) = link { + link.inner.get_mut().apply_flow(&flow); + } } pub(crate) fn rcv_link_flow(&mut self, handle: u32, delivery_count: u32, credit: u32) { diff --git a/src/types.rs b/src/types.rs index 41f053e..e8f82d9 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,13 +1,29 @@ use std::fmt; use ntex::router::Path; -use ntex::util::{ByteString, Bytes}; +use ntex::util::ByteString; -use crate::codec::protocol::{ - self, Accepted, Attach, DeliveryState, Error, Rejected, TransferBody, -}; -use crate::codec::{AmqpParseError, Decode}; -use crate::{rcvlink::ReceiverLink, session::Session, Handle, State}; +use crate::codec::protocol::{Accepted, Attach, DeliveryState, Detach, Error, Flow, Rejected}; +use crate::{rcvlink::ReceiverLink, session::Session, sndlink::SenderLink, Handle, State}; + +pub use crate::codec::protocol::Transfer; + +#[derive(Debug)] +pub enum Message { + Attached(ReceiverLink), + // Detached(ReceiverLink), + Transfer(ReceiverLink), +} + +pub(crate) enum Action { + None, + AttachSender(SenderLink, Attach), + AttachReceiver(ReceiverLink), + DetachSender(SenderLink, Detach), + DetachReceiver(ReceiverLink, Detach), + Flow(SenderLink, Flow), + Transfer(ReceiverLink), +} pub struct Link { pub(crate) state: State, @@ -16,11 +32,11 @@ pub struct Link { } impl Link { - pub(crate) fn new(link: ReceiverLink, state: State) -> Self { + pub(crate) fn new(link: ReceiverLink, state: State, path: ByteString) -> Self { Link { state, link, - path: Path::new(ByteString::from_static("")), + path: Path::new(path), } } @@ -36,8 +52,8 @@ impl Link { self.link.frame() } - pub fn state(&self) -> &S { - self.state.get_ref() + pub fn state(&self) -> &State { + &self.state } pub fn handle(&self) -> Handle { @@ -83,12 +99,6 @@ impl fmt::Debug for Link { } } -pub struct Transfer { - state: State, - frame: protocol::Transfer, - link: ReceiverLink, -} - #[derive(Debug)] pub enum Outcome { Accept, @@ -105,56 +115,3 @@ impl Outcome { } } } - -impl Transfer { - pub(crate) fn new(state: State, frame: protocol::Transfer, link: ReceiverLink) -> Self { - Transfer { state, frame, link } - } - - pub fn state(&self) -> &S { - self.state.get_ref() - } - - pub fn session(&self) -> &Session { - self.link.session() - } - - pub fn session_mut(&mut self) -> &mut Session { - self.link.session_mut() - } - - pub fn receiver(&self) -> &ReceiverLink { - &self.link - } - - pub fn receiver_mut(&mut self) -> &mut ReceiverLink { - &mut self.link - } - - pub fn frame(&self) -> &protocol::Transfer { - &self.frame - } - - pub fn body(&self) -> Option<&Bytes> { - match self.frame.body { - Some(TransferBody::Data(ref b)) => Some(b), - _ => None, - } - } - - pub fn load_message(&self) -> Result { - if let Some(TransferBody::Data(ref b)) = self.frame.body { - Ok(T::decode(b)?.1) - } else { - Err(AmqpParseError::UnexpectedType("body")) - } - } -} - -impl fmt::Debug for Transfer { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("Transfer") - .field("frame", &self.frame) - .finish() - } -} diff --git a/src/utils.rs b/src/utils.rs deleted file mode 100644 index 1d8ba77..0000000 --- a/src/utils.rs +++ /dev/null @@ -1,11 +0,0 @@ -/// Unwrap result and return `err` future -/// -/// Err(e) get converted to err(e) -macro_rules! try_ready_err { - ($e:expr) => { - match $e { - Ok(value) => value, - Err(e) => return ntex::util::Ready::Err(e), - } - }; -} diff --git a/tests/test_server.rs b/tests/test_server.rs index c21adc7..54959bf 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -11,7 +11,7 @@ async fn server( ) -> Result< Box< dyn Service< - Request = types::Transfer<()>, + Request = types::Transfer, Response = types::Outcome, Error = LinkError, Future = Ready,