Skip to content

Commit

Permalink
Refactor dispatch process (#12)
Browse files Browse the repository at this point in the history
* refactor server impl

* refactor link handling
  • Loading branch information
fafhrd91 authored Aug 11, 2021
1 parent 5d0f60f commit 50f9dd9
Show file tree
Hide file tree
Showing 19 changed files with 600 additions and 631 deletions.
8 changes: 8 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Changes

## [0.5.0-b.5] - 2021-08-11

* Refactor server dispatch process

## [codec-0.6.2] - 2021-08-11

* Add helper methods to Transfer type

## [0.5.0-b.3] - 2021-08-10

* Add Session::connection() method, returns ref to Connection
Expand Down
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-amqp"
version = "0.5.0-b.4"
version = "0.5.0-b.5"
authors = ["ntex contributors <[email protected]>"]
description = "AMQP 1.0 Client/Server framework"
documentation = "https://docs.rs/ntex-amqp"
Expand All @@ -25,7 +25,7 @@ frame-trace = []

[dependencies]
ntex = "0.4.0-b.1"
ntex-amqp-codec = "0.6.1"
ntex-amqp-codec = "0.6.2"

bitflags = "1.2"
derive_more = "0.99"
Expand Down
2 changes: 1 addition & 1 deletion codec/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-amqp-codec"
version = "0.6.1"
version = "0.6.2"
description = "AMQP 1.0 Protocol Codec"
authors = ["Nikolay Kim <[email protected]>", "Max Gortman <[email protected]>", "Mike Yagley <[email protected]>"]
license = "MIT/Apache-2.0"
Expand Down
24 changes: 23 additions & 1 deletion codec/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use derive_more::From;
use ntex_bytes::{BufMut, ByteString, Bytes, BytesMut};
use uuid::Uuid;

use super::codec::{self, DecodeFormatted, Encode};
use super::codec::{self, Decode, DecodeFormatted, Encode};
use super::error::AmqpParseError;
use super::message::Message;
use super::types::*;
Expand Down Expand Up @@ -253,22 +253,44 @@ impl TransferBody {
}

impl From<Message> for TransferBody {
#[inline]
fn from(msg: Message) -> Self {
Self::Message(Box::new(msg))
}
}

impl Encode for TransferBody {
#[inline]
fn encoded_size(&self) -> usize {
match self {
TransferBody::Data(ref data) => data.len(),
TransferBody::Message(ref data) => data.encoded_size(),
}
}
#[inline]
fn encode(&self, dst: &mut BytesMut) {
match *self {
TransferBody::Data(ref data) => dst.put_slice(data),
TransferBody::Message(ref data) => data.encode(dst),
}
}
}

impl Transfer {
#[inline]
pub fn get_body(&self) -> Option<&Bytes> {
match self.body {
Some(TransferBody::Data(ref b)) => Some(b),
_ => None,
}
}

#[inline]
pub fn load_message<T: Decode>(&self) -> Result<T, AmqpParseError> {
if let Some(TransferBody::Data(ref b)) = self.body {
Ok(T::decode(b)?.1)
} else {
Err(AmqpParseError::UnexpectedType("body"))
}
}
}
2 changes: 1 addition & 1 deletion examples/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ async fn server(
) -> Result<
Box<
dyn Service<
Request = server::Transfer<()>,
Request = server::Transfer,
Response = server::Outcome,
Error = AmqpError,
Future = Ready<server::Outcome, AmqpError>,
Expand Down
9 changes: 4 additions & 5 deletions src/client/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub struct Client<Io, St = ()> {
keepalive: u16,
remote_config: Configuration,
timer: Timer,
st: State<St>,
_st: State<St>,
}

impl<T> Client<T, ()>
Expand All @@ -41,7 +41,7 @@ where
keepalive,
remote_config,
timer,
st: State::new(()),
_st: State::new(()),
}
}
}
Expand All @@ -68,7 +68,7 @@ where
keepalive: self.keepalive,
remote_config: self.remote_config,
timer: self.timer,
st: State::new(st),
_st: State::new(st),
}
}

Expand All @@ -77,9 +77,8 @@ where
/// Default handler closes connection on any control message.
pub async fn start_default(self) -> Result<(), DispatcherError> {
let dispatcher = Dispatcher::new(
self.st,
self.connection,
fn_service(|_| Ready::<_, LinkError>::Err(LinkError::force_detach())),
fn_service(|_| Ready::<_, LinkError>::Ok(())),
fn_service(|_| Ready::<_, LinkError>::Ok(())),
self.remote_config.timeout_remote_secs(),
)
Expand Down
Loading

0 comments on commit 50f9dd9

Please sign in to comment.