Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add pause consumer #1234

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions async-nats/src/jetstream/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ pub struct Info {
/// Indicates if any client is connected and receiving messages from a push consumer
#[serde(default, skip_serializing_if = "is_default")]
pub push_bound: bool,
/// Indicates if the consumer is paused
pub paused: bool,
/// The remaining time the consumer is paused
#[serde(with = "serde_nanos")]
pub pause_remaining: Option<Duration>,
}

/// Information about a consumer and the stream it is consuming
Expand Down
77 changes: 77 additions & 0 deletions async-nats/src/jetstream/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,64 @@ impl Stream {
}
}

/// Pause a [Consumer] until the given time.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would add a note about what it means, briefly.

///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use async_nats::jetstream::consumer;
/// use futures::StreamExt;
/// let client = async_nats::connect("localhost:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
/// let pause_until = time::OffsetDateTime::now_utc() + time::Duration::from_secs(60);
///
/// jetstream
/// .get_stream("events")
/// .await?
/// .pause_consumer("my_consumer", pause_until)
/// .await?;
/// # Ok(())
/// # }
/// ```
pub async fn pause_consumer(&self, name: &str, pause_until: OffsetDateTime) -> Result<PauseResponse, ConsumerError> {
self.request_pause_consumer(name, Some(pause_until)).await
}

/// Resume a paused [Consumer].
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use async_nats::jetstream::consumer;
/// use futures::StreamExt;
/// let client = async_nats::connect("localhost:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
///
/// jetstream
/// .get_stream("events")
/// .await?
/// .resume_consumer("my_consumer")
/// .await?;
/// # Ok(())
/// # }
/// ```
pub async fn resume_consumer(&self, name: &str) -> Result<PauseResponse, ConsumerError> {
self.request_pause_consumer(name, None).await
}

async fn request_pause_consumer(&self, name: &str, pause_until: Option<OffsetDateTime>) -> Result<PauseResponse, ConsumerError> {
let subject = format!("CONSUMER.PAUSE.{}.{}", self.info.config.name, name);
let payload = &PauseConsumerRequest{ pause_until };
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would use literal json! as the struct is used only once.

match self.context.request(subject, payload).await? {
Response::Ok::<PauseResponse>(resp) => { Ok(resp) }
Response::Err { error } => Err(error.into()),
}
}

/// Lists names of all consumers for current stream.
///
/// # Examples
Expand Down Expand Up @@ -1024,6 +1082,10 @@ pub struct Config {
/// Sets the first sequence for the stream.
#[serde(default, skip_serializing_if = "Option::is_none", rename = "first_seq")]
pub first_sequence: Option<u64>,

/// PauseUntil is for suspending the consumer until the deadline.
#[sende(with = "rfc3339")]
pub pause_until: Option<OffsetDateTime>,
}

impl From<&Config> for Config {
Expand Down Expand Up @@ -1167,6 +1229,21 @@ pub struct DeleteStatus {
pub success: bool,
}

#[derive(Deserialize)]
pub struct PauseResponse {
pub paused: bool,
#[sende(with = "rfc3339")]
pub pause_until: Option<OffsetDateTime>,
#[sende(with = "serde_nanos")]
pub pause_remaining: Option<Duration>,
}

#[derive(Serialize)]
struct PauseConsumerRequest {
#[serde(with = "rfc3339", skip_serializing_if = "Option::is_none")]
pause_until: Option<OffsetDateTime>,
}

/// information about the given stream.
#[derive(Debug, Deserialize, Clone, Copy)]
pub struct State {
Expand Down
1 change: 1 addition & 0 deletions async-nats/tests/jetstream_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3475,6 +3475,7 @@ mod jetstream {
max_ack_pending: 150,
}),
first_sequence: Some(505),
pause_until: None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to add test for pause/unpause.
I'm happy to help with that.

};

let stream = jetstream.create_stream(config.clone()).await.unwrap();
Expand Down