Skip to content

Commit

Permalink
Add filtering to KV method returning all keys
Browse files Browse the repository at this point in the history
  • Loading branch information
darkwatchuk committed Jul 5, 2024
1 parent e40befd commit c0c5213
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 2 deletions.
10 changes: 10 additions & 0 deletions src/NATS.Client.KeyValueStore/INatsKVStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,14 @@ public interface INatsKVStore
/// <returns>An async enumerable of keys to be used in an <c>await foreach</c></returns>
/// <exception cref="InvalidOperationException">There was a conflict in options, e.g. IncludeHistory and UpdatesOnly are only valid when ResumeAtRevision is not set.</exception>
IAsyncEnumerable<string> GetKeysAsync(NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Get a filtered set of keys in the bucket
/// </summary>
/// <param name="filters">Subject-based wildcard filters to filter on</param>
/// <param name="opts">Watch options</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>An async enumerable of keys to be used in an <c>await foreach</c></returns>
/// <exception cref="InvalidOperationException">There was a conflict in options, e.g. IncludeHistory and UpdatesOnly are only valid when ResumeAtRevision is not set.</exception>
IAsyncEnumerable<string> GetKeysAsync(IEnumerable<string> filters, NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default);
}
8 changes: 6 additions & 2 deletions src/NATS.Client.KeyValueStore/NatsKVStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -438,14 +438,18 @@ public async ValueTask PurgeDeletesAsync(NatsKVPurgeOpts? opts = default, Cancel
}

/// <inheritdoc />
public async IAsyncEnumerable<string> GetKeysAsync(NatsKVWatchOpts? opts = default, [EnumeratorCancellation] CancellationToken cancellationToken = default)
public IAsyncEnumerable<string> GetKeysAsync(NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default)
=> GetKeysAsync([">"], opts, cancellationToken);

/// <inheritdoc />
public async IAsyncEnumerable<string> GetKeysAsync(IEnumerable<string> filters, NatsKVWatchOpts? opts = default, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
opts ??= NatsKVWatchOpts.Default;

opts = opts with { IgnoreDeletes = false, MetaOnly = true, UpdatesOnly = false, };

// Type doesn't matter here, we're just using the watcher to get the keys
await using var watcher = await WatchInternalAsync<int>([">"], serializer: default, opts, cancellationToken);
await using var watcher = await WatchInternalAsync<int>(filters, serializer: default, opts, cancellationToken);

if (watcher.InitialConsumer.Info.NumPending == 0)
yield break;
Expand Down
34 changes: 34 additions & 0 deletions tests/NATS.Client.KeyValueStore.Tests/GetKeysTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,38 @@ public async Task Get_keys_when_empty()

Assert.Equal(3, count);
}

[Fact]
public async Task Get_filtered_keys()
{
const string bucket = "b1";
var config = new NatsKVConfig(bucket);

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = cts.Token;

await using var server = NatsServer.StartJS();
await using var nats1 = server.CreateClientConnection();
var js1 = new NatsJSContext(nats1);
var kv1 = new NatsKVContext(js1);
var store1 = await kv1.CreateStoreAsync(config, cancellationToken: cancellationToken);

await store1.PutAsync("a.1", 1, cancellationToken: cancellationToken);
await store1.PutAsync("a.2", 2, cancellationToken: cancellationToken);
await store1.PutAsync("b.1", 1, cancellationToken: cancellationToken);
await store1.PutAsync("b.2", 2, cancellationToken: cancellationToken);
await store1.PutAsync("c.1", 1, cancellationToken: cancellationToken);
await store1.PutAsync("c.2", 2, cancellationToken: cancellationToken);
await store1.PutAsync("d", 2, cancellationToken: cancellationToken);

var ks1 = new List<string>();
await foreach (var k in store1.GetKeysAsync(new string[] { "d", "a.>", "c.>" }, cancellationToken: cancellationToken))
{
ks1.Add(k);
}

ks1.Sort();

Assert.Equal(new List<string> { "a.1", "a.2", "c.1", "c.2", "d" }, ks1);
}
}

0 comments on commit c0c5213

Please sign in to comment.