Skip to content

Commit c275f19

Browse files
Support for publish/consuming messages
Co-authored-by: Michael Davis <[email protected]>
1 parent 798cdfa commit c275f19

File tree

4 files changed

+240
-39
lines changed

4 files changed

+240
-39
lines changed

src/blocking.rs

+60-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::{
22
commons::{BindingDestinationType, UserLimitTarget, VirtualHostLimitTarget},
33
requests::{
4-
EnforcedLimitParams, ExchangeParams, Permissions, PolicyParams, QueueParams,
4+
self, EnforcedLimitParams, ExchangeParams, Permissions, PolicyParams, QueueParams,
55
RuntimeParameterDefinition, UserParams, VirtualHostParams, XArguments,
66
},
77
responses::{self, BindingInfo},
@@ -1051,6 +1051,65 @@ impl<'a> Client<'a> {
10511051
))
10521052
}
10531053

1054+
//
1055+
// Publish and consume messages
1056+
//
1057+
pub fn publish_message(
1058+
&self,
1059+
vhost: &str,
1060+
exchange: &str,
1061+
routing_key: &str,
1062+
payload: &str,
1063+
properties: requests::MessageProperties,
1064+
) -> Result<responses::MessageRouted> {
1065+
let url = format!(
1066+
"exchanges/{}/{}/publish",
1067+
self.percent_encode(vhost),
1068+
self.percent_encode(exchange)
1069+
);
1070+
1071+
let body = serde_json::json!({
1072+
"routing_key": routing_key,
1073+
"payload": payload,
1074+
"payload_encoding": "string",
1075+
"properties": properties,
1076+
});
1077+
1078+
let response = self.http_post(&url, &body)?;
1079+
1080+
let response2 = self.ok_or_status_code_error(response)?;
1081+
response2
1082+
.json::<responses::MessageRouted>()
1083+
.map_err(Error::from)
1084+
}
1085+
1086+
pub fn get_messages(
1087+
&self,
1088+
vhost: &str,
1089+
queue: &str,
1090+
count: i32,
1091+
ack_mode: &str,
1092+
) -> Result<Vec<responses::GetMessage>> {
1093+
let url = format!(
1094+
"queues/{}/{}/get",
1095+
self.percent_encode(vhost),
1096+
self.percent_encode(queue)
1097+
);
1098+
1099+
let body = serde_json::json!({
1100+
"count": count,
1101+
"ackmode": ack_mode,
1102+
"encoding": "auto"
1103+
});
1104+
1105+
let response = self.http_post(&url, &body)?;
1106+
1107+
let response2 = self.ok_or_status_code_error(response)?;
1108+
response2
1109+
.json::<Vec<responses::GetMessage>>()
1110+
.map_err(Error::from)
1111+
}
1112+
10541113
//
10551114
// Implementation
10561115
//

src/requests.rs

+2
Original file line numberDiff line numberDiff line change
@@ -269,3 +269,5 @@ pub struct Permissions<'a> {
269269
pub read: &'a str,
270270
pub write: &'a str,
271271
}
272+
273+
pub type MessageProperties = Map<String, Value>;

src/responses.rs

+100-38
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,8 @@ impl fmt::Display for XArguments {
6666
}
6767
}
6868

