Skip to content

Commit

Permalink
Update ntex deps (#32)
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 authored Aug 10, 2023
1 parent aa96e95 commit 4bd13b5
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 33 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.8.2] - 2023-08-10

* Update ntex deps

## [0.8.1] - 2023-06-23

* Fix client connector usage, fixes lifetime constraint
Expand Down
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-amqp"
version = "0.8.1"
version = "0.8.2"
authors = ["ntex contributors <[email protected]>"]
description = "AMQP 1.0 Client/Server framework"
documentation = "https://docs.rs/ntex-amqp"
Expand All @@ -24,7 +24,7 @@ default = []
frame-trace = []

[dependencies]
ntex = "0.7.2"
ntex = "0.7.3"
ntex-amqp-codec = "0.9.0"

bitflags = "1.3"
Expand All @@ -36,7 +36,7 @@ uuid = { version = "1", features = ["v4"] }

[dev-dependencies]
env_logger = "0.10"
ntex = { version = "0.7.2", features = ["tokio"] }
ntex = { version = "0.7.3", features = ["tokio"] }

[patch.crates-io]
ntex-amqp = { path = "." }
Expand Down
12 changes: 2 additions & 10 deletions src/client/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,7 @@ where
}

async fn _connect(&self, address: A) -> Result<Client, ConnectError> {
let io = self
.connector
.clone()
.service_call(Connect::new(address))
.await?;
let io = self.connector.call(Connect::new(address)).await?;
let config = self.config.clone();
let pool = self.pool;
let disconnect = self.disconnect_timeout;
Expand Down Expand Up @@ -207,11 +203,7 @@ where
}

async fn _connect_sasl(&self, addr: A, auth: SaslAuth) -> Result<Client, ConnectError> {
let io = self
.connector
.clone()
.service_call(Connect::new(addr))
.await?;
let io = self.connector.call(Connect::new(addr)).await?;
let config = self.config.clone();
let pool = self.pool;
let disconnect = self.disconnect_timeout;
Expand Down
37 changes: 19 additions & 18 deletions src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,10 @@ where
}

fn call_control_service(&self, frame: ControlFrame) {
let ctl = self.ctl_service.clone();
self.ctl_fut.borrow_mut().push((
frame.clone(),
Box::pin(async move { ctl.call(frame).await }),
));
let fut = self.ctl_service.call_static(frame.clone());
self.ctl_fut
.borrow_mut()
.push((frame, Box::pin(async move { fut.await })));

Check warning on line 69 in src/dispatcher.rs

View workflow job for this annotation

GitHub Actions / clippy

this async expression only awaits a single future

warning: this async expression only awaits a single future --> src/dispatcher.rs:69:36 | 69 | .push((frame, Box::pin(async move { fut.await }))); | ^^^^^^^^^^^^^^^^^^^^^^^^ help: you can reduce it to: `fut` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#redundant_async_block = note: `#[warn(clippy::redundant_async_block)]` on by default
self.ctl_queue.waker.wake();
}

Expand Down Expand Up @@ -154,11 +153,11 @@ where
ControlFrameKind::AttachReceiver(ref frm, ref link) => {
let link = link.clone();
let frm = frm.clone();
let service = self.service.clone();
let fut = self
.service
.call_static(types::Message::Attached(frm.clone(), link.clone()));
ntex::rt::spawn(async move {
let result = service
.call(types::Message::Attached(frm.clone(), link.clone()))
.await;
let result = fut.await;
if let Err(err) = result {
let _ = link.close_with_error(Error::from(err)).await;
} else {
Expand Down Expand Up @@ -257,11 +256,11 @@ where
let sink = self.sink.0.get_mut();
sink.on_close.notify();
sink.set_error(AmqpProtocolError::Disconnected);
let ctl_service = self.ctl_service.clone();
let fut = self
.ctl_service
.call_static(ControlFrame::new_kind(ControlFrameKind::Closed));
*shutdown = Some(Box::pin(async move {
let _ = ctl_service
.call(ControlFrame::new_kind(ControlFrameKind::Closed))
.await;
let _ = fut.await;
}));
}

Expand Down Expand Up @@ -298,7 +297,7 @@ where
types::Action::Transfer(link) => {
return Either::Left(ServiceResult {
link: link.clone(),
fut: self.service.service_call(types::Message::Transfer(link)),
fut: self.service.call(types::Message::Transfer(link)),
_t: marker::PhantomData,
});
}
Expand Down Expand Up @@ -329,9 +328,9 @@ where
}
types::Action::DetachReceiver(link, frm) => {
let lnk = link.clone();
let svc = self.service.clone();
let fut = self.service.call_static(types::Message::Detached(lnk));
spawn(async move {
let _ = svc.call(types::Message::Detached(lnk)).await;
let _ = fut.await;
});
self.call_control_service(ControlFrame::new(
link.session().inner.clone(),
Expand All @@ -347,9 +346,11 @@ where
})
.collect();

let svc = self.service.clone();
let fut = self
.service
.call_static(types::Message::DetachedAll(receivers));
spawn(async move {
let _ = svc.call(types::Message::DetachedAll(receivers)).await;
let _ = fut.await;
});
self.call_control_service(ControlFrame::new_kind(
ControlFrameKind::RemoteSessionEnded(links),
Expand Down
2 changes: 1 addition & 1 deletion src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ impl<'f, S> Future for RouterServiceResponse<'f, S> {
};

this.state =
RouterServiceResponseState::Transfer(srv.service_call(tr), delivery_id);
RouterServiceResponseState::Transfer(srv.call(tr), delivery_id);
} else {
return Poll::Ready(Ok(()));
}
Expand Down
2 changes: 1 addition & 1 deletion src/server/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ where

// handshake protocol
let ack = handshake
.service_call(if protocol == ProtocolId::Amqp {
.call(if protocol == ProtocolId::Amqp {
Handshake::new_plain(state, inner.config.clone())
} else {
Handshake::new_sasl(state, inner.config.clone())
Expand Down

0 comments on commit 4bd13b5

Please sign in to comment.