Skip to content

Commit

Permalink
Merge pull request #195 from nats-io/tyler_nonconditional_js
Browse files Browse the repository at this point in the history
Make jetstream generally available, and remove the jetstream feature
  • Loading branch information
spacejam authored Aug 11, 2021
2 parents 3a10268 + e3b3aca commit 29535f7
Show file tree
Hide file tree
Showing 9 changed files with 28 additions and 92 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ jobs:
RUST_LOG: trace
run: |
rustup update
cargo test --doc --features=jetstream
cargo test --doc
examples:
name: examples
Expand All @@ -144,7 +144,7 @@ jobs:
RUST_LOG: trace
run: |
rustup update
cargo check --examples --features=jetstream
cargo check --examples
jetstream:
name: JetStream
Expand All @@ -170,7 +170,7 @@ jobs:
RUST_LOG: trace
run: |
rustup update
cargo test jetstream --features=jetstream
cargo test jetstream
msrv:
name: MSRV
Expand Down
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
# 0.12.0

## Breaking Changes

- #195 JetStream support is now available without
any feature set, and the jetstream feature has
been removed.

# 0.11.0

## Breaking Changes
Expand Down
14 changes: 4 additions & 10 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "nats"
version = "0.11.0"
version = "0.12.0"
description = "A Rust NATS client"
authors = ["Derek Collison <[email protected]>", "Tyler Neely <[email protected]>", "Stjepan Glavina <[email protected]>"]
edition = "2018"
Expand All @@ -12,12 +12,8 @@ readme = "README.md"
keywords = ["nats", "client", "messaging", "api"]
categories = ["network-programming", "api-bindings"]

[package.metadata.docs.rs]
features = ["jetstream"]

[features]
fault_injection = []
jetstream = ["serde", "chrono", "serde_json"]

[badges]
maintenance = { status = "actively-developed" }
Expand Down Expand Up @@ -47,9 +43,9 @@ regex = { version = "1.5.4", default-features = false, features = ["std", "unico
rustls = "0.19.1"
rustls-native-certs = "0.5.0"
webpki = "0.21.0"
serde = { version = "1.0.126", optional = true, features = ["derive"] }
serde_json = { version = "1.0.64", optional = true }
chrono = { version = "0.4.19", optional = true, features = ["serde"] }
serde = { version = "1.0.126", features = ["derive"] }
serde_json = "1.0.64"
chrono = { version = "0.4.19", features = ["serde"] }
memchr = "2.4.0"

[target.'cfg(unix)'.dependencies]
Expand All @@ -65,8 +61,6 @@ historian = "4.0.4"
lazy_static = "1.4.0"
nats_test_server = { path = "nats_test_server" }
quicli = "0.4.0"
serde_json = "1.0.64"
serde = { version = "1.0.126", features = ["derive"] }
smol = "1.2.5"
structopt = "0.3.21"

Expand Down
7 changes: 0 additions & 7 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ pub struct Client {
shutdown: Arc<Mutex<bool>>,

/// The options that this `Client` was created using.
#[cfg(feature = "jetstream")]
pub(crate) options: Arc<Options>,
}

Expand Down Expand Up @@ -106,17 +105,11 @@ impl Client {
}),
server_info: Arc::new(Mutex::new(ServerInfo::default())),
shutdown: Arc::new(Mutex::new(false)),

#[cfg(feature = "jetstream")]
options: Arc::new(options),
};

#[cfg(feature = "jetstream")]
let options = client.options.clone();

#[cfg(not(feature = "jetstream"))]
let options = Arc::new(options);

// Connector for creating the initial connection and reconnecting when
// it is broken.
let connector = Connector::new(url, options.clone())?;
Expand Down
63 changes: 12 additions & 51 deletions src/jetstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//! Experimental `JetStream` support enabled via the `jetstream` feature.
//! Support for the `JetStream` at-least-once messaging system.
//!
//! # Examples
//!
Expand Down Expand Up @@ -280,8 +280,6 @@ where

