diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index b4993fbd0..d070dced3 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -118,7 +118,7 @@ jobs: RUST_LOG: trace run: | rustup update - cargo test --doc --features=jetstream + cargo test --doc examples: name: examples @@ -144,7 +144,7 @@ jobs: RUST_LOG: trace run: | rustup update - cargo check --examples --features=jetstream + cargo check --examples jetstream: name: JetStream @@ -170,7 +170,7 @@ jobs: RUST_LOG: trace run: | rustup update - cargo test jetstream --features=jetstream + cargo test jetstream msrv: name: MSRV diff --git a/CHANGELOG.md b/CHANGELOG.md index 28091f99e..93e1bd41e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,11 @@ +# 0.12.0 + +## Breaking Changes + +- #195 JetStream support is now available without + any feature set, and the jetstream feature has + been removed. + # 0.11.0 ## Breaking Changes diff --git a/Cargo.toml b/Cargo.toml index b9a6adb35..137a3f873 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "nats" -version = "0.11.0" +version = "0.12.0" description = "A Rust NATS client" authors = ["Derek Collison ", "Tyler Neely ", "Stjepan Glavina "] edition = "2018" @@ -12,12 +12,8 @@ readme = "README.md" keywords = ["nats", "client", "messaging", "api"] categories = ["network-programming", "api-bindings"] -[package.metadata.docs.rs] -features = ["jetstream"] - [features] fault_injection = [] -jetstream = ["serde", "chrono", "serde_json"] [badges] maintenance = { status = "actively-developed" } @@ -47,9 +43,9 @@ regex = { version = "1.5.4", default-features = false, features = ["std", "unico rustls = "0.19.1" rustls-native-certs = "0.5.0" webpki = "0.21.0" -serde = { version = "1.0.126", optional = true, features = ["derive"] } -serde_json = { version = "1.0.64", optional = true } -chrono = { version = "0.4.19", optional = true, features = ["serde"] } +serde = { version = "1.0.126", features = ["derive"] } +serde_json = "1.0.64" +chrono = { version = "0.4.19", features = ["serde"] } memchr = "2.4.0" [target.'cfg(unix)'.dependencies] @@ -65,8 +61,6 @@ historian = "4.0.4" lazy_static = "1.4.0" nats_test_server = { path = "nats_test_server" } quicli = "0.4.0" -serde_json = "1.0.64" -serde = { version = "1.0.126", features = ["derive"] } smol = "1.2.5" structopt = "0.3.21" diff --git a/src/client.rs b/src/client.rs index 04ca7f172..b4e67fc23 100644 --- a/src/client.rs +++ b/src/client.rs @@ -76,7 +76,6 @@ pub struct Client { shutdown: Arc>, /// The options that this `Client` was created using. - #[cfg(feature = "jetstream")] pub(crate) options: Arc, } @@ -106,17 +105,11 @@ impl Client { }), server_info: Arc::new(Mutex::new(ServerInfo::default())), shutdown: Arc::new(Mutex::new(false)), - - #[cfg(feature = "jetstream")] options: Arc::new(options), }; - #[cfg(feature = "jetstream")] let options = client.options.clone(); - #[cfg(not(feature = "jetstream"))] - let options = Arc::new(options); - // Connector for creating the initial connection and reconnecting when // it is broken. let connector = Connector::new(url, options.clone())?; diff --git a/src/jetstream.rs b/src/jetstream.rs index 70e0aa049..f26fc6733 100644 --- a/src/jetstream.rs +++ b/src/jetstream.rs @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! Experimental `JetStream` support enabled via the `jetstream` feature. +//! Support for the `JetStream` at-least-once messaging system. //! //! # Examples //! @@ -280,8 +280,6 @@ where impl NatsClient { /// Create a `JetStream` stream. - /// - /// Requires the `jetstream` feature. pub fn create_stream(&self, stream_config: S) -> io::Result where StreamConfig: From, @@ -300,8 +298,6 @@ impl NatsClient { } /// Update a `JetStream` stream. - /// - /// Requires the `jetstream` feature. pub fn update_stream(&self, cfg: &StreamConfig) -> io::Result { if cfg.name.is_empty() { return Err(Error::new( @@ -317,8 +313,6 @@ impl NatsClient { /// List all `JetStream` stream names. If you also want stream information, /// use the `list_streams` method instead. - /// - /// Requires the `jetstream` feature. pub fn stream_names(&self) -> PagedIterator<'_, String> { PagedIterator { subject: format!("{}STREAM.NAMES", self.api_prefix()), @@ -330,8 +324,6 @@ impl NatsClient { } /// List all `JetStream` streams. - /// - /// Requires the `jetstream` feature. pub fn list_streams(&self) -> PagedIterator<'_, StreamInfo> { PagedIterator { subject: format!("{}STREAM.LIST", self.api_prefix()), @@ -343,8 +335,6 @@ impl NatsClient { } /// List `JetStream` consumers for a stream. - /// - /// Requires the `jetstream` feature. pub fn list_consumers( &self, stream: S, @@ -372,8 +362,6 @@ impl NatsClient { } /// Query `JetStream` stream information. - /// - /// Requires the `jetstream` feature. pub fn stream_info>( &self, stream: S, @@ -391,8 +379,6 @@ impl NatsClient { } /// Purge `JetStream` stream messages. - /// - /// Requires the `jetstream` feature. pub fn purge_stream>( &self, stream: S, @@ -409,8 +395,6 @@ impl NatsClient { } /// Delete message in a `JetStream` stream. - /// - /// Requires the `jetstream` feature. pub fn delete_message>( &self, stream: S, @@ -437,8 +421,6 @@ impl NatsClient { } /// Delete `JetStream` stream. - /// - /// Requires the `jetstream` feature. pub fn delete_stream>(&self, stream: S) -> io::Result { let stream: &str = stream.as_ref(); if stream.is_empty() { @@ -454,8 +436,6 @@ impl NatsClient { } /// Create a `JetStream` consumer. - /// - /// Requires the `jetstream` feature. pub fn create_consumer( &self, stream: S, @@ -503,8 +483,6 @@ impl NatsClient { } /// Delete a `JetStream` consumer. - /// - /// Requires the `jetstream` feature. pub fn delete_consumer( &self, stream: S, @@ -541,8 +519,6 @@ impl NatsClient { } /// Query `JetStream` consumer information. - /// - /// Requires the `jetstream` feature. pub fn consumer_info( &self, stream: S, @@ -570,8 +546,6 @@ impl NatsClient { } /// Query `JetStream` account information. - /// - /// Requires the `jetstream` feature. pub fn account_info(&self) -> io::Result { self.js_request(&format!("{}INFO", self.api_prefix()), b"") } @@ -630,8 +604,6 @@ impl Consumer { /// `ConsumerInfo` that may have been returned /// from the `nats::Connection::list_consumers` /// iterator. - /// - /// Requires the `jetstream` feature. pub fn from_consumer_info( ci: ConsumerInfo, nc: NatsClient, @@ -647,8 +619,6 @@ impl Consumer { /// already exists, and creates it if not. If you want to use an existing /// `Consumer` without this check and creation, use the `Consumer::existing` /// method. - /// - /// Requires the `jetstream` feature. pub fn create_or_open( nc: NatsClient, stream: S, @@ -678,8 +648,6 @@ impl Consumer { } /// Use an existing `JetStream` `Consumer` - /// - /// Requires the `jetstream` feature. pub fn existing( nc: NatsClient, stream: S, @@ -741,8 +709,6 @@ impl Consumer { /// a message that has already been processed is received, it will /// be acked and skipped. Errors for acking deduplicated messages /// are not included in the returned `Vec`. - /// - /// Requires the `jetstream` feature. pub fn process_batch io::Result>( &mut self, batch_size: usize, @@ -782,14 +748,17 @@ impl Consumer { let mut received = 0; - while let Ok(next) = responses.next_timeout(if received == 0 { - // wait "forever" for first message - Duration::new(std::u64::MAX >> 2, 0) - } else { - self.timeout - .checked_sub(start.elapsed()) - .unwrap_or_default() - }) { + while let Some(next) = { + if received == 0 { + responses.next() + } else { + let timeout = self + .timeout + .checked_sub(start.elapsed()) + .unwrap_or_default(); + responses.next_timeout(timeout).ok() + } + } { let next_id = next.jetstream_message_info().unwrap().stream_seq; if self.dedupe_window.already_processed(next_id) { @@ -849,8 +818,6 @@ impl Consumer { /// the `double_ack` method of the argument message. If you require /// both the returned `Ok` from the closure and the `Err` from a /// failed ack, use `process_batch` instead. - /// - /// Requires the `jetstream` feature. pub fn process io::Result>( &mut self, f: F, @@ -918,8 +885,6 @@ impl Consumer { /// the `double_ack` method of the argument message. If you require /// both the returned `Ok` from the closure and the `Err` from a /// failed ack, use `process_batch` instead. - /// - /// Requires the `jetstream` feature. pub fn process_timeout io::Result>( &mut self, f: F, @@ -969,8 +934,6 @@ impl Consumer { /// /// This is a lower-level method and does not filter messages through the `Consumer`'s /// built-in `dedupe_window` as the various `process*` methods do. - /// - /// Requires the `jetstream` feature. pub fn pull(&mut self) -> io::Result { let ret_opt = self .pull_opt(NextRequest { @@ -996,8 +959,6 @@ impl Consumer { /// /// This is a lower-level method and does not filter messages through the `Consumer`'s /// built-in `dedupe_window` as the various `process*` methods do. - /// - /// Requires the `jetstream` feature. pub fn pull_opt( &mut self, next_request: NextRequest, diff --git a/src/lib.rs b/src/lib.rs index 089cce990..7f73cceba 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -186,19 +186,16 @@ mod client; mod connect; mod connector; mod headers; +mod jetstream_types; mod message; mod options; mod proto; mod secure_wipe; mod subscription; -#[cfg(feature = "jetstream")] /// `JetStream` stream management and consumers. pub mod jetstream; -#[cfg(feature = "jetstream")] -mod jetstream_types; - #[cfg(feature = "fault_injection")] mod fault_injection; diff --git a/src/message.rs b/src/message.rs index 9deb98d4c..ee31554a6 100644 --- a/src/message.rs +++ b/src/message.rs @@ -65,9 +65,6 @@ impl Message { /// /// Returns immediately if this message has already been /// double-acked. - /// - /// Requires the `jetstream` feature. - #[cfg(feature = "jetstream")] pub fn ack(&self) -> io::Result<()> { if self.double_acked.load(Ordering::Acquire) { return Ok(()); @@ -80,9 +77,6 @@ impl Message { /// server acks your ack, use the `double_ack` method instead. /// /// Does not check whether this message has already been double-acked. - /// - /// Requires the `jetstream` feature. - #[cfg(feature = "jetstream")] pub fn ack_kind( &self, ack_kind: crate::jetstream::AckKind, @@ -95,9 +89,6 @@ impl Message { /// See `AckKind` documentation for details of what each variant means. /// /// Returns immediately if this message has already been double-acked. - /// - /// Requires the `jetstream` feature. - #[cfg(feature = "jetstream")] pub fn double_ack( &self, ack_kind: crate::jetstream::AckKind, @@ -159,9 +150,6 @@ impl Message { /// Returns `None` if this is not /// a `JetStream` message with headers /// set. - /// - /// Requires the `jetstream` feature. - #[cfg(feature = "jetstream")] pub fn jetstream_message_info( &self, ) -> Option> { diff --git a/src/options.rs b/src/options.rs index 3cda99cdb..d1df1f067 100644 --- a/src/options.rs +++ b/src/options.rs @@ -26,8 +26,6 @@ pub struct Options { pub(crate) reconnect_callback: Callback, pub(crate) reconnect_delay_callback: ReconnectDelayCallback, pub(crate) close_callback: Callback, - - #[cfg(feature = "jetstream")] pub(crate) jetstream_prefix: String, } @@ -67,7 +65,6 @@ impl Default for Options { reconnect_callback: Callback(None), reconnect_delay_callback: ReconnectDelayCallback(Box::new(backoff)), close_callback: Callback(None), - #[cfg(feature = "jetstream")] jetstream_prefix: "$JS.API.".to_string(), } } @@ -482,7 +479,6 @@ impl Options { /// # Ok(()) /// # } /// ``` - #[cfg(feature = "jetstream")] pub fn jetstream_api_prefix( mut self, mut jetstream_prefix: String, diff --git a/tests/jetstream.rs b/tests/jetstream.rs index 136aa9807..113831a29 100644 --- a/tests/jetstream.rs +++ b/tests/jetstream.rs @@ -1,4 +1,3 @@ -#![cfg(feature = "jetstream")] use std::io; use std::process::{Child, Command}; use std::sync::atomic::{AtomicU16, Ordering::SeqCst};