Skip to content

Commit

Permalink
Add extended purge
Browse files Browse the repository at this point in the history
  • Loading branch information
Jarema committed Nov 23, 2022
1 parent 08667ef commit ef0bc38
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 36 deletions.
158 changes: 122 additions & 36 deletions async-nats/src/jetstream/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@

use std::{
fmt::Debug,
future::IntoFuture,
io::{self, ErrorKind},
pin::Pin,
str::FromStr,
time::Duration,
};

use crate::{header::HeaderName, HeaderMap, HeaderValue};
use crate::{Error, StatusCode};
use bytes::Bytes;
use futures::Future;
use serde::{Deserialize, Serialize};
use serde_json::json;
use time::{serde::rfc3339, OffsetDateTime};
Expand Down Expand Up @@ -515,20 +518,8 @@ impl Stream {
/// # Ok(())
/// # }
/// ```
pub async fn purge(&self) -> Result<PurgeResponse, Error> {
let subject = format!("STREAM.PURGE.{}", self.info.config.name);

let response: Response<PurgeResponse> = self.context.request(subject, &()).await?;
match response {
Response::Err { error } => Err(Box::new(io::Error::new(
ErrorKind::Other,
format!(
"error while purging stream: {}, {}, {}",
error.code, error.status, error.description
),
))),
Response::Ok(response) => Ok(response),
}
pub fn purge(&self) -> Purge<No, No> {
Purge::build(self.clone())
}

/// Purge `Stream` messages for a matching subject.
Expand All @@ -550,28 +541,7 @@ impl Stream {
where
T: Into<String>,
{
let request_subject = format!("STREAM.PURGE.{}", self.info.config.name);

let response: Response<PurgeResponse> = self
.context
.request(
request_subject,
&PurgeRequest {
filter: Some(subject.into()),
..Default::default()
},
)
.await?;
match response {
Response::Err { error } => Err(Box::new(io::Error::new(
ErrorKind::Other,
format!(
"error while purging stream: {}, {}, {}",
error.code, error.status, error.description
),
))),
Response::Ok(response) => Ok(response),
}
self.purge().filter(subject).await
}

/// Create a new `Durable` or `Ephemeral` Consumer (if `durable_name` was not provided) and
Expand Down Expand Up @@ -1262,3 +1232,119 @@ pub struct External {
#[serde(rename = "deliver", skip_serializing_if = "is_default")]
pub delivery_prefix: Option<String>,
}

use std::marker::PhantomData;

#[derive(Debug, Default)]
pub struct Yes;
#[derive(Debug, Default)]
pub struct No;

pub trait ToAssign: Debug {}

impl ToAssign for Yes {}
impl ToAssign for No {}

#[derive(Debug)]
pub struct Purge<SEQUENCE, KEEP>
where
SEQUENCE: ToAssign,
KEEP: ToAssign,
{
stream: Stream,
inner: PurgeRequest,
sequence_set: PhantomData<SEQUENCE>,
keep_set: PhantomData<KEEP>,
}

impl<SEQUENCE, KEEP> Purge<SEQUENCE, KEEP>
where
SEQUENCE: ToAssign,
KEEP: ToAssign,
{
/// Adds subject filter to [PurgeRequest]
pub fn filter<T: Into<String>>(mut self, filter: T) -> Purge<SEQUENCE, KEEP> {
self.inner.filter = Some(filter.into());
self
}
}

impl Purge<No, No> {
pub(crate) fn build(stream: Stream) -> Purge<No, No> {
Purge {
stream,
inner: Default::default(),
sequence_set: PhantomData {},
keep_set: PhantomData {},
}
}
}

impl<KEEP> Purge<No, KEEP>
where
KEEP: ToAssign,
{
/// Creates a new [PurgeRequest].
/// `keep` and `sequence` are exclusive, enforced compile time by generics.
pub fn keep(self, keep: u64) -> Purge<No, Yes> {
Purge {
stream: self.stream,
sequence_set: PhantomData {},
keep_set: PhantomData {},
inner: PurgeRequest {
keep: Some(keep),
..self.inner
},
}
}
}
impl<SEQUENCE> Purge<SEQUENCE, No>
where
SEQUENCE: ToAssign,
{
/// Creates a new [PurgeRequest].
/// `keep` and `sequence` are exclusive, enforces compile time by generics.
pub fn sequence(self, sequence: u64) -> Purge<Yes, No> {
Purge {
stream: self.stream,
sequence_set: PhantomData {},
keep_set: PhantomData {},
inner: PurgeRequest {
sequence: Some(sequence),
..self.inner
},
}
}
}

impl<S, K> IntoFuture for Purge<S, K>
where
S: ToAssign + std::marker::Send,
K: ToAssign + std::marker::Send,
{
type Output = Result<PurgeResponse, Error>;

type IntoFuture = Pin<Box<dyn Future<Output = Result<PurgeResponse, Error>> + Send>>;

fn into_future(self) -> Self::IntoFuture {
Box::pin(std::future::IntoFuture::into_future(async move {
let request_subject = format!("STREAM.PURGE.{}", self.stream.info.config.name);

let response: Response<PurgeResponse> = self
.stream
.context
.request(request_subject, &self.inner)
.await?;
match response {
Response::Err { error } => Err(Box::from(io::Error::new(
ErrorKind::Other,
format!(
"error while purging stream: {}, {}, {}",
error.code, error.status, error.description
),
))),
Response::Ok(response) => Ok(response),
}
}))
}
}
28 changes: 28 additions & 0 deletions async-nats/tests/jetstream_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,34 @@ mod jetstream {

assert_eq!(stream.info().await.unwrap().state.messages, 3);
}
#[tokio::test]
async fn purge() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = async_nats::connect(server.client_url()).await.unwrap();
let context = async_nats::jetstream::new(client);

context
.create_stream(async_nats::jetstream::stream::Config {
name: "events".to_string(),
subjects: vec!["events.*".to_string()],
..Default::default()
})
.await
.unwrap();

for _ in 0..100 {
context
.publish("events.two".to_string(), "data".into())
.await
.unwrap();
}
let mut stream = context.get_stream("events").await.unwrap();

stream.purge().sequence(90).await.unwrap();
assert_eq!(stream.info().await.unwrap().state.messages, 11);
stream.purge().keep(5).await.unwrap();
assert_eq!(stream.info().await.unwrap().state.messages, 5);
}

#[tokio::test]
async fn get_or_create_stream() {
Expand Down

0 comments on commit ef0bc38

Please sign in to comment.