Skip to content

Commit

Permalink
Fix control queue handling (#58)
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 authored Dec 3, 2024
1 parent 6513599 commit 2f3f1aa
Show file tree
Hide file tree
Showing 6 changed files with 268 additions and 191 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [3.2.0] - 2024-12-03

* Fix control queue handling

## [3.1.0] - 2024-12-01

* Set "next_incoming_id" for Flow frame
Expand Down
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-amqp"
version = "3.1.0"
version = "3.2.0"
authors = ["ntex contributors <[email protected]>"]
description = "AMQP 1.0 Client/Server framework"
documentation = "https://docs.rs/ntex-amqp"
Expand All @@ -25,7 +25,8 @@ default = []
frame-trace = []

[dependencies]
ntex = "2"
ntex = "2.9"
ntex-util = "2.7"
ntex-amqp-codec = "0.9"

bitflags = "2"
Expand Down
2 changes: 1 addition & 1 deletion src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use ntex::util::{HashMap, PoolRef, Ready};

use crate::codec::protocol::{self as codec, Begin, Close, End, Error, Frame, Role};
use crate::codec::{AmqpCodec, AmqpFrame};
use crate::dispatcher::ControlQueue;
use crate::control::ControlQueue;
use crate::session::{Session, SessionInner, INITIAL_NEXT_OUTGOING_ID};
use crate::sndlink::{SenderLink, SenderLinkInner};
use crate::{cell::Cell, error::AmqpProtocolError, types::Action, Configuration};
Expand Down
17 changes: 15 additions & 2 deletions src/control.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{fmt, io};
use std::{cell::RefCell, collections::VecDeque, fmt, io};

use ntex::util::Either;
use ntex::{task::LocalWaker, util::Either};
use ntex_amqp_codec::protocol;

use crate::cell::Cell;
Expand Down Expand Up @@ -72,3 +72,16 @@ impl ControlFrame {
self.0.get_ref().session.clone().map(Session::new)
}
}

#[derive(Default, Debug)]
pub(crate) struct ControlQueue {
pub(crate) pending: RefCell<VecDeque<ControlFrame>>,
pub(crate) waker: LocalWaker,
}

impl ControlQueue {
pub(crate) fn enqueue_frame(&self, frame: ControlFrame) {
self.pending.borrow_mut().push_back(frame);
self.waker.wake();
}
}
Loading

0 comments on commit 2f3f1aa

Please sign in to comment.