Skip to content

Commit

Permalink
Add KV Filtering of keys (#545)
Browse files Browse the repository at this point in the history
* Fixed version number sent to server and changed lang to .NET from C#. Same as V1 client.

* default version to 1.0.0

* changed default version to 2.0.0

* Use informational version attribute and cut down to get additional info such as -preview. Also changed name to .Net to .NET for the client name.

* Add filtering to KV method returning all keys

* Ignore KV keys filtering in Pre 2.10 tests
  • Loading branch information
darkwatchuk authored Jul 5, 2024
1 parent 4d47db1 commit 8e5493b
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 2 deletions.
10 changes: 10 additions & 0 deletions src/NATS.Client.KeyValueStore/INatsKVStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,14 @@ public interface INatsKVStore
/// <returns>An async enumerable of keys to be used in an <c>await foreach</c></returns>
/// <exception cref="InvalidOperationException">There was a conflict in options, e.g. IncludeHistory and UpdatesOnly are only valid when ResumeAtRevision is not set.</exception>
IAsyncEnumerable<string> GetKeysAsync(NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Get a filtered set of keys in the bucket
/// </summary>
/// <param name="filters">Subject-based wildcard filters to filter on</param>
/// <param name="opts">Watch options</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>An async enumerable of keys to be used in an <c>await foreach</c></returns>
/// <exception cref="InvalidOperationException">There was a conflict in options, e.g. IncludeHistory and UpdatesOnly are only valid when ResumeAtRevision is not set.</exception>
IAsyncEnumerable<string> GetKeysAsync(IEnumerable<string> filters, NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default);
}
8 changes: 6 additions & 2 deletions src/NATS.Client.KeyValueStore/NatsKVStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -438,14 +438,18 @@ public async ValueTask PurgeDeletesAsync(NatsKVPurgeOpts? opts = default, Cancel
}

/// <inheritdoc />
public async IAsyncEnumerable<string> GetKeysAsync(NatsKVWatchOpts? opts = default, [EnumeratorCancellation] CancellationToken cancellationToken = default)
public IAsyncEnumerable<string> GetKeysAsync(NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default)
=> GetKeysAsync([">"], opts, cancellationToken);

/// <inheritdoc />
public async IAsyncEnumerable<string> GetKeysAsync(IEnumerable<string> filters, NatsKVWatchOpts? opts = default, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
opts ??= NatsKVWatchOpts.Default;

opts = opts with { IgnoreDeletes = false, MetaOnly = true, UpdatesOnly = false, };

// Type doesn't matter here, we're just using the watcher to get the keys
await using var watcher = await WatchInternalAsync<int>([">"], serializer: default, opts, cancellationToken);
await using var watcher = await WatchInternalAsync<int>(filters, serializer: default, opts, cancellationToken);

if (watcher.InitialConsumer.Info.NumPending == 0)
yield break;
Expand Down
36 changes: 36 additions & 0 deletions tests/NATS.Client.KeyValueStore.Tests/GetKeysTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,40 @@ public async Task Get_keys_when_empty()

Assert.Equal(3, count);
}

[SkipIfNatsServer(versionEarlierThan: "2.10")]
public async Task Get_filtered_keys()
{
const string bucket = "b1";
var config = new NatsKVConfig(bucket);

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = cts.Token;

await using var server = NatsServer.StartJS();
await using var nats1 = server.CreateClientConnection();
var js1 = new NatsJSContext(nats1);
var kv1 = new NatsKVContext(js1);
var store1 = await kv1.CreateStoreAsync(config, cancellationToken: cancellationToken);

await store1.PutAsync("a.1", 1, cancellationToken: cancellationToken);
await store1.PutAsync("a.2", 2, cancellationToken: cancellationToken);
await store1.PutAsync("b.1", 1, cancellationToken: cancellationToken);
await store1.PutAsync("b.2", 2, cancellationToken: cancellationToken);
await store1.PutAsync("c.1", 1, cancellationToken: cancellationToken);
await store1.PutAsync("c.2", 2, cancellationToken: cancellationToken);
await store1.PutAsync("d", 2, cancellationToken: cancellationToken);

var ks1 = new List<string>();

// Multiple keys are only supported in NATS Server 2.10 and later
await foreach (var k in store1.GetKeysAsync(new string[] { "d", "a.>", "c.>" }, cancellationToken: cancellationToken))
{
ks1.Add(k);
}

ks1.Sort();

Assert.Equal(new List<string> { "a.1", "a.2", "c.1", "c.2", "d" }, ks1);
}
}

0 comments on commit 8e5493b

Please sign in to comment.