diff --git a/nats/src/jetstream/types.rs b/nats/src/jetstream/types.rs index 56af145b4..368182d83 100644 --- a/nats/src/jetstream/types.rs +++ b/nats/src/jetstream/types.rs @@ -224,7 +224,12 @@ pub struct ConsumerConfig { #[serde(default, skip_serializing_if = "is_default")] pub rate_limit: u64, /// What percentage of acknowledgments should be samples for observability, 0-100 - #[serde(default, skip_serializing_if = "is_default")] + #[serde( + rename = "sample_freq", + with = "from_str", + default, + skip_serializing_if = "is_default" + )] pub sample_frequency: u8, /// The maximum number of waiting consumers. #[serde(default, skip_serializing_if = "is_default")] @@ -254,6 +259,26 @@ pub struct ConsumerConfig { pub inactive_threshold: Duration, } +mod from_str { + pub fn deserialize<'de, T, D>(deserializer: D) -> Result + where + T: std::str::FromStr, + T::Err: std::fmt::Display, + D: serde::Deserializer<'de>, + { + let s = ::deserialize(deserializer)?; + T::from_str(&s).map_err(serde::de::Error::custom) + } + + pub fn serialize(value: &T, serializer: S) -> Result + where + T: std::fmt::Display, + S: serde::Serializer, + { + serializer.serialize_str(&value.to_string()) + } +} + pub(crate) enum ConsumerKind { Pull, } diff --git a/nats/tests/jetstream.rs b/nats/tests/jetstream.rs index 89a8a8f98..330fdc244 100644 --- a/nats/tests/jetstream.rs +++ b/nats/tests/jetstream.rs @@ -783,6 +783,32 @@ fn jetstream_pull_subscribe_bad_stream() { .expect_err("expected not found stream for a given subject"); } +#[test] +fn jetstream_consumer_configs_sample_frequency() { + let s = nats_server::run_server("tests/configs/jetstream.conf"); + let nc = nats::Options::new() + .error_callback(|err| println!("error!: {err}")) + .connect(s.client_url()) + .unwrap(); + let js = nats::jetstream::new(nc); + + let sconfig = StreamConfig { + name: "SampledStream".into(), + ..Default::default() + }; + js.add_stream(sconfig).unwrap(); + + let cconfig = ConsumerConfig { + durable_name: Some("SampledConsumer".into()), + filter_subject: "SampledSubject".into(), + sample_frequency: 80, + ..Default::default() + }; + let consumer = js.add_consumer("SampledStream", cconfig).unwrap(); + + assert_eq!(80, consumer.config.sample_frequency); +} + // Helper function to return server and client. pub fn run_basic_jetstream() -> (nats_server::Server, Connection, JetStream) { let s = nats_server::run_server("tests/configs/jetstream.conf");