diff --git a/tests/NATS.Client.Core.Tests/ClusterTests.cs b/tests/NATS.Client.Core.Tests/ClusterTests.cs index 33cb324e..c6acd099 100644 --- a/tests/NATS.Client.Core.Tests/ClusterTests.cs +++ b/tests/NATS.Client.Core.Tests/ClusterTests.cs @@ -1,3 +1,5 @@ +using NATS.Client.TestUtilities2; + namespace NATS.Client.Core.Tests; public class ClusterTests(ITestOutputHelper output) @@ -47,6 +49,7 @@ public async Task Seed_urls_on_retry(bool userAuthInUrl) NoRandomize = true, Url = $"{url1},{url2}", }); + await nats.ConnectRetryAsync(); using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60)); diff --git a/tests/NATS.Client.Core.Tests/TlsClientTest.cs b/tests/NATS.Client.Core.Tests/TlsClientTest.cs index 830ef709..5655df56 100644 --- a/tests/NATS.Client.Core.Tests/TlsClientTest.cs +++ b/tests/NATS.Client.Core.Tests/TlsClientTest.cs @@ -2,6 +2,7 @@ using System.Net.Sockets; using System.Security.Cryptography.X509Certificates; using System.Text; +using NATS.Client.TestUtilities2; namespace NATS.Client.Core.Tests; diff --git a/tests/NATS.Client.JetStream.Tests/ClusterTests.cs b/tests/NATS.Client.JetStream.Tests/ClusterTests.cs index 47d3cf30..bd773846 100644 --- a/tests/NATS.Client.JetStream.Tests/ClusterTests.cs +++ b/tests/NATS.Client.JetStream.Tests/ClusterTests.cs @@ -1,4 +1,5 @@ using NATS.Client.Core.Tests; +using NATS.Client.TestUtilities2; namespace NATS.Client.JetStream.Tests; @@ -15,7 +16,7 @@ public async Task Form_a_local_cluster() await cluster.StartAsync(); await using var nats = await cluster.Server1.CreateClientConnectionAsync(); - await nats.PingAsync(); + await nats.ConnectRetryAsync(); var urls = nats.ServerInfo!.ClientConnectUrls!.ToList(); diff --git a/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs b/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs index d1101f65..533a358a 100644 --- a/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs +++ b/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs @@ -3,6 +3,7 @@ using NATS.Client.Core.Tests; using NATS.Client.Core2.Tests; using NATS.Client.JetStream.Models; +using NATS.Client.TestUtilities; using NATS.Client.TestUtilities2; namespace NATS.Client.JetStream.Tests; @@ -61,23 +62,26 @@ await Assert.ThrowsAnyAsync(async () => [Fact] public async Task Consume_msgs_test() { - await using var server = await NatsServer.StartJSAsync(); - var (nats, proxy) = server.CreateProxiedClientConnection(); + var proxy = _server.CreateProxy(); + await using var nats = proxy.CreateNatsConnection(); + await nats.ConnectRetryAsync(); + var prefix = _server.GetNextId(); + var js = new NatsJSContext(nats); var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); - await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token); - await js.CreateOrUpdateConsumerAsync("s1", "c1", cancellationToken: cts.Token); + await js.CreateStreamAsync($"{prefix}s1", new[] { $"{prefix}s1.*" }, cts.Token); + await js.CreateOrUpdateConsumerAsync($"{prefix}s1", $"{prefix}c1", cancellationToken: cts.Token); for (var i = 0; i < 30; i++) { - var ack = await js.PublishAsync("s1.foo", new TestData { Test = i }, serializer: TestDataJsonSerializer.Default, cancellationToken: cts.Token); + var ack = await js.PublishAsync($"{prefix}s1.foo", new TestData { Test = i }, serializer: TestDataJsonSerializer.Default, cancellationToken: cts.Token); ack.EnsureSuccess(); } var consumerOpts = new NatsJSConsumeOpts { MaxMsgs = 10 }; - var consumer = (NatsJSConsumer)await js.GetConsumerAsync("s1", "c1", cts.Token); + var consumer = (NatsJSConsumer)await js.GetConsumerAsync($"{prefix}s1", $"{prefix}c1", cts.Token); var count = 0; await foreach (var msg in consumer.ConsumeAsync(serializer: TestDataJsonSerializer.Default, consumerOpts, cancellationToken: cts.Token)) { @@ -90,14 +94,13 @@ public async Task Consume_msgs_test() int? PullCount() => proxy? .ClientFrames - .Count(f => f.Message.StartsWith("PUB $JS.API.CONSUMER.MSG.NEXT.s1.c1")); + .Count(f => f.Message.StartsWith($"PUB $JS.API.CONSUMER.MSG.NEXT.{prefix}s1.{prefix}c1")); await Retry.Until( reason: "received enough pulls", condition: () => PullCount() > 4, action: () => { - _output.WriteLine($"### PullCount:{PullCount()}"); return Task.CompletedTask; }, retryDelay: TimeSpan.FromSeconds(3), @@ -105,7 +108,7 @@ await Retry.Until( var msgNextRequests = proxy .ClientFrames - .Where(f => f.Message.StartsWith("PUB $JS.API.CONSUMER.MSG.NEXT.s1.c1")) + .Where(f => f.Message.StartsWith($"PUB $JS.API.CONSUMER.MSG.NEXT.{prefix}s1.{prefix}c1")) .ToList(); foreach (var frame in msgNextRequests) @@ -120,38 +123,44 @@ await Retry.Until( [Fact] public async Task Consume_idle_heartbeat_test() { - var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); - await using var server = await NatsServer.StartJSWithTraceAsync(_output); + var signal = new WaitSignal(TimeSpan.FromSeconds(60)); + var logger = new InMemoryTestLoggerFactory(LogLevel.Debug, log => + { + if (log is { Category: "NATS.Client.JetStream.Internal.NatsJSConsume", LogLevel: LogLevel.Debug }) + { + if (log.EventId == NatsJSLogEvents.IdleTimeout) + signal.Pulse(); + } + }); - var (nats, proxy) = server.CreateProxiedClientConnection(); + var proxy = _server.CreateProxy(); + await using var nats = new NatsConnection(new NatsOpts + { + Url = $"nats://127.0.0.1:{proxy.Port}", + ConnectTimeout = TimeSpan.FromSeconds(10), + LoggerFactory = logger, + }); + await nats.ConnectRetryAsync(); + var prefix = _server.GetNextId(); // Swallow heartbeats proxy.ServerInterceptors.Add(m => m?.Contains("Idle Heartbeat") ?? false ? null : m); - await nats.ConnectRetryAsync(); + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); var js = new NatsJSContext(nats); - await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token); - await js.CreateOrUpdateConsumerAsync("s1", "c1", cancellationToken: cts.Token); + await js.CreateStreamAsync($"{prefix}s1", new[] { $"{prefix}s1.*" }, cts.Token); + await js.CreateOrUpdateConsumerAsync($"{prefix}s1", $"{prefix}c1", cancellationToken: cts.Token); - var ack = await js.PublishAsync("s1.foo", new TestData { Test = 0 }, serializer: TestDataJsonSerializer.Default, cancellationToken: cts.Token); + var ack = await js.PublishAsync($"{prefix}s1.foo", new TestData { Test = 0 }, serializer: TestDataJsonSerializer.Default, cancellationToken: cts.Token); ack.EnsureSuccess(); - var signal = new WaitSignal(TimeSpan.FromSeconds(30)); - server.OnLog += log => - { - if (log is { Category: "NATS.Client.JetStream.Internal.NatsJSConsume", LogLevel: LogLevel.Debug }) - { - if (log.EventId == NatsJSLogEvents.IdleTimeout) - signal.Pulse(); - } - }; var consumerOpts = new NatsJSConsumeOpts { MaxMsgs = 10, IdleHeartbeat = TimeSpan.FromSeconds(5), }; - var consumer = (NatsJSConsumer)await js.GetConsumerAsync("s1", "c1", cts.Token); + var consumer = (NatsJSConsumer)await js.GetConsumerAsync($"{prefix}s1", $"{prefix}c1", cts.Token); var count = 0; var cc = await consumer.ConsumeInternalAsync(serializer: TestDataJsonSerializer.Default, consumerOpts, cancellationToken: cts.Token); await foreach (var msg in cc.Msgs.ReadAllAsync(cts.Token)) @@ -164,28 +173,13 @@ public async Task Consume_idle_heartbeat_test() await Retry.Until( "all pull requests are received", - () => proxy.ClientFrames.Count(f => f.Message.StartsWith("PUB $JS.API.CONSUMER.MSG.NEXT.s1.c1")) >= 2); + () => proxy.ClientFrames.Count(f => f.Message.StartsWith($"PUB $JS.API.CONSUMER.MSG.NEXT.{prefix}s1.{prefix}c1")) >= 2); var msgNextRequests = proxy .ClientFrames - .Where(f => f.Message.StartsWith("PUB $JS.API.CONSUMER.MSG.NEXT.s1.c1")) + .Where(f => f.Message.StartsWith($"PUB $JS.API.CONSUMER.MSG.NEXT.{prefix}s1.{prefix}c1")) .ToList(); - // In some cases we are receiving more than two requests which - // is possible if the tests are running in a slow container and taking - // more than the timeout? Looking at the test and the code I can't make - // sense of it, really, but I'm going to assume it's fine to receive 3 pull - // requests as well as 2 since test failure reported 3 and failed once. - if (msgNextRequests.Count > 2) - { - _output.WriteLine($"Pull request count more than expected: {msgNextRequests.Count}"); - foreach (var frame in msgNextRequests) - { - _output.WriteLine($"PULL REQUEST: {frame}"); - } - } - - // Still fail and check traces if it happens again Assert.True(msgNextRequests.Count is 2); // Pull requests @@ -198,23 +192,28 @@ await Retry.Until( [Fact] public async Task Consume_reconnect_test() { - var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); - await using var server = await NatsServer.StartJSAsync(); + var proxy = _server.CreateProxy(); + await using var nats = proxy.CreateNatsConnection(); + await nats.ConnectRetryAsync(); + + await using var nats2 = _server.CreateNatsConnection(); + await nats2.ConnectRetryAsync(); + + var prefix = _server.GetNextId(); - var (nats, proxy) = server.CreateProxiedClientConnection(); - await using var nats2 = await server.CreateClientConnectionAsync(); + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); var js = new NatsJSContext(nats); var js2 = new NatsJSContext(nats2); - await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token); - await js.CreateOrUpdateConsumerAsync("s1", "c1", cancellationToken: cts.Token); + await js.CreateStreamAsync($"{prefix}s1", new[] { $"{prefix}s1.*" }, cts.Token); + await js.CreateOrUpdateConsumerAsync($"{prefix}s1", $"{prefix}c1", cancellationToken: cts.Token); var consumerOpts = new NatsJSConsumeOpts { MaxMsgs = 10, }; - var consumer = (NatsJSConsumer)await js.GetConsumerAsync("s1", "c1", cts.Token); + var consumer = (NatsJSConsumer)await js.GetConsumerAsync($"{prefix}s1", $"{prefix}c1", cts.Token); // Not interested in management messages sent upto this point await proxy.FlushFramesAsync(nats); @@ -238,13 +237,13 @@ public async Task Consume_reconnect_test() // Send a message before reconnect { - var ack = await js2.PublishAsync("s1.foo", new TestData { Test = 0 }, serializer: TestDataJsonSerializer.Default, cancellationToken: cts.Token); + var ack = await js2.PublishAsync($"{prefix}s1.foo", new TestData { Test = 0 }, serializer: TestDataJsonSerializer.Default, cancellationToken: cts.Token); ack.EnsureSuccess(); } await Retry.Until( "acked", - () => proxy.ClientFrames.Any(f => f.Message.StartsWith("PUB $JS.ACK.s1.c1"))); + () => proxy.ClientFrames.Any(f => f.Message.StartsWith($"PUB $JS.ACK.{prefix}s1.{prefix}c1"))); Assert.Contains(proxy.ClientFrames, f => f.Message.Contains("CONSUMER.MSG.NEXT")); @@ -258,7 +257,7 @@ await Retry.Until( // Send a message to be received after reconnect { - var ack = await js2.PublishAsync("s1.foo", new TestData { Test = 1 }, serializer: TestDataJsonSerializer.Default, cancellationToken: cts.Token); + var ack = await js2.PublishAsync($"{prefix}s1.foo", new TestData { Test = 1 }, serializer: TestDataJsonSerializer.Default, cancellationToken: cts.Token); ack.EnsureSuccess(); } @@ -348,14 +347,15 @@ await Retry.Until( [Fact] public async Task Consume_stop_test() { - await using var server = await NatsServer.StartJSAsync(); - await using var nats = await server.CreateClientConnectionAsync(); + await using var nats = _server.CreateNatsConnection(); + await nats.ConnectRetryAsync(); + var prefix = _server.GetNextId(); var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); var js = new NatsJSContext(nats); - var stream = await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token); - var consumer = (NatsJSConsumer)await js.CreateOrUpdateConsumerAsync("s1", "c1", cancellationToken: cts.Token); + var stream = await js.CreateStreamAsync($"{prefix}s1", new[] { $"{prefix}s1.*" }, cts.Token); + var consumer = (NatsJSConsumer)await js.CreateOrUpdateConsumerAsync($"{prefix}s1", $"{prefix}c1", cancellationToken: cts.Token); var consumerOpts = new NatsJSConsumeOpts { @@ -366,7 +366,7 @@ public async Task Consume_stop_test() for (var i = 0; i < 10; i++) { - var ack = await js.PublishAsync("s1.foo", new TestData { Test = i }, serializer: TestDataJsonSerializer.Default, cancellationToken: cts.Token); + var ack = await js.PublishAsync($"{prefix}s1.foo", new TestData { Test = i }, serializer: TestDataJsonSerializer.Default, cancellationToken: cts.Token); ack.EnsureSuccess(); } @@ -398,7 +398,7 @@ await Retry.Until( "ack pending 9", async () => { - var c = await js.GetConsumerAsync("s1", "c1", cts.Token); + var c = await js.GetConsumerAsync($"{prefix}s1", $"{prefix}c1", cts.Token); return c.Info.NumAckPending == 9; }, timeout: TimeSpan.FromSeconds(20)); @@ -413,7 +413,7 @@ await Retry.Until( "ack pending 0", async () => { - var c = await js.GetConsumerAsync("s1", "c1", cts.Token); + var c = await js.GetConsumerAsync($"{prefix}s1", $"{prefix}c1", cts.Token); return c.Info.NumAckPending == 0; }, timeout: TimeSpan.FromSeconds(20)); @@ -424,18 +424,21 @@ await Retry.Until( [Fact] public async Task Serialization_errors() { - await using var server = await NatsServer.StartJSAsync(); - await using var nats = await server.CreateClientConnectionAsync(); + await using var nats = _server.CreateNatsConnection(); + await nats.ConnectRetryAsync(); + + var prefix = _server.GetNextId(); + var js = new NatsJSContext(nats); var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token); + await js.CreateStreamAsync($"{prefix}s1", new[] { $"{prefix}s1.*" }, cts.Token); - var ack = await js.PublishAsync("s1.foo", "not an int", cancellationToken: cts.Token); + var ack = await js.PublishAsync($"{prefix}s1.foo", "not an int", cancellationToken: cts.Token); ack.EnsureSuccess(); - var consumer = await js.CreateOrUpdateConsumerAsync("s1", "c1", cancellationToken: cts.Token); + var consumer = await js.CreateOrUpdateConsumerAsync($"{prefix}s1", $"{prefix}c1", cancellationToken: cts.Token); await foreach (var msg in consumer.ConsumeAsync(cancellationToken: cts.Token)) { @@ -452,24 +455,25 @@ public async Task Serialization_errors() [Fact] public async Task Consume_right_amount_of_messages() { - await using var server = await NatsServer.StartJSAsync(); - await using var nats = await server.CreateClientConnectionAsync(); + await using var nats = _server.CreateNatsConnection(); + await nats.ConnectRetryAsync(); + var prefix = _server.GetNextId(); var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); var js = new NatsJSContext(nats); - await js.CreateStreamAsync("s1", ["s1.*"], cts.Token); + await js.CreateStreamAsync($"{prefix}s1", [$"{prefix}s1.*"], cts.Token); var payload = new byte[1024]; for (var i = 0; i < 50; i++) { - var ack = await js.PublishAsync("s1.foo", payload, cancellationToken: cts.Token); + var ack = await js.PublishAsync($"{prefix}s1.foo", payload, cancellationToken: cts.Token); ack.EnsureSuccess(); } // Max messages { - var consumer = await js.CreateOrUpdateConsumerAsync("s1", "c1", cancellationToken: cts.Token); + var consumer = await js.CreateOrUpdateConsumerAsync($"{prefix}s1", $"{prefix}c1", cancellationToken: cts.Token); var opts = new NatsJSConsumeOpts { MaxMsgs = 10, }; var count = 0; await foreach (var msg in consumer.ConsumeAsync(opts: opts, cancellationToken: cts.Token)) @@ -479,16 +483,21 @@ public async Task Consume_right_amount_of_messages() break; } - await Retry.Until("consumer stats updated", async () => - { - var info = (await js.GetConsumerAsync("s1", "c1", cts.Token)).Info; - return info is { NumAckPending: 6, NumPending: 40 }; - }); + await Retry.Until( + "consumer stats updated for Max messages", + async () => + { + var info = (await js.GetConsumerAsync($"{prefix}s1", $"{prefix}c1", cts.Token)).Info; + _output.WriteLine($"Consumer stats updated for Max messages {info.NumAckPending} of {info.NumPending}"); + return info is { NumAckPending: 6, NumPending: 40 }; + }, + retryDelay: TimeSpan.FromSeconds(3), + timeout: TimeSpan.FromSeconds(60)); } // Max bytes { - var consumer = await js.CreateOrUpdateConsumerAsync("s1", "c2", cancellationToken: cts.Token); + var consumer = await js.CreateOrUpdateConsumerAsync($"{prefix}s1", $"{prefix}c2", cancellationToken: cts.Token); var opts = new NatsJSConsumeOpts { MaxBytes = 10 * (1024 + 50), }; var count = 0; await foreach (var msg in consumer.ConsumeAsync(opts: opts, cancellationToken: cts.Token)) @@ -498,35 +507,39 @@ await Retry.Until("consumer stats updated", async () => break; } - await Retry.Until("consumer stats updated", async () => - { - var info = (await js.GetConsumerAsync("s1", "c2", cts.Token)).Info; - return info is { NumAckPending: 6, NumPending: 40 }; - }); + await Retry.Until( + "consumer stats updated for Max bytes", + async () => + { + var info = (await js.GetConsumerAsync($"{prefix}s1", $"{prefix}c2", cts.Token)).Info; + _output.WriteLine($"Consumer stats updated for Max bytes {info.NumAckPending} of {info.NumPending}"); + return info is { NumAckPending: 5, NumPending: 41 }; + }, + retryDelay: TimeSpan.FromSeconds(3), + timeout: TimeSpan.FromSeconds(60)); } } [Fact] public async Task Consume_right_amount_of_messages_when_ack_wait_exceeded() { - await using var server = await NatsServer.StartJSAsync(); - await using var nats = await server.CreateClientConnectionAsync(); + await using var nats = _server.CreateNatsConnection(); + await nats.ConnectRetryAsync(); + var prefix = _server.GetNextId(); var cts = new CancellationTokenSource(TimeSpan.FromSeconds(20)); var js = new NatsJSContext(nats); - await js.CreateStreamAsync("email-queue", ["email.>"], cts.Token); - await js.PublishAsync("email.queue", "1", cancellationToken: cts.Token); - await js.PublishAsync("email.queue", "2", cancellationToken: cts.Token); + await js.CreateStreamAsync($"{prefix}email-queue", [$"{prefix}email.>"], cts.Token); + await js.PublishAsync($"{prefix}email.queue", "1", cancellationToken: cts.Token); + await js.PublishAsync($"{prefix}email.queue", "2", cancellationToken: cts.Token); var consumer = await js.CreateOrUpdateConsumerAsync( - stream: "email-queue", - new ConsumerConfig("email-queue-consumer") { AckWait = TimeSpan.FromSeconds(10) }, + stream: $"{prefix}email-queue", + new ConsumerConfig($"{prefix}email-queue-consumer") { AckWait = TimeSpan.FromSeconds(10) }, cancellationToken: cts.Token); var count = 0; await foreach (var msg in consumer.ConsumeAsync(opts: new NatsJSConsumeOpts { MaxMsgs = 1 }, cancellationToken: cts.Token)) { - _output.WriteLine($"Received: {msg.Data}"); - // Only wait for the first couple of messages // to get close to the ack wait time if (count < 2) diff --git a/tests/NATS.Client.JetStream.Tests/CustomSerializerTest.cs b/tests/NATS.Client.JetStream.Tests/CustomSerializerTest.cs index e910b709..a865e486 100644 --- a/tests/NATS.Client.JetStream.Tests/CustomSerializerTest.cs +++ b/tests/NATS.Client.JetStream.Tests/CustomSerializerTest.cs @@ -1,29 +1,44 @@ using System.Buffers; using NATS.Client.Core.Tests; +using NATS.Client.Core2.Tests; +using NATS.Client.TestUtilities2; namespace NATS.Client.JetStream.Tests; +[Collection("nats-server")] public class CustomSerializerTest { + private readonly ITestOutputHelper _output; + private readonly NatsServerFixture _server; + + public CustomSerializerTest(ITestOutputHelper output, NatsServerFixture server) + { + _output = output; + _server = server; + } + [Fact] public async Task When_consuming_ack_should_be_serialized_normally_if_custom_serializer_used() { - await using var server = await NatsServer.StartJSAsync(); - await using var nats = await server.CreateClientConnectionAsync(new NatsOpts + await using var nats = new NatsConnection(new NatsOpts { + Url = _server.Url, SerializerRegistry = new Level42SerializerRegistry(), RequestTimeout = TimeSpan.FromSeconds(10), }); + await nats.ConnectRetryAsync(); + var prefix = _server.GetNextId(); + var js = new NatsJSContext(nats); var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); - await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token); + await js.CreateStreamAsync($"{prefix}s1", new[] { $"{prefix}s1.*" }, cts.Token); - await js.PublishAsync("s1.1", new byte[] { 0 }, cancellationToken: cts.Token); - await js.PublishAsync("s1.2", new byte[] { 0 }, cancellationToken: cts.Token); + await js.PublishAsync($"{prefix}s1.1", new byte[] { 0 }, cancellationToken: cts.Token); + await js.PublishAsync($"{prefix}s1.2", new byte[] { 0 }, cancellationToken: cts.Token); - var consumer = await js.CreateOrUpdateConsumerAsync("s1", "c1", cancellationToken: cts.Token); + var consumer = await js.CreateOrUpdateConsumerAsync($"{prefix}s1", $"{prefix}c1", cancellationToken: cts.Token); // single ack { diff --git a/tests/NATS.Client.JetStream.Tests/DoubleAckNakDelayTests.cs b/tests/NATS.Client.JetStream.Tests/DoubleAckNakDelayTests.cs index d7d388b8..c7d3cd3b 100644 --- a/tests/NATS.Client.JetStream.Tests/DoubleAckNakDelayTests.cs +++ b/tests/NATS.Client.JetStream.Tests/DoubleAckNakDelayTests.cs @@ -1,28 +1,38 @@ using System.Text.RegularExpressions; using NATS.Client.Core.Tests; +using NATS.Client.Core2.Tests; +using NATS.Client.TestUtilities2; namespace NATS.Client.JetStream.Tests; +[Collection("nats-server")] public class DoubleAckNakDelayTests { private readonly ITestOutputHelper _output; + private readonly NatsServerFixture _server; - public DoubleAckNakDelayTests(ITestOutputHelper output) => _output = output; + public DoubleAckNakDelayTests(ITestOutputHelper output, NatsServerFixture server) + { + _output = output; + _server = server; + } [Fact] public async Task Double_ack_received_messages() { - await using var server = await NatsServer.StartJSAsync(); - var (nats1, proxy) = server.CreateProxiedClientConnection(new NatsOpts { RequestTimeout = TimeSpan.FromSeconds(10) }); - await using var nats = nats1; + var proxy = _server.CreateProxy(); + await using var nats = proxy.CreateNatsConnection(); + await nats.ConnectRetryAsync(); + var prefix = _server.GetNextId(); + var js = new NatsJSContext(nats); var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); - await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token); - var consumer = await js.CreateOrUpdateConsumerAsync("s1", "c1", cancellationToken: cts.Token); + await js.CreateStreamAsync($"{prefix}s1", [$"{prefix}s1.*"], cts.Token); + var consumer = await js.CreateOrUpdateConsumerAsync($"{prefix}s1", $"{prefix}c1", cancellationToken: cts.Token); - var ack = await js.PublishAsync("s1.foo", 42, cancellationToken: cts.Token); + var ack = await js.PublishAsync($"{prefix}s1.foo", 42, cancellationToken: cts.Token); ack.EnsureSuccess(); var next = await consumer.NextAsync(cancellationToken: cts.Token); if (next is { } msg) @@ -46,17 +56,19 @@ public async Task Double_ack_received_messages() [Fact] public async Task Delay_nak_received_messages() { - await using var server = await NatsServer.StartJSAsync(); - var (nats1, proxy) = server.CreateProxiedClientConnection(new NatsOpts { RequestTimeout = TimeSpan.FromSeconds(10) }); - await using var nats = nats1; + var proxy = _server.CreateProxy(); + await using var nats = proxy.CreateNatsConnection(); + await nats.ConnectRetryAsync(); + var prefix = _server.GetNextId(); + var js = new NatsJSContext(nats); var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); - await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token); - var consumer = await js.CreateOrUpdateConsumerAsync("s1", "c1", cancellationToken: cts.Token); + await js.CreateStreamAsync($"{prefix}s1", new[] { $"{prefix}s1.*" }, cts.Token); + var consumer = await js.CreateOrUpdateConsumerAsync($"{prefix}s1", $"{prefix}c1", cancellationToken: cts.Token); - var ack = await js.PublishAsync("s1.foo", 42, cancellationToken: cts.Token); + var ack = await js.PublishAsync($"{prefix}s1.foo", 42, cancellationToken: cts.Token); ack.EnsureSuccess(); var next = await consumer.NextAsync(cancellationToken: cts.Token); if (next is { } msg) diff --git a/tests/NATS.Client.JetStream.Tests/ErrorHandlerTest.cs b/tests/NATS.Client.JetStream.Tests/ErrorHandlerTest.cs index 54daee92..abf07a35 100644 --- a/tests/NATS.Client.JetStream.Tests/ErrorHandlerTest.cs +++ b/tests/NATS.Client.JetStream.Tests/ErrorHandlerTest.cs @@ -245,7 +245,7 @@ public async Task Ordered_consumer_consume_handling() var js = new NatsJSContext(nats); - var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60)); var stream = await js.CreateStreamAsync(new StreamConfig($"{prefix}s1", new[] { $"{prefix}s1.*" }), cts.Token); var consumer = (NatsJSOrderedConsumer)await stream.CreateOrderedConsumerAsync(cancellationToken: cts.Token); @@ -281,7 +281,7 @@ public async Task Ordered_consumer_consume_handling() // Swallow heartbeats proxy.ServerInterceptors.Add(m => m?.Contains("Idle Heartbeat") ?? false ? null : m); - var consumeCts = CancellationTokenSource.CreateLinkedTokenSource(cts.Token); + var consumeCts = new CancellationTokenSource(); var consume = Task.Run( async () => { @@ -308,11 +308,18 @@ public async Task Ordered_consumer_consume_handling() }, cts.Token); - await Retry.Until("timed out", () => Volatile.Read(ref timeoutNotifications) > 0, timeout: TimeSpan.FromSeconds(20)); + await Retry.Until("timed out", () => Volatile.Read(ref timeoutNotifications) > 0, timeout: TimeSpan.FromSeconds(30)); consumeCts.Cancel(); - await consume; Assert.True(Volatile.Read(ref timeoutNotifications) > 0); + + try + { + await consume; + } + catch (OperationCanceledException) + { + } } [Fact] diff --git a/tests/NATS.Client.JetStream.Tests/ManageConsumerTest.cs b/tests/NATS.Client.JetStream.Tests/ManageConsumerTest.cs index 7038aae6..bc239aa2 100644 --- a/tests/NATS.Client.JetStream.Tests/ManageConsumerTest.cs +++ b/tests/NATS.Client.JetStream.Tests/ManageConsumerTest.cs @@ -41,7 +41,7 @@ public async Task Create_get_consumer() [Fact] public async Task List_delete_consumer() { - var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); await using var server = await NatsServer.StartJSAsync(); var nats = await server.CreateClientConnectionAsync(); diff --git a/tests/NATS.Client.JetStream.Tests/OrderedConsumerTest.cs b/tests/NATS.Client.JetStream.Tests/OrderedConsumerTest.cs index 79f7cbd7..29ef2b24 100644 --- a/tests/NATS.Client.JetStream.Tests/OrderedConsumerTest.cs +++ b/tests/NATS.Client.JetStream.Tests/OrderedConsumerTest.cs @@ -1,33 +1,43 @@ using System.Diagnostics; using NATS.Client.Core.Tests; +using NATS.Client.Core2.Tests; +using NATS.Client.TestUtilities2; namespace NATS.Client.JetStream.Tests; +[Collection("nats-server")] public class OrderedConsumerTest { private readonly ITestOutputHelper _output; + private readonly NatsServerFixture _server; - public OrderedConsumerTest(ITestOutputHelper output) => _output = output; + public OrderedConsumerTest(ITestOutputHelper output, NatsServerFixture server) + { + _output = output; + _server = server; + } [Fact] public async Task Consume_test() { - var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - await using var server = await NatsServer.StartJSAsync(); - await using var nats = await server.CreateClientConnectionAsync(); + await using var nats = _server.CreateNatsConnection(); + await nats.ConnectRetryAsync(); + var prefix = _server.GetNextId(); + var js = new NatsJSContext(nats); - var stream = await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token); + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + + var stream = await js.CreateStreamAsync($"{prefix}s1", [$"{prefix}s1.*"], cts.Token); for (var i = 0; i < 10; i++) { - await js.PublishAsync("s1.foo", i, cancellationToken: cts.Token); + await js.PublishAsync($"{prefix}s1.foo", i, cancellationToken: cts.Token); } var consumer = await stream.CreateOrderedConsumerAsync(cancellationToken: cts.Token); var count = 0; - _output.WriteLine("Consuming..."); var consumeOpts = new NatsJSConsumeOpts { MaxMsgs = 3, @@ -35,7 +45,6 @@ public async Task Consume_test() }; await foreach (var msg in consumer.ConsumeAsync(opts: consumeOpts, cancellationToken: cts.Token)) { - _output.WriteLine($"[RCV] {msg.Data}"); Assert.Equal(count, msg.Data); if (++count == 10) break; @@ -51,7 +60,7 @@ public async Task Consume_reconnect_publish() var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); - var stream = await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token); + var stream = await js.CreateStreamAsync("s1", ["s1.*"], cts.Token); for (var i = 0; i < 50; i++) { @@ -87,23 +96,25 @@ public async Task Consume_reconnect_publish() [Fact] public async Task Fetch_test() { - var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - await using var server = await NatsServer.StartJSAsync(); - await using var nats = await server.CreateClientConnectionAsync(); + await using var nats = _server.CreateNatsConnection(); + await nats.ConnectRetryAsync(); + var prefix = _server.GetNextId(); + var js = new NatsJSContext(nats); - var stream = await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token); + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + + var stream = await js.CreateStreamAsync($"{prefix}s1", [$"{prefix}s1.*"], cts.Token); for (var i = 0; i < 10; i++) { - await js.PublishAsync("s1.foo", i, cancellationToken: cts.Token); + await js.PublishAsync($"{prefix}s1.foo", i, cancellationToken: cts.Token); } var consumer = await stream.CreateOrderedConsumerAsync(cancellationToken: cts.Token); for (var i = 0; i < 10;) { - _output.WriteLine("Fetching..."); var fetchOpts = new NatsJSFetchOpts { MaxMsgs = 3, @@ -111,7 +122,6 @@ public async Task Fetch_test() }; await foreach (var msg in consumer.FetchAsync(opts: fetchOpts, cancellationToken: cts.Token)) { - _output.WriteLine($"[RCV] {msg.Data}"); Assert.Equal(i, msg.Data); i++; } @@ -121,12 +131,15 @@ public async Task Fetch_test() [Fact] public async Task Fetch_no_wait_test() { - var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - await using var server = await NatsServer.StartJSAsync(); - await using var nats = await server.CreateClientConnectionAsync(); + await using var nats = _server.CreateNatsConnection(); + await nats.ConnectRetryAsync(); + var prefix = _server.GetNextId(); + var js = new NatsJSContext(nats); - var stream = await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token); + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + + var stream = await js.CreateStreamAsync($"{prefix}s1", [$"{prefix}s1.*"], cts.Token); var consumer = await stream.CreateOrderedConsumerAsync(cancellationToken: cts.Token); @@ -140,16 +153,14 @@ public async Task Fetch_no_wait_test() stopwatch.Stop(); - _output.WriteLine($"stopwatch.Elapsed: {stopwatch.Elapsed}"); - Assert.Equal(0, count); - Assert.True(stopwatch.Elapsed < TimeSpan.FromSeconds(3)); + Assert.True(stopwatch.Elapsed < TimeSpan.FromSeconds(10)); // Where there is less than we want to fetch, we should get all the messages // without waiting for the timeout. for (var i = 0; i < 10; i++) { - await js.PublishAsync("s1.foo", i, cancellationToken: cts.Token); + await js.PublishAsync($"{prefix}s1.foo", i, cancellationToken: cts.Token); } stopwatch.Restart(); @@ -159,7 +170,6 @@ public async Task Fetch_no_wait_test() var currentCount = 0; await foreach (var msg in consumer.FetchNoWaitAsync(opts: new NatsJSFetchOpts { MaxMsgs = 6 }, cancellationToken: cts.Token)) { - _output.WriteLine($"[RCV][{iterationCount}] {msg.Data}"); Assert.Equal(count, msg.Data); count++; currentCount++; @@ -178,29 +188,31 @@ public async Task Fetch_no_wait_test() Assert.Equal(2, iterationCount); Assert.Equal(10, count); - Assert.True(stopwatch.Elapsed < TimeSpan.FromSeconds(3)); + Assert.True(stopwatch.Elapsed < TimeSpan.FromSeconds(10)); } [Fact] public async Task Next_test() { - var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - await using var server = await NatsServer.StartJSAsync(); - await using var nats = await server.CreateClientConnectionAsync(); + await using var nats = _server.CreateNatsConnection(); + await nats.ConnectRetryAsync(); + var prefix = _server.GetNextId(); + var js = new NatsJSContext(nats); - var stream = await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token); + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + + var stream = await js.CreateStreamAsync($"{prefix}s1", [$"{prefix}s1.*"], cts.Token); for (var i = 0; i < 10; i++) { - await js.PublishAsync("s1.foo", i, cancellationToken: cts.Token); + await js.PublishAsync($"{prefix}s1.foo", i, cancellationToken: cts.Token); } var consumer = await stream.CreateOrderedConsumerAsync(cancellationToken: cts.Token); for (var i = 0; i < 10;) { - _output.WriteLine("Next..."); var nextOpts = new NatsJSNextOpts { Expires = TimeSpan.FromSeconds(3), @@ -209,7 +221,6 @@ public async Task Next_test() if (next is { } msg) { - _output.WriteLine($"[RCV] {msg.Data}"); Assert.Equal(i, msg.Data); i++; } diff --git a/tests/NATS.Client.JetStream.Tests/PublishTest.cs b/tests/NATS.Client.JetStream.Tests/PublishTest.cs index a067554b..fae93e94 100644 --- a/tests/NATS.Client.JetStream.Tests/PublishTest.cs +++ b/tests/NATS.Client.JetStream.Tests/PublishTest.cs @@ -1,32 +1,42 @@ using System.Text.RegularExpressions; using Microsoft.Extensions.Logging; using NATS.Client.Core.Tests; +using NATS.Client.Core2.Tests; +using NATS.Client.TestUtilities; using NATS.Client.TestUtilities2; namespace NATS.Client.JetStream.Tests; +[Collection("nats-server")] public class PublishTest { private readonly ITestOutputHelper _output; + private readonly NatsServerFixture _server; - public PublishTest(ITestOutputHelper output) => _output = output; + public PublishTest(ITestOutputHelper output, NatsServerFixture server) + { + _output = output; + _server = server; + } [Fact] public async Task Publish_test() { - await using var server = await NatsServer.StartJSAsync(); - await using var nats = await server.CreateClientConnectionAsync(); + await using var nats = _server.CreateNatsConnection(); + await nats.ConnectRetryAsync(); + var prefix = _server.GetNextId(); + var js = new NatsJSContext(nats); var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - await js.CreateStreamAsync("s1", new[] { "s1.>" }, cts.Token); - await js.CreateOrUpdateConsumerAsync("s1", "c1", cancellationToken: cts.Token); + await js.CreateStreamAsync($"{prefix}s1", new[] { $"{prefix}s1.>" }, cts.Token); + await js.CreateOrUpdateConsumerAsync($"{prefix}s1", $"{prefix}c1", cancellationToken: cts.Token); // Publish { var ack = await js.PublishAsync( - "s1.foo", + $"{prefix}s1.foo", new TestData { Test = 1, @@ -35,7 +45,7 @@ public async Task Publish_test() cancellationToken: cts.Token); Assert.Null(ack.Error); Assert.Equal(1, (int)ack.Seq); - Assert.Equal("s1", ack.Stream); + Assert.Equal($"{prefix}s1", ack.Stream); Assert.False(ack.Duplicate); Assert.True(ack.IsSuccess()); } @@ -43,7 +53,7 @@ public async Task Publish_test() // Duplicate { var ack1 = await js.PublishAsync( - subject: "s1.foo", + subject: $"{prefix}s1.foo", data: new TestData { Test = 2 }, serializer: TestDataJsonSerializer.Default, opts: new NatsJSPubOpts { MsgId = "2" }, @@ -54,7 +64,7 @@ public async Task Publish_test() Assert.True(ack1.IsSuccess()); var ack2 = await js.PublishAsync( - subject: "s1.foo", + subject: $"{prefix}s1.foo", data: new TestData { Test = 2 }, serializer: TestDataJsonSerializer.Default, opts: new NatsJSPubOpts { MsgId = "2" }, @@ -67,14 +77,14 @@ public async Task Publish_test() // ExpectedStream { var ack1 = await js.PublishAsync( - subject: "s1.foo", + subject: $"{prefix}s1.foo", data: 1, - opts: new NatsJSPubOpts { ExpectedStream = "s1" }, + opts: new NatsJSPubOpts { ExpectedStream = $"{prefix}s1" }, cancellationToken: cts.Token); Assert.Null(ack1.Error); var ack2 = await js.PublishAsync( - subject: "s1.foo", + subject: $"{prefix}s1.foo", data: 2, opts: new NatsJSPubOpts { ExpectedStream = "non-existent-stream" }, cancellationToken: cts.Token); @@ -87,20 +97,20 @@ public async Task Publish_test() // ExpectedLastSequence { var ack1 = await js.PublishAsync( - subject: "s1.foo", + subject: $"{prefix}s1.foo", data: 1, cancellationToken: cts.Token); Assert.Null(ack1.Error); var ack2 = await js.PublishAsync( - subject: "s1.foo", + subject: $"{prefix}s1.foo", data: 2, opts: new NatsJSPubOpts { ExpectedLastSequence = ack1.Seq }, cancellationToken: cts.Token); Assert.Null(ack2.Error); var ack3 = await js.PublishAsync( - subject: "s1.foo", + subject: $"{prefix}s1.foo", data: 3, opts: new NatsJSPubOpts { ExpectedLastSequence = ack1.Seq }, cancellationToken: cts.Token); @@ -112,20 +122,20 @@ public async Task Publish_test() // ExpectedLastSubjectSequence { var ack1 = await js.PublishAsync( - subject: "s1.foo.ExpectedLastSubjectSequence", + subject: $"{prefix}s1.foo.ExpectedLastSubjectSequence", data: 1, cancellationToken: cts.Token); Assert.Null(ack1.Error); var ack2 = await js.PublishAsync( - subject: "s1.foo.ExpectedLastSubjectSequence", + subject: $"{prefix}s1.foo.ExpectedLastSubjectSequence", data: 2, opts: new NatsJSPubOpts { ExpectedLastSubjectSequence = ack1.Seq }, cancellationToken: cts.Token); Assert.Null(ack2.Error); var ack3 = await js.PublishAsync( - subject: "s1.foo.ExpectedLastSubjectSequence", + subject: $"{prefix}s1.foo.ExpectedLastSubjectSequence", data: 3, opts: new NatsJSPubOpts { ExpectedLastSubjectSequence = ack1.Seq }, cancellationToken: cts.Token); @@ -137,21 +147,21 @@ public async Task Publish_test() // ExpectedLastMsgId { var ack1 = await js.PublishAsync( - subject: "s1.foo.ExpectedLastSubjectSequence", + subject: $"{prefix}s1.foo.ExpectedLastSubjectSequence", data: 1, opts: new NatsJSPubOpts { MsgId = "ExpectedLastMsgId-1" }, cancellationToken: cts.Token); Assert.Null(ack1.Error); var ack2 = await js.PublishAsync( - subject: "s1.foo.ExpectedLastSubjectSequence", + subject: $"{prefix}s1.foo.ExpectedLastSubjectSequence", data: 2, opts: new NatsJSPubOpts { MsgId = "ExpectedLastMsgId-2", ExpectedLastMsgId = "ExpectedLastMsgId-1" }, cancellationToken: cts.Token); Assert.Null(ack2.Error); var ack3 = await js.PublishAsync( - subject: "s1.foo.ExpectedLastSubjectSequence", + subject: $"{prefix}s1.foo.ExpectedLastSubjectSequence", data: 3, opts: new NatsJSPubOpts { MsgId = "ExpectedLastMsgId-3", ExpectedLastMsgId = "unexpected-msg-id" }, cancellationToken: cts.Token); @@ -164,14 +174,26 @@ public async Task Publish_test() [Fact] public async Task Publish_retry_test() { - var ackRegex = new Regex(@"{""stream"":""s1"",\s*""seq"":\d+}"); + var retryCount = 0; + var logger = new InMemoryTestLoggerFactory(LogLevel.Debug, log => + { + if (log is { LogLevel: LogLevel.Debug } && log.EventId == NatsJSLogEvents.PublishNoResponseRetry) + { + Interlocked.Increment(ref retryCount); + } + }); - // give enough time for retries to avoid NatsJSPublishNoResponseExceptions - var natsOpts = NatsOpts.Default with { RequestTimeout = TimeSpan.FromSeconds(3) }; + var proxy = _server.CreateProxy(); + await using var nats = new NatsConnection(new NatsOpts + { + Url = $"nats://127.0.0.1:{proxy.Port}", + ConnectTimeout = TimeSpan.FromSeconds(10), + RequestTimeout = TimeSpan.FromSeconds(3), // give enough time for retries to avoid NatsJSPublishNoResponseExceptions + LoggerFactory = logger, + }); + var prefix = _server.GetNextId(); - await using var server = await NatsServer.StartJSAsync(); - var (nats1, proxy) = server.CreateProxiedClientConnection(natsOpts); - await using var nats = nats1; + var ackRegex = new Regex($$"""{"stream":"{{prefix}}s1",\s*"seq":\s*\d+}"""); var swallowAcksCount = 0; proxy.ServerInterceptors.Add(m => @@ -187,32 +209,23 @@ public async Task Publish_retry_test() return m; }); - var retryCount = 0; - server.OnLog += log => - { - if (log is { LogLevel: LogLevel.Debug } && log.EventId == NatsJSLogEvents.PublishNoResponseRetry) - { - Interlocked.Increment(ref retryCount); - } - }; - - var cts = new CancellationTokenSource(TimeSpan.FromSeconds(45)); + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60)); // use different connection to create stream and consumer to avoid request timeouts - await using var nats0 = await server.CreateClientConnectionAsync(); + await using var nats0 = _server.CreateNatsConnection(); await nats.ConnectRetryAsync(); await nats0.ConnectRetryAsync(); var js0 = new NatsJSContext(nats0); - await js0.CreateStreamAsync("s1", new[] { "s1.>" }, cts.Token); - await js0.CreateOrUpdateConsumerAsync("s1", "c1", cancellationToken: cts.Token); + await js0.CreateStreamAsync($"{prefix}s1", new[] { $"{prefix}s1.>" }, cts.Token); + await js0.CreateOrUpdateConsumerAsync($"{prefix}s1", $"{prefix}c1", cancellationToken: cts.Token); var js = new NatsJSContext(nats); // Publish succeeds without retry { - var ack = await js.PublishAsync("s1.foo", 1, opts: new NatsJSPubOpts { RetryAttempts = 2 }, cancellationToken: cts.Token); + var ack = await js.PublishAsync($"{prefix}s1.foo", 1, opts: new NatsJSPubOpts { RetryAttempts = 2 }, cancellationToken: cts.Token); ack.EnsureSuccess(); Assert.Equal(0, Volatile.Read(ref retryCount)); @@ -226,7 +239,7 @@ public async Task Publish_retry_test() Interlocked.Exchange(ref retryCount, 0); Interlocked.Exchange(ref swallowAcksCount, 1); - var ack = await js.PublishAsync("s1.foo", 1, opts: new NatsJSPubOpts { RetryAttempts = 2 }, cancellationToken: cts.Token); + var ack = await js.PublishAsync($"{prefix}s1.foo", 1, opts: new NatsJSPubOpts { RetryAttempts = 2 }, cancellationToken: cts.Token); ack.EnsureSuccess(); Assert.Equal(1, Volatile.Read(ref retryCount)); @@ -239,7 +252,7 @@ public async Task Publish_retry_test() Interlocked.Exchange(ref retryCount, 0); Interlocked.Exchange(ref swallowAcksCount, 2); - var ack = await js.PublishAsync("s1.foo", 1, opts: new NatsJSPubOpts { RetryAttempts = 3 }, cancellationToken: cts.Token); + var ack = await js.PublishAsync($"{prefix}s1.foo", 1, opts: new NatsJSPubOpts { RetryAttempts = 3 }, cancellationToken: cts.Token); ack.EnsureSuccess(); Assert.Equal(2, Volatile.Read(ref retryCount)); @@ -253,7 +266,7 @@ public async Task Publish_retry_test() Interlocked.Exchange(ref swallowAcksCount, 2); await Assert.ThrowsAsync(async () => - await js.PublishAsync("s1.foo", 1, opts: new NatsJSPubOpts { RetryAttempts = 2 }, cancellationToken: cts.Token)); + await js.PublishAsync($"{prefix}s1.foo", 1, opts: new NatsJSPubOpts { RetryAttempts = 2 }, cancellationToken: cts.Token)); Assert.Equal(2, Volatile.Read(ref retryCount)); await Retry.Until("ack received", () => proxy.Frames.Count(f => ackRegex.IsMatch(f.Message)) == 2, timeout: TimeSpan.FromSeconds(20)); diff --git a/tests/NATS.Client.JetStream.Tests/Utils.cs b/tests/NATS.Client.JetStream.Tests/Utils.cs index 57f010fe..d72e65a0 100644 --- a/tests/NATS.Client.JetStream.Tests/Utils.cs +++ b/tests/NATS.Client.JetStream.Tests/Utils.cs @@ -1,3 +1,5 @@ +using NATS.Client.Core.Tests; +using NATS.Client.Core2.Tests; using NATS.Client.JetStream.Models; namespace NATS.Client.JetStream.Tests; @@ -9,4 +11,21 @@ public static ValueTask CreateOrUpdateConsumerAsync(this NatsJS public static ValueTask CreateStreamAsync(this NatsJSContext context, string stream, string[] subjects, CancellationToken cancellationToken = default) => context.CreateStreamAsync(new StreamConfig { Name = stream, Subjects = subjects }, cancellationToken); + + public static NatsProxy CreateProxy(this NatsServerFixture server) + => new(new Uri(server.Url).Port); + + public static NatsConnection CreateNatsConnection(this NatsProxy proxy) + => new(new NatsOpts + { + Url = $"nats://127.0.0.1:{proxy.Port}", + ConnectTimeout = TimeSpan.FromSeconds(10), + }); + + public static NatsConnection CreateNatsConnection(this NatsServerFixture server) + => new(new NatsOpts + { + Url = server.Url, + ConnectTimeout = TimeSpan.FromSeconds(10), + }); } diff --git a/tests/NATS.Client.TestUtilities/NatsProxy.cs b/tests/NATS.Client.TestUtilities/NatsProxy.cs index 62a21c85..62909b51 100644 --- a/tests/NATS.Client.TestUtilities/NatsProxy.cs +++ b/tests/NATS.Client.TestUtilities/NatsProxy.cs @@ -121,7 +121,7 @@ public IReadOnlyList Frames } } - public IReadOnlyList ClientFrames => Frames.Where(f => f.Origin == "C").ToList(); + public IReadOnlyList ClientFrames => Frames.Where(f => f.Origin == "C" && !f.Message.Contains("__PROXY_SIGNAL_SYNC__")).ToList(); public IReadOnlyList ServerFrames => Frames.Where(f => f.Origin == "S").ToList(); @@ -152,13 +152,19 @@ public void Reset() public async Task FlushFramesAsync(NatsConnection nats) { - var subject = $"_SIGNAL_SYNC_{Interlocked.Increment(ref _syncCount)}"; + var subject = $"__PROXY_SIGNAL_SYNC__{Interlocked.Increment(ref _syncCount)}"; await nats.PublishAsync(subject); await Retry.Until( "flush sync frame", - () => AllFrames.Any(f => f.Message == $"PUB {subject} 0␍␊")); + async () => + { + await nats.PublishAsync(subject); + return AllFrames.Any(f => f.Message == $"PUB {subject} 0␍␊"); + }, + retryDelay: TimeSpan.FromSeconds(1), + timeout: TimeSpan.FromSeconds(60)); lock (_frames) _frames.Clear(); diff --git a/tests/NATS.Client.TestUtilities/NatsServer.cs b/tests/NATS.Client.TestUtilities/NatsServer.cs index 86cbf4ca..ba8cc305 100644 --- a/tests/NATS.Client.TestUtilities/NatsServer.cs +++ b/tests/NATS.Client.TestUtilities/NatsServer.cs @@ -308,9 +308,12 @@ public async ValueTask StopAsync() { _cancellationTokenSource?.Cancel(); // trigger of process kill. _cancellationTokenSource?.Dispose(); - ServerProcess!.Kill(); - using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - await ServerProcess!.WaitForExitAsync(cts.Token); + if (ServerProcess != null) + { + ServerProcess.Kill(); + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + await ServerProcess.WaitForExitAsync(cts.Token); + } } catch (OperationCanceledException) { @@ -333,9 +336,12 @@ public async ValueTask DisposeAsync() { _cancellationTokenSource?.Cancel(); // trigger of process kill. _cancellationTokenSource?.Dispose(); - ServerProcess!.Kill(); - using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - await ServerProcess!.WaitForExitAsync(cts.Token); + if (ServerProcess != null) + { + ServerProcess.Kill(); + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + await ServerProcess.WaitForExitAsync(cts.Token)!; + } } catch (OperationCanceledException) { diff --git a/tests/NATS.Client.TestUtilities/Utils.cs b/tests/NATS.Client.TestUtilities/Utils.cs index f55f9e90..835f362b 100644 --- a/tests/NATS.Client.TestUtilities/Utils.cs +++ b/tests/NATS.Client.TestUtilities/Utils.cs @@ -15,7 +15,7 @@ public static class Retry { public static async Task Until(string reason, Func condition, Func? action = null, TimeSpan? timeout = null, TimeSpan? retryDelay = null) { - timeout ??= TimeSpan.FromSeconds(10); + timeout ??= TimeSpan.FromSeconds(30); var delay1 = retryDelay ?? TimeSpan.FromSeconds(.1); var stopwatch = Stopwatch.StartNew(); @@ -33,7 +33,7 @@ public static async Task Until(string reason, Func condition, Func? public static async Task Until(string reason, Func> condition, Func? action = null, TimeSpan? timeout = null, TimeSpan? retryDelay = null) { - timeout ??= TimeSpan.FromSeconds(10); + timeout ??= TimeSpan.FromSeconds(30); var delay1 = retryDelay ?? TimeSpan.FromSeconds(.1); var stopwatch = Stopwatch.StartNew();