Skip to content

Commit

Permalink
Allow to set Transfer format
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Jan 8, 2025
1 parent 48236a8 commit dc587e9
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 10 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [3.4.0] - 2025-01-08

* Allow to set Transfer format

## [3.3.1] - 2025-01-02

* Fix rcv-settle-mode for sender links
Expand Down
3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-amqp"
version = "3.3.1"
version = "3.4.0"
authors = ["ntex contributors <[email protected]>"]
description = "AMQP 1.0 Client/Server framework"
documentation = "https://docs.rs/ntex-amqp"
Expand All @@ -26,7 +26,6 @@ frame-trace = []

[dependencies]
ntex = "2.10"
ntex-io = "2.9.2"
ntex-amqp-codec = "0.9"

bitflags = "2"
Expand Down
11 changes: 9 additions & 2 deletions src/delivery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::cell::Cell as StdCell;
use ntex::{channel::pool, util::Bytes};
use ntex_amqp_codec::protocol::{
DeliveryNumber, DeliveryState, Disposition, DispositionInner, Error, ErrorCondition, Handle,
Rejected, Role, TransferBody,
MessageFormat, Rejected, Role, TransferBody,
};
use ntex_amqp_codec::types::{Str, Symbol};

Expand Down Expand Up @@ -292,6 +292,7 @@ pub struct DeliveryBuilder {
tag: Option<Bytes>,
settled: bool,
data: TransferBody,
format: Option<MessageFormat>,
sender: Cell<SenderLinkInner>,
}

Expand All @@ -300,6 +301,7 @@ impl DeliveryBuilder {
Self {
tag: None,
settled: false,
format: None,
data,
sender,
}
Expand All @@ -315,6 +317,11 @@ impl DeliveryBuilder {
self
}

pub fn format(mut self, fmt: MessageFormat) -> Self {
self.format = Some(fmt);
self
}

pub async fn send(self) -> Result<Delivery, AmqpProtocolError> {
let inner = self.sender.get_ref();

Expand All @@ -332,7 +339,7 @@ impl DeliveryBuilder {
let (id, tag) = self
.sender
.get_mut()
.send(self.data, self.tag, self.settled)
.send(self.data, self.tag, self.settled, self.format)
.await?;

Ok(Delivery {
Expand Down
11 changes: 8 additions & 3 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use slab::Slab;

use ntex_amqp_codec::protocol::{
self as codec, Accepted, Attach, DeliveryNumber, DeliveryState, Detach, Disposition, End,
Error, Flow, Frame, Handle, ReceiverSettleMode, Role, SenderSettleMode, Source, Transfer,
TransferBody, TransferNumber,
Error, Flow, Frame, Handle, MessageFormat, ReceiverSettleMode, Role, SenderSettleMode, Source,
Transfer, TransferBody, TransferNumber,
};
use ntex_amqp_codec::{AmqpFrame, Encode};

Expand Down Expand Up @@ -1217,6 +1217,7 @@ impl SessionInner {
tag: Bytes,
body: TransferBody,
settled: bool,
format: Option<MessageFormat>,
) -> Result<DeliveryNumber, AmqpProtocolError> {
loop {
if self.remote_incoming_window == 0 {
Expand Down Expand Up @@ -1247,7 +1248,11 @@ impl SessionInner {
} else {
None
};
let message_format = body.message_format();
let message_format = if format.is_none() {
body.message_format()
} else {
format
};

let max_frame_size = self.max_frame_size();
let max_frame_size = if max_frame_size > 2048 {
Expand Down
7 changes: 4 additions & 3 deletions src/sndlink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use std::{collections::VecDeque, future::Future};
use ntex::channel::{condition, oneshot, pool};
use ntex::util::{BufMut, ByteString, Bytes, Either, PoolRef, Ready};
use ntex_amqp_codec::protocol::{
self as codec, Attach, DeliveryNumber, Error, Flow, ReceiverSettleMode, Role, SenderSettleMode,
SequenceNo, Target, TerminusDurability, TerminusExpiryPolicy, TransferBody,
self as codec, Attach, DeliveryNumber, Error, Flow, MessageFormat, ReceiverSettleMode, Role,
SenderSettleMode, SequenceNo, Target, TerminusDurability, TerminusExpiryPolicy, TransferBody,
};

use crate::delivery::DeliveryBuilder;
Expand Down Expand Up @@ -321,6 +321,7 @@ impl SenderLinkInner {
body: T,
tag: Option<Bytes>,
settled: bool,
format: Option<MessageFormat>,
) -> Result<(DeliveryNumber, Bytes), AmqpProtocolError> {
if let Some(ref err) = self.error {
Err(err.clone())
Expand Down Expand Up @@ -355,7 +356,7 @@ impl SenderLinkInner {
.session
.inner
.get_mut()
.send_transfer(self.id as u32, tag.clone(), body, settled)
.send_transfer(self.id as u32, tag.clone(), body, settled, format)
.await?;

Ok((id, tag))
Expand Down

0 comments on commit dc587e9

Please sign in to comment.