Skip to content

Commit

Permalink
Fix JetStream test flappers (#726)
Browse files Browse the repository at this point in the history
* Fix JS flappers

* Fix test proxy signal subject

* Fix test connection retry

* Fix JS Test

* Fix test revert

* Fix test cancellation
  • Loading branch information
mtmk authored Jan 25, 2025
1 parent 226c1d6 commit a297cb4
Show file tree
Hide file tree
Showing 14 changed files with 310 additions and 203 deletions.
3 changes: 3 additions & 0 deletions tests/NATS.Client.Core.Tests/ClusterTests.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using NATS.Client.TestUtilities2;

namespace NATS.Client.Core.Tests;

public class ClusterTests(ITestOutputHelper output)
Expand Down Expand Up @@ -47,6 +49,7 @@ public async Task Seed_urls_on_retry(bool userAuthInUrl)
NoRandomize = true,
Url = $"{url1},{url2}",
});
await nats.ConnectRetryAsync();

using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));

Expand Down
1 change: 1 addition & 0 deletions tests/NATS.Client.Core.Tests/TlsClientTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Net.Sockets;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using NATS.Client.TestUtilities2;

namespace NATS.Client.Core.Tests;

Expand Down
3 changes: 2 additions & 1 deletion tests/NATS.Client.JetStream.Tests/ClusterTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using NATS.Client.Core.Tests;
using NATS.Client.TestUtilities2;

namespace NATS.Client.JetStream.Tests;

Expand All @@ -15,7 +16,7 @@ public async Task Form_a_local_cluster()
await cluster.StartAsync();
await using var nats = await cluster.Server1.CreateClientConnectionAsync();

await nats.PingAsync();
await nats.ConnectRetryAsync();

var urls = nats.ServerInfo!.ClientConnectUrls!.ToList();

Expand Down
193 changes: 103 additions & 90 deletions tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

Large diffs are not rendered by default.

27 changes: 21 additions & 6 deletions tests/NATS.Client.JetStream.Tests/CustomSerializerTest.cs
Original file line number Diff line number Diff line change
@@ -1,29 +1,44 @@
using System.Buffers;
using NATS.Client.Core.Tests;
using NATS.Client.Core2.Tests;
using NATS.Client.TestUtilities2;

namespace NATS.Client.JetStream.Tests;

