Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use async fn in trait for Service definition #36

Merged
merged 2 commits into from
Jan 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions .github/workflows/linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
fail-fast: false
matrix:
version:
- 1.66.0 # MSRV
- 1.75.0 # MSRV
- stable
- nightly

Expand All @@ -34,7 +34,7 @@ jobs:
uses: Swatinem/[email protected]

- name: Cache cargo tarpaulin
if: matrix.version == '1.65.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
if: matrix.version == '1.75.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
uses: actions/cache@v1
with:
path: ~/.cargo/bin
Expand All @@ -48,19 +48,19 @@ jobs:
args: --all --no-fail-fast --features=ntex/tokio -- --nocapture

- name: Install tarpaulin
if: matrix.version == '1.65.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
if: matrix.version == '1.75.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
continue-on-error: true
run: |
cargo install cargo-tarpaulin

- name: Generate coverage report
if: matrix.version == '1.65.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
if: matrix.version == '1.75.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
continue-on-error: true
run: |
cargo tarpaulin --out Xml --all --features=ntex/tokio

- name: Upload to Codecov
if: matrix.version == '1.65.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
if: matrix.version == '1.75.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
continue-on-error: true
uses: codecov/codecov-action@v1
with:
Expand All @@ -69,7 +69,7 @@ jobs:
- name: Install cargo-cache
continue-on-error: true
run: |
cargo install cargo-cache --version 0.6.2 --no-default-features --features ci-autoclean
cargo install cargo-cache --no-default-features --features ci-autoclean

- name: Clear the cargo caches
continue-on-error: true
Expand Down
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [1.0.0-b.0] - 2024-01-07

* Use "async fn" in trait for Service definition

## [0.8.9] - 2024-01-04

* Remove internal circular references
Expand Down
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-amqp"
version = "0.8.9"
version = "1.0.0-b.0"
authors = ["ntex contributors <[email protected]>"]
description = "AMQP 1.0 Client/Server framework"
documentation = "https://docs.rs/ntex-amqp"
Expand All @@ -24,7 +24,7 @@ default = []
frame-trace = []

[dependencies]
ntex = "0.7.16"
ntex = "1.0.0-b.0"
ntex-amqp-codec = "0.9.1"

bitflags = "2.4"
Expand All @@ -36,7 +36,7 @@ uuid = { version = "1", features = ["v4"] }

[dev-dependencies]
env_logger = "0.10"
ntex = { version = "0.7", features = ["tokio"] }
ntex = { version = "1.0.0-b.0", features = ["tokio"] }

[patch.crates-io]
ntex-amqp = { path = "." }
Expand Down
4 changes: 2 additions & 2 deletions src/client/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@
self.remote_config.timeout_remote_secs().into(),
);

IoDispatcher::with_config(
IoDispatcher::new(
self.io,
self.codec,
dispatcher,

Check warning on line 73 in src/client/connection.rs

View check run for this annotation

Codecov / codecov/patch

src/client/connection.rs#L73

Added line #L73 was not covered by tests
&self.remote_config.disp_config,
)
.await
Expand All @@ -86,17 +86,17 @@
{
let dispatcher = Dispatcher::new(
self.connection,
Pipeline::new(fn_service(|_| Ready::<_, S::Error>::Ok(()))),
Pipeline::new(service.into_service()),

Check warning on line 90 in src/client/connection.rs

View check run for this annotation

Codecov / codecov/patch

src/client/connection.rs#L89-L90

Added lines #L89 - L90 were not covered by tests
self.remote_config.timeout_remote_secs().into(),
);

IoDispatcher::with_config(
IoDispatcher::new(
self.io,
self.codec,
dispatcher,
&self.remote_config.disp_config,

Check warning on line 98 in src/client/connection.rs

View check run for this annotation

Codecov / codecov/patch

src/client/connection.rs#L95-L98

Added lines #L95 - L98 were not covered by tests
)
.await

Check warning on line 100 in src/client/connection.rs

View check run for this annotation

Codecov / codecov/patch

src/client/connection.rs#L100

Added line #L100 was not covered by tests
}
}
31 changes: 16 additions & 15 deletions src/default.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::marker::PhantomData;

use ntex::service::{Service, ServiceCtx, ServiceFactory};
use ntex::util::Ready;

use crate::error::LinkError;
use crate::{types::Link, ControlFrame, State};
Expand All @@ -20,22 +19,23 @@
type Error = E;
type InitError = LinkError;
type Service = DefaultPublishService<S, E>;
type Future<'f> = Ready<Self::Service, Self::InitError> where Self: 'f;

fn create(&self, _: State<S>) -> Self::Future<'_> {
Ready::Err(LinkError::force_detach().description("not configured"))
async fn create(&self, _: State<S>) -> Result<Self::Service, Self::InitError> {
Err(LinkError::force_detach().description("not configured"))

Check warning on line 24 in src/default.rs

View check run for this annotation

Codecov / codecov/patch

src/default.rs#L23-L24

Added lines #L23 - L24 were not covered by tests
}
}

