Skip to content

Commit

Permalink
Merge pull request #157 from nats-io/tyler_0.9.7
Browse files Browse the repository at this point in the history
Cut 0.9.7
  • Loading branch information
spacejam committed Mar 10, 2021
2 parents 2ed02ab + 9432945 commit e3fb52d
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 13 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
# 0.9.7

## Improvements

- Improved error log output and avoid panicking
when problems are encountered while parsing the
JetStream reply subject.

# 0.9.6

## New Features
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "nats"
version = "0.9.6"
version = "0.9.7"
description = "A Rust NATS client"
authors = ["Derek Collison <[email protected]>", "Tyler Neely <[email protected]>", "Stjepan Glavina <[email protected]>"]
edition = "2018"
Expand Down
4 changes: 2 additions & 2 deletions async-nats/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "async-nats"
version = "0.9.6"
version = "0.9.7"
description = "An async Rust NATS client"
authors = ["Derek Collison <[email protected]>", "Tyler Neely <[email protected]>", "Stjepan Glavina <[email protected]>"]
edition = "2018"
Expand All @@ -17,7 +17,7 @@ maintenance = { status = "actively-developed" }

[dependencies]
blocking = "1.0.2"
nats = { path = "..", version = "0.9.6" }
nats = { path = "..", version = "0.9.7" }

[dev-dependencies]
smol = "1.2.5"
Expand Down
12 changes: 10 additions & 2 deletions src/jetstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,7 @@ impl Consumer {
return Err(Error::new(
ErrorKind::InvalidInput,
"process and process_batch are only usable from \
Pull-based Consumers if there is a durable_name set",
Pull-based Consumers if there is a durable_name set",
));
}

Expand All @@ -871,7 +871,15 @@ impl Consumer {
self.nc.request(&subject, AckKind::Ack)?
};

let next_id = next.jetstream_message_info().unwrap().stream_seq;
let next_id = if let Some(jmi) = next.jetstream_message_info() {
jmi.stream_seq
} else {
return Err(Error::new(
ErrorKind::Other,
"failed to process jetstream message info \
from message reply subject. Is your nats-server up to date?"
));
};

if self.dedupe_window.already_processed(next_id) {
let _dont_care = next.ack();
Expand Down
46 changes: 39 additions & 7 deletions src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,18 +170,50 @@ impl Message {
if split.next()? != "$JS" || split.next()? != "ACK" {
return None;
}

macro_rules! try_parse {
() => {
match str::parse(try_parse!(str)) {
Ok(parsed) => parsed,
Err(e) => {
log::error!(
"failed to parse jetstream reply \
subject: {}, error: {:?}. Is your \
nats-server up to date?",
reply,
e
);
return None;
}
}
};
(str) => {
if let Some(next) = split.next() {
next
} else {
log::error!(
"unexpectedly few tokens while parsing \
jetstream reply subject: {}. Is your \
nats-server up to date?",
reply
);
return None;
}
};
}

Some(crate::jetstream::JetStreamMessageInfo {
stream: split.next()?,
consumer: split.next()?,
delivered: str::parse(split.next()?).ok()?,
stream_seq: str::parse(split.next()?).ok()?,
consumer_seq: str::parse(split.next()?).ok()?,
stream: try_parse!(str),
consumer: try_parse!(str),
delivered: try_parse!(),
stream_seq: try_parse!(),
consumer_seq: try_parse!(),
published: {
let nanos: u64 = str::parse(split.next()?).ok()?;
let nanos: u64 = try_parse!();
let offset = std::time::Duration::from_nanos(nanos);
std::time::UNIX_EPOCH + offset
},
pending: str::parse(split.next()?).ok()?,
pending: try_parse!(),
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion tests/jetstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ fn jetstream_basics() -> io::Result<()> {
nc.stream_info("test2")?;
nc.create_consumer("test2", "consumer1")?;

let mut consumer2_cfg = ConsumerConfig {
let consumer2_cfg = ConsumerConfig {
durable_name: Some("consumer2".to_string()),
ack_policy: AckPolicy::All,
deliver_subject: Some("consumer2_ds".to_string()),
Expand Down

0 comments on commit e3fb52d

Please sign in to comment.