69-
#[derive(Debug, Deserialize, Clone)]
69+
#[derive(Debug, Deserialize, Clone, Default)]
70+
#[serde(transparent)]
7071
pub struct RuntimeParameterValue(pub Map<String, serde_json::Value>);
7172
impl fmt::Display for RuntimeParameterValue {
7273
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
@@ -524,43 +525,6 @@ pub struct RuntimeParameter {
524525
pub value: RuntimeParameterValue,
525526
}
526527

527-
fn deserialize_runtime_parameter_value<'de, D>(
528-
deserializer: D,
529-
) -> Result<RuntimeParameterValue, D::Error>
530-
where
531-
D: serde::Deserializer<'de>,
532-
{
533-
struct RuntimeParameterValueVisitor;
534-
535-
impl<'de> Visitor<'de> for RuntimeParameterValueVisitor {
536-
type Value = RuntimeParameterValue;
537-
538-
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
539-
formatter.write_str("a runtime parameter")
540-
}
541-
542-
fn visit_seq<A>(self, _seq: A) -> Result<Self::Value, A::Error>
543-
where
544-
A: serde::de::SeqAccess<'de>,
545-
{
546-
// Always deserialize the value as a map, even if the server
547-
// sends a sequence.
548-
Ok(RuntimeParameterValue(Map::new()))
549-
}
550-
551-
fn visit_map<A>(self, map: A) -> Result<Self::Value, A::Error>
552-
where
553-
A: MapAccess<'de>,
554-
{
555-
let deserializer = serde::de::value::MapAccessDeserializer::new(map);
556-
let m = Deserialize::deserialize(deserializer)?;
557-
Ok(RuntimeParameterValue(m))
558-
}
559-
}
560-
561-
deserializer.deserialize_any(RuntimeParameterValueVisitor)
562-
}
563-
564528
#[derive(Debug, Deserialize, Clone)]
565529
#[allow(dead_code)]
566530
pub struct ClusterIdentity {
@@ -639,6 +603,104 @@ pub struct QuorumEndangeredQueue {
639603
pub queue_type: String,
640604
}
641605

606+
#[derive(Debug, Deserialize, Clone, Eq, PartialEq)]
607+
#[cfg_attr(feature = "tabled", derive(Tabled))]
608+
#[allow(dead_code)]
609+
pub struct GetMessage {
610+
pub payload_bytes: i32,
611+
pub redelivered: bool,
612+
pub exchange: String,
613+
pub routing_key: String,
614+
pub message_count: i32,
615+
#[serde(deserialize_with = "deserialize_message_properties")]
616+
pub properties: MessageProperties,
617+
pub payload: String,
618+
pub payload_encoding: String,
619+
}
620+
621+
#[derive(Debug, Deserialize, Clone, Eq, PartialEq)]
622+
pub struct MessageRouted {
623+
pub routed: bool,
624+
}
625+
626+
impl fmt::Display for MessageRouted {
627+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
628+
match self.routed {
629+
true => write!(f, "Message published and routed successfully"),
630+
false => write!(f, "Message published but NOT routed"),
631+
}
632+
}
633+
}
634+
635+
#[derive(Debug, Deserialize, Clone, Eq, PartialEq, Default)]
636+
#[serde(transparent)]
637+
pub struct MessageProperties(pub Map<String, serde_json::Value>);
638+
639+
impl fmt::Display for MessageProperties {
640+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
641+
for (k, v) in &self.0 {
642+
writeln!(f, "{}: {}", k, v)?;
643+
}
644+
645+
Ok(())
646+
}
647+
}
648+
642649
fn undefined() -> String {
643650
"?".to_string()
644651
}
652+
653+
fn deserialize_map_or_seq<'de, T, D>(deserializer: D) -> Result<T, D::Error>
654+
where
655+
T: Default + serde::Deserialize<'de>,
656+
D: serde::Deserializer<'de>,
657+
{
658+
struct MapVisitor<T> {
659+
default: T,
660+
}
661+
662+
impl<'de, T: serde::Deserialize<'de>> Visitor<'de> for MapVisitor<T> {
663+
type Value = T;
664+
665+
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
666+
formatter.write_str("map")
667+
}
668+
669+
fn visit_map<A>(self, map: A) -> Result<Self::Value, A::Error>
670+
where
671+
A: MapAccess<'de>,
672+
{
673+
let deserializer = serde::de::value::MapAccessDeserializer::new(map);
674+
let m = Deserialize::deserialize(deserializer)?;
675+
Ok(m)
676+
}
677+
678+
fn visit_seq<A>(self, _seq: A) -> Result<Self::Value, A::Error>
679+
where
680+
A: serde::de::SeqAccess<'de>,
681+
{
682+
// Treat a sequence as the default for the type.
683+
Ok(self.default)
684+
}
685+
}
686+
687+
deserializer.deserialize_any(MapVisitor {
688+
default: T::default(),
689+
})
690+
}
691+
692+
fn deserialize_message_properties<'de, D>(deserializer: D) -> Result<MessageProperties, D::Error>
693+
where
694+
D: serde::Deserializer<'de>,
695+
{
696+
deserialize_map_or_seq::<MessageProperties, D>(deserializer)
697+
}
698+
699+
fn deserialize_runtime_parameter_value<'de, D>(
700+
deserializer: D,
701+
) -> Result<RuntimeParameterValue, D::Error>
702+
where
703+
D: serde::Deserializer<'de>,
704+
{
705+
deserialize_map_or_seq::<RuntimeParameterValue, D>(deserializer)
706+
}

