Skip to content

Commit

Permalink
Better handling session end for inflight deliveries (#54)
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 authored Jun 4, 2024
1 parent 971a188 commit cff739e
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 4 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [3.0.1] - 2024-06-04

* Better handling "session end" for inflight deliveries

## [3.0.0] - 2024-05-28

* Use ntex-service 3.0
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-amqp"
version = "3.0.0"
version = "3.0.1"
authors = ["ntex contributors <[email protected]>"]
description = "AMQP 1.0 Client/Server framework"
documentation = "https://docs.rs/ntex-amqp"
Expand Down
7 changes: 6 additions & 1 deletion src/delivery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ impl Delivery {

pub async fn wait(&self) -> Result<Option<DeliveryState>, AmqpProtocolError> {
if self.flags.get().contains(Flags::LOCAL_SETTLED) {
log::error!("Delivery {:?} is settled locally", self.id);
return Ok(None);
}

Expand All @@ -166,7 +167,8 @@ impl Delivery {
inner.tx = Some(tx);
rx
} else {
return Ok(None);
// session ended
return Err(AmqpProtocolError::LinkDetached(None));
};
if rx.await.is_err() {
return Err(AmqpProtocolError::ConnectionDropped);
Expand All @@ -185,6 +187,9 @@ impl Delivery {
if let Some(st) = self.check_inner(inner) {
return st;
}
} else {
// session ended
return Err(AmqpProtocolError::LinkDetached(None));
}
Ok(None)
}
Expand Down
2 changes: 0 additions & 2 deletions tests/test_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,6 @@ async fn test_simple() -> std::io::Result<()> {

#[ntex::test]
async fn test_large_transfer() -> std::io::Result<()> {
env_logger::init();

let mut rng = thread_rng();
let data: String = (0..2048)
.map(|_| rng.sample(Alphanumeric) as char)
Expand Down

0 comments on commit cff739e

Please sign in to comment.