From 114f1dc1820189b93ceb228c229a64838e7a2867 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 9 Oct 2023 14:04:42 +0600 Subject: [PATCH] Fix cradit handling --- CHANGES.md | 4 ++++ Cargo.toml | 6 +++--- src/lib.rs | 2 +- src/session.rs | 13 ++++++++----- src/sndlink.rs | 19 ++++++++++++------- tests/test_server.rs | 2 +- 6 files changed, 29 insertions(+), 17 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 96f1cff..3b0c469 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.8.4] - 2023-10-09 + +* Fix cradit handling + ## [0.8.2] - 2023-08-10 * Update ntex deps diff --git a/Cargo.toml b/Cargo.toml index 01d0179..7925261 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-amqp" -version = "0.8.3" +version = "0.8.4" authors = ["ntex contributors "] description = "AMQP 1.0 Client/Server framework" documentation = "https://docs.rs/ntex-amqp" @@ -24,7 +24,7 @@ default = [] frame-trace = [] [dependencies] -ntex = "0.7.3" +ntex = "0.7.4" ntex-amqp-codec = "0.9.0" bitflags = "2.4" @@ -36,7 +36,7 @@ uuid = { version = "1", features = ["v4"] } [dev-dependencies] env_logger = "0.10" -ntex = { version = "0.7.3", features = ["tokio"] } +ntex = { version = "0.7.4", features = ["tokio"] } [patch.crates-io] ntex-amqp = { path = "." } diff --git a/src/lib.rs b/src/lib.rs index 50ac22d..3418311 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,5 @@ #![deny(rust_2018_idioms, warnings, unreachable_pub)] -#![allow(clippy::type_complexity)] +#![allow(clippy::type_complexity, clippy::let_underscore_future)] #[macro_use] extern crate derive_more; diff --git a/src/session.rs b/src/session.rs index cba4bc5..4ffb5ac 100644 --- a/src/session.rs +++ b/src/session.rs @@ -285,7 +285,7 @@ pub(crate) enum TransferState { } impl TransferState { - fn more(&self) -> bool { + pub(super) fn more(&self) -> bool { !matches!(self, TransferState::Only(_, _) | TransferState::Last) } } @@ -1094,8 +1094,8 @@ impl SessionInner { self.remote_incoming_window = flow .next_incoming_id() .unwrap_or(INITIAL_OUTGOING_ID) - .saturating_add(flow.incoming_window()) - .saturating_sub(self.next_outgoing_id); + .wrapping_add(flow.incoming_window()) + .wrapping_sub(self.next_outgoing_id); trace!( "Session received credit {:?}. window: {}, pending: {}", @@ -1181,7 +1181,10 @@ impl SessionInner { message_format, }); } else { - self.remote_incoming_window -= 1; + let more = state.more(); + if !more { + self.remote_incoming_window -= 1; + } let settled2 = settled.unwrap_or(false); let tr_settled = if settled2 { @@ -1210,7 +1213,7 @@ impl SessionInner { TransferState::First(promise, delivery_tag) | TransferState::Only(promise, delivery_tag) => { let delivery_id = self.next_outgoing_id; - self.next_outgoing_id += 1; + self.next_outgoing_id = self.next_outgoing_id.wrapping_add(1); transfer.0.more = more; transfer.0.batchable = more; diff --git a/src/sndlink.rs b/src/sndlink.rs index 03cfcc0..a10e524 100644 --- a/src/sndlink.rs +++ b/src/sndlink.rs @@ -341,8 +341,8 @@ impl SenderLinkInner { let delta = flow .delivery_count() .unwrap_or(0) - .saturating_add(credit) - .saturating_sub(self.delivery_count); + .wrapping_add(credit) + .wrapping_sub(self.delivery_count); trace!( "Apply sender link {:?} flow, credit: {:?}({:?}) flow count: {:?}, delivery count: {:?} pending: {:?} new credit {:?}", @@ -361,8 +361,10 @@ impl SenderLinkInner { // credit became available => drain pending_transfers while self.link_credit > 0 { if let Some(transfer) = self.pending_transfers.pop_front() { - self.link_credit -= 1; - self.delivery_count = self.delivery_count.saturating_add(1); + if !transfer.state.more() { + self.link_credit -= 1; + } + self.delivery_count = self.delivery_count.wrapping_add(1); session.send_transfer( self.id as u32, transfer.body, @@ -545,8 +547,11 @@ impl SenderLinkInner { body: Some(body), }); } else { - self.link_credit -= 1; - self.delivery_count = self.delivery_count.saturating_add(1); + // reduce link credit only if transfer is last + if !state.more() { + self.link_credit -= 1; + } + self.delivery_count = self.delivery_count.wrapping_add(1); self.session.inner.get_mut().send_transfer( self.id as u32, Some(body), @@ -572,7 +577,7 @@ impl SenderLinkInner { fn get_tag(&mut self, tag: Option) -> Bytes { tag.unwrap_or_else(|| { let delivery_tag = self.delivery_tag; - self.delivery_tag = delivery_tag.saturating_add(1); + self.delivery_tag = delivery_tag.wrapping_add(1); let mut buf = self.pool.buf_with_capacity(16); buf.put_u32(delivery_tag); diff --git a/tests/test_server.rs b/tests/test_server.rs index cc3aa1a..f4ec650 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -183,9 +183,9 @@ async fn test_session_end() -> std::io::Result<()> { .await .unwrap(); link.send(Bytes::from(b"test".as_ref())).await.unwrap(); + session.end().await.unwrap(); sleep(Millis(150)).await; - session.end().await.unwrap(); assert_eq!(link_names.lock().unwrap()[0], "test"); assert!(sink.is_opened());