Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add KV Filtering of keys #545

Merged
merged 7 commits into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/NATS.Client.KeyValueStore/INatsKVStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,14 @@ public interface INatsKVStore
/// <returns>An async enumerable of keys to be used in an <c>await foreach</c></returns>
/// <exception cref="InvalidOperationException">There was a conflict in options, e.g. IncludeHistory and UpdatesOnly are only valid when ResumeAtRevision is not set.</exception>
IAsyncEnumerable<string> GetKeysAsync(NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Get a filtered set of keys in the bucket
/// </summary>
/// <param name="filters">Subject-based wildcard filters to filter on</param>
/// <param name="opts">Watch options</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>An async enumerable of keys to be used in an <c>await foreach</c></returns>
/// <exception cref="InvalidOperationException">There was a conflict in options, e.g. IncludeHistory and UpdatesOnly are only valid when ResumeAtRevision is not set.</exception>
IAsyncEnumerable<string> GetKeysAsync(IEnumerable<string> filters, NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default);
}
8 changes: 6 additions & 2 deletions src/NATS.Client.KeyValueStore/NatsKVStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -438,14 +438,18 @@ public async ValueTask PurgeDeletesAsync(NatsKVPurgeOpts? opts = default, Cancel
}

/// <inheritdoc />
public async IAsyncEnumerable<string> GetKeysAsync(NatsKVWatchOpts? opts = default, [EnumeratorCancellation] CancellationToken cancellationToken = default)
public IAsyncEnumerable<string> GetKeysAsync(NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default)
=> GetKeysAsync([">"], opts, cancellationToken);

/// <inheritdoc />
public async IAsyncEnumerable<string> GetKeysAsync(IEnumerable<string> filters, NatsKVWatchOpts? opts = default, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
opts ??= NatsKVWatchOpts.Default;

opts = opts with { IgnoreDeletes = false, MetaOnly = true, UpdatesOnly = false, };

// Type doesn't matter here, we're just using the watcher to get the keys
await using var watcher = await WatchInternalAsync<int>([">"], serializer: default, opts, cancellationToken);
await using var watcher = await WatchInternalAsync<int>(filters, serializer: default, opts, cancellationToken);

if (watcher.InitialConsumer.Info.NumPending == 0)
yield break;
Expand Down
36 changes: 36 additions & 0 deletions tests/NATS.Client.KeyValueStore.Tests/GetKeysTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,40 @@ public async Task Get_keys_when_empty()

Assert.Equal(3, count);
}

[SkipIfNatsServer(versionEarlierThan: "2.10")]
public async Task Get_filtered_keys()
{
const string bucket = "b1";
var config = new NatsKVConfig(bucket);

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = cts.Token;

await using var server = NatsServer.StartJS();
await using var nats1 = server.CreateClientConnection();
var js1 = new NatsJSContext(nats1);
var kv1 = new NatsKVContext(js1);
var store1 = await kv1.CreateStoreAsync(config, cancellationToken: cancellationToken);

await store1.PutAsync("a.1", 1, cancellationToken: cancellationToken);
await store1.PutAsync("a.2", 2, cancellationToken: cancellationToken);
await store1.PutAsync("b.1", 1, cancellationToken: cancellationToken);
await store1.PutAsync("b.2", 2, cancellationToken: cancellationToken);
await store1.PutAsync("c.1", 1, cancellationToken: cancellationToken);
await store1.PutAsync("c.2", 2, cancellationToken: cancellationToken);
await store1.PutAsync("d", 2, cancellationToken: cancellationToken);

var ks1 = new List<string>();

// Multiple keys are only supported in NATS Server 2.10 and later
await foreach (var k in store1.GetKeysAsync(new string[] { "d", "a.>", "c.>" }, cancellationToken: cancellationToken))
{
ks1.Add(k);
}

ks1.Sort();

Assert.Equal(new List<string> { "a.1", "a.2", "c.1", "c.2", "d" }, ks1);
}
}
Loading