From df1a288d7791d5f97ea20c0f6313fe6105c95b19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stef=C3=A1n=20J=C3=B6kull=20Sigur=C3=B0arson?= Date: Wed, 8 Jan 2025 09:55:16 +0000 Subject: [PATCH] Adding support for "Never Expire TTL" Adding TTL to Create and Update --- src/NATS.Client.KeyValueStore/INatsKVStore.cs | 6 ++- src/NATS.Client.KeyValueStore/NatsKVStore.cs | 23 +++++++--- .../KeyValueStoreTest.cs | 45 +++++++++++++++---- 3 files changed, 56 insertions(+), 18 deletions(-) diff --git a/src/NATS.Client.KeyValueStore/INatsKVStore.cs b/src/NATS.Client.KeyValueStore/INatsKVStore.cs index a56c3ea30..8972d76ad 100644 --- a/src/NATS.Client.KeyValueStore/INatsKVStore.cs +++ b/src/NATS.Client.KeyValueStore/INatsKVStore.cs @@ -32,11 +32,12 @@ public interface INatsKVStore /// /// Key of the entry /// Value of the entry + /// Time to live for the entry (requires the to be set to true) /// Serializer to use for the message type. /// A used to cancel the API call. /// Serialized value type /// The revision number of the entry - ValueTask CreateAsync(string key, T value, INatsSerialize? serializer = default, CancellationToken cancellationToken = default); + ValueTask CreateAsync(string key, T value, TimeSpan ttl = default, INatsSerialize? serializer = default, CancellationToken cancellationToken = default); /// /// Update an entry in the bucket only if last update revision matches @@ -44,11 +45,12 @@ public interface INatsKVStore /// Key of the entry /// Value of the entry /// Last revision number to match + /// Time to live for the entry (requires the to be set to true) /// Serializer to use for the message type. /// A used to cancel the API call. /// Serialized value type /// The revision number of the updated entry - ValueTask UpdateAsync(string key, T value, ulong revision, INatsSerialize? serializer = default, CancellationToken cancellationToken = default); + ValueTask UpdateAsync(string key, T value, ulong revision, TimeSpan ttl = default, INatsSerialize? serializer = default, CancellationToken cancellationToken = default); /// /// Delete an entry from the bucket diff --git a/src/NATS.Client.KeyValueStore/NatsKVStore.cs b/src/NATS.Client.KeyValueStore/NatsKVStore.cs index bfe084c4c..e1e04c68e 100644 --- a/src/NATS.Client.KeyValueStore/NatsKVStore.cs +++ b/src/NATS.Client.KeyValueStore/NatsKVStore.cs @@ -75,11 +75,13 @@ public async ValueTask PutAsync(string key, T value, TimeSpan ttl = de { ValidateKey(key); - NatsHeaders headers = default; + NatsHeaders? headers = default; if (ttl != default) { - headers = new NatsHeaders(); - headers.Add(NatsTtl, ttl.TotalSeconds.ToString("N0")); + headers = new NatsHeaders + { + { NatsTtl, ttl == TimeSpan.MaxValue ? "never" : ttl.TotalSeconds.ToString("N0") }, + }; } var ack = await JetStreamContext.PublishAsync($"$KV.{Bucket}.{key}", value, serializer: serializer, headers: headers, cancellationToken: cancellationToken); @@ -88,14 +90,14 @@ public async ValueTask PutAsync(string key, T value, TimeSpan ttl = de } /// - public async ValueTask CreateAsync(string key, T value, INatsSerialize? serializer = default, CancellationToken cancellationToken = default) + public async ValueTask CreateAsync(string key, T value, TimeSpan ttl = default, INatsSerialize? serializer = default, CancellationToken cancellationToken = default) { ValidateKey(key); // First try to create a new entry try { - return await UpdateAsync(key, value, revision: 0, serializer, cancellationToken); + return await UpdateAsync(key, value, revision: 0, ttl, serializer, cancellationToken); } catch (NatsKVWrongLastRevisionException) { @@ -108,17 +110,24 @@ public async ValueTask CreateAsync(string key, T value, INatsSerialize } catch (NatsKVKeyDeletedException e) { - return await UpdateAsync(key, value, e.Revision, serializer, cancellationToken); + return await UpdateAsync(key, value, e.Revision, ttl, serializer, cancellationToken); } throw new NatsKVCreateException(); } /// - public async ValueTask UpdateAsync(string key, T value, ulong revision, INatsSerialize? serializer = default, CancellationToken cancellationToken = default) + public async ValueTask UpdateAsync(string key, T value, ulong revision, TimeSpan ttl = default, INatsSerialize? serializer = default, CancellationToken cancellationToken = default) { ValidateKey(key); var headers = new NatsHeaders { { NatsExpectedLastSubjectSequence, revision.ToString() } }; + if (ttl != default) + { + headers = new NatsHeaders + { + { NatsTtl, ttl == TimeSpan.MaxValue ? "never" : ttl.TotalSeconds.ToString("N0") }, + }; + } try { diff --git a/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs b/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs index cffb9f488..87c88de33 100644 --- a/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs +++ b/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs @@ -465,13 +465,12 @@ public async Task TestMessageTTL() var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var cancellationToken = cts.Token; - await using var server = NatsServer.StartJS(); + await using var server = NatsServer.StartJSWithTrace(_output); await using var nats = server.CreateClientConnection(); var js = new NatsJSContext(nats); var kv = new NatsKVContext(js); - await kv.DeleteStoreAsync("kv1"); var store = await kv.CreateStoreAsync(new NatsKVConfig("kv1") { AllowMsgTTL = true }, cancellationToken: cancellationToken); for (var i = 0; i < 10; i++) @@ -484,13 +483,6 @@ public async Task TestMessageTTL() Assert.Equal(1ul, state.Info.State.FirstSeq); Assert.Equal(10ul, state.Info.State.LastSeq); - // Sleep for half a second, all the messages should still be there - await Task.Delay(500); - state = await store.GetStatusAsync(); - Assert.Equal(10, state.Info.State.Messages); - Assert.Equal(1ul, state.Info.State.FirstSeq); - Assert.Equal(10ul, state.Info.State.LastSeq); - // Sleep for two seconds, now all the messages should be gone await Task.Delay(2000); state = await store.GetStatusAsync(); @@ -499,6 +491,41 @@ public async Task TestMessageTTL() Assert.Equal(10ul, state.Info.State.LastSeq); } + [Fact] + public async Task TestMessageNeverExpire() + { + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var cancellationToken = cts.Token; + + await using var server = NatsServer.StartJSWithTrace(_output); + await using var nats = server.CreateClientConnection(); + + var js = new NatsJSContext(nats); + var kv = new NatsKVContext(js); + + var store = await kv.CreateStoreAsync(new NatsKVConfig("kv1") { AllowMsgTTL = true, MaxAge = TimeSpan.FromSeconds(1) }, cancellationToken: cancellationToken); + + // The first message we publish is set to "never expire", therefore it won't age out with the MaxAge policy. + await store.PutAsync($"k0", $"v0", TimeSpan.MaxValue, cancellationToken: cancellationToken); + + for (var i = 1; i < 11; i++) + { + await store.PutAsync($"k{i}", $"v{i}", TimeSpan.FromSeconds(1), cancellationToken: cancellationToken); + } + + var state = await store.GetStatusAsync(); + Assert.Equal(11, state.Info.State.Messages); + Assert.Equal(1ul, state.Info.State.FirstSeq); + Assert.Equal(11ul, state.Info.State.LastSeq); + + // Sleep for two seconds, only the first message should be there + await Task.Delay(2000); + state = await store.GetStatusAsync(); + Assert.Equal(1, state.Info.State.Messages); + Assert.Equal(1ul, state.Info.State.FirstSeq); + Assert.Equal(11ul, state.Info.State.LastSeq); + } + [Fact] public async Task History() {