From 4e8c52667a2bad90932c8c7f8874873334699454 Mon Sep 17 00:00:00 2001 From: Fletcher Nichol Date: Tue, 13 Aug 2024 17:29:12 -0600 Subject: [PATCH] Fix KV create race after delete/purge This change fixes an issue when using a JetStream K/V store where a user is creating, deleting, and re-creating keys. If the last entry for a key is a `Operation::Delete` or `Operation::Purge`, the initial `self.update()` returns an error, causing the second part of the method to be exercised. Prior to this change, if the entry was deleted or purged a `kv.put()` call is used which ignores the revision of that last entry. A single writer to the K/V store would succeed (as no other writers would write first) so no problem. However, if 2 writers attempt to create a key, then a second writer *could* call the `kv.put()` before the first writer calls `kv.put()`. This means that *both* writers get an `Ok(revision)` and can assume that they won the creation of the key. When using a "distributed lock" pattern (that is many writers race to create a key and the first successful writer wins), this above scenario results in potentially more than one writer who believes they have uniquely acquired the distributed lock. This change replaces the `kv.put()` call to a `kv.update()` call and provides the `revision` from the deleted/purged entry to ensure that no other writer has beaten the caller to this update. This change closes the race period between concurrent writers to between the first update and the second update call with some optimistic write concurrency to detect another writer. It appears as though this strategy is in effect in the Go client code [kv.Create] implementation. [kv.Create]: https://github.com/nats-io/nats.go/blob/278f9f188bca4d7bdee283a0e98ab66b82530c60/jetstream/kv.go#L944-L963 Co-authored-by: John Keiser Signed-off-by: Fletcher Nichol Signed-off-by: Fletcher Nichol --- async-nats/src/jetstream/kv/mod.rs | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/async-nats/src/jetstream/kv/mod.rs b/async-nats/src/jetstream/kv/mod.rs index ff99e364a..7f31ca814 100644 --- a/async-nats/src/jetstream/kv/mod.rs +++ b/async-nats/src/jetstream/kv/mod.rs @@ -251,9 +251,10 @@ impl Store { // Deleted or Purged key, we can create it again. Some(Entry { operation: Operation::Delete | Operation::Purge, + revision, .. }) => { - let revision = self.put(key, value).await?; + let revision = self.update(key, value, revision).await?; Ok(revision) } @@ -1250,6 +1251,7 @@ impl From for CreateError { match error.kind() { UpdateErrorKind::InvalidKey => Error::from(CreateErrorKind::InvalidKey), UpdateErrorKind::TimedOut => Error::from(CreateErrorKind::Publish), + UpdateErrorKind::WrongLastRevision => Error::from(CreateErrorKind::AlreadyExists), UpdateErrorKind::Other => Error::from(CreateErrorKind::Other), } } @@ -1362,6 +1364,7 @@ crate::from_with_timeout!(WatchError, WatchErrorKind, StreamError, StreamErrorKi pub enum UpdateErrorKind { InvalidKey, TimedOut, + WrongLastRevision, Other, } @@ -1370,6 +1373,7 @@ impl Display for UpdateErrorKind { match self { Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"), Self::TimedOut => write!(f, "timed out"), + Self::WrongLastRevision => write!(f, "wrong last revision"), Self::Other => write!(f, "failed getting entry"), } } @@ -1377,7 +1381,17 @@ impl Display for UpdateErrorKind { pub type UpdateError = Error; -crate::from_with_timeout!(UpdateError, UpdateErrorKind, PublishError, PublishErrorKind); +impl From for UpdateError { + fn from(err: PublishError) -> Self { + match err.kind() { + PublishErrorKind::TimedOut => Self::new(UpdateErrorKind::TimedOut), + PublishErrorKind::WrongLastSequence => { + Self::with_source(UpdateErrorKind::WrongLastRevision, err) + } + _ => Self::with_source(UpdateErrorKind::Other, err), + } + } +} #[derive(Clone, Copy, Debug, PartialEq)] pub enum WatcherErrorKind {