Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add server error event #745

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/NATS.Client.Core/INatsConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ public interface INatsConnection : INatsClient
/// </summary>
public event AsyncEventHandler<NatsLameDuckModeActivatedEventArgs>? LameDuckModeActivated;

/// <summary>
/// Event that is raised when server sends an error message ('-ERR').
/// </summary>
public event AsyncEventHandler<NatsServerErrorEventArgs>? ServerError;

/// <summary>
/// Server information received from the NATS server.
/// </summary>
Expand Down
2 changes: 2 additions & 0 deletions src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -377,13 +377,15 @@ private async ValueTask<ReadOnlySequence<byte>> DispatchCommandAsync(int code, R
var newPosition = newBuffer.PositionOf((byte)'\n');
var error = ParseError(newBuffer.Slice(0, buffer.GetOffset(newPosition!.Value) - 1));
_logger.LogError(NatsLogEvents.Protocol, "Server error {Error}", error);
_connection.PushEvent(NatsEvent.ServerError, new NatsServerErrorEventArgs(error));
_waitForPongOrErrorSignal.TrySetException(new NatsServerException(error));
return newBuffer.Slice(newBuffer.GetPosition(1, newPosition!.Value));
}
else
{
var error = ParseError(buffer.Slice(0, buffer.GetOffset(position.Value) - 1));
_logger.LogError(NatsLogEvents.Protocol, "Server error {Error}", error);
_connection.PushEvent(NatsEvent.ServerError, new NatsServerErrorEventArgs(error));
_waitForPongOrErrorSignal.TrySetException(new NatsServerException(error));
return buffer.Slice(buffer.GetPosition(1, position.Value));
}
Expand Down
9 changes: 9 additions & 0 deletions src/NATS.Client.Core/NatsConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ internal enum NatsEvent
ReconnectFailed,
MessageDropped,
LameDuckModeActivated,
ServerError,
}

public partial class NatsConnection : INatsConnection
Expand Down Expand Up @@ -111,6 +112,8 @@ public NatsConnection(NatsOpts opts)

public event AsyncEventHandler<NatsLameDuckModeActivatedEventArgs>? LameDuckModeActivated;

public event AsyncEventHandler<NatsServerErrorEventArgs>? ServerError;

public INatsConnection Connection => this;

public NatsOpts Opts { get; }
Expand Down Expand Up @@ -304,6 +307,9 @@ internal ValueTask UnsubscribeAsync(int sid)
return default;
}

internal void PushEvent(NatsEvent @event, NatsEventArgs args)
=> _eventChannel.Writer.TryWrite((@event, args));

private async ValueTask InitialConnectAsync()
{
Debug.Assert(ConnectionState == NatsConnectionState.Connecting, "Connection state");
Expand Down Expand Up @@ -784,6 +790,9 @@ private async Task PublishEventsAsync()
case NatsEvent.LameDuckModeActivated when LameDuckModeActivated != null && args is NatsLameDuckModeActivatedEventArgs uri:
await LameDuckModeActivated.InvokeAsync(this, uri).ConfigureAwait(false);
break;
case NatsEvent.ServerError when ServerError != null && args is NatsServerErrorEventArgs error:
await ServerError.InvokeAsync(this, error).ConfigureAwait(false);
break;
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions src/NATS.Client.Core/NatsEventArgs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,11 @@ public NatsLameDuckModeActivatedEventArgs(Uri uri)

public Uri Uri { get; }
}

public class NatsServerErrorEventArgs : NatsEventArgs
{
public NatsServerErrorEventArgs(string error)
: base($"Server error {error}") => Error = error;

public string Error { get; }
}
119 changes: 119 additions & 0 deletions tests/NATS.Client.Core2.Tests/ErrorHandlerTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
using Microsoft.Extensions.Logging;
using NATS.Client.Core.Tests;
using NATS.Client.TestUtilities;

namespace NATS.Client.Core2.Tests;

[Collection("nats-server-restricted-user")]
public class ErrorHandlerTest
{
private readonly ITestOutputHelper _output;
private readonly NatsServerRestrictedUserFixture _server;

public ErrorHandlerTest(ITestOutputHelper output, NatsServerRestrictedUserFixture server)
{
_output = output;
_server = server;
}

[Fact]
public async Task Handle_permissions_violation()
{
var logger = new InMemoryTestLoggerFactory(LogLevel.Error);

var proxy = new NatsProxy(_server.Port);
await using var nats = new NatsConnection(new NatsOpts
{
Url = $"nats://127.0.0.1:{proxy.Port}",
LoggerFactory = logger,
AuthOpts = new NatsAuthOpts { Username = "u" },
});

var errors = new List<NatsServerErrorEventArgs>();

nats.ServerError += (_, args) =>
{
lock (errors)
{
errors.Add(args);
}

return default;
};

var prefix = _server.GetNextId();

await nats.PublishAsync("x", $"_{prefix}_published_1_");

await nats.PingAsync();

await Retry.Until(
"published and pinged 1",
() =>
{
var published = false;
foreach (var frame in proxy.AllFrames)
{
if (frame.Origin == "C" && frame.Message.Contains($"_{prefix}_published_1_"))
{
published = true;
continue;
}

if (published && frame.Origin == "S" && frame.Message == "PONG")
{
return true;
}
}

return false;
});

await nats.PublishAsync("y", $"_{prefix}_published_2_");

await nats.PingAsync();

await Retry.Until(
"published and pinged 2",
() =>
{
var published = false;
foreach (var frame in proxy.AllFrames)
{
if (frame.Origin == "C" && frame.Message.Contains($"_{prefix}_published_2_"))
{
published = true;
continue;
}

if (published && frame.Origin == "S" && frame.Message == "PONG")
{
return true;
}
}

return false;
});

await Task.Delay(TimeSpan.FromSeconds(2));

Assert.Contains(proxy.AllFrames, f => f.Origin == "S" && f.Message == "-ERR 'Permissions Violation for Publish to \"y\"'");

await Retry.Until(
"error is logged",
() =>
{
return logger.Logs.Any(x => x.LogLevel == LogLevel.Error && x.Message == "Server error Permissions Violation for Publish to \"y\"");
});

await Retry.Until(
"server error event",
() =>
{
lock (errors)
{
return errors.Any(e => e.Error == "Permissions Violation for Publish to \"y\"");
}
});
}
}
2 changes: 2 additions & 0 deletions tests/NATS.Client.JetStream.Tests/NatsJsContextFactoryTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ public class MockConnection : INatsConnection
public event AsyncEventHandler<NatsMessageDroppedEventArgs>? MessageDropped;

public event AsyncEventHandler<NatsLameDuckModeActivatedEventArgs>? LameDuckModeActivated;

public event AsyncEventHandler<NatsServerErrorEventArgs>? ServerError;
#pragma warning restore CS0067

public INatsServerInfo? ServerInfo { get; } = null;
Expand Down
Loading