From ffe2ee1e050ff0ec48d846e635c206630d46f037 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Tue, 2 Mar 2021 12:38:03 +0100 Subject: [PATCH 01/13] Update NextRequest based on new expires type --- src/jetstream_types.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/jetstream_types.rs b/src/jetstream_types.rs index 7863900f2..33b4c7e37 100644 --- a/src/jetstream_types.rs +++ b/src/jetstream_types.rs @@ -464,7 +464,7 @@ pub struct SequencePair { /// for getting next messages for pull based consumers. #[derive(Debug, Default, Serialize, Deserialize, Clone, Copy)] pub(crate) struct NextRequest { - pub expires: DateTime, + pub expires: Option, pub batch: Option, pub no_wait: Option, } From a17e87b1c12988318fb3c578545eabdd039a31da Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Tue, 2 Mar 2021 13:04:22 +0100 Subject: [PATCH 02/13] Add ack and blocking double_ack methods to Message --- src/message.rs | 63 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/src/message.rs b/src/message.rs index 24564c7cb..8d4fa008c 100644 --- a/src/message.rs +++ b/src/message.rs @@ -34,6 +34,69 @@ impl Message { Some(reply) => self.client.publish(reply, None, None, msg.as_ref()), } } + + /// Acknowledge a `JetStream` message. See `AckKind` documentation for + /// details of what each variant means. + #[cfg(feature = "jetstream")] + pub fn ack(&self, ack_kind: crate::jetstream::AckKind) -> io::Result<()> { + self.respond(ack_kind) + } + + /// Acknowledge a `JetStream` message and wait for acknowledgement from the server + /// that it has received our ack. Retry acknowledgement until we receive a response. + /// See `AckKind` documentation for details of what each variant means. + #[cfg(feature = "jetstream")] + pub fn double_ack( + &self, + ack_kind: crate::jetstream::AckKind, + ) -> io::Result<()> { + let original_reply = match self.reply.as_ref() { + None => { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "no reply subject available", + )) + } + Some(original_reply) => original_reply, + }; + let mut retries = 0; + loop { + retries += 1; + if retries == 2 { + log::warn!("double_ack is retrying until the server connection is reestablished"); + } + let reply = format!("_INBOX.{}", nuid::next()); + let sub_ret = self.client.subscribe(&reply, None); + if sub_ret.is_err() { + std::thread::sleep(std::time::Duration::from_millis(100)); + continue; + } + let (sid, receiver) = sub_ret?; + let sub = crate::Subscription::new( + sid, + reply.to_string(), + receiver, + self.client.clone(), + ); + + let pub_ret = self.client.publish( + original_reply, + Some(&reply), + None, + ack_kind.as_ref(), + ); + if pub_ret.is_err() { + std::thread::sleep(std::time::Duration::from_millis(100)); + continue; + } + if sub + .next_timeout(std::time::Duration::from_millis(100)) + .is_ok() + { + return Ok(()); + } + } + } } impl fmt::Debug for Message { From d6d1d70b8610e2f39105128964d9d1db60d07c1c Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Tue, 2 Mar 2021 13:08:02 +0100 Subject: [PATCH 03/13] Some docs and refactors of JS methods on Message --- src/message.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/message.rs b/src/message.rs index 8d4fa008c..89d2a2279 100644 --- a/src/message.rs +++ b/src/message.rs @@ -37,6 +37,8 @@ impl Message { /// Acknowledge a `JetStream` message. See `AckKind` documentation for /// details of what each variant means. + /// + /// Requires the `jetstream` feature. #[cfg(feature = "jetstream")] pub fn ack(&self, ack_kind: crate::jetstream::AckKind) -> io::Result<()> { self.respond(ack_kind) @@ -45,6 +47,8 @@ impl Message { /// Acknowledge a `JetStream` message and wait for acknowledgement from the server /// that it has received our ack. Retry acknowledgement until we receive a response. /// See `AckKind` documentation for details of what each variant means. + /// + /// Requires the `jetstream` feature. #[cfg(feature = "jetstream")] pub fn double_ack( &self, @@ -54,7 +58,7 @@ impl Message { None => { return Err(io::Error::new( io::ErrorKind::InvalidInput, - "no reply subject available", + "No reply subject available (not a JetStream message)", )) } Some(original_reply) => original_reply, @@ -65,8 +69,8 @@ impl Message { if retries == 2 { log::warn!("double_ack is retrying until the server connection is reestablished"); } - let reply = format!("_INBOX.{}", nuid::next()); - let sub_ret = self.client.subscribe(&reply, None); + let ack_reply = format!("_INBOX.{}", nuid::next()); + let sub_ret = self.client.subscribe(&ack_reply, None); if sub_ret.is_err() { std::thread::sleep(std::time::Duration::from_millis(100)); continue; @@ -74,14 +78,14 @@ impl Message { let (sid, receiver) = sub_ret?; let sub = crate::Subscription::new( sid, - reply.to_string(), + ack_reply.to_string(), receiver, self.client.clone(), ); let pub_ret = self.client.publish( original_reply, - Some(&reply), + Some(&ack_reply), None, ack_kind.as_ref(), ); From aa3a2fd73c8eb6558d4fa6e6a4cbd3b8cbc236d2 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Thu, 4 Mar 2021 11:11:05 +0100 Subject: [PATCH 04/13] Rename dedupe window to IntervalTree, other small refactors --- src/jetstream.rs | 59 ++++++++++++++++++++++++------------------------ src/lib.rs | 6 +++-- src/message.rs | 3 ++- 3 files changed, 35 insertions(+), 33 deletions(-) diff --git a/src/jetstream.rs b/src/jetstream.rs index 080599120..e128ebb37 100644 --- a/src/jetstream.rs +++ b/src/jetstream.rs @@ -135,7 +135,6 @@ //! //! # Ok(()) } //! ``` -//! use std::{ collections::VecDeque, @@ -591,7 +590,7 @@ pub struct Consumer { /// Contains ranges of processed messages that will be /// filtered out upon future receipt. - pub dedupe_window: RangeTree, + pub dedupe_window: IntervalTree, } impl Consumer { @@ -712,17 +711,17 @@ impl Consumer { ))]; } - let subject = format!( - "{}CONSUMER.MSG.NEXT.{}.{}", - self.api_prefix(), - self.stream, - self.cfg.durable_name.as_ref().unwrap() - ); - let mut _sub_opt = None; let responses = if let Some(ps) = self.push_subscriber.as_ref() { ps } else { + let subject = format!( + "{}CONSUMER.MSG.NEXT.{}.{}", + self.api_prefix(), + self.stream, + self.cfg.durable_name.as_ref().unwrap() + ); + let sub = match self.nc.request_multi(&subject, batch_size.to_string()) { Ok(sub) => sub, @@ -810,16 +809,16 @@ impl Consumer { )); } - let subject = format!( - "{}CONSUMER.MSG.NEXT.{}.{}", - self.api_prefix(), - self.stream, - self.cfg.durable_name.as_ref().unwrap() - ); - let next = if let Some(ps) = &self.push_subscriber { ps.next().unwrap() } else { + let subject = format!( + "{}CONSUMER.MSG.NEXT.{}.{}", + self.api_prefix(), + self.stream, + self.cfg.durable_name.as_ref().unwrap() + ); + self.nc.request(&subject, AckKind::Ack)? }; let ret = f(&next)?; @@ -848,16 +847,16 @@ impl Consumer { )); } - let subject = format!( - "{}CONSUMER.MSG.NEXT.{}.{}", - self.api_prefix(), - self.stream, - self.cfg.durable_name.as_ref().unwrap() - ); - let next = if let Some(ps) = &self.push_subscriber { ps.next_timeout(self.timeout)? } else { + let subject = format!( + "{}CONSUMER.MSG.NEXT.{}.{}", + self.api_prefix(), + self.stream, + self.cfg.durable_name.as_ref().unwrap() + ); + self.nc.request(&subject, b"")? }; let ret = f(&next)?; @@ -872,15 +871,15 @@ impl Consumer { } } -/// Records ranges of acknowledged messages for +/// Records ranges of acknowledged IDs for /// low-memory deduplication. #[derive(Default)] -pub struct RangeTree { +pub struct IntervalTree { // stores interval start-end inner: std::collections::BTreeMap, } -impl RangeTree { +impl IntervalTree { /// Mark this ID as being processed. Returns `true` /// if this ID was not already marked as processed. pub fn mark_processed(&mut self, id: u64) -> bool { @@ -956,7 +955,7 @@ mod test { #[test] fn range_tree() { - let mut rt = RangeTree { + let mut rt = IntervalTree { inner: vec![(0, 0), (6, 6)].into_iter().collect(), }; @@ -970,7 +969,7 @@ mod test { assert!(rt.already_processed(6)); assert!(!rt.already_processed(7)); - let mut rt = RangeTree { + let mut rt = IntervalTree { inner: vec![(3, 3), (6, 6)].into_iter().collect(), }; @@ -982,7 +981,7 @@ mod test { assert!(rt.already_processed(6)); assert!(!rt.already_processed(7)); - let mut rt = RangeTree { + let mut rt = IntervalTree { inner: vec![(0, 0), (5, 5)].into_iter().collect(), }; rt.mark_processed(4); @@ -992,7 +991,7 @@ mod test { assert!(rt.already_processed(5)); assert!(!rt.already_processed(6)); - let mut rt = RangeTree { + let mut rt = IntervalTree { inner: vec![(2, 3), (5, 6)].into_iter().collect(), }; rt.mark_processed(4); diff --git a/src/lib.rs b/src/lib.rs index 66be8a64c..e570fa31b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -104,14 +104,16 @@ #![cfg_attr(test, deny(warnings))] #![deny( - missing_docs, future_incompatible, + missing_copy_implementations, + missing_docs, nonstandard_style, rust_2018_idioms, - missing_copy_implementations, + rustdoc, trivial_casts, trivial_numeric_casts, unsafe_code, + unused, unused_qualifications )] #![deny( diff --git a/src/message.rs b/src/message.rs index 89d2a2279..2eb902183 100644 --- a/src/message.rs +++ b/src/message.rs @@ -36,7 +36,8 @@ impl Message { } /// Acknowledge a `JetStream` message. See `AckKind` documentation for - /// details of what each variant means. + /// details of what each variant means. If you need to block until the + /// server acks your ack, use the `double_ack` method instead. /// /// Requires the `jetstream` feature. #[cfg(feature = "jetstream")] From c21d06c9e57cb1ef5b0a57c868f947ea513f60f4 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Mon, 8 Mar 2021 17:55:24 +0100 Subject: [PATCH 05/13] Add Consumer::pull/pull_opt methods. Rearrange message ack methods. Some small cleanups --- src/jetstream.rs | 105 +++++++++++++++++++++++++++++++---------- src/jetstream_types.rs | 15 ++++-- src/message.rs | 17 ++++++- 3 files changed, 108 insertions(+), 29 deletions(-) diff --git a/src/jetstream.rs b/src/jetstream.rs index e128ebb37..23fb04b9d 100644 --- a/src/jetstream.rs +++ b/src/jetstream.rs @@ -56,9 +56,6 @@ //! //! let consumer: Consumer = nc.create_consumer("my_stream", "my_consumer")?; //! -//! // process a single item, sending an ack if the closure returns `Ok`. -//! consumer.process -//! //! # Ok(()) } //! ``` //! @@ -113,10 +110,12 @@ //! let consumer = Consumer::create_or_open(nc, "my_stream", "existing_or_created_consumer")?; //! //! // wait indefinitely for the message to arrive -//! let msg_data_len: usize = consumer.process(|msg| { -//! println!("got message {:?}", msg); -//! msg.data.len() -//! })?; +//! let msg = consumer.pull()?; +//! +//! // --- process message --- +//! +//! // tell the server the message has been processed +//! msg.ack()?; //! //! // wait until the consumer's `timeout` field for the message to arrive. //! // This can be set manually, and has a very low default of 5ms. @@ -141,6 +140,7 @@ use std::{ convert::TryFrom, fmt::Debug, io::{self, Error, ErrorKind}, + time::Duration, }; use serde::{de::DeserializeOwned, Deserialize, Serialize}; @@ -256,7 +256,7 @@ impl NatsClient { let cfg: StreamConfig = stream_config.into(); if cfg.name.is_empty() { return Err(Error::new( - ErrorKind::InvalidData, + ErrorKind::InvalidInput, "the stream name must not be empty", )); } @@ -272,7 +272,7 @@ impl NatsClient { pub fn update_stream(&self, cfg: &StreamConfig) -> io::Result { if cfg.name.is_empty() { return Err(Error::new( - ErrorKind::InvalidData, + ErrorKind::InvalidInput, "the stream name must not be empty", )); } @@ -322,7 +322,7 @@ impl NatsClient { let stream: &str = stream.as_ref(); if stream.is_empty() { return Err(Error::new( - ErrorKind::InvalidData, + ErrorKind::InvalidInput, "the stream name must not be empty", )); } @@ -348,7 +348,7 @@ impl NatsClient { let stream: &str = stream.as_ref(); if stream.is_empty() { return Err(Error::new( - ErrorKind::InvalidData, + ErrorKind::InvalidInput, "the stream name must not be empty", )); } @@ -367,7 +367,7 @@ impl NatsClient { let stream: &str = stream.as_ref(); if stream.is_empty() { return Err(Error::new( - ErrorKind::InvalidData, + ErrorKind::InvalidInput, "the stream name must not be empty", )); } @@ -386,7 +386,7 @@ impl NatsClient { let stream: &str = stream.as_ref(); if stream.is_empty() { return Err(Error::new( - ErrorKind::InvalidData, + ErrorKind::InvalidInput, "the stream name must not be empty", )); } @@ -410,7 +410,7 @@ impl NatsClient { let stream: &str = stream.as_ref(); if stream.is_empty() { return Err(Error::new( - ErrorKind::InvalidData, + ErrorKind::InvalidInput, "the stream name must not be empty", )); } @@ -436,7 +436,7 @@ impl NatsClient { let stream = stream.as_ref(); if stream.is_empty() { return Err(Error::new( - ErrorKind::InvalidData, + ErrorKind::InvalidInput, "the stream name must not be empty", )); } @@ -484,14 +484,14 @@ impl NatsClient { let stream = stream.as_ref(); if stream.is_empty() { return Err(Error::new( - ErrorKind::InvalidData, + ErrorKind::InvalidInput, "the stream name must not be empty", )); } let consumer = consumer.as_ref(); if consumer.is_empty() { return Err(Error::new( - ErrorKind::InvalidData, + ErrorKind::InvalidInput, "the consumer name must not be empty", )); } @@ -522,7 +522,7 @@ impl NatsClient { let stream: &str = stream.as_ref(); if stream.is_empty() { return Err(Error::new( - ErrorKind::InvalidData, + ErrorKind::InvalidInput, "the stream name must not be empty", )); } @@ -586,7 +586,7 @@ pub struct Consumer { /// out during `process` and `process_batch`. Defaults /// to 5ms, which is likely to be far too low for /// workloads crossing physical sites. - pub timeout: std::time::Duration, + pub timeout: Duration, /// Contains ranges of processed messages that will be /// filtered out upon future receipt. @@ -672,7 +672,7 @@ impl Consumer { stream, cfg, push_subscriber, - timeout: std::time::Duration::from_millis(5), + timeout: Duration::from_millis(5), dedupe_window: Default::default(), }) } @@ -705,7 +705,7 @@ impl Consumer { ) -> Vec> { if self.cfg.durable_name.is_none() { return vec![Err(Error::new( - ErrorKind::InvalidData, + ErrorKind::InvalidInput, "process and process_batch are only usable from \ Pull-based Consumers with a durable_name set", ))]; @@ -740,7 +740,7 @@ impl Consumer { while let Ok(msg) = responses.next_timeout(if received == 0 { // wait "forever" for first message - std::time::Duration::new(std::u64::MAX >> 2, 0) + Duration::new(std::u64::MAX >> 2, 0) } else { self.timeout .checked_sub(start.elapsed()) @@ -803,7 +803,7 @@ impl Consumer { ) -> io::Result { if self.cfg.durable_name.is_none() { return Err(Error::new( - ErrorKind::InvalidData, + ErrorKind::InvalidInput, "process and process_batch are only usable from \ Pull-based Consumers with a durable_name set", )); @@ -841,7 +841,7 @@ impl Consumer { ) -> io::Result { if self.cfg.durable_name.is_none() { return Err(Error::new( - ErrorKind::InvalidData, + ErrorKind::InvalidInput, "process and process_batch are only usable from \ Pull-based Consumers with a durable_name set", )); @@ -866,6 +866,63 @@ impl Consumer { Ok(ret) } + /// For pull-based consumers (a consumer where `ConsumerConfig.deliver_subject` is `None`) + /// this can be used to request a single message, and wait forever for a response. + /// If you require specifying the batch size or using a timeout while consuming the + /// responses, use the `pull_opt` method below. + pub fn pull(&mut self) -> io::Result { + let ret_opt = self + .pull_opt(NextRequest { + batch: 1, + ..Default::default() + })? + .next(); + + if let Some(ret) = ret_opt { + Ok(ret) + } else { + Err(Error::new( + ErrorKind::BrokenPipe, + "The nats client is shutting down.", + )) + } + } + + /// For pull-based consumers (a consumer where `ConsumerConfig.deliver_subject` is `None`) + /// this can be used to request a configurable number of messages, as well as specify + /// how the server will keep track of this batch request over time. See the docs for + /// `NextRequest` for more information about the options. + pub fn pull_opt( + &mut self, + next_request: NextRequest, + ) -> io::Result { + if self.cfg.durable_name.is_none() { + return Err(Error::new( + ErrorKind::InvalidInput, + "this method is only usable from \ + Pull-based Consumers with a durable_name set", + )); + } + + if self.cfg.deliver_subject.is_none() { + return Err(Error::new( + ErrorKind::InvalidInput, + "this method is only usable from \ + Pull-based Consumers with a deliver_subject set", + )); + } + + let subject = format!( + "{}CONSUMER.MSG.NEXT.{}.{}", + self.api_prefix(), + self.stream, + self.cfg.durable_name.as_ref().unwrap() + ); + + let req = serde_json::ser::to_vec(&next_request).unwrap(); + self.nc.request_multi(&subject, &req) + } + fn api_prefix(&self) -> &str { &self.nc.0.client.options.jetstream_prefix } diff --git a/src/jetstream_types.rs b/src/jetstream_types.rs index 33b4c7e37..5e243cbc5 100644 --- a/src/jetstream_types.rs +++ b/src/jetstream_types.rs @@ -65,7 +65,7 @@ pub struct ConsumerConfig { /// "exactly once" semantics, it is necessary to implement idempotent /// semantics in any system that is written to as a result of processing /// a message. - pub durable_name: Option, + pub deliver_subject: Option, /// Setting `durable_name` to `Some(...)` will cause this consumer /// to be "durable". This may be a good choice for workloads that @@ -82,7 +82,7 @@ pub struct ConsumerConfig { /// progress in the case of a crash, such as certain "high churn" /// workloads or workloads where a crashed instance is not required /// to recover. - pub deliver_subject: Option, + pub durable_name: Option, /// Allows for a variety of options that determine how this consumer will receive messages pub deliver_policy: DeliverPolicy, /// Used in combination with `DeliverPolicy::ByStartSeq` to only select messages arriving @@ -463,9 +463,16 @@ pub struct SequencePair { /// for getting next messages for pull based consumers. #[derive(Debug, Default, Serialize, Deserialize, Clone, Copy)] -pub(crate) struct NextRequest { +pub struct NextRequest { + /// The number of messages that are being requested to be delivered. + pub batch: usize, + /// The optional number of nanoseconds that the server will store this next request for + /// before forgetting about the pending batch size. pub expires: Option, - pub batch: Option, + /// This optionally causes the server not to store this pending request at all, but when there are no + /// messages to deliver will send a nil bytes message with a Status header of 404, this way you + /// can know when you reached the end of the stream for example. A 409 is returned if the + /// Consumer has reached MaxAckPending limits. pub no_wait: Option, } diff --git a/src/message.rs b/src/message.rs index 2eb902183..6adbf1ff9 100644 --- a/src/message.rs +++ b/src/message.rs @@ -35,13 +35,28 @@ impl Message { } } + /// Acknowledge a `JetStream` message with a default acknowledgement. + /// See `AckKind` documentation for details of what other types of + /// acks are available. If you need to send a non-default ack, use + /// the `ack_kind` method below. If you need to block until the + /// server acks your ack, use the `double_ack` method instead. + /// + /// Requires the `jetstream` feature. + #[cfg(feature = "jetstream")] + pub fn ack(&self) -> io::Result<()> { + self.respond(b"") + } + /// Acknowledge a `JetStream` message. See `AckKind` documentation for /// details of what each variant means. If you need to block until the /// server acks your ack, use the `double_ack` method instead. /// /// Requires the `jetstream` feature. #[cfg(feature = "jetstream")] - pub fn ack(&self, ack_kind: crate::jetstream::AckKind) -> io::Result<()> { + pub fn ack_kind( + &self, + ack_kind: crate::jetstream::AckKind, + ) -> io::Result<()> { self.respond(ack_kind) } From ada35b6943134136ee3af6887a0aa4d0c0c2a261 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Mon, 8 Mar 2021 18:02:32 +0100 Subject: [PATCH 06/13] Fix doctests --- src/jetstream.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/jetstream.rs b/src/jetstream.rs index 23fb04b9d..065f35bda 100644 --- a/src/jetstream.rs +++ b/src/jetstream.rs @@ -54,7 +54,7 @@ //! //! nc.create_stream("my_stream")?; //! -//! let consumer: Consumer = nc.create_consumer("my_stream", "my_consumer")?; +//! let consumer: nats::jetstream::Consumer = nc.create_consumer("my_stream", "my_consumer")?; //! //! # Ok(()) } //! ``` @@ -69,7 +69,7 @@ //! //! nc.create_stream("my_stream")?; //! -//! let consumer: Consumer = nc.create_consumer("my_stream", ConsumerConfig { +//! let consumer: nats::jetstream::Consumer = nc.create_consumer("my_stream", ConsumerConfig { //! durable_name: Some("my_consumer".to_string()), //! deliver_subject: Some("my_push_consumer_subject".to_string()), //! ack_policy: AckPolicy::All, @@ -107,7 +107,7 @@ //! let nc = nats::connect("my_server::4222")?; //! //! // this will create the consumer if it does not exist already -//! let consumer = Consumer::create_or_open(nc, "my_stream", "existing_or_created_consumer")?; +//! let mut consumer = Consumer::create_or_open(nc, "my_stream", "existing_or_created_consumer")?; //! //! // wait indefinitely for the message to arrive //! let msg = consumer.pull()?; @@ -121,16 +121,18 @@ //! // This can be set manually, and has a very low default of 5ms. //! let msg_data_len: usize = consumer.process_timeout(|msg| { //! println!("got message {:?}", msg); -//! msg.data.len() +//! Ok(msg.data.len()) //! })?; //! //! // wait indefinitely for the first message in a batch, then process //! // more messages until the configured timeout is expired //! let batch_size = 128; -//! let results: Vec = consumer.process_batch(batch_size, |msg| { +//! let results: Vec> = consumer.process_batch(batch_size, |msg| { //! println!("got message {:?}", msg); -//! msg.data.len() -//! })?; +//! Ok(msg.data.len()) +//! }); +//! let flipped: std::io::Result> = results.into_iter().collect(); +//! let sizes: Vec = flipped?; //! //! # Ok(()) } //! ``` From efef0059ce233c8eb6ff00272fe8ebc82f84e946 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Mon, 8 Mar 2021 18:04:21 +0100 Subject: [PATCH 07/13] Add doctests to gha --- .github/workflows/test.yml | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 9247a8399..7d48b807f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -94,6 +94,32 @@ jobs: rustup update cargo test reconnect_test --features=fault_injection -- --ignored + doctests: + name: doctests + runs-on: ubuntu-latest + steps: + - name: Cache Rust + uses: actions/cache@v1 + env: + cache-name: cache-rust + with: + path: target + key: ${{ runner.os }}-${{ env.cache-name }} + restore-keys: | + ${{ runner.os }}- + - uses: actions/checkout@v2 + - uses: actions/setup-go@v2 + with: + go-version: '1.14' + - name: install nats-server + run: go get github.com/nats-io/nats-server + - name: JetStream test + env: + RUST_LOG: trace + run: | + rustup update + cargo test --doc --features=jetstream + jetstream: name: JetStream runs-on: ubuntu-latest From a3803646641f857d1bdfb61ee3dc42eb773d9323 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Tue, 9 Mar 2021 11:36:43 +0100 Subject: [PATCH 08/13] Update docs for Consumer --- src/jetstream.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/jetstream.rs b/src/jetstream.rs index 065f35bda..88706bbc6 100644 --- a/src/jetstream.rs +++ b/src/jetstream.rs @@ -872,6 +872,8 @@ impl Consumer { /// this can be used to request a single message, and wait forever for a response. /// If you require specifying the batch size or using a timeout while consuming the /// responses, use the `pull_opt` method below. + /// + /// Requires the `jetstream` feature. pub fn pull(&mut self) -> io::Result { let ret_opt = self .pull_opt(NextRequest { @@ -894,6 +896,8 @@ impl Consumer { /// this can be used to request a configurable number of messages, as well as specify /// how the server will keep track of this batch request over time. See the docs for /// `NextRequest` for more information about the options. + /// + /// Requires the `jetstream` feature. pub fn pull_opt( &mut self, next_request: NextRequest, From 24e3465a699ee0da93b8cded5d2a76f7785203a8 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Tue, 9 Mar 2021 15:42:24 +0100 Subject: [PATCH 09/13] Improve processing functions. Improve docs. Fix bug in IntervalTree. Deduplicate messages in process methods --- src/client.rs | 2 + src/jetstream.rs | 250 ++++++++++++++++++++++++++++------------- src/jetstream_types.rs | 19 ++++ src/message.rs | 71 +++++++++++- tests/jetstream.rs | 10 +- 5 files changed, 266 insertions(+), 86 deletions(-) diff --git a/src/client.rs b/src/client.rs index 8254ce6e0..c609ebba8 100644 --- a/src/client.rs +++ b/src/client.rs @@ -619,6 +619,7 @@ impl Client { data: payload, headers: None, client: self.clone(), + double_acked: Default::default(), }; // Send a message or drop it if the channel is @@ -642,6 +643,7 @@ impl Client { data: payload, headers: Some(headers), client: self.clone(), + double_acked: Default::default(), }; // Send a message or drop it if the channel is diff --git a/src/jetstream.rs b/src/jetstream.rs index 88706bbc6..13419ff9a 100644 --- a/src/jetstream.rs +++ b/src/jetstream.rs @@ -106,7 +106,11 @@ //! //! let nc = nats::connect("my_server::4222")?; //! -//! // this will create the consumer if it does not exist already +//! // this will create the consumer if it does not exist already. +//! // consumer must be mut because the `process*` methods perform +//! // message deduplication using an interval tree, which is +//! // also publicly accessible via the `Consumer`'s `dedupe_window` +//! // field. //! let mut consumer = Consumer::create_or_open(nc, "my_stream", "existing_or_created_consumer")?; //! //! // wait indefinitely for the message to arrive @@ -117,15 +121,37 @@ //! // tell the server the message has been processed //! msg.ack()?; //! -//! // wait until the consumer's `timeout` field for the message to arrive. -//! // This can be set manually, and has a very low default of 5ms. +//! // The `Consumer::process` method executes a closure +//! // on both push- and pull-based consumers, and if +//! // the closure returns `Ok` then the message is acked. +//! // If no message is available, it will wait forever +//! // for one to arrive. +//! let msg_data_len: usize = consumer.process(|msg| { +//! println!("got message {:?}", msg); +//! Ok(msg.data.len()) +//! })?; +//! +//! // Similar to `Consumer::process` except wait until the +//! // consumer's `timeout` field for the message to arrive. +//! // This can and should be set manually, as it has a low +//! // default of 5ms. //! let msg_data_len: usize = consumer.process_timeout(|msg| { //! println!("got message {:?}", msg); //! Ok(msg.data.len()) //! })?; //! -//! // wait indefinitely for the first message in a batch, then process -//! // more messages until the configured timeout is expired +//! // For consumers operating with `AckPolicy::All`, batch +//! // processing can provide nice throughput optimizations. +//! // `Consumer::process_batch` will wait indefinitely for +//! // the first message in a batch, then process +//! // more messages until the configured timeout is expired. +//! // It will batch acks if running with `AckPolicy::All`. +//! // If there is an error with acking, the last item in the +//! // returned `Vec` will be the io error. Terminates early +//! // without acking if the closure returns an `Err`, which +//! // is included in the final element of the `Vec`. If a +//! // Timeout happens before the batch size is reached, then +//! // there will be no errors included in the response `Vec`. //! let batch_size = 128; //! let results: Vec> = consumer.process_batch(batch_size, |msg| { //! println!("got message {:?}", msg); @@ -696,27 +722,35 @@ impl Consumer { /// If an error is encountered while subscribing or acking messages /// that may have returned `Ok` from the closure, that Ok will be /// present in the returned vector but the last item in the vector - /// will be the encountered error. + /// will be the encountered error. If the consumer's timeout expires + /// before the entire batch is processed, there will be no error + /// pushed to the returned `Vec`, it will just be shorter than the + /// specified batch size. + /// + /// All messages are deduplicated using the `Consumer`'s built-in + /// `dedupe_window` before being fed to the provided closure. If + /// a message that has already been processed is received, it will + /// be acked and skipped. Errors for acking deduplicated messages + /// are not included in the returned `Vec`. /// /// Requires the `jetstream` feature. - #[doc(hidden)] pub fn process_batch io::Result>( - &self, + &mut self, batch_size: usize, mut f: F, ) -> Vec> { - if self.cfg.durable_name.is_none() { - return vec![Err(Error::new( - ErrorKind::InvalidInput, - "process and process_batch are only usable from \ - Pull-based Consumers with a durable_name set", - ))]; - } - let mut _sub_opt = None; let responses = if let Some(ps) = self.push_subscriber.as_ref() { ps } else { + if self.cfg.durable_name.is_none() { + return vec![Err(Error::new( + ErrorKind::InvalidInput, + "process and process_batch are only usable from \ + Pull-based Consumers if there is a durable_name set", + ))]; + } + let subject = format!( "{}CONSUMER.MSG.NEXT.{}.{}", self.api_prefix(), @@ -738,9 +772,8 @@ impl Consumer { let start = std::time::Instant::now(); let mut received = 0; - let mut acked = 0; - while let Ok(msg) = responses.next_timeout(if received == 0 { + while let Ok(next) = responses.next_timeout(if received == 0 { // wait "forever" for first message Duration::new(std::u64::MAX >> 2, 0) } else { @@ -748,13 +781,15 @@ impl Consumer { .checked_sub(start.elapsed()) .unwrap_or_default() }) { - let ret = f(&msg); + let next_id = next.jetstream_message_info().unwrap().stream_seq; - if ret.is_err() { - rets.push(ret); - return rets; + if self.dedupe_window.already_processed(next_id) { + let _dont_care_about_success = next.ack(); + continue; } + let ret = f(&next); + let is_err = ret.is_err(); rets.push(ret); @@ -763,27 +798,24 @@ impl Consumer { // if our ack policy is `All`, after breaking. break; } else if self.cfg.ack_policy == AckPolicy::Explicit { - let res = msg.respond(AckKind::Ack); + self.dedupe_window.mark_processed(next_id); + let res = next.ack(); if let Err(e) = res { - rets.truncate(acked); rets.push(Err(e)); - } else { - acked += 1; } } - last = Some(msg); + last = Some(next); received += 1; if received == batch_size { break; } } - if let Some(msg) = last { + if let Some(last) = last { if self.cfg.ack_policy == AckPolicy::All { - let res = msg.respond(AckKind::Ack); + let res = last.ack(); if let Err(e) = res { - rets.truncate(acked); rets.push(Err(e)); } } @@ -797,37 +829,60 @@ impl Consumer { /// /// Does not ack the processed message if the internal closure returns an `Err`. /// + /// All messages are deduplicated using the `Consumer`'s built-in + /// `dedupe_window` before being fed to the provided closure. If + /// a message that has already been processed is received, it will + /// be acked and skipped. + /// + /// Does not return an `Err` if acking the message is unsuccessful, + /// but the message is still marked in the dedupe window. If you + /// require stronger processing guarantees, you can manually call + /// the `double_ack` method of the argument message. If you require + /// both the returned `Ok` from the closure and the `Err` from a + /// failed ack, use `process_batch` instead. + /// /// Requires the `jetstream` feature. - #[doc(hidden)] pub fn process io::Result>( - &self, + &mut self, f: F, ) -> io::Result { - if self.cfg.durable_name.is_none() { - return Err(Error::new( - ErrorKind::InvalidInput, - "process and process_batch are only usable from \ - Pull-based Consumers with a durable_name set", - )); - } + loop { + let next = if let Some(ps) = &self.push_subscriber { + ps.next().unwrap() + } else { + if self.cfg.durable_name.is_none() { + return Err(Error::new( + ErrorKind::InvalidInput, + "process and process_batch are only usable from \ + Pull-based Consumers if there is a durable_name set", + )); + } - let next = if let Some(ps) = &self.push_subscriber { - ps.next().unwrap() - } else { - let subject = format!( - "{}CONSUMER.MSG.NEXT.{}.{}", - self.api_prefix(), - self.stream, - self.cfg.durable_name.as_ref().unwrap() - ); + let subject = format!( + "{}CONSUMER.MSG.NEXT.{}.{}", + self.api_prefix(), + self.stream, + self.cfg.durable_name.as_ref().unwrap() + ); - self.nc.request(&subject, AckKind::Ack)? - }; - let ret = f(&next)?; - if self.cfg.ack_policy != AckPolicy::None { - next.respond(AckKind::Ack)?; + self.nc.request(&subject, AckKind::Ack)? + }; + + let next_id = next.jetstream_message_info().unwrap().stream_seq; + + if self.dedupe_window.already_processed(next_id) { + let _dont_care = next.ack(); + continue; + } + + let ret = f(&next)?; + if self.cfg.ack_policy != AckPolicy::None { + let _dont_care = next.ack(); + } + + self.dedupe_window.mark_processed(next_id); + return Ok(ret); } - Ok(ret) } /// Process and acknowledge a single message, waiting up to the `Consumer`'s @@ -835,37 +890,59 @@ impl Consumer { /// /// Does not ack the processed message if the internal closure returns an `Err`. /// + /// All messages are deduplicated using the `Consumer`'s built-in + /// `dedupe_window` before being fed to the provided closure. If + /// a message that has already been processed is received, it will + /// be acked and skipped. + /// + /// Does not return an `Err` if acking the message is unsuccessful, + /// but the message is still marked in the dedupe window. If you + /// require stronger processing guarantees, you can manually call + /// the `double_ack` method of the argument message. If you require + /// both the returned `Ok` from the closure and the `Err` from a + /// failed ack, use `process_batch` instead. + /// /// Requires the `jetstream` feature. - #[doc(hidden)] pub fn process_timeout io::Result>( - &self, + &mut self, f: F, ) -> io::Result { - if self.cfg.durable_name.is_none() { - return Err(Error::new( - ErrorKind::InvalidInput, - "process and process_batch are only usable from \ - Pull-based Consumers with a durable_name set", - )); - } + loop { + let next = if let Some(ps) = &self.push_subscriber { + ps.next_timeout(self.timeout)? + } else { + if self.cfg.durable_name.is_none() { + return Err(Error::new( + ErrorKind::InvalidInput, + "process and process_batch are only usable from \ + Pull-based Consumers if there is a a durable_name set", + )); + } - let next = if let Some(ps) = &self.push_subscriber { - ps.next_timeout(self.timeout)? - } else { - let subject = format!( - "{}CONSUMER.MSG.NEXT.{}.{}", - self.api_prefix(), - self.stream, - self.cfg.durable_name.as_ref().unwrap() - ); + let subject = format!( + "{}CONSUMER.MSG.NEXT.{}.{}", + self.api_prefix(), + self.stream, + self.cfg.durable_name.as_ref().unwrap() + ); - self.nc.request(&subject, b"")? - }; - let ret = f(&next)?; - if self.cfg.ack_policy != AckPolicy::None { - next.respond(AckKind::Ack)?; + self.nc.request(&subject, b"")? + }; + + let next_id = next.jetstream_message_info().unwrap().stream_seq; + + if self.dedupe_window.already_processed(next_id) { + self.dedupe_window.mark_processed(next_id); + let _dont_care = next.ack(); + continue; + } + + let ret = f(&next)?; + if self.cfg.ack_policy != AckPolicy::None { + let _dont_care = next.ack(); + } + return Ok(ret); } - Ok(ret) } /// For pull-based consumers (a consumer where `ConsumerConfig.deliver_subject` is `None`) @@ -873,6 +950,9 @@ impl Consumer { /// If you require specifying the batch size or using a timeout while consuming the /// responses, use the `pull_opt` method below. /// + /// This is a lower-level method and does not filter messages through the `Consumer`'s + /// built-in `dedupe_window` as the various `process*` methods do. + /// /// Requires the `jetstream` feature. pub fn pull(&mut self) -> io::Result { let ret_opt = self @@ -897,6 +977,9 @@ impl Consumer { /// how the server will keep track of this batch request over time. See the docs for /// `NextRequest` for more information about the options. /// + /// This is a lower-level method and does not filter messages through the `Consumer`'s + /// built-in `dedupe_window` as the various `process*` methods do. + /// /// Requires the `jetstream` feature. pub fn pull_opt( &mut self, @@ -946,11 +1029,17 @@ impl IntervalTree { /// Mark this ID as being processed. Returns `true` /// if this ID was not already marked as processed. pub fn mark_processed(&mut self, id: u64) -> bool { + if self.inner.is_empty() { + self.inner.insert(1, 1); + return true; + } + let (prev_start, prev_end) = self .inner .range(..=&id) .next_back() - .map_or((0, 0), |(s, e)| (*s, *e)); + .map(|(s, e)| (*s, *e)) + .unwrap(); if (prev_start..=prev_end).contains(&id) { // range already includes id @@ -1018,6 +1107,9 @@ mod test { #[test] fn range_tree() { + let mut rt = IntervalTree::default(); + assert!(rt.mark_processed(1)); + let mut rt = IntervalTree { inner: vec![(0, 0), (6, 6)].into_iter().collect(), }; diff --git a/src/jetstream_types.rs b/src/jetstream_types.rs index 5e243cbc5..14fbf7b28 100644 --- a/src/jetstream_types.rs +++ b/src/jetstream_types.rs @@ -191,6 +191,25 @@ pub struct StreamInfo { pub state: StreamState, } +/// Information about a received message +#[derive(Debug, Clone)] +pub struct JetStreamMessageInfo<'a> { + /// The stream name + pub stream: &'a str, + /// The consumer name + pub consumer: &'a str, + /// + pub stream_seq: u64, + /// + pub consumer_seq: u64, + /// + pub delivered: i64, + /// + pub pending: u64, + /// + pub published: std::time::SystemTime, +} + /// information about the given stream. #[derive(Debug, Default, Serialize, Deserialize, Clone, Copy)] pub struct StreamState { diff --git a/src/message.rs b/src/message.rs index 6adbf1ff9..ed666f562 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1,9 +1,11 @@ -use std::{fmt, io}; +use std::{ + fmt, io, + sync::atomic::{AtomicBool, Ordering}, +}; use crate::{client::Client, Headers}; /// A message received on a subject. -#[derive(Clone)] pub struct Message { /// The subject this message came from. pub subject: String, @@ -21,6 +23,26 @@ pub struct Message { /// Client for publishing on the reply subject. #[doc(hidden)] pub client: Client, + + /// Whether this message has already been successfully double-acked + /// using JetStream. + #[doc(hidden)] + pub double_acked: AtomicBool, +} + +impl Clone for Message { + fn clone(&self) -> Message { + Message { + subject: self.subject.clone(), + reply: self.reply.clone(), + data: self.data.clone(), + headers: self.headers.clone(), + client: self.client.clone(), + double_acked: AtomicBool::new( + self.double_acked.load(Ordering::Acquire), + ), + } + } } impl Message { @@ -41,9 +63,15 @@ impl Message { /// the `ack_kind` method below. If you need to block until the /// server acks your ack, use the `double_ack` method instead. /// + /// Returns immediately if this message has already been + /// double-acked. + /// /// Requires the `jetstream` feature. #[cfg(feature = "jetstream")] pub fn ack(&self) -> io::Result<()> { + if self.double_acked.load(Ordering::Acquire) { + return Ok(()); + } self.respond(b"") } @@ -51,6 +79,8 @@ impl Message { /// details of what each variant means. If you need to block until the /// server acks your ack, use the `double_ack` method instead. /// + /// Does not check whether this message has already been double-acked. + /// /// Requires the `jetstream` feature. #[cfg(feature = "jetstream")] pub fn ack_kind( @@ -64,12 +94,17 @@ impl Message { /// that it has received our ack. Retry acknowledgement until we receive a response. /// See `AckKind` documentation for details of what each variant means. /// + /// Returns immediately if this message has already been double-acked. + /// /// Requires the `jetstream` feature. #[cfg(feature = "jetstream")] pub fn double_ack( &self, ack_kind: crate::jetstream::AckKind, ) -> io::Result<()> { + if self.double_acked.load(Ordering::Acquire) { + return Ok(()); + } let original_reply = match self.reply.as_ref() { None => { return Err(io::Error::new( @@ -113,10 +148,42 @@ impl Message { .next_timeout(std::time::Duration::from_millis(100)) .is_ok() { + self.double_acked.store(true, Ordering::Release); return Ok(()); } } } + + /// Returns the JetStream message ID + /// if this is a JetStream message. + /// Returns `None` if this is not + /// a JetStream message with headers + /// set. + /// + /// Requires the `jetstream` feature. + #[cfg(feature = "jetstream")] + pub fn jetstream_message_info( + &self, + ) -> Option> { + let reply = self.reply.as_ref()?; + let mut split = reply.split('.'); + if split.next()? != "$JS" || split.next()? != "ACK" { + return None; + } + Some(crate::jetstream::JetStreamMessageInfo { + stream: split.next()?, + consumer: split.next()?, + delivered: str::parse(split.next()?).ok()?, + stream_seq: str::parse(split.next()?).ok()?, + consumer_seq: str::parse(split.next()?).ok()?, + published: { + let nanos: u64 = str::parse(split.next()?).ok()?; + let offset = std::time::Duration::from_nanos(nanos); + std::time::UNIX_EPOCH + offset + }, + pending: str::parse(split.next()?).ok()?, + }) + } } impl fmt::Debug for Message { diff --git a/tests/jetstream.rs b/tests/jetstream.rs index 0985dc403..3a24444a4 100644 --- a/tests/jetstream.rs +++ b/tests/jetstream.rs @@ -49,7 +49,7 @@ fn jetstream_create_consumer() -> io::Result<()> { let nc = nats::connect(&format!("localhost:{}", server.port)).unwrap(); nc.create_stream("stream1")?; - let consumer = nc.create_consumer("stream1", "consumer1")?; + let mut consumer = nc.create_consumer("stream1", "consumer1")?; Ok(()) } @@ -72,7 +72,7 @@ fn jetstream_basics() -> io::Result<()> { nc.stream_info("test2")?; nc.create_consumer("test2", "consumer1")?; - let consumer2_cfg = ConsumerConfig { + let mut consumer2_cfg = ConsumerConfig { durable_name: Some("consumer2".to_string()), ack_policy: AckPolicy::All, deliver_subject: Some("consumer2_ds".to_string()), @@ -87,13 +87,13 @@ fn jetstream_basics() -> io::Result<()> { assert_eq!(nc.stream_info("test2")?.state.messages, 1000); - let consumer1 = Consumer::existing(nc.clone(), "test2", "consumer1")?; + let mut consumer1 = Consumer::existing(nc.clone(), "test2", "consumer1")?; for _ in 1..=1000 { consumer1.process(|_msg| Ok(()))?; } - let consumer2 = Consumer::existing(nc.clone(), "test2", consumer2_cfg)?; + let mut consumer2 = Consumer::existing(nc.clone(), "test2", consumer2_cfg)?; let mut count = 0; while count != 1000 { @@ -116,7 +116,7 @@ fn jetstream_basics() -> io::Result<()> { nc.create_consumer("test2", "consumer3")?; - let consumer3 = Consumer::existing(nc.clone(), "test2", "consumer3")?; + let mut consumer3 = Consumer::existing(nc.clone(), "test2", "consumer3")?; let _ = dbg!(nc.account_info()); From 9722a20dee94500d73aaf6c6793bf788d72be648 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Tue, 9 Mar 2021 16:04:51 +0100 Subject: [PATCH 10/13] Clean up some unused variables --- src/jetstream.rs | 1 - tests/jetstream.rs | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/jetstream.rs b/src/jetstream.rs index 13419ff9a..4f0175b8f 100644 --- a/src/jetstream.rs +++ b/src/jetstream.rs @@ -576,7 +576,6 @@ impl NatsClient { Res: DeserializeOwned, { let res_msg = self.request(subject, req)?; - println!("got response: {:?}", std::str::from_utf8(&res_msg.data)); let res: ApiResponse = serde_json::de::from_slice(&res_msg.data)?; match res { ApiResponse::Ok(stream_info) => Ok(stream_info), diff --git a/tests/jetstream.rs b/tests/jetstream.rs index 3a24444a4..ed375a6e3 100644 --- a/tests/jetstream.rs +++ b/tests/jetstream.rs @@ -49,7 +49,7 @@ fn jetstream_create_consumer() -> io::Result<()> { let nc = nats::connect(&format!("localhost:{}", server.port)).unwrap(); nc.create_stream("stream1")?; - let mut consumer = nc.create_consumer("stream1", "consumer1")?; + nc.create_consumer("stream1", "consumer1")?; Ok(()) } @@ -116,7 +116,7 @@ fn jetstream_basics() -> io::Result<()> { nc.create_consumer("test2", "consumer3")?; - let mut consumer3 = Consumer::existing(nc.clone(), "test2", "consumer3")?; + Consumer::existing(nc.clone(), "test2", "consumer3")?; let _ = dbg!(nc.account_info()); From 80942a21b1b3dc838b33591481147022f237a453 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Tue, 9 Mar 2021 16:06:52 +0100 Subject: [PATCH 11/13] Update docs --- src/jetstream.rs | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/src/jetstream.rs b/src/jetstream.rs index 4f0175b8f..85444aa90 100644 --- a/src/jetstream.rs +++ b/src/jetstream.rs @@ -113,14 +113,6 @@ //! // field. //! let mut consumer = Consumer::create_or_open(nc, "my_stream", "existing_or_created_consumer")?; //! -//! // wait indefinitely for the message to arrive -//! let msg = consumer.pull()?; -//! -//! // --- process message --- -//! -//! // tell the server the message has been processed -//! msg.ack()?; -//! //! // The `Consumer::process` method executes a closure //! // on both push- and pull-based consumers, and if //! // the closure returns `Ok` then the message is acked. @@ -160,6 +152,18 @@ //! let flipped: std::io::Result> = results.into_iter().collect(); //! let sizes: Vec = flipped?; //! +//! // For lower-level control for use cases that are not +//! // well-served by the high-level process* methods, +//! // there are a number of lower level primitives that +//! // can be used, such as `Consumer::pull` for pull-based +//! // consumers and `Message::ack` for manually acking things: +//! let msg = consumer.pull()?; +//! +//! // --- process message --- +//! +//! // tell the server the message has been processed +//! msg.ack()?; +//! //! # Ok(()) } //! ``` From 26b12c57152a0022fbffe54aacfd2e8d81a10d19 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Tue, 9 Mar 2021 16:47:50 +0100 Subject: [PATCH 12/13] Clippy feedback --- src/message.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/message.rs b/src/message.rs index ed666f562..755363ad4 100644 --- a/src/message.rs +++ b/src/message.rs @@ -25,7 +25,7 @@ pub struct Message { pub client: Client, /// Whether this message has already been successfully double-acked - /// using JetStream. + /// using `JetStream`. #[doc(hidden)] pub double_acked: AtomicBool, } @@ -154,10 +154,10 @@ impl Message { } } - /// Returns the JetStream message ID - /// if this is a JetStream message. + /// Returns the `JetStream` message ID + /// if this is a `JetStream` message. /// Returns `None` if this is not - /// a JetStream message with headers + /// a `JetStream` message with headers /// set. /// /// Requires the `jetstream` feature. From 055e95b00c0f73ebf78ea082f643e01402755bdb Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Tue, 9 Mar 2021 16:54:35 +0100 Subject: [PATCH 13/13] Cut 0.9.6 --- CHANGELOG.md | 9 +++++++++ Cargo.toml | 2 +- async-nats/Cargo.toml | 4 ++-- 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 856e555cd..6ea764176 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,12 @@ +# 0.9.6 + +## New Features + +- JetStream consumers are now better supported + through the `Consumer::process*` methods, + which also perform message deduplication + backed by an interval tree. + # 0.9.5 ## New Features diff --git a/Cargo.toml b/Cargo.toml index 1d739ce44..26d59beff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "nats" -version = "0.9.5" +version = "0.9.6" description = "A Rust NATS client" authors = ["Derek Collison ", "Tyler Neely ", "Stjepan Glavina "] edition = "2018" diff --git a/async-nats/Cargo.toml b/async-nats/Cargo.toml index b12dc6b81..b52fe2af8 100644 --- a/async-nats/Cargo.toml +++ b/async-nats/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "async-nats" -version = "0.9.5" +version = "0.9.6" description = "An async Rust NATS client" authors = ["Derek Collison ", "Tyler Neely ", "Stjepan Glavina "] edition = "2018" @@ -17,7 +17,7 @@ maintenance = { status = "actively-developed" } [dependencies] blocking = "1.0.2" -nats = { path = "..", version = "0.9.5" } +nats = { path = "..", version = "0.9.6" } [dev-dependencies] smol = "1.2.5"