impl NatsClient {
/// Create a `JetStream` stream.
///
/// Requires the `jetstream` feature.
pub fn create_stream<S>(&self, stream_config: S) -> io::Result<StreamInfo>
where
StreamConfig: From<S>,
Expand All @@ -300,8 +298,6 @@ impl NatsClient {
}

/// Update a `JetStream` stream.
///
/// Requires the `jetstream` feature.
pub fn update_stream(&self, cfg: &StreamConfig) -> io::Result<StreamInfo> {
if cfg.name.is_empty() {
return Err(Error::new(
Expand All @@ -317,8 +313,6 @@ impl NatsClient {

/// List all `JetStream` stream names. If you also want stream information,
/// use the `list_streams` method instead.
///
/// Requires the `jetstream` feature.
pub fn stream_names(&self) -> PagedIterator<'_, String> {
PagedIterator {
subject: format!("{}STREAM.NAMES", self.api_prefix()),
Expand All @@ -330,8 +324,6 @@ impl NatsClient {
}

/// List all `JetStream` streams.
///
/// Requires the `jetstream` feature.
pub fn list_streams(&self) -> PagedIterator<'_, StreamInfo> {
PagedIterator {
subject: format!("{}STREAM.LIST", self.api_prefix()),
Expand All @@ -343,8 +335,6 @@ impl NatsClient {
}

/// List `JetStream` consumers for a stream.
///
/// Requires the `jetstream` feature.
pub fn list_consumers<S>(
&self,
stream: S,
Expand Down Expand Up @@ -372,8 +362,6 @@ impl NatsClient {
}

/// Query `JetStream` stream information.
///
/// Requires the `jetstream` feature.
pub fn stream_info<S: AsRef<str>>(
&self,
stream: S,
Expand All @@ -391,8 +379,6 @@ impl NatsClient {
}

/// Purge `JetStream` stream messages.
///
/// Requires the `jetstream` feature.
pub fn purge_stream<S: AsRef<str>>(
&self,
stream: S,
Expand All @@ -409,8 +395,6 @@ impl NatsClient {
}

/// Delete message in a `JetStream` stream.
///
/// Requires the `jetstream` feature.
pub fn delete_message<S: AsRef<str>>(
&self,
stream: S,
Expand All @@ -437,8 +421,6 @@ impl NatsClient {
}

/// Delete `JetStream` stream.
///
/// Requires the `jetstream` feature.
pub fn delete_stream<S: AsRef<str>>(&self, stream: S) -> io::Result<bool> {
let stream: &str = stream.as_ref();
if stream.is_empty() {
Expand All @@ -454,8 +436,6 @@ impl NatsClient {
}

/// Create a `JetStream` consumer.
///
/// Requires the `jetstream` feature.
pub fn create_consumer<S, C>(
&self,
stream: S,
Expand Down Expand Up @@ -503,8 +483,6 @@ impl NatsClient {
}

/// Delete a `JetStream` consumer.
///
/// Requires the `jetstream` feature.
pub fn delete_consumer<S, C>(
&self,
stream: S,
Expand Down Expand Up @@ -541,8 +519,6 @@ impl NatsClient {
}

/// Query `JetStream` consumer information.
///
/// Requires the `jetstream` feature.
pub fn consumer_info<S, C>(
&self,
stream: S,
Expand Down Expand Up @@ -570,8 +546,6 @@ impl NatsClient {
}

/// Query `JetStream` account information.
///
/// Requires the `jetstream` feature.
pub fn account_info(&self) -> io::Result<AccountInfo> {
self.js_request(&format!("{}INFO", self.api_prefix()), b"")
}
Expand Down Expand Up @@ -630,8 +604,6 @@ impl Consumer {
/// `ConsumerInfo` that may have been returned
/// from the `nats::Connection::list_consumers`
/// iterator.
///
/// Requires the `jetstream` feature.
pub fn from_consumer_info(
ci: ConsumerInfo,
nc: NatsClient,
Expand All @@ -647,8 +619,6 @@ impl Consumer {
/// already exists, and creates it if not. If you want to use an existing
/// `Consumer` without this check and creation, use the `Consumer::existing`
/// method.
///
/// Requires the `jetstream` feature.
pub fn create_or_open<S, C>(
nc: NatsClient,
stream: S,
Expand Down Expand Up @@ -678,8 +648,6 @@ impl Consumer {
}

/// Use an existing `JetStream` `Consumer`
///
/// Requires the `jetstream` feature.
pub fn existing<S, C>(
nc: NatsClient,
stream: S,
Expand Down Expand Up @@ -741,8 +709,6 @@ impl Consumer {
/// a message that has already been processed is received, it will
/// be acked and skipped. Errors for acking deduplicated messages
/// are not included in the returned `Vec`.
///
/// Requires the `jetstream` feature.
pub fn process_batch<R, F: FnMut(&Message) -> io::Result<R>>(
&mut self,
batch_size: usize,
Expand Down Expand Up @@ -782,14 +748,17 @@ impl Consumer {

let mut received = 0;

while let Ok(next) = responses.next_timeout(if received == 0 {
// wait "forever" for first message
Duration::new(std::u64::MAX >> 2, 0)
} else {
self.timeout
.checked_sub(start.elapsed())
.unwrap_or_default()
}) {
while let Some(next) = {
if received == 0 {
responses.next()
} else {
let timeout = self
.timeout
.checked_sub(start.elapsed())
.unwrap_or_default();
responses.next_timeout(timeout).ok()
}
} {
let next_id = next.jetstream_message_info().unwrap().stream_seq;

if self.dedupe_window.already_processed(next_id) {
Expand Down Expand Up @@ -849,8 +818,6 @@ impl Consumer {
/// the `double_ack` method of the argument message. If you require
/// both the returned `Ok` from the closure and the `Err` from a
/// failed ack, use `process_batch` instead.
///
/// Requires the `jetstream` feature.
pub fn process<R, F: Fn(&Message) -> io::Result<R>>(
&mut self,
f: F,
Expand Down Expand Up @@ -918,8 +885,6 @@ impl Consumer {
/// the `double_ack` method of the argument message. If you require
/// both the returned `Ok` from the closure and the `Err` from a
/// failed ack, use `process_batch` instead.
///
/// Requires the `jetstream` feature.
pub fn process_timeout<R, F: Fn(&Message) -> io::Result<R>>(
&mut self,
f: F,
Expand Down Expand Up @@ -969,8 +934,6 @@ impl Consumer {
///
/// This is a lower-level method and does not filter messages through the `Consumer`'s
/// built-in `dedupe_window` as the various `process*` methods do.
///
/// Requires the `jetstream` feature.
pub fn pull(&mut self) -> io::Result<Message> {
let ret_opt = self
.pull_opt(NextRequest {
Expand All @@ -996,8 +959,6 @@ impl Consumer {
///
/// This is a lower-level method and does not filter messages through the `Consumer`'s
/// built-in `dedupe_window` as the various `process*` methods do.
///
/// Requires the `jetstream` feature.
pub fn pull_opt(
&mut self,
next_request: NextRequest,
Expand Down
5 changes: 1 addition & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,19 +186,16 @@ mod client;
mod connect;
mod connector;
mod headers;
mod jetstream_types;
mod message;
mod options;
mod proto;
mod secure_wipe;
mod subscription;

#[cfg(feature = "jetstream")]
/// `JetStream` stream management and consumers.
pub mod jetstream;

#[cfg(feature = "jetstream")]
mod jetstream_types;

#[cfg(feature = "fault_injection")]
mod fault_injection;

Expand Down
12 changes: 0 additions & 12 deletions src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,6 @@ impl Message {
///
/// Returns immediately if this message has already been
/// double-acked.
///
/// Requires the `jetstream` feature.
#[cfg(feature = "jetstream")]
pub fn ack(&self) -> io::Result<()> {
if self.double_acked.load(Ordering::Acquire) {
return Ok(());
Expand All @@ -80,9 +77,6 @@ impl Message {
/// server acks your ack, use the `double_ack` method instead.
///
/// Does not check whether this message has already been double-acked.
///
/// Requires the `jetstream` feature.
#[cfg(feature = "jetstream")]
pub fn ack_kind(
&self,
ack_kind: crate::jetstream::AckKind,
Expand All @@ -95,9 +89,6 @@ impl Message {
/// See `AckKind` documentation for details of what each variant means.
///
/// Returns immediately if this message has already been double-acked.
///
/// Requires the `jetstream` feature.
#[cfg(feature = "jetstream")]
pub fn double_ack(
&self,
ack_kind: crate::jetstream::AckKind,
Expand Down Expand Up @@ -159,9 +150,6 @@ impl Message {
/// Returns `None` if this is not
/// a `JetStream` message with headers
/// set.
///
/// Requires the `jetstream` feature.
#[cfg(feature = "jetstream")]
pub fn jetstream_message_info(
&self,
) -> Option<crate::jetstream::JetStreamMessageInfo<'_>> {
Expand Down
Loading

0 comments on commit 29535f7

Please sign in to comment.