diff --git a/async-nats/src/jetstream/kv/mod.rs b/async-nats/src/jetstream/kv/mod.rs index ff99e364a..7f31ca814 100644 --- a/async-nats/src/jetstream/kv/mod.rs +++ b/async-nats/src/jetstream/kv/mod.rs @@ -251,9 +251,10 @@ impl Store { // Deleted or Purged key, we can create it again. Some(Entry { operation: Operation::Delete | Operation::Purge, + revision, .. }) => { - let revision = self.put(key, value).await?; + let revision = self.update(key, value, revision).await?; Ok(revision) } @@ -1250,6 +1251,7 @@ impl From for CreateError { match error.kind() { UpdateErrorKind::InvalidKey => Error::from(CreateErrorKind::InvalidKey), UpdateErrorKind::TimedOut => Error::from(CreateErrorKind::Publish), + UpdateErrorKind::WrongLastRevision => Error::from(CreateErrorKind::AlreadyExists), UpdateErrorKind::Other => Error::from(CreateErrorKind::Other), } } @@ -1362,6 +1364,7 @@ crate::from_with_timeout!(WatchError, WatchErrorKind, StreamError, StreamErrorKi pub enum UpdateErrorKind { InvalidKey, TimedOut, + WrongLastRevision, Other, } @@ -1370,6 +1373,7 @@ impl Display for UpdateErrorKind { match self { Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"), Self::TimedOut => write!(f, "timed out"), + Self::WrongLastRevision => write!(f, "wrong last revision"), Self::Other => write!(f, "failed getting entry"), } } @@ -1377,7 +1381,17 @@ impl Display for UpdateErrorKind { pub type UpdateError = Error; -crate::from_with_timeout!(UpdateError, UpdateErrorKind, PublishError, PublishErrorKind); +impl From for UpdateError { + fn from(err: PublishError) -> Self { + match err.kind() { + PublishErrorKind::TimedOut => Self::new(UpdateErrorKind::TimedOut), + PublishErrorKind::WrongLastSequence => { + Self::with_source(UpdateErrorKind::WrongLastRevision, err) + } + _ => Self::with_source(UpdateErrorKind::Other, err), + } + } +} #[derive(Clone, Copy, Debug, PartialEq)] pub enum WatcherErrorKind {