Skip to content

Commit

Permalink
Migrate to ntex-service-1.0 (#27)
Browse files Browse the repository at this point in the history
* use service 1.0
  • Loading branch information
fafhrd91 authored Dec 28, 2022
1 parent ef335bc commit 1afaa60
Show file tree
Hide file tree
Showing 17 changed files with 202 additions and 257 deletions.
10 changes: 5 additions & 5 deletions .github/workflows/linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
fail-fast: false
matrix:
version:
- 1.57.0 # MSRV
- 1.65.0 # MSRV
- stable
- nightly

Expand All @@ -34,7 +34,7 @@ jobs:
uses: Swatinem/[email protected]

- name: Cache cargo tarpaulin
if: matrix.version == '1.57.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
if: matrix.version == '1.65.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
uses: actions/cache@v1
with:
path: ~/.cargo/bin
Expand All @@ -48,19 +48,19 @@ jobs:
args: --all --no-fail-fast --features=ntex/tokio -- --nocapture

- name: Install tarpaulin
if: matrix.version == '1.57.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
if: matrix.version == '1.65.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
continue-on-error: true
run: |
cargo install cargo-tarpaulin
- name: Generate coverage report
if: matrix.version == '1.57.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
if: matrix.version == '1.65.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
continue-on-error: true
run: |
cargo tarpaulin --out Xml --all --features=ntex/tokio
- name: Upload to Codecov
if: matrix.version == '1.57.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
if: matrix.version == '1.65.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
continue-on-error: true
uses: codecov/codecov-action@v1
with:
Expand Down
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.7.0-beta.0] - 2022-12-28

* Migrate to ntex-service 1.0

## [0.6.4] - 2022-08-22

* Must respond with attach before detach when rejecting links #24
Expand Down
8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-amqp"
version = "0.6.4"
version = "0.7.0-beta.0"
authors = ["ntex contributors <[email protected]>"]
description = "AMQP 1.0 Client/Server framework"
documentation = "https://docs.rs/ntex-amqp"
Expand All @@ -24,7 +24,7 @@ default = []
frame-trace = []

[dependencies]
ntex = "0.5.14"
ntex = "0.6.0-beta.0"
ntex-amqp-codec = "0.8.2"

bitflags = "1.3"
Expand All @@ -35,8 +35,8 @@ slab = "0.4"
uuid = { version = "0.8", features = ["v4"] }

[dev-dependencies]
env_logger = "0.9"
ntex = { version = "0.5", features = ["tokio"] }
env_logger = "0.10"
ntex = { version = "0.6.0-beta.0", features = ["tokio"] }

[patch.crates-io]
ntex-amqp = { path = "." }
Expand Down
2 changes: 1 addition & 1 deletion codec/src/codec/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl ArrayEncode for bool {
1
}
fn array_encode(&self, buf: &mut BytesMut) {
buf.put_u8(if *self { 1 } else { 0 });
buf.put_u8(u8::from(*self));
}
}

