Skip to content

Commit

Permalink
Adding support for "Never Expire TTL"
Browse files Browse the repository at this point in the history
Adding TTL to Create and Update
  • Loading branch information
stebet committed Jan 8, 2025
1 parent 02e0703 commit df1a288
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 18 deletions.
6 changes: 4 additions & 2 deletions src/NATS.Client.KeyValueStore/INatsKVStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,25 @@ public interface INatsKVStore
/// </summary>
/// <param name="key">Key of the entry</param>
/// <param name="value">Value of the entry</param>
/// <param name="ttl">Time to live for the entry (requires the <see cref="NatsKVConfig.AllowMsgTTL"/> to be set to true)</param>
/// <param name="serializer">Serializer to use for the message type.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <typeparam name="T">Serialized value type</typeparam>
/// <returns>The revision number of the entry</returns>
ValueTask<ulong> CreateAsync<T>(string key, T value, INatsSerialize<T>? serializer = default, CancellationToken cancellationToken = default);
ValueTask<ulong> CreateAsync<T>(string key, T value, TimeSpan ttl = default, INatsSerialize<T>? serializer = default, CancellationToken cancellationToken = default);

/// <summary>
/// Update an entry in the bucket only if last update revision matches
/// </summary>
/// <param name="key">Key of the entry</param>
/// <param name="value">Value of the entry</param>
/// <param name="revision">Last revision number to match</param>
/// <param name="ttl">Time to live for the entry (requires the <see cref="NatsKVConfig.AllowMsgTTL"/> to be set to true)</param>
/// <param name="serializer">Serializer to use for the message type.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <typeparam name="T">Serialized value type</typeparam>
/// <returns>The revision number of the updated entry</returns>
ValueTask<ulong> UpdateAsync<T>(string key, T value, ulong revision, INatsSerialize<T>? serializer = default, CancellationToken cancellationToken = default);
ValueTask<ulong> UpdateAsync<T>(string key, T value, ulong revision, TimeSpan ttl = default, INatsSerialize<T>? serializer = default, CancellationToken cancellationToken = default);

