Skip to content

Commit

Permalink
Added Additional Try Commands to KVStore
Browse files Browse the repository at this point in the history
  • Loading branch information
darkwatchuk committed Jan 27, 2025
1 parent a297cb4 commit c299fb1
Show file tree
Hide file tree
Showing 4 changed files with 244 additions and 15 deletions.
26 changes: 26 additions & 0 deletions src/NATS.Client.Core/NatsResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,32 @@

namespace NATS.Client.Core;

public readonly struct NatsResult
{
private readonly Exception? _error;

public NatsResult()
{
_error = null;
}

public NatsResult(Exception error)
{
_error = error;
}

public Exception Error => _error ?? ThrowErrorIsNotSetException();

public bool Success => _error == null;

public static implicit operator NatsResult(Exception error) => new(error);

private static Exception ThrowErrorIsNotSetException() => throw CreateInvalidOperationException("Result error is not set");

[MethodImpl(MethodImplOptions.NoInlining)]
private static Exception CreateInvalidOperationException(string message) => new InvalidOperationException(message);
}

public readonly struct NatsResult<T>
{
private readonly T? _value;
Expand Down
53 changes: 53 additions & 0 deletions src/NATS.Client.KeyValueStore/INatsKVStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,20 @@ public interface INatsKVStore
/// <returns>The revision number of the entry</returns>
ValueTask<ulong> CreateAsync<T>(string key, T value, INatsSerialize<T>? serializer = default, CancellationToken cancellationToken = default);

/// <summary>
/// Tries to create a new entry in the bucket only if it doesn't exist
/// </summary>
/// <param name="key">Key of the entry</param>
/// <param name="value">Value of the entry</param>
/// <param name="serializer">Serializer to use for the message type.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <typeparam name="T">Serialized value type</typeparam>
/// <returns>A NatsResult object representing the revision number of the created entry or an error.</returns>
/// <remarks>
/// Use this method to avoid exceptions
/// </remarks>
ValueTask<NatsResult<ulong>> TryCreateAsync<T>(string key, T value, INatsSerialize<T>? serializer = default, CancellationToken cancellationToken = default);

/// <summary>
/// Update an entry in the bucket only if last update revision matches
/// </summary>
Expand All @@ -49,6 +63,21 @@ public interface INatsKVStore
/// <returns>The revision number of the updated entry</returns>
ValueTask<ulong> UpdateAsync<T>(string key, T value, ulong revision, INatsSerialize<T>? serializer = default, CancellationToken cancellationToken = default);

/// <summary>
/// Tries to update an entry in the bucket only if last update revision matches
/// </summary>
/// <param name="key">Key of the entry</param>
/// <param name="value">Value of the entry</param>
/// <param name="revision">Last revision number to match</param>
/// <param name="serializer">Serializer to use for the message type.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <typeparam name="T">Serialized value type</typeparam>
/// <returns>A NatsResult object representing the revision number of the updated entry or an error.</returns>
/// <remarks>
/// Use this method to avoid exceptions
/// </remarks>
ValueTask<NatsResult<ulong>> TryUpdateAsync<T>(string key, T value, ulong revision, INatsSerialize<T>? serializer = default, CancellationToken cancellationToken = default);

/// <summary>
/// Delete an entry from the bucket
/// </summary>
Expand All @@ -57,6 +86,18 @@ public interface INatsKVStore
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
ValueTask DeleteAsync(string key, NatsKVDeleteOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Delete an entry from the bucket
/// </summary>
/// <param name="key">Key of the entry</param>
/// <param name="opts">Delete options</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>A NatsResult object representing success or an error.</returns>
/// <remarks>
/// Use this method to avoid exceptions
/// </remarks>
ValueTask<NatsResult> TryDeleteAsync(string key, NatsKVDeleteOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Purge an entry from the bucket
/// </summary>
Expand All @@ -65,6 +106,18 @@ public interface INatsKVStore
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
ValueTask PurgeAsync(string key, NatsKVDeleteOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Tries to purge an entry from the bucket
/// </summary>
/// <param name="key">Key of the entry</param>
/// <param name="opts">Delete options</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>A NatsResult object representing success or an error.</returns>
/// <remarks>
/// Use this method to avoid exceptions
/// </remarks>
ValueTask<NatsResult> TryPurgeAsync(string key, NatsKVDeleteOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Get an entry from the bucket using the key
/// </summary>
Expand Down
111 changes: 96 additions & 15 deletions src/NATS.Client.KeyValueStore/NatsKVStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,32 +105,96 @@ public async ValueTask<ulong> CreateAsync<T>(string key, T value, INatsSerialize
throw new NatsKVCreateException();
}

/// <inheritdoc />
public async ValueTask<NatsResult<ulong>> TryCreateAsync<T>(string key, T value, INatsSerialize<T>? serializer = default, CancellationToken cancellationToken = default)
{
ValidateKey(key);

// First try to create a new entry
var resultUpdate = await TryUpdateAsync(key, value, revision: 0, serializer, cancellationToken);
if (resultUpdate.Success)
{
return resultUpdate;
}

// If that fails, try to read an existing entry, this will fail if deleted.
var resultReadExisting = await TryGetEntryAsync<T>(key, cancellationToken: cancellationToken);

// If we succeed here, then we've just been returned an entry, so we can't create a new one
if (resultReadExisting.Success)
{
return new NatsKVCreateException();
}
else if (resultReadExisting.Error is NatsKVKeyDeletedException deletedException)
{
// If our previous call errored because the last entry is deleted, then that's ok, we update with the deleted revision
return await TryUpdateAsync(key, value, deletedException.Revision, serializer, cancellationToken);
}
else
{
return resultReadExisting.Error;
}
}

/// <inheritdoc />
public async ValueTask<ulong> UpdateAsync<T>(string key, T value, ulong revision, INatsSerialize<T>? serializer = default, CancellationToken cancellationToken = default)
{
var result = await TryUpdateAsync(key, value, revision, serializer, cancellationToken);
if (!result.Success)
{
ThrowException(result.Error);
}

return result.Value;
}

/// <inheritdoc />
public async ValueTask<NatsResult<ulong>> TryUpdateAsync<T>(string key, T value, ulong revision, INatsSerialize<T>? serializer = default, CancellationToken cancellationToken = default)
{
ValidateKey(key);
var headers = new NatsHeaders { { NatsExpectedLastSubjectSequence, revision.ToString() } };

try
{
var ack = await JetStreamContext.PublishAsync($"$KV.{Bucket}.{key}", value, headers: headers, serializer: serializer, cancellationToken: cancellationToken);
ack.EnsureSuccess();

return ack.Seq;
}
catch (NatsJSApiException e)
{
if (e.Error is { ErrCode: 10071, Code: 400, Description: not null } && e.Error.Description.StartsWith("wrong last sequence", StringComparison.OrdinalIgnoreCase))
if (ack == null)
{
throw new NatsKVWrongLastRevisionException();
return new ArgumentNullException(nameof(ack));
}
else if (ack.Error is { ErrCode: 10071, Code: 400, Description: not null } && ack.Error.Description.StartsWith("wrong last sequence", StringComparison.OrdinalIgnoreCase))
{
return new NatsKVWrongLastRevisionException();
}
else if (ack.Error != null)
{
return new NatsJSApiException(ack.Error);
}
else if (ack.Duplicate)
{
return new NatsJSDuplicateMessageException(ack.Seq);
}

throw;
return ack.Seq;
}
catch (Exception ex)
{
return ex;
}
}

/// <inheritdoc />
public async ValueTask DeleteAsync(string key, NatsKVDeleteOpts? opts = default, CancellationToken cancellationToken = default)
{
var result = await TryDeleteAsync(key, opts, cancellationToken);
if (!result.Success)
{
ThrowException(result.Error);
}
}

/// <inheritdoc />
public async ValueTask<NatsResult> TryDeleteAsync(string key, NatsKVDeleteOpts? opts = default, CancellationToken cancellationToken = default)
{
ValidateKey(key);
opts ??= new NatsKVDeleteOpts();
Expand All @@ -157,23 +221,40 @@ public async ValueTask DeleteAsync(string key, NatsKVDeleteOpts? opts = default,
try
{
var ack = await JetStreamContext.PublishAsync<object?>(subject, null, headers: headers, cancellationToken: cancellationToken);
ack.EnsureSuccess();
}
catch (NatsJSApiException e)
{
if (e.Error is { ErrCode: 10071, Code: 400, Description: not null } && e.Error.Description.StartsWith("wrong last sequence", StringComparison.OrdinalIgnoreCase))

if (ack == null)
{
return new ArgumentNullException(nameof(ack));
}
else if (ack.Error is { ErrCode: 10071, Code: 400, Description: not null } && ack.Error.Description.StartsWith("wrong last sequence", StringComparison.OrdinalIgnoreCase))
{
throw new NatsKVWrongLastRevisionException();
return new NatsKVWrongLastRevisionException();
}
else if (ack.Error != null)
{
return new NatsJSApiException(ack.Error);
}
else if (ack.Duplicate)
{
return new NatsJSDuplicateMessageException(ack.Seq);
}

throw;
return new NatsResult();
}
catch (Exception ex)
{
return ex;
}
}

/// <inheritdoc />
public ValueTask PurgeAsync(string key, NatsKVDeleteOpts? opts = default, CancellationToken cancellationToken = default) =>
DeleteAsync(key, (opts ?? new NatsKVDeleteOpts()) with { Purge = true }, cancellationToken);

/// <inheritdoc />
public ValueTask<NatsResult> TryPurgeAsync(string key, NatsKVDeleteOpts? opts = default, CancellationToken cancellationToken = default) =>
TryDeleteAsync(key, (opts ?? new NatsKVDeleteOpts()) with { Purge = true }, cancellationToken);

/// <inheritdoc />
public async ValueTask<NatsKVEntry<T>> GetEntryAsync<T>(string key, ulong revision = default, INatsDeserialize<T>? serializer = default, CancellationToken cancellationToken = default)
{
Expand Down
69 changes: 69 additions & 0 deletions tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -695,4 +695,73 @@ await Retry.Until(
Assert.Equal("a_fromStore1", entryA.Value);
Assert.Equal("b_fromStore2", entryB.Value);
}

[Fact]
public async Task Try_Create()
{
await using var server = await NatsServer.StartJSAsync();
await using var nats = await server.CreateClientConnectionAsync();

var js = new NatsJSContext(nats);
var kv = new NatsKVContext(js);

var store = await kv.CreateStoreAsync("b1");

NatsResult<ulong> result = default;

result = await store.TryCreateAsync("k1", "v1");
Assert.True(result.Success);

result = await store.TryCreateAsync("k1", "v2");
Assert.False(result.Success);

var deleteResult = await store.TryDeleteAsync("k1");
Assert.True(deleteResult.Success);

result = await store.TryCreateAsync("k1", "v3");
Assert.True(result.Success);

var finalValue = await store.TryGetEntryAsync<string>("k1");
Assert.True(finalValue.Success && finalValue.Value.Value == "v3");
}

[Fact]
public async Task Try_Delete()
{
await using var server = await NatsServer.StartJSAsync();
await using var nats = await server.CreateClientConnectionAsync();

var js = new NatsJSContext(nats);
var kv = new NatsKVContext(js);

var store = await kv.CreateStoreAsync("b1");

var entry = await store.PutAsync("k1", "v1");

var updateResultFail = await store.TryUpdateAsync("k1", "v2", revision: entry + 1);
Assert.False(updateResultFail.Success);

var updateResultSuccess = await store.TryUpdateAsync("k1", "v2", revision: entry);
Assert.True(updateResultSuccess.Success);
}

[Fact]
public async Task Try_Update()
{
await using var server = await NatsServer.StartJSAsync();
await using var nats = await server.CreateClientConnectionAsync();

var js = new NatsJSContext(nats);
var kv = new NatsKVContext(js);

var store = await kv.CreateStoreAsync("b1");

var entry = await store.PutAsync("k1", "v1");

var deleteResultFail = await store.TryDeleteAsync("k1", new NatsKVDeleteOpts { Revision = entry + 1 });
Assert.False(deleteResultFail.Success);

var deleteResultSuccess = await store.TryDeleteAsync("k1", new NatsKVDeleteOpts { Revision = entry });
Assert.True(deleteResultSuccess.Success);
}
}

0 comments on commit c299fb1

Please sign in to comment.