[Collection("nats-server")]
public class CustomSerializerTest
{
private readonly ITestOutputHelper _output;
private readonly NatsServerFixture _server;

public CustomSerializerTest(ITestOutputHelper output, NatsServerFixture server)
{
_output = output;
_server = server;
}

[Fact]
public async Task When_consuming_ack_should_be_serialized_normally_if_custom_serializer_used()
{
await using var server = await NatsServer.StartJSAsync();
await using var nats = await server.CreateClientConnectionAsync(new NatsOpts
await using var nats = new NatsConnection(new NatsOpts
{
Url = _server.Url,
SerializerRegistry = new Level42SerializerRegistry(),
RequestTimeout = TimeSpan.FromSeconds(10),
});
await nats.ConnectRetryAsync();
var prefix = _server.GetNextId();

var js = new NatsJSContext(nats);

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token);
await js.CreateStreamAsync($"{prefix}s1", new[] { $"{prefix}s1.*" }, cts.Token);

await js.PublishAsync("s1.1", new byte[] { 0 }, cancellationToken: cts.Token);
await js.PublishAsync("s1.2", new byte[] { 0 }, cancellationToken: cts.Token);
await js.PublishAsync($"{prefix}s1.1", new byte[] { 0 }, cancellationToken: cts.Token);
await js.PublishAsync($"{prefix}s1.2", new byte[] { 0 }, cancellationToken: cts.Token);

var consumer = await js.CreateOrUpdateConsumerAsync("s1", "c1", cancellationToken: cts.Token);
var consumer = await js.CreateOrUpdateConsumerAsync($"{prefix}s1", $"{prefix}c1", cancellationToken: cts.Token);

// single ack
{
Expand Down
38 changes: 25 additions & 13 deletions tests/NATS.Client.JetStream.Tests/DoubleAckNakDelayTests.cs
Original file line number Diff line number Diff line change
@@ -1,28 +1,38 @@
using System.Text.RegularExpressions;
using NATS.Client.Core.Tests;
using NATS.Client.Core2.Tests;
using NATS.Client.TestUtilities2;

namespace NATS.Client.JetStream.Tests;

[Collection("nats-server")]
public class DoubleAckNakDelayTests
{
private readonly ITestOutputHelper _output;
private readonly NatsServerFixture _server;

public DoubleAckNakDelayTests(ITestOutputHelper output) => _output = output;
public DoubleAckNakDelayTests(ITestOutputHelper output, NatsServerFixture server)
{
_output = output;
_server = server;
}

[Fact]
public async Task Double_ack_received_messages()
{
await using var server = await NatsServer.StartJSAsync();
var (nats1, proxy) = server.CreateProxiedClientConnection(new NatsOpts { RequestTimeout = TimeSpan.FromSeconds(10) });
await using var nats = nats1;
var proxy = _server.CreateProxy();
await using var nats = proxy.CreateNatsConnection();
await nats.ConnectRetryAsync();
var prefix = _server.GetNextId();

var js = new NatsJSContext(nats);

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token);
var consumer = await js.CreateOrUpdateConsumerAsync("s1", "c1", cancellationToken: cts.Token);
await js.CreateStreamAsync($"{prefix}s1", [$"{prefix}s1.*"], cts.Token);
var consumer = await js.CreateOrUpdateConsumerAsync($"{prefix}s1", $"{prefix}c1", cancellationToken: cts.Token);

var ack = await js.PublishAsync("s1.foo", 42, cancellationToken: cts.Token);
var ack = await js.PublishAsync($"{prefix}s1.foo", 42, cancellationToken: cts.Token);
ack.EnsureSuccess();
var next = await consumer.NextAsync<int>(cancellationToken: cts.Token);
if (next is { } msg)
Expand All @@ -46,17 +56,19 @@ public async Task Double_ack_received_messages()
[Fact]
public async Task Delay_nak_received_messages()
{
await using var server = await NatsServer.StartJSAsync();
var (nats1, proxy) = server.CreateProxiedClientConnection(new NatsOpts { RequestTimeout = TimeSpan.FromSeconds(10) });
await using var nats = nats1;
var proxy = _server.CreateProxy();
await using var nats = proxy.CreateNatsConnection();
await nats.ConnectRetryAsync();
var prefix = _server.GetNextId();

var js = new NatsJSContext(nats);

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token);
var consumer = await js.CreateOrUpdateConsumerAsync("s1", "c1", cancellationToken: cts.Token);
await js.CreateStreamAsync($"{prefix}s1", new[] { $"{prefix}s1.*" }, cts.Token);
var consumer = await js.CreateOrUpdateConsumerAsync($"{prefix}s1", $"{prefix}c1", cancellationToken: cts.Token);

var ack = await js.PublishAsync("s1.foo", 42, cancellationToken: cts.Token);
var ack = await js.PublishAsync($"{prefix}s1.foo", 42, cancellationToken: cts.Token);
ack.EnsureSuccess();
var next = await consumer.NextAsync<int>(cancellationToken: cts.Token);
if (next is { } msg)
Expand Down
15 changes: 11 additions & 4 deletions tests/NATS.Client.JetStream.Tests/ErrorHandlerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ public async Task Ordered_consumer_consume_handling()

var js = new NatsJSContext(nats);

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));

var stream = await js.CreateStreamAsync(new StreamConfig($"{prefix}s1", new[] { $"{prefix}s1.*" }), cts.Token);
var consumer = (NatsJSOrderedConsumer)await stream.CreateOrderedConsumerAsync(cancellationToken: cts.Token);
Expand Down Expand Up @@ -281,7 +281,7 @@ public async Task Ordered_consumer_consume_handling()
// Swallow heartbeats
proxy.ServerInterceptors.Add(m => m?.Contains("Idle Heartbeat") ?? false ? null : m);

var consumeCts = CancellationTokenSource.CreateLinkedTokenSource(cts.Token);
var consumeCts = new CancellationTokenSource();
var consume = Task.Run(
async () =>
{
Expand All @@ -308,11 +308,18 @@ public async Task Ordered_consumer_consume_handling()
},
cts.Token);

await Retry.Until("timed out", () => Volatile.Read(ref timeoutNotifications) > 0, timeout: TimeSpan.FromSeconds(20));
await Retry.Until("timed out", () => Volatile.Read(ref timeoutNotifications) > 0, timeout: TimeSpan.FromSeconds(30));
consumeCts.Cancel();
await consume;

Assert.True(Volatile.Read(ref timeoutNotifications) > 0);

try
{
await consume;
}
catch (OperationCanceledException)
{
}
}

[Fact]
Expand Down
2 changes: 1 addition & 1 deletion tests/NATS.Client.JetStream.Tests/ManageConsumerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public async Task Create_get_consumer()
[Fact]
public async Task List_delete_consumer()
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

await using var server = await NatsServer.StartJSAsync();
var nats = await server.CreateClientConnectionAsync();
Expand Down
Loading

0 comments on commit a297cb4

Please sign in to comment.