From 665a73d5d8477946ea4fdd14771d9a9a2a6afbbd Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Tue, 18 Feb 2025 11:15:36 +0000 Subject: [PATCH 1/3] Add server error event --- src/NATS.Client.Core/INatsConnection.cs | 5 + .../Internal/NatsReadProtocolProcessor.cs | 2 + src/NATS.Client.Core/NatsConnection.cs | 9 ++ src/NATS.Client.Core/NatsEventArgs.cs | 8 ++ .../ErrorHandlerTest.cs | 119 ++++++++++++++++++ 5 files changed, 143 insertions(+) create mode 100644 tests/NATS.Client.Core2.Tests/ErrorHandlerTest.cs diff --git a/src/NATS.Client.Core/INatsConnection.cs b/src/NATS.Client.Core/INatsConnection.cs index a2b75f4ed..0d2221bde 100644 --- a/src/NATS.Client.Core/INatsConnection.cs +++ b/src/NATS.Client.Core/INatsConnection.cs @@ -30,6 +30,11 @@ public interface INatsConnection : INatsClient /// public event AsyncEventHandler? LameDuckModeActivated; + /// + /// Event that is raised when server sends an error message ('-ERR'). + /// + public event AsyncEventHandler? ServerError; + /// /// Server information received from the NATS server. /// diff --git a/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs b/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs index c0d426d15..05e6cbc5e 100644 --- a/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs +++ b/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs @@ -377,6 +377,7 @@ private async ValueTask> DispatchCommandAsync(int code, R var newPosition = newBuffer.PositionOf((byte)'\n'); var error = ParseError(newBuffer.Slice(0, buffer.GetOffset(newPosition!.Value) - 1)); _logger.LogError(NatsLogEvents.Protocol, "Server error {Error}", error); + _connection.PushEvent(NatsEvent.ServerError, new NatsServerErrorEventArgs(error)); _waitForPongOrErrorSignal.TrySetException(new NatsServerException(error)); return newBuffer.Slice(newBuffer.GetPosition(1, newPosition!.Value)); } @@ -384,6 +385,7 @@ private async ValueTask> DispatchCommandAsync(int code, R { var error = ParseError(buffer.Slice(0, buffer.GetOffset(position.Value) - 1)); _logger.LogError(NatsLogEvents.Protocol, "Server error {Error}", error); + _connection.PushEvent(NatsEvent.ServerError, new NatsServerErrorEventArgs(error)); _waitForPongOrErrorSignal.TrySetException(new NatsServerException(error)); return buffer.Slice(buffer.GetPosition(1, position.Value)); } diff --git a/src/NATS.Client.Core/NatsConnection.cs b/src/NATS.Client.Core/NatsConnection.cs index 267dc7238..d4294ee7d 100644 --- a/src/NATS.Client.Core/NatsConnection.cs +++ b/src/NATS.Client.Core/NatsConnection.cs @@ -25,6 +25,7 @@ internal enum NatsEvent ReconnectFailed, MessageDropped, LameDuckModeActivated, + ServerError, } public partial class NatsConnection : INatsConnection @@ -111,6 +112,8 @@ public NatsConnection(NatsOpts opts) public event AsyncEventHandler? LameDuckModeActivated; + public event AsyncEventHandler? ServerError; + public INatsConnection Connection => this; public NatsOpts Opts { get; } @@ -304,6 +307,9 @@ internal ValueTask UnsubscribeAsync(int sid) return default; } + internal void PushEvent(NatsEvent @event, NatsEventArgs args) + => _eventChannel.Writer.TryWrite((@event, args)); + private async ValueTask InitialConnectAsync() { Debug.Assert(ConnectionState == NatsConnectionState.Connecting, "Connection state"); @@ -784,6 +790,9 @@ private async Task PublishEventsAsync() case NatsEvent.LameDuckModeActivated when LameDuckModeActivated != null && args is NatsLameDuckModeActivatedEventArgs uri: await LameDuckModeActivated.InvokeAsync(this, uri).ConfigureAwait(false); break; + case NatsEvent.ServerError when ServerError != null && args is NatsServerErrorEventArgs error: + await ServerError.InvokeAsync(this, error).ConfigureAwait(false); + break; } } } diff --git a/src/NATS.Client.Core/NatsEventArgs.cs b/src/NATS.Client.Core/NatsEventArgs.cs index 265da8441..530a62162 100644 --- a/src/NATS.Client.Core/NatsEventArgs.cs +++ b/src/NATS.Client.Core/NatsEventArgs.cs @@ -41,3 +41,11 @@ public NatsLameDuckModeActivatedEventArgs(Uri uri) public Uri Uri { get; } } + +public class NatsServerErrorEventArgs : NatsEventArgs +{ + public NatsServerErrorEventArgs(string error) + : base($"Server error {error}") => Error = error; + + public string Error { get; } +} diff --git a/tests/NATS.Client.Core2.Tests/ErrorHandlerTest.cs b/tests/NATS.Client.Core2.Tests/ErrorHandlerTest.cs new file mode 100644 index 000000000..302b8dcbe --- /dev/null +++ b/tests/NATS.Client.Core2.Tests/ErrorHandlerTest.cs @@ -0,0 +1,119 @@ +using Microsoft.Extensions.Logging; +using NATS.Client.Core.Tests; +using NATS.Client.TestUtilities; + +namespace NATS.Client.Core2.Tests; + +[Collection("nats-server-restricted-user")] +public class ErrorHandlerTest +{ + private readonly ITestOutputHelper _output; + private readonly NatsServerRestrictedUserFixture _server; + + public ErrorHandlerTest(ITestOutputHelper output, NatsServerRestrictedUserFixture server) + { + _output = output; + _server = server; + } + + [Fact] + public async Task Handle_permissions_violation() + { + var logger = new InMemoryTestLoggerFactory(LogLevel.Error); + + var proxy = new NatsProxy(_server.Port); + await using var nats = new NatsConnection(new NatsOpts + { + Url = $"nats://127.0.0.1:{proxy.Port}", + LoggerFactory = logger, + AuthOpts = new NatsAuthOpts { Username = "u" }, + }); + + var errors = new List(); + + nats.ServerError += (_, args) => + { + lock (errors) + { + errors.Add(args); + } + + return default; + }; + + var prefix = _server.GetNextId(); + + await nats.PublishAsync("x", $"_{prefix}_published_1_"); + + await nats.PingAsync(); + + await Retry.Until( + "published and pinged 1", + () => + { + var published = false; + foreach (var frame in proxy.AllFrames) + { + if (frame.Origin == "C" && frame.Message.Contains($"_{prefix}_published_1_")) + { + published = true; + continue; + } + + if (published && frame.Origin == "S" && frame.Message == "PONG") + { + return true; + } + } + + return false; + }); + + await nats.PublishAsync("y", $"_{prefix}_published_2_"); + + await nats.PingAsync(); + + await Retry.Until( + "published and pinged 2", + () => + { + var published = false; + foreach (var frame in proxy.AllFrames) + { + if (frame.Origin == "C" && frame.Message.Contains($"_{prefix}_published_2_")) + { + published = true; + continue; + } + + if (published && frame.Origin == "S" && frame.Message == "PONG") + { + return true; + } + } + + return false; + }); + + await Task.Delay(TimeSpan.FromSeconds(2)); + + Assert.Contains(proxy.AllFrames, f => f.Origin == "S" && f.Message == "-ERR 'Permissions Violation for Publish to \"y\"'"); + + await Retry.Until( + "error is logged", + () => + { + return logger.Logs.Any(x => x.LogLevel == LogLevel.Error && x.Message == "Server error Permissions Violation for Publish to \"y\""); + }); + + await Retry.Until( + "server error event", + () => + { + lock (errors) + { + return errors.Any(e => e.Error == "Permissions Violation for Publish to \"y\""); + } + }); + } +} From 88afe3bacfde9af3170e1542a0d92d41936c8cc5 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Tue, 18 Feb 2025 11:22:36 +0000 Subject: [PATCH 2/3] Format --- tests/NATS.Client.Core2.Tests/ErrorHandlerTest.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/NATS.Client.Core2.Tests/ErrorHandlerTest.cs b/tests/NATS.Client.Core2.Tests/ErrorHandlerTest.cs index 302b8dcbe..3b2a8a199 100644 --- a/tests/NATS.Client.Core2.Tests/ErrorHandlerTest.cs +++ b/tests/NATS.Client.Core2.Tests/ErrorHandlerTest.cs @@ -103,7 +103,7 @@ await Retry.Until( "error is logged", () => { - return logger.Logs.Any(x => x.LogLevel == LogLevel.Error && x.Message == "Server error Permissions Violation for Publish to \"y\""); + return logger.Logs.Any(x => x.LogLevel == LogLevel.Error && x.Message == "Server error Permissions Violation for Publish to \"y\""); }); await Retry.Until( From 69c6c47e8e591dca0697c019ebd10199690c0e11 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Tue, 18 Feb 2025 11:29:49 +0000 Subject: [PATCH 3/3] Fix build --- tests/NATS.Client.JetStream.Tests/NatsJsContextFactoryTest.cs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/NATS.Client.JetStream.Tests/NatsJsContextFactoryTest.cs b/tests/NATS.Client.JetStream.Tests/NatsJsContextFactoryTest.cs index 049b3cf6c..6651980e3 100644 --- a/tests/NATS.Client.JetStream.Tests/NatsJsContextFactoryTest.cs +++ b/tests/NATS.Client.JetStream.Tests/NatsJsContextFactoryTest.cs @@ -94,6 +94,8 @@ public class MockConnection : INatsConnection public event AsyncEventHandler? MessageDropped; public event AsyncEventHandler? LameDuckModeActivated; + + public event AsyncEventHandler? ServerError; #pragma warning restore CS0067 public INatsServerInfo? ServerInfo { get; } = null;