diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 9247a8399..7d48b807f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -94,6 +94,32 @@ jobs: rustup update cargo test reconnect_test --features=fault_injection -- --ignored + doctests: + name: doctests + runs-on: ubuntu-latest + steps: + - name: Cache Rust + uses: actions/cache@v1 + env: + cache-name: cache-rust + with: + path: target + key: ${{ runner.os }}-${{ env.cache-name }} + restore-keys: | + ${{ runner.os }}- + - uses: actions/checkout@v2 + - uses: actions/setup-go@v2 + with: + go-version: '1.14' + - name: install nats-server + run: go get github.com/nats-io/nats-server + - name: JetStream test + env: + RUST_LOG: trace + run: | + rustup update + cargo test --doc --features=jetstream + jetstream: name: JetStream runs-on: ubuntu-latest diff --git a/CHANGELOG.md b/CHANGELOG.md index 856e555cd..6ea764176 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,12 @@ +# 0.9.6 + +## New Features + +- JetStream consumers are now better supported + through the `Consumer::process*` methods, + which also perform message deduplication + backed by an interval tree. + # 0.9.5 ## New Features diff --git a/Cargo.toml b/Cargo.toml index 1d739ce44..26d59beff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "nats" -version = "0.9.5" +version = "0.9.6" description = "A Rust NATS client" authors = ["Derek Collison ", "Tyler Neely ", "Stjepan Glavina "] edition = "2018" diff --git a/async-nats/Cargo.toml b/async-nats/Cargo.toml index b12dc6b81..b52fe2af8 100644 --- a/async-nats/Cargo.toml +++ b/async-nats/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "async-nats" -version = "0.9.5" +version = "0.9.6" description = "An async Rust NATS client" authors = ["Derek Collison ", "Tyler Neely ", "Stjepan Glavina "] edition = "2018" @@ -17,7 +17,7 @@ maintenance = { status = "actively-developed" } [dependencies] blocking = "1.0.2" -nats = { path = "..", version = "0.9.5" } +nats = { path = "..", version = "0.9.6" } [dev-dependencies] smol = "1.2.5" diff --git a/src/client.rs b/src/client.rs index 8254ce6e0..c609ebba8 100644 --- a/src/client.rs +++ b/src/client.rs @@ -619,6 +619,7 @@ impl Client { data: payload, headers: None, client: self.clone(), + double_acked: Default::default(), }; // Send a message or drop it if the channel is @@ -642,6 +643,7 @@ impl Client { data: payload, headers: Some(headers), client: self.clone(), + double_acked: Default::default(), }; // Send a message or drop it if the channel is diff --git a/src/jetstream.rs b/src/jetstream.rs index 080599120..85444aa90 100644 --- a/src/jetstream.rs +++ b/src/jetstream.rs @@ -54,10 +54,7 @@ //! //! nc.create_stream("my_stream")?; //! -//! let consumer: Consumer = nc.create_consumer("my_stream", "my_consumer")?; -//! -//! // process a single item, sending an ack if the closure returns `Ok`. -//! consumer.process +//! let consumer: nats::jetstream::Consumer = nc.create_consumer("my_stream", "my_consumer")?; //! //! # Ok(()) } //! ``` @@ -72,7 +69,7 @@ //! //! nc.create_stream("my_stream")?; //! -//! let consumer: Consumer = nc.create_consumer("my_stream", ConsumerConfig { +//! let consumer: nats::jetstream::Consumer = nc.create_consumer("my_stream", ConsumerConfig { //! durable_name: Some("my_consumer".to_string()), //! deliver_subject: Some("my_push_consumer_subject".to_string()), //! ack_policy: AckPolicy::All, @@ -109,39 +106,73 @@ //! //! let nc = nats::connect("my_server::4222")?; //! -//! // this will create the consumer if it does not exist already -//! let consumer = Consumer::create_or_open(nc, "my_stream", "existing_or_created_consumer")?; +//! // this will create the consumer if it does not exist already. +//! // consumer must be mut because the `process*` methods perform +//! // message deduplication using an interval tree, which is +//! // also publicly accessible via the `Consumer`'s `dedupe_window` +//! // field. +//! let mut consumer = Consumer::create_or_open(nc, "my_stream", "existing_or_created_consumer")?; //! -//! // wait indefinitely for the message to arrive +//! // The `Consumer::process` method executes a closure +//! // on both push- and pull-based consumers, and if +//! // the closure returns `Ok` then the message is acked. +//! // If no message is available, it will wait forever +//! // for one to arrive. //! let msg_data_len: usize = consumer.process(|msg| { //! println!("got message {:?}", msg); -//! msg.data.len() +//! Ok(msg.data.len()) //! })?; //! -//! // wait until the consumer's `timeout` field for the message to arrive. -//! // This can be set manually, and has a very low default of 5ms. +//! // Similar to `Consumer::process` except wait until the +//! // consumer's `timeout` field for the message to arrive. +//! // This can and should be set manually, as it has a low +//! // default of 5ms. //! let msg_data_len: usize = consumer.process_timeout(|msg| { //! println!("got message {:?}", msg); -//! msg.data.len() +//! Ok(msg.data.len()) //! })?; //! -//! // wait indefinitely for the first message in a batch, then process -//! // more messages until the configured timeout is expired +//! // For consumers operating with `AckPolicy::All`, batch +//! // processing can provide nice throughput optimizations. +//! // `Consumer::process_batch` will wait indefinitely for +//! // the first message in a batch, then process +//! // more messages until the configured timeout is expired. +//! // It will batch acks if running with `AckPolicy::All`. +//! // If there is an error with acking, the last item in the +//! // returned `Vec` will be the io error. Terminates early +//! // without acking if the closure returns an `Err`, which +//! // is included in the final element of the `Vec`. If a +//! // Timeout happens before the batch size is reached, then +//! // there will be no errors included in the response `Vec`. //! let batch_size = 128; -//! let results: Vec = consumer.process_batch(batch_size, |msg| { +//! let results: Vec> = consumer.process_batch(batch_size, |msg| { //! println!("got message {:?}", msg); -//! msg.data.len() -//! })?; +//! Ok(msg.data.len()) +//! }); +//! let flipped: std::io::Result> = results.into_iter().collect(); +//! let sizes: Vec = flipped?; +//! +//! // For lower-level control for use cases that are not +//! // well-served by the high-level process* methods, +//! // there are a number of lower level primitives that +//! // can be used, such as `Consumer::pull` for pull-based +//! // consumers and `Message::ack` for manually acking things: +//! let msg = consumer.pull()?; +//! +//! // --- process message --- +//! +//! // tell the server the message has been processed +//! msg.ack()?; //! //! # Ok(()) } //! ``` -//! use std::{ collections::VecDeque, convert::TryFrom, fmt::Debug, io::{self, Error, ErrorKind}, + time::Duration, }; use serde::{de::DeserializeOwned, Deserialize, Serialize}; @@ -257,7 +288,7 @@ impl NatsClient { let cfg: StreamConfig = stream_config.into(); if cfg.name.is_empty() { return Err(Error::new( - ErrorKind::InvalidData, + ErrorKind::InvalidInput, "the stream name must not be empty", )); } @@ -273,7 +304,7 @@ impl NatsClient { pub fn update_stream(&self, cfg: &StreamConfig) -> io::Result { if cfg.name.is_empty() { return Err(Error::new( - ErrorKind::InvalidData, + ErrorKind::InvalidInput, "the stream name must not be empty", )); } @@ -323,7 +354,7 @@ impl NatsClient { let stream: &str = stream.as_ref(); if stream.is_empty() { return Err(Error::new( - ErrorKind::InvalidData, + ErrorKind::InvalidInput, "the stream name must not be empty", )); } @@ -349,7 +380,7 @@ impl NatsClient { let stream: &str = stream.as_ref(); if stream.is_empty() { return Err(Error::new( - ErrorKind::InvalidData, + ErrorKind::InvalidInput, "the stream name must not be empty", )); } @@ -368,7 +399,7 @@ impl NatsClient { let stream: &str = stream.as_ref(); if stream.is_empty() { return Err(Error::new( - ErrorKind::InvalidData, + ErrorKind::InvalidInput, "the stream name must not be empty", )); } @@ -387,7 +418,7 @@ impl NatsClient { let stream: &str = stream.as_ref(); if stream.is_empty() { return Err(Error::new( - ErrorKind::InvalidData, + ErrorKind::InvalidInput, "the stream name must not be empty", )); } @@ -411,7 +442,7 @@ impl NatsClient { let stream: &str = stream.as_ref(); if stream.is_empty() { return Err(Error::new( - ErrorKind::InvalidData, + ErrorKind::InvalidInput, "the stream name must not be empty", )); } @@ -437,7 +468,7 @@ impl NatsClient { let stream = stream.as_ref(); if stream.is_empty() { return Err(Error::new( - ErrorKind::InvalidData, + ErrorKind::InvalidInput, "the stream name must not be empty", )); } @@ -485,14 +516,14 @@ impl NatsClient { let stream = stream.as_ref(); if stream.is_empty() { return Err(Error::new( - ErrorKind::InvalidData, + ErrorKind::InvalidInput, "the stream name must not be empty", )); } let consumer = consumer.as_ref(); if consumer.is_empty() { return Err(Error::new( - ErrorKind::InvalidData, + ErrorKind::InvalidInput, "the consumer name must not be empty", )); } @@ -523,7 +554,7 @@ impl NatsClient { let stream: &str = stream.as_ref(); if stream.is_empty() { return Err(Error::new( - ErrorKind::InvalidData, + ErrorKind::InvalidInput, "the stream name must not be empty", )); } @@ -549,7 +580,6 @@ impl NatsClient { Res: DeserializeOwned, { let res_msg = self.request(subject, req)?; - println!("got response: {:?}", std::str::from_utf8(&res_msg.data)); let res: ApiResponse = serde_json::de::from_slice(&res_msg.data)?; match res { ApiResponse::Ok(stream_info) => Ok(stream_info), @@ -587,11 +617,11 @@ pub struct Consumer { /// out during `process` and `process_batch`. Defaults /// to 5ms, which is likely to be far too low for /// workloads crossing physical sites. - pub timeout: std::time::Duration, + pub timeout: Duration, /// Contains ranges of processed messages that will be /// filtered out upon future receipt. - pub dedupe_window: RangeTree, + pub dedupe_window: IntervalTree, } impl Consumer { @@ -673,7 +703,7 @@ impl Consumer { stream, cfg, push_subscriber, - timeout: std::time::Duration::from_millis(5), + timeout: Duration::from_millis(5), dedupe_window: Default::default(), }) } @@ -695,34 +725,42 @@ impl Consumer { /// If an error is encountered while subscribing or acking messages /// that may have returned `Ok` from the closure, that Ok will be /// present in the returned vector but the last item in the vector - /// will be the encountered error. + /// will be the encountered error. If the consumer's timeout expires + /// before the entire batch is processed, there will be no error + /// pushed to the returned `Vec`, it will just be shorter than the + /// specified batch size. + /// + /// All messages are deduplicated using the `Consumer`'s built-in + /// `dedupe_window` before being fed to the provided closure. If + /// a message that has already been processed is received, it will + /// be acked and skipped. Errors for acking deduplicated messages + /// are not included in the returned `Vec`. /// /// Requires the `jetstream` feature. - #[doc(hidden)] pub fn process_batch io::Result>( - &self, + &mut self, batch_size: usize, mut f: F, ) -> Vec> { - if self.cfg.durable_name.is_none() { - return vec![Err(Error::new( - ErrorKind::InvalidData, - "process and process_batch are only usable from \ - Pull-based Consumers with a durable_name set", - ))]; - } - - let subject = format!( - "{}CONSUMER.MSG.NEXT.{}.{}", - self.api_prefix(), - self.stream, - self.cfg.durable_name.as_ref().unwrap() - ); - let mut _sub_opt = None; let responses = if let Some(ps) = self.push_subscriber.as_ref() { ps } else { + if self.cfg.durable_name.is_none() { + return vec![Err(Error::new( + ErrorKind::InvalidInput, + "process and process_batch are only usable from \ + Pull-based Consumers if there is a durable_name set", + ))]; + } + + let subject = format!( + "{}CONSUMER.MSG.NEXT.{}.{}", + self.api_prefix(), + self.stream, + self.cfg.durable_name.as_ref().unwrap() + ); + let sub = match self.nc.request_multi(&subject, batch_size.to_string()) { Ok(sub) => sub, @@ -737,23 +775,24 @@ impl Consumer { let start = std::time::Instant::now(); let mut received = 0; - let mut acked = 0; - while let Ok(msg) = responses.next_timeout(if received == 0 { + while let Ok(next) = responses.next_timeout(if received == 0 { // wait "forever" for first message - std::time::Duration::new(std::u64::MAX >> 2, 0) + Duration::new(std::u64::MAX >> 2, 0) } else { self.timeout .checked_sub(start.elapsed()) .unwrap_or_default() }) { - let ret = f(&msg); + let next_id = next.jetstream_message_info().unwrap().stream_seq; - if ret.is_err() { - rets.push(ret); - return rets; + if self.dedupe_window.already_processed(next_id) { + let _dont_care_about_success = next.ack(); + continue; } + let ret = f(&next); + let is_err = ret.is_err(); rets.push(ret); @@ -762,27 +801,24 @@ impl Consumer { // if our ack policy is `All`, after breaking. break; } else if self.cfg.ack_policy == AckPolicy::Explicit { - let res = msg.respond(AckKind::Ack); + self.dedupe_window.mark_processed(next_id); + let res = next.ack(); if let Err(e) = res { - rets.truncate(acked); rets.push(Err(e)); - } else { - acked += 1; } } - last = Some(msg); + last = Some(next); received += 1; if received == batch_size { break; } } - if let Some(msg) = last { + if let Some(last) = last { if self.cfg.ack_policy == AckPolicy::All { - let res = msg.respond(AckKind::Ack); + let res = last.ack(); if let Err(e) = res { - rets.truncate(acked); rets.push(Err(e)); } } @@ -796,37 +832,60 @@ impl Consumer { /// /// Does not ack the processed message if the internal closure returns an `Err`. /// + /// All messages are deduplicated using the `Consumer`'s built-in + /// `dedupe_window` before being fed to the provided closure. If + /// a message that has already been processed is received, it will + /// be acked and skipped. + /// + /// Does not return an `Err` if acking the message is unsuccessful, + /// but the message is still marked in the dedupe window. If you + /// require stronger processing guarantees, you can manually call + /// the `double_ack` method of the argument message. If you require + /// both the returned `Ok` from the closure and the `Err` from a + /// failed ack, use `process_batch` instead. + /// /// Requires the `jetstream` feature. - #[doc(hidden)] pub fn process io::Result>( - &self, + &mut self, f: F, ) -> io::Result { - if self.cfg.durable_name.is_none() { - return Err(Error::new( - ErrorKind::InvalidData, - "process and process_batch are only usable from \ - Pull-based Consumers with a durable_name set", - )); - } + loop { + let next = if let Some(ps) = &self.push_subscriber { + ps.next().unwrap() + } else { + if self.cfg.durable_name.is_none() { + return Err(Error::new( + ErrorKind::InvalidInput, + "process and process_batch are only usable from \ + Pull-based Consumers if there is a durable_name set", + )); + } - let subject = format!( - "{}CONSUMER.MSG.NEXT.{}.{}", - self.api_prefix(), - self.stream, - self.cfg.durable_name.as_ref().unwrap() - ); + let subject = format!( + "{}CONSUMER.MSG.NEXT.{}.{}", + self.api_prefix(), + self.stream, + self.cfg.durable_name.as_ref().unwrap() + ); - let next = if let Some(ps) = &self.push_subscriber { - ps.next().unwrap() - } else { - self.nc.request(&subject, AckKind::Ack)? - }; - let ret = f(&next)?; - if self.cfg.ack_policy != AckPolicy::None { - next.respond(AckKind::Ack)?; + self.nc.request(&subject, AckKind::Ack)? + }; + + let next_id = next.jetstream_message_info().unwrap().stream_seq; + + if self.dedupe_window.already_processed(next_id) { + let _dont_care = next.ack(); + continue; + } + + let ret = f(&next)?; + if self.cfg.ack_policy != AckPolicy::None { + let _dont_care = next.ack(); + } + + self.dedupe_window.mark_processed(next_id); + return Ok(ret); } - Ok(ret) } /// Process and acknowledge a single message, waiting up to the `Consumer`'s @@ -834,20 +893,117 @@ impl Consumer { /// /// Does not ack the processed message if the internal closure returns an `Err`. /// + /// All messages are deduplicated using the `Consumer`'s built-in + /// `dedupe_window` before being fed to the provided closure. If + /// a message that has already been processed is received, it will + /// be acked and skipped. + /// + /// Does not return an `Err` if acking the message is unsuccessful, + /// but the message is still marked in the dedupe window. If you + /// require stronger processing guarantees, you can manually call + /// the `double_ack` method of the argument message. If you require + /// both the returned `Ok` from the closure and the `Err` from a + /// failed ack, use `process_batch` instead. + /// /// Requires the `jetstream` feature. - #[doc(hidden)] pub fn process_timeout io::Result>( - &self, + &mut self, f: F, ) -> io::Result { + loop { + let next = if let Some(ps) = &self.push_subscriber { + ps.next_timeout(self.timeout)? + } else { + if self.cfg.durable_name.is_none() { + return Err(Error::new( + ErrorKind::InvalidInput, + "process and process_batch are only usable from \ + Pull-based Consumers if there is a a durable_name set", + )); + } + + let subject = format!( + "{}CONSUMER.MSG.NEXT.{}.{}", + self.api_prefix(), + self.stream, + self.cfg.durable_name.as_ref().unwrap() + ); + + self.nc.request(&subject, b"")? + }; + + let next_id = next.jetstream_message_info().unwrap().stream_seq; + + if self.dedupe_window.already_processed(next_id) { + self.dedupe_window.mark_processed(next_id); + let _dont_care = next.ack(); + continue; + } + + let ret = f(&next)?; + if self.cfg.ack_policy != AckPolicy::None { + let _dont_care = next.ack(); + } + return Ok(ret); + } + } + + /// For pull-based consumers (a consumer where `ConsumerConfig.deliver_subject` is `None`) + /// this can be used to request a single message, and wait forever for a response. + /// If you require specifying the batch size or using a timeout while consuming the + /// responses, use the `pull_opt` method below. + /// + /// This is a lower-level method and does not filter messages through the `Consumer`'s + /// built-in `dedupe_window` as the various `process*` methods do. + /// + /// Requires the `jetstream` feature. + pub fn pull(&mut self) -> io::Result { + let ret_opt = self + .pull_opt(NextRequest { + batch: 1, + ..Default::default() + })? + .next(); + + if let Some(ret) = ret_opt { + Ok(ret) + } else { + Err(Error::new( + ErrorKind::BrokenPipe, + "The nats client is shutting down.", + )) + } + } + + /// For pull-based consumers (a consumer where `ConsumerConfig.deliver_subject` is `None`) + /// this can be used to request a configurable number of messages, as well as specify + /// how the server will keep track of this batch request over time. See the docs for + /// `NextRequest` for more information about the options. + /// + /// This is a lower-level method and does not filter messages through the `Consumer`'s + /// built-in `dedupe_window` as the various `process*` methods do. + /// + /// Requires the `jetstream` feature. + pub fn pull_opt( + &mut self, + next_request: NextRequest, + ) -> io::Result { if self.cfg.durable_name.is_none() { return Err(Error::new( - ErrorKind::InvalidData, - "process and process_batch are only usable from \ + ErrorKind::InvalidInput, + "this method is only usable from \ Pull-based Consumers with a durable_name set", )); } + if self.cfg.deliver_subject.is_none() { + return Err(Error::new( + ErrorKind::InvalidInput, + "this method is only usable from \ + Pull-based Consumers with a deliver_subject set", + )); + } + let subject = format!( "{}CONSUMER.MSG.NEXT.{}.{}", self.api_prefix(), @@ -855,16 +1011,8 @@ impl Consumer { self.cfg.durable_name.as_ref().unwrap() ); - let next = if let Some(ps) = &self.push_subscriber { - ps.next_timeout(self.timeout)? - } else { - self.nc.request(&subject, b"")? - }; - let ret = f(&next)?; - if self.cfg.ack_policy != AckPolicy::None { - next.respond(AckKind::Ack)?; - } - Ok(ret) + let req = serde_json::ser::to_vec(&next_request).unwrap(); + self.nc.request_multi(&subject, &req) } fn api_prefix(&self) -> &str { @@ -872,23 +1020,29 @@ impl Consumer { } } -/// Records ranges of acknowledged messages for +/// Records ranges of acknowledged IDs for /// low-memory deduplication. #[derive(Default)] -pub struct RangeTree { +pub struct IntervalTree { // stores interval start-end inner: std::collections::BTreeMap, } -impl RangeTree { +impl IntervalTree { /// Mark this ID as being processed. Returns `true` /// if this ID was not already marked as processed. pub fn mark_processed(&mut self, id: u64) -> bool { + if self.inner.is_empty() { + self.inner.insert(1, 1); + return true; + } + let (prev_start, prev_end) = self .inner .range(..=&id) .next_back() - .map_or((0, 0), |(s, e)| (*s, *e)); + .map(|(s, e)| (*s, *e)) + .unwrap(); if (prev_start..=prev_end).contains(&id) { // range already includes id @@ -956,7 +1110,10 @@ mod test { #[test] fn range_tree() { - let mut rt = RangeTree { + let mut rt = IntervalTree::default(); + assert!(rt.mark_processed(1)); + + let mut rt = IntervalTree { inner: vec![(0, 0), (6, 6)].into_iter().collect(), }; @@ -970,7 +1127,7 @@ mod test { assert!(rt.already_processed(6)); assert!(!rt.already_processed(7)); - let mut rt = RangeTree { + let mut rt = IntervalTree { inner: vec![(3, 3), (6, 6)].into_iter().collect(), }; @@ -982,7 +1139,7 @@ mod test { assert!(rt.already_processed(6)); assert!(!rt.already_processed(7)); - let mut rt = RangeTree { + let mut rt = IntervalTree { inner: vec![(0, 0), (5, 5)].into_iter().collect(), }; rt.mark_processed(4); @@ -992,7 +1149,7 @@ mod test { assert!(rt.already_processed(5)); assert!(!rt.already_processed(6)); - let mut rt = RangeTree { + let mut rt = IntervalTree { inner: vec![(2, 3), (5, 6)].into_iter().collect(), }; rt.mark_processed(4); diff --git a/src/jetstream_types.rs b/src/jetstream_types.rs index 7863900f2..14fbf7b28 100644 --- a/src/jetstream_types.rs +++ b/src/jetstream_types.rs @@ -65,7 +65,7 @@ pub struct ConsumerConfig { /// "exactly once" semantics, it is necessary to implement idempotent /// semantics in any system that is written to as a result of processing /// a message. - pub durable_name: Option, + pub deliver_subject: Option, /// Setting `durable_name` to `Some(...)` will cause this consumer /// to be "durable". This may be a good choice for workloads that @@ -82,7 +82,7 @@ pub struct ConsumerConfig { /// progress in the case of a crash, such as certain "high churn" /// workloads or workloads where a crashed instance is not required /// to recover. - pub deliver_subject: Option, + pub durable_name: Option, /// Allows for a variety of options that determine how this consumer will receive messages pub deliver_policy: DeliverPolicy, /// Used in combination with `DeliverPolicy::ByStartSeq` to only select messages arriving @@ -191,6 +191,25 @@ pub struct StreamInfo { pub state: StreamState, } +/// Information about a received message +#[derive(Debug, Clone)] +pub struct JetStreamMessageInfo<'a> { + /// The stream name + pub stream: &'a str, + /// The consumer name + pub consumer: &'a str, + /// + pub stream_seq: u64, + /// + pub consumer_seq: u64, + /// + pub delivered: i64, + /// + pub pending: u64, + /// + pub published: std::time::SystemTime, +} + /// information about the given stream. #[derive(Debug, Default, Serialize, Deserialize, Clone, Copy)] pub struct StreamState { @@ -463,9 +482,16 @@ pub struct SequencePair { /// for getting next messages for pull based consumers. #[derive(Debug, Default, Serialize, Deserialize, Clone, Copy)] -pub(crate) struct NextRequest { - pub expires: DateTime, - pub batch: Option, +pub struct NextRequest { + /// The number of messages that are being requested to be delivered. + pub batch: usize, + /// The optional number of nanoseconds that the server will store this next request for + /// before forgetting about the pending batch size. + pub expires: Option, + /// This optionally causes the server not to store this pending request at all, but when there are no + /// messages to deliver will send a nil bytes message with a Status header of 404, this way you + /// can know when you reached the end of the stream for example. A 409 is returned if the + /// Consumer has reached MaxAckPending limits. pub no_wait: Option, } diff --git a/src/lib.rs b/src/lib.rs index 66be8a64c..e570fa31b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -104,14 +104,16 @@ #![cfg_attr(test, deny(warnings))] #![deny( - missing_docs, future_incompatible, + missing_copy_implementations, + missing_docs, nonstandard_style, rust_2018_idioms, - missing_copy_implementations, + rustdoc, trivial_casts, trivial_numeric_casts, unsafe_code, + unused, unused_qualifications )] #![deny( diff --git a/src/message.rs b/src/message.rs index 24564c7cb..755363ad4 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1,9 +1,11 @@ -use std::{fmt, io}; +use std::{ + fmt, io, + sync::atomic::{AtomicBool, Ordering}, +}; use crate::{client::Client, Headers}; /// A message received on a subject. -#[derive(Clone)] pub struct Message { /// The subject this message came from. pub subject: String, @@ -21,6 +23,26 @@ pub struct Message { /// Client for publishing on the reply subject. #[doc(hidden)] pub client: Client, + + /// Whether this message has already been successfully double-acked + /// using `JetStream`. + #[doc(hidden)] + pub double_acked: AtomicBool, +} + +impl Clone for Message { + fn clone(&self) -> Message { + Message { + subject: self.subject.clone(), + reply: self.reply.clone(), + data: self.data.clone(), + headers: self.headers.clone(), + client: self.client.clone(), + double_acked: AtomicBool::new( + self.double_acked.load(Ordering::Acquire), + ), + } + } } impl Message { @@ -34,6 +56,134 @@ impl Message { Some(reply) => self.client.publish(reply, None, None, msg.as_ref()), } } + + /// Acknowledge a `JetStream` message with a default acknowledgement. + /// See `AckKind` documentation for details of what other types of + /// acks are available. If you need to send a non-default ack, use + /// the `ack_kind` method below. If you need to block until the + /// server acks your ack, use the `double_ack` method instead. + /// + /// Returns immediately if this message has already been + /// double-acked. + /// + /// Requires the `jetstream` feature. + #[cfg(feature = "jetstream")] + pub fn ack(&self) -> io::Result<()> { + if self.double_acked.load(Ordering::Acquire) { + return Ok(()); + } + self.respond(b"") + } + + /// Acknowledge a `JetStream` message. See `AckKind` documentation for + /// details of what each variant means. If you need to block until the + /// server acks your ack, use the `double_ack` method instead. + /// + /// Does not check whether this message has already been double-acked. + /// + /// Requires the `jetstream` feature. + #[cfg(feature = "jetstream")] + pub fn ack_kind( + &self, + ack_kind: crate::jetstream::AckKind, + ) -> io::Result<()> { + self.respond(ack_kind) + } + + /// Acknowledge a `JetStream` message and wait for acknowledgement from the server + /// that it has received our ack. Retry acknowledgement until we receive a response. + /// See `AckKind` documentation for details of what each variant means. + /// + /// Returns immediately if this message has already been double-acked. + /// + /// Requires the `jetstream` feature. + #[cfg(feature = "jetstream")] + pub fn double_ack( + &self, + ack_kind: crate::jetstream::AckKind, + ) -> io::Result<()> { + if self.double_acked.load(Ordering::Acquire) { + return Ok(()); + } + let original_reply = match self.reply.as_ref() { + None => { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "No reply subject available (not a JetStream message)", + )) + } + Some(original_reply) => original_reply, + }; + let mut retries = 0; + loop { + retries += 1; + if retries == 2 { + log::warn!("double_ack is retrying until the server connection is reestablished"); + } + let ack_reply = format!("_INBOX.{}", nuid::next()); + let sub_ret = self.client.subscribe(&ack_reply, None); + if sub_ret.is_err() { + std::thread::sleep(std::time::Duration::from_millis(100)); + continue; + } + let (sid, receiver) = sub_ret?; + let sub = crate::Subscription::new( + sid, + ack_reply.to_string(), + receiver, + self.client.clone(), + ); + + let pub_ret = self.client.publish( + original_reply, + Some(&ack_reply), + None, + ack_kind.as_ref(), + ); + if pub_ret.is_err() { + std::thread::sleep(std::time::Duration::from_millis(100)); + continue; + } + if sub + .next_timeout(std::time::Duration::from_millis(100)) + .is_ok() + { + self.double_acked.store(true, Ordering::Release); + return Ok(()); + } + } + } + + /// Returns the `JetStream` message ID + /// if this is a `JetStream` message. + /// Returns `None` if this is not + /// a `JetStream` message with headers + /// set. + /// + /// Requires the `jetstream` feature. + #[cfg(feature = "jetstream")] + pub fn jetstream_message_info( + &self, + ) -> Option> { + let reply = self.reply.as_ref()?; + let mut split = reply.split('.'); + if split.next()? != "$JS" || split.next()? != "ACK" { + return None; + } + Some(crate::jetstream::JetStreamMessageInfo { + stream: split.next()?, + consumer: split.next()?, + delivered: str::parse(split.next()?).ok()?, + stream_seq: str::parse(split.next()?).ok()?, + consumer_seq: str::parse(split.next()?).ok()?, + published: { + let nanos: u64 = str::parse(split.next()?).ok()?; + let offset = std::time::Duration::from_nanos(nanos); + std::time::UNIX_EPOCH + offset + }, + pending: str::parse(split.next()?).ok()?, + }) + } } impl fmt::Debug for Message { diff --git a/tests/jetstream.rs b/tests/jetstream.rs index 0985dc403..ed375a6e3 100644 --- a/tests/jetstream.rs +++ b/tests/jetstream.rs @@ -49,7 +49,7 @@ fn jetstream_create_consumer() -> io::Result<()> { let nc = nats::connect(&format!("localhost:{}", server.port)).unwrap(); nc.create_stream("stream1")?; - let consumer = nc.create_consumer("stream1", "consumer1")?; + nc.create_consumer("stream1", "consumer1")?; Ok(()) } @@ -72,7 +72,7 @@ fn jetstream_basics() -> io::Result<()> { nc.stream_info("test2")?; nc.create_consumer("test2", "consumer1")?; - let consumer2_cfg = ConsumerConfig { + let mut consumer2_cfg = ConsumerConfig { durable_name: Some("consumer2".to_string()), ack_policy: AckPolicy::All, deliver_subject: Some("consumer2_ds".to_string()), @@ -87,13 +87,13 @@ fn jetstream_basics() -> io::Result<()> { assert_eq!(nc.stream_info("test2")?.state.messages, 1000); - let consumer1 = Consumer::existing(nc.clone(), "test2", "consumer1")?; + let mut consumer1 = Consumer::existing(nc.clone(), "test2", "consumer1")?; for _ in 1..=1000 { consumer1.process(|_msg| Ok(()))?; } - let consumer2 = Consumer::existing(nc.clone(), "test2", consumer2_cfg)?; + let mut consumer2 = Consumer::existing(nc.clone(), "test2", consumer2_cfg)?; let mut count = 0; while count != 1000 { @@ -116,7 +116,7 @@ fn jetstream_basics() -> io::Result<()> { nc.create_consumer("test2", "consumer3")?; - let consumer3 = Consumer::existing(nc.clone(), "test2", "consumer3")?; + Consumer::existing(nc.clone(), "test2", "consumer3")?; let _ = dbg!(nc.account_info());