tests/message_tests.rs

+78
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
use rabbitmq_http_client::{
2+
blocking::Client,
3+
requests::{self, QueueParams},
4+
responses::{GetMessage, MessageProperties, MessageRouted},
5+
};
6+
use serde_json::{json, Map, Value};
7+
8+
mod common;
9+
use crate::common::{endpoint, PASSWORD, USERNAME};
10+
11+
#[test]
12+
fn test_publish_and_get() {
13+
let endpoint = endpoint();
14+
let rc = Client::new(&endpoint).with_basic_auth_credentials(USERNAME, PASSWORD);
15+
let vhost = "/";
16+
let queue = "rust.tests.cq.publish_and_get";
17+
18+
let _ = rc.delete_queue(vhost, queue);
19+
20+
let params = QueueParams::new_durable_classic_queue(queue, None);
21+
let result2 = rc.declare_queue(vhost, &params);
22+
assert!(result2.is_ok(), "declare_queue returned {:?}", result2);
23+
24+
let result3 = rc.publish_message(
25+
vhost,
26+
"",
27+
queue,
28+
"rust test 1",
29+
requests::MessageProperties::default(),
30+
);
31+
assert!(result3.is_ok(), "get_messages returned {:?}", result3);
32+
assert_eq!(result3.unwrap(), MessageRouted { routed: true });
33+
34+
let mut props = Map::<String, Value>::new();
35+
props.insert(String::from("timestamp"), json!(123456789));
36+
let result4 = rc.publish_message(vhost, "", queue, "rust test 2", props.clone());
37+
assert!(result4.is_ok(), "get_messages returned {:?}", result4);
38+
assert_eq!(result4.unwrap(), MessageRouted { routed: true });
39+
40+
let result5 = rc.get_messages(vhost, queue, 1, "ack_requeue_false");
41+
assert!(result5.is_ok(), "get_messages returned {:?}", result5);
42+
43+
let result6 = result5.unwrap();
44+
assert_eq!(
45+
result6,
46+
[GetMessage {
47+
payload_bytes: 11,
48+
redelivered: false,
49+
exchange: "".to_owned(),
50+
routing_key: "rust.tests.cq.publish_and_get".to_owned(),
51+
message_count: 1,
52+
properties: MessageProperties::default(),
53+
payload: "rust test 1".to_owned(),
54+
payload_encoding: "string".to_owned()
55+
}]
56+
);
57+
58+
let result7 = rc.get_messages(vhost, queue, 1, "ack_requeue_false");
59+
assert!(result7.is_ok(), "get_messages returned {:?}", result7);
60+
61+
let props = rabbitmq_http_client::responses::MessageProperties(props);
62+
let result8 = result7.unwrap();
63+
assert_eq!(
64+
result8,
65+
[GetMessage {
66+
payload_bytes: 11,
67+
redelivered: false,
68+
exchange: "".to_owned(),
69+
routing_key: "rust.tests.cq.publish_and_get".to_owned(),
70+
message_count: 0,
71+
properties: props,
72+
payload: "rust test 2".to_owned(),
73+
payload_encoding: "string".to_owned()
74+
}]
75+
);
76+
77+
rc.delete_queue(vhost, queue).unwrap();
78+
}

0 commit comments

Comments
 (0)