Skip to content

Commit

Permalink
Fix Consumer Management names validation and use concrete errors there
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <[email protected]>
  • Loading branch information
Jarema committed Feb 20, 2024
1 parent dde5b1e commit 8696d42
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 12 deletions.
35 changes: 27 additions & 8 deletions async-nats/src/jetstream/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -798,26 +798,34 @@ impl Context {
&self,
consumer: C,
stream: S,
) -> Result<Consumer<T>, crate::Error>
) -> Result<Consumer<T>, ConsumerError>
where
T: FromConsumer + IntoConsumerConfig,
S: AsRef<str>,
C: AsRef<str>,
{
if !is_valid_name(stream.as_ref()) {
return Err(ConsumerError::with_source(
ConsumerErrorKind::InvalidName,
"invalid stream",
));
}

if !is_valid_name(consumer.as_ref()) {
return Err(ConsumerError::new(ConsumerErrorKind::InvalidName));
}

let subject = format!("CONSUMER.INFO.{}.{}", stream.as_ref(), consumer.as_ref());

let info: super::consumer::Info = match self.request(subject, &json!({})).await? {
Response::Ok(info) => info,
Response::Err { error } => {
return Err(Box::new(std::io::Error::new(
ErrorKind::Other,
format!("nats: error while getting consumer info: {}", error),
)))
}
Response::Err { error } => return Err(error.into()),
};

Ok(Consumer::new(
T::try_from_consumer_config(info.config.clone())?,
T::try_from_consumer_config(info.config.clone()).map_err(|err| {
ConsumerError::with_source(ConsumerErrorKind::InvalidConsumerType, err)
})?,
info,
self.clone(),
))
Expand Down Expand Up @@ -850,6 +858,17 @@ impl Context {
consumer: C,
stream: S,
) -> Result<DeleteStatus, ConsumerError> {
if !is_valid_name(consumer.as_ref()) {
return Err(ConsumerError::new(ConsumerErrorKind::InvalidName));
}

if !is_valid_name(stream.as_ref()) {
return Err(ConsumerError::with_source(
ConsumerErrorKind::Other,
"invalid stream name",
));
}

let subject = format!("CONSUMER.DELETE.{}.{}", stream.as_ref(), consumer.as_ref());

match self.request(subject, &json!({})).await? {
Expand Down
10 changes: 6 additions & 4 deletions async-nats/src/jetstream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,14 +199,14 @@ pub fn with_prefix(client: Client, prefix: &str) -> Context {
context::Context::with_prefix(client, prefix)
}

/// Checks if a name passed in JS API is valid one.
/// Checks if a name passed in JetStream API is valid one.
/// The restrictions are there because some fields in the JetStream configs are passed as part of the subject to apply permissions.
/// Examples are stream names, consumer names, etc.
pub(crate) fn is_valid_name(name: &str) -> bool {
!name.is_empty()
&& !name.contains(|c| char::is_ascii_whitespace(&c) || c == '.')
&& name != ">"
&& name != "*"
&& name
.bytes()
.all(|c| !c.is_ascii_whitespace() && c != b'.' && c != b'*' && c != b'>')
}

#[cfg(test)]
Expand All @@ -216,6 +216,8 @@ mod tests {
#[test]
fn test_is_valid_name() {
assert!(is_valid_name("stream"));
assert!(!is_valid_name("str>eam"));
assert!(!is_valid_name("str*eam"));
assert!(!is_valid_name("name.name"));
assert!(!is_valid_name("name name"));
assert!(!is_valid_name(">"));
Expand Down
2 changes: 2 additions & 0 deletions async-nats/src/jetstream/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1794,6 +1794,7 @@ pub enum ConsumerErrorKind {
TimedOut,
Request,
InvalidConsumerType,
InvalidName,
JetStream(super::errors::Error),
Other,
}
Expand All @@ -1806,6 +1807,7 @@ impl Display for ConsumerErrorKind {
Self::JetStream(err) => write!(f, "JetStream error: {}", err),
Self::Other => write!(f, "consumer error"),
Self::InvalidConsumerType => write!(f, "invalid consumer type"),
Self::InvalidName => write!(f, "invalid consumer name"),
}
}
}
Expand Down

0 comments on commit 8696d42

Please sign in to comment.