From c299fb113a38c86ea5c77423e74f3eced31cea0b Mon Sep 17 00:00:00 2001 From: Matt Funnell <darkwatch@hotmail.co.uk> Date: Mon, 27 Jan 2025 11:52:22 +0000 Subject: [PATCH] Added Additional Try Commands to KVStore --- src/NATS.Client.Core/NatsResult.cs | 26 ++++ src/NATS.Client.KeyValueStore/INatsKVStore.cs | 53 +++++++++ src/NATS.Client.KeyValueStore/NatsKVStore.cs | 111 +++++++++++++++--- .../KeyValueStoreTest.cs | 69 +++++++++++ 4 files changed, 244 insertions(+), 15 deletions(-) diff --git a/src/NATS.Client.Core/NatsResult.cs b/src/NATS.Client.Core/NatsResult.cs index 3b246aa6b..d9180fe7d 100644 --- a/src/NATS.Client.Core/NatsResult.cs +++ b/src/NATS.Client.Core/NatsResult.cs @@ -2,6 +2,32 @@ namespace NATS.Client.Core; +public readonly struct NatsResult +{ + private readonly Exception? _error; + + public NatsResult() + { + _error = null; + } + + public NatsResult(Exception error) + { + _error = error; + } + + public Exception Error => _error ?? ThrowErrorIsNotSetException(); + + public bool Success => _error == null; + + public static implicit operator NatsResult(Exception error) => new(error); + + private static Exception ThrowErrorIsNotSetException() => throw CreateInvalidOperationException("Result error is not set"); + + [MethodImpl(MethodImplOptions.NoInlining)] + private static Exception CreateInvalidOperationException(string message) => new InvalidOperationException(message); +} + public readonly struct NatsResult<T> { private readonly T? _value; diff --git a/src/NATS.Client.KeyValueStore/INatsKVStore.cs b/src/NATS.Client.KeyValueStore/INatsKVStore.cs index ecbf0d072..df5d77496 100644 --- a/src/NATS.Client.KeyValueStore/INatsKVStore.cs +++ b/src/NATS.Client.KeyValueStore/INatsKVStore.cs @@ -37,6 +37,20 @@ public interface INatsKVStore /// <returns>The revision number of the entry</returns> ValueTask<ulong> CreateAsync<T>(string key, T value, INatsSerialize<T>? serializer = default, CancellationToken cancellationToken = default); + /// <summary> + /// Tries to create a new entry in the bucket only if it doesn't exist + /// </summary> + /// <param name="key">Key of the entry</param> + /// <param name="value">Value of the entry</param> + /// <param name="serializer">Serializer to use for the message type.</param> + /// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param> + /// <typeparam name="T">Serialized value type</typeparam> + /// <returns>A NatsResult object representing the revision number of the created entry or an error.</returns> + /// <remarks> + /// Use this method to avoid exceptions + /// </remarks> + ValueTask<NatsResult<ulong>> TryCreateAsync<T>(string key, T value, INatsSerialize<T>? serializer = default, CancellationToken cancellationToken = default); + /// <summary> /// Update an entry in the bucket only if last update revision matches /// </summary> @@ -49,6 +63,21 @@ public interface INatsKVStore /// <returns>The revision number of the updated entry</returns> ValueTask<ulong> UpdateAsync<T>(string key, T value, ulong revision, INatsSerialize<T>? serializer = default, CancellationToken cancellationToken = default); + /// <summary> + /// Tries to update an entry in the bucket only if last update revision matches + /// </summary> + /// <param name="key">Key of the entry</param> + /// <param name="value">Value of the entry</param> + /// <param name="revision">Last revision number to match</param> + /// <param name="serializer">Serializer to use for the message type.</param> + /// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param> + /// <typeparam name="T">Serialized value type</typeparam> + /// <returns>A NatsResult object representing the revision number of the updated entry or an error.</returns> + /// <remarks> + /// Use this method to avoid exceptions + /// </remarks> + ValueTask<NatsResult<ulong>> TryUpdateAsync<T>(string key, T value, ulong revision, INatsSerialize<T>? serializer = default, CancellationToken cancellationToken = default); + /// <summary> /// Delete an entry from the bucket /// </summary> @@ -57,6 +86,18 @@ public interface INatsKVStore /// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param> ValueTask DeleteAsync(string key, NatsKVDeleteOpts? opts = default, CancellationToken cancellationToken = default); + /// <summary> + /// Delete an entry from the bucket + /// </summary> + /// <param name="key">Key of the entry</param> + /// <param name="opts">Delete options</param> + /// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param> + /// <returns>A NatsResult object representing success or an error.</returns> + /// <remarks> + /// Use this method to avoid exceptions + /// </remarks> + ValueTask<NatsResult> TryDeleteAsync(string key, NatsKVDeleteOpts? opts = default, CancellationToken cancellationToken = default); + /// <summary> /// Purge an entry from the bucket /// </summary> @@ -65,6 +106,18 @@ public interface INatsKVStore /// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param> ValueTask PurgeAsync(string key, NatsKVDeleteOpts? opts = default, CancellationToken cancellationToken = default); + /// <summary> + /// Tries to purge an entry from the bucket + /// </summary> + /// <param name="key">Key of the entry</param> + /// <param name="opts">Delete options</param> + /// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param> + /// <returns>A NatsResult object representing success or an error.</returns> + /// <remarks> + /// Use this method to avoid exceptions + /// </remarks> + ValueTask<NatsResult> TryPurgeAsync(string key, NatsKVDeleteOpts? opts = default, CancellationToken cancellationToken = default); + /// <summary> /// Get an entry from the bucket using the key /// </summary> diff --git a/src/NATS.Client.KeyValueStore/NatsKVStore.cs b/src/NATS.Client.KeyValueStore/NatsKVStore.cs index cd2717fc3..db3dc9135 100644 --- a/src/NATS.Client.KeyValueStore/NatsKVStore.cs +++ b/src/NATS.Client.KeyValueStore/NatsKVStore.cs @@ -105,8 +105,51 @@ public async ValueTask<ulong> CreateAsync<T>(string key, T value, INatsSerialize throw new NatsKVCreateException(); } + /// <inheritdoc /> + public async ValueTask<NatsResult<ulong>> TryCreateAsync<T>(string key, T value, INatsSerialize<T>? serializer = default, CancellationToken cancellationToken = default) + { + ValidateKey(key); + + // First try to create a new entry + var resultUpdate = await TryUpdateAsync(key, value, revision: 0, serializer, cancellationToken); + if (resultUpdate.Success) + { + return resultUpdate; + } + + // If that fails, try to read an existing entry, this will fail if deleted. + var resultReadExisting = await TryGetEntryAsync<T>(key, cancellationToken: cancellationToken); + + // If we succeed here, then we've just been returned an entry, so we can't create a new one + if (resultReadExisting.Success) + { + return new NatsKVCreateException(); + } + else if (resultReadExisting.Error is NatsKVKeyDeletedException deletedException) + { + // If our previous call errored because the last entry is deleted, then that's ok, we update with the deleted revision + return await TryUpdateAsync(key, value, deletedException.Revision, serializer, cancellationToken); + } + else + { + return resultReadExisting.Error; + } + } + /// <inheritdoc /> public async ValueTask<ulong> UpdateAsync<T>(string key, T value, ulong revision, INatsSerialize<T>? serializer = default, CancellationToken cancellationToken = default) + { + var result = await TryUpdateAsync(key, value, revision, serializer, cancellationToken); + if (!result.Success) + { + ThrowException(result.Error); + } + + return result.Value; + } + + /// <inheritdoc /> + public async ValueTask<NatsResult<ulong>> TryUpdateAsync<T>(string key, T value, ulong revision, INatsSerialize<T>? serializer = default, CancellationToken cancellationToken = default) { ValidateKey(key); var headers = new NatsHeaders { { NatsExpectedLastSubjectSequence, revision.ToString() } }; @@ -114,23 +157,44 @@ public async ValueTask<ulong> UpdateAsync<T>(string key, T value, ulong revision try { var ack = await JetStreamContext.PublishAsync($"$KV.{Bucket}.{key}", value, headers: headers, serializer: serializer, cancellationToken: cancellationToken); - ack.EnsureSuccess(); - return ack.Seq; - } - catch (NatsJSApiException e) - { - if (e.Error is { ErrCode: 10071, Code: 400, Description: not null } && e.Error.Description.StartsWith("wrong last sequence", StringComparison.OrdinalIgnoreCase)) + if (ack == null) { - throw new NatsKVWrongLastRevisionException(); + return new ArgumentNullException(nameof(ack)); + } + else if (ack.Error is { ErrCode: 10071, Code: 400, Description: not null } && ack.Error.Description.StartsWith("wrong last sequence", StringComparison.OrdinalIgnoreCase)) + { + return new NatsKVWrongLastRevisionException(); + } + else if (ack.Error != null) + { + return new NatsJSApiException(ack.Error); + } + else if (ack.Duplicate) + { + return new NatsJSDuplicateMessageException(ack.Seq); } - throw; + return ack.Seq; + } + catch (Exception ex) + { + return ex; } } /// <inheritdoc /> public async ValueTask DeleteAsync(string key, NatsKVDeleteOpts? opts = default, CancellationToken cancellationToken = default) + { + var result = await TryDeleteAsync(key, opts, cancellationToken); + if (!result.Success) + { + ThrowException(result.Error); + } + } + + /// <inheritdoc /> + public async ValueTask<NatsResult> TryDeleteAsync(string key, NatsKVDeleteOpts? opts = default, CancellationToken cancellationToken = default) { ValidateKey(key); opts ??= new NatsKVDeleteOpts(); @@ -157,16 +221,29 @@ public async ValueTask DeleteAsync(string key, NatsKVDeleteOpts? opts = default, try { var ack = await JetStreamContext.PublishAsync<object?>(subject, null, headers: headers, cancellationToken: cancellationToken); - ack.EnsureSuccess(); - } - catch (NatsJSApiException e) - { - if (e.Error is { ErrCode: 10071, Code: 400, Description: not null } && e.Error.Description.StartsWith("wrong last sequence", StringComparison.OrdinalIgnoreCase)) + + if (ack == null) + { + return new ArgumentNullException(nameof(ack)); + } + else if (ack.Error is { ErrCode: 10071, Code: 400, Description: not null } && ack.Error.Description.StartsWith("wrong last sequence", StringComparison.OrdinalIgnoreCase)) { - throw new NatsKVWrongLastRevisionException(); + return new NatsKVWrongLastRevisionException(); + } + else if (ack.Error != null) + { + return new NatsJSApiException(ack.Error); + } + else if (ack.Duplicate) + { + return new NatsJSDuplicateMessageException(ack.Seq); } - throw; + return new NatsResult(); + } + catch (Exception ex) + { + return ex; } } @@ -174,6 +251,10 @@ public async ValueTask DeleteAsync(string key, NatsKVDeleteOpts? opts = default, public ValueTask PurgeAsync(string key, NatsKVDeleteOpts? opts = default, CancellationToken cancellationToken = default) => DeleteAsync(key, (opts ?? new NatsKVDeleteOpts()) with { Purge = true }, cancellationToken); + /// <inheritdoc /> + public ValueTask<NatsResult> TryPurgeAsync(string key, NatsKVDeleteOpts? opts = default, CancellationToken cancellationToken = default) => + TryDeleteAsync(key, (opts ?? new NatsKVDeleteOpts()) with { Purge = true }, cancellationToken); + /// <inheritdoc /> public async ValueTask<NatsKVEntry<T>> GetEntryAsync<T>(string key, ulong revision = default, INatsDeserialize<T>? serializer = default, CancellationToken cancellationToken = default) { diff --git a/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs b/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs index e638ef028..6de246fab 100644 --- a/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs +++ b/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs @@ -695,4 +695,73 @@ await Retry.Until( Assert.Equal("a_fromStore1", entryA.Value); Assert.Equal("b_fromStore2", entryB.Value); } + + [Fact] + public async Task Try_Create() + { + await using var server = await NatsServer.StartJSAsync(); + await using var nats = await server.CreateClientConnectionAsync(); + + var js = new NatsJSContext(nats); + var kv = new NatsKVContext(js); + + var store = await kv.CreateStoreAsync("b1"); + + NatsResult<ulong> result = default; + + result = await store.TryCreateAsync("k1", "v1"); + Assert.True(result.Success); + + result = await store.TryCreateAsync("k1", "v2"); + Assert.False(result.Success); + + var deleteResult = await store.TryDeleteAsync("k1"); + Assert.True(deleteResult.Success); + + result = await store.TryCreateAsync("k1", "v3"); + Assert.True(result.Success); + + var finalValue = await store.TryGetEntryAsync<string>("k1"); + Assert.True(finalValue.Success && finalValue.Value.Value == "v3"); + } + + [Fact] + public async Task Try_Delete() + { + await using var server = await NatsServer.StartJSAsync(); + await using var nats = await server.CreateClientConnectionAsync(); + + var js = new NatsJSContext(nats); + var kv = new NatsKVContext(js); + + var store = await kv.CreateStoreAsync("b1"); + + var entry = await store.PutAsync("k1", "v1"); + + var updateResultFail = await store.TryUpdateAsync("k1", "v2", revision: entry + 1); + Assert.False(updateResultFail.Success); + + var updateResultSuccess = await store.TryUpdateAsync("k1", "v2", revision: entry); + Assert.True(updateResultSuccess.Success); + } + + [Fact] + public async Task Try_Update() + { + await using var server = await NatsServer.StartJSAsync(); + await using var nats = await server.CreateClientConnectionAsync(); + + var js = new NatsJSContext(nats); + var kv = new NatsKVContext(js); + + var store = await kv.CreateStoreAsync("b1"); + + var entry = await store.PutAsync("k1", "v1"); + + var deleteResultFail = await store.TryDeleteAsync("k1", new NatsKVDeleteOpts { Revision = entry + 1 }); + Assert.False(deleteResultFail.Success); + + var deleteResultSuccess = await store.TryDeleteAsync("k1", new NatsKVDeleteOpts { Revision = entry }); + Assert.True(deleteResultSuccess.Success); + } }