diff --git a/async-nats/src/jetstream/consumer/mod.rs b/async-nats/src/jetstream/consumer/mod.rs index dc2ed42a2..e3b836e15 100644 --- a/async-nats/src/jetstream/consumer/mod.rs +++ b/async-nats/src/jetstream/consumer/mod.rs @@ -162,6 +162,11 @@ pub struct Info { /// Indicates if any client is connected and receiving messages from a push consumer #[serde(default, skip_serializing_if = "is_default")] pub push_bound: bool, + /// Indicates if the consumer is paused + pub paused: bool, + /// The remaining time the consumer is paused + #[serde(with = "serde_nanos")] + pub pause_remaining: Option, } /// Information about a consumer and the stream it is consuming diff --git a/async-nats/src/jetstream/stream.rs b/async-nats/src/jetstream/stream.rs index b2a283865..71035b5a5 100644 --- a/async-nats/src/jetstream/stream.rs +++ b/async-nats/src/jetstream/stream.rs @@ -850,6 +850,64 @@ impl Stream { } } + /// Pause a [Consumer] until the given time. + /// + /// # Examples + /// + /// ```no_run + /// # #[tokio::main] + /// # async fn main() -> Result<(), async_nats::Error> { + /// use async_nats::jetstream::consumer; + /// use futures::StreamExt; + /// let client = async_nats::connect("localhost:4222").await?; + /// let jetstream = async_nats::jetstream::new(client); + /// let pause_until = time::OffsetDateTime::now_utc() + time::Duration::from_secs(60); + /// + /// jetstream + /// .get_stream("events") + /// .await? + /// .pause_consumer("my_consumer", pause_until) + /// .await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn pause_consumer(&self, name: &str, pause_until: OffsetDateTime) -> Result { + self.request_pause_consumer(name, Some(pause_until)).await + } + + /// Resume a paused [Consumer]. + /// + /// # Examples + /// + /// ```no_run + /// # #[tokio::main] + /// # async fn main() -> Result<(), async_nats::Error> { + /// use async_nats::jetstream::consumer; + /// use futures::StreamExt; + /// let client = async_nats::connect("localhost:4222").await?; + /// let jetstream = async_nats::jetstream::new(client); + /// + /// jetstream + /// .get_stream("events") + /// .await? + /// .resume_consumer("my_consumer") + /// .await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn resume_consumer(&self, name: &str) -> Result { + self.request_pause_consumer(name, None).await + } + + async fn request_pause_consumer(&self, name: &str, pause_until: Option) -> Result { + let subject = format!("CONSUMER.PAUSE.{}.{}", self.info.config.name, name); + let payload = &PauseConsumerRequest{ pause_until }; + match self.context.request(subject, payload).await? { + Response::Ok::(resp) => { Ok(resp) } + Response::Err { error } => Err(error.into()), + } + } + /// Lists names of all consumers for current stream. /// /// # Examples @@ -1024,6 +1082,10 @@ pub struct Config { /// Sets the first sequence for the stream. #[serde(default, skip_serializing_if = "Option::is_none", rename = "first_seq")] pub first_sequence: Option, + + /// PauseUntil is for suspending the consumer until the deadline. + #[sende(with = "rfc3339")] + pub pause_until: Option, } impl From<&Config> for Config { @@ -1167,6 +1229,21 @@ pub struct DeleteStatus { pub success: bool, } +#[derive(Deserialize)] +pub struct PauseResponse { + pub paused: bool, + #[sende(with = "rfc3339")] + pub pause_until: Option, + #[sende(with = "serde_nanos")] + pub pause_remaining: Option, +} + +#[derive(Serialize)] +struct PauseConsumerRequest { + #[serde(with = "rfc3339", skip_serializing_if = "Option::is_none")] + pause_until: Option, +} + /// information about the given stream. #[derive(Debug, Deserialize, Clone, Copy)] pub struct State { diff --git a/async-nats/tests/jetstream_tests.rs b/async-nats/tests/jetstream_tests.rs index 5e62df07b..b706d92fb 100644 --- a/async-nats/tests/jetstream_tests.rs +++ b/async-nats/tests/jetstream_tests.rs @@ -3475,6 +3475,7 @@ mod jetstream { max_ack_pending: 150, }), first_sequence: Some(505), + pause_until: None, }; let stream = jetstream.create_stream(config.clone()).await.unwrap();