impl<S, E> Service<Link<S>> for DefaultPublishService<S, E> {
type Response = ();
type Error = E;
type Future<'f> = Ready<Self::Response, Self::Error> where Self: 'f;

#[inline]
fn call<'a>(&'a self, _: Link<S>, _: ServiceCtx<'a, Self>) -> Self::Future<'a> {
async fn call(

Check warning on line 32 in src/default.rs

View check run for this annotation

Codecov / codecov/patch

src/default.rs#L32

Added line #L32 was not covered by tests
&self,
_: Link<S>,
_: ServiceCtx<'_, Self>,
) -> Result<Self::Response, Self::Error> {
log::warn!("AMQP Publish service is not configured");
Ready::Ok(())
Ok(())

Check warning on line 38 in src/default.rs

View check run for this annotation

Codecov / codecov/patch

src/default.rs#L38

Added line #L38 was not covered by tests
}
}

Expand All @@ -53,20 +53,21 @@
type Error = E;
type InitError = E;
type Service = DefaultControlService<S, E>;
type Future<'f> = Ready<Self::Service, Self::InitError> where Self: 'f;

fn create(&self, _: State<S>) -> Self::Future<'_> {
Ready::Ok(DefaultControlService(PhantomData))
async fn create(&self, _: State<S>) -> Result<Self::Service, Self::InitError> {
Ok(DefaultControlService(PhantomData))
}
}

