From cff739ee609db824aac73097b69993c508cc133b Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 4 Jun 2024 14:58:31 +0500 Subject: [PATCH] Better handling session end for inflight deliveries (#54) --- CHANGES.md | 4 ++++ Cargo.toml | 2 +- src/delivery.rs | 7 ++++++- tests/test_server.rs | 2 -- 4 files changed, 11 insertions(+), 4 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 9b88447..790aa23 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [3.0.1] - 2024-06-04 + +* Better handling "session end" for inflight deliveries + ## [3.0.0] - 2024-05-28 * Use ntex-service 3.0 diff --git a/Cargo.toml b/Cargo.toml index 68619fc..0593496 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-amqp" -version = "3.0.0" +version = "3.0.1" authors = ["ntex contributors "] description = "AMQP 1.0 Client/Server framework" documentation = "https://docs.rs/ntex-amqp" diff --git a/src/delivery.rs b/src/delivery.rs index d833aca..5799aae 100644 --- a/src/delivery.rs +++ b/src/delivery.rs @@ -148,6 +148,7 @@ impl Delivery { pub async fn wait(&self) -> Result, AmqpProtocolError> { if self.flags.get().contains(Flags::LOCAL_SETTLED) { + log::error!("Delivery {:?} is settled locally", self.id); return Ok(None); } @@ -166,7 +167,8 @@ impl Delivery { inner.tx = Some(tx); rx } else { - return Ok(None); + // session ended + return Err(AmqpProtocolError::LinkDetached(None)); }; if rx.await.is_err() { return Err(AmqpProtocolError::ConnectionDropped); @@ -185,6 +187,9 @@ impl Delivery { if let Some(st) = self.check_inner(inner) { return st; } + } else { + // session ended + return Err(AmqpProtocolError::LinkDetached(None)); } Ok(None) } diff --git a/tests/test_server.rs b/tests/test_server.rs index 1897455..dc96deb 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -94,8 +94,6 @@ async fn test_simple() -> std::io::Result<()> { #[ntex::test] async fn test_large_transfer() -> std::io::Result<()> { - env_logger::init(); - let mut rng = thread_rng(); let data: String = (0..2048) .map(|_| rng.sample(Alphanumeric) as char)