Skip to content

Commit

Permalink
Refactor KV operations
Browse files Browse the repository at this point in the history
Further refactor of KV operations
  • Loading branch information
n1ghtmare committed Jul 25, 2023
1 parent 5f26606 commit 0f30c8b
Showing 1 changed file with 46 additions and 56 deletions.
102 changes: 46 additions & 56 deletions async-nats/src/jetstream/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@

pub mod bucket;

use std::{fmt::Display, task::Poll};
use std::{
fmt::Display,
io::{self, ErrorKind},
str::FromStr,
task::Poll,
};

use crate::{HeaderValue, StatusCode};
use bytes::Bytes;
Expand All @@ -38,22 +43,27 @@ use super::{
},
};

// Helper to extract key value operation from message headers
fn kv_operation_from_maybe_headers(maybe_headers: Option<&str>) -> Operation {
if let Some(headers) = maybe_headers {
return match headers {
KV_OPERATION_DELETE => Operation::Delete,
KV_OPERATION_PURGE => Operation::Purge,
_ => Operation::Put,
};
fn kv_operation_from_stream_message(message: &RawMessage) -> Operation {
match message.headers.as_deref() {
Some(headers) => headers.parse().unwrap_or_default(),
None => Operation::Put,
}

Operation::Put
}

fn kv_operation_from_stream_message(message: &RawMessage) -> Operation {
kv_operation_from_maybe_headers(message.headers.as_deref())
fn kv_operation_from_message(message: &Message) -> Result<Operation, EntryError> {
let headers = message
.headers
.as_ref()
.ok_or_else(|| EntryError::with_source(EntryErrorKind::Other, "missing headers"))?;

headers
.get(KV_OPERATION)
.map(|x| x.iter().next().unwrap().as_str())
.unwrap_or(KV_OPERATION_PUT)
.parse()
.map_err(|err| EntryError::with_source(EntryErrorKind::Other, err))
}

static VALID_BUCKET_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r#"\A[a-zA-Z0-9_-]+\z"#).unwrap());
static VALID_KEY_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r#"\A[-/_=\.a-zA-Z0-9]+\z"#).unwrap());

Expand Down Expand Up @@ -110,16 +120,33 @@ pub struct Config {
}

/// Describes what kind of operation and entry represents
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
#[derive(Debug, Clone, Copy, Eq, PartialEq, Default)]
pub enum Operation {
/// A value was put into the bucket
#[default]
Put,
/// A value was deleted from a bucket
Delete,
/// A value was purged from a bucket
Purge,
}

impl FromStr for Operation {
type Err = io::Error;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
KV_OPERATION_DELETE => Ok(Operation::Delete),
KV_OPERATION_PURGE => Ok(Operation::Purge),
KV_OPERATION_PUT => Ok(Operation::Put),
_ => Err(io::Error::new(
ErrorKind::InvalidInput,
format!("Invalid KV operation: {s}"),
)),
}
}
}

/// A struct used as a handle for the bucket.
#[derive(Debug, Clone)]
pub struct Store {
Expand Down Expand Up @@ -258,20 +285,9 @@ impl Store {
let headers = message.headers.as_ref().ok_or_else(|| {
EntryError::with_source(EntryErrorKind::Other, "missing headers")
})?;
let operation = headers.get(KV_OPERATION).map_or_else(
|| Operation::Put,
|operation| match operation
.iter()
.next()
.cloned()
.unwrap_or_else(|| KV_OPERATION_PUT.to_string())
.as_ref()
{
KV_OPERATION_PURGE => Operation::Purge,
KV_OPERATION_DELETE => Operation::Delete,
_ => Operation::Put,
},
);

let operation = kv_operation_from_message(&message).unwrap_or_default();

let sequence = headers
.get(header::NATS_SEQUENCE)
.ok_or_else(|| {
Expand Down Expand Up @@ -860,20 +876,7 @@ impl<'a> futures::Stream for Watch<'a> {
)
})?;

let operation = match message
.headers
.as_ref()
.and_then(|headers| headers.get(KV_OPERATION))
.unwrap_or(&HeaderValue::from(KV_OPERATION_PUT))
.iter()
.next()
.unwrap()
.as_str()
{
KV_OPERATION_DELETE => Operation::Delete,
KV_OPERATION_PURGE => Operation::Purge,
_ => Operation::Put,
};
let operation = kv_operation_from_message(&message).unwrap_or_default();

let key = message
.subject
Expand Down Expand Up @@ -934,20 +937,7 @@ impl<'a> futures::Stream for History<'a> {
self.done = true;
}

let operation = match message
.headers
.as_ref()
.and_then(|headers| headers.get(KV_OPERATION))
.unwrap_or(&HeaderValue::from(KV_OPERATION_PUT))
.iter()
.next()
.unwrap()
.as_str()
{
KV_OPERATION_DELETE => Operation::Delete,
KV_OPERATION_PURGE => Operation::Purge,
_ => Operation::Put,
};
let operation = kv_operation_from_message(&message).unwrap_or_default();

let key = message
.subject
Expand Down

0 comments on commit 0f30c8b

Please sign in to comment.