Expand Down
4 changes: 2 additions & 2 deletions codec/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ impl From<String> for Str {
impl hash::Hash for Str {
fn hash<H: hash::Hasher>(&self, state: &mut H) {
match self {
Str::String(s) => (&**s).hash(state),
Str::ByteStr(s) => (&**s).hash(state),
Str::String(s) => (**s).hash(state),
Str::ByteStr(s) => (**s).hash(state),
Str::Static(s) => s.hash(state),
}
}
Expand Down
4 changes: 2 additions & 2 deletions codec/src/types/variant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ impl Variant {

pub fn as_int(&self) -> Option<i32> {
match self {
Variant::Int(v) => Some(*v as i32),
Variant::Int(v) => Some(*v),
_ => None,
}
}
Expand All @@ -140,7 +140,7 @@ impl Variant {
Variant::Byte(v) => Some(*v as i64),
Variant::Short(v) => Some(*v as i64),
Variant::Int(v) => Some(*v as i64),
Variant::Long(v) => Some(*v as i64),
Variant::Long(v) => Some(*v),
_ => None,
}
}
Expand Down
15 changes: 2 additions & 13 deletions examples/server.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,9 @@
use ntex::service::{fn_factory_with_config, Service};
use ntex::util::Ready;
use ntex::service::{boxed::BoxService, fn_factory_with_config};
use ntex_amqp::{error::AmqpError, error::LinkError, server};

async fn server(
link: server::Link<()>,
) -> Result<
Box<
dyn Service<
server::Transfer,
Response = server::Outcome,
Error = AmqpError,
Future = Ready<server::Outcome, AmqpError>,
> + 'static,
>,
LinkError,
> {
) -> Result<BoxService<server::Transfer, server::Outcome, AmqpError>, LinkError> {
println!("OPEN LINK: {:?}", link);
Err(LinkError::force_detach().description("unimplemented"))
}
Expand Down
58 changes: 21 additions & 37 deletions src/client/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,11 @@ where
IoBoxed: From<T::Response>,
{
/// Connect to amqp server
pub fn connect(&self, address: A) -> impl Future<Output = Result<Client, ConnectError>> {
pub async fn connect(&self, address: A) -> Result<Client, ConnectError> {
let fut = timeout_checked(self.handshake_timeout, self._connect(address));
async move {
match fut.await {
Ok(res) => res.map_err(From::from),
Err(_) => Err(ConnectError::HandshakeTimeout),
}
match fut.await {
Ok(res) => res.map_err(From::from),
Err(_) => Err(ConnectError::HandshakeTimeout),
}
}

Expand All @@ -165,35 +163,27 @@ where
_connect_plain(io, self.config.clone())
}

fn _connect(&self, address: A) -> impl Future<Output = Result<Client, ConnectError>> {
let fut = self.connector.call(Connect::new(address));
async fn _connect(&self, address: A) -> Result<Client, ConnectError> {
let io = self.connector.call(Connect::new(address)).await?;
let config = self.config.clone();
let pool = self.pool;
let disconnect = self.disconnect_timeout;

async move {
trace!("Negotiation client protocol id: Amqp");
trace!("Negotiation client protocol id: Amqp");

let io = IoBoxed::from(fut.await?);
io.set_memory_pool(pool);
io.set_disconnect_timeout(disconnect.into());
let io = IoBoxed::from(io);
io.set_memory_pool(pool);
io.set_disconnect_timeout(disconnect.into());

_connect_plain(io, config).await
}
_connect_plain(io, config).await
}

/// Connect to amqp server
pub fn connect_sasl(
&self,
addr: A,
auth: SaslAuth,
) -> impl Future<Output = Result<Client, ConnectError>> {
pub async fn connect_sasl(&self, addr: A, auth: SaslAuth) -> Result<Client, ConnectError> {
let fut = timeout_checked(self.handshake_timeout, self._connect_sasl(addr, auth));
async move {
match fut.await {
Ok(res) => res.map_err(From::from),
Err(_) => Err(ConnectError::HandshakeTimeout),
}
match fut.await {
Ok(res) => res.map_err(From::from),
Err(_) => Err(ConnectError::HandshakeTimeout),
}
}

Expand All @@ -212,23 +202,17 @@ where
_connect_sasl(io, auth, config)
}

fn _connect_sasl(
&self,
addr: A,
auth: SaslAuth,
) -> impl Future<Output = Result<Client, ConnectError>> {
let fut = self.connector.call(Connect::new(addr));
async fn _connect_sasl(&self, addr: A, auth: SaslAuth) -> Result<Client, ConnectError> {
let io = self.connector.call(Connect::new(addr)).await?;
let config = self.config.clone();
let pool = self.pool;
let disconnect = self.disconnect_timeout;

async move {
let io = IoBoxed::from(fut.await?);
io.set_memory_pool(pool);
io.set_disconnect_timeout(disconnect.into());
let io = IoBoxed::from(io);
io.set_memory_pool(pool);
io.set_disconnect_timeout(disconnect.into());

_connect_sasl(io, auth, config).await
}
_connect_sasl(io, auth, config).await
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub enum ControlFrameKind {
SessionEnded(Vec<Either<SenderLink, ReceiverLink>>),
ProtocolError(AmqpProtocolError),
Disconnected(Option<io::Error>),
Closed(bool),
Closed,
}

impl ControlFrame {
Expand Down
28 changes: 9 additions & 19 deletions src/default.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{marker::PhantomData, task::Context, task::Poll};
use std::marker::PhantomData;

use ntex::service::{Service, ServiceFactory};
use ntex::util::Ready;
Expand All @@ -20,25 +20,20 @@ impl<S, E> ServiceFactory<Link<S>, State<S>> for DefaultPublishService<S, E> {
type Error = E;
type InitError = LinkError;
type Service = DefaultPublishService<S, E>;
type Future = Ready<Self::Service, Self::InitError>;
type Future<'f> = Ready<Self::Service, Self::InitError> where Self: 'f;

fn new_service(&self, _: State<S>) -> Self::Future {
fn create(&self, _: State<S>) -> Self::Future<'_> {
Ready::Err(LinkError::force_detach().description("not configured"))
}
}

impl<S, E> Service<Link<S>> for DefaultPublishService<S, E> {
type Response = ();
type Error = E;
type Future = Ready<Self::Response, Self::Error>;
type Future<'f> = Ready<Self::Response, Self::Error> where Self: 'f;

#[inline]
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

#[inline]
fn call(&self, _pkt: Link<S>) -> Self::Future {
fn call(&self, _pkt: Link<S>) -> Self::Future<'_> {
log::warn!("AMQP Publish service is not configured");
Ready::Ok(())
}
Expand All @@ -58,25 +53,20 @@ impl<S, E> ServiceFactory<ControlFrame, State<S>> for DefaultControlService<S, E
type Error = E;
type InitError = E;
type Service = DefaultControlService<S, E>;
type Future = Ready<Self::Service, Self::InitError>;
type Future<'f> = Ready<Self::Service, Self::InitError> where Self: 'f;

fn new_service(&self, _: State<S>) -> Self::Future {
fn create(&self, _: State<S>) -> Self::Future<'_> {
Ready::Ok(DefaultControlService(PhantomData))
}
}

impl<S, E> Service<ControlFrame> for DefaultControlService<S, E> {
type Response = ();
type Error = E;
type Future = Ready<Self::Response, Self::Error>;

#[inline]
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
type Future<'f> = Ready<Self::Response, Self::Error> where Self: 'f;

#[inline]
fn call(&self, _pkt: ControlFrame) -> Self::Future {
fn call(&self, _pkt: ControlFrame) -> Self::Future<'_> {
Ready::Ok(())
}
}
Loading

0 comments on commit 1afaa60

Please sign in to comment.