/// <summary>
/// Delete an entry from the bucket
Expand Down
23 changes: 16 additions & 7 deletions src/NATS.Client.KeyValueStore/NatsKVStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,13 @@ public async ValueTask<ulong> PutAsync<T>(string key, T value, TimeSpan ttl = de
{
ValidateKey(key);

NatsHeaders headers = default;
NatsHeaders? headers = default;
if (ttl != default)
{
headers = new NatsHeaders();
headers.Add(NatsTtl, ttl.TotalSeconds.ToString("N0"));
headers = new NatsHeaders
{
{ NatsTtl, ttl == TimeSpan.MaxValue ? "never" : ttl.TotalSeconds.ToString("N0") },
};
}

var ack = await JetStreamContext.PublishAsync($"$KV.{Bucket}.{key}", value, serializer: serializer, headers: headers, cancellationToken: cancellationToken);
Expand All @@ -88,14 +90,14 @@ public async ValueTask<ulong> PutAsync<T>(string key, T value, TimeSpan ttl = de
}

/// <inheritdoc />
public async ValueTask<ulong> CreateAsync<T>(string key, T value, INatsSerialize<T>? serializer = default, CancellationToken cancellationToken = default)
public async ValueTask<ulong> CreateAsync<T>(string key, T value, TimeSpan ttl = default, INatsSerialize<T>? serializer = default, CancellationToken cancellationToken = default)
{
ValidateKey(key);

// First try to create a new entry
try
{
return await UpdateAsync(key, value, revision: 0, serializer, cancellationToken);
return await UpdateAsync(key, value, revision: 0, ttl, serializer, cancellationToken);
}
catch (NatsKVWrongLastRevisionException)
{
Expand All @@ -108,17 +110,24 @@ public async ValueTask<ulong> CreateAsync<T>(string key, T value, INatsSerialize
}
catch (NatsKVKeyDeletedException e)
{
return await UpdateAsync(key, value, e.Revision, serializer, cancellationToken);
return await UpdateAsync(key, value, e.Revision, ttl, serializer, cancellationToken);
}

throw new NatsKVCreateException();
}

/// <inheritdoc />
public async ValueTask<ulong> UpdateAsync<T>(string key, T value, ulong revision, INatsSerialize<T>? serializer = default, CancellationToken cancellationToken = default)
public async ValueTask<ulong> UpdateAsync<T>(string key, T value, ulong revision, TimeSpan ttl = default, INatsSerialize<T>? serializer = default, CancellationToken cancellationToken = default)
{
ValidateKey(key);
var headers = new NatsHeaders { { NatsExpectedLastSubjectSequence, revision.ToString() } };
if (ttl != default)
{
headers = new NatsHeaders
{
{ NatsTtl, ttl == TimeSpan.MaxValue ? "never" : ttl.TotalSeconds.ToString("N0") },
};
}

try
{
Expand Down
45 changes: 36 additions & 9 deletions tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -465,13 +465,12 @@ public async Task TestMessageTTL()
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = cts.Token;

await using var server = NatsServer.StartJS();
await using var server = NatsServer.StartJSWithTrace(_output);
await using var nats = server.CreateClientConnection();

var js = new NatsJSContext(nats);
var kv = new NatsKVContext(js);

await kv.DeleteStoreAsync("kv1");
var store = await kv.CreateStoreAsync(new NatsKVConfig("kv1") { AllowMsgTTL = true }, cancellationToken: cancellationToken);

for (var i = 0; i < 10; i++)
Expand All @@ -484,13 +483,6 @@ public async Task TestMessageTTL()
Assert.Equal(1ul, state.Info.State.FirstSeq);
Assert.Equal(10ul, state.Info.State.LastSeq);

// Sleep for half a second, all the messages should still be there
await Task.Delay(500);
state = await store.GetStatusAsync();
Assert.Equal(10, state.Info.State.Messages);
Assert.Equal(1ul, state.Info.State.FirstSeq);
Assert.Equal(10ul, state.Info.State.LastSeq);

// Sleep for two seconds, now all the messages should be gone
await Task.Delay(2000);
state = await store.GetStatusAsync();
Expand All @@ -499,6 +491,41 @@ public async Task TestMessageTTL()
Assert.Equal(10ul, state.Info.State.LastSeq);
}

[Fact]
public async Task TestMessageNeverExpire()
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = cts.Token;

await using var server = NatsServer.StartJSWithTrace(_output);
await using var nats = server.CreateClientConnection();

Check warning on line 502 in tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs

View workflow job for this annotation

GitHub Actions / Windows (v2.9)

Check warning on line 502 in tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs

View workflow job for this annotation

GitHub Actions / Windows (v2.9)

Check warning on line 502 in tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs

View workflow job for this annotation

GitHub Actions / Windows (v2.9)

Check warning on line 502 in tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs

View workflow job for this annotation

GitHub Actions / Windows (v2.9)

Check warning on line 502 in tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs

View workflow job for this annotation

GitHub Actions / check

Code should not contain trailing whitespace

Check warning on line 502 in tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs

View workflow job for this annotation

GitHub Actions / Linux (v2.9)

Check warning on line 502 in tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs

View workflow job for this annotation

GitHub Actions / Linux (v2.9)

Check warning on line 502 in tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs

View workflow job for this annotation

GitHub Actions / Linux (v2.9)

Check warning on line 502 in tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs

View workflow job for this annotation

GitHub Actions / Linux (v2.9)

Check warning on line 502 in tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs

View workflow job for this annotation

GitHub Actions / Windows (latest)

Check warning on line 502 in tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs

View workflow job for this annotation

GitHub Actions / Windows (latest)

Check warning on line 502 in tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs

View workflow job for this annotation

GitHub Actions / Windows (latest)

Check warning on line 502 in tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs

View workflow job for this annotation

GitHub Actions / Windows (latest)

Check warning on line 502 in tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs

View workflow job for this annotation

GitHub Actions / Windows (main)

Check warning on line 502 in tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs

View workflow job for this annotation

GitHub Actions / Windows (main)

Check warning on line 502 in tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs

View workflow job for this annotation

GitHub Actions / Windows (main)

Check warning on line 502 in tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs

View workflow job for this annotation

GitHub Actions / Windows (main)

Check warning on line 502 in tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs

View workflow job for this annotation

GitHub Actions / Linux (latest)

Check warning on line 502 in tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs

View workflow job for this annotation

GitHub Actions / Linux (latest)

Check warning on line 502 in tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs

View workflow job for this annotation

GitHub Actions / Linux (latest)

Check warning on line 502 in tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs

View workflow job for this annotation

GitHub Actions / Linux (latest)

Check warning on line 502 in tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs

View workflow job for this annotation

GitHub Actions / Linux (main)

Check warning on line 502 in tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs

View workflow job for this annotation

GitHub Actions / Linux (main)

Check warning on line 502 in tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs

View workflow job for this annotation

GitHub Actions / Linux (main)

Check warning on line 502 in tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs

View workflow job for this annotation

GitHub Actions / Linux (main)

var js = new NatsJSContext(nats);
var kv = new NatsKVContext(js);

var store = await kv.CreateStoreAsync(new NatsKVConfig("kv1") { AllowMsgTTL = true, MaxAge = TimeSpan.FromSeconds(1) }, cancellationToken: cancellationToken);

// The first message we publish is set to "never expire", therefore it won't age out with the MaxAge policy.
await store.PutAsync($"k0", $"v0", TimeSpan.MaxValue, cancellationToken: cancellationToken);

for (var i = 1; i < 11; i++)
{
await store.PutAsync($"k{i}", $"v{i}", TimeSpan.FromSeconds(1), cancellationToken: cancellationToken);
}

var state = await store.GetStatusAsync();
Assert.Equal(11, state.Info.State.Messages);
Assert.Equal(1ul, state.Info.State.FirstSeq);
Assert.Equal(11ul, state.Info.State.LastSeq);

// Sleep for two seconds, only the first message should be there
await Task.Delay(2000);
state = await store.GetStatusAsync();
Assert.Equal(1, state.Info.State.Messages);
Assert.Equal(1ul, state.Info.State.FirstSeq);
Assert.Equal(11ul, state.Info.State.LastSeq);
}

[Fact]
public async Task History()
{
Expand Down

0 comments on commit df1a288

Please sign in to comment.