diff --git a/NATS.Net.sln b/NATS.Net.sln
index 9c78cc340..c6846b35b 100644
--- a/NATS.Net.sln
+++ b/NATS.Net.sln
@@ -17,23 +17,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "NATS.Client.Core.Tests", "t
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MicroBenchmark", "sandbox\MicroBenchmark\MicroBenchmark.csproj", "{A10F0D89-13F3-49B3-ACF7-66E45DC95225}"
EndProject
-Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".", ".", "{899BE3EA-C5CA-4394-9B62-C45CBFF3AF4E}"
- ProjectSection(SolutionItems) = preProject
- .editorconfig = .editorconfig
- Directory.Build.props = Directory.Build.props
- README.md = README.md
- version.txt = version.txt
- .gitattributes = .gitattributes
- .gitignore = .gitignore
- CODE-OF-CONDUCT.md = CODE-OF-CONDUCT.md
- CONTRIBUTING.md = CONTRIBUTING.md
- dependencies.md = dependencies.md
- global.json = global.json
- Icon.png = Icon.png
- LICENSE = LICENSE
- REPO_RENAME.md = REPO_RENAME.md
- EndProjectSection
-EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "NATS.Client.Hosting", "src\NATS.Client.Hosting\NATS.Client.Hosting.csproj", "{D3F09B30-1ED5-48C2-81CD-A2AD88E751AC}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MinimumWebApp", "sandbox\MinimumWebApp\MinimumWebApp.csproj", "{44881DEE-8B49-44EA-B0BA-8BDA4F706E1A}"
@@ -139,6 +122,23 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "workflows", "workflows", "{
.github\workflows\test.yml = .github\workflows\test.yml
EndProjectSection
EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{1D1B89B7-5963-48AE-B5F8-BB9AA4834CD6}"
+ ProjectSection(SolutionItems) = preProject
+ .editorconfig = .editorconfig
+ .gitattributes = .gitattributes
+ .gitignore = .gitignore
+ CODE-OF-CONDUCT.md = CODE-OF-CONDUCT.md
+ CONTRIBUTING.md = CONTRIBUTING.md
+ dependencies.md = dependencies.md
+ Directory.Build.props = Directory.Build.props
+ global.json = global.json
+ Icon.png = Icon.png
+ LICENSE = LICENSE
+ README.md = README.md
+ REPO_RENAME.md = REPO_RENAME.md
+ version.txt = version.txt
+ EndProjectSection
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -396,7 +396,6 @@ Global
{227C88B1-0510-4010-B142-C44725578FCD} = {4827B3EC-73D8-436D-AE2A-5E29AC95FD0C}
{8A676AAA-FEE3-4C18-870A-66E59AD9069F} = {C526E8AB-739A-48D7-8FC4-048978C9B650}
{9521D9E0-642A-4C7E-BD10-372DF235CF62} = {C526E8AB-739A-48D7-8FC4-048978C9B650}
- {B9EF0111-6720-46DF-B11A-8F8C88C3F5C1} = {899BE3EA-C5CA-4394-9B62-C45CBFF3AF4E}
{0B7F1286-4426-45DE-82C2-FE7CF13CA0DA} = {B9EF0111-6720-46DF-B11A-8F8C88C3F5C1}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
diff --git a/src/NATS.Client.JetStream/Models/StreamConfig.cs b/src/NATS.Client.JetStream/Models/StreamConfig.cs
index fdacdaf30..ada79c421 100644
--- a/src/NATS.Client.JetStream/Models/StreamConfig.cs
+++ b/src/NATS.Client.JetStream/Models/StreamConfig.cs
@@ -239,6 +239,13 @@ internal StreamConfig()
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)]
public bool DiscardNewPerSubject { get; set; }
+ ///
+ /// AllowMsgTTL allows header initiated per-message TTLs. If disabled, then the `NATS-TTL` header will be ignored.
+ ///
+ [System.Text.Json.Serialization.JsonPropertyName("allow_msg_ttl")]
+ [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)]
+ public bool AllowMsgTTL { get; set; }
+
///
/// Additional metadata for the Stream
///
diff --git a/src/NATS.Client.KeyValueStore/INatsKVStore.cs b/src/NATS.Client.KeyValueStore/INatsKVStore.cs
index ecbf0d072..8972d76ad 100644
--- a/src/NATS.Client.KeyValueStore/INatsKVStore.cs
+++ b/src/NATS.Client.KeyValueStore/INatsKVStore.cs
@@ -20,22 +20,24 @@ public interface INatsKVStore
///
/// Key of the entry
/// Value of the entry
+ /// Time to live for the entry (requires the to be set to true)
/// Serializer to use for the message type.
/// A used to cancel the API call.
/// Serialized value type
/// Revision number
- ValueTask PutAsync(string key, T value, INatsSerialize? serializer = default, CancellationToken cancellationToken = default);
+ ValueTask PutAsync(string key, T value, TimeSpan ttl = default, INatsSerialize? serializer = default, CancellationToken cancellationToken = default);
///
/// Create a new entry in the bucket only if it doesn't exist
///
/// Key of the entry
/// Value of the entry
+ /// Time to live for the entry (requires the to be set to true)
/// Serializer to use for the message type.
/// A used to cancel the API call.
/// Serialized value type
/// The revision number of the entry
- ValueTask CreateAsync(string key, T value, INatsSerialize? serializer = default, CancellationToken cancellationToken = default);
+ ValueTask CreateAsync(string key, T value, TimeSpan ttl = default, INatsSerialize? serializer = default, CancellationToken cancellationToken = default);
///
/// Update an entry in the bucket only if last update revision matches
@@ -43,11 +45,12 @@ public interface INatsKVStore
/// Key of the entry
/// Value of the entry
/// Last revision number to match
+ /// Time to live for the entry (requires the to be set to true)
/// Serializer to use for the message type.
/// A used to cancel the API call.
/// Serialized value type
/// The revision number of the updated entry
- ValueTask UpdateAsync(string key, T value, ulong revision, INatsSerialize? serializer = default, CancellationToken cancellationToken = default);
+ ValueTask UpdateAsync(string key, T value, ulong revision, TimeSpan ttl = default, INatsSerialize? serializer = default, CancellationToken cancellationToken = default);
///
/// Delete an entry from the bucket
diff --git a/src/NATS.Client.KeyValueStore/NatsKVConfig.cs b/src/NATS.Client.KeyValueStore/NatsKVConfig.cs
index ac4432819..3240273cd 100644
--- a/src/NATS.Client.KeyValueStore/NatsKVConfig.cs
+++ b/src/NATS.Client.KeyValueStore/NatsKVConfig.cs
@@ -77,6 +77,11 @@ public record NatsKVConfig
/// Sources defines the configuration for sources of a KeyValue store.
///
public ICollection? Sources { get; set; }
+
+ ///
+ /// If true, the bucket will allow TTL on individual keys.
+ ///
+ public bool AllowMsgTTL { get; set; }
}
///
diff --git a/src/NATS.Client.KeyValueStore/NatsKVContext.cs b/src/NATS.Client.KeyValueStore/NatsKVContext.cs
index 9cbbdbc6b..c03ac2087 100644
--- a/src/NATS.Client.KeyValueStore/NatsKVContext.cs
+++ b/src/NATS.Client.KeyValueStore/NatsKVContext.cs
@@ -234,6 +234,7 @@ private static StreamConfig CreateStreamConfig(NatsKVConfig config)
MirrorDirect = mirrorDirect,
Sources = sources,
Retention = StreamConfigRetention.Limits, // from ADR-8
+ AllowMsgTTL = config.AllowMsgTTL,
};
return streamConfig;
diff --git a/src/NATS.Client.KeyValueStore/NatsKVStore.cs b/src/NATS.Client.KeyValueStore/NatsKVStore.cs
index cd2717fc3..902ff7bcc 100644
--- a/src/NATS.Client.KeyValueStore/NatsKVStore.cs
+++ b/src/NATS.Client.KeyValueStore/NatsKVStore.cs
@@ -43,6 +43,7 @@ public class NatsKVStore : INatsKVStore
private const string NatsSubject = "Nats-Subject";
private const string NatsSequence = "Nats-Sequence";
private const string NatsTimeStamp = "Nats-Time-Stamp";
+ private const string NatsTtl = "Nats-TTL";
private static readonly Regex ValidKeyRegex = new(pattern: @"\A[-/_=\.a-zA-Z0-9]+\z", RegexOptions.Compiled);
private static readonly NatsKVException MissingSequenceHeaderException = new("Missing sequence header");
private static readonly NatsKVException MissingTimestampHeaderException = new("Missing timestamp header");
@@ -70,23 +71,33 @@ internal NatsKVStore(string bucket, INatsJSContext context, INatsJSStream stream
public string Bucket { get; }
///
- public async ValueTask PutAsync(string key, T value, INatsSerialize? serializer = default, CancellationToken cancellationToken = default)
+ public async ValueTask PutAsync(string key, T value, TimeSpan ttl = default, INatsSerialize? serializer = default, CancellationToken cancellationToken = default)
{
ValidateKey(key);
- var ack = await JetStreamContext.PublishAsync($"$KV.{Bucket}.{key}", value, serializer: serializer, cancellationToken: cancellationToken);
+
+ NatsHeaders? headers = default;
+ if (ttl != default)
+ {
+ headers = new NatsHeaders
+ {
+ { NatsTtl, ttl == TimeSpan.MaxValue ? "never" : ttl.TotalSeconds.ToString("N0") },
+ };
+ }
+
+ var ack = await JetStreamContext.PublishAsync($"$KV.{Bucket}.{key}", value, serializer: serializer, headers: headers, cancellationToken: cancellationToken);
ack.EnsureSuccess();
return ack.Seq;
}
///
- public async ValueTask CreateAsync(string key, T value, INatsSerialize? serializer = default, CancellationToken cancellationToken = default)
+ public async ValueTask CreateAsync(string key, T value, TimeSpan ttl = default, INatsSerialize? serializer = default, CancellationToken cancellationToken = default)
{
ValidateKey(key);
// First try to create a new entry
try
{
- return await UpdateAsync(key, value, revision: 0, serializer, cancellationToken);
+ return await UpdateAsync(key, value, revision: 0, ttl, serializer, cancellationToken);
}
catch (NatsKVWrongLastRevisionException)
{
@@ -99,17 +110,21 @@ public async ValueTask CreateAsync(string key, T value, INatsSerialize
}
catch (NatsKVKeyDeletedException e)
{
- return await UpdateAsync(key, value, e.Revision, serializer, cancellationToken);
+ return await UpdateAsync(key, value, e.Revision, ttl, serializer, cancellationToken);
}
throw new NatsKVCreateException();
}
///
- public async ValueTask UpdateAsync(string key, T value, ulong revision, INatsSerialize? serializer = default, CancellationToken cancellationToken = default)
+ public async ValueTask UpdateAsync(string key, T value, ulong revision, TimeSpan ttl = default, INatsSerialize? serializer = default, CancellationToken cancellationToken = default)
{
ValidateKey(key);
var headers = new NatsHeaders { { NatsExpectedLastSubjectSequence, revision.ToString() } };
+ if (ttl != default)
+ {
+ headers.Add(NatsTtl, ttl == TimeSpan.MaxValue ? "never" : ttl.TotalSeconds.ToString("N0"));
+ }
try
{
diff --git a/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs b/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs
index 8504f04ac..8b6387774 100644
--- a/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs
+++ b/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs
@@ -459,6 +459,73 @@ await Assert.ThrowsAsync(async () =>
}
}
+ [Fact]
+ public async Task TestMessageTTL()
+ {
+ var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
+ var cancellationToken = cts.Token;
+
+ await using var server = NatsServer.StartJS();
+ await using var nats = server.CreateClientConnection();
+
+ var js = new NatsJSContext(nats);
+ var kv = new NatsKVContext(js);
+
+ var store = await kv.CreateStoreAsync(new NatsKVConfig("kv1") { AllowMsgTTL = true }, cancellationToken: cancellationToken);
+
+ for (var i = 0; i < 10; i++)
+ {
+ await store.PutAsync($"k{i}", $"v{i}", TimeSpan.FromSeconds(1), cancellationToken: cancellationToken);
+ }
+
+ var state = await store.GetStatusAsync();
+ Assert.Equal(10, state.Info.State.Messages);
+ Assert.Equal(1ul, state.Info.State.FirstSeq);
+ Assert.Equal(10ul, state.Info.State.LastSeq);
+
+ // Sleep for two seconds, now all the messages should be gone
+ await Task.Delay(2000);
+ state = await store.GetStatusAsync();
+ Assert.Equal(0, state.Info.State.Messages);
+ Assert.Equal(11ul, state.Info.State.FirstSeq);
+ Assert.Equal(10ul, state.Info.State.LastSeq);
+ }
+
+ [Fact]
+ public async Task TestMessageNeverExpire()
+ {
+ var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
+ var cancellationToken = cts.Token;
+
+ await using var server = NatsServer.StartJS();
+ await using var nats = server.CreateClientConnection();
+
+ var js = new NatsJSContext(nats);
+ var kv = new NatsKVContext(js);
+
+ var store = await kv.CreateStoreAsync(new NatsKVConfig("kv1") { AllowMsgTTL = true, MaxAge = TimeSpan.FromSeconds(1) }, cancellationToken: cancellationToken);
+
+ // The first message we publish is set to "never expire", therefore it won't age out with the MaxAge policy.
+ await store.PutAsync($"k0", $"v0", TimeSpan.MaxValue, cancellationToken: cancellationToken);
+
+ for (var i = 1; i < 11; i++)
+ {
+ await store.PutAsync($"k{i}", $"v{i}", TimeSpan.FromSeconds(1), cancellationToken: cancellationToken);
+ }
+
+ var state = await store.GetStatusAsync();
+ Assert.Equal(11, state.Info.State.Messages);
+ Assert.Equal(1ul, state.Info.State.FirstSeq);
+ Assert.Equal(11ul, state.Info.State.LastSeq);
+
+ // Sleep for two seconds, only the first message should be there
+ await Task.Delay(2000);
+ state = await store.GetStatusAsync();
+ Assert.Equal(1, state.Info.State.Messages);
+ Assert.Equal(1ul, state.Info.State.FirstSeq);
+ Assert.Equal(11ul, state.Info.State.LastSeq);
+ }
+
[Fact]
public async Task History()
{