impl<S, E> Service<ControlFrame> for DefaultControlService<S, E> {
type Response = ();
type Error = E;
type Future<'f> = Ready<Self::Response, Self::Error> where Self: 'f;

#[inline]
fn call<'a>(&'a self, _: ControlFrame, _: ServiceCtx<'a, Self>) -> Self::Future<'a> {
Ready::Ok(())
async fn call(
&self,
_: ControlFrame,
_: ServiceCtx<'_, Self>,
) -> Result<Self::Response, Self::Error> {
Ok(())
}
}
50 changes: 22 additions & 28 deletions src/dispatcher.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::collections::VecDeque;
use std::{cell, future::Future, marker, pin::Pin, rc::Rc, task::Context, task::Poll};

use ntex::service::{Pipeline, Service, ServiceCall, ServiceCtx};
use ntex::service::{Pipeline, Service, ServiceCtx};
use ntex::time::{sleep, Millis, Sleep};
use ntex::util::{ready, BoxFuture, Either, Ready};
use ntex::util::{ready, BoxFuture, Either};
use ntex::{io::DispatchItem, rt::spawn, task::LocalWaker};

use crate::codec::{protocol::Frame, AmqpCodec, AmqpFrame};
Expand Down Expand Up @@ -153,7 +153,7 @@
let link = link.clone();
let frm = frm.clone();
let fut = self
.service

Check warning on line 156 in src/dispatcher.rs

View check run for this annotation

Codecov / codecov/patch

src/dispatcher.rs#L156

Added line #L156 was not covered by tests
.call_static(types::Message::Attached(frm.clone(), link.clone()));
ntex::rt::spawn(async move {
let result = fut.await;
Expand Down Expand Up @@ -201,10 +201,6 @@
{
type Response = Option<AmqpFrame>;
type Error = AmqpDispatcherError;
type Future<'f> = Either<
ServiceResult<'f, ServiceCall<'f, Sr, types::Message>, Sr::Error>,
Ready<Self::Response, Self::Error>,
>;

fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.ctl_queue.waker.register(cx.waker());
Expand All @@ -218,10 +214,10 @@
// check readiness
let service_poll = self.service.poll_ready(cx).map_err(|err| {
let err = Error::from(err);
log::error!(
"{}: Publish service readiness check failed: {:?}",
self.sink.tag(),
err

Check warning on line 220 in src/dispatcher.rs

View check run for this annotation

Codecov / codecov/patch

src/dispatcher.rs#L217-L220

Added lines #L217 - L220 were not covered by tests
);
let _ = self.sink.close_with_error(err);
AmqpDispatcherError::Service
Expand All @@ -229,10 +225,10 @@

let ctl_service_poll = self.ctl_service.poll_ready(cx).map_err(|err| {
let err = Error::from(err);
log::error!(
"{}: Control service readiness check failed: {:?}",
self.sink.tag(),
err

Check warning on line 231 in src/dispatcher.rs

View check run for this annotation

Codecov / codecov/patch

src/dispatcher.rs#L228-L231

Added lines #L228 - L231 were not covered by tests
);
let _ = self.sink.close_with_error(err);
AmqpDispatcherError::Service
Expand Down Expand Up @@ -264,7 +260,7 @@
sink.on_close.notify();
sink.set_error(AmqpProtocolError::Disconnected);
let fut = self
.ctl_service

Check warning on line 263 in src/dispatcher.rs

View check run for this annotation

Codecov / codecov/patch

src/dispatcher.rs#L263

Added line #L263 was not covered by tests
.call_static(ControlFrame::new_kind(ControlFrameKind::Closed));
*shutdown = Some(Box::pin(async move {
let _ = fut.await;
Expand All @@ -281,15 +277,15 @@
}
}

fn call<'a>(
&'a self,
async fn call(
&self,
request: DispatchItem<AmqpCodec<AmqpFrame>>,
_: ServiceCtx<'a, Self>,
) -> Self::Future<'a> {
_: ServiceCtx<'_, Self>,
) -> Result<Self::Response, Self::Error> {
match request {
DispatchItem::Item(frame) => {
#[cfg(feature = "frame-trace")]
log::trace!("{}: incoming: {:#?}", self.sink.tag(), frame);

Check warning on line 288 in src/dispatcher.rs

View check run for this annotation

Codecov / codecov/patch

src/dispatcher.rs#L288

Added line #L288 was not covered by tests

let action = match self
.sink
Expand All @@ -297,20 +293,20 @@
.map_err(AmqpDispatcherError::Protocol)
{
Ok(a) => a,
Err(e) => return Either::Right(Ready::Err(e)),
Err(e) => return Err(e),

Check warning on line 296 in src/dispatcher.rs

View check run for this annotation

Codecov / codecov/patch

src/dispatcher.rs#L296

Added line #L296 was not covered by tests
};

match action {
types::Action::Transfer(link) => {
return if self.sink.is_opened() {
Either::Left(ServiceResult {
link: link.clone(),
fut: self.service.call(types::Message::Transfer(link)),
_t: marker::PhantomData,
})
} else {
Either::Right(Ready::Ok(None))
};
if self.sink.is_opened() {
let lnk = link.clone();
if let Err(e) = self.service.call(types::Message::Transfer(link)).await
{
let e = Error::from(e);
log::trace!("Service error {:?}", e);
let _ = lnk.close_with_error(e);

Check warning on line 307 in src/dispatcher.rs

View check run for this annotation

Codecov / codecov/patch

src/dispatcher.rs#L305-L307

Added lines #L305 - L307 were not covered by tests
}
}
}
types::Action::Flow(link, frm) => {
// apply flow to specific link
Expand Down Expand Up @@ -339,9 +335,9 @@
}
types::Action::DetachReceiver(link, frm) => {
let lnk = link.clone();
let fut = self.service.call_static(types::Message::Detached(lnk));

Check warning on line 338 in src/dispatcher.rs

View check run for this annotation

Codecov / codecov/patch

src/dispatcher.rs#L338

Added line #L338 was not covered by tests
spawn(async move {
let _ = fut.await;

Check warning on line 340 in src/dispatcher.rs

View check run for this annotation

Codecov / codecov/patch

src/dispatcher.rs#L340

Added line #L340 was not covered by tests
});
self.call_control_service(ControlFrame::new(
link.session().inner.clone(),
Expand All @@ -358,7 +354,7 @@
.collect();

let fut = self
.service

Check warning on line 357 in src/dispatcher.rs

View check run for this annotation

Codecov / codecov/patch

src/dispatcher.rs#L357

Added line #L357 was not covered by tests
.call_static(types::Message::DetachedAll(receivers));
spawn(async move {
let _ = fut.await;
Expand All @@ -375,35 +371,33 @@
types::Action::None => (),
};

Either::Right(Ready::Ok(None))
Ok(None)
}
DispatchItem::EncoderError(err) | DispatchItem::DecoderError(err) => {
self.call_control_service(ControlFrame::new_kind(ControlFrameKind::ProtocolError(
err.into(),
)));
Either::Right(Ready::Ok(None))
Ok(None)

Check warning on line 380 in src/dispatcher.rs

View check run for this annotation

Codecov / codecov/patch

src/dispatcher.rs#L380

Added line #L380 was not covered by tests
}
DispatchItem::KeepAliveTimeout => {
self.call_control_service(ControlFrame::new_kind(ControlFrameKind::ProtocolError(
AmqpProtocolError::KeepAliveTimeout,
)));
Either::Right(Ready::Ok(None))
Ok(None)

Check warning on line 386 in src/dispatcher.rs

View check run for this annotation

Codecov / codecov/patch

src/dispatcher.rs#L386

Added line #L386 was not covered by tests
}
DispatchItem::ReadTimeout => {
self.call_control_service(ControlFrame::new_kind(ControlFrameKind::ProtocolError(
AmqpProtocolError::ReadTimeout,

Check warning on line 390 in src/dispatcher.rs

View check run for this annotation

Codecov / codecov/patch

src/dispatcher.rs#L388-L390

Added lines #L388 - L390 were not covered by tests
)));
Either::Right(Ready::Ok(None))
Ok(None)

Check warning on line 392 in src/dispatcher.rs

View check run for this annotation

Codecov / codecov/patch

src/dispatcher.rs#L392

Added line #L392 was not covered by tests
}
DispatchItem::Disconnect(e) => {
self.call_control_service(ControlFrame::new_kind(ControlFrameKind::Disconnected(
e,
)));
Either::Right(Ready::Ok(None))
}
DispatchItem::WBackPressureEnabled | DispatchItem::WBackPressureDisabled => {
Either::Right(Ready::Ok(None))
Ok(None)
}
DispatchItem::WBackPressureEnabled | DispatchItem::WBackPressureDisabled => Ok(None),

Check warning on line 400 in src/dispatcher.rs

View check run for this annotation

Codecov / codecov/patch

src/dispatcher.rs#L400

Added line #L400 was not covered by tests
}
}
}
Expand Down
7 changes: 5 additions & 2 deletions src/rcvlink.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::{collections::VecDeque, future::Future, hash, pin::Pin, task::Context, task::Poll};
use std::{
collections::VecDeque, future::poll_fn, future::Future, hash, pin::Pin, task::Context,
task::Poll,
};

use ntex::util::{poll_fn, ByteString, BytesMut, PoolRef, Stream};
use ntex::util::{ByteString, BytesMut, PoolRef, Stream};
use ntex::{channel::oneshot, task::LocalWaker};
use ntex_amqp_codec::protocol::{
self as codec, Attach, DeliveryNumber, Disposition, Error, Handle, LinkError,
Expand Down Expand Up @@ -156,7 +159,7 @@
let inner = self.inner.get_mut();
log::trace!(
"{}: Receiver link has been closed remotely handle: {:?} name: {:?}",
inner.session.tag(),

Check warning on line 162 in src/rcvlink.rs

View check run for this annotation

Codecov / codecov/patch

src/rcvlink.rs#L162

Added line #L162 was not covered by tests
inner.remote_handle,
inner.name
);
Expand Down Expand Up @@ -361,7 +364,7 @@
inner: inner.clone(),
}))
} else {
log::error!("{}: Inconsistent state, bug", self.session.tag());

Check warning on line 367 in src/rcvlink.rs

View check run for this annotation

Codecov / codecov/patch

src/rcvlink.rs#L367

Added line #L367 was not covered by tests
let err = Error(Box::new(codec::ErrorInner {
condition: LinkError::DetachForced.into(),
description: Some(ByteString::from_static("Internal error")),
Expand Down
Loading
Loading