Skip to content

Commit

Permalink
Remove recursive calls
Browse files Browse the repository at this point in the history
Co-authored-by: Zano <[email protected]>
  • Loading branch information
segfaultdoc and Zano authored May 21, 2022
1 parent ad0728f commit a7b156e
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 78 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
**/.idea
target/
Cargo.lock
48 changes: 26 additions & 22 deletions nats/src/jetstream/pull_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

use std::io;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};

use crate::jetstream::{ConsumerInfo, ConsumerOwnership, JetStream};
use crate::Message;
Expand Down Expand Up @@ -315,28 +315,32 @@ impl PullSubscription {
/// # Ok(())
/// # }
/// ```
pub fn next_timeout(&self, timeout: Duration) -> io::Result<Message> {
match self.0.messages.recv_timeout(timeout) {
Ok(message) => {
if message.is_no_messages() {
return self.next_timeout(timeout);
pub fn next_timeout(&self, mut timeout: Duration) -> io::Result<Message> {
loop {
let start = Instant::now();
return match self.0.messages.recv_timeout(timeout) {
Ok(message) => {
if message.is_no_messages() {
timeout = timeout.saturating_sub(start.elapsed());
continue;
}
if message.is_request_timeout() {
return Err(io::Error::new(
io::ErrorKind::Other,
"next_timeout: Pull Request timed out",
));
}
Ok(message)
}
if message.is_request_timeout() {
return Err(io::Error::new(
io::ErrorKind::Other,
"next_timeout: Pull Request timed out",
));
}
Ok(message)
}
Err(channel::RecvTimeoutError::Timeout) => Err(io::Error::new(
io::ErrorKind::TimedOut,
"next_timeout: timed out",
)),
Err(channel::RecvTimeoutError::Disconnected) => Err(io::Error::new(
io::ErrorKind::Other,
"next_timeout: unsubscribed",
)),
Err(channel::RecvTimeoutError::Timeout) => Err(io::Error::new(
io::ErrorKind::TimedOut,
"next_timeout: timed out",
)),
Err(channel::RecvTimeoutError::Disconnected) => Err(io::Error::new(
io::ErrorKind::Other,
"next_timeout: unsubscribed",
)),
};
}
}

Expand Down
74 changes: 41 additions & 33 deletions nats/src/jetstream/push_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::io;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use std::time::{Duration, Instant};

use crossbeam_channel as channel;

Expand Down Expand Up @@ -127,15 +127,17 @@ impl PushSubscription {
/// # }
/// ```
pub fn next(&self) -> Option<Message> {
match self.0.messages.recv().ok() {
Some(message) => {
if self.preprocess(&message) {
return self.next();
}
loop {
return match self.0.messages.recv().ok() {
Some(message) => {
if self.preprocess(&message) {
continue;
}

Some(message)
}
None => None,
Some(message)
}
None => None,
};
}
}

Expand All @@ -159,15 +161,17 @@ impl PushSubscription {
/// # }
/// ```
pub fn try_next(&self) -> Option<Message> {
match self.0.messages.try_recv().ok() {
Some(message) => {
if self.preprocess(&message) {
return self.try_next();
}
loop {
return match self.0.messages.try_recv().ok() {
Some(message) => {
if self.preprocess(&message) {
continue;
}

Some(message)
}
None => None,
Some(message)
}
None => None,
};
}
}

Expand All @@ -187,23 +191,27 @@ impl PushSubscription {
/// # Ok(())
/// # }
/// ```
pub fn next_timeout(&self, timeout: Duration) -> io::Result<Message> {
match self.0.messages.recv_timeout(timeout) {
Ok(message) => {
if self.preprocess(&message) {
return self.next_timeout(timeout);
}
pub fn next_timeout(&self, mut timeout: Duration) -> io::Result<Message> {
loop {
let start = Instant::now();
return match self.0.messages.recv_timeout(timeout) {
Ok(message) => {
if self.preprocess(&message) {
timeout = timeout.saturating_sub(start.elapsed());
continue;
}

Ok(message)
}
Err(channel::RecvTimeoutError::Timeout) => Err(io::Error::new(
io::ErrorKind::TimedOut,
"next_timeout: timed out",
)),
Err(channel::RecvTimeoutError::Disconnected) => Err(io::Error::new(
io::ErrorKind::Other,
"next_timeout: unsubscribed",
)),
Ok(message)
}
Err(channel::RecvTimeoutError::Timeout) => Err(io::Error::new(
io::ErrorKind::TimedOut,
"next_timeout: timed out",
)),
Err(channel::RecvTimeoutError::Disconnected) => Err(io::Error::new(
io::ErrorKind::Other,
"next_timeout: unsubscribed",
)),
};
}
}

Expand Down
47 changes: 24 additions & 23 deletions nats/src/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -780,33 +780,34 @@ impl Iterator for Keys {
type Item = String;

fn next(&mut self) -> Option<Self::Item> {
if self.done {
return None;
}
loop {
if self.done {
return None;
}
return match self.subscription.next() {
Some(message) => {
// If there are no more pending messages we'll stop after delivering the key
// derived from this message.
if let Some(info) = message.jetstream_message_info() {
if info.pending == 0 {
self.done = true;
}
}

match self.subscription.next() {
Some(message) => {
// If there are no more pending messages we'll stop after delivering the key
// derived from this message.
if let Some(info) = message.jetstream_message_info() {
if info.pending == 0 {
self.done = true;
// We are only interested in unique current keys from subjects so we skip delete
// and purge markers.
let operation = kv_operation_from_maybe_headers(message.headers.as_ref());
if operation != Operation::Put {
continue;
}
}

// We are only interested in unique current keys from subjects so we skip delete
// and purge markers.
let operation = kv_operation_from_maybe_headers(message.headers.as_ref());
if operation != Operation::Put {
return self.next();
message
.subject
.strip_prefix(&self.prefix)
.map(|s| s.to_string())
}

message
.subject
.strip_prefix(&self.prefix)
.map(|s| s.to_string())
}
None => None,
None => None,
};
}
}
}
Expand Down

0 comments on commit a7b156e

Please sign in to comment.